Coverage for  / home / runner / work / viur-core / viur-core / viur / src / viur / core / prototypes / tree.py: 0%

438 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 14:23 +0000

1import logging 

2import time 

3import typing as t 

4 

5from deprecated.sphinx import deprecated 

6 

7from viur.core import current, db, errors 

8from viur.core.bones import BooleanBone, KeyBone, SortIndexBone 

9from viur.core.cache import flushCache 

10from viur.core.decorators import * 

11from viur.core.skeleton import Skeleton, SkeletonInstance 

12from viur.core.tasks import CallDeferred 

13from .skelmodule import SkelModule 

14 

15SkelType = t.Literal["node", "leaf"] 

16 

17 

class TreeSkel(Skeleton):
    """
    Base skeleton for entries that are managed by a :class:`Tree` module.

    It carries the bookkeeping bones the tree prototype reads and writes
    (``parententry``, ``parentrepo``, ``sortindex`` and ``is_root_node``).
    All of them are hidden and read-only for clients.
    """

    # Key of the direct parent entry; assigned by the Tree handlers (e.g. add/move).
    parententry = KeyBone(  # TODO VIUR4: Why is this not a RelationalBone?
        descr="Parent",
        visible=False,
        readOnly=True,
    )

    # Key of the repository (root node) the entry belongs to.
    parentrepo = KeyBone(  # TODO VIUR4: Why is this not a RelationalBone?
        descr="BaseRepo",
        visible=False,
        readOnly=True,
    )

    # Ordering value; Tree.default_order sorts by this bone.
    sortindex = SortIndexBone(
        visible=False,
        readOnly=True,
    )

    # Flag set to True by Tree.rootnodeSkel() for root node entries.
    is_root_node = BooleanBone(
        defaultValue=False,
        readOnly=True,
        visible=False,
    )

    @classmethod
    def refresh(cls, skelValues):  # ViUR2 Compatibility
        """
        Refresh the skeleton and migrate a legacy ``parentdir`` value.

        After the regular refresh, an entry without ``parententry`` that still
        has a ``parentdir`` property in its entity gets that value (normalized)
        assigned as ``parententry``.

        :param skelValues: The skeleton instance to refresh.
        """
        super().refresh(skelValues)

        # Nothing to migrate when the parent is already known.
        if skelValues["parententry"]:
            return

        # parentdir for viur2 compatibility
        if legacy_parent := skelValues.dbEntity.get("parentdir"):
            skelValues["parententry"] = db.normalize_key(legacy_parent)

47 

48 

49class Tree(SkelModule): 

50 """ 

51 Tree module prototype. 

52 

53 It is used for hierarchical structures, either as a tree with nodes and leafs, or as a hierarchy with nodes only. 

54 """ 

55 accessRights = ("add", "edit", "view", "delete", "manage") 

56 

57 nodeSkelCls = None 

58 leafSkelCls = None 

59 

60 default_order = "sortindex" 

61 

def __init__(self, moduleName, modulePath, *args, **kwargs):
    """
    Set up the tree module.

    A tree needs at least a node skeleton; ``leafSkelCls`` stays optional.

    :param moduleName: Forwarded unchanged to the parent constructor.
    :param modulePath: Forwarded unchanged to the parent constructor.
    """
    # Fail early on a misconfigured subclass, before the base class is initialized.
    assert self.nodeSkelCls, f"Need to specify at least nodeSkelCls for {self.__class__.__name__!r}"

    super().__init__(moduleName, modulePath, *args, **kwargs)

65 

@property
def handler(self):
    """
    Handler name of this module.

    :return: ``"tree"`` when a leaf skeleton is configured, otherwise ``"tree.node"``.
    """
    # either a tree or a tree with nodes only (former hierarchy)
    if self.leafSkelCls:
        return "tree"

    return "tree.node"

69 

70 def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]: 

71 """ 

72 Checks for correct skelType. 

73 

74 Either returns the type provided, or None in case it is invalid. 

75 """ 

76 skelType = skelType.lower() 

77 if skelType == "node" or (skelType == "leaf" and self.leafSkelCls): 

78 return skelType 

79 

80 return None 

81 

82 def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]: 

83 if not (skelType := self._checkSkelType(skelType)): 

84 raise ValueError("Unsupported skelType") 

85 

86 if skelType == "leaf": 

87 return self.leafSkelCls 

88 

89 return self.nodeSkelCls 

90 

def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Create the plain skeleton instance for the given skelType.

    The class is looked up through :func:`_resolveSkelCls` and instantiated without arguments.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`

    :param skelType: Either "node" or "leaf".

    :return: A new, unmodified skeleton instance.
    """
    skel_cls = self._resolveSkelCls(skelType, *args, **kwargs)
    return skel_cls()

98 

def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used for viewing an existing tree entry.

    By default this simply hands back what :func:`~baseSkel` produces;
    all arguments are passed through.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel`

    :return: Returns a Skeleton instance for viewing an entry.
    """
    view_skel = self.baseSkel(skelType, *args, **kwargs)
    return view_skel

111 

def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used for adding a new entry to the tree.

    By default this simply hands back what :func:`~baseSkel` produces;
    all arguments are passed through.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :return: Returns a Skeleton instance for adding an entry.
    """
    add_skel = self.baseSkel(skelType, *args, **kwargs)
    return add_skel

