Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/prototypes/tree.py: 0%

434 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import time 

2import logging 

3import typing as t 

4from deprecated.sphinx import deprecated 

5from viur.core import errors, db, current 

6from viur.core.decorators import * 

7from viur.core.bones import KeyBone, SortIndexBone, BooleanBone 

8from viur.core.cache import flushCache 

9from viur.core.skeleton import Skeleton, SkeletonInstance 

10from viur.core.tasks import CallDeferred 

11from .skelmodule import SkelModule 

12 

13 

# The two kinds of entries a tree module distinguishes.
SkelType = t.Literal["node", "leaf"]

15 

16 

class TreeSkel(Skeleton):
    """
    Skeleton base class carrying the bookkeeping bones of a tree entry.

    Every bone declared here is hidden (``visible=False``) and read-only.
    """

    # Key of the parent entry.
    parententry = KeyBone(  # TODO VIUR4: Why is this not a RelationalBone?
        descr="Parent",
        visible=False,
        readOnly=True,
    )

    # Key of the repository ("BaseRepo") the entry is filed under.
    parentrepo = KeyBone(  # TODO VIUR4: Why is this not a RelationalBone?
        descr="BaseRepo",
        visible=False,
        readOnly=True,
    )

    # Sort position; ``Tree.default_order`` refers to this bone by name.
    sortindex = SortIndexBone(
        visible=False,
        readOnly=True,
    )

    # Set to True by ``Tree.rootnodeSkel``; every other entry keeps the default.
    is_root_node = BooleanBone(
        defaultValue=False,
        readOnly=True,
        visible=False,
    )

    @classmethod
    def refresh(cls, skelValues):  # ViUR2 Compatibility
        """
        Refresh the skeleton values and back-fill ``parententry`` for legacy entities.

        When ``parententry`` is empty but the database entity still has a ``parentdir``
        property, the normalized ``parentdir`` key is used as ``parententry``.

        :param skelValues: The skeleton values to refresh.
        """
        super().refresh(skelValues)

        if skelValues["parententry"]:
            return

        # parentdir for viur2 compatibility
        if legacy_parent := skelValues.dbEntity.get("parentdir"):
            skelValues["parententry"] = db.normalize_key(legacy_parent)

46 

47 

48class Tree(SkelModule): 

49 """ 

50 Tree module prototype. 

51 

52 It is used for hierarchical structures, either as a tree with nodes and leafs, or as a hierarchy with nodes only. 

53 """ 

54 accessRights = ("add", "edit", "view", "delete", "manage") 

55 

56 nodeSkelCls = None 

57 leafSkelCls = None 

58 

59 default_order = "sortindex" 

60 

def __init__(self, moduleName, modulePath, *args, **kwargs):
    """
    Initialize the tree module.

    A tree cannot work without a node skeleton, so a missing ``nodeSkelCls`` is rejected
    before the parent initializer runs.

    :param moduleName: Passed on to the parent initializer.
    :param modulePath: Passed on to the parent initializer.
    """
    cls_name = self.__class__.__name__
    assert self.nodeSkelCls, f"Need to specify at least nodeSkelCls for {cls_name!r}"

    super().__init__(moduleName, modulePath, *args, **kwargs)

64 

@property
def handler(self):
    """
    Handler identifier of this module.

    :return: ``"tree"`` when a leaf skeleton is configured, otherwise ``"tree.node"``.
    """
    # either a tree or a tree with nodes only (former hierarchy)
    if self.leafSkelCls:
        return "tree"

    return "tree.node"

68 

def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]:
    """
    Checks for correct skelType.

    The check is case-insensitive; ``"leaf"`` is only accepted when the module has a
    ``leafSkelCls`` configured.

    :param skelType: The value to check; may be of any type.

    :return: Either the lower-cased type provided, or None in case it is invalid.
    """
    # The parameter is typed as Any: anything that is not a string (None, numbers, lists)
    # cannot be a valid skelType and must not crash on ``.lower()``.
    if not isinstance(skelType, str):
        return None

    skelType = skelType.lower()
    if skelType == "node" or (skelType == "leaf" and self.leafSkelCls):
        return skelType

    return None

80 

def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]:
    """
    Map a skelType onto the skeleton class configured for it.

    :param skelType: ``"node"`` or ``"leaf"``; validated through :func:`_checkSkelType`.

    :return: ``leafSkelCls`` for leafs, ``nodeSkelCls`` for nodes.

    :raises: :exc:`ValueError`, when the skelType is not supported by this module.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise ValueError("Unsupported skelType")

    return self.leafSkelCls if checked_type == "leaf" else self.nodeSkelCls

89 

def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Create a fresh, unmodified skeleton instance for the given skelType.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`

    :param skelType: ``"node"`` or ``"leaf"``.

    :return: A new instance of the skeleton class resolved by :func:`_resolveSkelCls`.
    """
    skel_cls = self._resolveSkelCls(skelType, *args, **kwargs)
    return skel_cls()

