Coverage for / home / runner / work / viur-core / viur-core / viur / src / viur / core / prototypes / tree.py: 0%
438 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 14:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 14:23 +0000
1import logging
2import time
3import typing as t
5from deprecated.sphinx import deprecated
7from viur.core import current, db, errors
8from viur.core.bones import BooleanBone, KeyBone, SortIndexBone
9from viur.core.cache import flushCache
10from viur.core.decorators import *
11from viur.core.skeleton import Skeleton, SkeletonInstance
12from viur.core.tasks import CallDeferred
13from .skelmodule import SkelModule
15SkelType = t.Literal["node", "leaf"]
class TreeSkel(Skeleton):
    """
    Skeleton carrying the bookkeeping bones of a tree entry.

    It declares the key of the parent entry, the key of the repository the entry belongs to,
    a sort index and a root-node marker. All four bones are hidden and read-only.
    """

    parententry = KeyBone(  # TODO VIUR4: Why is this not a RelationalBone?
        descr="Parent",
        readOnly=True,
        visible=False,
    )

    parentrepo = KeyBone(  # TODO VIUR4: Why is this not a RelationalBone?
        descr="BaseRepo",
        readOnly=True,
        visible=False,
    )

    sortindex = SortIndexBone(
        readOnly=True,
        visible=False,
    )

    is_root_node = BooleanBone(
        defaultValue=False,
        visible=False,
        readOnly=True,
    )

    @classmethod
    def refresh(cls, skelValues):  # ViUR2 Compatibility
        """
        Refresh the skeleton values and migrate the legacy ``parentdir`` entity property.

        When ``parententry`` is empty but the underlying entity still carries a ``parentdir``
        value, that value is normalized and stored as ``parententry``.
        """
        super().refresh(skelValues)

        if skelValues["parententry"]:
            return

        # parentdir for viur2 compatibility
        if legacy_parent := skelValues.dbEntity.get("parentdir"):
            skelValues["parententry"] = db.normalize_key(legacy_parent)
49class Tree(SkelModule):
50 """
51 Tree module prototype.
53 It is used for hierarchical structures, either as a tree with nodes and leafs, or as a hierarchy with nodes only.
54 """
55 accessRights = ("add", "edit", "view", "delete", "manage")
57 nodeSkelCls = None
58 leafSkelCls = None
60 default_order = "sortindex"
def __init__(self, moduleName, modulePath, *args, **kwargs):
    """
    Set up the module. A ``nodeSkelCls`` has to be configured on the class.

    :raises AssertionError: If ``nodeSkelCls`` is not set.
    """
    cls_name = self.__class__.__name__
    assert self.nodeSkelCls, f"Need to specify at least nodeSkelCls for {cls_name!r}"
    super().__init__(moduleName, modulePath, *args, **kwargs)
@property
def handler(self):
    """
    Name of the handler of this module.

    ``"tree"`` when a leaf skeleton class is configured, otherwise ``"tree.node"``
    (a tree with nodes only, the former hierarchy).
    """
    if self.leafSkelCls:
        return "tree"

    return "tree.node"
def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]:
    """
    Checks for correct skelType.

    The check is case-insensitive. ``"leaf"`` is only accepted when the module has a
    ``leafSkelCls``. Anything that is not a string is treated as invalid.

    :param skelType: The value to validate; the exposed functions pass request parameters here.

    :returns: The lower-cased skelType, or None in case it is invalid.
    """
    # Non-strings have no .lower(); treat them as invalid instead of raising AttributeError.
    if not isinstance(skelType, str):
        return None

    skelType = skelType.lower()
    if skelType == "node" or (skelType == "leaf" and self.leafSkelCls):
        return skelType

    return None
def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]:
    """
    Map a skelType to the skeleton class configured for it.

    :param skelType: Either "node" or "leaf".

    :returns: ``leafSkelCls`` for "leaf", ``nodeSkelCls`` otherwise.

    :raises ValueError: If the skelType is rejected by :func:`_checkSkelType`.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise ValueError("Unsupported skelType")

    return self.leafSkelCls if checked_type == "leaf" else self.nodeSkelCls
def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Create a fresh, unmodified skeleton instance for the given skelType.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`, :func:`~baseSkel`
    """
    skel_cls = self._resolveSkelCls(skelType, *args, **kwargs)
    return skel_cls()