124 

def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used for editing an existing tree entry.

    By default this simply hands back what :func:`~baseSkel` produces;
    all arguments are passed through.

    .. seealso:: :func:`viewSkel`, :func:`addSkel`, :func:`~baseSkel`

    :return: Returns a Skeleton instance for editing an entry.
    """
    edit_skel = self.baseSkel(skelType, *args, **kwargs)
    return edit_skel

137 

def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used for cloning an existing tree entry.

    By default this simply hands back what :func:`~baseSkel` produces;
    all arguments are passed through.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :return: Returns a SkeletonInstance for cloning an entry.
    """
    clone_skel = self.baseSkel(skelType, *args, **kwargs)
    return clone_skel

150 

def rootnodeSkel(
    self,
    *,
    identifier: str = "rep_module_repo",
    ensure: bool | dict | t.Callable[[SkeletonInstance], None] = False,
) -> SkeletonInstance:
    """
    Build the skeleton instance that represents a root node of this module.

    The instance comes from ``baseSkel("node")``, gets a key made of the node
    kind and *identifier*, and is flagged with ``is_root_node``.

    :param identifier: Unique identifier (name) for this rootnode.
    :param ensure: Anything other than False or None is handed to ``skel.read(create=...)``,
        and the result of that call is returned.

    :return: Returns a SkeletonInstance for handling root nodes.
    """
    skel = self.baseSkel("node")

    skel["key"] = db.Key(skel.kindName, identifier)
    skel["is_root_node"] = True

    # Without an ensure request, only the prepared (unread) skeleton is returned.
    if ensure in (False, None):
        return skel

    return skel.read(create=ensure)

177 

@deprecated(
    version="3.7.0",
    reason="Use rootnodeSkel(ensure=True) instead.",
    action="always"
)
def ensureOwnModuleRootNode(self) -> db.Entity:
    """
    Deprecated shortcut around ``rootnodeSkel(ensure=True)``.

    :returns: The ``dbEntity`` of the skeleton returned by :func:`rootnodeSkel`.
    """
    rootnode_skel = self.rootnodeSkel(ensure=True)
    return rootnode_skel.dbEntity

191 

def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]:
    """
    Provide the list of root nodes of this module.

    Several functions of this module ask for this list, so a real module
    *must* override it. This default stub knows no root nodes and returns
    an empty list.

    An example implementation could be the following:

    .. code-block:: python

        # Example
        def getAvailableRootNodes(self, *args, **kwargs):
            q = db.Query(self.rootKindName)
            ret = [{"key": str(e.key()),
                    "name": e.get("name", str(e.key().id_or_name()))} #FIXME
                   for e in q.run(limit=25)]
            return ret

    :param args: Can be used in custom implementations.
    :param kwargs: Can be used in custom implementations.

    :return: Returns a list of dicts which must provide a "key" and a "name" entry
        with respective information.
    """
    return list()

216 

def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None:
    """
    Walk up the ``parententry`` chain, starting at *key*, until an entry without parent is reached.

    :param key: Key of the child node entry.

    :returns: The skeleton of the root-node, or None as soon as one entry of the chain cannot be read.
    """
    skel = self.nodeSkelCls()
    next_key = key

    # The same skeleton instance is re-read for every level of the chain.
    while next_key:
        if not skel.read(next_key):
            return None

        next_key = skel["parententry"]

    return skel

234 

@CallDeferred
def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0):
    """
    Recursively fixes the parentrepo key after a move operation.

    Every node and (if the module has leafs) every leaf whose ``parententry``
    equals *parentNode* gets *newRepoKey* written into ``parentrepo``.
    For child nodes, this function is invoked again with an increased depth.

    :param parentNode: Key of the node which children should be fixed.
    :param newRepoKey: Key of the new repository.
    :param depth: Safety level depth preventing infinite loops.
    """

    def assign_repo(entry_key, repo_key):
        # Runs inside a transaction: load, change and store one entity.
        entity = db.get(entry_key)
        entity["parentrepo"] = repo_key
        db.put(entity)

    if depth > 99:
        logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo")
        logging.critical("Your data is corrupt!")
        logging.debug(f"{parentNode=}, {newRepoKey=}")
        return

    # Fix all nodes
    node_kind = self.viewSkel("node").kindName
    for child_node in db.Query(node_kind).filter("parententry =", parentNode).iter():
        # Descend first, then fix the child node itself.
        self.updateParentRepo(child_node.key, newRepoKey, depth=depth + 1)
        db.run_in_transaction(assign_repo, child_node.key, newRepoKey)

    # Fix the leafs on this level
    if self.leafSkelCls:
        leaf_kind = self.viewSkel("leaf").kindName
        for child_leaf in db.Query(leaf_kind).filter("parententry =", parentNode).iter():
            db.run_in_transaction(assign_repo, child_leaf.key, newRepoKey)