97 

def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance the application uses for viewing an existing tree entry.

    Unless overridden, this is simply what :func:`~baseSkel` returns.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: ``"node"`` or ``"leaf"``.

    :return: Returns a Skeleton instance for viewing an entry.
    """
    skel = self.baseSkel(skelType, *args, **kwargs)
    return skel

110 

def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance the application uses for adding an entry to the tree.

    Unless overridden, this is simply what :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: ``"node"`` or ``"leaf"``.

    :return: Returns a Skeleton instance for adding an entry.
    """
    skel = self.baseSkel(skelType, *args, **kwargs)
    return skel

123 

def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance the application uses for editing an existing tree entry.

    Unless overridden, this is simply what :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`addSkel`, :func:`~baseSkel`

    :param skelType: ``"node"`` or ``"leaf"``.

    :return: Returns a Skeleton instance for editing an entry.
    """
    skel = self.baseSkel(skelType, *args, **kwargs)
    return skel

136 

def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance the application uses for cloning an existing tree entry.

    Unless overridden, this is simply what :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: ``"node"`` or ``"leaf"``.

    :return: Returns a SkeletonInstance for cloning an entry.
    """
    skel = self.baseSkel(skelType, *args, **kwargs)
    return skel

149 

def rootnodeSkel(
    self,
    *,
    identifier: str = "rep_module_repo",
    ensure: bool | dict | t.Callable[[SkeletonInstance], None] = False,
) -> SkeletonInstance:
    """
    Build the node skeleton that represents a rootnode entry.

    The skeleton comes from :func:`~baseSkel`, gets a key made of its kind name and
    *identifier*, and is flagged with ``is_root_node``.

    :param identifier: Unique identifier (name) for this rootnode.
    :param ensure: Anything other than ``False``/``None`` is handed to ``skel.read()`` as its
        ``create`` argument, and the result of that read is returned.

    :return: Returns a SkeletonInstance for handling root nodes.
    """
    rootnode_skel = self.baseSkel("node")
    rootnode_skel["key"] = db.Key(rootnode_skel.kindName, identifier)
    rootnode_skel["is_root_node"] = True

    if ensure in (False, None):
        return rootnode_skel

    return rootnode_skel.read(create=ensure)

176 

@deprecated(
    version="3.7.0",
    reason="Use rootnodeSkel(ensure=True) instead.",
    action="always"
)
def ensureOwnModuleRootNode(self) -> db.Entity:
    """
    Deprecated shortcut for ``rootnodeSkel(ensure=True)``.

    :returns: The ``dbEntity`` of the skeleton returned by :func:`rootnodeSkel`.
    """
    rootnode_skel = self.rootnodeSkel(ensure=True)
    return rootnode_skel.dbEntity

190 

def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]:
    """
    Provide the list of root node items of this module.

    Several module-internal functions request this list, so a module *must* override this
    stub with its own logic; the stub itself reports no root nodes at all.

    A custom implementation could look like this:

    .. code-block:: python

        # Example
        def getAvailableRootNodes(self, *args, **kwargs):
            q = db.Query(self.rootKindName)
            ret = [{"key": str(e.key()),
                "name": e.get("name", str(e.key().id_or_name()))} #FIXME
                for e in q.run(limit=25)]
            return ret

    :param args: Can be used in custom implementations.
    :param kwargs: Can be used in custom implementations.
    :return: Returns a list of dicts which must provide a "key" and a "name" entry with
        respective information.
    """
    no_rootnodes = []
    return no_rootnodes

215 

def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None:
    """
    Climb from an entry up to the topmost node of its branch.

    Starting at *key*, the node skeleton is re-read along the ``parententry`` chain until an
    entry without a parent is reached.

    :param key: Key of the child node entry.

    :returns: The skeleton of the root-node, or None when an entry on the way cannot be read.
    """
    skel = self.nodeSkelCls()
    next_key = key

    while next_key:
        if not skel.read(next_key):
            return None

        next_key = skel["parententry"]

    return skel

233 

