Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/prototypes/tree.py: 0%
434 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1import time
2import logging
3import typing as t
4from deprecated.sphinx import deprecated
5from viur.core import errors, db, current
6from viur.core.decorators import *
7from viur.core.bones import KeyBone, SortIndexBone, BooleanBone
8from viur.core.cache import flushCache
9from viur.core.skeleton import Skeleton, SkeletonInstance
10from viur.core.tasks import CallDeferred
11from .skelmodule import SkelModule
# Closed set of entry kinds a tree handles; used to annotate every ``skelType`` argument below.
SkelType = t.Literal["node", "leaf"]
class TreeSkel(Skeleton):
    """
    Skeleton base for entries that live inside a tree.

    Adds the bookkeeping bones ``parententry``, ``parentrepo``, ``sortindex`` and ``is_root_node``.
    All of them are hidden and read-only.
    """

    # TODO VIUR4: Why is this not a RelationalBone?
    parententry = KeyBone(
        descr="Parent",
        readOnly=True,
        visible=False,
    )

    # TODO VIUR4: Why is this not a RelationalBone?
    parentrepo = KeyBone(
        descr="BaseRepo",
        readOnly=True,
        visible=False,
    )

    sortindex = SortIndexBone(
        readOnly=True,
        visible=False,
    )

    is_root_node = BooleanBone(
        defaultValue=False,
        visible=False,
        readOnly=True,
    )

    @classmethod
    def refresh(cls, skelValues):  # ViUR2 Compatibility
        """
        Refresh the skeleton and migrate the legacy ``parentdir`` property.

        When ``parententry`` is empty but the stored entity still carries a ViUR2-style
        ``parentdir`` value, that value is normalized and used as ``parententry``.
        """
        super().refresh(skelValues)

        if skelValues["parententry"]:
            return  # already set, nothing to migrate

        # parentdir for viur2 compatibility
        if skelValues.dbEntity.get("parentdir"):
            skelValues["parententry"] = db.normalize_key(skelValues.dbEntity["parentdir"])
class Tree(SkelModule):
    """
    Prototype for tree-shaped modules.

    Serves hierarchical data, either as a real tree made of nodes and leafs,
    or as a plain hierarchy that consists of nodes only.
    """

    # Access right suffixes of this module.
    accessRights = ("add", "edit", "view", "delete", "manage")

    # Skeleton class used for nodes; has to be provided by the concrete module.
    nodeSkelCls = None
    # Skeleton class used for leafs; when left empty, the module handles nodes only.
    leafSkelCls = None

    # Name of the bone used for the default ordering.
    default_order = "sortindex"
def __init__(self, moduleName, modulePath, *args, **kwargs):
    """
    Initialize the tree module.

    :param moduleName: Name of the module, passed through to the parent class.
    :param modulePath: Path of the module, passed through to the parent class.

    :raises: :exc:`AssertionError`, when the class does not define ``nodeSkelCls``.
    """
    # Raised explicitly rather than via ``assert``: assert statements are removed when
    # Python runs with -O, which would silently skip this check.
    if not self.nodeSkelCls:
        raise AssertionError(f"Need to specify at least nodeSkelCls for {self.__class__.__name__!r}")

    super().__init__(moduleName, modulePath, *args, **kwargs)
@property
def handler(self):
    """
    Name of the handler for this module.

    :returns: ``"tree"`` when a leaf skeleton class is configured, otherwise ``"tree.node"``
        (a tree with nodes only, the former hierarchy).
    """
    if self.leafSkelCls:
        return "tree"

    return "tree.node"
def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]:
    """
    Checks for correct skelType.

    The comparison is case-insensitive. "leaf" is only accepted when the module
    defines a ``leafSkelCls``.

    :param skelType: The value to check; anything that is not a string is invalid.

    :returns: The lower-cased type, or None in case it is invalid.
    """
    # The parameter is typed as Any, so guard against non-string input
    # instead of failing with an AttributeError on .lower().
    if not isinstance(skelType, str):
        return None

    skelType = skelType.lower()
    if skelType == "node" or (skelType == "leaf" and self.leafSkelCls):
        return skelType

    return None
def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]:
    """
    Map a skelType to the skeleton class configured on this module.

    :param skelType: Either "node" or "leaf".

    :returns: ``leafSkelCls`` for leafs, ``nodeSkelCls`` for nodes.

    :raises: :exc:`ValueError`, when the skelType is rejected by :func:`_checkSkelType`.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise ValueError("Unsupported skelType")

    return self.leafSkelCls if skelType == "leaf" else self.nodeSkelCls
def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Create the unmodified base skeleton for the given skelType.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`

    :param skelType: Either "node" or "leaf".

    :returns: A fresh instance of the skeleton class resolved for *skelType*.
    """
    skel_cls = self._resolveSkelCls(skelType, *args, **kwargs)
    return skel_cls()