268 

269 ## Internal exposed functions 

270 

@internal_exposed
def pathToKey(self, key: db.Key):
    """
    Build the expanded path through the tree from the top level down to a requested node.

    Starting at *key*, each round collects all sibling nodes of the current
    node (filtered by :func:`listFilter`), marks the current one as active and
    hangs the previously collected level below it. At most 99 levels are walked.

    :param key: Key of the destination *node*.

    :returns: A nested list of dicts (``skel``, ``active``, ``children``) describing the path,
        or an empty list if a node on the way could not be read.
    """
    path = []

    for _ in range(0, 99):
        node_skel = self.viewSkel("node")
        if not node_skel.read(key):
            return []  # Either invalid key or listFilter prevented us from fetching anything

        parent_key = node_skel["parententry"]
        if parent_key == node_skel["parentrepo"]:  # We reached the top level
            break

        level_query = self.viewSkel("node").all().filter("parententry =", node_skel["parententry"])

        siblings = []
        for sibling in self.listFilter(level_query).fetch(99):
            is_active = sibling["key"] == node_skel["key"]
            siblings.append({
                "skel": sibling,
                "active": is_active,
                # Only the node on the path carries the level collected before.
                "children": path if is_active else [],
            })

        assert siblings, "Got emtpy parent list?"

        path = siblings
        key = node_skel["parententry"]

    return path

295 

296 ## External exposed functions 

297 

@exposed
def index(self, skelType: SkelType = "node", parententry: t.Optional[db.Key | int | str] = None, **kwargs):
    """
    Default entry point; lists the entries below *parententry*.

    When no *parententry* is given, the available root nodes are consulted:
    exactly one root node is used as parent, none or several lead to an error.

    :param skelType: May either be "node" or "leaf".
    :param parententry: Key of the parent whose children should be listed.

    :returns: The result of :func:`list`.

    :raises: :exc:`viur.core.errors.Unauthorized`, if there is no root node available.
    :raises: :exc:`viur.core.errors.NotAcceptable`, if several root nodes exist and none was selected.
    """
    if not parententry:
        repos = self.getAvailableRootNodes(**kwargs)
        repo_count = len(repos)

        if repo_count == 0:
            raise errors.Unauthorized()
        elif repo_count == 1:
            parententry = repos[0]["key"]
        else:
            raise errors.NotAcceptable(f"Missing required parameter {'parententry'!r}")

    return self.list(skelType=skelType, parententry=parententry, **kwargs)

311 

@exposed
def listRootNodes(self, *args, **kwargs) -> t.Any:
    """
    Render the root nodes reported by :func:`getAvailableRootNodes` with the
    module's renderer.

    :returns: The rendered representation of the available root-nodes.
    """
    root_nodes = self.getAvailableRootNodes(*args, **kwargs)
    return self.render.listRootNodes(root_nodes)

321 

@exposed
def list(self, skelType: SkelType, *args, **kwargs) -> t.Any:
    """
    Prepares and renders a list of entries.

    All keyword arguments are merged into the query as filters.

    Access control is not done by a ``can...`` function here, but through
    :func:`listFilter`, which narrows the query to what the user may see
    or rejects it entirely.

    .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter`

    :param skelType: May either be "node" or "leaf".

    :returns: The rendered list objects for the matching entries.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    # The general access control is made via self.listFilter()
    unfiltered = self.viewSkel(skelType).all().mergeExternalFilter(kwargs)
    query = self.listFilter(unfiltered)
    if not query:
        raise errors.Unauthorized()

    self._apply_default_order(query)
    return self.render.list(query.fetch())

348 

349 @exposed 

350 def structure(self, skelType: SkelType, action: t.Optional[str] = "view") -> t.Any: 

351 """ 

352 :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set 

353 in each bone. 

354 

355 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