@CallDeferred
def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0):
    """
    Recursively fixes the parentrepo key after a move operation.

    Every node and (if the module has leafs) every leaf whose ``parententry`` is *parentNode*
    gets ``parentrepo`` set to *newRepoKey*. Each child node is additionally handed to
    ``updateParentRepo`` again with an increased *depth*, so its own children are fixed too.

    :param parentNode: URL-safe key of the node which children should be fixed.
    :param newRepoKey: URL-safe key of the new repository.
    :param depth: Safety level depth preventing infinitive loops.
    """
    if depth > 99:
        logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo")
        logging.critical("Your data is corrupt!")
        logging.debug(f"{parentNode=}, {newRepoKey=}")
        return

    def fixTxn(nodeKey, newRepoKey):
        # Runs inside a transaction: load the entity, rewrite its repository key, store it.
        entity = db.get(nodeKey)
        entity["parentrepo"] = newRepoKey
        db.put(entity)

    # Fix all nodes
    node_kind = self.viewSkel("node").kindName
    for child_node in db.Query(node_kind).filter("parententry =", parentNode).iter():
        self.updateParentRepo(child_node.key, newRepoKey, depth=depth + 1)
        db.run_in_transaction(fixTxn, child_node.key, newRepoKey)

    # Fix the leafs on this level
    if not self.leafSkelCls:
        return

    leaf_kind = self.viewSkel("leaf").kindName
    for child_leaf in db.Query(leaf_kind).filter("parententry =", parentNode).iter():
        db.run_in_transaction(fixTxn, child_leaf.key, newRepoKey)

267 

268 ## Internal exposed functions 

269 

@internal_exposed
def pathToKey(self, key: db.Key):
    """
    Returns the recursively expanded path through the Tree from the root-node to a
    requested node.

    The path is built bottom-up: for each level the siblings of the current node are listed
    (restricted by :func:`listFilter`), and the level built before is attached as ``children``
    of the sibling that lies on the path. At most 99 levels are walked.

    :param key: Key of the destination *node*.
    :returns: An nested dictionary with information about all nodes in the path from root to the requested node.
    """
    path = []

    for _ in range(99):
        node_skel = self.viewSkel("node")
        if not node_skel.read(key):
            return []  # Either invalid key or listFilter prevented us from fetching anything

        parent_key = node_skel["parententry"]
        if parent_key == node_skel["parentrepo"]:  # We reached the top level
            break

        siblings_query = self.viewSkel("node").all().filter("parententry =", parent_key)

        level = []
        for sibling in self.listFilter(siblings_query).fetch(99):
            on_path = sibling["key"] == node_skel["key"]
            level.append({
                "skel": sibling,
                "active": on_path,
                "children": path if on_path else [],
            })

        assert level, "Got emtpy parent list?"

        path = level
        key = parent_key

    return path

294 

295 ## External exposed functions 

296 

@exposed
def index(self, skelType: SkelType = "node", parententry: t.Optional[db.Key | int | str] = None, **kwargs):
    """
    Default entry point; forwards to :func:`list`.

    Without a *parententry*, the module's root nodes decide: exactly one available root node
    is used as the parent, none is answered with Unauthorized, several with NotAcceptable.

    :param skelType: ``"node"`` or ``"leaf"``.
    :param parententry: Key of the parent whose entries should be listed.

    :returns: Whatever :func:`list` returns.
    """
    if not parententry:
        repos = self.getAvailableRootNodes(**kwargs)
        repo_count = len(repos)

        if repo_count == 0:
            raise errors.Unauthorized()

        if repo_count != 1:
            raise errors.NotAcceptable(f"Missing required parameter {'parententry'!r}")

        parententry = repos[0]["key"]

    return self.list(skelType=skelType, parententry=parententry, **kwargs)

310 

@exposed
def listRootNodes(self, *args, **kwargs) -> t.Any:
    """
    Renders the root nodes reported by :func:`getAvailableRootNodes` using the modules
    default renderer.

    :returns: The rendered representation of the available root-nodes.
    """
    rootnodes = self.getAvailableRootNodes(*args, **kwargs)
    return self.render.listRootNodes(rootnodes)

320 

@exposed
def list(self, skelType: SkelType, *args, **kwargs) -> t.Any:
    """
    Prepares and renders a list of entries.

    All supplied keyword parameters are merged into the query as filters.

    Unlike other module prototypes in ViUR, the access control in this function is performed
    by calling the function :func:`listFilter`, which updates the query-filter to match only
    elements which the user is allowed to see.

    .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter`

    :param skelType: ``"node"`` or ``"leaf"``.

    :returns: The rendered list objects for the matching entries.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    # The general access control is made via self.listFilter()
    unfiltered = self.viewSkel(checked_type).all().mergeExternalFilter(kwargs)
    query = self.listFilter(unfiltered)
    if not query:
        raise errors.Unauthorized()

    self._apply_default_order(query)
    entries = query.fetch()
    return self.render.list(entries)

347 

348 @exposed 

349 def structure(self, skelType: SkelType, action: t.Optional[str] = "view") -> t.Any: 

350 """ 

351 :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set 

352 in each bone. 

353 

354 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

