Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/tasks.py: 24%

427 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import abc 

2import datetime 

3import functools 

4import logging 

5import os 

6import sys 

7import traceback 

8import typing as t 

9 

10import grpc 

11import requests 

12from google import protobuf 

13from google.cloud import tasks_v2 

14 

15from viur.core import current, db, errors, utils 

16from viur.core.config import conf 

17from viur.core.decorators import exposed, skey 

18from viur.core.module import Module 

19 

20CUSTOM_OBJ = t.TypeVar("CUSTOM_OBJ") # A JSON serializable object 

21 

22 

23class CustomEnvironmentHandler(abc.ABC): 

24 @abc.abstractmethod 

25 def serialize(self) -> CUSTOM_OBJ: 

26 """Serialize custom environment data 

27 

28 This function must not require any parameters and must 

29 return a JSON serializable object with the desired information. 

30 """ 

31 ... 

32 

33 @abc.abstractmethod 

34 def restore(self, obj: CUSTOM_OBJ) -> None: 

35 """Restore custom environment data 

36 

37 This function will receive the object from :meth:`serialize` and should write 

38 the information it contains to the environment of the deferred request. 

39 """ 

40 ... 

41 

42 

43_gaeApp = os.environ.get("GAE_APPLICATION") 

44 

45queueRegion = None 

46if _gaeApp: 46 ↛ 63line 46 didn't jump to line 63 because the condition on line 46 was always true

47 

48 try: 

49 headers = {"Metadata-Flavor": "Google"} 

50 r = requests.get("http://metadata.google.internal/computeMetadata/v1/instance/region", headers=headers) 

51 # r.text should be look like this "projects/(project-number)/region/(region)" 

52 # like so "projects/1234567890/region/europe-west3" 

53 queueRegion = r.text.split("/")[-1] 

54 except Exception as e: # Something went wrong with the Google Metadata Sever we use the old way 

55 logging.warning(f"Can't obtain queueRegion from Google MetaData Server due exception {e=}") 

56 regionPrefix = _gaeApp.split("~")[0] 

57 regionMap = { 

58 "h": "europe-west3", 

59 "e": "europe-west1" 

60 } 

61 queueRegion = regionMap.get(regionPrefix) 

62 

63if not queueRegion and conf.instance.is_dev_server and os.getenv("TASKS_EMULATOR") is None: 63 ↛ 65line 63 didn't jump to line 65 because the condition on line 63 was never true

64 # Probably local development server 

65 logging.warning("Taskqueue disabled, tasks will run inline!") 

66 

67if not conf.instance.is_dev_server or os.getenv("TASKS_EMULATOR") is None: 67 ↛ 70line 67 didn't jump to line 70 because the condition on line 67 was always true

68 taskClient = tasks_v2.CloudTasksClient() 

69else: 

70 taskClient = tasks_v2.CloudTasksClient( 

71 transport=tasks_v2.services.cloud_tasks.transports.CloudTasksGrpcTransport( 

72 channel=grpc.insecure_channel(os.getenv("TASKS_EMULATOR")) 

73 ) 

74 ) 

75 queueRegion = "local" 

76 

77_periodicTasks: dict[str, dict[t.Callable, datetime.timedelta]] = {} 

78_callableTasks = {} 

79_deferred_tasks = {} 

80_startupTasks = [] 

81_appengineServiceIPs = {"10.0.0.1", "0.1.0.1", "0.1.0.2"} 

82 

83 

84class PermanentTaskFailure(Exception): 

85 """Indicates that a task failed, and will never succeed.""" 

86 pass 

87 

88 

89def removePeriodicTask(task: t.Callable) -> None: 

90 """ 

91 Removes a periodic task from the queue. Useful to unqueue an task 

92 that has been inherited from an overridden module. 

93 """ 

94 global _periodicTasks 

95 assert "periodicTaskName" in dir(task), "This is not a periodic task? " 

96 for queueDict in _periodicTasks.values(): 

97 if task in queueDict: 

98 del queueDict[task] 

99 

100 

101class CallableTaskBase: 

102 """ 

103 Base class for user-callable tasks. 

104 Must be subclassed. 

105 """ 

106 key = None # Unique identifier for this task 

107 name = None # Human-Readable name 

108 descr = None # Human-Readable description 

109 kindName = "server-task" 

110 

111 def canCall(self) -> bool: 

112 """ 

113 Checks wherever the current user can execute this task 

114 :returns: bool 

115 """ 

116 return False 

117 

118 def dataSkel(self): 

119 """ 

120 If additional data is needed, return a skeleton-instance here. 

121 These values are then passed to *execute*. 

122 """ 