356 """ 

357 # FIXME: In ViUR > 3.7 this could also become dynamic (ActionSkel paradigm). 

358 match action: 

359 case "view": 

360 skel = self.viewSkel(skelType) 

361 if not self.canView(skelType, skel): 

362 raise errors.Unauthorized() 

363 

364 case "edit": 

365 skel = self.editSkel(skelType) 

366 if not self.canEdit(skelType, skel): 

367 raise errors.Unauthorized() 

368 

369 case "add": 

370 if not self.canAdd(skelType): 

371 raise errors.Unauthorized() 

372 

373 skel = self.addSkel(skelType) 

374 

375 case "clone": 

376 skel = self.cloneSkel(skelType) 

377 if not (self.canAdd(skelType) and self.canEdit(skelType, skel)): 

378 raise errors.Unauthorized() 

379 

380 case _: 

381 raise errors.NotImplemented(f"The action {action!r} is not implemented.") 

382 

383 return self.render.render(f"structure.{skelType}.{action}", skel) 

384 

@exposed
def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
    """
    Prepares and renders a single entry for viewing.

    The entry is looked up by *skelType* and *key*, checked with :func:`canView`
    and announced to :func:`onView` before it is rendered.

    .. seealso:: :func:`canView`, :func:`onView`

    :param skelType: May either be "node" or "leaf".
    :param key: Key of the entry to view.

    :returns: The rendered representation of the requested entity.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.viewSkel(skelType)

    entry_found = skel.read(key)
    if not entry_found:
        raise errors.NotFound()

    allowed = self.canView(skelType, skel)
    if not allowed:
        raise errors.Unauthorized()

    self.onView(skelType, skel)
    return self.render.view(skel)

416 

@exposed
@force_ssl
@skey(allow_empty=True)
def add(self, skelType: SkelType, node: db.Key | int | str, *, bounce: bool = False, **kwargs) -> t.Any:
    """
    Add a new entry below the parent *node* and render it, eventually with error notes
    on incorrect data. The entry's data is taken from *kwargs*.

    The entry is only written when data was supplied by a POST request, could be read
    into the skeleton without errors, and *bounce* is not set; in any other case the
    add form is rendered.

    .. seealso:: :func:`canAdd`, :func:`onAdd`, :func:`onAdded`

    :param skelType: Defines the type of the new entry and may either be "node" or "leaf".
    :param node: URL-safe key of the parent.
    :param bounce: Render the skeleton after reading the client data, without writing it.

    :returns: The rendered, added object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    # FIXME: VIUR4 rename node into key...
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.addSkel(skelType)
    parent_skel = self.editSkel("node")

    # TODO VIUR4: Why is this parameter called "node"?
    if not parent_skel.read(node):
        raise errors.NotFound("The provided parent node could not be found.")

    if not self.canAdd(skelType, parent_skel):
        raise errors.Unauthorized()

    skel["parententry"] = parent_skel["key"]
    # parentrepo may not exist in parent_skel as it may be a rootNode
    skel["parentrepo"] = parent_skel["parentrepo"] or parent_skel["key"]

    ready_to_write = (
        kwargs  # data supplied
        and current.request.get().isPostRequest  # using POST-method
        and skel.fromClient(kwargs, amend=bounce)  # data could be read into the bones
        and not bounce  # no review before adding requested
    )
    if not ready_to_write:
        return self.render.add(skel)

    self.onAdd(skelType, skel)
    skel.write()
    self.onAdded(skelType, skel)

    return self.render.addSuccess(skel)

469 

@force_ssl
@force_post
@exposed
@skey
@access("root")
def add_or_edit(self, skelType: SkelType, key: db.Key | int | str, **kwargs) -> t.Any:
    """
    Write an entry under a given key, adding it when it does not exist yet
    and editing it otherwise.

    This function is intended to be used by importers.
    Only "root"-users are allowed to use it.

    :param skelType: May either be "node" or "leaf".
    :param key: Key of the entry; it is adjusted to the kind of the selected skeleton.

    :returns: The rendered skeleton; ``addSuccess``/``editSuccess`` after writing,
        the "add_or_edit" rendering when no or invalid data was supplied.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when the entry's parent node could not be found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel_cls = self.nodeSkelCls if skelType == "node" else self.leafSkelCls
    kind_name = skel_cls.kindName

    # Adjust key
    db_key = db.key_helper(key, target_kind=kind_name, adjust_kind=True)

    # Retrieve existing entry; a missing entity turns this into an add
    db_entity = db.get(db_key)
    is_add = not db_entity

    # Instantiate relevant skeleton
    if is_add:
        skel = self.addSkel(skelType)
    else:
        skel = self.editSkel(skelType)
        skel.dbEntity = db_entity  # assign existing entity

    # The parent must be supplied by the client here, so open up the bone on a cloned skeleton.
    skel = skel.ensure_is_cloned()
    skel.parententry.required = True
    skel.parententry.readOnly = False

    skel["key"] = db_key

    data_accepted = kwargs and skel.fromClient(kwargs)
    if not data_accepted:
        # no data supplied or failure on reading into the bones:
        # render the skeleton as far as it could be read.
        return self.render.render("add_or_edit", skel)

    # Ensure the parententry exists
    parent_skel = self.editSkel("node")
    if not parent_skel.read(skel["parententry"]):
        raise errors.NotFound("The provided parent node could not be found.")

    if not self.canAdd(skelType, parent_skel):
        raise errors.Unauthorized()

    skel["parententry"] = parent_skel["key"]
    # parentrepo may not exist in parent_skel as it may be a rootNode
    skel["parentrepo"] = parent_skel["parentrepo"] or parent_skel["key"]

    if is_add:
        self.onAdd(skelType, skel)
        skel.write()
        self.onAdded(skelType, skel)
        return self.render.addSuccess(skel)

    self.onEdit(skelType, skel)
    skel.write()
    self.onEdited(skelType, skel)
    return self.render.editSuccess(skel)