def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton the application uses for viewing an existing entry of the tree.

    By default, this is the instance returned by :func:`~baseSkel`.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: Either "node" or "leaf".

    :returns: A skeleton instance for viewing an entry.
    """
    skel = self.baseSkel(skelType, *args, **kwargs)
    return skel
def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton the application uses for adding an entry to the tree.

    By default, this is the instance returned by :func:`~baseSkel`.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: Either "node" or "leaf".

    :returns: A skeleton instance for adding an entry.
    """
    skel = self.baseSkel(skelType, *args, **kwargs)
    return skel
def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton the application uses for editing an existing entry of the tree.

    By default, this is the instance returned by :func:`~baseSkel`.

    .. seealso:: :func:`viewSkel`, :func:`addSkel`, :func:`~baseSkel`

    :param skelType: Either "node" or "leaf".

    :returns: A skeleton instance for editing an entry.
    """
    skel = self.baseSkel(skelType, *args, **kwargs)
    return skel
def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton the application uses for cloning an existing entry of the tree.

    By default, this is the instance returned by :func:`~baseSkel`.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: Either "node" or "leaf".

    :returns: A SkeletonInstance for cloning an entry.
    """
    skel = self.baseSkel(skelType, *args, **kwargs)
    return skel
def rootnodeSkel(
    self,
    *,
    identifier: str = "rep_module_repo",
    ensure: bool | dict | t.Callable[[SkeletonInstance], None] = False,
) -> SkeletonInstance:
    """
    Provide the skeleton the application uses for rootnode entries.

    This is the node skeleton from :func:`~baseSkel`, with its key preset from *identifier*
    and ``is_root_node`` set.

    :param identifier: Unique identifier (name) for this rootnode.
    :param ensure: Anything other than False or None is handed to ``skel.read(create=...)``,
        so that the skeleton is read, and created with optionally provided values.

    :returns: The prepared skeleton, or the result of ``skel.read()`` when *ensure* is given.
    """
    skel = self.baseSkel("node")
    skel["key"] = db.Key(skel.kindName, identifier)
    skel["is_root_node"] = True

    if ensure in (False, None):
        return skel

    return skel.read(create=ensure)
@deprecated(
    version="3.7.0",
    reason="Use rootnodeSkel(ensure=True) instead.",
    action="always"
)
def ensureOwnModuleRootNode(self) -> db.Entity:
    """
    Make sure the general root-node of the current module exists.

    Delegates to ``rootnodeSkel(ensure=True)``.

    :returns: The entity of the root-node.
    """
    rootnode_skel = self.rootnodeSkel(ensure=True)
    return rootnode_skel.dbEntity
def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]:
    """
    Provide the list of root node items.

    Several module-internal functions request this list, so a concrete module *must*
    override it with its own logic. This default stub returns an empty list.

    An example implementation could be the following:

    .. code-block:: python

        # Example
        def getAvailableRootNodes(self, *args, **kwargs):
            q = db.Query(self.rootKindName)
            ret = [{"key": str(e.key()),
                "name": e.get("name", str(e.key().id_or_name()))} #FIXME
                for e in q.run(limit=25)]
            return ret

    :param args: Can be used in custom implementations.
    :param kwargs: Can be used in custom implementations.

    :returns: A list of dicts, each of which must provide a "key" and a "name" entry
        with respective information.
    """
    root_nodes = []
    return root_nodes
def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None:
    """
    Returns the root-node for a given child.

    Follows ``parententry`` upwards, starting at *key*, until an entry without a parent is reached.

    :param key: Key of the child node entry.

    :returns: The skeleton of the root-node, or None when an entry on the way up could not
        be read or the parent chain is cyclic.
    """
    skel = self.nodeSkelCls()
    visited = set()

    while key:
        # A corrupt, cyclic parent chain would otherwise loop forever.
        marker = str(key)
        if marker in visited:
            logging.critical(f"Cyclic parententry chain detected at {key!r} in getRootNode")
            return None

        visited.add(marker)

        if not skel.read(key):
            return None

        key = skel["parententry"]

    return skel
