Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/prototypes/tree.py: 0%

443 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 11:31 +0000

1import logging 

2import typing as t 

3from deprecated.sphinx import deprecated 

4from viur.core import utils, errors, db, current 

5from viur.core.decorators import * 

6from viur.core.bones import KeyBone, SortIndexBone, BooleanBone 

7from viur.core.cache import flushCache 

8from viur.core.skeleton import Skeleton, SkeletonInstance 

9from viur.core.tasks import CallDeferred 

10from .skelmodule import SkelModule 

11 

12 

13SkelType = t.Literal["node", "leaf"] 

14 

15 

16class TreeSkel(Skeleton): 

17 parententry = KeyBone( # TODO VIUR4: Why is this not a RelationalBone? 

18 descr="Parent", 

19 visible=False, 

20 readOnly=True, 

21 ) 

22 

23 parentrepo = KeyBone( # TODO VIUR4: Why is this not a RelationalBone? 

24 descr="BaseRepo", 

25 visible=False, 

26 readOnly=True, 

27 ) 

28 

29 sortindex = SortIndexBone( 

30 visible=False, 

31 readOnly=True, 

32 ) 

33 

34 is_root_node = BooleanBone( 

35 defaultValue=False, 

36 readOnly=True, 

37 visible=False, 

38 ) 

39 

40 @classmethod 

41 def refresh(cls, skelValues): # ViUR2 Compatibility 

42 super().refresh(skelValues) 

43 if not skelValues["parententry"] and skelValues.dbEntity.get("parentdir"): # parentdir for viur2 compatibility 

44 skelValues["parententry"] = utils.normalizeKey( 

45 db.Key.from_legacy_urlsafe(skelValues.dbEntity["parentdir"])) 

46 

47 

48class Tree(SkelModule): 

49 """ 

50 Tree module prototype. 

51 

52 It is used for hierarchical structures, either as a tree with nodes and leafs, or as a hierarchy with nodes only. 

53 """ 

54 accessRights = ("add", "edit", "view", "delete", "manage") 

55 

56 nodeSkelCls = None 

57 leafSkelCls = None 

58 

59 default_order = "sortindex" 

60 

61 def __init__(self, moduleName, modulePath, *args, **kwargs): 

62 assert self.nodeSkelCls, f"Need to specify at least nodeSkelCls for {self.__class__.__name__!r}" 

63 super().__init__(moduleName, modulePath, *args, **kwargs) 

64 

65 @property 

66 def handler(self): 

67 return "tree" if self.leafSkelCls else "tree.node" # either a tree or a tree with nodes only (former hierarchy) 

68 

69 def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]: 

70 """ 

71 Checks for correct skelType. 

72 

73 Either returns the type provided, or None in case it is invalid. 

74 """ 

75 skelType = skelType.lower() 

76 if skelType == "node" or (skelType == "leaf" and self.leafSkelCls): 

77 return skelType 

78 

79 return None 

80 

81 def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]: 

82 if not (skelType := self._checkSkelType(skelType)): 

83 raise ValueError("Unsupported skelType") 

84 

85 if skelType == "leaf": 

86 return self.leafSkelCls 

87 

88 return self.nodeSkelCls 

89 

90 def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

91 """ 

92 Return unmodified base skeleton for the given skelType. 

93 

94 .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`, :func:`~baseSkel` 

95 """ 

96 return self._resolveSkelCls(skelType, *args, **kwargs)() 

97 

98 def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

99 """ 

100 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

101 for viewing an existing entry from the tree. 

102 

103 The default is a Skeleton instance returned by :func:`~baseSkel`. 

104 

105 .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel` 

106 

107 :return: Returns a Skeleton instance for viewing an entry. 

108 """ 

109 return self.baseSkel(skelType, *args, **kwargs) 

110 

111 def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

112 """ 

113 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

114 for adding an entry to the tree. 

115 

116 The default is a Skeleton instance returned by :func:`~baseSkel`. 

117 

118 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

119 

120 :return: Returns a Skeleton instance for adding an entry. 

121 """ 

122 return self.baseSkel(skelType, *args, **kwargs) 

123 

124 def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

125 """ 

126 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

127 for editing an existing entry from the tree. 

128 

129 The default is a Skeleton instance returned by :func:`~baseSkel`. 

130 

131 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

132 

133 :return: Returns a Skeleton instance for editing an entry. 

134 """ 

135 return self.baseSkel(skelType, *args, **kwargs) 

136 

137 def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

138 """ 

139 Retrieve a new :class:`viur.core.skeleton.SkeletonInstance` that is used by the application 

140 for cloning an existing entry of the tree. 

141 

142 The default is a SkeletonInstance returned by :func:`~baseSkel`. 

143 

144 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

145 

146 :return: Returns a SkeletonInstance for cloning an entry. 

147 """ 

148 return self.baseSkel(skelType, *args, **kwargs) 

149 

150 def rootnodeSkel( 

151 self, 

152 *, 

153 identifier: str = "rep_module_repo", 

154 ensure: bool | dict | t.Callable[[SkeletonInstance], None] = False, 

155 ) -> SkeletonInstance: 