536 

@exposed
@force_ssl
@skey(allow_empty=True)
def edit(self, skelType: SkelType, key: db.Key | int | str, *, bounce: bool = False, **kwargs) -> t.Any:
    """
    Modify an existing entry and render it, eventually with error notes on incorrect data.
    The changed data is taken from *kwargs*.

    The entry is only written when data was supplied by a POST request, could be read
    into the skeleton without errors, and *bounce* is not set; in any other case the
    edit form is rendered.

    .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited`

    :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be edited.
    :param bounce: Render the skeleton after reading the client data, without writing it.

    :returns: The rendered, modified object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)
    if not skel.read(key):
        raise errors.NotFound()

    if not self.canEdit(skelType, skel):
        raise errors.Unauthorized()

    ready_to_write = (
        kwargs  # data supplied
        and current.request.get().isPostRequest  # using POST-method
        and skel.fromClient(kwargs, amend=True)  # data could be read into the bones
        and not bounce  # no review before writing requested
    )
    if not ready_to_write:
        return self.render.edit(skel)

    self.onEdit(skelType, skel)
    skel.write()
    self.onEdited(skelType, skel)

    return self.render.editSuccess(skel)

582 

@exposed
@force_ssl
@force_post
@skey
def delete(self, skelType: SkelType, key: str, **kwargs) -> t.Any:
    """
    Deletes an entry; for a node, its contents are handed to :func:`deleteRecursive` as well.

    .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted`

    :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be deleted.

    :returns: The rendered, deleted object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    skelType = self._checkSkelType(skelType)
    if not skelType:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)

    entry_found = skel.read(key)
    if not entry_found:
        raise errors.NotFound()

    may_delete = self.canDelete(skelType, skel)
    if not may_delete:
        raise errors.Unauthorized()

    # Only nodes can have children that need to be removed, too.
    if skelType == "node":
        self.deleteRecursive(skel["key"])

    self.onDelete(skelType, skel)
    skel.delete()
    self.onDeleted(skelType, skel)

    return self.render.deleteSuccess(skel, skelType=skelType)

622 

@CallDeferred
def deleteRecursive(self, parentKey: str):
    """
    Recursively processes a delete request.

    Deletes all leafs and nodes whose ``parententry`` is *parentKey*; for each child node,
    this function is invoked again. The entry behind *parentKey* itself is not deleted here.

    :param parentKey: Key of the node which children should be deleted.
    """
    node_key = db.key_helper(parentKey, self.viewSkel("node").kindName)

    if self.leafSkelCls:
        leaf_query = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", node_key)
        for leaf_entity in leaf_query.iter():
            leaf_skel = self.viewSkel("leaf")
            # Entries that cannot be read are skipped.
            if leaf_skel.read(leaf_entity.key):
                leaf_skel.delete()

    node_query = db.Query(self.viewSkel("node").kindName).filter("parententry =", node_key)
    for node_entity in node_query.iter():
        # Take care of the child's own children first.
        self.deleteRecursive(node_entity.key)

        node_skel = self.viewSkel("node")
        if node_skel.read(node_entity.key):
            node_skel.delete()

645 

@exposed
@force_ssl
@force_post
@skey
def move(
    self,
    skelType: SkelType,
    key: db.Key | int | str,
    parentNode: db.Key | int | str,
    sortindex: t.Optional[float] = None
) -> str:
    """
    Move a node (including its contents) or a leaf to another node.

    .. seealso:: :func:`canMove`

    :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be moved.
    :param parentNode: URL-safe key of the destination node, which must be a node.
    :param sortindex: An optional sortindex for the key; the current time is used when omitted.

    :returns: The rendered, edited object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided, or the move
        is invalid (into itself, below an own descendant, or moving a root node).
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* or *parentNode* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)
    parentnode_skel = self.baseSkel("node")

    if not skel.read(key):
        raise errors.NotFound("Cannot find entity to move")

    if not parentnode_skel.read(parentNode):
        parentNode = db.normalize_key(parentNode)

        # Give a more helpful message when the key points to the wrong kind.
        if parentNode.kind != parentnode_skel.kindName:
            raise errors.NotFound(
                f"You provided a key of kind {parentNode.kind}, but require a {parentnode_skel.kindName}."
            )

        raise errors.NotFound("Cannot find parentNode entity")

    if skel["key"] == parentnode_skel["key"]:
        raise errors.NotAcceptable("Cannot move a node into itself")

    # Test if we try to move a rootNode
    if not skel["parententry"]:
        raise errors.NotAcceptable("Can't move a rootNode to somewhere else")

    if not self.canMove(skelType, skel, parentnode_skel):
        raise errors.Unauthorized()

    # Check if parentNodeSkel is descendant of the skel
    walk_skel = parentnode_skel.clone()

    while walk_skel["parententry"]:
        if walk_skel["parententry"] == skel["key"]:
            raise errors.NotAcceptable(
                f"Invalid move: Entry {key} cannot be moved below its own descendant {parentNode}."
            )

        # A dangling parent reference ends the walk instead of failing on a missing skeleton.
        if not (walk_skel := walk_skel.read(walk_skel["parententry"])):
            break

    old_parentrepo = skel["parentrepo"]
    # parentrepo may not exist in parentnode_skel as it may be a rootNode (same rule as in add)
    new_parentrepo = parentnode_skel["parentrepo"] or parentnode_skel["key"]

    self.onEdit(skelType, skel)
    skel.patch({
        "parententry": parentnode_skel["key"],
        "parentrepo": new_parentrepo,
        # Only fall back to the current time when no sortindex was given; 0 is a valid sortindex.
        "sortindex": time.time() if sortindex is None else sortindex,
    })
    self.onEdited(skelType, skel)

    # Ensure a changed parentRepo gets propagated to the children of the moved entry.
    # The key of the entity that was read is used, not the raw request parameter.
    if old_parentrepo != new_parentrepo:
        self.updateParentRepo(skel["key"], new_parentrepo)

    return self.render.render("moveSuccess", skel)

728 

@exposed
@force_ssl
@skey(allow_empty=True)
def clone(
    self,
    skelType: SkelType,
    key: db.Key | str | int,
    *,
    bounce: bool = False,
    parententry: t.Optional[db.Key | str | int] = None,
    **kwargs,
):
    """
    Clone an existing entry, and render the entry, eventually with error notes on incorrect data.
    Data is taken by any other arguments in *kwargs*.

    The clone is only written when data was supplied in *kwargs* by a POST request, could be read
    into the skeleton without errors, and *bounce* is not set; in any other case the edit form
    is rendered with action "clone".

    .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned`

    :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be cloned.
    :param bounce: Return the skeleton after applying client data and validation without writing.
    :param parententry: URL-safe key of the destination parent node. When omitted, the clone keeps
        the parent of the source entry.

    :returns: The cloned object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from or no parent node was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """

    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.cloneSkel(skelType)
    if not skel.read(key):
        raise errors.NotFound()

    if parententry is not None:
        if not (parent_node_skel := self.viewSkel("node").read(parententry)):
            raise errors.NotFound("The provided parent node could not be found.")
    else:
        parent_node_skel = None

    # a clone-operation is some kind of edit and add...
    if not (self.canEdit(skelType, skel) and self.canAdd(skelType, parent_node_skel)):
        raise errors.Unauthorized()

    # Remember source skel and unset the key for clone operation!
    src_skel = skel
    skel = skel.clone(apply_clone_strategy=True)
    skel["key"] = None

    # "parententry" is a named parameter and therefore never part of kwargs;
    # apply the verified destination directly, following the same rule as add().
    if parent_node_skel is not None:
        skel["parententry"] = parent_node_skel["key"]
        # parentrepo may not exist in parent_node_skel as it may be a rootNode
        skel["parentrepo"] = parent_node_skel["parentrepo"] or parent_node_skel["key"]
    else:
        _ = skel["parententry"]  # TODO: because of accessedValues...

    # make parentrepo required and writeable when provided
    if "parentrepo" in kwargs:
        skel.parentrepo.readOnly = False
        skel.parentrepo.required = True
    else:
        _ = skel["parentrepo"]  # TODO: because of accessedValues...

    # Check all required preconditions for clone
    if (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs, amend=bounce)  # failure on reading into the bones
        or bounce  # review before changing
    ):
        return self.render.edit(skel, action="clone")

    self.onClone(skelType, skel, src_skel=src_skel)
    # Not wrapped in an assert: asserts are removed in optimized mode, which would skip the write.
    skel.write()
    self.onCloned(skelType, skel, src_skel=src_skel)

    return self.render.editSuccess(skel, action="cloneSuccess")

811 

812 ## Default access control functions 

813 

def listFilter(self, query: db.Query) -> t.Optional[db.Query]:
    """
    Access control function on item listing.

    This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function,
    and is used to modify the provided filter parameter to match only items that the current user
    is allowed to see.

    The default grants access to logged-in users that have either the module's "view"
    permission (module-view) or "root" access.

    :param query: Query which should be altered.

    :returns: The altered filter, or None if access is not granted.
    """
    if not (user := current.user.get()):
        return None

    # A user without any access rights must be rejected, not cause an error
    # (canAdd treats a missing access value the same way).
    access = user["access"] or ()

    if f"{self.moduleName}-view" in access or "root" in access:
        return query

    return None

831 

def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Checks if the current user can view the given entry.

    The decision is delegated to :func:`listFilter`: a query for the entry's key
    is passed through it, and the entry counts as visible when the query is
    accepted and still yields the entry. For a skeleton without key, only the
    acceptance of the query is checked.

    This costs an additional database access; modules may override the method
    for performance improvements, but should stay identical to what
    :func:`listFilter` allows.

    :param skelType: May either be "node" or "leaf".
    :param skel: The entry we check for

    :return: True if the current session is authorized to view that entry, False otherwise
    """
    query = self.viewSkel(skelType).all()

    key = skel["key"]
    if key:
        query.mergeExternalFilter({"key": key})

    permitted_query = self.listFilter(query)  # Access control

    if permitted_query is None:
        return False

    if key and not permitted_query.getEntry():
        return False

    return True