@CallDeferred
def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0):
    """
    Recursively fixes the parentrepo key after a move operation.

    Every node and leaf whose ``parententry`` equals *parentNode* gets *newRepoKey* written
    into ``parentrepo``. For child nodes, the function calls itself with an increased *depth*.

    :param parentNode: Key of the node whose children should be fixed.
    :param newRepoKey: Key of the new repository.
    :param depth: Safety level depth preventing infinite loops.
    """
    if depth > 99:
        logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo")
        logging.critical("Your data is corrupt!")
        logging.debug(f"{parentNode=}, {newRepoKey=}")
        return

    def fixTxn(nodeKey, newRepoKey):
        node = db.get(nodeKey)
        if node is None:
            # The entry vanished between the query and this transaction; nothing to fix.
            return

        node["parentrepo"] = newRepoKey
        db.put(node)

    # Fix all nodes
    q = db.Query(self.viewSkel("node").kindName).filter("parententry =", parentNode)
    for repo in q.iter():
        self.updateParentRepo(repo.key, newRepoKey, depth=depth + 1)
        db.run_in_transaction(fixTxn, repo.key, newRepoKey)

    # Fix the leafs on this level
    if self.leafSkelCls:
        q = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", parentNode)
        for repo in q.iter():
            db.run_in_transaction(fixTxn, repo.key, newRepoKey)
268 ## Internal exposed functions
@internal_exposed
def pathToKey(self, key: db.Key):
    """
    Returns the recursively expanded path through the Tree from the root-node to a
    requested node.

    Starting at *key*, each round lists all siblings of the current node, marks the current
    node as active and attaches the previously collected level as its children. Then it
    continues with the parent. At most 99 levels are walked.

    :param key: Key of the destination *node*.

    :returns: A nested list of dicts (``skel``, ``active``, ``children``) with information about
        all nodes in the path from root to the requested node. Empty if the node could not be
        read or :func:`listFilter` denied access.
    """
    lastLevel = []

    for _ in range(0, 99):
        currentNodeSkel = self.viewSkel("node")
        if not currentNodeSkel.read(key):
            return []  # Invalid key, nothing to fetch

        if currentNodeSkel["parententry"] == currentNodeSkel["parentrepo"]:  # We reached the top level
            break

        levelQry = self.viewSkel("node").all().filter("parententry =", currentNodeSkel["parententry"])

        # listFilter returns None when access is not granted; don't call fetch() on it.
        if (levelQry := self.listFilter(levelQry)) is None:
            return []

        currentLevel = [
            {
                "skel": entry,
                "active": entry["key"] == currentNodeSkel["key"],
                "children": lastLevel if entry["key"] == currentNodeSkel["key"] else [],
            }
            for entry in levelQry.fetch(99)
        ]
        assert currentLevel, "Got emtpy parent list?"

        lastLevel = currentLevel
        key = currentNodeSkel["parententry"]

    return lastLevel
295 ## External exposed functions
@exposed
def index(self, skelType: SkelType = "node", parententry: t.Optional[db.Key | int | str] = None, **kwargs):
    """
    Default entry point; lists the entries below *parententry*.

    Without a *parententry*, the available root nodes decide: exactly one root node is used
    as the parent, none or several lead to an error.

    :param skelType: Either "node" or "leaf"; defaults to "node".
    :param parententry: Key of the parent whose children should be listed.

    :returns: The result of :func:`list`.

    :raises: :exc:`viur.core.errors.Unauthorized`, when no root node is available.
    :raises: :exc:`viur.core.errors.NotAcceptable`, when several root nodes are available
        and no *parententry* was provided.
    """
    if not parententry:
        repos = self.getAvailableRootNodes(**kwargs)
        count = len(repos)

        if count == 0:
            raise errors.Unauthorized()
        elif count == 1:
            parententry = repos[0]["key"]
        else:
            raise errors.NotAcceptable(f"Missing required parameter {'parententry'!r}")

    return self.list(skelType=skelType, parententry=parententry, **kwargs)
@exposed
def listRootNodes(self, *args, **kwargs) -> t.Any:
    """
    Render the list of all available repositories for the current user with the
    module's default renderer.

    :returns: The rendered representation of the available root-nodes.
    """
    root_nodes = self.getAvailableRootNodes(*args, **kwargs)
    return self.render.listRootNodes(root_nodes)
