Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/prototypes/tree.py: 0%

440 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-27 07:59 +0000

1import logging 

2import typing as t 

3from deprecated.sphinx import deprecated 

4from viur.core import utils, errors, db, current 

5from viur.core.decorators import * 

6from viur.core.bones import KeyBone, SortIndexBone 

7from viur.core.cache import flushCache 

8from viur.core.skeleton import Skeleton, SkeletonInstance 

9from viur.core.tasks import CallDeferred 

10from .skelmodule import SkelModule 

11 

12 

13SkelType = t.Literal["node", "leaf"] 

14 

15 

class TreeSkel(Skeleton):
    """
    Skeleton base carrying the bookkeeping bones of a tree entry: the key of its parent,
    the key of the repository it lives in, and a sort index. All three are hidden and read-only.
    """

    # Key of the parent entry.
    parententry = KeyBone(  # TODO VIUR4: Why is this not a RelationalBone?
        descr="Parent",
        visible=False,
        readOnly=True,
    )

    # Key of the repository (root node) the entry belongs to.
    parentrepo = KeyBone(  # TODO VIUR4: Why is this not a RelationalBone?
        descr="BaseRepo",
        visible=False,
        readOnly=True,
    )

    # Position of the entry among its siblings.
    sortindex = SortIndexBone(
        visible=False,
        readOnly=True,
    )

    @classmethod
    def refresh(cls, skelValues):  # ViUR2 Compatibility
        """
        Refresh the skeleton values and migrate a ViUR2-style parent reference.

        After the inherited refresh, an entry that has no ``parententry`` but whose database
        entity still holds a legacy ``parentdir`` value gets its ``parententry`` filled from it.

        :param skelValues: The skeleton instance to refresh.
        """
        super().refresh(skelValues)

        if skelValues["parententry"]:
            return  # Already has a parent reference, nothing to migrate

        legacy_parent = skelValues.dbEntity.get("parentdir")  # parentdir for viur2 compatibility
        if not legacy_parent:
            return

        legacy_key = db.Key.from_legacy_urlsafe(legacy_parent)
        skelValues["parententry"] = utils.normalizeKey(legacy_key)

40 

41 

class Tree(SkelModule):
    """
    Tree module prototype.

    It is used for hierarchical structures, either as a tree with nodes and leafs, or as a hierarchy with nodes only.
    """
    # Names of the access rights of this module. The default access checks below test
    # for "<moduleName>-view" and "<moduleName>-add" in the user's access list.
    accessRights = ("add", "edit", "view", "delete", "manage")

    # Skeleton class for nodes; must be set by a subclass (checked in __init__).
    nodeSkelCls = None
    # Skeleton class for leafs; when left unset, only the "node" skelType is accepted.
    leafSkelCls = None

    # NOTE(review): presumably the bone that list() orders by through _apply_default_order()
    # when the request gives no order - confirm against SkelModule.
    default_order = "sortindex"

54 

55 def __init__(self, moduleName, modulePath, *args, **kwargs): 

56 assert self.nodeSkelCls, f"Need to specify at least nodeSkelCls for {self.__class__.__name__!r}" 

57 super().__init__(moduleName, modulePath, *args, **kwargs) 

58 

59 @property 

60 def handler(self): 

61 return "tree" if self.leafSkelCls else "tree.node" # either a tree or a tree with nodes only (former hierarchy) 

62 

63 def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]: 

64 """ 

65 Checks for correct skelType. 

66 

67 Either returns the type provided, or None in case it is invalid. 

68 """ 

69 skelType = skelType.lower() 

70 if skelType == "node" or (skelType == "leaf" and self.leafSkelCls): 

71 return skelType 

72 

73 return None 

74 

75 def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]: 

76 if not (skelType := self._checkSkelType(skelType)): 

77 raise ValueError("Unsupported skelType") 

78 

79 if skelType == "leaf": 

80 return self.leafSkelCls 

81 

82 return self.nodeSkelCls 

83 

84 def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

85 """ 

86 Return unmodified base skeleton for the given skelType. 

87 

88 .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`, :func:`~baseSkel` 

89 """ 

90 return self._resolveSkelCls(skelType, *args, **kwargs)() 

91 

92 def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

93 """ 

94 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

95 for viewing an existing entry from the tree. 

96 

97 The default is a Skeleton instance returned by :func:`~baseSkel`. 

98 

99 .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel` 

100 

101 :return: Returns a Skeleton instance for viewing an entry. 

102 """ 

103 return self.baseSkel(skelType, *args, **kwargs) 

104 

105 def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

106 """ 

107 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

108 for adding an entry to the tree. 

109 

110 The default is a Skeleton instance returned by :func:`~baseSkel`. 

111 

112 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

113 

114 :return: Returns a Skeleton instance for adding an entry. 

115 """ 

116 return self.baseSkel(skelType, *args, **kwargs) 

117 

118 def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