123 return None 

124 

125 def execute(self): 

126 """ 

127 The actual code that should be run goes here. 

128 """ 

129 raise NotImplementedError() 

130 

131 

132class TaskHandler(Module): 

133 """ 

134 Task Handler. 

135 Handles calling of Tasks (queued and periodic), and performs updatechecks 

136 Do not Modify. Do not Subclass. 

137 """ 

138 adminInfo = None 

139 retryCountWarningThreshold = 25 

140 

141 def findBoundTask(self, task: t.Callable, obj: object, depth: int = 0) -> t.Optional[tuple[t.Callable, object]]: 

142 

143 """ 

144 Tries to locate the instance, this function belongs to. 

145 If it succeeds in finding it, it returns the function and its instance (-> its "self"). 

146 Otherwise, None is returned. 

147 :param task: A callable decorated with @PeriodicTask 

148 :param obj: Object, which will be scanned in the current iteration. 

149 :param depth: Current iteration depth. 

150 """ 

151 if depth > 3 or "periodicTaskName" not in dir(task): # Limit the maximum amount of recursions 

152 return None 

153 for attr in dir(obj): 

154 if attr.startswith("_"): 

155 continue 

156 try: 

157 v = getattr(obj, attr) 

158 except AttributeError: 

159 continue 

160 if callable(v) and "periodicTaskName" in dir(v) and str(v.periodicTaskName) == str(task.periodicTaskName): 

161 return v, obj 

162 if not isinstance(v, str) and not callable(v): 

163 res = self.findBoundTask(task, v, depth + 1) 

164 if res: 

165 return res 

166 return None 

167 

168 @exposed 

169 def queryIter(self, *args, **kwargs): 

170 """ 

171 This processes one chunk of a queryIter (see below). 

172 """ 

173 req = current.request.get().request 

174 self._validate_request() 

175 data = utils.json.loads(req.body) 

176 if data["classID"] not in MetaQueryIter._classCache: 

177 logging.error(f"""Could not continue queryIter - {data["classID"]} not known on this instance""") 

178 MetaQueryIter._classCache[data["classID"]]._qryStep(data) 

179 

180 @exposed 

181 def deferred(self, *args, **kwargs): 

182 """ 

183 This catches one deferred call and routes it to its destination 

184 """ 

185 req = current.request.get().request 

186 self._validate_request() 

187 # Check if the retry count exceeds our warning threshold 

188 retryCount = req.headers.get("X-Appengine-Taskretrycount", None) 

189 if retryCount and int(retryCount) == self.retryCountWarningThreshold: 

190 from viur.core import email 

191 email.send_email_to_admins( 

192 "Deferred task retry counter exceeded warning threshold", 

193 f"""Task {req.headers.get("X-Appengine-Taskname", "")} is retried for the {retryCount}th time.""" 

194 ) 

195 

196 cmd, data = utils.json.loads(req.body) 

197 funcPath, args, kwargs, env = data 

198 if conf.debug.trace: 

199 logging.debug(f"Call task {funcPath} with {cmd=} {args=} {kwargs=} {env=}") 

200 

201 if env: 

202 if "user" in env and env["user"]: 

203 current.session.get()["user"] = env["user"] 

204 # FIXME: We do not have a fully loaded session from the cookie here, 

205 # but only a partial session. 

206 # But we still leave `loaded` on False, which leads to problems. 

207 

208 # Load current user into context variable if user module is there. 

209 if user_mod := getattr(conf.main_app.vi, "user", None): 

210 current.user.set(user_mod.getCurrentUser()) 

211 if "lang" in env and env["lang"]: 

212 current.language.set(env["lang"]) 

213 if "transactionMarker" in env: 

214 marker = db.get(db.Key("viur-transactionmarker", env["transactionMarker"])) 

215 if not marker: 

216 logging.info(f"""Dropping task, transaction {env["transactionMarker"]} did not apply""") 

217 return 

218 else: 

219 logging.info(f"""Executing task, transaction {env["transactionMarker"]} did succeed""") 

220 if "custom" in env and conf.tasks_custom_environment_handler: 

221 # Check if we need to restore additional environmental data 

222 conf.tasks_custom_environment_handler.restore(env["custom"]) 

223 if cmd == "rel": 

224 caller = conf.main_app 

225 pathlist = [x for x in funcPath.split("/") if x] 

226 for currpath in pathlist: 

227 if currpath not in dir(caller): 

228 logging.error(f"Could not resolve {funcPath=} (failed part was {currpath!r})") 

229 return 

