Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/skeleton/tasks.py: 27%

105 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import logging 

2import typing as t 

3import logics 

4import time 

5 

6from viur.core import ( 

7 conf, 

8 current, 

9 db, 

10 email, 

11 errors, 

12 tasks, 

13 utils, 

14) 

15from .utils import skeletonByKind, listKnownSkeletons 

16from .relskel import RelSkel 

17 

18from ..bones.raw import RawBone 

19from ..bones.record import RecordBone 

20from ..bones.relational import RelationalBone, RelationalConsistency, RelationalUpdateLevel 

21from ..bones.select import SelectBone 

22from ..bones.string import StringBone 

23 

24 

@tasks.CallDeferred
def update_relations(
    key: t.Optional[db.Key] = None,
    *,
    min_change_time: t.Optional[float] = None,
    changed_bones: t.Optional[t.Iterable[str] | str] = (),
    cursor: t.Optional[str] = None,
    total: int = 0,
    **kwargs
):
    """
    This function updates Entities, which may have a copy of values from another entity which has been recently
    edited (updated). In ViUR, relations are implemented by copying the values from the referenced entity into the
    entity that's referencing them. This allows ViUR to run queries over properties of referenced entities and
    prevents additional db.Get's to these referenced entities if the main entity is read. However, this forces
    us to track changes made to entities as we might have to update these mirrored values. This is the deferred
    call from :meth:`viur.core.skeleton.Skeleton.write()` after an update (edit) on one Entity to do exactly that.

    :param key: The database-key of the entity that has been edited. It only defaults to None so that the
        deprecated ``destKey`` keyword can still supply it; a missing key raises a TypeError.
    :param min_change_time: The timestamp on which the edit occurred. As we run deferred, and the entity might have
        been edited multiple times before we get actually called, we can ignore entities that have been updated
        in the meantime as they're already up-to-date. Defaults to one second in the future.
    :param changed_bones: If set, we'll update only entities that have a copy of these bones. Relations mirror only
        key and name by default, so we don't have to update these if only another bone has been changed.
    :param cursor: The database cursor to continue from. As long as the query reports a follow-up cursor,
        the function defers itself again with that cursor.
    :param total: Number of relation entries processed by the previous runs of this chain; used for logging only.
    :param kwargs: Accepts the deprecated parameter names ``changedBone``, ``minChangeTime`` and ``destKey``.
        Any other entries are ignored.

    :raises TypeError: If neither ``key`` nor the deprecated ``destKey`` has been provided.
    """
    # TODO: Remove in VIUR4
    # Assigning into locals() does not rebind a function's local variables, therefore the
    # legacy values are collected first and then assigned to their successors explicitly.
    legacy = {}
    for _dep, _new in {
        "changedBone": "changed_bones",
        "minChangeTime": "min_change_time",
        "destKey": "key",
    }.items():
        if _dep in kwargs:
            logging.warning(f"{_dep!r} parameter is deprecated, please use {_new!r} instead")
            legacy[_new] = kwargs.pop(_dep)

    key = legacy.get("key", key)
    min_change_time = legacy.get("min_change_time", min_change_time)
    changed_bones = legacy.get("changed_bones", changed_bones)

    if key is None:
        raise TypeError("update_relations() missing required argument: 'key'")

    if min_change_time is None:
        min_change_time = time.time() + 1

    changed_bones = utils.ensure_iterable(changed_bones)

    if not cursor:
        # Only the first run of a chain announces itself.
        logging.debug(f"update_relations {key=} {min_change_time=} {changed_bones=}")

    if request_data := current.request_data.get():
        request_data["__update_relations_bones"] = changed_bones

    query = db.Query("viur-relations") \
        .filter("dest.__key__ =", key) \
        .filter("viur_delayed_update_tag <", min_change_time) \
        .filter("viur_relational_updateLevel =", RelationalUpdateLevel.Always.value)

    if changed_bones:
        query.filter("viur_foreign_keys IN", changed_bones)

    query.setCursor(cursor)

    for src_rel in query.run():
        # Subscripts are hoisted into locals so the log messages below do not need
        # nested same-type quotes inside f-strings (a syntax error before Python 3.12).
        src_kind = src_rel["viur_src_kind"]

        try:
            skel = skeletonByKind(src_kind)()
        except AssertionError:
            logging.info(f"Ignoring {src_rel.key!r} which refers to unknown kind {src_kind!r}")
            continue

        src_key = src_rel["src"].key

        if not skel.patch(lambda _skel: _skel.refresh(), key=src_key, update_relations=False):
            logging.warning(f"Cannot update stale reference to {src_key!r} referenced by {src_rel.key!r}")

        total += 1

    if next_cursor := query.getCursor():
        # More entries are pending: continue in a follow-up deferred call.
        update_relations(
            key=key,
            min_change_time=min_change_time,
            changed_bones=changed_bones,
            cursor=next_cursor,
            total=total
        )
    else:
        logging.debug(f"update_relations finished with {total=} on {key=} {min_change_time=} {changed_bones=}")

105 

106 

107class SkelIterTask(tasks.QueryIter): 

108 """ 

109 Iterates the skeletons of a query, and additionally checks a Logics expression. 

110 When the skeleton is valid, it performs the action `data["action"]` on each entry. 

111 """ 

112 

