Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/skeleton/tasks.py: 27%
108 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 09:00 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 09:00 +0000
1import logging
2import typing as t
3import logics
4import time
6from viur.core import (
7 conf,
8 current,
9 db,
10 email,
11 errors,
12 tasks,
13 utils,
14)
15from .utils import skeletonByKind, listKnownSkeletons
16from .relskel import RelSkel
18from ..bones.raw import RawBone
19from ..bones.record import RecordBone
20from ..bones.relational import RelationalBone, RelationalConsistency, RelationalUpdateLevel
21from ..bones.select import SelectBone
22from ..bones.string import StringBone
@tasks.CallDeferred
def update_relations(
    key: db.Key,
    *,
    min_change_time: t.Optional[float] = None,
    changed_bones: t.Optional[t.Iterable[str] | str] = (),
    cursor: t.Optional[str] = None,
    total: int = 0,
    **kwargs
):
    """
    This function updates Entities, which may have a copy of values from another entity which has been recently
    edited (updated). In ViUR, relations are implemented by copying the values from the referenced entity into the
    entity that's referencing them. This allows ViUR to run queries over properties of referenced entities and
    prevents additional db.Get's to these referenced entities if the main entity is read. However, this forces
    us to track changes made to entities as we might have to update these mirrored values. This is the deferred
    call from :meth:`viur.core.skeleton.Skeleton.write()` after an update (edit) on one Entity to do exactly that.

    :param key: The database-key of the entity that has been edited
    :param min_change_time: The timestamp on which the edit occurred. As we run deferred, and the entity might have
        been edited multiple times before we get actually called, we can ignore entities that have been updated
        in the meantime as they're already up-to-date. Defaults to one second in the future.
    :param changed_bones: If set, we'll update only entities that have a copy of that bones. Relations mirror only
        key and name by default, so we don't have to update these if only another bone has been changed.
    :param cursor: The database cursor to continue from; when the query reports a follow-up cursor, this
        function defers itself again with it.
    :param total: Running count of successfully refreshed entities, carried over between deferred calls.
    :param kwargs: Only consulted for the deprecated names ``changedBone``, ``minChangeTime`` and ``destKey``.
    """
    # TODO: Remove in VIUR4
    # The deprecated values are collected first and rebound explicitly below: assigning into
    # ``locals()`` does not change a function's local variables, so it would drop them silently.
    overrides = {}
    for _dep, _new in {
        "changedBone": "changed_bones",
        "minChangeTime": "min_change_time",
        "destKey": "key",
    }.items():
        if _dep in kwargs:
            logging.warning(f"{_dep!r} parameter is deprecated, please use {_new!r} instead",)
            overrides[_new] = kwargs.pop(_dep)

    key = overrides.get("key", key)
    min_change_time = overrides.get("min_change_time", min_change_time)
    changed_bones = overrides.get("changed_bones", changed_bones)

    if min_change_time is None:
        min_change_time = time.time() + 1

    changed_bones = utils.ensure_iterable(changed_bones)

    # Only the first call of a chain (no cursor yet) announces itself in the log.
    if not cursor:
        logging.debug(f"update_relations {key=} {min_change_time=} {changed_bones=}")

    if request_data := current.request_data.get():
        request_data["__update_relations_bones"] = changed_bones

    query = db.Query("viur-relations") \
        .filter("dest.__key__ =", key) \
        .filter("viur_delayed_update_tag <", min_change_time) \
        .filter("viur_relational_updateLevel =", RelationalUpdateLevel.Always.value)

    if changed_bones:
        query.filter("viur_foreign_keys IN", changed_bones)

    query.setCursor(cursor)

    for src_rel in query.run():
        src_kind = src_rel["viur_src_kind"]

        try:
            skel = skeletonByKind(src_kind)()
        except AssertionError:
            logging.info(f"""Ignoring {src_rel.key!r} which refers to unknown kind {src_kind!r}""")
            continue

        # Plain local instead of a same-quote subscript inside the f-string (that needs Python >= 3.12).
        src_key = src_rel["src"].key

        try:
            skel.patch(lambda skel: skel.refresh(), key=src_key, update_relations=False)
        except ValueError:
            logging.warning(f"Cannot update stale reference to {src_key!r} referenced by {src_rel.key!r}")
            continue

        total += 1

    if next_cursor := query.getCursor():
        # More results pending: continue in a further deferred call.
        update_relations(
            key=key,
            min_change_time=min_change_time,
            changed_bones=changed_bones,
            cursor=next_cursor,
            total=total
        )
    else:
        logging.debug(f"update_relations finished with {total=} on {key=} {min_change_time=} {changed_bones=}")
