Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/tasks.py: 24%
427 statements
import abc
import datetime
import functools
import logging
import os
import sys
import traceback
import typing as t

import grpc
import requests
from google import protobuf
from google.cloud import tasks_v2

from viur.core import current, db, errors, utils
from viur.core.config import conf
from viur.core.decorators import exposed, skey
from viur.core.module import Module

CUSTOM_OBJ = t.TypeVar("CUSTOM_OBJ")  # A JSON serializable object


class CustomEnvironmentHandler(abc.ABC):
    @abc.abstractmethod
    def serialize(self) -> CUSTOM_OBJ:
        """Serialize custom environment data

        This function must not require any parameters and must
        return a JSON serializable object with the desired information.
        """
        ...

    @abc.abstractmethod
    def restore(self, obj: CUSTOM_OBJ) -> None:
        """Restore custom environment data

        This function will receive the object from :meth:`serialize` and should write
        the information it contains to the environment of the deferred request.
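
        A minimal sketch of a matching handler pair, assuming the project keeps a
        ``"tenant"`` entry in ``current.request_data`` (the key is illustrative only);
        the handler instance would then be assigned to ``conf.tasks_custom_environment_handler``:

        .. code-block:: python

            class TenantEnvironmentHandler(CustomEnvironmentHandler):
                def serialize(self) -> dict:
                    # Capture the value that should travel along with the deferred task
                    return {"tenant": current.request_data.get().get("tenant")}

                def restore(self, obj: dict) -> None:
                    # Write it back into the environment of the deferred request
                    current.request_data.get()["tenant"] = obj["tenant"]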
39 """
40 ...


_gaeApp = os.environ.get("GAE_APPLICATION")

queueRegion = None
if _gaeApp:
    try:
        headers = {"Metadata-Flavor": "Google"}
        r = requests.get("http://metadata.google.internal/computeMetadata/v1/instance/region", headers=headers)
        # r.text looks like "projects/(project-number)/region/(region)",
        # for example "projects/1234567890/region/europe-west3"
        queueRegion = r.text.split("/")[-1]
    except Exception as e:  # Something went wrong with the Google metadata server; fall back to the old way
        logging.warning(f"Can't obtain queueRegion from Google metadata server due to exception {e=}")
        regionPrefix = _gaeApp.split("~")[0]
        regionMap = {
            "h": "europe-west3",
            "e": "europe-west1"
        }
        queueRegion = regionMap.get(regionPrefix)

if not queueRegion and conf.instance.is_dev_server and os.getenv("TASKS_EMULATOR") is None:
    # Probably the local development server
    logging.warning("Taskqueue disabled, tasks will run inline!")

if not conf.instance.is_dev_server or os.getenv("TASKS_EMULATOR") is None:
    taskClient = tasks_v2.CloudTasksClient()
else:
    taskClient = tasks_v2.CloudTasksClient(
        transport=tasks_v2.services.cloud_tasks.transports.CloudTasksGrpcTransport(
            channel=grpc.insecure_channel(os.getenv("TASKS_EMULATOR"))
        )
    )
    queueRegion = "local"

_periodicTasks: dict[str, dict[t.Callable, datetime.timedelta]] = {}
_callableTasks = {}
_deferred_tasks = {}
_startupTasks = []
_appengineServiceIPs = {"10.0.0.1", "0.1.0.1", "0.1.0.2"}


class PermanentTaskFailure(Exception):
    """Indicates that a task failed, and will never succeed."""
    pass


def removePeriodicTask(task: t.Callable) -> None:
    """
    Removes a periodic task from the queue. Useful to unqueue a task
    that has been inherited from an overridden module.
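
    A short sketch, assuming an inherited module whose base class registered a
    periodic task named ``cleanup`` (both names are illustrative):

    .. code-block:: python

        class Page(BasePage):
            pass

        # Unqueue the periodic task inherited from the base class
        removePeriodicTask(BasePage.cleanup)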
93 """
94 global _periodicTasks
95 assert "periodicTaskName" in dir(task), "This is not a periodic task? "
96 for queueDict in _periodicTasks.values():
97 if task in queueDict:
98 del queueDict[task]


class CallableTaskBase:
    """
    Base class for user-callable tasks.
    Must be subclassed.
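
    A minimal sketch of a concrete task (the key, names and access check are
    illustrative only):

    .. code-block:: python

        @CallableTask
        class CleanupTask(CallableTaskBase):
            key = "cleanup"
            name = "Cleanup"
            descr = "Removes stale entries"

            def canCall(self) -> bool:
                # Only allow users with root access to trigger this task
                user = current.user.get()
                return bool(user and "root" in user["access"])

            def execute(self):
                ...  # the actual work goes here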
105 """
106 key = None # Unique identifier for this task
107 name = None # Human-Readable name
108 descr = None # Human-Readable description
109 kindName = "server-task"
111 def canCall(self) -> bool:
112 """
113 Checks wherever the current user can execute this task
114 :returns: bool
115 """
116 return False
118 def dataSkel(self):
119 """
120 If additional data is needed, return a skeleton-instance here.
121 These values are then passed to *execute*.
122 """
123 return None
125 def execute(self):
126 """
127 The actual code that should be run goes here.
128 """
129 raise NotImplementedError()


class TaskHandler(Module):
    """
    Task Handler.
    Handles calling of tasks (queued and periodic), and performs update checks.
    Do not modify. Do not subclass.
    """
    adminInfo = None
    retryCountWarningThreshold = 25

    def findBoundTask(self, task: t.Callable, obj: object, depth: int = 0) -> t.Optional[tuple[t.Callable, object]]:
        """
        Tries to locate the instance this function belongs to.
        If it succeeds in finding it, it returns the function and its instance (-> its "self").
        Otherwise, None is returned.
        :param task: A callable decorated with @PeriodicTask
        :param obj: Object which will be scanned in the current iteration.
        :param depth: Current iteration depth.
        """
        if depth > 3 or "periodicTaskName" not in dir(task):  # Limit the maximum recursion depth
            return None
        for attr in dir(obj):
            if attr.startswith("_"):
                continue
            try:
                v = getattr(obj, attr)
            except AttributeError:
                continue
            if callable(v) and "periodicTaskName" in dir(v) and str(v.periodicTaskName) == str(task.periodicTaskName):
                return v, obj
            if not isinstance(v, str) and not callable(v):
                res = self.findBoundTask(task, v, depth + 1)
                if res:
                    return res
        return None

    @exposed
    def queryIter(self, *args, **kwargs):
        """
        This processes one chunk of a queryIter (see below).
        """
        req = current.request.get().request
        self._validate_request()
        data = utils.json.loads(req.body)
        if data["classID"] not in MetaQueryIter._classCache:
            logging.error(f"""Could not continue queryIter - {data["classID"]} not known on this instance""")
        MetaQueryIter._classCache[data["classID"]]._qryStep(data)

    @exposed
    def deferred(self, *args, **kwargs):
        """
        This catches one deferred call and routes it to its destination
        """
        req = current.request.get().request
        self._validate_request()
        # Check if the retry count exceeds our warning threshold
        retryCount = req.headers.get("X-Appengine-Taskretrycount", None)
        if retryCount and int(retryCount) == self.retryCountWarningThreshold:
            from viur.core import email
            email.send_email_to_admins(
                "Deferred task retry counter exceeded warning threshold",
                f"""Task {req.headers.get("X-Appengine-Taskname", "")} is retried for the {retryCount}th time."""
            )

        cmd, data = utils.json.loads(req.body)
        funcPath, args, kwargs, env = data
        if conf.debug.trace:
            logging.debug(f"Call task {funcPath} with {cmd=} {args=} {kwargs=} {env=}")

        if env:
            if "user" in env and env["user"]:
                current.session.get()["user"] = env["user"]
                # FIXME: We do not have a fully loaded session from the cookie here,
                #  but only a partial session.
                #  But we still leave `loaded` on False, which leads to problems.

            # Load current user into context variable if user module is there.
            if user_mod := getattr(conf.main_app.vi, "user", None):
                current.user.set(user_mod.getCurrentUser())
            if "lang" in env and env["lang"]:
                current.language.set(env["lang"])
            if "transactionMarker" in env:
                marker = db.get(db.Key("viur-transactionmarker", env["transactionMarker"]))
                if not marker:
                    logging.info(f"""Dropping task, transaction {env["transactionMarker"]} did not apply""")
                    return
                else:
                    logging.info(f"""Executing task, transaction {env["transactionMarker"]} did succeed""")
            if "custom" in env and conf.tasks_custom_environment_handler:
                # Check if we need to restore additional environmental data
                conf.tasks_custom_environment_handler.restore(env["custom"])

        if cmd == "rel":
            caller = conf.main_app
            pathlist = [x for x in funcPath.split("/") if x]
            for currpath in pathlist:
                if currpath not in dir(caller):
                    logging.error(f"Could not resolve {funcPath=} (failed part was {currpath!r})")
                    return
                caller = getattr(caller, currpath)

            try:
                caller(*args, **kwargs)
            except PermanentTaskFailure:
                logging.error("PermanentTaskFailure")
            except Exception as e:
                logging.exception(e)
                raise errors.RequestTimeout()  # Task-API should retry

        elif cmd == "unb":
            if funcPath not in _deferred_tasks:
                logging.error(f"Missed deferred task {funcPath=} ({args=},{kwargs=})")
            # We call the deferred function *directly* (without walking through the mkDeferred lambda), so ensure
            # that any hit to another deferred function will defer again
            current.request.get().DEFERRED_TASK_CALLED = True
            try:
                _deferred_tasks[funcPath](*args, **kwargs)
            except PermanentTaskFailure:
                logging.error("PermanentTaskFailure")
            except Exception as e:
                logging.exception(e)
                raise errors.RequestTimeout()  # Task-API should retry

    @exposed
    def cron(self, cronName="default", *args, **kwargs):
        req = current.request.get()
        if not conf.instance.is_dev_server:
            self._validate_request(require_cron=True, require_taskname=False)
        if cronName not in _periodicTasks:
            logging.warning(f"Cron request {cronName} doesn't have any tasks")
        # We must defer from cron, as tasks will interpret it as a call originating from the task queue - causing
        # deferred functions to be called directly, which causes calls with _countdown etc. set to fail.
        req.DEFERRED_TASK_CALLED = True
        for task, interval in _periodicTasks[cronName].items():  # Call all periodic tasks bound to that queue
            periodicTaskName = task.periodicTaskName.lower()
            if interval:  # Ensure this task doesn't get called too often
                lastCall = db.get(db.Key("viur-task-interval", periodicTaskName))
                if lastCall and utils.utcNow() - lastCall["date"] < interval:
                    logging.debug(f"Task {periodicTaskName!r} has already run recently - skipping.")
                    continue
            res = self.findBoundTask(task, conf.main_app)
            try:
                if res:  # It's bound, call it this way :)
                    res[0]()
                else:
                    task()  # It seems it wasn't bound - call it as a static method
            except Exception as e:
                logging.error(f"Error calling periodic task {periodicTaskName}")
                logging.exception(e)
            else:
                logging.debug(f"Successfully called task {periodicTaskName}")
                if interval:
                    # Update its last-call timestamp
                    entry = db.Entity(db.Key("viur-task-interval", periodicTaskName))
                    entry["date"] = utils.utcNow()
                    db.put(entry)
        logging.debug("Periodic tasks complete")

    def _validate_request(
        self,
        *,
        require_cron: bool = False,
        require_taskname: bool = True,
    ) -> None:
        """
        Validate the header and metadata of a request

        If the request is valid, None will be returned.
        Otherwise, an exception will be raised.

        :param require_taskname: Require "X-AppEngine-TaskName" header
        :param require_cron: Require "X-Appengine-Cron" header
        """
        req = current.request.get().request
        if (
            req.environ.get("HTTP_X_APPENGINE_USER_IP") not in _appengineServiceIPs
            and (not conf.instance.is_dev_server or os.getenv("TASKS_EMULATOR") is None)
        ):
            logging.critical("Detected an attempted XSRF attack. This request did not originate from Task Queue.")
            raise errors.Forbidden()
        if require_cron and "X-Appengine-Cron" not in req.headers:
            logging.critical('Detected an attempted XSRF attack. The header "X-AppEngine-Cron" was not set.')
            raise errors.Forbidden()
        if require_taskname and "X-AppEngine-TaskName" not in req.headers:
            logging.critical('Detected an attempted XSRF attack. The header "X-AppEngine-Taskname" was not set.')
            raise errors.Forbidden()

    @exposed
    def list(self, *args, **kwargs):
        """Lists all user-callable tasks which are callable by this user"""
        global _callableTasks

        from viur.core.skeleton import SkeletonInstance, SkelList, RelSkel
        from viur.core.bones import BaseBone, StringBone

        class TaskSkel(RelSkel):
            key = BaseBone()
            name = StringBone()
            descr = StringBone()

        tasks = SkelList(TaskSkel, *(
            SkeletonInstance(TaskSkel, {
                "key": task.key,
                "name": str(task.name),
                "descr": str(task.descr)
            }) for task in _callableTasks.values() if task().canCall()
        ))

        return self.render.list(tasks)

    @exposed
    @skey(allow_empty=True)
    def execute(self, taskID, *, bounce: bool = False, **kwargs):
        """Queues a specific task for the next maintenance run"""
        global _callableTasks

        if taskID in _callableTasks:
            task = _callableTasks[taskID]()
        else:
            return

        if not task.canCall():
            raise errors.Unauthorized()

        skel = task.dataSkel()
        if (
            not kwargs
            or not skel.fromClient(kwargs)
            or bounce
        ):
            return self.render.add(skel)

        task.execute(**skel.accessedValues)

        return self.render.addSuccess(skel)


TaskHandler.admin = True
TaskHandler.vi = True
TaskHandler.html = True


# Decorators

def retry_n_times(retries: int, email_recipients: None | str | list[str] = None,
                  tpl: None | str = None) -> t.Callable:
    """
    Wrapper for deferred tasks to limit the amount of retries

    :param retries: Number of maximum allowed retries
    :param email_recipients: Email addresses to which a notification should be sent
        when the retry limit is reached.
    :param tpl: Instead of the standard text, a custom template can be used.
        The name of an email template must be specified.
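
    A short usage sketch (the recipient address is just an example):

    .. code-block:: python

        @CallDeferred
        @retry_n_times(retries=3, email_recipients="devops@example.org")
        def send_newsletter():
            ...  # raise to trigger a retry; after 3 retries the task fails permanently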
384 """
385 # language=Jinja2
386 string_template = \
387 """Task {{func_name}} failed {{retries}} times
388 This was the last attempt.<br>
389 <pre>{{func_module|escape}}.{{func_name|escape}}({{signature|escape}})</pre>
390 <pre>{{traceback|escape}}</pre>"""

    def outer_wrapper(func):
        @functools.wraps(func)
        def inner_wrapper(*args, **kwargs):
            try:
                retry_count = int(current.request.get().request.headers.get("X-Appengine-Taskretrycount", -1))
            except AttributeError:
                # During warmup current.request is None (at least on local devserver)
                retry_count = -1
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                logging.exception(f"Task {func.__qualname__} failed: {exc}")
                logging.info(
                    f"This was the {retry_count}. retry. "
                    f"{retries - retry_count} retries remaining. (total = {retries})"
                )
                if retry_count < retries:
                    # Raise the exception to mark this task as failed, so the task queue can retry it.
                    raise exc
                else:
                    if email_recipients:
                        args_repr = [repr(arg) for arg in args]
                        kwargs_repr = [f"{k!s}={v!r}" for k, v in kwargs.items()]
                        signature = ", ".join(args_repr + kwargs_repr)
                        try:
                            from viur.core import email
                            email.send_email(
                                dests=email_recipients,
                                tpl=tpl,
                                stringTemplate=string_template if tpl is None else None,
                                # The following params provide information for the email templates
                                func_name=func.__name__,
                                func_qualname=func.__qualname__,
                                func_module=func.__module__,
                                retries=retries,
                                args=args,
                                kwargs=kwargs,
                                signature=signature,
                                traceback=traceback.format_exc(),
                            )
                        except Exception:
                            logging.exception("Failed to send email to %r", email_recipients)

                    # Mark as permanently failed (could return nothing too)
                    raise PermanentTaskFailure()

        return inner_wrapper

    return outer_wrapper


def noRetry(f):
    """Prevents a deferred Function from being called a second time"""
    logging.warning("Use of `@noRetry` is deprecated; Use `@retry_n_times(0)` instead!", stacklevel=2)
    return retry_n_times(0)(f)


def CallDeferred(func: t.Callable) -> t.Callable:
    """
    This is a decorator, which always calls the wrapped method deferred.

    The call will be packed and queued into a Cloud Tasks queue.
    The Task Queue calls the TaskHandler, which executes the wrapped function
    with the original arguments in a different request.

    In addition to the arguments for the wrapped methods you can set these:

    _queue: Specify the queue in which the task should be pushed.
        If no value is given, the queue name set in `conf.tasks_default_queues`
        will be used. If the config does not have a value for this task, "default"
        is used as the default. The queue must exist (use the queue.yaml).

    _countdown: Specify a time in seconds after which the task should be called.
        This time is relative to the moment where the wrapped method has been called.

    _eta: Instead of a relative _countdown value you can specify a `datetime`
        when the task is scheduled to be attempted or retried.

    _name: Specify a custom name for the cloud task. Must be unique and can
        contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), colons (:), or periods (.).

    _target_version: Specify a version on which to run this task.
        By default, a task will be run on the same version where the wrapped method has been called.

    _call_deferred: Calls the @CallDeferred decorated method directly.
        This is, for example, necessary to call a super method which is decorated with @CallDeferred.

        .. code-block:: python

            # Example for use of the _call_deferred-parameter
            class A(Module):
                @CallDeferred
                def task(self):
                    ...

            class B(A):
                @CallDeferred
                def task(self):
                    super().task(_call_deferred=False)  # avoid secondary deferred call
                    ...

    See also:
        https://cloud.google.com/python/docs/reference/cloudtasks/latest/google.cloud.tasks_v2.types.Task
        https://cloud.google.com/python/docs/reference/cloudtasks/latest/google.cloud.tasks_v2.types.CreateTaskRequest
    """
    if "viur_doc_build" in dir(sys):
        return func

    __undefinedFlag_ = object()

    def make_deferred(
        func: t.Callable,
        self=__undefinedFlag_,
        *args,
        _queue: str = None,
        _name: str | None = None,
        _call_deferred: bool = True,
        _target_version: str = conf.instance.app_version,
        _eta: datetime.datetime | None = None,
        _countdown: int = 0,
        **kwargs
    ):
        if _eta is not None and _countdown:
            raise ValueError("You cannot set the _countdown and _eta argument together!")
        if conf.debug.trace:
            logging.debug(
                f"make_deferred {func=}, {self=}, {args=}, {kwargs=}, "
                f"{_queue=}, {_name=}, {_call_deferred=}, {_target_version=}, {_eta=}, {_countdown=}"
            )

        try:
            req = current.request.get()
        except Exception:  # This will fail for warmup requests
            req = None

        if not queueRegion:
            # Run tasks inline
            logging.debug(f"{func=} will be executed inline")

            @functools.wraps(func)
            def task():
                if self is __undefinedFlag_:
                    return func(*args, **kwargs)
                else:
                    return func(self, *args, **kwargs)

            if req:
                req.pendingTasks.append(task)  # This property only exists on the development server!
            else:
                # Warmup request or something - we have to call it now as we can't defer it :/
                task()

            return  # Ensure no result gets passed back

        # It's the deferred method which is called from the task queue, this has to be called directly
        _call_deferred &= not (req and req.request.headers.get("X-Appengine-Taskretrycount")
                               and "DEFERRED_TASK_CALLED" not in dir(req))

        if not _call_deferred:
            if self is __undefinedFlag_:
                return func(*args, **kwargs)

            req.DEFERRED_TASK_CALLED = True
            return func(self, *args, **kwargs)

        else:
            try:
                if self.__class__.__name__ == "index":
                    funcPath = func.__name__
                else:
                    funcPath = f"{self.modulePath}/{func.__name__}"
                command = "rel"
            except Exception:
                funcPath = f"{func.__name__}.{func.__module__}"
                if self is not __undefinedFlag_:
                    args = (self,) + args  # Re-append self to args, as this function is (hopefully) unbound
                command = "unb"

            if _queue is None:
                _queue = conf.tasks_default_queues.get(
                    funcPath, conf.tasks_default_queues.get("__default__", "default")
                )

            # Try to preserve the important data from the current environment
            try:  # We might get called inside a warmup request without session
                usr = current.session.get().get("user")
                if "password" in usr:
                    del usr["password"]
            except Exception:
                usr = None

            env = {"user": usr}

            try:
                env["lang"] = current.language.get()
            except AttributeError:  # This isn't originating from a normal request
                pass

            if db.is_in_transaction():
                # We have to ensure transaction guarantees for that task also
                env["transactionMarker"] = db.acquire_transaction_success_marker()
                # We move that task at least 90 seconds into the future so the transaction has time to settle
                _countdown = max(90, _countdown)  # Countdown can be set to None

            if conf.tasks_custom_environment_handler:
                # Check if this project relies on additional environmental variables and serialize them too
                env["custom"] = conf.tasks_custom_environment_handler.serialize()

            # Create task description
            task = tasks_v2.Task(
                app_engine_http_request=tasks_v2.AppEngineHttpRequest(
                    body=utils.json.dumps((command, (funcPath, args, kwargs, env))).encode(),
                    http_method=tasks_v2.HttpMethod.POST,
                    relative_uri="/_tasks/deferred",
                    app_engine_routing=tasks_v2.AppEngineRouting(
                        version=_target_version,
                    ),
                ),
            )
            if _name is not None:
                task.name = taskClient.task_path(conf.instance.project_id, queueRegion, _queue, _name)

            # Set a schedule time in case eta (absolute) or countdown (relative) was set.
            if seconds := _countdown:
                _eta = utils.utcNow() + datetime.timedelta(seconds=seconds)
            if _eta:
                # We must send a Timestamp Protobuf instead of a date-string
                timestamp = protobuf.timestamp_pb2.Timestamp()
                timestamp.FromDatetime(_eta)
                task.schedule_time = timestamp

            # Use the client to build and send the task.
            parent = taskClient.queue_path(conf.instance.project_id, queueRegion, _queue)
            logging.debug(f"{parent=}, {task=}")
            taskClient.create_task(tasks_v2.CreateTaskRequest(parent=parent, task=task))

            logging.info(f"Created task {func.__name__}.{func.__module__} with {args=} {kwargs=} {env=}")

    global _deferred_tasks
    _deferred_tasks[f"{func.__name__}.{func.__module__}"] = func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return make_deferred(func, *args, **kwargs)

    return wrapper


def callDeferred(func):
    """
    Deprecated version of CallDeferred
    """
    import logging, warnings

    msg = "Use of @callDeferred is deprecated, use @CallDeferred instead."
    logging.warning(msg, stacklevel=3)
    warnings.warn(msg, stacklevel=3)

    return CallDeferred(func)


def PeriodicTask(interval: datetime.timedelta | int | float = 0, cronName: str = "default") -> t.Callable:
    """
    Decorator to call a function periodically during cron job execution.

    Interval defines a lower bound for the call-frequency for the given task, specified as a timedelta.

    The true interval of how often cron jobs are being executed is defined in the project's cron.yaml file.
    This defaults to 4 hours (see https://github.com/viur-framework/viur-base/blob/main/deploy/cron.yaml).
    In case the interval defined here is lower than 4 hours, the task will be fired once every 4 hours anyway.

    :param interval: Call the task at most once per the given timedelta.
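
    A brief sketch (the function name is illustrative); the decorated function is
    registered under the default ``cronName`` and called at most once per day:

    .. code-block:: python

        @PeriodicTask(interval=datetime.timedelta(days=1))
        def remove_stale_entries():
            ...  # runs during cron execution, at most once per day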
664 """
666 def make_decorator(fn):
667 nonlocal interval
668 if fn.__name__.startswith("_"): 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true
669 raise RuntimeError("Periodic called methods cannot start with an underscore! "
670 f"Please rename {fn.__name__!r}")
672 if cronName not in _periodicTasks:
673 _periodicTasks[cronName] = {}
675 if isinstance(interval, (int, float)) and "tasks.periodic.useminutes" in conf.compatibility: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true
676 logging.warning(
677 f"PeriodicTask assuming {interval=} minutes here. This is changed into seconds in future. "
678 f"Please use `datetime.timedelta(minutes={interval})` for clarification.",
679 stacklevel=2,
680 )
681 interval *= 60
683 _periodicTasks[cronName][fn] = utils.parse.timedelta(interval)
684 fn.periodicTaskName = f"{fn.__module__}_{fn.__qualname__}".replace(".", "_").lower()
685 return fn
687 return make_decorator


def CallableTask(fn: t.Callable) -> t.Callable:
    """Marks a Class as representing a user-callable Task.
    It *should* extend CallableTaskBase and *must* provide
    its API
    """
    global _callableTasks
    _callableTasks[fn.key] = fn
    return fn


def StartupTask(fn: t.Callable) -> t.Callable:
    """
    Functions decorated with this are called shortly after instance startup.
    It's *not* guaranteed that they actually run on the instance that just started up!
    Wrapped functions must not take any arguments.
    """
    global _startupTasks
    _startupTasks.append(fn)
    return fn


@CallDeferred
def runStartupTasks():
    """
    Runs all queued startupTasks.
    Do not call directly!
    """
    global _startupTasks
    for st in _startupTasks:
        st()


class MetaQueryIter(type):
    """
    This is the meta class for QueryIters.
    Used only to keep track of all subclasses of QueryIter so we can emit the callbacks
    on the correct class.
    """
    _classCache = {}  # Mapping className -> Class

    def __init__(cls, name, bases, dct):
        MetaQueryIter._classCache[str(cls)] = cls
        cls.__classID__ = str(cls)
        super(MetaQueryIter, cls).__init__(name, bases, dct)


class QueryIter(object, metaclass=MetaQueryIter):
    """
    BaseClass to run a database Query and process each entry matched.
    This will run each step deferred, so it is possible to process an arbitrary number of entries
    without being limited by time or memory.

    To use this class create a subclass, override the classmethods handleEntry and handleFinish and then
    call startIterOnQuery with an instance of a database Query (and possibly some custom data to pass along)
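
    A rough sketch of a subclass (``MySkel``, the filter and the custom data are
    illustrative only):

    .. code-block:: python

        class AuditIter(QueryIter):
            @classmethod
            def handleEntry(cls, entry, customData):
                # Called once per matched entry, possibly on different instances
                logging.info(f"Visiting {entry} ({customData['reason']})")

            @classmethod
            def handleFinish(cls, totalCount, customData):
                logging.info(f"Visited {totalCount} entries")

        AuditIter.startIterOnQuery(MySkel().all().filter("active =", True), {"reason": "audit"})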
744 """
745 queueName = "default" # Name of the taskqueue we will run on

    @classmethod
    def startIterOnQuery(cls, query: db.Query, customData: t.Any = None) -> None:
        """
        Starts iterating the given query on this class. Will return immediately, the first batch will already
        run deferred.

        Warning: Any custom data *must* be json-serializable and *must* be passed in customData. You cannot store
        any data on this class as each chunk may run on a different instance!
        """
        assert not (query._customMultiQueryMerge or query._calculateInternalMultiQueryLimit), \
            "Cannot iter a query with postprocessing"
        assert isinstance(query.queries, db.QueryDefinition), "Unsatisfiable query or query with an IN filter"
        qryDict = {
            "kind": query.kind,
            "srcSkel": query.srcSkel.kindName if query.srcSkel is not None else None,
            "filters": query.queries.filters,
            "orders": [(propName, sortOrder.value) for propName, sortOrder in query.queries.orders],
            "startCursor": query.queries.startCursor,
            "endCursor": query.queries.endCursor,
            "origKind": query.origKind,
            "distinct": query.queries.distinct,
            "classID": cls.__classID__,
            "customData": customData,
            "totalCount": 0
        }
        cls._requeueStep(qryDict)

    @classmethod
    def _requeueStep(cls, qryDict: dict[str, t.Any]) -> None:
        """
        Internal use only. Pushes a new step defined in qryDict to either the taskqueue or appends it to
        the current request if we are on the local development server.
        """
        if not queueRegion:  # Run tasks inline - hopefully development server
            req = current.request.get()
            task = lambda *args, **kwargs: cls._qryStep(qryDict)
            if req:
                req.pendingTasks.append(task)  # This property only exists on the development server!
            return
        taskClient.create_task(tasks_v2.CreateTaskRequest(
            parent=taskClient.queue_path(conf.instance.project_id, queueRegion, cls.queueName),
            task=tasks_v2.Task(
                app_engine_http_request=tasks_v2.AppEngineHttpRequest(
                    body=utils.json.dumps(qryDict).encode(),
                    http_method=tasks_v2.HttpMethod.POST,
                    relative_uri="/_tasks/queryIter",
                    app_engine_routing=tasks_v2.AppEngineRouting(
                        version=conf.instance.app_version,
                    ),
                )
            ),
        ))

    @classmethod
    def _qryStep(cls, qryDict: dict[str, t.Any]) -> None:
        """
        Internal use only. Processes one block of five entries from the query defined in qryDict and
        reschedules the next block.
        """
        from viur.core.skeleton import skeletonByKind

        qry = db.Query(qryDict["kind"])
        qry.srcSkel = skeletonByKind(qryDict["srcSkel"])() if qryDict["srcSkel"] else None
        qry.queries.filters = qryDict["filters"]
        qry.queries.orders = [(propName, db.SortOrder(sortOrder)) for propName, sortOrder in qryDict["orders"]]
        qry.setCursor(qryDict["startCursor"], qryDict["endCursor"])
        qry.origKind = qryDict["origKind"]
        qry.queries.distinct = qryDict["distinct"]

        if qry.srcSkel is not None:
            qryIter = qry.fetch(5)
        else:
            qryIter = qry.run(5)

        for item in qryIter:
            try:
                cls.handleEntry(item, qryDict["customData"])
            except Exception as exception:
                logging.error(f"{exception=}")
                try:
                    cls.handleEntry(item, qryDict["customData"])
                except Exception as e:  # Second exception - call error_handler
                    try:
                        doCont = cls.handleError(item, qryDict["customData"], e)
                    except Exception as e:
                        logging.error(f"handleError failed on {item} - bailing out")
                        logging.exception(e)
                        doCont = False

                    if not doCont:
                        logging.error(f"Exiting queryIter on cursor {qry.getCursor()!r}")
                        return

            qryDict["totalCount"] += 1

        cursor = qry.getCursor()

        if cursor:
            qryDict["startCursor"] = cursor
            cls._requeueStep(qryDict)
        else:
            cls.handleFinish(qryDict["totalCount"], qryDict["customData"])

    @classmethod
    def handleEntry(cls, entry, customData):
        """
        Overridable hook to process one entry. "entry" will be either a db.Entity or a
        SkeletonInstance (if that query has been created by skel.all())

        Warning: If your query has a sortOrder other than __key__ and you modify that property here,
        it is possible to encounter that object later on *again* (as it may jump behind the current cursor).
        """
        logging.debug(f"handleEntry called on {cls} with {entry}.")

    @classmethod
    def handleFinish(cls, totalCount: int, customData):
        """
        Overridable hook that indicates the current run has been finished.
        """
        logging.debug(f"handleFinish called on {cls} with {totalCount} total Entries processed")

    @classmethod
    def handleError(cls, entry, customData, exception) -> bool:
        """
        Handle an error that occurred in handleEntry.
        If this function returns True, the queryIter continues; otherwise it breaks and prints the current cursor.
        """
        logging.debug(f"handleError called on {cls} with {entry}.")
        logging.exception(exception)
        return True


class DeleteEntitiesIter(QueryIter):
    """
    Simple Query-Iter to delete all entities encountered.

    .. warning:: When iterating over skeletons, make sure that the
        query was created using `Skeleton().all()`.
        This way the `Skeleton.delete()` method can be used and
        the appropriate post-processing can be done.
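
    A short usage sketch (``MySkel`` and the filter are illustrative):

    .. code-block:: python

        DeleteEntitiesIter.startIterOnQuery(MySkel().all().filter("expired =", True))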
887 """
889 @classmethod
890 def handleEntry(cls, entry, customData):
891 from viur.core.skeleton import SkeletonInstance
892 if isinstance(entry, SkeletonInstance):
893 entry.delete()
894 else:
895 db.delete(entry.key)


@PeriodicTask(interval=datetime.timedelta(hours=4))
def start_clear_transaction_marker():
    """
    Removes old (expired) transaction markers
    https://cloud.google.com/datastore/docs/concepts/transactions?hl=en#using_transactions
    https://cloud.google.com/tasks/docs/quotas?hl=en
    """
    query = db.Query("viur-transactionmarker").filter("creationdate <",
                                                      datetime.datetime.now() - datetime.timedelta(days=31))
    DeleteEntitiesIter.startIterOnQuery(query)