230 caller = getattr(caller, currpath) 

231 try: 

232 caller(*args, **kwargs) 

233 except PermanentTaskFailure: 

234 logging.error("PermanentTaskFailure") 

235 except Exception as e: 

236 logging.exception(e) 

237 raise errors.RequestTimeout() # Task-API should retry 

238 elif cmd == "unb": 

239 if funcPath not in _deferred_tasks: 

240 logging.error(f"Missed deferred task {funcPath=} ({args=},{kwargs=})") 

241 # We call the deferred function *directly* (without walking through the mkDeferred lambda), so ensure 

242 # that any hit to another deferred function will defer again 

243 

244 current.request.get().DEFERRED_TASK_CALLED = True 

245 try: 

246 _deferred_tasks[funcPath](*args, **kwargs) 

247 except PermanentTaskFailure: 

248 logging.error("PermanentTaskFailure") 

249 except Exception as e: 

250 logging.exception(e) 

251 raise errors.RequestTimeout() # Task-API should retry 

252 

253 @exposed 

254 def cron(self, cronName="default", *args, **kwargs): 

255 req = current.request.get() 

256 if not conf.instance.is_dev_server: 

257 self._validate_request(require_cron=True, require_taskname=False) 

258 if cronName not in _periodicTasks: 

259 logging.warning(f"Cron request {cronName} doesn't have any tasks") 

260 # We must defer from cron, as tasks will interpret it as a call originating from task-queue - causing deferred 

261 # functions to be called directly, wich causes calls with _countdown etc set to fail. 

262 req.DEFERRED_TASK_CALLED = True 

263 for task, interval in _periodicTasks[cronName].items(): # Call all periodic tasks bound to that queue 

264 periodicTaskName = task.periodicTaskName.lower() 

265 if interval: # Ensure this task doesn't get called to often 

266 lastCall = db.get(db.Key("viur-task-interval", periodicTaskName)) 

267 if lastCall and utils.utcNow() - lastCall["date"] < interval: 

268 logging.debug(f"Task {periodicTaskName!r} has already run recently - skipping.") 

269 continue 

270 res = self.findBoundTask(task, conf.main_app) 

271 try: 

272 if res: # Its bound, call it this way :) 

273 res[0]() 

274 else: 

275 task() # It seems it wasn't bound - call it as a static method 

276 except Exception as e: 

277 logging.error(f"Error calling periodic task {periodicTaskName}") 

278 logging.exception(e) 

279 else: 

280 logging.debug(f"Successfully called task {periodicTaskName}") 

281 if interval: 

282 # Update its last-call timestamp 

283 entry = db.Entity(db.Key("viur-task-interval", periodicTaskName)) 

284 entry["date"] = utils.utcNow() 

285 db.put(entry) 

286 logging.debug("Periodic tasks complete") 

287 

288 def _validate_request( 

289 self, 

290 *, 

291 require_cron: bool = False, 

292 require_taskname: bool = True, 

293 ) -> None: 

294 """ 

295 Validate the header and metadata of a request 

296 

297 If the request is valid, None will be returned. 

298 Otherwise, an exception will be raised. 

299 

300 :param require_taskname: Require "X-AppEngine-TaskName" header 

301 :param require_cron: Require "X-Appengine-Cron" header 

302 """ 

303 req = current.request.get().request 

304 if ( 

305 req.environ.get("HTTP_X_APPENGINE_USER_IP") not in _appengineServiceIPs 

306 and (not conf.instance.is_dev_server or os.getenv("TASKS_EMULATOR") is None) 

307 ): 

308 logging.critical("Detected an attempted XSRF attack. This request did not originate from Task Queue.") 

309 raise errors.Forbidden() 

310 if require_cron and "X-Appengine-Cron" not in req.headers: 

311 logging.critical('Detected an attempted XSRF attack. The header "X-AppEngine-Cron" was not set.') 

312 raise errors.Forbidden() 

313 if require_taskname and "X-AppEngine-TaskName" not in req.headers: 

314 logging.critical('Detected an attempted XSRF attack. The header "X-AppEngine-Taskname" was not set.') 

315 raise errors.Forbidden() 

316 

317 @exposed 

318 def list(self, *args, **kwargs): 

319 """Lists all user-callable tasks which are callable by this user""" 

320 global _callableTasks 

321 

322 from viur.core.skeleton import SkeletonInstance, SkelList, RelSkel 

323 from viur.core.bones import BaseBone, StringBone 

324 

325 class TaskSkel(RelSkel): 

326 key = BaseBone() 

327 name = StringBone() 

328 descr = StringBone() 