852 

def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance] = None) -> bool:
    """
    Access control function for adding permission.

    Checks if the current user has the permission to add a new entry.

    The default behavior is:
    - If no user is logged in, adding is generally refused.
    - If the user has "root" access, adding is generally allowed.
    - If the user has the modules "add" permission (module-add) enabled, adding is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`add`

    :param skelType: Defines the type of the node that should be added.
    :param parentNodeSkel: The parent node where a new entry should be added.

    :returns: True, if adding entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        # Neither "root" nor the module permission can be contained in an empty access value.
        return False

    # root user is always allowed; otherwise the module's add-permission decides.
    return "root" in access or f"{self.moduleName}-add" in access

883 

def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for modification permission.

    Checks if the current user has the permission to edit an entry.

    The default behavior is:
    - If no user is logged in, editing is generally refused.
    - If the user has "root" access, editing is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled, editing is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`edit`

    :param skelType: Defines the type of the node that should be edited.
    :param skel: The Skeleton that should be edited.

    :returns: True, if editing entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        # Neither "root" nor the module permission can be contained in an empty access value.
        return False

    # root user is always allowed; otherwise the module's edit-permission decides.
    return "root" in access or f"{self.moduleName}-edit" in access

911 

def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for delete permission.

    Checks if the current user has the permission to delete an entry.

    The default behavior is:
    - If no user is logged in, deleting is generally refused.
    - If the user has "root" access, deleting is generally allowed.
    - If the user has the modules "delete" permission (module-delete) enabled, \
        deleting is allowed.

    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that should be deleted.
    :param skel: The Skeleton that should be deleted.

    .. seealso:: :func:`delete`

    :returns: True, if deleting entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        # Neither "root" nor the module permission can be contained in an empty access value.
        return False

    # root user is always allowed; otherwise the module's delete-permission decides.
    return "root" in access or f"{self.moduleName}-delete" in access

940 

def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool:
    """
    Access control function for moving permission.

    Checks if the current user has the permission to move an entry.

    The default behavior is:
    - If no user is logged in, moving is generally refused.
    - If the user has "root" access, moving is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled, \
        moving is allowed.

    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be moved.
    :param node: The Skeleton of the entry to be moved.
    :param destNode: The Skeleton of the node where *node* should be moved to.

    .. seealso:: :func:`move`

    :returns: True, if moving entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        # Neither "root" nor the module permission can be contained in an empty access value.
        return False

    # Moving is governed by the edit-permission; root user is always allowed.
    return "root" in access or f"{self.moduleName}-edit" in access

