Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/skeleton/tasks.py: 27%

108 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 09:00 +0000

1import logging 

2import typing as t 

3import logics 

4import time 

5 

6from viur.core import ( 

7 conf, 

8 current, 

9 db, 

10 email, 

11 errors, 

12 tasks, 

13 utils, 

14) 

15from .utils import skeletonByKind, listKnownSkeletons 

16from .relskel import RelSkel 

17 

18from ..bones.raw import RawBone 

19from ..bones.record import RecordBone 

20from ..bones.relational import RelationalBone, RelationalConsistency, RelationalUpdateLevel 

21from ..bones.select import SelectBone 

22from ..bones.string import StringBone 

23 

24 

@tasks.CallDeferred
def update_relations(
    key: db.Key,
    *,
    min_change_time: t.Optional[float] = None,
    changed_bones: t.Optional[t.Iterable[str] | str] = (),
    cursor: t.Optional[str] = None,
    total: int = 0,
    **kwargs
):
    """
    This function updates Entities, which may have a copy of values from another entity which has been recently
    edited (updated). In ViUR, relations are implemented by copying the values from the referenced entity into the
    entity that's referencing them. This allows ViUR to run queries over properties of referenced entities and
    prevents additional db.Get's to these referenced entities if the main entity is read. However, this forces
    us to track changes made to entities as we might have to update these mirrored values. This is the deferred
    call from :meth:`viur.core.skeleton.Skeleton.write()` after an update (edit) on one Entity to do exactly that.

    :param key: The database-key of the entity that has been edited
    :param min_change_time: The timestamp on which the edit occurred. As we run deferred, and the entity might have
        been edited multiple times before we get actually called, we can ignore entities that have been updated
        in the meantime as they're already up-to-date. Defaults to one second in the future when not given.
    :param changed_bones: If set, we'll update only entities that have a copy of that bones. Relations mirror only
        key and name by default, so we don't have to update these if only another bone has been changed.
    :param cursor: The database cursor to continue from. When the query reports a follow-up cursor, this
        function calls itself again with it.
    :param total: Number of entities refreshed so far across all continuation calls; only used for logging.
    :param kwargs: Accepts the deprecated names ``changedBone``, ``minChangeTime`` and ``destKey``.
    """
    # TODO: Remove in VIUR4
    # Deprecated names must be rebound explicitly: assigning into ``locals()`` inside a function
    # does not change the actual local variables, so the passed values would be lost.
    renamed = {}
    for _dep, _new in {
        "changedBone": "changed_bones",
        "minChangeTime": "min_change_time",
        "destKey": "key",
    }.items():
        if _dep in kwargs:
            logging.warning(f"{_dep!r} parameter is deprecated, please use {_new!r} instead",)
            renamed[_new] = kwargs.pop(_dep)

    changed_bones = renamed.get("changed_bones", changed_bones)
    min_change_time = renamed.get("min_change_time", min_change_time)
    key = renamed.get("key", key)

    if min_change_time is None:
        min_change_time = time.time() + 1

    changed_bones = utils.ensure_iterable(changed_bones)

    # Only log the start on the first call, not on every cursor continuation.
    if not cursor:
        logging.debug(f"update_relations {key=} {min_change_time=} {changed_bones=}")

    # Publish the changed bones on the current request data, when there is any.
    if request_data := current.request_data.get():
        request_data["__update_relations_bones"] = changed_bones

    query = db.Query("viur-relations") \
        .filter("dest.__key__ =", key) \
        .filter("viur_delayed_update_tag <", min_change_time) \
        .filter("viur_relational_updateLevel =", RelationalUpdateLevel.Always.value)

    if changed_bones:
        query.filter("viur_foreign_keys IN", changed_bones)

    query.setCursor(cursor)

    for src_rel in query.run():
        try:
            skel = skeletonByKind(src_rel["viur_src_kind"])()
        except AssertionError:
            logging.info(f"""Ignoring {src_rel.key!r} which refers to unknown kind {src_rel["viur_src_kind"]!r}""")
            continue

        try:
            skel.patch(lambda skel: skel.refresh(), key=src_rel["src"].key, update_relations=False)
        except ValueError:
            # Triple-quoted so the nested double quotes also parse on Python versions before 3.12.
            logging.warning(
                f"""Cannot update stale reference to {src_rel["src"].key!r} referenced by {src_rel.key!r}"""
            )
            continue

        total += 1

    if next_cursor := query.getCursor():
        # More results pending: continue with the next cursor, carrying the running total.
        update_relations(
            key=key,
            min_change_time=min_change_time,
            changed_bones=changed_bones,
            cursor=next_cursor,
            total=total
        )
    else:
        logging.debug(f"update_relations finished with {total=} on {key=} {min_change_time=} {changed_bones=}")

108 

109 

110class SkelIterTask(tasks.QueryIter): 

111 """ 

112 Iterates the skeletons of a query, and additionally checks a Logics expression. 

113 When the skeleton is valid, it performs the action `data["action"]` on each entry. 

114 """ 

115 

116 @classmethod 

117 def handleEntry(cls, skel, data): 

118 data["total"] += 1 