@exposed
def list(self, skelType: SkelType, *args, **kwargs) -> t.Any:
    """
    Prepare and render a list of entries.

    All supplied keyword parameters are interpreted as filters for the elements displayed.

    Unlike other module prototypes in ViUR, the access control in this function is performed
    by calling the function :func:`listFilter`, which updates the query-filter to match only
    elements which the user is allowed to see.

    .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter`

    :param skelType: Either "node" or "leaf".

    :returns: The rendered list objects for the matching entries.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    # The general access control is made via self.listFilter()
    query = self.viewSkel(skelType).all().mergeExternalFilter(kwargs)
    query = self.listFilter(query)
    if not query:
        raise errors.Unauthorized()

    self._apply_default_order(query)
    entries = query.fetch()
    return self.render.list(entries)
348 @exposed
349 def structure(self, skelType: SkelType, action: t.Optional[str] = "view") -> t.Any:
350 """
351 :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set
352 in each bone.
354 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
355 """
356 # FIXME: In ViUR > 3.7 this could also become dynamic (ActionSkel paradigm).
357 match action:
358 case "view":
359 skel = self.viewSkel(skelType)
360 if not self.canView(skelType, skel):
361 raise errors.Unauthorized()
363 case "edit":
364 skel = self.editSkel(skelType)
365 if not self.canEdit(skelType, skel):
366 raise errors.Unauthorized()
368 case "add":
369 if not self.canAdd(skelType):
370 raise errors.Unauthorized()
372 skel = self.addSkel(skelType)
374 case "clone":
375 skel = self.cloneSkel(skelType)
376 if not (self.canAdd(skelType) and self.canEdit(skelType, skel)):
377 raise errors.Unauthorized()
379 case _:
380 raise errors.NotImplemented(f"The action {action!r} is not implemented.")
382 return self.render.render(f"structure.{skelType}.{action}", skel)
@exposed
def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
    """
    Prepare and render a single entry for viewing.

    The entry is fetched by its *key* and its *skelType*, and has to pass the access
    check before it is rendered.

    .. seealso:: :func:`canView`, :func:`onView`

    :param skelType: May either be "node" or "leaf".
    :param key: Key of the entry to view.

    :returns: The rendered representation of the requested entity.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.viewSkel(skelType)

    found = skel.read(key)
    if not found:
        raise errors.NotFound()

    allowed = self.canView(skelType, skel)
    if not allowed:
        raise errors.Unauthorized()

    self.onView(skelType, skel)
    return self.render.view(skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def add(self, skelType: SkelType, node: db.Key | int | str, *, bounce: bool = False, **kwargs) -> t.Any:
    # FIXME: VIUR4 rename node into key...
    """
    Add a new entry below the parent *node* and render it, eventually with error notes
    on incorrect data. The data is taken from the remaining arguments in *kwargs*.

    Several access control checks are performed before the entry is added.

    .. seealso:: :func:`canAdd`, :func:`onAdd`, :func:`onAdded`

    :param skelType: Defines the type of the new entry and may either be "node" or "leaf".
    :param node: Key of the parent.
    :param bounce: When set, the data is read into the skeleton and rendered for review, but not written.

    :returns: The rendered, added object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.addSkel(skelType)
    parent_skel = self.editSkel("node")

    # TODO VIUR4: Why is this parameter called "node"?
    if not parent_skel.read(node):
        raise errors.NotFound("The provided parent node could not be found.")

    if not self.canAdd(skelType, parent_skel):
        raise errors.Unauthorized()

    skel["parententry"] = parent_skel["key"]
    # parentrepo may not exist in the parent skeleton as it may be a rootNode
    skel["parentrepo"] = parent_skel["parentrepo"] or parent_skel["key"]

    # Order matters here: the checks short-circuit from left to right.
    render_form = (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs, amend=bounce)  # failure on reading into the bones
        or bounce  # review before adding
    )
    if render_form:
        return self.render.add(skel)

    self.onAdd(skelType, skel)
    skel.write()
    self.onAdded(skelType, skel)

    return self.render.addSuccess(skel)
@force_ssl
@force_post
@exposed
@skey
@access("root")
def add_or_edit(self, skelType: SkelType, key: db.Key | int | str, **kwargs) -> t.Any:
    """
    Add an entry under the given *key*, or edit it when it already exists.

    This function is intended to be used by importers.
    Only "root"-users are allowed to use it.

    :param skelType: May either be "node" or "leaf".
    :param key: Key of the entry; it is adjusted to the kind of the selected skeleton.

    :returns: The rendered add- or edit-success, or the rendered skeleton when no or invalid data was supplied.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when the parent node could not be found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel_cls = self.nodeSkelCls if skelType == "node" else self.leafSkelCls
    kind_name = skel_cls.kindName

    # Adjust key
    db_key = db.key_helper(key, target_kind=kind_name, adjust_kind=True)

    # Retrieve and verify existing entry
    db_entity = db.get(db_key)
    is_add = not db_entity

    # Instantiate relevant skeleton
    if not is_add:
        skel = self.editSkel(skelType)
        skel.dbEntity = db_entity  # assign existing entity
    else:
        skel = self.addSkel(skelType)

    skel = skel.ensure_is_cloned()
    skel.parententry.required = True
    skel.parententry.readOnly = False

    skel["key"] = db_key

    # no data supplied, or failure on reading into the bones
    if not kwargs or not skel.fromClient(kwargs):
        # render the skeleton as far as it could be read.
        return self.render.render("add_or_edit", skel)

    # Ensure the parententry exists
    parent_skel = self.editSkel("node")
    if not parent_skel.read(skel["parententry"]):
        raise errors.NotFound("The provided parent node could not be found.")

    if not self.canAdd(skelType, parent_skel):
        raise errors.Unauthorized()

    skel["parententry"] = parent_skel["key"]
    # parentrepo may not exist in the parent skeleton as it may be a rootNode
    skel["parentrepo"] = parent_skel["parentrepo"] or parent_skel["key"]

    if is_add:
        self.onAdd(skelType, skel)
    else:
        self.onEdit(skelType, skel)

    skel.write()

    if not is_add:
        self.onEdited(skelType, skel)
        return self.render.editSuccess(skel)

    self.onAdded(skelType, skel)
    return self.render.addSuccess(skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def edit(self, skelType: SkelType, key: db.Key | int | str, *, bounce: bool = False, **kwargs) -> t.Any:
    """
    Modify an existing entry and render it, eventually with error notes on incorrect data.
    The data is taken from the remaining arguments in *kwargs*.

    Several access control checks are performed before the entry is modified.

    .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited`

    :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf".
    :param key: Key of the item to be edited.
    :param bounce: When set, the data is read into the skeleton and rendered for review, but not written.

    :returns: The rendered, modified object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)

    found = skel.read(key)
    if not found:
        raise errors.NotFound()

    allowed = self.canEdit(skelType, skel)
    if not allowed:
        raise errors.Unauthorized()

    # Order matters here: the checks short-circuit from left to right.
    render_form = (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs, amend=True)  # failure on reading into the bones
        or bounce  # review before changing
    )
    if render_form:
        return self.render.edit(skel)

    self.onEdit(skelType, skel)
    skel.write()
    self.onEdited(skelType, skel)

    return self.render.editSuccess(skel)
@exposed
@force_ssl
@force_post
@skey
def delete(self, skelType: SkelType, key: str, **kwargs) -> t.Any:
    """
    Delete an entry, or a node including its contents.

    Several access control checks are performed before the data is deleted. For nodes,
    :func:`deleteRecursive` is called for the children before the node itself is deleted.

    .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted`

    :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf".
    :param key: Key of the item to be deleted.

    :returns: The rendered, deleted object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)

    found = skel.read(key)
    if not found:
        raise errors.NotFound()

    allowed = self.canDelete(skelType, skel)
    if not allowed:
        raise errors.Unauthorized()

    is_node = skelType == "node"
    if is_node:
        self.deleteRecursive(skel["key"])

    self.onDelete(skelType, skel)
    skel.delete()
    self.onDeleted(skelType, skel)

    return self.render.deleteSuccess(skel, skelType=skelType)
@CallDeferred
def deleteRecursive(self, parentKey: str):
    """
    Recursively process a delete request.

    Deletes all entries which are children of *parentKey*; the entry behind *parentKey*
    itself is not touched here.

    :param parentKey: Key of the node whose children should be deleted.
    """
    nodeKey = db.key_helper(parentKey, self.viewSkel("node").kindName)

    if self.leafSkelCls:
        leaf_query = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", nodeKey)
        for leaf in leaf_query.iter():
            leafSkel = self.viewSkel("leaf")
            if leafSkel.read(leaf.key):
                leafSkel.delete()

    node_query = db.Query(self.viewSkel("node").kindName).filter("parententry =", nodeKey)
    for node in node_query.iter():
        # descend first, then remove the child node itself
        self.deleteRecursive(node.key)

        nodeSkel = self.viewSkel("node")
        if nodeSkel.read(node.key):
            nodeSkel.delete()
@exposed
@force_ssl
@force_post
@skey
def move(
    self,
    skelType: SkelType,
    key: db.Key | int | str,
    parentNode: db.Key | int | str,
    sortindex: t.Optional[float] = None
) -> str:
    """
    Move a node (including its contents) or a leaf to another node.

    .. seealso:: :func:`canMove`

    :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf".
    :param key: Key of the item to be moved.
    :param parentNode: Key of the destination node, which must be a node.
    :param sortindex: An optional sortindex for the key; the current time is used when omitted.

    :returns: The rendered, edited object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided, or
        the move is invalid (into itself, below an own descendant, or moving a rootNode).
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* or *parentNode* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)
    parentnode_skel = self.baseSkel("node")

    if not skel.read(key):
        raise errors.NotFound("Cannot find entity to move")

    if not parentnode_skel.read(parentNode):
        parentNode = db.normalize_key(parentNode)

        if parentNode.kind != parentnode_skel.kindName:
            raise errors.NotFound(
                f"You provided a key of kind {parentNode.kind}, but require a {parentnode_skel.kindName}."
            )

        raise errors.NotFound("Cannot find parentNode entity")

    if skel["key"] == parentnode_skel["key"]:
        raise errors.NotAcceptable("Cannot move a node into itself")

    # Test if we try to move a rootNode
    if not skel["parententry"]:
        raise errors.NotAcceptable("Can't move a rootNode to somewhere else")

    if not self.canMove(skelType, skel, parentnode_skel):
        raise errors.Unauthorized()

    # Check if parentNodeSkel is descendant of the skel
    walk_skel = parentnode_skel.clone()

    while walk_skel["parententry"]:
        if walk_skel["parententry"] == skel["key"]:
            raise errors.NotAcceptable(
                f"Invalid move: Entry {key} cannot be moved below its own descendant {parentNode}."
            )

        # read() is checked for a falsy result everywhere in this module; stop walking
        # when an ancestor cannot be read instead of subscripting that result.
        if not (walk_skel := walk_skel.read(walk_skel["parententry"])):
            break

    old_parentrepo = skel["parentrepo"]
    # parentrepo may not exist in parentnode_skel as it may be a rootNode (same fallback as in add)
    new_parentrepo = parentnode_skel["parentrepo"] or parentnode_skel["key"]

    self.onEdit(skelType, skel)
    skel.patch({
        "parententry": parentnode_skel["key"],
        "parentrepo": new_parentrepo,
        # only fall back to the current time when no sortindex was given; 0 is a valid value
        "sortindex": time.time() if sortindex is None else sortindex
    })
    self.onEdited(skelType, skel)

    # Ensure a changed parentRepo gets propagated
    if old_parentrepo != new_parentrepo:
        # use the resolved key of the entry, not the raw request parameter
        self.updateParentRepo(skel["key"], new_parentrepo)

    return self.render.render("moveSuccess", skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def clone(self, skelType: SkelType, key: db.Key | str | int, *, bounce: bool = False, **kwargs):
    """
    Clone an existing entry, and render the entry, eventually with error notes on incorrect data.
    Data is taken by any other arguments in *kwargs*.

    The function performs several access control checks on the requested entity before it is cloned.

    .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned`

    :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf".
    :param key: Key of the item to be cloned.
    :param bounce: When set, the data is read into the skeleton and rendered for review, but not written.

    :returns: The cloned object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`AssertionError`, when writing the cloned skeleton returns a falsy result.
    """

    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable(f"Invalid skelType provided.")

    skel = self.cloneSkel(skelType)
    if not skel.read(key):
        raise errors.NotFound()

    # a clone-operation is some kind of edit and add...
    # NOTE(review): canAdd() is annotated to take a parent SkeletonInstance, but receives the raw
    # "parententry" request value here - confirm whether this is intended.
    if not (self.canEdit(skelType, skel) and self.canAdd(skelType, kwargs.get("parententry"))):
        raise errors.Unauthorized()

    # Remember source skel and unset the key for clone operation!
    src_skel = skel
    skel = skel.clone(apply_clone_strategy=True)
    skel["key"] = None

    # make parententry required and writeable when provided
    if "parententry" in kwargs:
        skel.parententry.readOnly = False
        skel.parententry.required = True
    else:
        _ = skel["parententry"]  # TODO: because of accessedValues...

    # make parentrepo required and writeable when provided
    if "parentrepo" in kwargs:
        skel.parentrepo.readOnly = False
        skel.parentrepo.required = True
    else:
        _ = skel["parentrepo"]  # TODO: because of accessedValues...

    # Check all required preconditions for clone
    if (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs, amend=bounce)  # failure on reading into the bones
        or bounce  # review before changing
    ):
        return self.render.edit(skel, action="clone")

    self.onClone(skelType, skel, src_skel=src_skel)

    # The write must not live inside an ``assert``: assert statements are removed under
    # ``python -O``, which would skip the write entirely.
    if not skel.write():
        raise AssertionError("Writing the cloned skeleton failed")

    self.onCloned(skelType, skel, src_skel=src_skel)

    return self.render.editSuccess(skel, action="cloneSuccess")
795 ## Default access control functions
def listFilter(self, query: db.Query) -> t.Optional[db.Query]:
    """
    Access control function on item listing.

    This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function,
    and is used to modify the provided filter parameter to match only items that the current user
    is allowed to see.

    By default, the query is returned unmodified for users with the "root" or the module's
    "view" access right (module-view).

    :param query: Query which should be altered.

    :returns: The altered filter, or None if access is not granted.
    """
    if not (user := current.user.get()):
        return None

    # "access" may be empty; the can*-functions guard for this as well.
    access = user["access"] or ()

    if f"{self.moduleName}-view" in access or "root" in access:
        return query

    return None
def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Check whether the current user may view the given entry.

    Should be identical to what's allowed by :meth:`listFilter`, which is used here to decide.
    May be overridden for performance improvements (to eliminate the additional database access).

    :param skelType: Either "node" or "leaf".
    :param skel: The entry to check for.

    :returns: True if the current session is authorized to view that entry, False otherwise.
    """
    query = self.viewSkel(skelType).all()

    key = skel["key"]
    if key:
        query.mergeExternalFilter({"key": key})

    query = self.listFilter(query)  # Access control

    if query is None:
        return False

    # With a key, the filtered query has to yield the entry.
    if key and not query.getEntry():
        return False

    return True
def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance] = None) -> bool:
    """
    Access control function for adding permission.

    Checks whether the current user is allowed to add a new entry.

    The default behavior is:
    - Without a logged-in user, adding is refused.
    - With "root" access, adding is allowed.
    - With the module's "add" permission (module-add), adding is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`add`

    :param skelType: Defines the type of the node that should be added.
    :param parentNodeSkel: The parent node where a new entry should be added; not evaluated by this default.

    :returns: True, if adding entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    # root user is always allowed, as is a user with add-permission.
    return "root" in access or f"{self.moduleName}-add" in access
def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for modification permission.

    Checks whether the current user is allowed to edit an entry.

    The default behavior is:
    - Without a logged-in user, editing is refused.
    - With "root" access, editing is allowed.
    - With the module's "edit" permission (module-edit), editing is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`edit`

    :param skelType: Defines the type of the node that should be edited.
    :param skel: The Skeleton that should be edited; not evaluated by this default.

    :returns: True, if editing entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    return "root" in access or f"{self.moduleName}-edit" in access
def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for delete permission.

    Checks whether the current user is allowed to delete an entry.

    The default behavior is:
    - Without a logged-in user, deleting is refused.
    - With "root" access, deleting is allowed.
    - With the module's "delete" permission (module-delete), deleting is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`delete`

    :param skelType: Defines the type of the node that should be deleted.
    :param skel: The Skeleton that should be deleted; not evaluated by this default.

    :returns: True, if deleting entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    return "root" in access or f"{self.moduleName}-delete" in access
def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool:
    """
    Access control function for moving permission.

    Checks if the current user has the permission to move an entry.

    The default behavior is:
    - If no user is logged in, moving is generally refused.
    - If the user has "root" access, moving is generally allowed.
    - If the user has the module's "edit" permission (``<moduleName>-edit``) enabled,
      moving is allowed. There is no separate "move" permission.

    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the entry that shall be moved.
    :param node: The skeleton of the entry that should be moved.
    :param destNode: The skeleton of the node where *node* should be moved to.

    .. seealso:: :func:`move`

    :returns: True, if moving entries is allowed, False otherwise.
    """
    if not (user := current.user.get()):
        return False

    access = user["access"]
    # An empty/missing access list can match neither "root" nor the module right.
    return bool(access) and ("root" in access or f"{self.moduleName}-edit" in access)
954 ## Overridable eventhooks
def onAdd(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook that is invoked before an entry is added.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be added.
    :param skel: The Skeleton that is going to be added.

    .. seealso:: :func:`add`, :func:`onAdded`
    """