def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used for viewing an existing entry of the tree.

    By default this is simply what :func:`~baseSkel` returns.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel`

    :return: Returns a Skeleton instance for viewing an entry.
    """
    view_skel = self.baseSkel(skelType, *args, **kwargs)
    return view_skel
def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used for adding an entry to the tree.

    By default this is simply what :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :return: Returns a Skeleton instance for adding an entry.
    """
    add_skel = self.baseSkel(skelType, *args, **kwargs)
    return add_skel
def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used for editing an existing entry of the tree.

    By default this is simply what :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`addSkel`, :func:`~baseSkel`

    :return: Returns a Skeleton instance for editing an entry.
    """
    edit_skel = self.baseSkel(skelType, *args, **kwargs)
    return edit_skel
def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used for cloning an existing entry of the tree.

    By default this is simply what :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :return: Returns a SkeletonInstance for cloning an entry.
    """
    clone_skel = self.baseSkel(skelType, *args, **kwargs)
    return clone_skel
def rootnodeSkel(
    self,
    *,
    identifier: str = "rep_module_repo",
    ensure: bool | dict | t.Callable[[SkeletonInstance], None] = False,
) -> SkeletonInstance:
    """
    Provide the skeleton instance used for rootnode entries.

    A node skeleton from :func:`~baseSkel` is taken, its key is preset to a key built from the
    skeleton's kind name and *identifier*, and ``is_root_node`` is set to True.

    :param identifier: Unique identifier (name) for this rootnode.
    :param ensure: Unless this is False or None, the skeleton is read with ``create=ensure``
        and the result of that read is returned.

    :return: Returns a SkeletonInstance for handling root nodes.
    """
    root_skel = self.baseSkel("node")

    root_skel["key"] = db.Key(root_skel.kindName, identifier)
    root_skel["is_root_node"] = True

    if ensure in (False, None):
        return root_skel

    return root_skel.read(create=ensure)
@deprecated(
    version="3.7.0",
    reason="Use rootnodeSkel(ensure=True) instead.",
    action="always"
)
def ensureOwnModuleRootNode(self) -> db.Entity:
    """
    Make sure the general root-node of the current module exists.

    Delegates to ``rootnodeSkel(ensure=True)`` and hands back the entity of that skeleton.

    :returns: The entity of the root-node.
    """
    root_skel = self.rootnodeSkel(ensure=True)
    return root_skel.dbEntity
def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]:
    """
    Provide the list of root node items.

    Several module-internal functions request this list, so it *must* be overridden
    with a custom implementation. This default stub returns an empty list.

    An example implementation could be the following:

    .. code-block:: python

        # Example
        def getAvailableRootNodes(self, *args, **kwargs):
            q = db.Query(self.rootKindName)
            ret = [{"key": str(e.key()),
                "name": e.get("name", str(e.key().id_or_name()))} #FIXME
                for e in q.run(limit=25)]
            return ret

    :param args: Can be used in custom implementations.
    :param kwargs: Can be used in custom implementations.
    :return: Returns a list of dicts which must provide a "key" and a "name" entry with
        respective information.
    """
    root_nodes: list[dict[t.Literal["name", "key"], str]] = []
    return root_nodes
def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None:
    """
    Walk up the ``parententry`` chain starting at *key* and return the topmost node.

    :param key: Key of the child node entry.

    :returns: The skeleton of the root-node, i.e. the last node read that has no
        ``parententry``; None if any node on the way could not be read.
    """
    node_skel = self.nodeSkelCls()
    next_key = key

    while next_key:
        if not node_skel.read(next_key):
            return None

        next_key = node_skel["parententry"]

    return node_skel
@CallDeferred
def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0):
    """
    Recursively fixes the parentrepo key after a move operation.

    This will set ``parentrepo`` to *newRepoKey* on all entries which are children of
    *parentNode*; child nodes are processed recursively. *parentNode* itself is not touched.

    :param parentNode: URL-safe key of the node which children should be fixed.
    :param newRepoKey: URL-safe key of the new repository.
    :param depth: Safety level depth preventing infinitive loops.
    """
    if depth > 99:
        logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo")
        logging.critical("Your data is corrupt!")
        logging.debug(f"{parentNode=}, {newRepoKey=}")
        return

    def fixTxn(nodeKey, newRepoKey):
        node = db.get(nodeKey)
        if node is None:
            # The entry vanished between the query and this transaction; nothing left to fix.
            return

        node["parentrepo"] = newRepoKey
        db.put(node)

    # Fix all nodes
    q = db.Query(self.viewSkel("node").kindName).filter("parententry =", parentNode)
    for repo in q.iter():
        self.updateParentRepo(repo.key, newRepoKey, depth=depth + 1)
        db.run_in_transaction(fixTxn, repo.key, newRepoKey)

    # Fix the leafs on this level
    if self.leafSkelCls:
        q = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", parentNode)
        for repo in q.iter():
            db.run_in_transaction(fixTxn, repo.key, newRepoKey)
269 ## Internal exposed functions
@internal_exposed
def pathToKey(self, key: db.Key):
    """
    Build the expanded path through the Tree from the top level down to a requested node.

    Starting at *key*, each round lists the nodes sharing the current node's parent (restricted by
    :func:`listFilter`, at most 99 entries), marks the current node as active and hangs the
    previously built level below it. At most 99 levels are walked.

    :param key: Key of the destination *node*.
    :returns: An nested list of dicts (``skel``, ``active``, ``children``) describing all nodes in
        the path; an empty list if a node could not be read.
    """
    path = []

    for _ in range(0, 99):
        node_skel = self.viewSkel("node")
        if not node_skel.read(key):
            return []  # Either invalid key or listFilter prevented us from fetching anything

        parent_key = node_skel["parententry"]
        if parent_key == node_skel["parentrepo"]:  # We reached the top level
            break

        siblings_query = self.viewSkel("node").all().filter("parententry =", parent_key)

        level = []
        for sibling in self.listFilter(siblings_query).fetch(99):
            is_active = sibling["key"] == node_skel["key"]
            level.append({
                "skel": sibling,
                "active": is_active,
                "children": path if is_active else [],
            })

        assert level, "Got emtpy parent list?"
        path = level
        key = parent_key

    return path
296 ## External exposed functions
@exposed
def index(self, skelType: SkelType = "node", parententry: t.Optional[db.Key | int | str] = None, **kwargs):
    """
    Default entry point: list the entries below *parententry*.

    Without a *parententry*, the available root nodes are consulted. Exactly one root node is used
    as parent; none or several are answered with an error.

    :raises: :exc:`viur.core.errors.Unauthorized`, if no root node is available.
    :raises: :exc:`viur.core.errors.NotAcceptable`, if several root nodes are available
        and no *parententry* was given.
    """
    if not parententry:
        repos = self.getAvailableRootNodes(**kwargs)
        repo_count = len(repos)

        if repo_count == 0:
            raise errors.Unauthorized()
        elif repo_count == 1:
            parententry = repos[0]["key"]
        else:
            raise errors.NotAcceptable(f"Missing required parameter {'parententry'!r}")

    return self.list(skelType=skelType, parententry=parententry, **kwargs)
@exposed
def listRootNodes(self, *args, **kwargs) -> t.Any:
    """
    Render the list returned by :func:`getAvailableRootNodes` using the modules default renderer.

    :returns: The rendered representation of the available root-nodes.
    """
    root_nodes = self.getAvailableRootNodes(*args, **kwargs)
    return self.render.listRootNodes(root_nodes)
@exposed
def list(self, skelType: SkelType, *args, **kwargs) -> t.Any:
    """
    Prepares and renders a list of entries.

    All supplied keyword parameters are merged into the query as filters.

    Access control is not done by a separate check here: the query is passed through
    :func:`listFilter`, which restricts it to what the user may see or rejects it entirely.

    .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter`

    :returns: The rendered list objects for the matching entries.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    # The general access control is made via self.listFilter()
    unfiltered = self.viewSkel(checked_type).all().mergeExternalFilter(kwargs)
    query = self.listFilter(unfiltered)
    if not query:
        raise errors.Unauthorized()

    self._apply_default_order(query)
    return self.render.list(query.fetch())