119 

120 if logics.Logics(data["condition"]).run(skel): 

121 data["count"] += 1 

122 

123 match data["action"]: 

124 case "refresh": 

125 skel.refresh() 

126 if skel["key"]: 

127 skel.write(update_relations=False) 

128 

129 case "delete": 

130 skel.delete() 

131 

132 case other: 

133 assert other == "count" 

134 

135 @classmethod 

136 def handleError(cls, skel, data, exception) -> bool: 

137 logging.exception(exception) 

138 

139 try: 

140 logging.debug(f"{skel=!r}") 

141 except Exception: # noqa 

142 logging.warning("Failed to dump skel") 

143 logging.debug(f"{skel.dbEntity=}") 

144 

145 data["error"] += 1 

146 return True 

147 

148 @classmethod 

149 def handleFinish(cls, total, data): 

150 super().handleFinish(total, data) 

151 

152 if not data["notify"]: 

153 return 

154 

155 txt = ( 

156 f"{conf.instance.project_id}: {data['action']!s} finished for {data['kind']!r}: " 

157 f"{data['count']} of {data['total']}\n" 

158 f"ViUR {data['action']!s}ed {data['count']} skeletons with condition <code>{data['condition']}</code> on a " 

159 f"total of {data['total']} ({data['error']} errored) of kind {data['kind']}.\n" 

160 ) 

161 

162 try: 

163 email.send_email(dests=data["notify"], stringTemplate=txt, skel=None) 

164 except Exception as exc: # noqa; OverQuota, whatever 

165 logging.exception(f'Failed to notify {data["notify"]}') 

166 

167 

@tasks.CallableTask
class SkeletonMaintenanceTask(tasks.CallableTaskBase):
    """
    Callable task that runs a count, refresh or delete over all entries of the selected kinds,
    restricted by optional query filters and a Logics condition.
    """
    key = "SkeletonMaintenanceTask"
    name = "Skeleton Maintenance"
    descr = "Perform filtered maintenance operations on skeletons."

    def canCall(self) -> bool:
        """Return True only if there is a current user whose access list contains "root"."""
        user = current.user.get()
        return bool(user and "root" in user["access"])

    class dataSkel(RelSkel):
        # The operation to perform on every matching skeleton.
        task = SelectBone(
            descr="Task",
            required=True,
            values={
                "count": "Count",
                "refresh": "Refresh (formerly: RebuildSearchIndex)",
                "delete": "Delete",
            },
            defaultValue="refresh",
        )

        # One or more skeleton kinds to run the task on.
        kinds = SelectBone(
            descr="Kind",
            values=listKnownSkeletons,
            required=True,
            multiple=True,
        )

        class FilterRowUsingSkel(RelSkel):
            # Name of the property to filter on.
            name = StringBone(
                required=True,
            )

            # Filter operator; its key is appended to the name to form the filter key.
            # NOTE(review): the default " " is not one of the declared values - confirm this is intended.
            op = SelectBone(
                required=True,
                values={
                    "$eq": "=",
                    "$lt": "<",
                    "$gt": ">",
                    "$lk": "like",
                },
                defaultValue=" ",
            )

            # Value to compare against.
            value = StringBone(
                required=True,
            )

        # Optional list of query filters applied to each kind's query.
        filters = RecordBone(
            descr="Filter",
            using=FilterRowUsingSkel,
            multiple=True,
            format="$(name)$(op)=$(value)",
        )

        # Logics expression evaluated per skeleton; the default evaluates to False.
        condition = RawBone(
            descr="Condition",
            required=True,
            defaultValue="False  # fused: by default, doesn't affect anything.\n",
            params={
                "tooltip": "Enter a Logics expression here to filter entries by specific skeleton values."
            },
        )

    def execute(self, task, kinds, filters, condition):
        """
        Validate the condition and start one `SkelIterTask` per selected kind.

        :param task: The action to perform ("count", "refresh" or "delete").
        :param kinds: The skeleton kinds to iterate.
        :param filters: Filter rows with "name", "op" and "value"; may be empty.
        :param condition: Logics expression source, evaluated per skeleton by the iterator.
        :raises errors.BadRequest: If the condition cannot be parsed.
        """
        # Parse once up front so a broken expression is reported before any task starts.
        try:
            logics.Logics(condition)
        except logics.ParseException as e:
            raise errors.BadRequest(f"Error parsing condition {e}") from e

        notify = current.user.get()["name"]

        for kind in kinds:
            q = skeletonByKind(kind)().all()

            for flt in filters or ():
                # Equality is expressed by the bare property name, so only the "$eq" suffix is dropped.
                # removesuffix() is required here: rstrip("$eq") would strip any trailing "$", "e" or "q"
                # characters and thereby corrupt names such as "name" or "title".
                filter_key = (flt["name"] + flt["op"]).removesuffix("$eq")
                q.mergeExternalFilter({filter_key: flt["value"]})

            params = {
                "action": task,
                "notify": notify,
                "condition": condition,
                "kind": kind,
                "count": 0,
                "total": 0,
                "error": 0,
            }

            SkelIterTask.startIterOnQuery(q, params)