156 """ 

157 Retrieve a new :class:`viur.core.skeleton.SkeletonInstance` that is used by the application 

158 for rootnode entries. 

159 

160 The default is a SkeletonInstance returned by :func:`~baseSkel`, with a preset key created from identifier. 

161 

162 :param identifier: Unique identifier (name) for this rootnode. 

163 :param ensure: If provided, ensures that the skeleton is available, and created with optionally provided values. 

164 

165 :return: Returns a SkeletonInstance for handling root nodes. 

166 """ 

167 skel = self.baseSkel("node") 

168 

169 skel["key"] = db.Key(skel.kindName, identifier) 

170 skel["is_root_node"] = True 

171 

172 if ensure not in (False, None): 

173 return skel.read(create=ensure) 

174 

175 return skel 

176 

177 @deprecated( 

178 version="3.7.0", 

179 reason="Use rootnodeSkel(ensure=True) instead.", 

180 action="always" 

181 ) 

182 def ensureOwnModuleRootNode(self) -> db.Entity: 

183 """ 

184 Ensures, that general root-node for the current module exists. 

185 If no root-node exists yet, it will be created. 

186 

187 :returns: The entity of the root-node. 

188 """ 

189 return self.rootnodeSkel(ensure=True).dbEntity 

190 

191 def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]: 

192 """ 

193 Default function for providing a list of root node items. 

194 This list is requested by several module-internal functions and *must* be 

195 overridden by a custom functionality. The default stub for this function 

196 returns an empty list. 

197 An example implementation could be the following: 

198 

199 .. code-block:: python 

200 

201 # Example 

202 def getAvailableRootNodes(self, *args, **kwargs): 

203 q = db.Query(self.rootKindName) 

204 ret = [{"key": str(e.key()), 

205 "name": e.get("name", str(e.key().id_or_name()))} #FIXME 

206 for e in q.run(limit=25)] 

207 return ret 

208 

209 :param args: Can be used in custom implementations. 

210 :param kwargs: Can be used in custom implementations. 

211 :return: Returns a list of dicts which must provide a "key" and a "name" entry with \ 

212 respective information. 

213 """ 

214 return [] 

215 

216 def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None: 

217 """ 

218 Returns the root-node for a given child. 

219 

220 :param key: Key of the child node entry. 

221 

222 :returns: The skeleton of the root-node. 

223 """ 

224 skel = self.nodeSkelCls() 

225 

226 while key: 

227 if not skel.read(key): 

228 return None 

229 

230 key = skel["parententry"] 

231 

232 return skel 

233 

234 @CallDeferred 

235 def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0): 

236 """ 

237 Recursively fixes the parentrepo key after a move operation. 

238 

239 This will delete all entries which are children of *nodeKey*, except *key* nodeKey. 

240 

241 :param parentNode: URL-safe key of the node which children should be fixed. 

242 :param newRepoKey: URL-safe key of the new repository. 

243 :param depth: Safety level depth preventing infinitive loops. 

244 """ 

245 if depth > 99: 

246 logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo") 

247 logging.critical("Your data is corrupt!") 

248 logging.debug(f"{parentNode=}, {newRepoKey=}") 

249 return 

250 

251 def fixTxn(nodeKey, newRepoKey): 

252 node = db.Get(nodeKey) 

253 node["parentrepo"] = newRepoKey 

254 db.Put(node) 

255 

256 # Fix all nodes 

257 q = db.Query(self.viewSkel("node").kindName).filter("parententry =", parentNode) 

258 for repo in q.iter(): 

259 self.updateParentRepo(repo.key, newRepoKey, depth=depth + 1) 

260 db.RunInTransaction(fixTxn, repo.key, newRepoKey) 

261 

262 # Fix the leafs on this level 

263 if self.leafSkelCls: 

264 q = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", parentNode) 

265 for repo in q.iter(): 

266 db.RunInTransaction(fixTxn, repo.key, newRepoKey) 

267 

268 ## Internal exposed functions 

269 

270 @internal_exposed 

271 def pathToKey(self, key: db.Key): 

272 """ 

273 Returns the recursively expanded path through the Tree from the root-node to a 

274 requested node. 

275 :param key: Key of the destination *node*. 

276 :returns: An nested dictionary with information about all nodes in the path from root to the requested node. 

277 """ 

278 lastLevel = [] 

279 for x in range(0, 99): 

280 currentNodeSkel = self.viewSkel("node") 

281 if not currentNodeSkel.read(key): 

282 return [] # Either invalid key or listFilter prevented us from fetching anything 

283 if currentNodeSkel["parententry"] == currentNodeSkel["parentrepo"]: # We reached the top level 

284 break 

285 levelQry = self.viewSkel("node").all().filter("parententry =", currentNodeSkel["parententry"]) 

286 currentLevel = [{"skel": x, 

287 "active": x["key"] == currentNodeSkel["key"], 

288 "children": lastLevel if x["key"] == currentNodeSkel["key"] else []} 

289 for x in self.listFilter(levelQry).fetch(99)] 

290 assert currentLevel, "Got emtpy parent list?" 

291 lastLevel = currentLevel 

292 key = currentNodeSkel["parententry"] 

293 return lastLevel 

294 

