Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/skeleton/tasks.py: 27%
105 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1import logging
2import typing as t
3import logics
4import time
6from viur.core import (
7 conf,
8 current,
9 db,
10 email,
11 errors,
12 tasks,
13 utils,
14)
15from .utils import skeletonByKind, listKnownSkeletons
16from .relskel import RelSkel
18from ..bones.raw import RawBone
19from ..bones.record import RecordBone
20from ..bones.relational import RelationalBone, RelationalConsistency, RelationalUpdateLevel
21from ..bones.select import SelectBone
22from ..bones.string import StringBone
25@tasks.CallDeferred
26def update_relations(
27 key: db.Key,
28 *,
29 min_change_time: t.Optional[float] = None,
30 changed_bones: t.Optional[t.Iterable[str] | str] = (),
31 cursor: t.Optional[str] = None,
32 total: int = 0,
33 **kwargs
34):
35 """
36 This function updates Entities, which may have a copy of values from another entity which has been recently
37 edited (updated). In ViUR, relations are implemented by copying the values from the referenced entity into the
38 entity that's referencing them. This allows ViUR to run queries over properties of referenced entities and
39 prevents additional db.Get's to these referenced entities if the main entity is read. However, this forces
40 us to track changes made to entities as we might have to update these mirrored values. This is the deferred
41 call from meth:`viur.core.skeleton.Skeleton.write()` after an update (edit) on one Entity to do exactly that.
43 :param key: The database-key of the entity that has been edited
44 :param min_change_time: The timestamp on which the edit occurred. As we run deferred, and the entity might have
45 been edited multiple times before we get acutally called, we can ignore entities that have been updated
46 in the meantime as they're already up-to-date
47 :param changed_bones: If set, we'll update only entites that have a copy of that bones. Relations mirror only
48 key and name by default, so we don't have to update these if only another bone has been changed.
49 :param cursor: The database cursor for the current request as we only process five entities at once and then
50 defer again.
51 """
52 # TODO: Remove in VIUR4
53 for _dep, _new in {
54 "changedBone": "changed_bones",
55 "minChangeTime": "min_change_time",
56 "destKey": "key",
57 }.items():
58 if _dep in kwargs:
59 logging.warning(f"{_dep!r} parameter is deprecated, please use {_new!r} instead",)
60 locals()[_new] = kwargs.pop(_dep)
62 if min_change_time is None:
63 min_change_time = time.time() + 1
65 changed_bones = utils.ensure_iterable(changed_bones)
67 if not cursor:
68 logging.debug(f"update_relations {key=} {min_change_time=} {changed_bones=}")
70 if request_data := current.request_data.get():
71 request_data["__update_relations_bones"] = changed_bones
73 query = db.Query("viur-relations") \
74 .filter("dest.__key__ =", key) \
75 .filter("viur_delayed_update_tag <", min_change_time) \
76 .filter("viur_relational_updateLevel =", RelationalUpdateLevel.Always.value)
78 if changed_bones:
79 query.filter("viur_foreign_keys IN", changed_bones)
81 query.setCursor(cursor)
83 for src_rel in query.run():
84 try:
85 skel = skeletonByKind(src_rel["viur_src_kind"])()
86 except AssertionError:
87 logging.info(f"""Ignoring {src_rel.key!r} which refers to unknown kind {src_rel["viur_src_kind"]!r}""")
88 continue
90 if not skel.patch(lambda skel: skel.refresh(), key=src_rel["src"].key, update_relations=False):
91 logging.warning(f"Cannot update stale reference to {src_rel["src"].key!r} referenced by {src_rel.key!r}")
93 total += 1
95 if next_cursor := query.getCursor():
96 update_relations(
97 key=key,
98 min_change_time=min_change_time,
99 changed_bones=changed_bones,
100 cursor=next_cursor,
101 total=total
102 )
103 else:
104 logging.debug(f"update_relations finished with {total=} on {key=} {min_change_time=} {changed_bones=}")
107class SkelIterTask(tasks.QueryIter):
108 """
109 Iterates the skeletons of a query, and additionally checks a Logics expression.
110 When the skeleton is valid, it performs the action `data["action"]` on each entry.
111 """
113 @classmethod
114 def handleEntry(cls, skel, data):
115 data["total"] += 1
117 if logics.Logics(data["condition"]).run(skel):
118 data["count"] += 1
120 match data["action"]:
121 case "refresh":
122 skel.refresh()
123 if skel["key"]:
124 skel.write(update_relations=False)
126 case "delete":
127 skel.delete()
129 case other:
130 assert other == "count"
132 @classmethod
133 def handleError(cls, skel, data, exception) -> bool:
134 logging.exception(exception)
136 try:
137 logging.debug(f"{skel=!r}")
138 except Exception: # noqa
139 logging.warning("Failed to dump skel")
140 logging.debug(f"{skel.dbEntity=}")
142 data["error"] += 1
143 return True
145 @classmethod
146 def handleFinish(cls, total, data):
147 super().handleFinish(total, data)
149 if not data["notify"]:
150 return
152 txt = (
153 f"{conf.instance.project_id}: {data['action']!s} finished for {data['kind']!r}: "
154 f"{data['count']} of {data['total']}\n"
155 f"ViUR {data['action']!s}ed {data['count']} skeletons with condition <code>{data['condition']}</code> on a "
156 f"total of {data['total']} ({data['error']} errored) of kind {data['kind']}.\n"
157 )
159 try:
160 email.send_email(dests=data["notify"], stringTemplate=txt, skel=None)
161 except Exception as exc: # noqa; OverQuota, whatever
162 logging.exception(f'Failed to notify {data["notify"]}')
165@tasks.CallableTask
166class SkeletonMaintenanceTask(tasks.CallableTaskBase):
167 key = "SkeletonMaintenanceTask"
168 name = "Skeleton Maintenance"
169 descr = "Perform filtered maintenance operations on skeletons."
171 def canCall(self):
172 user = current.user.get()
173 return user and "root" in user["access"]
175 class dataSkel(RelSkel):
176 task = SelectBone(
177 descr="Task",
178 required=True,
179 values={
180 "count": "Count",
181 "refresh": "Refresh (formerly: RebuildSearchIndex)",
182 "delete": "Delete",
183 },
184 defaultValue="refresh",
185 )
187 kinds = SelectBone(
188 descr="Kind",
189 values=listKnownSkeletons,
190 required=True,
191 multiple=True,
192 )
194 class FilterRowUsingSkel(RelSkel):
195 name = StringBone(
196 required=True,
197 )
199 op = SelectBone(
200 required=True,
201 values={
202 "$eq": "=",
203 "$lt": "<",
204 "$gt": ">",
205 "$lk": "like",
206 },
207 defaultValue=" ",
208 )
210 value = StringBone(
211 required=True,
212 )
214 filters = RecordBone(
215 descr="Filter",
216 using=FilterRowUsingSkel,
217 multiple=True,
218 format="$(name)$(op)=$(value)",
219 )
221 condition = RawBone(
222 descr="Condition",
223 required=True,
224 defaultValue="False # fused: by default, doesn't affect anything.\n",
225 params={
226 "tooltip": "Enter a Logics expression here to filter entries by specific skeleton values."
227 },
228 )
230 def execute(self, task, kinds, filters, condition):
231 try:
232 logics.Logics(condition)
233 except logics.ParseException as e:
234 raise errors.BadRequest(f"Error parsing condition {e}")
236 notify = current.user.get()["name"]
238 for kind in kinds:
239 q = skeletonByKind(kind)().all()
241 for flt in filters:
242 q.mergeExternalFilter({(flt["name"] + flt["op"]).rstrip("$eq"): flt["value"]})
244 params = {
245 "action": task,
246 "notify": notify,
247 "condition": condition,
248 "kind": kind,
249 "count": 0,
250 "total": 0,
251 "error": 0,
252 }
254 SkelIterTask.startIterOnQuery(q, params)