329 

330 tasks = SkelList(TaskSkel, *( 

331 SkeletonInstance(TaskSkel, { 

332 "key": task.key, 

333 "name": str(task.name), 

334 "descr": str(task.descr) 

335 }) for task in _callableTasks.values() if task().canCall() 

336 )) 

337 

338 return self.render.list(tasks) 

339 

340 @exposed 

341 @skey(allow_empty=True) 

342 def execute(self, taskID, *, bounce: bool = False, **kwargs): 

343 """Queues a specific task for the next maintenance run""" 

344 global _callableTasks 

345 

346 if taskID in _callableTasks: 

347 task = _callableTasks[taskID]() 

348 else: 

349 return 

350 

351 if not task.canCall(): 

352 raise errors.Unauthorized() 

353 

354 skel = task.dataSkel() 

355 if ( 

356 not kwargs 

357 or not skel.fromClient(kwargs) 

358 or bounce 

359 ): 

360 return self.render.add(skel) 

361 

362 task.execute(**skel.accessedValues) 

363 

364 return self.render.addSuccess(skel) 

365 

366 

367TaskHandler.admin = True 

368TaskHandler.vi = True 

369TaskHandler.html = True 

370 

371 

372# Decorators 

373 

374def retry_n_times(retries: int, email_recipients: None | str | list[str] = None, 

375 tpl: None | str = None) -> t.Callable: 

376 """ 

377 Wrapper for deferred tasks to limit the amount of retries 

378 

379 :param retries: Number of maximum allowed retries 

380 :param email_recipients: Email addresses to which a info should be sent 

381 when the retry limit is reached. 

382 :param tpl: Instead of the standard text, a custom template can be used. 

383 The name of an email template must be specified. 

384 """ 

385 # language=Jinja2 

386 string_template = \ 

387 """Task {{func_name}} failed {{retries}} times 

388 This was the last attempt.<br> 

389 <pre>{{func_module|escape}}.{{func_name|escape}}({{signature|escape}})</pre> 

390 <pre>{{traceback|escape}}</pre>""" 

391 

392 def outer_wrapper(func): 

393 @functools.wraps(func) 

394 def inner_wrapper(*args, **kwargs): 

395 try: 

396 retry_count = int(current.request.get().request.headers.get("X-Appengine-Taskretrycount", -1)) 

397 except AttributeError: 

398 # During warmup current.request is None (at least on local devserver) 

399 retry_count = -1 

400 try: 

401 return func(*args, **kwargs) 

402 except Exception as exc: 

403 logging.exception(f"Task {func.__qualname__} failed: {exc}") 

404 logging.info( 

405 f"This was the {retry_count}. retry." 

406 f"{retries - retry_count} retries remaining. (total = {retries})" 

407 ) 

408 if retry_count < retries: 

409 # Raise the exception to mark this task as failed, so the task queue can retry it. 

410 raise exc 

411 else: 

412 if email_recipients: 

413 args_repr = [repr(arg) for arg in args] 

414 kwargs_repr = [f"{k!s}={v!r}" for k, v in kwargs.items()] 

415 signature = ", ".join(args_repr + kwargs_repr) 

416 try: 

417 from viur.core import email 

418 email.send_email( 

419 dests=email_recipients, 

420 tpl=tpl, 

421 stringTemplate=string_template if tpl is None else string_template, 

422 # The following params provide information for the emails templates 

423 func_name=func.__name__, 

424 func_qualname=func.__qualname__, 

425 func_module=func.__module__, 

426 retries=retries, 

427 args=args, 

428 kwargs=kwargs, 

429 signature=signature, 

430 traceback=traceback.format_exc(), 

431 ) 

432 except Exception: 

433 logging.exception("Failed to send email to %r", email_recipients) 

434 # Mark as permanently failed (could return nothing too) 

435 raise PermanentTaskFailure() 

436 

437 return inner_wrapper 

438 

439 return outer_wrapper 

440 

441 

442def noRetry(f): 

443 """Prevents a deferred Function from being called a second time""" 

444 logging.warning("Use of `@noRetry` is deprecated; Use `@retry_n_times(0)` instead!", stacklevel=2) 

445 return retry_n_times(0)(f) 

446 

447 

448def CallDeferred(func: t.Callable) -> t.Callable: 