295 ## External exposed functions 

296 

297 @exposed 

298 def index(self, skelType: SkelType = "node", parententry: t.Optional[db.Key | int | str] = None, **kwargs): 

299 if not parententry: 

300 repos = self.getAvailableRootNodes(**kwargs) 

301 match len(repos): 

302 case 0: 

303 raise errors.Unauthorized() 

304 case 1: 

305 parententry = repos[0]["key"] 

306 case _: 

307 raise errors.NotAcceptable(f"Missing required parameter {'parententry'!r}") 

308 

309 return self.list(skelType=skelType, parententry=parententry, **kwargs) 

310 

311 @exposed 

312 def listRootNodes(self, *args, **kwargs) -> t.Any: 

313 """ 

314 Renders a list of all available repositories for the current user using the 

315 modules default renderer. 

316 

317 :returns: The rendered representation of the available root-nodes. 

318 """ 

319 return self.render.listRootNodes(self.getAvailableRootNodes(*args, **kwargs)) 

320 

321 @exposed 

322 def list(self, skelType: SkelType, *args, **kwargs) -> t.Any: 

323 """ 

324 Prepares and renders a list of entries. 

325 

326 All supplied parameters are interpreted as filters for the elements displayed. 

327 

328 Unlike other module prototypes in ViUR, the access control in this function is performed 

329 by calling the function :func:`listFilter`, which updates the query-filter to match only 

330 elements which the user is allowed to see. 

331 

332 .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter` 

333 

334 :returns: The rendered list objects for the matching entries. 

335 

336 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

337 """ 

338 if not (skelType := self._checkSkelType(skelType)): 

339 raise errors.NotAcceptable("Invalid skelType provided.") 

340 

341 # The general access control is made via self.listFilter() 

342 if not (query := self.listFilter(self.viewSkel(skelType).all().mergeExternalFilter(kwargs))): 

343 raise errors.Unauthorized() 

344 

345 self._apply_default_order(query) 

346 return self.render.list(query.fetch()) 

347 

348 @exposed 

349 def structure(self, skelType: SkelType, action: t.Optional[str] = "view") -> t.Any: 

350 """ 

351 :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set 

352 in each bone. 

353 

354 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

355 """ 

356 # FIXME: In ViUR > 3.7 this could also become dynamic (ActionSkel paradigm). 

357 match action: 

358 case "view": 

359 skel = self.viewSkel(skelType) 

360 if not self.canView(skelType, skel): 

361 raise errors.Unauthorized() 

362 

363 case "edit": 

364 skel = self.editSkel(skelType) 

365 if not self.canEdit(skelType, skel): 

366 raise errors.Unauthorized() 

367 

368 case "add": 

369 if not self.canAdd(skelType): 

370 raise errors.Unauthorized() 

371 

372 skel = self.addSkel(skelType) 

373 

374 case "clone": 

375 skel = self.cloneSkel(skelType) 

376 if not (self.canAdd(skelType) and self.canEdit(skelType, skel)): 

377 raise errors.Unauthorized() 

378 

379 case _: 

380 raise errors.NotImplemented(f"The action {action!r} is not implemented.") 

381 

382 return self.render.render(f"structure.{skelType}.{action}", skel) 

383 

384 @exposed 

385 def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

386 """ 

387 Prepares and renders a single entry for viewing. 

388 

389 The entry is fetched by its *key* and its *skelType*. 

390 The function performs several access control checks on the requested entity before it is rendered. 

391 

392 .. seealso:: :func:`canView`, :func:`onView` 

393 

394 :returns: The rendered representation of the requested entity. 

395 

396 :param skelType: May either be "node" or "leaf". 

397 :param key: URL-safe key of the parent. 

398 

399 :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided. 

400 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

401 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

402 """ 

403 if not (skelType := self._checkSkelType(skelType)): 

404 raise errors.NotAcceptable(f"Invalid skelType provided.") 

405 

406 skel = self.viewSkel(skelType) 

407 if not skel.read(key): 

408 raise errors.NotFound() 

409 

410 if not self.canView(skelType, skel): 

411 raise errors.Unauthorized() 

412 

413 self.onView(skelType, skel) 

414 return self.render.view(skel) 

415 

416 @exposed 

417 @force_ssl 

418 @skey(allow_empty=True) 

419 def add(self, skelType: SkelType, node: db.Key | int | str, *args, **kwargs) -> t.Any: 

420 """ 

421 Add a new entry with the given parent *node*, and render the entry, eventually with error notes 

422 on incorrect data. Data is taken by any other arguments in *kwargs*. 

423 

424 The function performs several access control checks on the requested entity before it is added. 

425 

426 .. seealso:: :func:`canAdd`, :func:`onAdd`, , :func:`onAdded` 

427 

428 :param skelType: Defines the type of the new entry and may either be "node" or "leaf". 

429 :param node: URL-safe key of the parent. 

430 

431 :returns: The rendered, added object of the entry, eventually with error hints. 

432 

433 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

434 :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found. 

435 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

436 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

437 """ 

438 if not (skelType := self._checkSkelType(skelType)): 

439 raise errors.NotAcceptable(f"Invalid skelType provided.") 

440 