119 """ 

120 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

121 for editing an existing entry from the tree. 

122 

123 The default is a Skeleton instance returned by :func:`~baseSkel`. 

124 

125 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

126 

127 :return: Returns a Skeleton instance for editing an entry. 

128 """ 

129 return self.baseSkel(skelType, *args, **kwargs) 

130 

131 def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

132 """ 

133 Retrieve a new :class:`viur.core.skeleton.SkeletonInstance` that is used by the application 

134 for cloning an existing entry of the tree. 

135 

136 The default is a SkeletonInstance returned by :func:`~baseSkel`. 

137 

138 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

139 

140 :return: Returns a SkeletonInstance for cloning an entry. 

141 """ 

142 return self.baseSkel(skelType, *args, **kwargs) 

143 

144 def rootnodeSkel( 

145 self, 

146 *, 

147 identifier: str = "rep_module_repo", 

148 ensure: bool | dict | t.Callable[[SkeletonInstance], None] = False, 

149 ) -> SkeletonInstance: 

150 """ 

151 Retrieve a new :class:`viur.core.skeleton.SkeletonInstance` that is used by the application 

152 for rootnode entries. 

153 

154 The default is a SkeletonInstance returned by :func:`~baseSkel`, with a preset key created from identifier. 

155 

156 :param identifier: Unique identifier (name) for this rootnode. 

157 :param ensure: If provided, ensures that the skeleton is available, and created with optionally provided values. 

158 

159 :return: Returns a SkeletonInstance for handling root nodes. 

160 """ 

161 skel = self.baseSkel("node") 

162 

163 skel["key"] = db.Key(skel.kindName, identifier) 

164 skel["rootNode"] = True 

165 

166 if ensure not in (False, None): 

167 return skel.read(create=ensure) 

168 

169 return skel 

170 

171 @deprecated( 

172 version="3.7.0", 

173 reason="Use rootnodeSkel(ensure=True) instead.", 

174 action="always" 

175 ) 

176 def ensureOwnModuleRootNode(self) -> db.Entity: 

177 """ 

178 Ensures, that general root-node for the current module exists. 

179 If no root-node exists yet, it will be created. 

180 

181 :returns: The entity of the root-node. 

182 """ 

183 return self.rootnodeSkel(ensure=True).dbEntity 

184 

185 def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]: 

186 """ 

187 Default function for providing a list of root node items. 

188 This list is requested by several module-internal functions and *must* be 

189 overridden by a custom functionality. The default stub for this function 

190 returns an empty list. 

191 An example implementation could be the following: 

192 

193 .. code-block:: python 

194 

195 # Example 

196 def getAvailableRootNodes(self, *args, **kwargs): 

197 q = db.Query(self.rootKindName) 

198 ret = [{"key": str(e.key()), 

199 "name": e.get("name", str(e.key().id_or_name()))} #FIXME 

200 for e in q.run(limit=25)] 

201 return ret 

202 

203 :param args: Can be used in custom implementations. 

204 :param kwargs: Can be used in custom implementations. 

205 :return: Returns a list of dicts which must provide a "key" and a "name" entry with \ 

206 respective information. 

207 """ 

208 return [] 

209 

210 def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None: 

211 """ 

212 Returns the root-node for a given child. 

213 

214 :param key: Key of the child node entry. 

215 

216 :returns: The skeleton of the root-node. 

217 """ 

218 skel = self.nodeSkelCls() 

219 

220 while key: 

221 if not skel.read(key): 

222 return None 

223 

224 key = skel["parententry"] 

225 

226 return skel 

227 

228 @CallDeferred 

229 def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0): 

230 """ 

231 Recursively fixes the parentrepo key after a move operation. 

232 

233 This will delete all entries which are children of *nodeKey*, except *key* nodeKey. 

234 

235 :param parentNode: URL-safe key of the node which children should be fixed. 

236 :param newRepoKey: URL-safe key of the new repository. 

237 :param depth: Safety level depth preventing infinitive loops. 

238 """ 

239 if depth > 99: 

240 logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo") 

241 logging.critical("Your data is corrupt!") 

242 logging.debug(f"{parentNode=}, {newRepoKey=}") 

243 return 

244 

245 def fixTxn(nodeKey, newRepoKey): 

246 node = db.Get(nodeKey) 

247 node["parentrepo"] = newRepoKey 

248 db.Put(node) 

249 

250 # Fix all nodes 

251 q = db.Query(self.viewSkel("node").kindName).filter("parententry =", parentNode) 

252 for repo in q.iter(): 

253 self.updateParentRepo(repo.key, newRepoKey, depth=depth + 1) 

254 db.RunInTransaction(fixTxn, repo.key, newRepoKey) 

255 

256 # Fix the leafs on this level 

257 if self.leafSkelCls: 

258 q = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", parentNode) 

259 for repo in q.iter(): 

260 db.RunInTransaction(fixTxn, repo.key, newRepoKey) 

261 

262 ## Internal exposed functions 