110class SkelIterTask(tasks.QueryIter):
111 """
112 Iterates the skeletons of a query, and additionally checks a Logics expression.
113 When the skeleton is valid, it performs the action `data["action"]` on each entry.
114 """
116 @classmethod
117 def handleEntry(cls, skel, data):
118 data["total"] += 1
120 if logics.Logics(data["condition"]).run(skel):
121 data["count"] += 1
123 match data["action"]:
124 case "refresh":
125 skel.refresh()
126 if skel["key"]:
127 skel.write(update_relations=False)
129 case "delete":
130 skel.delete()
132 case other:
133 assert other == "count"
135 @classmethod
136 def handleError(cls, skel, data, exception) -> bool:
137 logging.exception(exception)
139 try:
140 logging.debug(f"{skel=!r}")
141 except Exception: # noqa
142 logging.warning("Failed to dump skel")
143 logging.debug(f"{skel.dbEntity=}")
145 data["error"] += 1
146 return True
148 @classmethod
149 def handleFinish(cls, total, data):
150 super().handleFinish(total, data)
152 if not data["notify"]:
153 return
155 txt = (
156 f"{conf.instance.project_id}: {data['action']!s} finished for {data['kind']!r}: "
157 f"{data['count']} of {data['total']}\n"
158 f"ViUR {data['action']!s}ed {data['count']} skeletons with condition <code>{data['condition']}</code> on a "
159 f"total of {data['total']} ({data['error']} errored) of kind {data['kind']}.\n"
160 )
162 try:
163 email.send_email(dests=data["notify"], stringTemplate=txt, skel=None)
164 except Exception as exc: # noqa; OverQuota, whatever
165 logging.exception(f'Failed to notify {data["notify"]}')
@tasks.CallableTask
class SkeletonMaintenanceTask(tasks.CallableTaskBase):
    """
    Callable task that starts a `SkelIterTask` run (count, refresh or delete) for each selected kind,
    restricted by optional query filters and a Logics condition.
    """
    key = "SkeletonMaintenanceTask"
    name = "Skeleton Maintenance"
    descr = "Perform filtered maintenance operations on skeletons."

    def canCall(self):
        """Return a truthy value only if a user is present and has "root" in their access list."""
        user = current.user.get()
        return user and "root" in user["access"]

    class dataSkel(RelSkel):
        # Operation to perform on each matching skeleton.
        task = SelectBone(
            descr="Task",
            required=True,
            values={
                "count": "Count",
                "refresh": "Refresh (formerly: RebuildSearchIndex)",
                "delete": "Delete",
            },
            defaultValue="refresh",
        )

        # Kinds to process; one iteration is started per selected kind.
        kinds = SelectBone(
            descr="Kind",
            values=listKnownSkeletons,
            required=True,
            multiple=True,
        )

        class FilterRowUsingSkel(RelSkel):
            # Name of the bone to filter on.
            name = StringBone(
                required=True,
            )

            # Comparison operator, appended to the bone name when the filter is built.
            # NOTE(review): the default " " is not one of the declared values - confirm this is intended.
            op = SelectBone(
                required=True,
                values={
                    "$eq": "=",
                    "$lt": "<",
                    "$gt": ">",
                    "$lk": "like",
                },
                defaultValue=" ",
            )

            # Value to compare against.
            value = StringBone(
                required=True,
            )

        filters = RecordBone(
            descr="Filter",
            using=FilterRowUsingSkel,
            multiple=True,
            format="$(name)$(op)=$(value)",
        )

        # Logics expression; the default evaluates to False so nothing is affected unless it is edited.
        condition = RawBone(
            descr="Condition",
            required=True,
            defaultValue="False  # fused: by default, doesn't affect anything.\n",
            params={
                "tooltip": "Enter a Logics expression here to filter entries by specific skeleton values."
            },
        )

    def execute(self, task, kinds, filters, condition):
        """
        Validate the condition and start one `SkelIterTask` iteration per kind.

        :param task: The action to perform ("count", "refresh" or "delete").
        :param kinds: The kinds whose skeletons are iterated.
        :param filters: Rows with "name", "op" and "value" that are merged into each kind's query.
        :param condition: Logics expression evaluated per skeleton.
        :raises errors.BadRequest: If the condition cannot be parsed.
        """
        try:
            logics.Logics(condition)
        except logics.ParseException as e:
            raise errors.BadRequest(f"Error parsing condition {e}") from e

        notify = current.user.get()["name"]

        for kind in kinds:
            q = skeletonByKind(kind)().all()

            for flt in filters:
                # Drop only a literal "$eq" suffix. str.rstrip("$eq") would strip any trailing
                # "$", "e" or "q" characters and thereby truncate bone names such as "price".
                filter_key = (flt["name"] + flt["op"]).removesuffix("$eq")
                q.mergeExternalFilter({filter_key: flt["value"]})

            params = {
                "action": task,
                "notify": notify,
                "condition": condition,
                "kind": kind,
                "count": 0,
                "total": 0,
                "error": 0,
            }

            SkelIterTask.startIterOnQuery(q, params)