441 skel = self.addSkel(skelType) 

442 parentNodeSkel = self.editSkel("node") 

443 

444 # TODO VIUR4: Why is this parameter called "node"? 

445 if not parentNodeSkel.read(node): 

446 raise errors.NotFound("The provided parent node could not be found.") 

447 if not self.canAdd(skelType, parentNodeSkel): 

448 raise errors.Unauthorized() 

449 

450 skel["parententry"] = parentNodeSkel["key"] 

451 # parentrepo may not exist in parentNodeSkel as it may be an rootNode 

452 skel["parentrepo"] = parentNodeSkel["parentrepo"] or parentNodeSkel["key"] 

453 

454 if ( 

455 not kwargs # no data supplied 

456 or not current.request.get().isPostRequest # failure if not using POST-method 

457 or not skel.fromClient(kwargs) # failure on reading into the bones 

458 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

459 ): 

460 return self.render.add(skel) 

461 

462 self.onAdd(skelType, skel) 

463 skel.write() 

464 self.onAdded(skelType, skel) 

465 

466 return self.render.addSuccess(skel) 

467 

468 @force_ssl 

469 @force_post 

470 @exposed 

471 @skey 

472 @access("root") 

473 def add_or_edit(self, skelType: SkelType, key: db.Key | int | str, **kwargs) -> t.Any: 

474 """ 

475 This function is intended to be used by importers. 

476 Only "root"-users are allowed to use it. 

477 """ 

478 if not (skelType := self._checkSkelType(skelType)): 

479 raise errors.NotAcceptable("Invalid skelType provided.") 

480 

481 kind_name = self.nodeSkelCls.kindName if skelType == "node" else self.leafSkelCls.kindName 

482 

483 # Adjust key 

484 db_key = db.keyHelper(key, targetKind=kind_name, adjust_kind=True) 

485 

486 # Retrieve and verify existing entry 

487 db_entity = db.Get(db_key) 

488 is_add = not bool(db_entity) 

489 

490 # Instanciate relevant skeleton 

491 if is_add: 

492 skel = self.addSkel(skelType) 

493 else: 

494 skel = self.editSkel(skelType) 

495 skel.dbEntity = db_entity # assign existing entity 

496 

497 skel = skel.ensure_is_cloned() 

498 skel.parententry.required = True 

499 skel.parententry.readOnly = False 

500 

501 skel["key"] = db_key 

502 

503 if ( 

504 not kwargs # no data supplied 

505 or not skel.fromClient(kwargs) # failure on reading into the bones 

506 ): 

507 # render the skeleton in the version it could as far as it could be read. 

508 return self.render.render("add_or_edit", skel) 

509 

510 # Ensure the parententry exists 

511 parentNodeSkel = self.editSkel("node") 

512 if not parentNodeSkel.read(skel["parententry"]): 

513 raise errors.NotFound("The provided parent node could not be found.") 

514 if not self.canAdd(skelType, parentNodeSkel): 

515 raise errors.Unauthorized() 

516 

517 skel["parententry"] = parentNodeSkel["key"] 

518 # parentrepo may not exist in parentNodeSkel as it may be an rootNode 

519 skel["parentrepo"] = parentNodeSkel["parentrepo"] or parentNodeSkel["key"] 

520 

521 if is_add: 

522 self.onAdd(skelType, skel) 

523 else: 

524 self.onEdit(skelType, skel) 

525 

526 skel.write() 

527 

528 if is_add: 

529 self.onAdded(skelType, skel) 

530 return self.render.addSuccess(skel) 

531 

532 self.onEdited(skelType, skel) 

533 return self.render.editSuccess(skel) 

534 

535 @exposed 

536 @force_ssl 

537 @skey(allow_empty=True) 