263 

264 @internal_exposed 

265 def pathToKey(self, key: db.Key): 

266 """ 

267 Returns the recursively expanded path through the Tree from the root-node to a 

268 requested node. 

269 :param key: Key of the destination *node*. 

270 :returns: An nested dictionary with information about all nodes in the path from root to the requested node. 

271 """ 

272 lastLevel = [] 

273 for x in range(0, 99): 

274 currentNodeSkel = self.viewSkel("node") 

275 if not currentNodeSkel.read(key): 

276 return [] # Either invalid key or listFilter prevented us from fetching anything 

277 if currentNodeSkel["parententry"] == currentNodeSkel["parentrepo"]: # We reached the top level 

278 break 

279 levelQry = self.viewSkel("node").all().filter("parententry =", currentNodeSkel["parententry"]) 

280 currentLevel = [{"skel": x, 

281 "active": x["key"] == currentNodeSkel["key"], 

282 "children": lastLevel if x["key"] == currentNodeSkel["key"] else []} 

283 for x in self.listFilter(levelQry).fetch(99)] 

284 assert currentLevel, "Got emtpy parent list?" 

285 lastLevel = currentLevel 

286 key = currentNodeSkel["parententry"] 

287 return lastLevel 

288 

289 ## External exposed functions 

290 

291 @exposed 

292 def index(self, skelType: SkelType = "node", parententry: t.Optional[db.Key | int | str] = None, **kwargs): 

293 if not parententry: 

294 repos = self.getAvailableRootNodes(**kwargs) 

295 match len(repos): 

296 case 0: 

297 raise errors.Unauthorized() 

298 case 1: 

299 parententry = repos[0]["key"] 

300 case _: 

301 raise errors.NotAcceptable(f"Missing required parameter {'parententry'!r}") 

302 

303 return self.list(skelType=skelType, parententry=parententry, **kwargs) 

304 

305 @exposed 

306 def listRootNodes(self, *args, **kwargs) -> t.Any: 

307 """ 

308 Renders a list of all available repositories for the current user using the 

309 modules default renderer. 

310 

311 :returns: The rendered representation of the available root-nodes. 

312 """ 

313 return self.render.listRootNodes(self.getAvailableRootNodes(*args, **kwargs)) 

314 

315 @exposed 

316 def list(self, skelType: SkelType, *args, **kwargs) -> t.Any: 

317 """ 

318 Prepares and renders a list of entries. 

319 

320 All supplied parameters are interpreted as filters for the elements displayed. 

321 

322 Unlike other module prototypes in ViUR, the access control in this function is performed 

323 by calling the function :func:`listFilter`, which updates the query-filter to match only 

324 elements which the user is allowed to see. 

325 

326 .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter` 

327 

328 :returns: The rendered list objects for the matching entries. 

329 

330 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

331 """ 

332 if not (skelType := self._checkSkelType(skelType)): 

333 raise errors.NotAcceptable("Invalid skelType provided.") 

334 

335 # The general access control is made via self.listFilter() 

336 if not (query := self.listFilter(self.viewSkel(skelType).all().mergeExternalFilter(kwargs))): 

337 raise errors.Unauthorized() 

338 

339 self._apply_default_order(query) 

340 return self.render.list(query.fetch()) 

341 

342 @exposed 

343 def structure(self, skelType: SkelType, action: t.Optional[str] = "view") -> t.Any: 

344 """ 

345 :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set 

346 in each bone. 

347 

348 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

349 """ 

350 # FIXME: In ViUR > 3.7 this could also become dynamic (ActionSkel paradigm). 

351 match action: 

352 case "view": 

353 skel = self.viewSkel(skelType) 

354 if not self.canView(skelType, skel): 

355 raise errors.Unauthorized() 

356 

357 case "edit": 

358 skel = self.editSkel(skelType) 

359 if not self.canEdit(skelType, skel): 

360 raise errors.Unauthorized() 

361 

362 case "add": 

363 if not self.canAdd(skelType): 

364 raise errors.Unauthorized() 

365 

366 skel = self.addSkel(skelType) 

367 

368 case "clone": 

369 skel = self.cloneSkel(skelType) 

370 if not (self.canAdd(skelType) and self.canEdit(skelType, skel)): 

371 raise errors.Unauthorized() 

372 

373 case _: 

374 raise errors.NotImplemented(f"The action {action!r} is not implemented.") 

375 

376 return self.render.render(f"structure.{skelType}.{action}", skel) 

377 

378 @exposed 

379 def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

380 """ 

381 Prepares and renders a single entry for viewing. 

382 

383 The entry is fetched by its *key* and its *skelType*. 

384 The function performs several access control checks on the requested entity before it is rendered. 

385 

386 .. seealso:: :func:`canView`, :func:`onView` 

387 

388 :returns: The rendered representation of the requested entity. 

389 

390 :param skelType: May either be "node" or "leaf". 

391 :param key: URL-safe key of the parent. 

392 

393 :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided. 

394 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

395 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

396 """ 