449 """ 

450 This is a decorator, which always calls the wrapped method deferred. 

451 

452 The call will be packed and queued into a Cloud Tasks queue. 

453 The Task Queue calls the TaskHandler which executed the wrapped function 

454 with the originally arguments in a different request. 

455 

456 

457 In addition to the arguments for the wrapped methods you can set these: 

458 

459 _queue: Specify the queue in which the task should be pushed. 

460 If no value is given, the queue name set in `conf.tasks_default_queues` 

461 will be used. If the config does not have a value for this task, "default" 

462 is used as the default. The queue must exist (use the queue.yaml). 

463 

464 _countdown: Specify a time in seconds after which the task should be called. 

465 This time is relative to the moment where the wrapped method has been called. 

466 

467 _eta: Instead of a relative _countdown value you can specify a `datetime` 

468 when the task is scheduled to be attempted or retried. 

469 

470 _name: Specify a custom name for the cloud task. Must be unique and can 

471 contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), colons (:), or periods (.). 

472 

473 _target_version: Specify a version on which to run this task. 

474 By default, a task will be run on the same version where the wrapped method has been called. 

475 

476 _call_deferred: Calls the @CallDeferred decorated method directly. 

477 This is for example necessary, to call a super method which is decorated with @CallDeferred. 

478 

479 .. code-block:: python 

480 

481 # Example for use of the _call_deferred-parameter 

482 class A(Module): 

483 @CallDeferred 

484 def task(self): 

485 ... 

486 

487 class B(A): 

488 @CallDeferred 

489 def task(self): 

490 super().task(_call_deferred=False) # avoid secondary deferred call 

491 ... 

492 

493 See also: 

494 https://cloud.google.com/python/docs/reference/cloudtasks/latest/google.cloud.tasks_v2.types.Task 

495 https://cloud.google.com/python/docs/reference/cloudtasks/latest/google.cloud.tasks_v2.types.CreateTaskRequest 

496 """ 

497 if "viur_doc_build" in dir(sys): 497 ↛ 498line 497 didn't jump to line 498 because the condition on line 497 was never true

498 return func 

499 

500 __undefinedFlag_ = object() 

501 

502 def make_deferred( 

503 func: t.Callable, 

504 self=__undefinedFlag_, 

505 *args, 

506 _queue: str = None, 

507 _name: str | None = None, 

508 _call_deferred: bool = True, 

509 _target_version: str = conf.instance.app_version, 

510 _eta: datetime.datetime | None = None, 

511 _countdown: int = 0, 

512 **kwargs 

513 ): 

514 if _eta is not None and _countdown: 

515 raise ValueError("You cannot set the _countdown and _eta argument together!") 

516 if conf.debug.trace: 

517 logging.debug( 

518 f"make_deferred {func=}, {self=}, {args=}, {kwargs=}, " 

519 f"{_queue=}, {_name=}, {_call_deferred=}, {_target_version=}, {_eta=}, {_countdown=}" 

520 ) 

521 

522 try: 

523 req = current.request.get() 

524 except Exception: # This will fail for warmup requests 

525 req = None 

526 

527 if not queueRegion: 

528 # Run tasks inline 

529 logging.debug(f"{func=} will be executed inline") 

530 

531 @functools.wraps(func) 

532 def task(): 

533 if self is __undefinedFlag_: 

534 return func(*args, **kwargs) 

535 else: 

536 return func(self, *args, **kwargs) 

537 

538 if req: 

539 req.pendingTasks.append(task) # This property only exists on development server! 

540 else: 

541 # Warmup request or something - we have to call it now as we can't defer it :/ 

542 task() 

543 

544 return # Ensure no result gets passed back 

545 

546 # It's the deferred method which is called from the task queue, this has to be called directly 

547 _call_deferred &= not (req and req.request.headers.get("X-Appengine-Taskretrycount") 

548 and "DEFERRED_TASK_CALLED" not in dir(req)) 

549 

550 if not _call_deferred: 

551 if self is __undefinedFlag_: 

552 return func(*args, **kwargs) 

553 

554 req.DEFERRED_TASK_CALLED = True 

555 return func(self, *args, **kwargs) 

556 

557 else: 

558 try: 

559 if self.__class__.__name__ == "index": 

560 funcPath = func.__name__ 

561 else: 

562 funcPath = f"{self.modulePath}/{func.__name__}" 

563 command = "rel" 

564 except Exception: 

565 funcPath = f"{func.__name__}.{func.__module__}" 

566 if self is not __undefinedFlag_: 

567 args = (self,) + args # Re-append self to args, as this function is (hopefully) unbound 

568 command = "unb" 

569 

570 if _queue is None: 

571 _queue = conf.tasks_default_queues.get( 

572 funcPath, conf.tasks_default_queues.get("__default__", "default") 

573 ) 

574 

575 # Try to preserve the important data from the current environment 