538 def edit(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

539 """ 

540 Modify an existing entry, and render the entry, eventually with error notes on incorrect data. 

541 Data is taken by any other arguments in *kwargs*. 

542 

543 The function performs several access control checks on the requested entity before it is added. 

544 

545 .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited` 

546 

547 :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf". 

548 :param key: URL-safe key of the item to be edited. 

549 

550 :returns: The rendered, modified object of the entry, eventually with error hints. 

551 

552 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

553 :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found. 

554 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

555 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

556 """ 

557 if not (skelType := self._checkSkelType(skelType)): 

558 raise errors.NotAcceptable(f"Invalid skelType provided.") 

559 

560 skel = self.editSkel(skelType) 

561 if not skel.read(key): 

562 raise errors.NotFound() 

563 

564 if not self.canEdit(skelType, skel): 

565 raise errors.Unauthorized() 

566 

567 if ( 

568 not kwargs # no data supplied 

569 or not current.request.get().isPostRequest # failure if not using POST-method 

570 or not skel.fromClient(kwargs, amend=True) # failure on reading into the bones 

571 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

572 ): 

573 return self.render.edit(skel) 

574 

575 self.onEdit(skelType, skel) 

576 skel.write() 

577 self.onEdited(skelType, skel) 

578 

579 return self.render.editSuccess(skel) 

580 

581 @exposed 

582 @force_ssl 

583 @force_post 

584 @skey 

585 def delete(self, skelType: SkelType, key: str, *args, **kwargs) -> t.Any: 

586 """ 

587 Deletes an entry or an directory (including its contents). 

588 

589 The function runs several access control checks on the data before it is deleted. 

590 

591 .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted` 

592 

593 :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf". 

594 :param key: URL-safe key of the item to be deleted. 

595 

596 :returns: The rendered, deleted object of the entry. 

597 

598 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

599 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

600 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

601 """ 

602 if not (skelType := self._checkSkelType(skelType)): 

603 raise errors.NotAcceptable(f"Invalid skelType provided.") 

604 

605 skel = self.editSkel(skelType) 

606 if not skel.read(key): 

607 raise errors.NotFound() 

608 

609 if not self.canDelete(skelType, skel): 

610 raise errors.Unauthorized() 

611 

612 if skelType == "node": 

613 self.deleteRecursive(skel["key"]) 

614 

615 self.onDelete(skelType, skel) 

616 skel.delete() 

617 self.onDeleted(skelType, skel) 

618 

619 return self.render.deleteSuccess(skel, skelType=skelType) 

620 

621 @CallDeferred 

622 def deleteRecursive(self, parentKey: str): 

623 """ 

624 Recursively processes a delete request. 

625 

626 This will delete all entries which are children of *nodeKey*, except *key* nodeKey. 

627 

628 :param parentKey: URL-safe key of the node which children should be deleted. 

629 """ 

630 nodeKey = db.keyHelper(parentKey, self.viewSkel("node").kindName) 

631 if self.leafSkelCls: 

632 for leaf in db.Query(self.viewSkel("leaf").kindName).filter("parententry =", nodeKey).iter(): 

633 leafSkel = self.viewSkel("leaf") 

634 if not leafSkel.read(leaf.key): 

635 continue 

636 leafSkel.delete() 

637 for node in db.Query(self.viewSkel("node").kindName).filter("parententry =", nodeKey).iter(): 

638 self.deleteRecursive(node.key) 

639 nodeSkel = self.viewSkel("node") 

640 if not nodeSkel.read(node.key): 

641 continue 

642 nodeSkel.delete() 

643 

644 @exposed 

645 @force_ssl 

646 @force_post 

647 @skey 

648 def move(self, skelType: SkelType, key: db.Key | int | str, parentNode: str, *args, **kwargs) -> str: 

649 """ 

650 Move a node (including its contents) or a leaf to another node. 

651 

652 .. seealso:: :func:`canMove` 

653 

654 :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf". 

655 :param key: URL-safe key of the item to be moved. 

656 :param parentNode: URL-safe key of the destination node, which must be a node. 

657 :param skey: The CSRF security key. 

658 

659 :returns: The rendered, edited object of the entry. 

660 

661 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

662 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

663 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

664 """ 

665 if not (skelType := self._checkSkelType(skelType)): 

666 raise errors.NotAcceptable(f"Invalid skelType provided.") 

667 

668 skel = self.editSkel(skelType) # srcSkel - the skeleton to be moved 

669 parentNodeSkel = self.baseSkel("node") # destSkel - the node it should be moved into 

670 

671 if not skel.read(key): 

672 raise errors.NotFound("Cannot find entity to move") 

673 

674 if not parentNodeSkel.read(parentNode): 

675 parentNode = utils.normalizeKey(db.Key.from_legacy_urlsafe(parentNode)) 

676 

677 if parentNode.kind != parentNodeSkel.kindName: 

678 raise errors.NotFound( 

679 f"You provided a key of kind {parentNode.kind}, but require a {parentNodeSkel.kindName}." 

680 ) 

681 

682 raise errors.NotFound("Cannot find parentNode entity") 

683 

684 if not self.canMove(skelType, skel, parentNodeSkel): 

685 raise errors.Unauthorized() 

686 

687 if skel["key"] == parentNodeSkel["key"]: 

688 raise errors.NotAcceptable("Cannot move a node into itself") 

689 

690 ## Test for recursion 

691 currLevel = db.Get(parentNodeSkel["key"]) 

692 for _ in range(0, 99): 

693 if currLevel.key == skel["key"]: 

694 break 

695 if currLevel.get("rootNode") or currLevel.get("is_root_node"): 

696 # We reached a rootNode, so this is okay 

697 break 

698 currLevel = db.Get(currLevel["parententry"]) 

699 else: # We did not "break" - recursion-level exceeded or loop detected 

700 raise errors.NotAcceptable("Unable to find a root node in recursion?") 

701 

702 # Test if we try to move a rootNode 

703 # TODO: Remove "rootNode"-fallback with VIUR4 

704 if skel.dbEntity.get("is_root_node") or skel.dbEntity.get("rootNode"): 

705 raise errors.NotAcceptable("Can't move a rootNode to somewhere else") 

706 

707 currentParentRepo = skel["parentrepo"] 

708 skel["parententry"] = parentNodeSkel["key"] 

709 skel["parentrepo"] = parentNodeSkel["parentrepo"] # Fixme: Need to recursive fixing to parentrepo? 

710 if "sortindex" in kwargs: 

711 try: 

712 skel["sortindex"] = float(kwargs["sortindex"]) 

713 except: 

714 raise errors.PreconditionFailed() 

715 

716 self.onEdit(skelType, skel) 

717 skel.write() 

718 self.onEdited(skelType, skel) 

719 

720 # Ensure a changed parentRepo get's proagated 

721 if currentParentRepo != parentNodeSkel["parentrepo"]: 

722 self.updateParentRepo(key, parentNodeSkel["parentrepo"]) 

723 

724 return self.render.editSuccess(skel) 

725 

726 @exposed 

727 @force_ssl 

728 @skey(allow_empty=True) 

729 def clone(self, skelType: SkelType, key: db.Key | str | int, **kwargs): 

730 """ 

731 Clone an existing entry, and render the entry, eventually with error notes on incorrect data. 

732 Data is taken by any other arguments in *kwargs*. 

733 

734 The function performs several access control checks on the requested entity before it is added. 

735 

736 .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned` 

737 

738 :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf". 

739 :param key: URL-safe key of the item to be edited. 

740 

741 :returns: The cloned object of the entry, eventually with error hints. 

742 

743 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

744 :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from was found. 

745 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

746 """ 

747 

748 if not (skelType := self._checkSkelType(skelType)): 

749 raise errors.NotAcceptable(f"Invalid skelType provided.") 

750 

751 skel = self.cloneSkel(skelType) 

752 if not skel.read(key): 

753 raise errors.NotFound() 

754 

755 # a clone-operation is some kind of edit and add... 

756 if not (self.canEdit(skelType, skel) and self.canAdd(skelType, kwargs.get("parententry"))): 

757 raise errors.Unauthorized() 

758 

759 # Remember source skel and unset the key for clone operation! 

760 src_skel = skel 

761 skel = skel.clone(apply_clone_strategy=True) 

762 skel["key"] = None 

763 

764 # make parententry required and writeable when provided 

765 if "parententry" in kwargs: 

766 skel.parententry.readOnly = False 

767 skel.parententry.required = True 

768 else: 

769 _ = skel["parententry"] # TODO: because of accessedValues... 

770 

771 # make parentrepo required and writeable when provided 

772 if "parentrepo" in kwargs: 

773 skel.parentrepo.readOnly = False 

774 skel.parentrepo.required = True 

775 else: 

776 _ = skel["parentrepo"] # TODO: because of accessedValues... 

777 

778 # Check all required preconditions for clone 

779 if ( 

780 not kwargs # no data supplied 

781 or not current.request.get().isPostRequest # failure if not using POST-method 

782 or not skel.fromClient(kwargs) # failure on reading into the bones 

783 or utils.parse.bool(kwargs.get("bounce")) # review before changing 

784 ): 

785 return self.render.edit(skel, action="clone") 

786 

787 self.onClone(skelType, skel, src_skel=src_skel) 

788 assert skel.write() 

789 self.onCloned(skelType, skel, src_skel=src_skel) 

790 

791 return self.render.editSuccess(skel, action="cloneSuccess") 

792 

793 ## Default access control functions 

794 

795 def listFilter(self, query: db.Query) -> t.Optional[db.Query]: 

796 """ 

797 Access control function on item listing. 

798 

799 This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function, 

800 and is used to modify the provided filter parameter to match only items that the current user 

801 is allowed to see. 

802 

803 :param query: Query which should be altered. 

804 

805 :returns: The altered filter, or None if access is not granted. 

806 """ 

807 

808 if (user := current.user.get()) and (f"{self.moduleName}-view" in user["access"] or "root" in user["access"]): 

809 return query 

810 

811 return None 

812 

813 def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

814 """ 

815 Checks if the current user can view the given entry. 

816 Should be identical to what's allowed by listFilter. 

817 By default, `meth:listFilter` is used to determine what's allowed and whats not; but this 

818 method can be overridden for performance improvements (to eliminate that additional database access). 

819 :param skel: The entry we check for 

820 :return: True if the current session is authorized to view that entry, False otherwise 

821 """ 

822 query = self.viewSkel(skelType).all() 

823 

824 if key := skel["key"]: 

825 query.mergeExternalFilter({"key": key}) 

826 

827 query = self.listFilter(query) # Access control 

828 

829 if query is None or (key and not query.getEntry()): 

830 return False 

831 

832 return True 

833 

834 def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance] = None) -> bool: 