397 if not (skelType := self._checkSkelType(skelType)): 

398 raise errors.NotAcceptable(f"Invalid skelType provided.") 

399 

400 skel = self.viewSkel(skelType) 

401 if not skel.read(key): 

402 raise errors.NotFound() 

403 

404 if not self.canView(skelType, skel): 

405 raise errors.Unauthorized() 

406 

407 self.onView(skelType, skel) 

408 return self.render.view(skel) 

409 

410 @exposed 

411 @force_ssl 

412 @skey(allow_empty=True) 

413 def add(self, skelType: SkelType, node: db.Key | int | str, *args, **kwargs) -> t.Any: 

414 """ 

415 Add a new entry with the given parent *node*, and render the entry, eventually with error notes 

416 on incorrect data. Data is taken by any other arguments in *kwargs*. 

417 

418 The function performs several access control checks on the requested entity before it is added. 

419 

420 .. seealso:: :func:`canAdd`, :func:`onAdd`, , :func:`onAdded` 

421 

422 :param skelType: Defines the type of the new entry and may either be "node" or "leaf". 

423 :param node: URL-safe key of the parent. 

424 

425 :returns: The rendered, added object of the entry, eventually with error hints. 

426 

427 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

428 :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found. 

429 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

430 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

431 """ 

432 if not (skelType := self._checkSkelType(skelType)): 

433 raise errors.NotAcceptable(f"Invalid skelType provided.") 

434 

435 skel = self.addSkel(skelType) 

436 parentNodeSkel = self.editSkel("node") 

437 

438 # TODO VIUR4: Why is this parameter called "node"? 

439 if not parentNodeSkel.read(node): 

440 raise errors.NotFound("The provided parent node could not be found.") 

441 if not self.canAdd(skelType, parentNodeSkel): 

442 raise errors.Unauthorized() 

443 

444 skel["parententry"] = parentNodeSkel["key"] 

445 # parentrepo may not exist in parentNodeSkel as it may be an rootNode 

446 skel["parentrepo"] = parentNodeSkel["parentrepo"] or parentNodeSkel["key"] 

447 

448 if ( 

449 not kwargs # no data supplied 

450 or not current.request.get().isPostRequest # failure if not using POST-method 

451 or not skel.fromClient(kwargs) # failure on reading into the bones 

452 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

453 ): 

454 return self.render.add(skel) 

455 

456 self.onAdd(skelType, skel) 

457 skel.write() 

458 self.onAdded(skelType, skel) 

459 

460 return self.render.addSuccess(skel) 

461 

462 @force_ssl 

463 @force_post 

464 @exposed 

465 @skey 

466 @access("root") 

467 def add_or_edit(self, skelType: SkelType, key: db.Key | int | str, **kwargs) -> t.Any: 

468 """ 

469 This function is intended to be used by importers. 

470 Only "root"-users are allowed to use it. 

471 """ 

472 if not (skelType := self._checkSkelType(skelType)): 

473 raise errors.NotAcceptable("Invalid skelType provided.") 

474 

475 kind_name = self.nodeSkelCls.kindName if skelType == "node" else self.leafSkelCls.kindName 

476 

477 db_key = db.keyHelper(key, targetKind=kind_name, adjust_kind=kind_name) 

478 is_add = not bool(db.Get(db_key)) 

479 

480 if is_add: 

481 skel = self.addSkel(skelType) 

482 else: 

483 skel = self.editSkel(skelType) 

484 

485 skel = skel.ensure_is_cloned() 

486 skel.parententry.required = True 

487 skel.parententry.readOnly = False 

488 

489 skel["key"] = db_key 

490 

491 if ( 

492 not kwargs # no data supplied 

493 or not skel.fromClient(kwargs) # failure on reading into the bones 

494 ): 

495 # render the skeleton in the version it could as far as it could be read. 

496 return self.render.render("add_or_edit", skel) 

497 

498 # Ensure the parententry exists 

499 parentNodeSkel = self.editSkel("node") 

500 if not parentNodeSkel.read(skel["parententry"]): 

501 raise errors.NotFound("The provided parent node could not be found.") 

502 if not self.canAdd(skelType, parentNodeSkel): 

503 raise errors.Unauthorized() 

504 

505 skel["parententry"] = parentNodeSkel["key"] 

506 # parentrepo may not exist in parentNodeSkel as it may be an rootNode 

507 skel["parentrepo"] = parentNodeSkel["parentrepo"] or parentNodeSkel["key"] 

508 

509 if is_add: 

510 self.onAdd(skelType, skel) 

511 else: 

512 self.onEdit(skelType, skel) 

513 

514 skel.write() 

515 

516 if is_add: 

517 self.onAdded(skelType, skel) 

518 return self.render.addSuccess(skel) 

519 

520 self.onEdited(skelType, skel) 

521 return self.render.editSuccess(skel) 

522 