349 @exposed
350 def structure(self, skelType: SkelType, action: t.Optional[str] = "view") -> t.Any:
351 """
352 :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set
353 in each bone.
355 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
356 """
357 # FIXME: In ViUR > 3.7 this could also become dynamic (ActionSkel paradigm).
358 match action:
359 case "view":
360 skel = self.viewSkel(skelType)
361 if not self.canView(skelType, skel):
362 raise errors.Unauthorized()
364 case "edit":
365 skel = self.editSkel(skelType)
366 if not self.canEdit(skelType, skel):
367 raise errors.Unauthorized()
369 case "add":
370 if not self.canAdd(skelType):
371 raise errors.Unauthorized()
373 skel = self.addSkel(skelType)
375 case "clone":
376 skel = self.cloneSkel(skelType)
377 if not (self.canAdd(skelType) and self.canEdit(skelType, skel)):
378 raise errors.Unauthorized()
380 case _:
381 raise errors.NotImplemented(f"The action {action!r} is not implemented.")
383 return self.render.render(f"structure.{skelType}.{action}", skel)
@exposed
def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
    """
    Fetch a single entry by its *skelType* and *key*, check access and render it for viewing.

    .. seealso:: :func:`canView`, :func:`onView`

    :param skelType: May either be "node" or "leaf".
    :param key: Key of the entry to view.

    :returns: The rendered representation of the requested entity.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    entry_skel = self.viewSkel(checked_type)
    if not entry_skel.read(key):
        raise errors.NotFound()

    if not self.canView(checked_type, entry_skel):
        raise errors.Unauthorized()

    self.onView(checked_type, entry_skel)
    return self.render.view(entry_skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def add(self, skelType: SkelType, node: db.Key | int | str, *, bounce: bool = False, **kwargs) -> t.Any:
    # FIXME: VIUR4 rename node into key...
    """
    Add a new entry below the parent *node* and render it, eventually with error notes
    on incorrect data. The entry's data is taken from *kwargs*.

    The add form is rendered (and nothing is written) when no data was supplied, the request is
    not a POST request, the data could not be read into the skeleton, or *bounce* is set.

    .. seealso:: :func:`canAdd`, :func:`onAdd`, :func:`onAdded`

    :param skelType: Defines the type of the new entry and may either be "node" or "leaf".
    :param node: URL-safe key of the parent.
    :param bounce: Review the skeleton with the client data applied instead of adding it.

    :returns: The rendered, added object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    new_skel = self.addSkel(checked_type)
    parent_skel = self.editSkel("node")

    # TODO VIUR4: Why is this parameter called "node"?
    if not parent_skel.read(node):
        raise errors.NotFound("The provided parent node could not be found.")
    if not self.canAdd(checked_type, parent_skel):
        raise errors.Unauthorized()

    new_skel["parententry"] = parent_skel["key"]
    # parentrepo may not exist in parent_skel as it may be an rootNode
    new_skel["parentrepo"] = parent_skel["parentrepo"] or parent_skel["key"]

    # no data supplied
    if not kwargs:
        return self.render.add(new_skel)

    # failure if not using POST-method
    if not current.request.get().isPostRequest:
        return self.render.add(new_skel)

    # failure on reading into the bones, or review before adding
    if not new_skel.fromClient(kwargs, amend=bounce) or bounce:
        return self.render.add(new_skel)

    self.onAdd(checked_type, new_skel)
    new_skel.write()
    self.onAdded(checked_type, new_skel)

    return self.render.addSuccess(new_skel)