835 """ 

836 Access control function for adding permission. 

837 

838 Checks if the current user has the permission to add a new entry. 

839 

840 The default behavior is: 

841 - If no user is logged in, adding is generally refused. 

842 - If the user has "root" access, adding is generally allowed. 

843 - If the user has the modules "add" permission (module-add) enabled, adding is allowed. 

844 

845 It should be overridden for a module-specific behavior. 

846 

847 .. seealso:: :func:`add` 

848 

849 :param skelType: Defines the type of the node that should be added. 

850 :param parentNodeSkel: The parent node where a new entry should be added. 

851 

852 :returns: True, if adding entries is allowed, False otherwise. 

853 """ 

854 

855 if not (user := current.user.get()): 

856 return False 

857 # root user is always allowed. 

858 if user["access"] and "root" in user["access"]: 

859 return True 

860 # user with add-permission is allowed. 

861 if user and user["access"] and f"{self.moduleName}-add" in user["access"]: 

862 return True 

863 return False 

864 

865 def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

866 """ 

867 Access control function for modification permission. 

868 

869 Checks if the current user has the permission to edit an entry. 

870 

871 The default behavior is: 

872 - If no user is logged in, editing is generally refused. 

873 - If the user has "root" access, editing is generally allowed. 

874 - If the user has the modules "edit" permission (module-edit) enabled, editing is allowed. 

875 

876 It should be overridden for a module-specific behavior. 

877 

878 .. seealso:: :func:`edit` 

879 

880 :param skelType: Defines the type of the node that should be edited. 

881 :param skel: The Skeleton that should be edited. 

882 

883 :returns: True, if editing entries is allowed, False otherwise. 

884 """ 