355 """ 

356 # FIXME: In ViUR > 3.7 this could also become dynamic (ActionSkel paradigm). 

357 match action: 

358 case "view": 

359 skel = self.viewSkel(skelType) 

360 if not self.canView(skelType, skel): 

361 raise errors.Unauthorized() 

362 

363 case "edit": 

364 skel = self.editSkel(skelType) 

365 if not self.canEdit(skelType, skel): 

366 raise errors.Unauthorized() 

367 

368 case "add": 

369 if not self.canAdd(skelType): 

370 raise errors.Unauthorized() 

371 

372 skel = self.addSkel(skelType) 

373 

374 case "clone": 

375 skel = self.cloneSkel(skelType) 

376 if not (self.canAdd(skelType) and self.canEdit(skelType, skel)): 

377 raise errors.Unauthorized() 

378 

379 case _: 

380 raise errors.NotImplemented(f"The action {action!r} is not implemented.") 

381 

382 return self.render.render(f"structure.{skelType}.{action}", skel) 

383 

@exposed
def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
    """
    Prepares and renders a single entry for viewing.

    The entry is looked up by *skelType* and *key*; :func:`canView` has to approve it before
    :func:`onView` is called and the entry is rendered.

    .. seealso:: :func:`canView`, :func:`onView`

    :returns: The rendered representation of the requested entity.

    :param skelType: May either be "node" or "leaf".
    :param key: URL-safe key of the entry to view.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.viewSkel(checked_type)

    if not skel.read(key):
        raise errors.NotFound()

    if not self.canView(checked_type, skel):
        raise errors.Unauthorized()

    self.onView(checked_type, skel)
    return self.render.view(skel)

415 

@exposed
@force_ssl
@skey(allow_empty=True)
def add(self, skelType: SkelType, node: db.Key | int | str, *, bounce: bool = False, **kwargs) -> t.Any:
    # FIXME: VIUR4 rename node into key...
    """
    Add a new entry below the parent *node*, and render it, eventually with error notes
    on incorrect data. The data for the entry is taken from *kwargs*.

    The entry is only written when data was supplied via POST, could be read into the
    skeleton without errors, and *bounce* is not set; otherwise the add form is rendered.

    .. seealso:: :func:`canAdd`, :func:`onAdd`, :func:`onAdded`

    :param skelType: Defines the type of the new entry and may either be "node" or "leaf".
    :param node: URL-safe key of the parent.
    :param bounce: Only read the data into the skeleton and render it again, without writing.

    :returns: The rendered, added object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.addSkel(checked_type)
    parent_skel = self.editSkel("node")

    # TODO VIUR4: Why is this parameter called "node"?
    if not parent_skel.read(node):
        raise errors.NotFound("The provided parent node could not be found.")

    if not self.canAdd(checked_type, parent_skel):
        raise errors.Unauthorized()

    skel["parententry"] = parent_skel["key"]
    # parentrepo may not exist in parentNodeSkel as it may be an rootNode
    skel["parentrepo"] = parent_skel["parentrepo"] or parent_skel["key"]

    ready_to_write = (
        kwargs  # data supplied
        and current.request.get().isPostRequest  # using POST-method
        and skel.fromClient(kwargs, amend=bounce)  # data could be read into the bones
        and not bounce  # no review before adding requested
    )
    if not ready_to_write:
        return self.render.add(skel)

    self.onAdd(checked_type, skel)
    skel.write()
    self.onAdded(checked_type, skel)

    return self.render.addSuccess(skel)

468 

@force_ssl
@force_post
@exposed
@skey
@access("root")
def add_or_edit(self, skelType: SkelType, key: db.Key | int | str, **kwargs) -> t.Any:
    """
    Write an entry under a given key, creating it when it does not exist yet.

    This function is intended to be used by importers.
    Only "root"-users are allowed to use it.

    The ``parententry`` bone is made required and writable here, so the parent has to be
    supplied with the data; it must refer to an existing node.

    :param skelType: May either be "node" or "leaf".
    :param key: Key of the entry; it is adjusted to the kind of the selected skeleton.

    :returns: The rendered entry: the "add_or_edit" rendering when no (valid) data was
        supplied, otherwise the add- or edit-success rendering.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when the parent node could not be found.
    :raises: :exc:`viur.core.errors.Unauthorized`, when :func:`canAdd` refuses the parent node.
    """
    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel_cls = self.nodeSkelCls if skelType == "node" else self.leafSkelCls
    kind_name = skel_cls.kindName

    # Adjust key
    db_key = db.key_helper(key, target_kind=kind_name, adjust_kind=True)

    # Retrieve and verify existing entry
    db_entity = db.get(db_key)
    is_add = not db_entity

    # Instanciate relevant skeleton
    if is_add:
        skel = self.addSkel(skelType)
    else:
        skel = self.editSkel(skelType)
        skel.dbEntity = db_entity  # assign existing entity

    skel = skel.ensure_is_cloned()
    skel.parententry.required = True
    skel.parententry.readOnly = False

    skel["key"] = db_key

    # Without data, or with data that cannot be read into the bones, render the skeleton
    # in the version it could as far as it could be read.
    if not (kwargs and skel.fromClient(kwargs)):
        return self.render.render("add_or_edit", skel)

    # Ensure the parententry exists
    parent_skel = self.editSkel("node")
    if not parent_skel.read(skel["parententry"]):
        raise errors.NotFound("The provided parent node could not be found.")

    if not self.canAdd(skelType, parent_skel):
        raise errors.Unauthorized()

    skel["parententry"] = parent_skel["key"]
    # parentrepo may not exist in parentNodeSkel as it may be an rootNode
    skel["parentrepo"] = parent_skel["parentrepo"] or parent_skel["key"]

    if is_add:
        self.onAdd(skelType, skel)
    else:
        self.onEdit(skelType, skel)

    skel.write()

    if not is_add:
        self.onEdited(skelType, skel)
        return self.render.editSuccess(skel)

    self.onAdded(skelType, skel)
    return self.render.addSuccess(skel)

535 

@exposed
@force_ssl
@skey(allow_empty=True)
def edit(self, skelType: SkelType, key: db.Key | int | str, *, bounce: bool = False, **kwargs) -> t.Any:
    """
    Modify an existing entry, and render the entry, eventually with error notes on incorrect data.
    The data for the entry is taken from *kwargs*.

    The entry is only written when data was supplied via POST, could be read into the
    skeleton without errors, and *bounce* is not set; otherwise the edit form is rendered.

    .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited`

    :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be edited.
    :param bounce: Only read the data into the skeleton and render it again, without writing.

    :returns: The rendered, modified object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(checked_type)

    if not skel.read(key):
        raise errors.NotFound()

    if not self.canEdit(checked_type, skel):
        raise errors.Unauthorized()

    ready_to_write = (
        kwargs  # data supplied
        and current.request.get().isPostRequest  # using POST-method
        and skel.fromClient(kwargs, amend=True)  # data could be read into the bones
        and not bounce  # no review before writing requested
    )
    if not ready_to_write:
        return self.render.edit(skel)

    self.onEdit(checked_type, skel)
    skel.write()
    self.onEdited(checked_type, skel)

    return self.render.editSuccess(skel)

581 

@exposed
@force_ssl
@force_post
@skey
def delete(self, skelType: SkelType, key: str, **kwargs) -> t.Any:
    """
    Deletes an entry or an directory (including its contents).

    For a node, :func:`deleteRecursive` is invoked for its key before the node itself is
    deleted. :func:`canDelete` has to approve the entry first.

    .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted`

    :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be deleted.

    :returns: The rendered, deleted object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(checked_type)

    if not skel.read(key):
        raise errors.NotFound()

    if not self.canDelete(checked_type, skel):
        raise errors.Unauthorized()

    is_node = checked_type == "node"
    if is_node:
        self.deleteRecursive(skel["key"])

    self.onDelete(checked_type, skel)
    skel.delete()
    self.onDeleted(checked_type, skel)

    return self.render.deleteSuccess(skel, skelType=checked_type)