970 

971 ## Overridable eventhooks 

972 

def onAdd(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before adding an entry.

    The default implementation does nothing; it can be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be added.
    :param skel: The Skeleton that is going to be added.

    .. seealso:: :func:`add`, :func:`onAdded`
    """
    return None

985 

def onAdded(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after adding an entry.

    It should be overridden for a module-specific behavior.
    The default is writing a log entry (including the current user, if there is one) and
    flushing the cache for the skeleton's kind.

    :param skelType: Defines the type of the node that has been added.
    :param skel: The Skeleton that has been added.

    .. seealso:: :func:`add`, :func:`onAdd`
    """
    logging.info(f"""Entry of kind {skelType!r} added: {skel["key"]!r}""")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

1002 

def onEdit(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before editing an entry.

    The default implementation does nothing; it can be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be edited.
    :param skel: The Skeleton that is going to be edited.

    .. seealso:: :func:`edit`, :func:`onEdited`
    """
    return None

1015 

def onEdited(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after modifying an entry.

    It should be overridden for a module-specific behavior.
    The default is writing a log entry (including the current user, if there is one) and
    flushing the cache for the entry's key.

    :param skelType: Defines the type of the node that has been edited.
    :param skel: The Skeleton that has been modified.

    .. seealso:: :func:`edit`, :func:`onEdit`
    """
    logging.info(f"""Entry of kind {skelType!r} changed: {skel["key"]!r}""")
    flushCache(key=skel["key"])

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

1032 

def onView(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called when viewing an entry.

    It should be overridden for a module-specific behavior.
    The default is doing nothing.

    :param skelType: Defines the type of the node that is viewed.
    :param skel: The Skeleton that is viewed.

    .. seealso:: :func:`view`
    """
    return None

1046 

def onDelete(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before deleting an entry.

    The default implementation does nothing; it can be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be deleted.
    :param skel: The Skeleton that is going to be deleted.

    .. seealso:: :func:`delete`, :func:`onDeleted`
    """
    return None

1059 

def onDeleted(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after deleting an entry.

    It should be overridden for a module-specific behavior.
    The default is writing a log entry (including the current user, if there is one) and
    flushing the cache for the entry's key.

    .. warning:: Saving the skeleton again will undo the deletion
        (if the skeleton was a leaf or a node with no children).

    :param skelType: Defines the type of the node that is deleted.
    :param skel: The Skeleton that has been deleted.

    .. seealso:: :func:`delete`, :func:`onDelete`
    """
    logging.info(f"""Entry deleted: {skel["key"]!r} ({skelType!r})""")
    flushCache(key=skel["key"])

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

1079 

def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook function that is called before cloning an entry.

    The default implementation does nothing; it can be overwritten to a module-specific behavior.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that is being created.
    :param src_skel: The source SkeletonInstance `skel` is cloned from.

    .. seealso:: :func:`clone`, :func:`onCloned`
    """
    return None

1093 

@CallDeferred
def _clone_recursive(
    self,
    skel_type: SkelType,
    src_key: db.Key,
    target_key: db.Key,
    target_repo: db.Key,
    cursor=None
):
    """
    Helper function which is used by default onCloned() to clone a recursive structure.

    Fetches the entries of type `skel_type` whose ``parententry`` is `src_key` (ordered by
    ``sortindex``), and writes a copy of each one below `target_key` in `target_repo`.
    :func:`onClone` is called before and :func:`onCloned` after every single write. As long as
    the query reports a cursor, the function calls itself again with that cursor to continue.

    :param skel_type: Defines the type of the entries that are cloned.
    :param src_key: Key of the node whose direct children are cloned.
    :param target_key: Key that is set as ``parententry`` on every clone.
    :param target_repo: Key that is set as ``parentrepo`` on every clone.
    :param cursor: Query cursor to continue from; None starts at the beginning.

    :raises AssertionError: If `skel_type` is not accepted by ``_checkSkelType``, or if
        writing a cloned skeleton reports failure.
    """
    # Calls with side effects must not live inside ``assert``: with ``python -O`` assert
    # statements are removed entirely, which would silently skip them.
    skel_type = self._checkSkelType(skel_type)
    assert skel_type

    logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}")

    q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex")
    q.setCursor(cursor)

    count = 0
    for skel in q.fetch():
        src_skel = skel

        # Turn the copy into a new entry (no key) located below the target.
        skel = skel.clone()
        skel["key"] = None
        skel["parententry"] = target_key
        skel["parentrepo"] = target_repo

        self.onClone(skel_type, skel, src_skel=src_skel)
        logging.debug(f"copying {skel=}")  # this logging _is_ needed, otherwise not all values are being written..
        written = skel.write()  # kept outside the assert so the write also happens under -O
        assert written
        self.onCloned(skel_type, skel, src_skel=src_skel)
        count += 1

    logging.debug(f"_clone_recursive {count=}")

    # Continue with the next batch, if the query has more to offer.
    if cursor := q.getCursor():
        self._clone_recursive(skel_type, src_key, target_key, target_repo, cursor)

1132 

def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook function that is called after cloning an entry.

    It can be overwritten to a module-specific behavior.

    The default writes a log entry (including the current user, if there is one) and flushes
    the cache for the skeleton's kind. When cloning a "node", it additionally calls
    :func:`_clone_recursive`, which recursively clones the entire structure below this node
    in the background. If this is not wanted, or wanted by a specific setting, overwrite this
    function without a super-call.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that was created.
    :param src_skel: The source SkeletonInstance `skel` was cloned from.

    .. seealso:: :func:`clone`, :func:`onClone`
    """
    logging.info(f"""Entry cloned: {skel["key"]!r} ({skelType!r})""")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

    # Clone entire structure below, in case this is a node.
    if skelType == "node":
        # Child nodes first ...
        self._clone_recursive("node", src_skel["key"], skel["key"], skel["parentrepo"])

        # ... then the leafs, provided the module defines a leaf skeleton at all.
        if self.leafSkelCls:
            self._clone_recursive("leaf", src_skel["key"], skel["key"], skel["parentrepo"])

1162 

1163 

# Set the `vi` and `admin` attributes on the Tree prototype.
# NOTE(review): presumably these flags expose the module to the "vi" and "admin" renderers - confirm.
Tree.vi = Tree.admin = True