885 if not (user := current.user.get()): 

886 return False 

887 if user["access"] and "root" in user["access"]: 

888 return True 

889 if user and user["access"] and f"{self.moduleName}-edit" in user["access"]: 

890 return True 

891 return False 

892 

893 def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

894 """ 

895 Access control function for delete permission. 

896 

897 Checks if the current user has the permission to delete an entry. 

898 

899 The default behavior is: 

900 - If no user is logged in, deleting is generally refused. 

901 - If the user has "root" access, deleting is generally allowed. 

902 - If the user has the modules "deleting" permission (module-delete) enabled, \ 

903 deleting is allowed. 

904 

905 It should be overridden for a module-specific behavior. 

906 

907 :param skelType: Defines the type of the node that should be deleted. 

908 :param skel: The Skeleton that should be deleted. 

909 

910 .. seealso:: :func:`delete` 

911 

912 :returns: True, if deleting entries is allowed, False otherwise. 

913 """ 

914 if not (user := current.user.get()): 

915 return False 

916 if user["access"] and "root" in user["access"]: 

917 return True 

918 if user and user["access"] and f"{self.moduleName}-delete" in user["access"]: 

919 return True 

920 return False 

921 

922 def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool: 

923 """ 

924 Access control function for moving permission. 

925 

926 Checks if the current user has the permission to move an entry. 

927 

928 The default behavior is: 

929 - If no user is logged in, deleting is generally refused. 

930 - If the user has "root" access, deleting is generally allowed. 

931 - If the user has the modules "edit" permission (module-edit) enabled, \ 

932 moving is allowed. 

933 

934 It should be overridden for a module-specific behavior. 

935 

936 :param skelType: Defines the type of the node that shall be deleted. 

937 :param node: URL-safe key of the node to be moved. 

938 :param destNode: URL-safe key of the node where *node* should be moved to. 

939 

940 .. seealso:: :func:`move` 

941 

942 :returns: True, if deleting entries is allowed, False otherwise. 

943 """ 

944 if not (user := current.user.get()): 

945 return False 

946 if user["access"] and "root" in user["access"]: 

947 return True 

948 if user and user["access"] and f"{self.moduleName}-edit" in user["access"]: 

949 return True 

950 return False 

951 

952 ## Overridable eventhooks 

953 

954 def onAdd(self, skelType: SkelType, skel: SkeletonInstance): 

955 """ 

956 Hook function that is called before adding an entry. 

957 

958 It can be overridden for a module-specific behavior. 

959 

960 :param skelType: Defines the type of the node that shall be added. 

961 :param skel: The Skeleton that is going to be added. 

962 

963 .. seealso:: :func:`add`, :func:`onAdded` 

964 """ 

965 pass 

966 

967 def onAdded(self, skelType: SkelType, skel: SkeletonInstance): 

968 """ 

969 Hook function that is called after adding an entry. 

970 

971 It should be overridden for a module-specific behavior. 

972 The default is writing a log entry. 

973 

974 :param skelType: Defines the type of the node that has been added. 

975 :param skel: The Skeleton that has been added. 

976 

977 .. seealso:: :func:`add`, :func:`onAdd` 

978 """ 

979 logging.info(f"""Entry of kind {skelType!r} added: {skel["key"]!r}""") 

980 flushCache(kind=skel.kindName) 

981 if user := current.user.get(): 

982 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

983 

984 def onEdit(self, skelType: SkelType, skel: SkeletonInstance): 

985 """ 

986 Hook function that is called before editing an entry. 

987 

988 It can be overridden for a module-specific behavior. 

989 

990 :param skelType: Defines the type of the node that shall be edited. 

991 :param skel: The Skeleton that is going to be edited. 

992 

993 .. seealso:: :func:`edit`, :func:`onEdited` 

994 """ 

995 pass 

996 

997 def onEdited(self, skelType: SkelType, skel: SkeletonInstance): 

998 """ 

999 Hook function that is called after modifying an entry. 

1000 

1001 It should be overridden for a module-specific behavior. 

1002 The default is writing a log entry. 

1003 

1004 :param skelType: Defines the type of the node that has been edited. 

1005 :param skel: The Skeleton that has been modified. 

1006 

1007 .. seealso:: :func:`edit`, :func:`onEdit` 

1008 """ 

1009 logging.info(f"""Entry of kind {skelType!r} changed: {skel["key"]!r}""") 