113 @classmethod 

114 def handleEntry(cls, skel, data): 

115 data["total"] += 1 

116 

117 if logics.Logics(data["condition"]).run(skel): 

118 data["count"] += 1 

119 

120 match data["action"]: 

121 case "refresh": 

122 skel.refresh() 

123 if skel["key"]: 

124 skel.write(update_relations=False) 

125 

126 case "delete": 

127 skel.delete() 

128 

129 case other: 

130 assert other == "count" 

131 

132 @classmethod 

133 def handleError(cls, skel, data, exception) -> bool: 

134 logging.exception(exception) 

135 

136 try: 

137 logging.debug(f"{skel=!r}") 

138 except Exception: # noqa 

139 logging.warning("Failed to dump skel") 

140 logging.debug(f"{skel.dbEntity=}") 

141 

142 data["error"] += 1 

143 return True 

144 

145 @classmethod 

146 def handleFinish(cls, total, data): 

147 super().handleFinish(total, data) 

148 

149 if not data["notify"]: 

150 return 

151 

152 txt = ( 

153 f"{conf.instance.project_id}: {data['action']!s} finished for {data['kind']!r}: " 

154 f"{data['count']} of {data['total']}\n" 

155 f"ViUR {data['action']!s}ed {data['count']} skeletons with condition <code>{data['condition']}</code> on a " 

156 f"total of {data['total']} ({data['error']} errored) of kind {data['kind']}.\n" 

157 ) 

158 

159 try: 

160 email.send_email(dests=data["notify"], stringTemplate=txt, skel=None) 

161 except Exception as exc: # noqa; OverQuota, whatever 

162 logging.exception(f'Failed to notify {data["notify"]}') 

163 

164 

@tasks.CallableTask
class SkeletonMaintenanceTask(tasks.CallableTaskBase):
    """
    Callable task which counts, refreshes or deletes the entries of the selected kinds.

    The entries can be narrowed down by query filters and by a Logics condition;
    the actual work is done by :class:`SkelIterTask`, started once per selected kind.
    """
    key = "SkeletonMaintenanceTask"
    name = "Skeleton Maintenance"
    descr = "Perform filtered maintenance operations on skeletons."

    def canCall(self):
        """
        Checks if the current user has "root" access.

        :return: A truthy value if the task may be called; falsy if there is no user or no "root" access.
        """
        user = current.user.get()
        return user and "root" in user["access"]

    class dataSkel(RelSkel):
        # The action to perform on every matching entry.
        task = SelectBone(
            descr="Task",
            required=True,
            values={
                "count": "Count",
                "refresh": "Refresh (formerly: RebuildSearchIndex)",
                "delete": "Delete",
            },
            defaultValue="refresh",
        )

        # The skeleton kinds to run the task on.
        kinds = SelectBone(
            descr="Kind",
            values=listKnownSkeletons,
            required=True,
            multiple=True,
        )

        class FilterRowUsingSkel(RelSkel):
            # One filter row: the name to filter on, the operator and the value to compare with.
            name = StringBone(
                required=True,
            )

            op = SelectBone(
                required=True,
                values={
                    "$eq": "=",
                    "$lt": "<",
                    "$gt": ">",
                    "$lk": "like",
                },
                # NOTE(review): the default is a single space, which is not one of the values above - confirm.
                defaultValue=" ",
            )

            value = StringBone(
                required=True,
            )

        filters = RecordBone(
            descr="Filter",
            using=FilterRowUsingSkel,
            multiple=True,
            format="$(name)$(op)=$(value)",
        )

        # The default condition evaluates to False, so the task affects nothing unless it is changed.
        condition = RawBone(
            descr="Condition",
            required=True,
            defaultValue="False  # fused: by default, doesn't affect anything.\n",
            params={
                "tooltip": "Enter a Logics expression here to filter entries by specific skeleton values."
            },
        )

    def execute(self, task, kinds, filters, condition):
        """
        Starts a :class:`SkelIterTask` for every selected kind.

        :param task: The action to perform ("count", "refresh" or "delete").
        :param kinds: The skeleton kinds to iterate.
        :param filters: Filter rows with the keys "name", "op" and "value" which are merged into the query.
            May be empty.
        :param condition: The Logics expression each entry has to fulfill.

        :raises errors.BadRequest: If the condition cannot be parsed.
        """
        # Validate the condition up-front, so a broken expression is reported before any task is started.
        try:
            logics.Logics(condition)
        except logics.ParseException as e:
            raise errors.BadRequest(f"Error parsing condition {e}") from e

        notify = current.user.get()["name"]

        for kind in kinds:
            q = skeletonByKind(kind)().all()

            for flt in filters or ():
                # Equality is expressed by the bare name, therefore the "$eq" suffix is dropped.
                # removesuffix() is required here: rstrip("$eq") strips any trailing "$", "e" and "q"
                # characters and thereby damages names like "name" or "type".
                q.mergeExternalFilter({(flt["name"] + flt["op"]).removesuffix("$eq"): flt["value"]})

            params = {
                "action": task,
                "notify": notify,
                "condition": condition,
                "kind": kind,
                "count": 0,
                "total": 0,
                "error": 0,
            }

            SkelIterTask.startIterOnQuery(q, params)

253 

254 SkelIterTask.startIterOnQuery(q, params)