523 @exposed 

524 @force_ssl 

525 @skey(allow_empty=True) 

526 def edit(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

527 """ 

528 Modify an existing entry, and render the entry, eventually with error notes on incorrect data. 

529 Data is taken by any other arguments in *kwargs*. 

530 

531 The function performs several access control checks on the requested entity before it is added. 

532 

533 .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited` 

534 

535 :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf". 

536 :param key: URL-safe key of the item to be edited. 

537 

538 :returns: The rendered, modified object of the entry, eventually with error hints. 

539 

540 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

541 :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found. 

542 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

543 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

544 """ 

545 if not (skelType := self._checkSkelType(skelType)): 

546 raise errors.NotAcceptable(f"Invalid skelType provided.") 

547 

548 skel = self.editSkel(skelType) 

549 if not skel.read(key): 

550 raise errors.NotFound() 

551 

552 if not self.canEdit(skelType, skel): 

553 raise errors.Unauthorized() 

554 

555 if ( 

556 not kwargs # no data supplied 

557 or not current.request.get().isPostRequest # failure if not using POST-method 

558 or not skel.fromClient(kwargs, amend=True) # failure on reading into the bones 

559 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

560 ): 

561 return self.render.edit(skel) 

562 

563 self.onEdit(skelType, skel) 

564 skel.write() 

565 self.onEdited(skelType, skel) 

566 

567 return self.render.editSuccess(skel) 

568 

569 @exposed 

570 @force_ssl 

571 @force_post 

572 @skey 

573 def delete(self, skelType: SkelType, key: str, *args, **kwargs) -> t.Any: 

574 """ 

575 Deletes an entry or an directory (including its contents). 

576 

577 The function runs several access control checks on the data before it is deleted. 

578 

579 .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted` 

580 

581 :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf". 

582 :param key: URL-safe key of the item to be deleted. 

583 

584 :returns: The rendered, deleted object of the entry. 

585 

586 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

587 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

588 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

589 """ 

590 if not (skelType := self._checkSkelType(skelType)): 

591 raise errors.NotAcceptable(f"Invalid skelType provided.") 

592 

593 skel = self.editSkel(skelType) 

594 if not skel.read(key): 

595 raise errors.NotFound() 

596 

597 if not self.canDelete(skelType, skel): 

598 raise errors.Unauthorized() 

599 

600 if skelType == "node": 

601 self.deleteRecursive(skel["key"]) 

602 

603 self.onDelete(skelType, skel) 

604 skel.delete() 

605 self.onDeleted(skelType, skel) 

606 

607 return self.render.deleteSuccess(skel, skelType=skelType) 

608 

609 @CallDeferred 

610 def deleteRecursive(self, parentKey: str): 

611 """ 

612 Recursively processes a delete request. 

613 

614 This will delete all entries which are children of *nodeKey*, except *key* nodeKey. 

615 

616 :param parentKey: URL-safe key of the node which children should be deleted. 

617 """ 

618 nodeKey = db.keyHelper(parentKey, self.viewSkel("node").kindName) 

619 if self.leafSkelCls: 

620 for leaf in db.Query(self.viewSkel("leaf").kindName).filter("parententry =", nodeKey).iter(): 

621 leafSkel = self.viewSkel("leaf") 

622 if not leafSkel.read(leaf.key): 

623 continue 

624 leafSkel.delete() 

625 for node in db.Query(self.viewSkel("node").kindName).filter("parententry =", nodeKey).iter(): 

626 self.deleteRecursive(node.key) 

627 nodeSkel = self.viewSkel("node") 

628 if not nodeSkel.read(node.key): 

629 continue 

630 nodeSkel.delete() 

631 

632 @exposed 

633 @force_ssl 

634 @force_post 

635 @skey 

636 def move(self, skelType: SkelType, key: db.Key | int | str, parentNode: str, *args, **kwargs) -> str: 

637 """ 

638 Move a node (including its contents) or a leaf to another node. 

639 

640 .. seealso:: :func:`canMove` 

641 

642 :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf". 

643 :param key: URL-safe key of the item to be moved. 

644 :param parentNode: URL-safe key of the destination node, which must be a node. 

645 :param skey: The CSRF security key. 

646 

647 :returns: The rendered, edited object of the entry. 

648 

649 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

650 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

651 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

652 """ 

653 if not (skelType := self._checkSkelType(skelType)): 

654 raise errors.NotAcceptable(f"Invalid skelType provided.") 

655 

656 skel = self.editSkel(skelType) # srcSkel - the skeleton to be moved 

657 parentNodeSkel = self.baseSkel("node") # destSkel - the node it should be moved into 

658 

659 if not skel.read(key): 

660 raise errors.NotFound("Cannot find entity to move") 

661 

662 if not parentNodeSkel.read(parentNode): 

663 parentNode = utils.normalizeKey(db.Key.from_legacy_urlsafe(parentNode)) 

664 

665 if parentNode.kind != parentNodeSkel.kindName: 

666 raise errors.NotFound( 

667 f"You provided a key of kind {parentNode.kind}, but require a {parentNodeSkel.kindName}." 

668 ) 

669 

670 raise errors.NotFound("Cannot find parentNode entity") 

671 

672 if not self.canMove(skelType, skel, parentNodeSkel): 

673 raise errors.Unauthorized() 

674 

675 if skel["key"] == parentNodeSkel["key"]: 

676 raise errors.NotAcceptable("Cannot move a node into itself") 

677 

678 ## Test for recursion 

679 currLevel = db.Get(parentNodeSkel["key"]) 

680 for _ in range(0, 99): 

681 if currLevel.key == skel["key"]: 

682 break 

683 if currLevel.get("rootNode") or currLevel.get("is_root_node"): 

684 # We reached a rootNode, so this is okay 

685 break 

686 currLevel = db.Get(currLevel["parententry"]) 

687 else: # We did not "break" - recursion-level exceeded or loop detected 

688 raise errors.NotAcceptable("Unable to find a root node in recursion?") 

689 

690 # Test if we try to move a rootNode 

691 # TODO: Remove "rootNode"-fallback with VIUR4 

692 if skel.dbEntity.get("is_root_node") or skel.dbEntity.get("rootNode"): 

693 raise errors.NotAcceptable("Can't move a rootNode to somewhere else") 

694 

695 currentParentRepo = skel["parentrepo"] 

696 skel["parententry"] = parentNodeSkel["key"] 

697 skel["parentrepo"] = parentNodeSkel["parentrepo"] # Fixme: Need to recursive fixing to parentrepo? 

698 if "sortindex" in kwargs: 

699 try: 

700 skel["sortindex"] = float(kwargs["sortindex"]) 

701 except: 

702 raise errors.PreconditionFailed() 

703 

704 self.onEdit(skelType, skel) 

705 skel.write() 

706 self.onEdited(skelType, skel) 

707 

708 # Ensure a changed parentRepo get's proagated 

709 if currentParentRepo != parentNodeSkel["parentrepo"]: 

710 self.updateParentRepo(key, parentNodeSkel["parentrepo"]) 

711 

712 return self.render.editSuccess(skel) 

713 

714 @exposed 

715 @force_ssl 

716 @skey(allow_empty=True) 

717 def clone(self, skelType: SkelType, key: db.Key | str | int, **kwargs): 

718 """ 

719 Clone an existing entry, and render the entry, eventually with error notes on incorrect data. 

720 Data is taken by any other arguments in *kwargs*. 

721 

722 The function performs several access control checks on the requested entity before it is added. 

723 

724 .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned` 

725 

726 :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf". 

727 :param key: URL-safe key of the item to be edited. 

728 

729 :returns: The cloned object of the entry, eventually with error hints. 

730 

731 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

732 :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from was found. 

733 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

734 """ 

735 

736 if not (skelType := self._checkSkelType(skelType)): 

737 raise errors.NotAcceptable(f"Invalid skelType provided.") 

738 

739 skel = self.cloneSkel(skelType) 

740 if not skel.read(key): 

741 raise errors.NotFound() 

742 

743 # a clone-operation is some kind of edit and add... 

744 if not (self.canEdit(skelType, skel) and self.canAdd(skelType, kwargs.get("parententry"))): 

745 raise errors.Unauthorized() 

746 

747 # Remember source skel and unset the key for clone operation! 

748 src_skel = skel 

749 skel = skel.clone(apply_clone_strategy=True) 

750 skel["key"] = None 

751 

752 # make parententry required and writeable when provided 

753 if "parententry" in kwargs: 

754 skel.parententry.readOnly = False 

755 skel.parententry.required = True 

756 else: 

757 _ = skel["parententry"] # TODO: because of accessedValues... 

758 

759 # make parentrepo required and writeable when provided 

760 if "parentrepo" in kwargs: 

761 skel.parentrepo.readOnly = False 

762 skel.parentrepo.required = True 

763 else: 

764 _ = skel["parentrepo"] # TODO: because of accessedValues... 

765 

766 # Check all required preconditions for clone 

767 if ( 

768 not kwargs # no data supplied 

769 or not current.request.get().isPostRequest # failure if not using POST-method 

770 or not skel.fromClient(kwargs) # failure on reading into the bones 

771 or utils.parse.bool(kwargs.get("bounce")) # review before changing 

772 ): 

773 return self.render.edit(skel, action="clone") 

774 

775 self.onClone(skelType, skel, src_skel=src_skel) 

776 assert skel.write() 

777 self.onCloned(skelType, skel, src_skel=src_skel) 

778 

779 return self.render.editSuccess(skel, action="cloneSuccess") 

780 

781 ## Default access control functions 

782 

783 def listFilter(self, query: db.Query) -> t.Optional[db.Query]: 

784 """ 

785 Access control function on item listing. 

786 

787 This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function, 

788 and is used to modify the provided filter parameter to match only items that the current user 

789 is allowed to see. 

790 

791 :param query: Query which should be altered. 

792 

793 :returns: The altered filter, or None if access is not granted. 

794 """ 

795 

796 if (user := current.user.get()) and (f"{self.moduleName}-view" in user["access"] or "root" in user["access"]): 

797 return query 

798 

799 return None 

800 

801 def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

802 """ 

803 Checks if the current user can view the given entry. 

804 Should be identical to what's allowed by listFilter. 

805 By default, `meth:listFilter` is used to determine what's allowed and whats not; but this 

806 method can be overridden for performance improvements (to eliminate that additional database access). 

807 :param skel: The entry we check for 

808 :return: True if the current session is authorized to view that entry, False otherwise 

809 """ 

810 query = self.viewSkel(skelType).all() 

811 

812 if key := skel["key"]: 

813 query.mergeExternalFilter({"key": key}) 

814 

815 query = self.listFilter(query) # Access control 

816 

817 if query is None or (key and not query.getEntry()): 

818 return False 

819 

820 return True 

821 

def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance] = None) -> bool:
    """
    Access control function for adding permission.

    Decides whether the current user may add a new entry.

    The default behavior is:
    - If no user is logged in, adding is generally refused.
    - If the user has "root" access, adding is generally allowed.
    - If the user has the modules "add" permission (module-add) enabled, adding is allowed.

    Neither `skelType` nor `parentNodeSkel` is evaluated by this default implementation.
    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`add`

    :param skelType: Defines the type of the node that should be added.
    :param parentNodeSkel: The parent node where a new entry should be added.

    :returns: True, if adding entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        # No access rights at all, neither root nor module permission can match.
        return False

    # root user is always allowed, otherwise the add-permission of this module is required.
    return "root" in access or f"{self.moduleName}-add" in access

852 

def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for modification permission.

    Decides whether the current user may edit an entry.

    The default behavior is:
    - If no user is logged in, editing is generally refused.
    - If the user has "root" access, editing is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled, editing is allowed.

    Neither `skelType` nor `skel` is evaluated by this default implementation.
    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`edit`

    :param skelType: Defines the type of the node that should be edited.
    :param skel: The Skeleton that should be edited.

    :returns: True, if editing entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        # No access rights at all, neither root nor module permission can match.
        return False

    return "root" in access or f"{self.moduleName}-edit" in access

880 

def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for delete permission.

    Decides whether the current user may delete an entry.

    The default behavior is:
    - If no user is logged in, deleting is generally refused.
    - If the user has "root" access, deleting is generally allowed.
    - If the user has the modules "delete" permission (module-delete) enabled,
      deleting is allowed.

    Neither `skelType` nor `skel` is evaluated by this default implementation.
    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that should be deleted.
    :param skel: The Skeleton that should be deleted.

    .. seealso:: :func:`delete`

    :returns: True, if deleting entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        # No access rights at all, neither root nor module permission can match.
        return False

    return "root" in access or f"{self.moduleName}-delete" in access

909 

def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool:
    """
    Access control function for moving permission.

    Decides whether the current user may move an entry.

    The default behavior is:
    - If no user is logged in, moving is generally refused.
    - If the user has "root" access, moving is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled,
      moving is allowed.

    None of the parameters is evaluated by this default implementation.
    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be moved.
    :param node: The entry that is going to be moved.
    :param destNode: The node where *node* should be moved to.

    .. seealso:: :func:`move`

    :returns: True, if moving entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        # No access rights at all, neither root nor module permission can match.
        return False

    # Moving is covered by the edit-permission; there is no dedicated move-permission checked here.
    return "root" in access or f"{self.moduleName}-edit" in access