576 try: # We might get called inside a warmup request without session 

577 usr = current.session.get().get("user") 

578 if "password" in usr: 

579 del usr["password"] 

580 except Exception: 

581 usr = None 

582 

583 env = {"user": usr} 

584 

585 try: 

586 env["lang"] = current.language.get() 

587 except AttributeError: # This isn't originating from a normal request 

588 pass 

589 

590 if db.is_in_transaction(): 

591 # We have to ensure transaction guarantees for that task also 

592 env["transactionMarker"] = db.acquire_transaction_success_marker() 

593 # We move that task at least 90 seconds into the future so the transaction has time to settle 

594 _countdown = max(90, _countdown) # Countdown can be set to None 

595 

596 if conf.tasks_custom_environment_handler: 

597 # Check if this project relies on additional environmental variables and serialize them too 

598 env["custom"] = conf.tasks_custom_environment_handler.serialize() 

599 

600 # Create task description 

601 task = tasks_v2.Task( 

602 app_engine_http_request=tasks_v2.AppEngineHttpRequest( 

603 body=utils.json.dumps((command, (funcPath, args, kwargs, env))).encode(), 

604 http_method=tasks_v2.HttpMethod.POST, 

605 relative_uri="/_tasks/deferred", 

606 app_engine_routing=tasks_v2.AppEngineRouting( 

607 version=_target_version, 

608 ), 

609 ), 

610 ) 

611 if _name is not None: 

612 task.name = taskClient.task_path(conf.instance.project_id, queueRegion, _queue, _name) 

613 

614 # Set a schedule time in case eta (absolut) or countdown (relative) was set. 

615 if seconds := _countdown: 

616 _eta = utils.utcNow() + datetime.timedelta(seconds=seconds) 

617 if _eta: 

618 # We must send a Timestamp Protobuf instead of a date-string 

619 timestamp = protobuf.timestamp_pb2.Timestamp() 

620 timestamp.FromDatetime(_eta) 

621 task.schedule_time = timestamp 

622 

623 # Use the client to build and send the task. 

624 parent = taskClient.queue_path(conf.instance.project_id, queueRegion, _queue) 

625 logging.debug(f"{parent=}, {task=}") 

626 taskClient.create_task(tasks_v2.CreateTaskRequest(parent=parent, task=task)) 

627 

628 logging.info(f"Created task {func.__name__}.{func.__module__} with {args=} {kwargs=} {env=}") 

629 

630 global _deferred_tasks 

631 _deferred_tasks[f"{func.__name__}.{func.__module__}"] = func 

632 

633 @functools.wraps(func) 

634 def wrapper(*args, **kwargs): 

635 return make_deferred(func, *args, **kwargs) 

636 

637 return wrapper 

638 

639 

640def callDeferred(func): 

641 """ 

642 Deprecated version of CallDeferred 

643 """ 

644 import logging, warnings 

645 

646 msg = "Use of @callDeferred is deprecated, use @CallDeferred instead." 

647 logging.warning(msg, stacklevel=3) 

648 warnings.warn(msg, stacklevel=3) 

649 

650 return CallDeferred(func) 

651 

652 

653def PeriodicTask(interval: datetime.timedelta | int | float = 0, cronName: str = "default") -> t.Callable: 

654 """ 

655 Decorator to call a function periodically during cron job execution. 

656 

657 Interval defines a lower bound for the call-frequency for the given task, specified as a timedelta. 

658 

659 The true interval of how often cron jobs are being executed is defined in the project's cron.yaml file. 

660 This defaults to 4 hours (see https://github.com/viur-framework/viur-base/blob/main/deploy/cron.yaml). 

661 In case the interval defined here is lower than 4 hours, the task will be fired once every 4 hours anyway. 

662 

663 :param interval: Call at most the given timedelta. 

664 """ 

665 

666 def make_decorator(fn): 

667 nonlocal interval 

668 if fn.__name__.startswith("_"): 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true

669 raise RuntimeError("Periodic called methods cannot start with an underscore! " 

670 f"Please rename {fn.__name__!r}") 

671 

672 if cronName not in _periodicTasks: 

673 _periodicTasks[cronName] = {} 

674 

675 if isinstance(interval, (int, float)) and "tasks.periodic.useminutes" in conf.compatibility: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true

676 logging.warning( 

677 f"PeriodicTask assuming {interval=} minutes here. This is changed into seconds in future. " 

678 f"Please use `datetime.timedelta(minutes={interval})` for clarification.", 

679 stacklevel=2, 

680 ) 

681 interval *= 60 

682 

683 _periodicTasks[cronName][fn] = utils.parse.timedelta(interval) 