def onAdded(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook that is invoked after an entry has been added.

    It should be overridden for a module-specific behavior.
    The default logs the new entry (and the current user, if any) and calls
    :func:`flushCache` for the skeleton's kind.

    :param skelType: Defines the type of the node that has been added.
    :param skel: The Skeleton that has been added.

    .. seealso:: :func:`add`, :func:`onAdd`
    """
    entry_key = skel["key"]
    logging.info(f"Entry of kind {skelType!r} added: {entry_key!r}")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        user_name, user_key = user["name"], user["key"]
        logging.info(f"User: {user_name!r} ({user_key!r})")
def onEdit(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook that is invoked before an entry is edited.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be edited.
    :param skel: The Skeleton that is going to be edited.

    .. seealso:: :func:`edit`, :func:`onEdited`
    """
def onEdited(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook that is invoked after an entry has been modified.

    It should be overridden for a module-specific behavior.
    The default logs the changed entry (and the current user, if any) and calls
    :func:`flushCache` for the entry's key.

    :param skelType: Defines the type of the node that has been edited.
    :param skel: The Skeleton that has been modified.

    .. seealso:: :func:`edit`, :func:`onEdit`
    """
    entry_key = skel["key"]
    logging.info(f"Entry of kind {skelType!r} changed: {entry_key!r}")
    flushCache(key=entry_key)

    user = current.user.get()
    if user:
        user_name, user_key = user["name"], user["key"]
        logging.info(f"User: {user_name!r} ({user_key!r})")
def onView(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook that is invoked when an entry is viewed.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that is viewed.
    :param skel: The Skeleton that is viewed.

    .. seealso:: :func:`view`
    """
def onDelete(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook that is invoked before an entry is deleted.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be deleted.
    :param skel: The Skeleton that is going to be deleted.

    .. seealso:: :func:`delete`, :func:`onDeleted`
    """
def onDeleted(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook that is invoked after an entry has been deleted.

    It should be overridden for a module-specific behavior.
    The default logs the deleted entry (and the current user, if any) and calls
    :func:`flushCache` for the entry's key.

    .. warning:: Saving the skeleton again will undo the deletion
        (if the skeleton was a leaf or a node with no children).

    :param skelType: Defines the type of the node that is deleted.
    :param skel: The Skeleton that has been deleted.

    .. seealso:: :func:`delete`, :func:`onDelete`
    """
    entry_key = skel["key"]
    logging.info(f"Entry deleted: {entry_key!r} ({skelType!r})")
    flushCache(key=entry_key)

    user = current.user.get()
    if user:
        user_name, user_key = user["name"], user["key"]
        logging.info(f"User: {user_name!r} ({user_key!r})")
def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook that is invoked before an entry is cloned.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that is being created.
    :param src_skel: The source SkeletonInstance `skel` is cloned from.

    .. seealso:: :func:`clone`, :func:`onCloned`
    """
@CallDeferred
def _clone_recursive(
    self,
    skel_type: SkelType,
    src_key: db.Key,
    target_key: db.Key,
    target_repo: db.Key,
    cursor=None
):
    """
    Helper function which is used by default onCloned() to clone a recursive structure.

    Clones every entry of kind ``skel_type`` whose ``parententry`` is ``src_key`` and attaches
    the copy to ``target_key``. For each copy, :func:`onClone` is called before and
    :func:`onCloned` after writing. When the query reports a follow-up cursor, the function
    re-schedules itself to continue with the next batch.

    :param skel_type: Type of the entries to clone, validated through ``_checkSkelType``.
    :param src_key: Key of the source node whose direct children are cloned.
    :param target_key: Key that is stored as ``parententry`` on every cloned entry.
    :param target_repo: Key that is stored as ``parentrepo`` on every cloned entry.
    :param cursor: Query cursor to continue from; ``None`` starts at the beginning.
    """
    # Keep the normalization out of the assert statement: asserts are removed under
    # ``python -O``, which would silently skip the call.
    skel_type = self._checkSkelType(skel_type)
    assert skel_type, "invalid skel_type for _clone_recursive"

    logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}")

    q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex")
    q.setCursor(cursor)

    count = 0
    for src_skel in q.fetch():
        skel = src_skel.clone()
        skel["key"] = None  # the copy must not reuse the key of its source
        skel["parententry"] = target_key
        skel["parentrepo"] = target_repo

        self.onClone(skel_type, skel, src_skel=src_skel)
        # This logging _is_ needed, otherwise not all values are being written.
        # Keep the eager f-string: it is evaluated regardless of the log level.
        logging.debug(f"copying {skel=}")

        # Write outside of the assert so the entry is still stored under ``python -O``.
        written = skel.write()
        assert written, "writing the cloned skeleton failed"

        self.onCloned(skel_type, skel, src_skel=src_skel)
        count += 1

    logging.debug(f"_clone_recursive {count=}")

    if next_cursor := q.getCursor():
        # Continue with the next batch; the arguments must match this function's signature
        # (previously ``skel_type`` was passed twice, shifting the cursor out of place).
        self._clone_recursive(skel_type, src_key, target_key, target_repo, next_cursor)
def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook that is invoked after an entry has been cloned.

    It can be overridden for a module-specific behavior.

    The default logs the cloned entry (and the current user, if any) and calls
    :func:`flushCache` for the skeleton's kind. When a "node" was cloned, it additionally calls
    :func:`_clone_recursive`, which recursively clones the entire structure below this node
    in the background. If this is not wanted, or wanted by a specific setting, override this
    function without a super-call.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that was created.
    :param src_skel: The source SkeletonInstance `skel` was cloned from.

    .. seealso:: :func:`clone`, :func:`onClone`
    """
    new_key = skel["key"]
    logging.info(f"Entry cloned: {new_key!r} ({skelType!r})")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        user_name, user_key = user["name"], user["key"]
        logging.info(f"User: {user_name!r} ({user_key!r})")

    if skelType != "node":
        # The structure below is only cloned in case this is a node.
        return

    # Source parent, new parent and repository are the same for both child types.
    clone_args = (src_skel["key"], new_key, skel["parentrepo"])
    self._clone_recursive("node", *clone_args)

    if self.leafSkelCls:
        self._clone_recursive("leaf", *clone_args)
# Set both the `vi` and the `admin` flag on the Tree prototype.
# NOTE(review): presumably these expose the module to the matching renderers - confirm.
Tree.vi = Tree.admin = True