621 

@CallDeferred
def deleteRecursive(self, parentKey: str):
    """
    Recursively processes a delete request.

    Deletes every leaf and node whose ``parententry`` is *parentKey*; each child node is
    handed to ``deleteRecursive`` again before it is deleted itself. The entry behind
    *parentKey* is not touched here.

    :param parentKey: URL-safe key of the node which children should be deleted.
    """
    nodeKey = db.key_helper(parentKey, self.viewSkel("node").kindName)

    if self.leafSkelCls:
        leaf_query = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", nodeKey)
        for leaf_entity in leaf_query.iter():
            leaf_skel = self.viewSkel("leaf")
            # Entries that cannot be read (any more) are silently skipped.
            if leaf_skel.read(leaf_entity.key):
                leaf_skel.delete()

    node_query = db.Query(self.viewSkel("node").kindName).filter("parententry =", nodeKey)
    for node_entity in node_query.iter():
        self.deleteRecursive(node_entity.key)

        node_skel = self.viewSkel("node")
        if node_skel.read(node_entity.key):
            node_skel.delete()

644 

@exposed
@force_ssl
@force_post
@skey
def move(
    self,
    skelType: SkelType,
    key: db.Key | int | str,
    parentNode: db.Key | int | str,
    sortindex: t.Optional[float] = None
) -> str:
    """
    Move a node (including its contents) or a leaf to another node.

    .. seealso:: :func:`canMove`

    :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be moved.
    :param parentNode: URL-safe key of the destination node, which must be a node.
    :param sortindex: An optional sortindex for the key; the current time is used when it is empty.

    :returns: The rendered, edited object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided, or the move is
        invalid (into itself, of a rootnode, or below an own descendant).
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)
    parentnode_skel = self.baseSkel("node")

    if not skel.read(key):
        raise errors.NotFound("Cannot find entity to move")

    if not parentnode_skel.read(parentNode):
        parentNode = db.normalize_key(parentNode)

        # Give a more helpful message when the key points to the wrong kind (e.g. a leaf).
        if parentNode.kind != parentnode_skel.kindName:
            raise errors.NotFound(
                f"You provided a key of kind {parentNode.kind}, but require a {parentnode_skel.kindName}."
            )

        raise errors.NotFound("Cannot find parentNode entity")

    if skel["key"] == parentnode_skel["key"]:
        raise errors.NotAcceptable("Cannot move a node into itself")

    # Test if we try to move a rootNode
    if not skel["parententry"]:
        raise errors.NotAcceptable("Can't move a rootNode to somewhere else")

    if not self.canMove(skelType, skel, parentnode_skel):
        raise errors.Unauthorized()

    # Check if parentNodeSkel is descendant of the skel
    walk_skel = parentnode_skel.clone()

    while walk_skel["parententry"]:
        if walk_skel["parententry"] == skel["key"]:
            raise errors.NotAcceptable(
                f"Invalid move: Entry {key} cannot be moved below its own descendant {parentNode}."
            )

        # read() yields a falsy result for a dangling parent reference; end the walk there
        # instead of failing on the next subscript access.
        ancestor_skel = walk_skel.read(walk_skel["parententry"])
        if not ancestor_skel:
            break

        walk_skel = ancestor_skel

    old_parentrepo = skel["parentrepo"]
    # parentrepo may not exist in parentnode_skel as it may be a rootNode; in that case the
    # rootNode itself is the repository (same rule as in add and add_or_edit).
    new_parentrepo = parentnode_skel["parentrepo"] or parentnode_skel["key"]

    self.onEdit(skelType, skel)
    skel.patch({
        "parententry": parentnode_skel["key"],
        "parentrepo": new_parentrepo,
        "sortindex": sortindex or time.time()
    })
    self.onEdited(skelType, skel)

    # Ensure a changed parentRepo get's propagated
    if old_parentrepo != new_parentrepo:
        self.updateParentRepo(key, new_parentrepo)

    return self.render.render("moveSuccess", skel)

727 

@exposed
@force_ssl
@skey(allow_empty=True)
def clone(self, skelType: SkelType, key: db.Key | str | int, *, bounce: bool = False, **kwargs):
    """
    Clone an existing entry, and render the entry, eventually with error notes on incorrect data.
    Data is taken by any other arguments in *kwargs*.

    The clone is only written when data was supplied via POST, could be read into the
    skeleton without errors, and *bounce* is not set; otherwise the form is rendered.

    .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned`

    :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf".
    :param key: URL-safe key of the item to clone from.
    :param bounce: Only read the data into the skeleton and render it again, without writing.

    :returns: The cloned object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """

    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.cloneSkel(skelType)
    if not skel.read(key):
        raise errors.NotFound()

    # a clone-operation is some kind of edit and add...
    # NOTE(review): canAdd receives the raw "parententry" request value here, not a skeleton - confirm.
    if not (self.canEdit(skelType, skel) and self.canAdd(skelType, kwargs.get("parententry"))):
        raise errors.Unauthorized()

    # Remember source skel and unset the key for clone operation!
    src_skel = skel
    skel = skel.clone(apply_clone_strategy=True)
    skel["key"] = None

    # make parententry required and writeable when provided
    if "parententry" in kwargs:
        skel.parententry.readOnly = False
        skel.parententry.required = True
    else:
        _ = skel["parententry"]  # TODO: because of accessedValues...

    # make parentrepo required and writeable when provided
    if "parentrepo" in kwargs:
        skel.parentrepo.readOnly = False
        skel.parentrepo.required = True
    else:
        _ = skel["parentrepo"]  # TODO: because of accessedValues...

    # Check all required preconditions for clone
    if (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs, amend=bounce)  # failure on reading into the bones
        or bounce  # review before changing
    ):
        return self.render.edit(skel, action="clone")

    self.onClone(skelType, skel, src_skel=src_skel)
    # The write must not live inside the assert: assert statements are removed under
    # ``python -O``, which would silently skip writing the clone.
    written = skel.write()
    assert written
    self.onCloned(skelType, skel, src_skel=src_skel)

    return self.render.editSuccess(skel, action="cloneSuccess")

794 

795 ## Default access control functions 

796 

def listFilter(self, query: db.Query) -> t.Optional[db.Query]:
    """
    Access control function on item listing.

    This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function,
    and is used to modify the provided filter parameter to match only items that the current user
    is allowed to see.

    By default, the query is passed through unchanged for users holding the module's "view"
    permission or "root" access, and refused for everybody else.

    :param query: Query which should be altered.

    :returns: The altered filter, or None if access is not granted.
    """
    if not (user := current.user.get()):
        return None

    # A user without any access rights may carry an empty "access" value; treat it as
    # "no permissions" (as canAdd does) instead of failing on the membership test.
    access = user["access"] or ()

    if f"{self.moduleName}-view" in access or "root" in access:
        return query

    return None

814 

def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Checks if the current user can view the given entry.

    Should be identical to what's allowed by listFilter: a query for the entry's key is run
    through :func:`listFilter`, and the entry counts as visible when that query still finds it.
    A skeleton without a key only requires that :func:`listFilter` grants access at all.

    This method can be overridden for performance improvements (to eliminate that additional
    database access).

    :param skelType: ``"node"`` or ``"leaf"``.
    :param skel: The entry we check for
    :return: True if the current session is authorized to view that entry, False otherwise
    """
    query = self.viewSkel(skelType).all()

    key = skel["key"]
    if key:
        query.mergeExternalFilter({"key": key})

    query = self.listFilter(query)  # Access control

    if query is None:
        return False

    if key and not query.getEntry():
        return False

    return True

835 

def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance] = None) -> bool:
    """
    Access control function for adding permission.

    Decides whether the current user may add a new entry.

    The default behavior is:
    - If no user is logged in, adding is generally refused.
    - If the user has "root" access, adding is generally allowed.
    - If the user has the modules "add" permission (module-add) enabled, adding is allowed.

    Neither *skelType* nor *parentNodeSkel* is looked at by this default; override the
    function for a module-specific behavior.

    .. seealso:: :func:`add`

    :param skelType: Defines the type of the node that should be added.
    :param parentNodeSkel: The parent node where a new entry should be added.

    :returns: True, if adding entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    # root user is always allowed, as is a user with add-permission.
    if "root" in access or f"{self.moduleName}-add" in access:
        return True

    return False

866 

def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for modification permission.

    Checks if the current user has the permission to edit an entry.

    The default behavior is:
    - If no user is logged in, editing is generally refused.
    - If the user has "root" access, editing is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled, editing is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`edit`

    :param skelType: Defines the type of the node that should be edited.
    :param skel: The Skeleton that should be edited.

    :returns: True, if editing entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    # Either the root permission or the module-specific edit permission is sufficient.
    return "root" in access or f"{self.moduleName}-edit" in access

894 

def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for delete permission.

    Checks if the current user has the permission to delete an entry.

    The default behavior is:
    - If no user is logged in, deleting is generally refused.
    - If the user has "root" access, deleting is generally allowed.
    - If the user has the modules "delete" permission (module-delete) enabled,
      deleting is allowed.

    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that should be deleted.
    :param skel: The Skeleton that should be deleted.

    .. seealso:: :func:`delete`

    :returns: True, if deleting entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    # Either the root permission or the module-specific delete permission is sufficient.
    return "root" in access or f"{self.moduleName}-delete" in access

923 

def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool:
    """
    Access control function for moving permission.

    Checks if the current user has the permission to move an entry.

    The default behavior is:
    - If no user is logged in, moving is generally refused.
    - If the user has "root" access, moving is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled,
      moving is allowed.

    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the entry that shall be moved.
    :param node: The entry that should be moved.
    :param destNode: The node where *node* should be moved to.

    .. seealso:: :func:`move`

    :returns: True, if moving entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    # Moving is covered by the module's edit permission; there is no separate move right.
    return "root" in access or f"{self.moduleName}-edit" in access

953 

954 ## Overridable event hooks

955 

def onAdd(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before adding an entry.

    The default implementation does nothing; it can be overridden
    for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be added.
    :param skel: The Skeleton that is going to be added.

    .. seealso:: :func:`add`, :func:`onAdded`
    """
    return None

968 

def onAdded(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after adding an entry.

    It should be overridden for a module-specific behavior.
    The default writes a log entry (including the current user, if any)
    and flushes the cache for the skeleton's kind.

    :param skelType: Defines the type of the node that has been added.
    :param skel: The Skeleton that has been added.

    .. seealso:: :func:`add`, :func:`onAdd`
    """
    logging.info(f"""Entry of kind {skelType!r} added: {skel["key"]!r}""")

    # A new entry may affect any cached listing of this kind.
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

985 

def onEdit(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before editing an entry.

    The default implementation does nothing; it can be overridden
    for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be edited.
    :param skel: The Skeleton that is going to be edited.

    .. seealso:: :func:`edit`, :func:`onEdited`
    """
    return None

998 

def onEdited(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after modifying an entry.

    It should be overridden for a module-specific behavior.
    The default writes a log entry (including the current user, if any)
    and flushes the cache for the modified entry's key.

    :param skelType: Defines the type of the node that has been edited.
    :param skel: The Skeleton that has been modified.

    .. seealso:: :func:`edit`, :func:`onEdit`
    """
    logging.info(f"""Entry of kind {skelType!r} changed: {skel["key"]!r}""")

    # Only cache entries related to this particular key are flushed.
    flushCache(key=skel["key"])

    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

1015 

def onView(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called when viewing an entry.

    It should be overridden for a module-specific behavior.
    The default is doing nothing.

    :param skelType: Defines the type of the node that is viewed.
    :param skel: The Skeleton that is viewed.

    .. seealso:: :func:`view`
    """
    return None

1029 

def onDelete(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before deleting an entry.

    The default implementation does nothing; it can be overridden
    for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be deleted.
    :param skel: The Skeleton that is going to be deleted.

    .. seealso:: :func:`delete`, :func:`onDeleted`
    """
    return None

1042 

def onDeleted(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after deleting an entry.

    It should be overridden for a module-specific behavior.
    The default writes a log entry (including the current user, if any)
    and flushes the cache for the deleted entry's key.

    .. warning:: Saving the skeleton again will undo the deletion
        (if the skeleton was a leaf or a node with no children).

    :param skelType: Defines the type of the node that is deleted.
    :param skel: The Skeleton that has been deleted.

    .. seealso:: :func:`delete`, :func:`onDelete`
    """
    logging.info(f"""Entry deleted: {skel["key"]!r} ({skelType!r})""")

    # Only cache entries related to this particular key are flushed.
    flushCache(key=skel["key"])

    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

1062 

def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook function that is called before cloning an entry.

    The default implementation does nothing; it can be overridden
    for a module-specific behavior.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that is being created.
    :param src_skel: The source SkeletonInstance `skel` is cloned from.

    .. seealso:: :func:`clone`, :func:`onCloned`
    """
    return None

1076 

@CallDeferred
def _clone_recursive(
    self,
    skel_type: SkelType,
    src_key: db.Key,
    target_key: db.Key,
    target_repo: db.Key,
    cursor=None
):
    """
    Helper function which is used by default onCloned() to clone a recursive structure.

    Clones every entry of type `skel_type` whose ``parententry`` is `src_key`
    below `target_key`. For each entry, :func:`onClone` is called before and
    :func:`onCloned` after writing, so the default :func:`onCloned` continues
    the recursion for cloned nodes.

    If the query reports a cursor after the fetched entries were processed,
    the function re-invokes itself with that cursor to continue with the
    remaining entries.

    :param skel_type: Type of the entries to clone ("node" or "leaf").
    :param src_key: Key of the source parent whose children are cloned.
    :param target_key: Key of the parent the clones are attached to.
    :param target_repo: Key written to ``parentrepo`` of every clone.
    :param cursor: Query cursor to continue from; None starts at the beginning.
    """
    # Keep the side effects outside of the assert statements, so they are
    # still executed when Python runs with assertions disabled (-O).
    skel_type = self._checkSkelType(skel_type)
    assert skel_type

    logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}")

    q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex")
    q.setCursor(cursor)

    count = 0
    for src_skel in q.fetch():
        # Create an unsaved copy that is attached to the new parent and repository.
        skel = src_skel.clone()
        skel["key"] = None
        skel["parententry"] = target_key
        skel["parentrepo"] = target_repo

        self.onClone(skel_type, skel, src_skel=src_skel)
        logging.debug(f"copying {skel=}")  # this logging _is_ needed, otherwise not all values are being written..

        written = skel.write()
        assert written

        self.onCloned(skel_type, skel, src_skel=src_skel)
        count += 1

    logging.debug(f"_clone_recursive {count=}")

    # Continue with the next batch. The arguments must match this function's
    # signature exactly (the cursor is the fifth argument).
    if cursor := q.getCursor():
        self._clone_recursive(skel_type, src_key, target_key, target_repo, cursor)

1115 

def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook function that is called after cloning an entry.

    It can be overwritten to a module-specific behavior.

    By default, this writes a log entry (including the current user, if any)
    and flushes the cache for the skeleton's kind. When cloning a "node",
    it additionally calls :func:`_clone_recursive`, which recursively clones
    the entire structure below this node in the background.
    If this is not wanted, or wanted by a specific setting, overwrite this function
    without a super-call.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that was created.
    :param src_skel: The source SkeletonInstance `skel` was cloned from.

    .. seealso:: :func:`clone`, :func:`onClone`
    """
    logging.info(f"""Entry cloned: {skel["key"]!r} ({skelType!r})""")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

    # Clone entire structure below, in case this is a node.
    if skelType == "node":
        # Child nodes are always cloned; leafs only when the module defines a leaf skeleton.
        child_types = ["node"]
        if self.leafSkelCls:
            child_types.append("leaf")

        for child_type in child_types:
            self._clone_recursive(child_type, src_skel["key"], skel["key"], skel["parentrepo"])

1145 

1146 

# Enable the Tree prototype for the "vi" and "admin" renderers
# (presumably evaluated by the module loader -- confirm against viur-core).
Tree.vi = Tree.admin = True