684 fn.periodicTaskName = f"{fn.__module__}_{fn.__qualname__}".replace(".", "_").lower() 

685 return fn 

686 

687 return make_decorator 

688 

689 

690def CallableTask(fn: t.Callable) -> t.Callable: 

691 """Marks a Class as representing a user-callable Task. 

692 It *should* extend CallableTaskBase and *must* provide 

693 its API 

694 """ 

695 global _callableTasks 

696 _callableTasks[fn.key] = fn 

697 return fn 

698 

699 

700def StartupTask(fn: t.Callable) -> t.Callable: 

701 """ 

702 Functions decorated with this are called shortly at instance startup. 

703 It's *not* guaranteed that they actually run on the instance that just started up! 

704 Wrapped functions must not take any arguments. 

705 """ 

706 global _startupTasks 

707 _startupTasks.append(fn) 

708 return fn 

709 

710 

711@CallDeferred 

712def runStartupTasks(): 

713 """ 

714 Runs all queued startupTasks. 

715 Do not call directly! 

716 """ 

717 global _startupTasks 

718 for st in _startupTasks: 

719 st() 

720 

721 

722class MetaQueryIter(type): 

723 """ 

724 This is the meta class for QueryIters. 

725 Used only to keep track of all subclasses of QueryIter so we can emit the callbacks 

726 on the correct class. 

727 """ 

728 _classCache = {} # Mapping className -> Class 

729 

730 def __init__(cls, name, bases, dct): 

731 MetaQueryIter._classCache[str(cls)] = cls 

732 cls.__classID__ = str(cls) 

733 super(MetaQueryIter, cls).__init__(name, bases, dct) 

734 

735 

736class QueryIter(object, metaclass=MetaQueryIter): 

737 """ 

738 BaseClass to run a database Query and process each entry matched. 

739 This will run each step deferred, so it is possible to process an arbitrary number of entries 

740 without being limited by time or memory. 

741 

742 To use this class create a subclass, override the classmethods handleEntry and handleFinish and then 

743 call startIterOnQuery with an instance of a database Query (and possible some custom data to pass along) 

744 """ 

745 queueName = "default" # Name of the taskqueue we will run on 

746 

747 @classmethod 

748 def startIterOnQuery(cls, query: db.Query, customData: t.Any = None) -> None: 

749 """ 

750 Starts iterating the given query on this class. Will return immediately, the first batch will already 

751 run deferred. 

752 

753 Warning: Any custom data *must* be json-serializable and *must* be passed in customData. You cannot store 

754 any data on this class as each chunk may run on a different instance! 

755 """ 

756 assert not (query._customMultiQueryMerge or query._calculateInternalMultiQueryLimit), \ 

757 "Cannot iter a query with postprocessing" 

758 assert isinstance(query.queries, db.QueryDefinition), "Unsatisfiable query or query with an IN filter" 

759 qryDict = { 

760 "kind": query.kind, 

761 "srcSkel": query.srcSkel.kindName if query.srcSkel is not None else None, 

762 "filters": query.queries.filters, 

763 "orders": [(propName, sortOrder.value) for propName, sortOrder in query.queries.orders], 

764 "startCursor": query.queries.startCursor, 

765 "endCursor": query.queries.endCursor, 

766 "origKind": query.origKind, 

767 "distinct": query.queries.distinct, 

768 "classID": cls.__classID__, 

769 "customData": customData, 

770 "totalCount": 0 

771 } 

772 cls._requeueStep(qryDict) 

773 

774 @classmethod 

775 def _requeueStep(cls, qryDict: dict[str, t.Any]) -> None: 

776 """ 

777 Internal use only. Pushes a new step defined in qryDict to either the taskqueue or append it to 

778 the current request if we are on the local development server. 

779 """ 

780 if not queueRegion: # Run tasks inline - hopefully development server 

781 req = current.request.get() 

782 task = lambda *args, **kwargs: cls._qryStep(qryDict) 

783 if req: 

784 req.pendingTasks.append(task) # < This property will be only exist on development server! 

785 return 

786 taskClient.create_task(tasks_v2.CreateTaskRequest( 

787 parent=taskClient.queue_path(conf.instance.project_id, queueRegion, cls.queueName), 

788 task=tasks_v2.Task( 

789 app_engine_http_request=tasks_v2.AppEngineHttpRequest( 

790 body=utils.json.dumps(qryDict).encode(), 

791 http_method=tasks_v2.HttpMethod.POST, 

792 relative_uri="/_tasks/queryIter", 

793 app_engine_routing=tasks_v2.AppEngineRouting( 

794 version=conf.instance.app_version, 

795 ), 

796 ) 

797 ), 

798 )) 