@force_ssl
@force_post
@exposed
@skey
@access("root")
def add_or_edit(self, skelType: SkelType, key: db.Key | int | str, **kwargs) -> t.Any:
    """
    Create the entry with the given *key*, or update it when it already exists.

    This function is intended to be used by importers.
    Only "root"-users are allowed to use it.

    ``parententry`` is made required and writable, so it has to be part of the supplied data.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when the parent node could not be found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if adding below the parent node is not permitted.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    if checked_type == "node":
        kind_name = self.nodeSkelCls.kindName
    else:
        kind_name = self.leafSkelCls.kindName

    # Adjust key
    db_key = db.key_helper(key, target_kind=kind_name, adjust_kind=True)

    # Retrieve and verify existing entry
    existing_entity = db.get(db_key)
    is_add = not bool(existing_entity)

    # Instantiate relevant skeleton
    if is_add:
        skel = self.addSkel(checked_type)
    else:
        skel = self.editSkel(checked_type)
        skel.dbEntity = existing_entity  # assign existing entity

    skel = skel.ensure_is_cloned()
    skel.parententry.required = True
    skel.parententry.readOnly = False

    skel["key"] = db_key

    # no data supplied, or failure on reading into the bones
    if not (kwargs and skel.fromClient(kwargs)):
        # render the skeleton in the version it could as far as it could be read.
        return self.render.render("add_or_edit", skel)

    # Ensure the parententry exists
    parent_skel = self.editSkel("node")
    if not parent_skel.read(skel["parententry"]):
        raise errors.NotFound("The provided parent node could not be found.")
    if not self.canAdd(checked_type, parent_skel):
        raise errors.Unauthorized()

    skel["parententry"] = parent_skel["key"]
    # parentrepo may not exist in parent_skel as it may be an rootNode
    skel["parentrepo"] = parent_skel["parentrepo"] or parent_skel["key"]

    if is_add:
        self.onAdd(checked_type, skel)
        skel.write()
        self.onAdded(checked_type, skel)
        return self.render.addSuccess(skel)

    self.onEdit(checked_type, skel)
    skel.write()
    self.onEdited(checked_type, skel)
    return self.render.editSuccess(skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def edit(self, skelType: SkelType, key: db.Key | int | str, *, bounce: bool = False, **kwargs) -> t.Any:
    """
    Modify an existing entry and render it, eventually with error notes on incorrect data.
    The new data is taken from *kwargs*.

    The edit form is rendered (and nothing is written) when no data was supplied, the request is
    not a POST request, the data could not be read into the skeleton, or *bounce* is set.

    .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited`

    :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be edited.
    :param bounce: Review the skeleton with the client data applied instead of writing it.

    :returns: The rendered, modified object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    entry_skel = self.editSkel(checked_type)
    if not entry_skel.read(key):
        raise errors.NotFound()

    if not self.canEdit(checked_type, entry_skel):
        raise errors.Unauthorized()

    # no data supplied
    if not kwargs:
        return self.render.edit(entry_skel)

    # failure if not using POST-method
    if not current.request.get().isPostRequest:
        return self.render.edit(entry_skel)

    # failure on reading into the bones, or review before changing
    if not entry_skel.fromClient(kwargs, amend=True) or bounce:
        return self.render.edit(entry_skel)

    self.onEdit(checked_type, entry_skel)
    entry_skel.write()
    self.onEdited(checked_type, entry_skel)

    return self.render.editSuccess(entry_skel)
@exposed
@force_ssl
@force_post
@skey
def delete(self, skelType: SkelType, key: str, **kwargs) -> t.Any:
    """
    Delete a leaf, or a node including its contents.

    For a node, :func:`deleteRecursive` is invoked for its children before the node itself is deleted.

    .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted`

    :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be deleted.

    :returns: The rendered, deleted object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    entry_skel = self.editSkel(checked_type)
    if not entry_skel.read(key):
        raise errors.NotFound()

    if not self.canDelete(checked_type, entry_skel):
        raise errors.Unauthorized()

    is_node = checked_type == "node"
    if is_node:
        self.deleteRecursive(entry_skel["key"])

    self.onDelete(checked_type, entry_skel)
    entry_skel.delete()
    self.onDeleted(checked_type, entry_skel)

    return self.render.deleteSuccess(entry_skel, skelType=checked_type)