1010 flushCache(key=skel["key"]) 

1011 if user := current.user.get(): 

1012 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

1013 

1014 def onView(self, skelType: SkelType, skel: SkeletonInstance): 

1015 """ 

1016 Hook function that is called when viewing an entry. 

1017 

1018 It should be overridden for a module-specific behavior. 

1019 The default is doing nothing. 

1020 

1021 :param skelType: Defines the type of the node that is viewed. 

1022 :param skel: The Skeleton that is viewed. 

1023 

1024 .. seealso:: :func:`view` 

1025 """ 

1026 pass 

1027 

1028 def onDelete(self, skelType: SkelType, skel: SkeletonInstance): 

1029 """ 

1030 Hook function that is called before deleting an entry. 

1031 

1032 It can be overridden for a module-specific behavior. 

1033 

1034 :param skelType: Defines the type of the node that shall be deleted. 

1035 :param skel: The Skeleton that is going to be deleted. 

1036 

1037 .. seealso:: :func:`delete`, :func:`onDeleted` 

1038 """ 

1039 pass 

1040 

1041 def onDeleted(self, skelType: SkelType, skel: SkeletonInstance): 

1042 """ 

1043 Hook function that is called after deleting an entry. 

1044 

1045 It should be overridden for a module-specific behavior. 

1046 The default is writing a log entry. 

1047 

1048 ..warning: Saving the skeleton again will undo the deletion 

1049 (if the skeleton was a leaf or a node with no children). 

1050 

1051 :param skelType: Defines the type of the node that is deleted. 

1052 :param skel: The Skeleton that has been deleted. 

1053 

1054 .. seealso:: :func:`delete`, :func:`onDelete` 

1055 """ 

1056 logging.info(f"""Entry deleted: {skel["key"]!r} ({skelType!r})""") 

1057 flushCache(key=skel["key"]) 

1058 if user := current.user.get(): 

1059 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

1060 

1061 def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance): 

1062 """ 

1063 Hook function that is called before cloning an entry. 

1064 

1065 It can be overwritten to a module-specific behavior. 

1066 

1067 :param skelType: Defines the type of the node that is cloned. 

1068 :param skel: The new SkeletonInstance that is being created. 

1069 :param src_skel: The source SkeletonInstance `skel` is cloned from. 

1070 

1071 .. seealso:: :func:`clone`, :func:`onCloned` 

1072 """ 

1073 pass 

1074 

1075 @CallDeferred 

1076 def _clone_recursive( 

1077 self, 

1078 skel_type: SkelType, 

1079 src_key: db.Key, 

1080 target_key: db.Key, 

1081 target_repo: db.Key, 

1082 cursor=None 

1083 ): 

1084 """ 

1085 Helper function which is used by default onCloned() to clone a recursive structure. 

1086 """ 

1087 assert (skel_type := self._checkSkelType(skel_type)) 

1088 

1089 logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}") 

1090 

1091 q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex") 

1092 q.setCursor(cursor) 

1093 

1094 count = 0 

1095 for skel in q.fetch(): 

1096 src_skel = skel 

1097 

1098 skel = skel.clone() 

1099 skel["key"] = None 

1100 skel["parententry"] = target_key 

1101 skel["parentrepo"] = target_repo 

1102 

1103 self.onClone(skel_type, skel, src_skel=src_skel) 

1104 logging.debug(f"copying {skel=}") # this logging _is_ needed, otherwise not all values are being written.. 

1105 assert skel.write() 

1106 self.onCloned(skel_type, skel, src_skel=src_skel) 

1107 count += 1 

1108 

1109 logging.debug(f"_clone_recursive {count=}") 

1110 

1111 if cursor := q.getCursor(): 

1112 self._clone_recursive(skel_type, src_key, target_key, target_repo, skel_type, cursor) 

1113 

1114 def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance): 

1115 """ 

1116 Hook function that is called after cloning an entry. 

1117 

1118 It can be overwritten to a module-specific behavior. 

1119 

1120 By default, when cloning a "node", this function calls :func:`_clone_recursive` 

1121 which recursively clones the entire structure below this node in the background. 

1122 If this is not wanted, or wanted by a specific setting, overwrite this function 

1123 without a super-call. 

1124 

1125 :param skelType: Defines the type of the node that is cloned. 

1126 :param skel: The new SkeletonInstance that was created. 

1127 :param src_skel: The source SkeletonInstance `skel` was cloned from. 

1128 

1129 .. seealso:: :func:`clone`, :func:`onClone` 

1130 """ 

1131 logging.info(f"""Entry cloned: {skel["key"]!r} ({skelType!r})""") 

1132 flushCache(kind=skel.kindName) 

1133 

1134 if user := current.user.get(): 

1135 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

1136 

1137 # Clone entire structure below, in case this is a node. 

1138 if skelType == "node": 

1139 self._clone_recursive("node", src_skel["key"], skel["key"], skel["parentrepo"]) 

1140 

1141 if self.leafSkelCls: 

1142 self._clone_recursive("leaf", src_skel["key"], skel["key"], skel["parentrepo"]) 

1143 

1144 

1145Tree.vi = True 

1146Tree.admin = True