939 

940 ## Overridable eventhooks 

941 

def onAdd(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before adding an entry.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be added.
    :param skel: The Skeleton that is going to be added.

    .. seealso:: :func:`add`, :func:`onAdded`
    """
    return None

954 

def onAdded(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after adding an entry.

    It should be overridden for a module-specific behavior.
    The default writes a log entry for the new entry, calls :func:`flushCache` for the kind of
    the skeleton, and logs the current user, if there is one.

    :param skelType: Defines the type of the node that has been added.
    :param skel: The Skeleton that has been added.

    .. seealso:: :func:`add`, :func:`onAdd`
    """
    logging.info(f"""Entry of kind {skelType!r} added: {skel["key"]!r}""")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

971 

def onEdit(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before editing an entry.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be edited.
    :param skel: The Skeleton that is going to be edited.

    .. seealso:: :func:`edit`, :func:`onEdited`
    """
    return None

984 

def onEdited(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after modifying an entry.

    It should be overridden for a module-specific behavior.
    The default writes a log entry for the changed entry, calls :func:`flushCache` for the key of
    the skeleton, and logs the current user, if there is one.

    :param skelType: Defines the type of the node that has been edited.
    :param skel: The Skeleton that has been modified.

    .. seealso:: :func:`edit`, :func:`onEdit`
    """
    logging.info(f"""Entry of kind {skelType!r} changed: {skel["key"]!r}""")
    flushCache(key=skel["key"])

    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

1001 

def onView(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called when viewing an entry.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that is viewed.
    :param skel: The Skeleton that is viewed.

    .. seealso:: :func:`view`
    """
    return None

1015 

def onDelete(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before deleting an entry.

    The default implementation does nothing; override it for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be deleted.
    :param skel: The Skeleton that is going to be deleted.

    .. seealso:: :func:`delete`, :func:`onDeleted`
    """
    return None

1028 

def onDeleted(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after deleting an entry.

    It should be overridden for a module-specific behavior.
    The default writes a log entry for the deleted entry, calls :func:`flushCache` for the key of
    the skeleton, and logs the current user, if there is one.

    .. warning:: Saving the skeleton again will undo the deletion
        (if the skeleton was a leaf or a node with no children).

    :param skelType: Defines the type of the node that is deleted.
    :param skel: The Skeleton that has been deleted.

    .. seealso:: :func:`delete`, :func:`onDelete`
    """
    logging.info(f"""Entry deleted: {skel["key"]!r} ({skelType!r})""")
    flushCache(key=skel["key"])

    user = current.user.get()
    if not user:
        return

    logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

1048 

def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook function that is called before cloning an entry.

    The default implementation does nothing; it can be overwritten for a module-specific behavior.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that is being created.
    :param src_skel: The source SkeletonInstance `skel` is cloned from.

    .. seealso:: :func:`clone`, :func:`onCloned`
    """
    return None

1062 

@CallDeferred
def _clone_recursive(
    self,
    skel_type: SkelType,
    src_key: db.Key,
    target_key: db.Key,
    target_repo: db.Key,
    cursor=None
):
    """
    Helper function which is used by default onCloned() to clone a recursive structure.

    Every entry of kind `skel_type` whose `parententry` is `src_key` is cloned below `target_key`.
    For each clone, :func:`onClone` is called before and :func:`onCloned` after writing it.
    As long as the query reports a cursor after fetching, the function calls itself again with
    that cursor to continue with the remaining entries.

    :param skel_type: Defines the type of the entries to clone ("node" or "leaf").
    :param src_key: Key of the source node whose direct children are cloned.
    :param target_key: Key of the node that becomes the `parententry` of the clones.
    :param target_repo: Key that is stored as `parentrepo` of the clones.
    :param cursor: Query cursor to continue from; None starts at the beginning.
    """
    # Calls with side effects must not live inside ``assert``: assertions are removed when
    # Python runs with ``-O``, which would silently skip the call itself.
    skel_type = self._checkSkelType(skel_type)
    assert skel_type

    logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}")

    q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex")
    q.setCursor(cursor)

    count = 0
    for src_skel in q.fetch():
        skel = src_skel.clone()
        skel["key"] = None  # the clone is stored as a new entry
        skel["parententry"] = target_key
        skel["parentrepo"] = target_repo

        self.onClone(skel_type, skel, src_skel=src_skel)
        logging.debug(f"copying {skel=}")  # this logging _is_ needed, otherwise not all values are being written..

        written = skel.write()
        assert written

        self.onCloned(skel_type, skel, src_skel=src_skel)
        count += 1

    logging.debug(f"_clone_recursive {count=}")

    if cursor := q.getCursor():
        # Continue with the next batch; the arguments must match this function's signature exactly.
        self._clone_recursive(skel_type, src_key, target_key, target_repo, cursor)

1101 

def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook function that is called after cloning an entry.

    It can be overwritten for a module-specific behavior.

    The default writes a log entry, calls :func:`flushCache` for the kind of the skeleton, and logs
    the current user, if there is one. When a "node" was cloned, it additionally calls
    :func:`_clone_recursive` for the nodes below the source and, if the module has a
    `leafSkelCls`, for the leafs below it as well, so that the entire structure below this node
    is cloned in the background.
    If this is not wanted, or wanted by a specific setting, overwrite this function
    without a super-call.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that was created.
    :param src_skel: The source SkeletonInstance `skel` was cloned from.

    .. seealso:: :func:`clone`, :func:`onClone`
    """
    logging.info(f"""Entry cloned: {skel["key"]!r} ({skelType!r})""")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

    # Only nodes have a structure below them that needs to be cloned as well.
    if skelType != "node":
        return

    self._clone_recursive("node", src_skel["key"], skel["key"], skel["parentrepo"])

    if self.leafSkelCls:
        self._clone_recursive("leaf", src_skel["key"], skel["key"], skel["parentrepo"])

1131 

1132 

# Flag the prototype for the "vi" and "admin" renderers.
# NOTE(review): presumably these flags expose the module to those renderers; confirm against the renderer setup.
Tree.vi = Tree.admin = True