@CallDeferred
def deleteRecursive(self, parentKey: str):
    """
    Recursively processes a delete request.

    This will delete all entries which are children of *parentKey*; the entry *parentKey*
    itself is not deleted here. Leafs are removed first, then each child node is handed to
    :func:`deleteRecursive` again before it is deleted itself.

    :param parentKey: URL-safe key of the node which children should be deleted.
    """
    node_key = db.key_helper(parentKey, self.viewSkel("node").kindName)

    if self.leafSkelCls:
        leaf_query = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", node_key)
        for leaf_entity in leaf_query.iter():
            leaf_skel = self.viewSkel("leaf")
            if leaf_skel.read(leaf_entity.key):
                leaf_skel.delete()

    node_query = db.Query(self.viewSkel("node").kindName).filter("parententry =", node_key)
    for node_entity in node_query.iter():
        self.deleteRecursive(node_entity.key)

        child_skel = self.viewSkel("node")
        if child_skel.read(node_entity.key):
            child_skel.delete()
@exposed
@force_ssl
@force_post
@skey
def move(
    self,
    skelType: SkelType,
    key: db.Key | int | str,
    parentNode: db.Key | int | str,
    sortindex: t.Optional[float] = None
) -> str:
    """
    Move a node (including its contents) or a leaf to another node.

    .. seealso:: :func:`canMove`

    :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be moved.
    :param parentNode: URL-safe key of the destination node, which must be a node.
    :param sortindex: An optional sortindex for the key. When omitted, the current time is used.

    :returns: The rendered, edited object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided, or when the
        move is invalid (into itself, below an own descendant, or moving a root node).
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)
    parentnode_skel = self.baseSkel("node")

    if not skel.read(key):
        raise errors.NotFound("Cannot find entity to move")

    if not parentnode_skel.read(parentNode):
        parentNode = db.normalize_key(parentNode)

        if parentNode.kind != parentnode_skel.kindName:
            raise errors.NotFound(
                f"You provided a key of kind {parentNode.kind}, but require a {parentnode_skel.kindName}."
            )

        raise errors.NotFound("Cannot find parentNode entity")

    if skel["key"] == parentnode_skel["key"]:
        raise errors.NotAcceptable("Cannot move a node into itself")

    # Test if we try to move a rootNode
    if not skel["parententry"]:
        raise errors.NotAcceptable("Can't move a rootNode to somewhere else")

    if not self.canMove(skelType, skel, parentnode_skel):
        raise errors.Unauthorized()

    # Check if parentNodeSkel is descendant of the skel
    walk_skel = parentnode_skel.clone()

    while walk_skel["parententry"]:
        if walk_skel["parententry"] == skel["key"]:
            raise errors.NotAcceptable(
                f"Invalid move: Entry {key} cannot be moved below its own descendant {parentNode}."
            )

        # read() yields a falsy result for a missing entry; stop walking a broken chain
        # instead of subscripting that result in the next round.
        if not (walk_skel := walk_skel.read(walk_skel["parententry"])):
            break

    old_parentrepo = skel["parentrepo"]
    # parentrepo may not exist in parentnode_skel as it may be an rootNode (same fallback as in add)
    new_parentrepo = parentnode_skel["parentrepo"] or parentnode_skel["key"]

    self.onEdit(skelType, skel)
    skel.patch({
        "parententry": parentnode_skel["key"],
        "parentrepo": new_parentrepo,
        # Only fall back to the current time when no sortindex was given; 0 is a valid sortindex.
        "sortindex": time.time() if sortindex is None else sortindex,
    })
    self.onEdited(skelType, skel)

    # Ensure a changed parentRepo get's propagated
    if old_parentrepo != new_parentrepo:
        # Pass the key as read from the skeleton rather than the raw request value, so the
        # "parententry =" filter compares against the same representation used in the recursion.
        self.updateParentRepo(skel["key"], new_parentrepo)

    return self.render.render("moveSuccess", skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def clone(
    self,
    skelType: SkelType,
    key: db.Key | str | int,
    *,
    bounce: bool = False,
    parententry: t.Optional[db.Key | str | int] = None,
    **kwargs,
):
    """
    Clone an existing entry, and render the entry, eventually with error notes on incorrect data.
    Data is taken by any other arguments in *kwargs*.

    The function performs several access control checks on the requested entity before it is added.

    .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned`

    :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be cloned.
    :param bounce: Return the skeleton after applying client data and validation without writing.
    :param parententry: URL-safe key of the destination parent node.

    :returns: The cloned object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from was found, or the
        provided parent node could not be found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """

    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.cloneSkel(skelType)
    if not skel.read(key):
        raise errors.NotFound()

    if parententry is not None:
        if not (parent_node_skel := self.viewSkel("node").read(parententry)):
            raise errors.NotFound("The provided parent node could not be found.")
    else:
        parent_node_skel = None

    # a clone-operation is some kind of edit and add...
    if not (self.canEdit(skelType, skel) and self.canAdd(skelType, parent_node_skel)):
        raise errors.Unauthorized()

    # Remember source skel and unset the key for clone operation!
    src_skel = skel
    skel = skel.clone(apply_clone_strategy=True)
    skel["key"] = None

    # make parententry required and writeable when provided
    # NOTE(review): "parententry" is a named keyword-only parameter of this function, so it is
    # never contained in kwargs and this branch cannot be taken - confirm the intended behavior.
    if "parententry" in kwargs:
        skel.parententry.readOnly = False
        skel.parententry.required = True
    else:
        _ = skel["parententry"]  # TODO: because of accessedValues...

    # make parentrepo required and writeable when provided
    if "parentrepo" in kwargs:
        skel.parentrepo.readOnly = False
        skel.parentrepo.required = True
    else:
        _ = skel["parentrepo"]  # TODO: because of accessedValues...

    # Check all required preconditions for clone
    if (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs, amend=bounce)  # failure on reading into the bones
        or bounce  # review before changing
    ):
        return self.render.edit(skel, action="clone")

    self.onClone(skelType, skel, src_skel=src_skel)
    # The write must not be wrapped in an assert: assert statements are removed when Python
    # runs with -O, which would silently skip writing the clone.
    skel.write()
    self.onCloned(skelType, skel, src_skel=src_skel)

    return self.render.editSuccess(skel, action="cloneSuccess")
812 ## Default access control functions
def listFilter(self, query: db.Query) -> t.Optional[db.Query]:
    """
    Access control function on item listing.

    This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function,
    and is used to modify the provided filter parameter to match only items that the current user
    is allowed to see.

    By default the query is returned unchanged for users holding the module's "view" access
    (``<moduleName>-view``) or "root" access, and rejected for everybody else.

    :param query: Query which should be altered.

    :returns: The altered filter, or None if access is not granted.
    """
    if not (user := current.user.get()):
        return None

    # "access" may be unset; the other access checks of this module guard against that as well.
    access = user["access"] or ()

    if f"{self.moduleName}-view" in access or "root" in access:
        return query

    return None
def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Checks if the current user can view the given entry.

    Should be identical to what's allowed by listFilter: by default a query for the entry's key
    is passed through :func:`listFilter` and the entry counts as viewable when that query is
    accepted and still finds it. Override this method to save the additional database access.

    :param skelType: May either be "node" or "leaf".
    :param skel: The entry we check for
    :return: True if the current session is authorized to view that entry, False otherwise
    """
    query = self.viewSkel(skelType).all()

    key = skel["key"]
    if key:
        query.mergeExternalFilter({"key": key})

    query = self.listFilter(query)  # Access control

    if query is None:
        return False

    if key and not query.getEntry():
        return False

    return True
def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance] = None) -> bool:
    """
    Access control function for adding permission.

    Decides whether the current user may add a new entry.

    The default behavior is:
    - If no user is logged in, adding is generally refused.
    - If the user has "root" access, adding is generally allowed.
    - If the user has the modules "add" permission (module-add) enabled, adding is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`add`

    :param skelType: Defines the type of the node that should be added.
    :param parentNodeSkel: The parent node where a new entry should be added.

    :returns: True, if adding entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    # root user is always allowed, as is a user with add-permission.
    return "root" in access or f"{self.moduleName}-add" in access
def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for modification permission.

    Decides whether the current user may edit an entry.

    The default behavior is:
    - If no user is logged in, editing is generally refused.
    - If the user has "root" access, editing is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled, editing is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`edit`

    :param skelType: Defines the type of the node that should be edited.
    :param skel: The Skeleton that should be edited.

    :returns: True, if editing entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    return "root" in access or f"{self.moduleName}-edit" in access
def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Decide whether the current user is allowed to delete an entry.

    Default policy:
        - Without a logged-in user, deleting is refused.
        - A user holding "root" access may always delete.
        - A user holding this module's delete right (``<moduleName>-delete``) may delete.

    Override this method to implement module-specific rules.

    .. seealso:: :func:`delete`

    :param skelType: Defines the type of the entry that should be deleted.
    :param skel: The Skeleton that should be deleted.
        This default implementation does not consult it.

    :returns: True if deleting entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    # An empty or missing set of access rights grants nothing.
    granted = user["access"]
    if not granted:
        return False

    # Either the global root right or the module-specific delete right suffices.
    return "root" in granted or f"{self.moduleName}-delete" in granted
def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool:
    """
    Decide whether the current user is allowed to move an entry.

    Default policy:
        - Without a logged-in user, moving is refused.
        - A user holding "root" access may always move.
        - A user holding this module's edit right (``<moduleName>-edit``) may move;
          there is no dedicated "move" right.

    Override this method to implement module-specific rules.

    .. seealso:: :func:`move`

    :param skelType: Defines the type of the entry that should be moved.
    :param node: The entry that should be moved.
        This default implementation does not consult it.
    :param destNode: The node below which *node* should be moved.
        This default implementation does not consult it.

    :returns: True if moving entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    # An empty or missing set of access rights grants nothing.
    granted = user["access"]
    if not granted:
        return False

    # Moving is covered by the edit right (or by the global root right).
    return "root" in granted or f"{self.moduleName}-edit" in granted
971 ## Overridable eventhooks
def onAdd(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Event hook that runs before an entry is added.

    The default implementation intentionally does nothing; override it
    for module-specific behavior.

    :param skelType: Defines the type of the entry that is going to be added.
    :param skel: The Skeleton that is going to be added.

    .. seealso:: :func:`add`, :func:`onAdded`
    """
def onAdded(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Event hook that runs after an entry has been added.

    The default implementation logs the new entry, flushes the cache for
    the skeleton's kind and, if a user is logged in, logs who did it.
    Override it for module-specific behavior.

    :param skelType: Defines the type of the entry that has been added.
    :param skel: The Skeleton that has been added.

    .. seealso:: :func:`add`, :func:`onAdd`
    """
    logging.info(f"""Entry of kind {skelType!r} added: {skel["key"]!r}""")
    flushCache(kind=skel.kindName)

    # Only record the acting user when somebody is logged in.
    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")
def onEdit(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Event hook that runs before an entry is edited.

    The default implementation intentionally does nothing; override it
    for module-specific behavior.

    :param skelType: Defines the type of the entry that is going to be edited.
    :param skel: The Skeleton that is going to be edited.

    .. seealso:: :func:`edit`, :func:`onEdited`
    """
def onEdited(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Event hook that runs after an entry has been modified.

    The default implementation logs the change, flushes the cache for the
    entry's key and, if a user is logged in, logs who did it.
    Override it for module-specific behavior.

    :param skelType: Defines the type of the entry that has been edited.
    :param skel: The Skeleton that has been modified.

    .. seealso:: :func:`edit`, :func:`onEdit`
    """
    logging.info(f"""Entry of kind {skelType!r} changed: {skel["key"]!r}""")
    flushCache(key=skel["key"])

    # Only record the acting user when somebody is logged in.
    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")
def onView(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Event hook that runs when an entry is viewed.

    The default implementation intentionally does nothing; override it
    for module-specific behavior.

    :param skelType: Defines the type of the entry that is viewed.
    :param skel: The Skeleton that is viewed.

    .. seealso:: :func:`view`
    """
def onDelete(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Event hook that runs before an entry is deleted.

    The default implementation intentionally does nothing; override it
    for module-specific behavior.

    :param skelType: Defines the type of the entry that is going to be deleted.
    :param skel: The Skeleton that is going to be deleted.

    .. seealso:: :func:`delete`, :func:`onDeleted`
    """
def onDeleted(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Event hook that runs after an entry has been deleted.

    The default implementation logs the deletion, flushes the cache for the
    entry's key and, if a user is logged in, logs who did it.
    Override it for module-specific behavior.

    .. warning:: Saving the skeleton again will undo the deletion
        (if the skeleton was a leaf or a node with no children).

    :param skelType: Defines the type of the entry that has been deleted.
    :param skel: The Skeleton that has been deleted.

    .. seealso:: :func:`delete`, :func:`onDelete`
    """
    logging.info(f"""Entry deleted: {skel["key"]!r} ({skelType!r})""")
    flushCache(key=skel["key"])

    # Only record the acting user when somebody is logged in.
    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")
def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Event hook that runs before an entry is cloned.

    The default implementation intentionally does nothing; override it
    for module-specific behavior.

    :param skelType: Defines the type of the entry that is cloned.
    :param skel: The new SkeletonInstance that is being created.
    :param src_skel: The source SkeletonInstance `skel` is cloned from.

    .. seealso:: :func:`clone`, :func:`onCloned`
    """
@CallDeferred
def _clone_recursive(
    self,
    skel_type: SkelType,
    src_key: db.Key,
    target_key: db.Key,
    target_repo: db.Key,
    cursor=None
):
    """
    Helper function which is used by default onCloned() to clone a recursive structure.

    Every entry of type ``skel_type`` whose ``parententry`` is ``src_key`` is cloned,
    re-parented to ``target_key`` / ``target_repo`` and written as a new entry.
    :func:`onClone` is called before and :func:`onCloned` after each write.
    When the query reports a cursor after fetching, the function calls itself again
    with that cursor to process the remaining entries.

    :param skel_type: Type of the children to clone; checked via ``_checkSkelType``.
    :param src_key: Key of the node whose direct children are cloned.
    :param target_key: Key of the node that becomes the parent of the clones.
    :param target_repo: Key written into ``parentrepo`` of every clone.
    :param cursor: Query cursor to continue from; None starts at the beginning.

    :raises AssertionError: If ``skel_type`` is rejected by ``_checkSkelType``
        or if writing a clone reports failure.
    """
    # Both checks below used to be ``assert`` statements that also carried the
    # side effect (normalising skel_type / writing the skeleton). Asserts are removed
    # when Python runs with -O, which would silently skip the write. The calls are
    # therefore executed unconditionally; AssertionError is kept as exception type.
    skel_type = self._checkSkelType(skel_type)
    if not skel_type:
        raise AssertionError("_clone_recursive called with an invalid skel_type")

    logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}")

    q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex")
    q.setCursor(cursor)

    count = 0
    for src_skel in q.fetch():
        skel = src_skel.clone()
        skel["key"] = None  # drop the source key so the clone is stored as a new entry
        skel["parententry"] = target_key
        skel["parentrepo"] = target_repo

        self.onClone(skel_type, skel, src_skel=src_skel)
        logging.debug(f"copying {skel=}")  # this logging _is_ needed, otherwise not all values are being written..
        if not skel.write():
            raise AssertionError(f"""Writing the clone of {src_skel["key"]!r} failed""")
        self.onCloned(skel_type, skel, src_skel=src_skel)
        count += 1

    logging.debug(f"_clone_recursive {count=}")

    # Continue with the next batch if the query reports a cursor.
    if cursor := q.getCursor():
        self._clone_recursive(skel_type, src_key, target_key, target_repo, cursor)
def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Event hook that runs after an entry has been cloned.

    The default implementation logs the clone, flushes the cache for the
    skeleton's kind and, if a user is logged in, logs who did it.

    When a "node" was cloned, it additionally calls :func:`_clone_recursive`
    to clone the child nodes below the source node, and the child leafs as well
    if the module defines a ``leafSkelCls``. To suppress or customize this
    recursion, override this function without a super-call.

    :param skelType: Defines the type of the entry that is cloned.
    :param skel: The new SkeletonInstance that was created.
    :param src_skel: The source SkeletonInstance `skel` was cloned from.

    .. seealso:: :func:`clone`, :func:`onClone`
    """
    logging.info(f"""Entry cloned: {skel["key"]!r} ({skelType!r})""")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

    # Only nodes can contain further entries; anything else has nothing below it to clone.
    if skelType != "node":
        return

    self._clone_recursive("node", src_skel["key"], skel["key"], skel["parentrepo"])

    # Leafs exist only in modules that define a leaf skeleton class.
    if self.leafSkelCls:
        self._clone_recursive("leaf", src_skel["key"], skel["key"], skel["parentrepo"])
# Class-level flags set on the Tree prototype after its definition.
# NOTE(review): presumably these expose the prototype to the "vi" and "admin"
# renderers; the code that reads them is not visible here, so confirm.
Tree.vi = True
Tree.admin = True