799 

800 @classmethod 

801 def _qryStep(cls, qryDict: dict[str, t.Any]) -> None: 

802 """ 

803 Internal use only. Processes one block of five entries from the query defined in qryDict and 

804 reschedules the next block. 

805 """ 

806 from viur.core.skeleton import skeletonByKind 

807 

808 qry = db.Query(qryDict["kind"]) 

809 qry.srcSkel = skeletonByKind(qryDict["srcSkel"])() if qryDict["srcSkel"] else None 

810 qry.queries.filters = qryDict["filters"] 

811 qry.queries.orders = [(propName, db.SortOrder(sortOrder)) for propName, sortOrder in qryDict["orders"]] 

812 qry.setCursor(qryDict["startCursor"], qryDict["endCursor"]) 

813 qry.origKind = qryDict["origKind"] 

814 qry.queries.distinct = qryDict["distinct"] 

815 

816 if qry.srcSkel is not None: 

817 qryIter = qry.fetch(5) 

818 else: 

819 qryIter = qry.run(5) 

820 

821 for item in qryIter: 

822 try: 

823 cls.handleEntry(item, qryDict["customData"]) 

824 except Exception as exception: 

825 logging.error(f"{exception=}") 

826 try: 

827 cls.handleEntry(item, qryDict["customData"]) 

828 except Exception as e: # Second exception - call error_handler 

829 try: 

830 doCont = cls.handleError(item, qryDict["customData"], e) 

831 except Exception as e: 

832 logging.error(f"handleError failed on {item} - bailing out") 

833 logging.exception(e) 

834 doCont = False 

835 

836 if not doCont: 

837 logging.error(f"Exiting queryIter on cursor {qry.getCursor()!r}") 

838 return 

839 

840 qryDict["totalCount"] += 1 

841 

842 cursor = qry.getCursor() 

843 

844 if cursor: 

845 qryDict["startCursor"] = cursor 

846 cls._requeueStep(qryDict) 

847 else: 

848 cls.handleFinish(qryDict["totalCount"], qryDict["customData"]) 

849 

850 @classmethod 

851 def handleEntry(cls, entry, customData): 

852 """ 

853 Overridable hook to process one entry. "entry" will be either an db.Entity or an 

854 SkeletonInstance (if that query has been created by skel.all()) 

855 

856 Warning: If your query has an sortOrder other than __key__ and you modify that property here 

857 it is possible to encounter that object later one *again* (as it may jump behind the current cursor). 

858 """ 

859 logging.debug(f"handleEntry called on {cls} with {entry}.") 

860 

861 @classmethod 

862 def handleFinish(cls, totalCount: int, customData): 

863 """ 

864 Overridable hook that indicates the current run has been finished. 

865 """ 

866 logging.debug(f"handleFinish called on {cls} with {totalCount} total Entries processed") 

867 

868 @classmethod 

869 def handleError(cls, entry, customData, exception) -> bool: 

870 """ 

871 Handle a error occurred in handleEntry. 

872 If this function returns True, the queryIter continues, otherwise it breaks and prints the current cursor. 

873 """ 

874 logging.debug(f"handleError called on {cls} with {entry}.") 

875 logging.exception(exception) 

876 return True 

877 

878 

879class DeleteEntitiesIter(QueryIter): 

880 """ 

881 Simple Query-Iter to delete all entities encountered. 

882 

883 ..Warning: When iterating over skeletons, make sure that the 

884 query was created using `Skeleton().all()`. 

885 This way the `Skeleton.delete()` method can be used and 

886 the appropriate post-processing can be done. 

887 """ 

888 

889 @classmethod 

890 def handleEntry(cls, entry, customData): 

891 from viur.core.skeleton import SkeletonInstance 

892 if isinstance(entry, SkeletonInstance): 

893 entry.delete() 

894 else: 

895 db.delete(entry.key) 

896 

897 

898@PeriodicTask(interval=datetime.timedelta(hours=4)) 

899def start_clear_transaction_marker(): 

900 """ 

901 Removes old (expired) Transaction marker 

902 https://cloud.google.com/datastore/docs/concepts/transactions?hl=en#using_transactions 

903 https://cloud.google.com/tasks/docs/quotas?hl=en 

904 """ 

905 query = db.Query("viur-transactionmarker").filter("creationdate <", 

906 datetime.datetime.now() - datetime.timedelta(days=31)) 

907 DeleteEntitiesIter.startIterOnQuery(query)