Coverage for  / home / runner / work / viur-core / viur-core / viur / src / viur / core / modules / user.py: 0%

774 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-01 23:08 +0000

1import abc 

2import datetime 

3import enum 

4import fnmatch 

5import functools 

6import hashlib 

7import hmac 

8import json 

9import logging 

10import secrets 

11import time 

12import urllib.parse 

13import warnings 

14from http.cookies import SimpleCookie 

15 

16import user_agents 

17 

18import pyotp 

19import base64 

20import dataclasses 

21import typing as t 

22from google.auth.transport import requests 

23from google.oauth2 import id_token 

24 

25from viur.core import ( 

26 conf, current, db, email, errors, i18n, 

27 securitykey, session, skeleton, tasks, utils, Module 

28) 

29from viur.core.decorators import * 

30from viur.core.bones import * 

31from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password 

32from viur.core.prototypes.list import List 

33from viur.core.ratelimit import RateLimit 

34from viur.core.securityheaders import extendCsp 

35from viur.core.session import Session 

36 

37 

@functools.total_ordering
class Status(enum.Enum):
    """Status enum for a user.

    Has backwards compatibility to be comparable with non-enum values
    (e.g. ``Status.ACTIVE == 10`` or ``Status.ACTIVE > 5``).
    Will be removed with viur-core 4.0.0
    """

    UNSET = 0  # Status is unset
    WAITING_FOR_EMAIL_VERIFICATION = 1  # Waiting for email verification
    WAITING_FOR_ADMIN_VERIFICATION = 2  # Waiting for verification through admin
    DISABLED = 5  # Account disabled
    ACTIVE = 10  # Active

    def __eq__(self, other):
        """Compare by identity against other members, by value against anything else."""
        if isinstance(other, Status):
            return self is other
        return self.value == other

    def __lt__(self, other):
        """Order members (and raw values) by their numeric value.

        ``enum.Enum`` defines no ordering, so delegating to ``super().__lt__``
        yields ``NotImplemented`` and ``Status.A < Status.B`` raised a TypeError.
        Comparing the values directly makes member-to-member ordering work.
        """
        if isinstance(other, Status):
            return self.value < other.value
        return self.value < other

    def __hash__(self):
        """Hash by value.

        Defining ``__eq__`` in a class body implicitly sets ``__hash__`` to None,
        which made the members unusable as dict keys or set items. Hashing the
        value keeps ``a == b`` implying ``hash(a) == hash(b)`` for raw values too.
        """
        return hash(self.value)

61 

62 

class UserSkel(skeleton.Skeleton):
    """
    Skeleton of a user account.

    Besides the bones declared here, each instantiation lets the configured
    primary and second factor authentication providers attach the bones they
    need (see ``__new__``). On ``write``, the access rights are derived from
    the assigned roles unless the role "custom" is set.
    """
    kindName = "user"  # this assignment is required, as this Skeleton is defined in viur-core (see #604)

    name = EmailBone(
        descr="E-Mail",
        required=True,
        readOnly=True,
        caseSensitive=False,
        searchable=True,
        unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
    )

    firstname = StringBone(
        descr="Firstname",
        searchable=True,
    )

    lastname = StringBone(
        descr="Lastname",
        searchable=True,
    )

    roles = SelectBone(
        descr=i18n.translate("viur.core.modules.user.bone.roles", defaultText="Roles"),
        values=conf.user.roles,
        required=True,
        multiple=True,
        # fixme: This is generally broken in VIUR! See #776 for details.
        # vfunc=lambda values:
        #    i18n.translate(
        #        "user.bone.roles.invalid",
        #        defaultText="Invalid role setting: 'custom' can only be set alone.")
        #    if "custom" in values and len(values) > 1 else None,
        defaultValue=list(conf.user.roles.keys())[:1],
    )

    access = SelectBone(
        descr=i18n.translate("viur.core.modules.user.bone.access", defaultText="Access rights"),
        type_suffix="access",
        values=lambda: {
            right: i18n.translate(f"viur.core.modules.user.accessright.{right}", defaultText=right)
            for right in sorted(conf.user.access_rights)
        },
        multiple=True,
        params={
            "readonlyIf": "'custom' not in roles"  # if "custom" is not in roles, "access" is managed by the role system
        }
    )

    status = SelectBone(
        descr=i18n.translate("viur.core.modules.user.bone.status", "Account status"),
        values=Status,
        translation_key_prefix="viur.core.user.status.",
        defaultValue=Status.ACTIVE,
        required=True,
    )

    lastlogin = DateBone(
        descr="Last Login",
        readOnly=True,
    )

    admin_config = JsonBone(  # This bone stores settings from the vi
        descr="Config for the User",
        visible=False
    )

    def __new__(cls, *args, **kwargs):
        """
        Create the skeleton after letting every configured authentication
        provider patch the class with the bones it requires.

        Primary providers are processed first, then second factor providers;
        afterwards the bone map of the class is regenerated.
        """
        provider_groups = (
            ("authenticationProviders", UserPrimaryAuthentication),
            ("secondFactorProviders", UserSecondFactorAuthentication),
        )

        for attribute, expected_base in provider_groups:
            for provider in getattr(conf.main_app.vi.user, attribute):
                assert issubclass(provider, expected_base)
                provider.patch_user_skel(cls)

        cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
        return super().__new__(cls, *args, **kwargs)

    @classmethod
    def write(cls, skel, *args, **kwargs):
        """
        Write the user skeleton, recomputing ``skel["access"]`` from the roles.

        When roles are set and "custom" is not among them, the access rights are
        the union of the role defaults reported by the user module and the rights
        each module of ``conf.main_app.vi`` grants to the role via its ``roles``
        mapping (falling back to the ``"*"`` entry).
        """
        if skel["roles"] and "custom" not in skel["roles"]:
            granted = set()

            for role in skel["roles"]:
                # Defaults the user module defines for this role
                granted |= conf.main_app.vi.user.get_role_defaults(role)

                # Evaluate the role settings of every public module
                for mod_name in dir(conf.main_app.vi):
                    if mod_name.startswith("_"):
                        continue

                    module = getattr(conf.main_app.vi, mod_name)
                    if not isinstance(module, Module):
                        continue

                    role_map = getattr(module, "roles", None) or {}
                    rights = role_map.get(role, role_map.get("*", ()))

                    # A single right may be given without a surrounding tuple/list
                    if not isinstance(rights, (tuple, list)):
                        rights = (rights, )

                    if "*" in rights:
                        # Wildcard: grant everything the module offers
                        granted.update(f"{mod_name}-{right}" for right in module.accessRights)
                        continue

                    for right in rights:
                        if right not in module.accessRights:
                            continue

                        granted.add(f"{mod_name}-{right}")

                        # special case: "edit" and "delete" actions require "view" as well!
                        if right in ("edit", "delete") and "view" in module.accessRights:
                            granted.add(f"{mod_name}-view")

            skel["access"] = list(granted)

        return super().write(skel, *args, **kwargs)

189 

190 

class UserAuthentication(Module, abc.ABC):
    """
    Abstract base class of all authentication modules.

    Concrete implementations provide ``METHOD_NAME`` and ``NAME`` (the subclasses
    in this file do so with plain class attributes) and may patch the user
    skeleton class with additional bones via ``patch_user_skel``.
    """

    @property
    @abc.abstractstaticmethod
    def METHOD_NAME() -> str:
        """
        Define a unique method name for this authentication.
        """
        ...

    @property
    @abc.abstractstaticmethod
    def NAME() -> str:
        """
        Define a descriptive name for this authentication.
        """
        ...

    @property
    @staticmethod
    def VISIBLE(cls) -> bool:
        """
        Defines if the authentication method is visible to the user.
        """
        return True

    def __init__(self, moduleName, modulePath, userModule):
        """
        Initialize the module and remember the owning user module.

        :param moduleName: Forwarded to the ``Module`` constructor.
        :param modulePath: Forwarded to the ``Module`` constructor.
        :param userModule: The user module this authentication belongs to;
            stored as ``self._user_module``.
        """
        super().__init__(moduleName, modulePath)
        self._user_module = userModule
        # Default entry point of the authentication is its "login" action
        self.start_url = f"{self.modulePath}/login"

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """
        Tell whether this authentication can be used for the given user skeleton.

        The base implementation always returns True.
        """
        return True

    @classmethod
    def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
        """
        Allows for an UserAuthentication to patch the UserSkel
        class with additional bones which are required for
        the implemented authentication method.
        """
        ...

232 

233 

class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
    """Abstract base for every primary authentication method."""

    # Whether new accounts may be created through this authentication
    registrationEnabled = False

    @abc.abstractmethod
    def login(self, *args, **kwargs):
        """Perform the login; has to be implemented by every primary authentication."""

    def next_or_finish(self, skel: skeleton.SkeletonInstance):
        """
        Continue after a successful authentication step.

        This hook can be overridden to insert further steps into custom
        authentications (e.g. forcing a password change after the first login).
        By default, it hands the user's key over to the user module's
        ``continueAuthenticationFlow``.
        """
        continue_flow = self._user_module.continueAuthenticationFlow
        return continue_flow(self, skel["key"])

249 

250 

class UserPassword(UserPrimaryAuthentication):
    """
    Primary authentication using an e-mail address and a password.

    Provides login, password recovery, e-mail verification and
    (optional) self-registration.
    """
    METHOD_NAME = "X-VIUR-AUTH-User-Password"
    NAME = "Username & Password"

    # Verification steps a newly registered account has to pass;
    # evaluated by addSkel(), add() and verify()
    registrationEmailVerificationRequired = True
    registrationAdminVerificationRequired = True

    # Names of the templates used for rendering pages and e-mails
    verifySuccessTemplate = "user_verify_success"
    verifyEmailAddressMail = "user_verify_address"
    verifyFailedTemplate = "user_verify_failed"
    passwordRecoveryMail = "user_password_recovery"
    passwordRecoverySuccessTemplate = "user_passwordrecover_success"
    passwordRecoveryTemplate = "user_passwordrecover"

    # The default rate-limit for password recovery (10 tries each 15 minutes)
    passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")

    # Limit (invalid) login-retries to once per 5 seconds
    loginRateLimit = RateLimit("user.login", 12, 1, "ip")

270 

@classmethod
def patch_user_skel(cls, skel_cls):
    """
    Equip the given user skeleton class with a ``password`` bone.

    The bone is read-only and invisible, and is assigned to
    the "Authentication" category.
    """
    password_bone = PasswordBone(
        readOnly=True,
        visible=False,
        params={"category": "Authentication"},
    )
    skel_cls.password = password_bone

283 

class LoginSkel(skeleton.RelSkel):
    """Skeleton reading the credentials (e-mail and password) submitted to ``login``."""
    name = EmailBone(
        descr="E-Mail",
        required=True,
        caseSensitive=False,
    )
    # No password strength tests on login; raw=True presumably keeps the
    # submitted value unhashed (login() hashes it itself) - TODO confirm
    password = PasswordBone(
        required=True,
        test_threshold=0,
        tests=(),
        raw=True,
    )

296 

class LostPasswordStep1Skel(skeleton.RelSkel):
    """Skeleton for the first password recovery step: asks for the account's e-mail address."""
    name = EmailBone(
        descr="E-Mail",
        required=True,
    )

302 

class LostPasswordStep2Skel(skeleton.RelSkel):
    """Skeleton for the second password recovery step: asks for the recovery key sent by e-mail."""
    recovery_key = StringBone(
        descr="Recovery Key",
        required=True,
        params={
            "tooltip": i18n.translate(
                key="viur.core.modules.user.userpassword.lostpasswordstep2.recoverykey",
                defaultText="Please enter the validation key you've received via e-mail.",
                hint="Shown when the user needs more than 15 minutes to paste the key",
            ),
        }
    )

315 

class LostPasswordStep3Skel(skeleton.RelSkel):
    """Skeleton for the third password recovery step: carries the recovery key and asks for the new password."""
    # send the recovery key again, in case the password is rejected by some reason.
    recovery_key = StringBone(
        descr="Recovery Key",
        visible=False,
    )

    password = PasswordBone(
        descr="New Password",
        required=True,
        params={
            "tooltip": i18n.translate(
                key="viur.core.modules.user.userpassword.lostpasswordstep3.password",
                defaultText="Please enter a new password for your account.",
            ),
        }
    )

333 

@exposed
@force_ssl
@skey(allow_empty=True)
def login(self, **kwargs):
    """
    Log a user in by e-mail address and password.

    Without (valid) input, the login form is rendered. Otherwise the submitted
    credentials are checked against the stored user; on failure the form is
    rendered again with errors attached and the login quota is decremented.
    On success, an outdated password hash is upgraded and the authentication
    flow continues via ``next_or_finish``.

    :param kwargs: Client data read into ``LoginSkel`` (``name``, ``password``).
    :returns: The rendered login form, or the result of ``next_or_finish``.
    """
    # Obtain a fresh login skel
    skel = self.LoginSkel()

    # Read required bones from client
    if not (kwargs and skel.fromClient(kwargs)):
        return self._user_module.render.render("login", skel)

    self.loginRateLimit.assertQuotaIsAvailable()

    # query for the username. The query might find another user, but the name is being checked for equality below
    name = skel["name"].lower().strip()
    user_skel = self._user_module.baseSkel()
    user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel

    # extract password hash from raw database entity (skeleton access blocks it)
    password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {}
    iterations = password_data.get("iterations", 1001)  # remember iterations; old password hashes used 1001
    # The hash is computed even when no password data was found (dummy salt) -
    # presumably to keep the timing independent of the user's existence
    password_hash = encode_password(skel["password"], password_data.get("salt", "-invalid-"), iterations)["pwhash"]

    # now check if the username matches
    is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode())

    # next, check if the password hash matches
    is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash)

    if not is_okay:
        # Set error to all required fields
        # NOTE(review): the loop variable rebinds ``name``; harmless, as this branch returns
        for name, bone in skel.items():
            if bone.required:
                skel.errors.append(
                    ReadFromClientError(
                        ReadFromClientErrorSeverity.Invalid,
                        i18n.translate(
                            key="viur.core.modules.user.userpassword.login.failed",
                            defaultText="Invalid username or password provided",
                        ),
                        name,
                    )
                )

        self.loginRateLimit.decrementQuota()  # Only failed login attempts will count to the quota
        return self._user_module.render.render("login", skel)

    # check if iterations are below current security standards, and update if necessary.
    if iterations < PBKDF2_DEFAULT_ITERATIONS:
        logging.info(f"Update password hash for user {name}.")
        # re-hash the password with more iterations
        # FIXME: This must be done within a transaction!
        user_skel["password"] = kwargs["password"]  # will be hashed on serialize
        user_skel.write(update_relations=False)

    return self.next_or_finish(user_skel)

390 

@exposed
def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, **kwargs):
    """
    This implements a password recovery process which lets users set a new password for their account,
    after validating a recovery key sent by email.

    The process is as following:

    - The user enters the registered email address (not validated here)
    - A random code is generated and stored as a security-key, then sendUserPasswordRecoveryCode is called.
    - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name
      and send a link with the code. It runs as a deferred task so no information if a user account exists
      is being leaked.
    - If the user received an email, the link can be clicked to set a new password for the account.

    To prevent automated attacks, the first step is guarded by limited calls to this function to 10 actions
    per 15 minutes. (One complete recovery process consists of two calls).

    :param recovery_key: The recovery key received by e-mail; ``None`` during the first step.
    :param skey: Security key protecting the POST requests of this action.
    :param kwargs: Client data read into the skeleton of the current step.
    :returns: The rendered form of the current step, or the success page.

    :raises: :exc:`viur.core.errors.PreconditionFailed`, if *skey* or *recovery_key* could not be verified.
    :raises: :exc:`viur.core.errors.NotFound`, if the account does not exist or is not active.
    """
    self.passwordRecoveryRateLimit.assertQuotaIsAvailable()
    current_request = current.request.get()

    if recovery_key is None:
        # This is the first step, where we ask for the username of the account we'll going to reset the password on
        skel = self.LostPasswordStep1Skel()

        if (
            not kwargs
            or not current_request.isPostRequest
            or not skel.fromClient(kwargs)
        ):
            return self._user_module.render.render(
                "pwrecover", skel,
                tpl=self.passwordRecoveryTemplate,
            )

        # validate security key
        if not securitykey.validate(skey):
            raise errors.PreconditionFailed()

        self.passwordRecoveryRateLimit.decrementQuota()

        recovery_key = securitykey.create(
            duration=datetime.timedelta(minutes=15),
            key_length=conf.security.password_recovery_key_length,
            user_name=skel["name"].lower(),
            session_bound=False,
        )

        # Send the code in background.
        # Clients are not required to send a User-Agent header; fall back to an empty
        # string instead of failing with a KeyError after the key was already created.
        self.sendUserPasswordRecoveryCode(
            skel["name"], recovery_key, current_request.request.headers.get("User-Agent") or ""
        )

        # step 2 is only an action-skel, and can be ignored by a direct link in the
        # e-mail previously sent. It depends on the implementation of the specific project.
        return self._user_module.render.render(
            "pwrecover", self.LostPasswordStep2Skel(),
            tpl=self.passwordRecoveryTemplate,
        )

    # in step 3
    skel = self.LostPasswordStep3Skel()

    # reset the recovery key again, in case the fromClient() fails.
    skel["recovery_key"] = str(recovery_key).strip()

    # check for any input; Render input-form again when incomplete.
    if (
        not kwargs
        or not current_request.isPostRequest
        or not skel.fromClient(kwargs, ignore=("recovery_key",))
    ):
        return self._user_module.render.render(
            "pwrecover", skel,
            tpl=self.passwordRecoveryTemplate,
        )

    # validate security key
    if not securitykey.validate(skey):
        raise errors.PreconditionFailed()

    if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)):
        raise errors.PreconditionFailed(
            i18n.translate(
                key="viur.core.modules.user.passwordrecovery.keyexpired",
                defaultText="The recovery key is expired or invalid. Please start the recovery process again.",
                hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key."
            )
        )

    self.passwordRecoveryRateLimit.decrementQuota()

    # If we made it here, the key was correct, so we'd hopefully have a valid user for this
    user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel()

    if not user_skel:
        raise errors.NotFound(
            i18n.translate(
                key="viur.core.modules.user.passwordrecovery.usernotfound",
                defaultText="There is no account with this name",
                hint="We cant find an account with that name (Should never happen)"
            )
        )

    # If the account is locked or not yet validated, abort the process.
    if not self._user_module.is_active(user_skel):
        raise errors.NotFound(
            i18n.translate(
                key="viur.core.modules.user.passwordrecovery.accountlocked",
                defaultText="This account is currently locked. You cannot change its password.",
                hint="Attempted password recovery on a locked account"
            )
        )

    # Update the password, save the user and show the success-template
    user_skel["password"] = skel["password"]
    user_skel.write(update_relations=False)

    return self._user_module.render.render(
        "pwrecover_success",
        next_url=self.start_url,
        tpl=self.passwordRecoverySuccessTemplate
    )

514 

@tasks.CallDeferred
def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None:
    """
    Send the given recovery key to the user named ``user_name``.

    The function runs deferred, so there is no timing side channel that leaks
    whether the user exists. By default, the key is sent by e-mail (assuming
    working e-mail delivery); override this method to deliver it by SMS or
    other means. Nothing happens when no user with that name is found.

    :param user_name: Name (e-mail address) of the account; also used as recipient.
    :param recovery_key: The recovery key to deliver.
    :param user_agent: Raw User-Agent string of the requesting client; device,
        operating system and browser are extracted from it and passed to the e-mail.
    """
    user_skel = self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel()
    if not user_skel:
        return

    parsed_agent = user_agents.parse(user_agent)
    email.send_email(
        tpl=self.passwordRecoveryMail,
        skel=user_skel,
        dests=[user_name],
        recovery_key=recovery_key,
        user_agent={
            "device": parsed_agent.get_device(),
            "os": parsed_agent.get_os(),
            "browser": parsed_agent.get_browser(),
        },
    )

537 

@exposed
@skey(forward_payload="data", session_bound=False)
def verify(self, data):
    """
    Confirm a user's e-mail address.

    ``data`` is the payload forwarded by the security key decorator; its
    ``user_key`` entry names the user to verify. The user's status is advanced
    to waiting-for-admin-verification or active, depending on
    ``registrationAdminVerificationRequired``.

    :returns: The rendered success view, or the failure view if the payload
        is not a dict or the user could not be read.
    """
    def transact(key):
        # Runs inside a transaction: load the user, advance its status, store it
        user_skel = self._user_module.editSkel()
        if not key or not user_skel.read(key):
            return None

        if self.registrationAdminVerificationRequired:
            user_skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION
        else:
            user_skel["status"] = Status.ACTIVE

        user_skel.write(update_relations=False)
        return user_skel

    verified_skel = None
    if isinstance(data, dict):
        verified_skel = db.run_in_transaction(transact, data.get("user_key"))

    if not verified_skel:
        return self._user_module.render.view(None, tpl=self.verifyFailedTemplate)

    return self._user_module.render.view(verified_skel, tpl=self.verifySuccessTemplate)

556 

def canAdd(self) -> bool:
    """
    Tell whether guests may register a new account.

    :returns: The value of ``self.registrationEnabled``.
    """
    return self.registrationEnabled

559 

def addSkel(self):
    """
    Prepare the add-skeleton for a registration.

    Takes the add-skeleton of the user module, fixes its (read-only) status to
    the first verification step required by
    ``registrationEmailVerificationRequired`` and
    ``registrationAdminVerificationRequired`` (or to active, if none is
    required), and makes the password mandatory when a password bone exists.

    :return: viur.core.skeleton.Skeleton
    """
    skel = self._user_module.addSkel()

    initial_status = (
        Status.WAITING_FOR_EMAIL_VERIFICATION if self.registrationEmailVerificationRequired
        else Status.WAITING_FOR_ADMIN_VERIFICATION if self.registrationAdminVerificationRequired
        else Status.ACTIVE  # No further verification required
    )

    skel.status.readOnly = True
    skel["status"] = initial_status

    if "password" in skel:
        # The user will have to set a password
        skel.password.required = True

    return skel

583 

@force_ssl
@exposed
@skey(allow_empty=True)
def add(self, *, bounce: bool = False, **kwargs):
    """
    Allows guests to register a new account if self.registrationEnabled is set to true

    .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd`

    :param bounce: If set, the data is only read and rendered again for review, but not stored.
    :param kwargs: Client data read into the add-skeleton.

    :returns: The rendered, added object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    if not self.canAdd():
        raise errors.Unauthorized()

    skel = self.addSkel()

    # Only store when data was supplied via POST, could be read into the bones
    # and no review ("bounce") was requested.
    ready_to_store = (
        kwargs
        and current.request.get().isPostRequest
        and skel.fromClient(kwargs)
        and not bounce
    )
    if not ready_to_store:
        # render the skeleton in the version it could as far as it could be read.
        return self._user_module.render.add(skel)

    self._user_module.onAdd(skel)
    skel.write()

    needs_email_verification = (
        self.registrationEmailVerificationRequired
        and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION
    )
    if needs_email_verification:
        # The user will have to verify his email-address. Create a skey and send it to his address
        verification_key = securitykey.create(
            duration=datetime.timedelta(days=7),
            session_bound=False,
            user_key=db.normalize_key(skel["key"]),
            name=skel["name"],
        )
        skel.skey = BaseBone(descr="Skey")
        skel["skey"] = verification_key
        email.send_email(dests=[skel["name"]], tpl=self.verifyEmailAddressMail, skel=skel)

    self._user_module.onAdded(skel)  # Call onAdded on our parent user module
    return self._user_module.render.addSuccess(skel)

626 

627 

class GoogleAccount(UserPrimaryAuthentication):
    """Primary authentication using a Google account (Google ID token)."""
    METHOD_NAME = "X-VIUR-AUTH-Google-Account"
    NAME = "Google Account"

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by a bones required by Google Auth
        """
        skel_cls.uid = StringBone(
            descr="Google UserID",
            required=False,
            readOnly=True,
            unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
            params={
                "category": "Authentication",
            }
        )

        skel_cls.sync = BooleanBone(
            descr="Sync user data with OAuth-based services",
            defaultValue=True,
            params={
                "category": "Authentication",
                # The parts are concatenated implicitly, so each one needs its own
                # separating space (they were previously glued together).
                "tooltip":
                    "If set, user data like firstname and lastname is automatically kept "
                    "synchronous with the information stored at the OAuth service provider "
                    "(e.g. Google Login)."
            }
        )

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def login(self, token: str | None = None, *args, **kwargs):
        """
        Log a user in with a Google ID token.

        Without a token, the Google login page template is returned. With a
        token, it is verified against ``conf.user.google_client_id``; the user
        is looked up by Google user id, then by e-mail address, and created if
        registration is enabled or the Google domain is allowed. If the user's
        ``sync`` flag is set, name, firstname and lastname are taken over from
        Google.

        :param token: The Google ID token; if empty, the login page is rendered.
        :returns: The login page as string, or the result of ``next_or_finish``.

        :raises: :exc:`viur.core.errors.PreconditionFailed`, if no Google client id is configured.
        :raises: :exc:`ValueError`, if the token was not issued by Google.
        :raises: :exc:`viur.core.errors.Forbidden`, if a new user would have to be registered, but may not.
        """
        if not conf.user.google_client_id:
            raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")

        if not token:
            request = current.request.get()
            request.response.headers["Content-Type"] = "text/html"
            if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
                # We have to allow popups here
                request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"

            file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
            # Explicit encoding: do not depend on the locale of the host
            with open(file_path, encoding="utf-8") as file:
                tpl_string = file.read()

            # FIXME: Use Jinja2 for rendering?
            tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
            extendCsp({
                "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
                "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
            })
            return tpl_string

        user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
        if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
            raise ValueError("Invalid issuer")

        # Token looks valid :)
        uid = user_info["sub"]
        # Named ``email_address`` to avoid shadowing the imported ``email`` module
        email_address = user_info["email"]

        base_skel = self._user_module.baseSkel()
        update = False
        if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
            # We'll try again - checking if there's already an user with that email
            if not (user_skel := base_skel.all().filter("name.idx =", email_address.lower()).getSkel()):
                # Still no luck - it's a completely new user
                if not self.registrationEnabled:
                    if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
                        logging.debug(f"Google user is from allowed {domain} - adding account")
                    else:
                        logging.debug(f"Google user is from {domain} - denying registration")
                        raise errors.Forbidden("Registration for new users is disabled")

                user_skel = base_skel
                user_skel["uid"] = uid
                user_skel["name"] = email_address
                update = True

        # Take user information from Google, if wanted!
        if user_skel["sync"]:
            for target, source in {
                "name": email_address,
                "firstname": user_info.get("given_name"),
                "lastname": user_info.get("family_name"),
            }.items():

                if user_skel[target] != source:
                    user_skel[target] = source
                    update = True

        if update:
            # The write must not be part of the assert expression: asserts are
            # stripped when Python runs with -O, which would silently skip the write.
            written = user_skel.write()
            assert written

        return self.next_or_finish(user_skel)

727 

728 

class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
    """Abstract base for every second factor authentication method."""

    # Limit of retries (compared against the failed-attempt counter in TimeBasedOTP.otp)
    MAX_RETRY = 3

    # Template to enter the TOTP on login
    second_factor_login_template = "user_login_secondfactor"

    @property
    @abc.abstractmethod
    def NAME(self) -> str:
        """Name for this factor for templates."""

    @property
    @abc.abstractmethod
    def ACTION_NAME(self) -> str:
        """The action name for this factor, used as path-segment."""

    def __init__(self, moduleName, modulePath, _user_module):
        """
        Initialize the second factor and derive its URLs from the module path.

        Sets ``action_url`` (path of the factor's action), ``add_url`` and
        ``start_url``; the latter replaces the default set by the base class.
        """
        super().__init__(moduleName, modulePath, _user_module)
        base_path = self.modulePath
        self.action_url = f"{base_path}/{self.ACTION_NAME}"
        self.add_url = f"{base_path}/add"
        self.start_url = f"{base_path}/start"

752 

753 

class TimeBasedOTP(UserSecondFactorAuthentication):
    """Second factor authentication using time-based one-time passwords (OTP)."""
    METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP"
    WINDOW_SIZE = 5  # passed as ``valid_window`` to verify() by otp()
    ACTION_NAME = "otp"  # path segment of the action validating the token
    NAME = "Time-based OTP"
    second_factor_login_template = "user_login_secondfactor"  # template rendering the token form

760 

@dataclasses.dataclass
class OtpConfig:
    """
    This dataclass is used to provide an interface for a OTP token
    algorithm description that is passed within the TimeBasedOTP
    class for configuration.
    """
    secret: str  # the OTP secret
    timedrift: float = 0.0  # last known time drift; otp() compares the verification result against it
    algorithm: t.Literal["sha1", "sha256"] = "sha1"  # digest function used for the HMAC
    interval: int = 60  # time interval for the OTP, in seconds

772 

class OtpSkel(skeleton.RelSkel):
    """
    This is the Skeleton used to ask for the OTP token.
    """
    # Numeric token, limited to the range 0..999999
    otptoken = NumericBone(
        descr="Token",
        required=True,
        max=999999,
        min=0,
    )

783 

@classmethod
def patch_user_skel(cls, skel_cls):
    """
    Equip the given user skeleton class with the bones needed for time-based OTP:
    ``otp_serial``, ``otp_secret`` and ``otp_timedrift``, all assigned to the
    "Second Factor Authentication" category.
    """
    category = "Second Factor Authentication"

    # One-Time Password Verification
    skel_cls.otp_serial = StringBone(
        descr="OTP serial",
        searchable=True,
        params={"category": category},
    )

    skel_cls.otp_secret = CredentialBone(
        descr="OTP secret",
        params={"category": category},
    )

    skel_cls.otp_timedrift = NumericBone(
        descr="OTP time drift",
        readOnly=True,
        defaultValue=0,
        precision=1,
        params={"category": category},
    )

814 

def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None:
    """
    Returns an instance of self.OtpConfig with a provided token configuration,
    or None when there is no appropriate configuration of this second factor handler available.

    The secret and the time drift are read from the raw database entity of the
    skeleton. A skeleton without a database entity (e.g. one that was never
    read) has no configuration.

    :param skel: The user skeleton to obtain the OTP configuration for.
    :returns: The OTP configuration, or None if the user has no OTP secret.
    """
    # dbEntity may be unset (UserPassword.login guards the same access);
    # treat that as "not configured" rather than failing with an AttributeError.
    db_entity = skel.dbEntity
    if not db_entity:
        return None

    if otp_secret := db_entity.get("otp_secret"):
        return self.OtpConfig(secret=otp_secret, timedrift=db_entity.get("otp_timedrift") or 0)

    return None

825 

def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
    """
    Specified whether the second factor authentication can be handled by the given user or not.

    This is the case when ``get_config`` yields a configuration for the user.
    """
    return bool(self.get_config(skel))

831 

@exposed
def start(self):
    """
    Set up the OTP login for the current session and render the token form.

    The OTP configuration of the user that passed the primary authentication is
    stored, together with the user's key, as a dict under ``_otp_user`` in the
    session.

    :raises: :exc:`viur.core.errors.PreconditionFailed`, if no primary authentication
        happened before, or the user has no configuration for this second factor.
    :raises: :exc:`viur.core.errors.NotFound`, if the user cannot be read anymore.
    """
    user_key = current.session.get().get("possible_user_key")
    if not user_key:
        raise errors.PreconditionFailed(
            "Second factor can only be triggered after successful primary authentication."
        )

    user_skel = self._user_module.baseSkel()
    if not user_skel.read(user_key):
        raise errors.NotFound("The previously authenticated user is gone.")

    otp_config = self.get_config(user_skel)
    if not otp_config:
        raise errors.PreconditionFailed("This second factor is not available for the user")

    # Remember the user and its OTP configuration for the validation in otp()
    session = current.session.get()
    session["_otp_user"] = {
        "key": str(user_key),
        **dataclasses.asdict(otp_config),
    }
    session.markChanged()

    render_params = {
        "name": i18n.translate(
            f"viur.core.modules.user.{self.ACTION_NAME}",
            default_variables={"name": self.NAME},
        ),
        "action_name": self.ACTION_NAME,
        "action_url": f"{self.modulePath}/{self.ACTION_NAME}",
    }

    return self._user_module.render.edit(
        self.OtpSkel(),
        params=render_params,
        tpl=self.second_factor_login_template
    )

873 

@exposed
@force_ssl
@skey(allow_empty=True)
def otp(self, *args, **kwargs):
    """
    Validate the OTP token submitted by the client.

    On a missing or wrong token, the failed-attempt counter kept in the session
    is increased and the token form is rendered again with an error. On success,
    the OTP data is removed from the session, a changed time drift is recorded,
    and the user module is told that the second factor succeeded.

    :raises: :exc:`viur.core.errors.PreconditionFailed`, if no OTP process was started in this session.
    :raises: :exc:`viur.core.errors.Forbidden`, if there were too many failed attempts.
    """
    session = current.session.get()
    otp_user_conf = session.get("_otp_user")
    if not otp_user_conf:
        raise errors.PreconditionFailed("No OTP process started in this session")

    # Check if maximum second factor verification attempts
    attempts = otp_user_conf.get("attempts") or 0
    if attempts > self.MAX_RETRY:
        raise errors.Forbidden("Maximum amount of authentication retries exceeded")

    # Read the OTP token via the skeleton, to obtain a valid value
    skel = self.OtpSkel()
    drift_index = None
    if skel.fromClient(kwargs):
        # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP.
        drift_index = self.verify(
            otp=skel["otptoken"],
            secret=otp_user_conf["secret"],
            algorithm=otp_user_conf.get("algorithm") or "sha1",
            interval=otp_user_conf.get("interval") or 60,
            timedrift=otp_user_conf.get("timedrift") or 0.0,
            valid_window=self.WINDOW_SIZE
        )

    # Caution: compare against None explicitly, a drift index of 0 is a valid result!
    if drift_index is None:
        otp_user_conf["attempts"] = attempts + 1
        session.markChanged()
        skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
        return self._user_module.render.edit(
            skel,
            name=i18n.translate(
                f"viur.core.modules.user.auth.{self.ACTION_NAME}",
                default_variables={"name": self.NAME},
            ),
            action_name=self.ACTION_NAME,
            action_url=f"{self.modulePath}/{self.ACTION_NAME}",
            tpl=self.second_factor_login_template
        )

    # Remove otp user config from session
    user_key = db.key_helper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
    del session["_otp_user"]
    session.markChanged()

    # Check if the OTP device has a time drift
    drift_change = float(drift_index) - otp_user_conf["timedrift"]
    if abs(drift_change) > 2:
        # The time-drift change accumulates to more than 2 minutes (for interval==60):
        # update clock-drift value accordingly
        self.updateTimeDrift(user_key, drift_change)

    # Continue with authentication
    return self._user_module.secondFactorSucceeded(self, user_key)

935 

@staticmethod
def verify(
    otp: str | int,
    secret: str,
    algorithm: str = "sha1",
    interval: int = 60,
    timedrift: float = 0.0,
    for_time: datetime.datetime | None = None,
    valid_window: int = 0,
) -> int | None:
    """
    Checks a submitted OTP against the time-based OTP derived from *secret*.

    Fork of pyotp's verify: with ``valid_window`` set, the counter offset at which the
    token matched is returned instead of a boolean, so that callers can track the clock
    drift of a hardware token generator.

    :param otp: the OTP token to check against
    :param secret: The OTP secret, given as hex string
    :param algorithm: digest function to use in the HMAC (``sha1`` or ``sha256``)
    :param interval: the time interval in seconds for OTP (defaults to 60)
    :param timedrift: The known timedrift (old index) of the hardware OTP generator
    :param for_time: Time to check OTP at (defaults to now)
    :param valid_window: extends the validity to this many counter ticks before and after the current one

    :returns: With ``valid_window``, the offset where verification succeeded; without it, 0 on
        success. None when the token does not match.
    :raises errors.NotImplemented: When *algorithm* is not supported.
    """
    supported_digests = {
        "sha1": hashlib.sha1,
        "sha256": hashlib.sha256,
    }
    if algorithm not in supported_digests:
        raise errors.NotImplemented(f"{algorithm=} is not implemented")

    if for_time is None:
        for_time = datetime.datetime.now()

    # The drift is stored in fractions, but counter offsets have to be integers
    base_offset = round(timedrift)
    # pyotp expects a base32 secret, the stored one is hex encoded
    b32_secret = base64.b32encode(bytes.fromhex(secret)).decode()
    # Left-pad with zeros to the 6 digits pyotp produces
    candidate = str(otp).zfill(6)

    totp = pyotp.TOTP(b32_secret, digest=supported_digests[algorithm], interval=interval)

    if not valid_window:
        # No window requested: only the slot at the known drift is accepted
        expected = str(totp.at(for_time, base_offset))
        return 0 if hmac.compare_digest(candidate, expected) else None

    first_offset = base_offset - valid_window
    last_offset = base_offset + valid_window
    for offset in range(first_offset, last_offset + 1):
        if hmac.compare_digest(candidate, str(totp.at(for_time, offset))):
            return offset

    return None

993 

994 # FIXME: VIUR4 rename 

def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
    """
    Adjusts the stored clock-drift of a user's OTP token.

    The drift is moved by a tenth of *idx* only, limited to the range -0.3 .. 0.3 per call,
    so that a single late submit cannot skew the stored value out of bounds.
    Nothing happens when the user or its OTP token cannot be read.

    :param user_key: For which user should the update occur
    :param idx: How many steps before/behind was that token
    """
    user_skel = self._user_module.skel().read(user_key)
    if not user_skel:
        return

    otp_skel = self._get_otptoken(user_skel)
    if not otp_skel:
        return

    scaled = 0.1 * idx
    lower_bounded = max(scaled, -0.3)
    step = min(lower_bounded, 0.3)

    otp_skel.patch(
        {"+otp_timedrift": step},
        update_relations=False,
    )

1013 

1014 

class AuthenticatorOTP(UserSecondFactorAuthentication):
    """
    This class handles the second factor for apps like authy and so on
    """
    METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"

    # second_factor_add_template = "user_secondfactor_add"
    # """Template to configure (add) a new TOPT"""

    ACTION_NAME = "authenticator_otp"
    """Action name provided for *otp_template* on login"""

    NAME = "Authenticator App"

    # FIXME: The second factor add has to be rewritten entirely to ActionSkel paradigm.
    '''
    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def add(self, otp=None):
        """
        We try to read the otp_app_secret from the current session. When this fails we generate a new one and store
        it in the session.

        If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store
        the otp_app_secret from the session in the user entry.
        """
        current_session = current.session.get()

        if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
            otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
            current_session["_maybe_otp_app_secret"] = otp_app_secret
            current_session.markChanged()

        if otp is None:
            return self._user_module.render.second_factor_add(
                tpl=self.second_factor_add_template,
                action_name=self.ACTION_NAME,
                name=i18n.translate(
                    f"viur.core.modules.user.auth{self.ACTION_NAME}",
                    default_variables={"name": self.NAME},
                ),
                add_url=self.add_url,
                otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))
        else:
            if not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
                return self._user_module.render.second_factor_add(
                    tpl=self.second_factor_add_template,
                    action_name=self.ACTION_NAME,
                    name=i18n.translate(
                        f"viur.core.modules.user.auth.{self.ACTION_NAME}",
                        default_variables={"name": self.NAME},
                    ),
                    add_url=self.add_url,
                    otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))  # to add errors

            # Now we can set the otp_app_secret to the current User and render der Success-template
            AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
            return self._user_module.render.second_factor_add_success(
                action_name=self.ACTION_NAME,
                name=i18n.translate(
                    f"viur.core.modules.user.auth.{self.ACTION_NAME}",
                    default_variables={"name": self.NAME},
                ),
            )
    '''

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """
        We can only handle the second factor if we have stored an otp_app_secret before.
        """
        return bool(skel.dbEntity.get("otp_app_secret", ""))

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by bones required by Authenticator App
        """
        # Authenticator OTP Apps (like Authy)
        skel_cls.otp_app_secret = CredentialBone(
            descr="OTP Secret (App-Key)",
            params={
                "category": "Second Factor Authentication",
            }
        )

    @classmethod
    def set_otp_app_secret(cls, otp_app_secret=None):
        """
        Write a new OTP Token in the current user entry.

        :param otp_app_secret: The secret to store for the currently logged-in user.
        :raises errors.PreconditionFailed: When no secret is provided.
        :raises errors.Unauthorized: When there is no current user.
        :raises errors.NotFound: When the current user's entity cannot be read.
        """
        if otp_app_secret is None:
            logging.error("No 'otp_app_secret' is provided")
            raise errors.PreconditionFailed("No 'otp_app_secret' is provided")
        if not (cuser := current.user.get()):
            raise errors.Unauthorized()

        def transaction(user_key):
            if not (user := db.get(user_key)):
                raise errors.NotFound()
            user["otp_app_secret"] = otp_app_secret
            db.put(user)

        db.run_in_transaction(transaction, cuser["key"])

    @classmethod
    def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
        """
        Builds the provisioning URI of the given secret for the current user.

        :param otp_app_secret: The secret to create the URI for.
        :return: an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
        :raises errors.Unauthorized: When there is no current user.
        """
        if not (cuser := current.user.get()):
            raise errors.Unauthorized()
        if not (issuer := conf.user.otp_issuer):
            logging.warning(
                f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
            issuer = conf.instance.project_id

        return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer)

    @classmethod
    def generate_otp_app_secret(cls) -> str:
        """
        Generate a new OTP Secret

        :return: a random base32 secret
        """
        return pyotp.random_base32()

    @classmethod
    def verify_otp(cls, otp: str | int, secret: str) -> bool:
        """
        Checks *otp* against the time-based OTP of *secret* using pyotp's defaults.
        """
        return pyotp.TOTP(secret).verify(otp)

    @exposed
    def start(self):
        """
        Starts the second factor step: resets the OTP state in the session and renders the token form.
        """
        otp_user_conf = {"attempts": 0}
        session = current.session.get()
        session["_otp_user"] = otp_user_conf
        session.markChanged()
        return self._user_module.render.edit(
            TimeBasedOTP.OtpSkel(),
            params={
                "name": i18n.translate(
                    f"viur.core.modules.user.auth.{self.ACTION_NAME}",
                    default_variables={"name": self.NAME},
                ),
                "action_name": self.ACTION_NAME,
                "action_url": self.action_url,
            },
            tpl=self.second_factor_login_template,
        )

    @exposed
    @force_ssl
    @skey
    def authenticator_otp(self, **kwargs):
        """
        We verify the otp here with the secret we stored before.

        :raises errors.PreconditionFailed: When no OTP process was started, no primary
            authentication is pending in this session, the user has no app secret stored,
            or the submitted data cannot be parsed.
        :raises errors.Forbidden: When the stored attempt counter exceeds ``MAX_RETRY``.
        :raises errors.NotFound: When the pending user does not exist.
        """
        session = current.session.get()

        if not (otp_user_conf := session.get("_otp_user")):
            raise errors.PreconditionFailed("No OTP process started in this session")

        # Resolve the pending user only after the OTP state was validated. A session without a
        # pending primary authentication must be answered with a client error, not a KeyError.
        if not (possible_user_key := session.get("possible_user_key")):
            raise errors.PreconditionFailed(
                "Second factor can only be triggered after successful primary authentication"
            )
        user_key = db.Key(self._user_module.kindName, possible_user_key)

        # Check if maximum second factor verification attempts
        if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
            raise errors.Forbidden("Maximum amount of authentication retries exceeded")

        if not (user := db.get(user_key)):
            raise errors.NotFound()

        # Same condition as can_handle(): without a stored secret this factor is unusable
        if not (otp_app_secret := user.get("otp_app_secret")):
            raise errors.PreconditionFailed("This second factor is not available for the user")

        skel = TimeBasedOTP.OtpSkel()
        if not skel.fromClient(kwargs):
            raise errors.PreconditionFailed()
        otp_token = str(skel["otptoken"]).zfill(6)

        if AuthenticatorOTP.verify_otp(otp=otp_token, secret=otp_app_secret):
            return self._user_module.secondFactorSucceeded(self, user_key)

        # Wrong token: count the attempt and show the form again
        otp_user_conf["attempts"] = attempts + 1
        session.markChanged()
        skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
        return self._user_module.render.edit(
            skel,
            name=i18n.translate(
                f"viur.core.modules.user.auth.{self.ACTION_NAME}",
                default_variables={"name": self.NAME},
            ),
            action_name=self.ACTION_NAME,
            action_url=self.action_url,
            tpl=self.second_factor_login_template,
        )

1205 

1206 

class User(List):
    """
    The User module is used to manage and authenticate users in a ViUR system.

    It is used in almost any ViUR project, but ViUR can also function without any user capabilities.
    """

    kindName = "user"
    addTemplate = "user_add"
    addSuccessTemplate = "user_add_success"

    # GoogleAccount is only part of the tuple when conf.user.google_client_id is truthy;
    # falsy entries are removed by filter(None, ...).
    authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter(
        None, (
            UserPassword,
            conf.user.google_client_id and GoogleAccount,
        )
    ))
    """
    Specifies primary authentication providers that are made available
    as sub-modules under `user/auth_<classname>`. They might require
    customization or configuration.
    """

    secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = (
        TimeBasedOTP,
        AuthenticatorOTP,
    )
    """
    Specifies secondary authentication providers that are made available
    as sub-modules under `user/f2_<classname>`. They might require
    customization or configuration, which is determined during the
    login-process depending on the user that wants to login.
    """

    # Pairs of (primary provider class, second factor class or None).
    # The GoogleAccount pair is only present when conf.user.google_client_id is truthy.
    validAuthenticationMethods = tuple(filter(
        None, (
            (UserPassword, AuthenticatorOTP),
            (UserPassword, TimeBasedOTP),
            (UserPassword, None),
            (GoogleAccount, None) if conf.user.google_client_id else None,
        )
    ))
    """
    Specifies the possible combinations of primary- and secondary factor
    login methos.

    GoogleLogin defaults to no second factor, as the Google Account can be
    secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only
    handled when there is a user-dependent configuration available.
    """

    # Message of the NotAcceptable error raised by continueAuthenticationFlow
    # when no second factor option remains for the user.
    msg_missing_second_factor = "Second factor required but not configured for this user."

    # Maximum time that may pass between the start of the second factor step
    # and its completion (checked in secondFactorSucceeded).
    secondFactorTimeWindow = datetime.timedelta(minutes=10)

    default_order = "name.idx"

    roles = {
        "admin": "*",
    }

1267 

def __init__(self, moduleName, modulePath):
    """
    Mounts every configured authentication provider as a sub-module before initializing the module.

    Primary providers become attributes named ``auth_<classname>``, second factor providers
    ``f2_<classname>`` (class name in lower case), each with a matching sub-path of *modulePath*.
    """

    def mount(attr_name, provider_cls):
        # Instantiate the provider below this module and attach it under its attribute name
        instance = provider_cls(attr_name, f"{modulePath}/{attr_name}", self)
        setattr(self, attr_name, instance)

    for primary_cls in self.authenticationProviders:
        assert issubclass(primary_cls, UserPrimaryAuthentication)
        mount(f"auth_{primary_cls.__name__.lower()}", primary_cls)

    for secondary_cls in self.secondFactorProviders:
        assert issubclass(secondary_cls, UserSecondFactorAuthentication)
        mount(f"f2_{secondary_cls.__name__.lower()}", secondary_cls)

    super().__init__(moduleName, modulePath)

1280 

def adminInfo(self):
    """
    Returns the admin configuration of the user module.

    Every user gets the module icon. When the current user passes :meth:`is_admin`,
    the custom actions "kick" and "takeover" are offered additionally.
    """
    info = {
        "icon": "person-fill",
    }

    if not self.is_admin(current.user.get()):
        return info

    info |= {
        "actions": [
            "trigger_kick",
            "trigger_takeover",
        ],
        "customActions": {
            "trigger_kick": {
                "name": i18n.translate(
                    key="viur.core.modules.user.customActions.kick",
                    defaultText="Kick user",
                    hint="Title of the kick user function"
                ),
                "icon": "trash2-fill",
                "action": "fetch",
                "url": "/vi/{{module}}/trigger/kick/{{key}}",
                "confirm": i18n.translate(
                    key="viur.core.modules.user.customActions.kick.confirm",
                    defaultText="Do you really want to drop all sessions of the selected user from the system?",
                ),
                "success": i18n.translate(
                    key="viur.core.modules.user.customActions.kick.success",
                    defaultText="Sessions of the user are being invalidated.",
                ),
            },
            "trigger_takeover": {
                "name": i18n.translate(
                    key="viur.core.modules.user.customActions.takeover",
                    defaultText="Take-over user",
                    hint="Title of the take user over function"
                ),
                "icon": "file-person-fill",
                "action": "fetch",
                "url": "/vi/{{module}}/trigger/takeover/{{key}}",
                "confirm": i18n.translate(
                    key="viur.core.modules.user.customActions.takeover.confirm",
                    defaultText="Do you really want to replace your current user session by a "
                                "user session of the selected user?",
                ),
                "success": i18n.translate(
                    key="viur.core.modules.user.customActions.takeover.success",
                    # Fixed wording of the default text ("know" -> "known")
                    defaultText="You're now known as the selected user!",
                ),
                "then": "reload-vi",
            },
        },
    }

    return info

1335 

def get_role_defaults(self, role: str) -> set[str]:
    """
    Provides the default access rights of a role.

    The roles "viewer", "editor" and "admin" receive the "admin" right,
    the "admin" role receives "scriptor" on top. Any other role gets nothing.

    :param role: Name of the role.
    :returns: A new set with the default access rights.
    """
    defaults = {"admin"} if role in ("viewer", "editor", "admin") else set()

    if role == "admin":
        defaults |= {"scriptor"}

    return defaults

1352 

def addSkel(self):
    """
    Returns a cloned add-skeleton, adjusted to the privileges of the current user.

    Users passing :meth:`is_admin` may set ``status`` and ``access``. For everyone else
    both bones are locked, hidden and reset to ``Status.UNSET`` / an empty list.
    A present ``password`` bone is made required and editable, ``name`` is always editable.
    """
    skel = super().addSkel().clone()

    if not self.is_admin(current.user.get()):
        # Lock, reset and hide the privileged bones
        for bone_name, reset_value in (("status", Status.UNSET), ("access", [])):
            bone = getattr(skel, bone_name)
            bone.readOnly = True
            skel[bone_name] = reset_value
            bone.visible = False
    else:
        # An admin tries to add a new user.
        for bone in (skel.status, skel.access):
            bone.readOnly = False
            bone.visible = True

    if "password" in skel:
        # Unlock and require a password
        password = skel.password
        password.required = True
        password.visible = True
        password.readOnly = False

    skel.name.readOnly = False  # Don't enforce readonly name in user/add
    return skel

1379 

def editSkel(self, *args, **kwargs):
    """
    Returns a cloned edit-skeleton, adjusted to the privileges of the current user.

    A present ``password`` bone becomes optional and editable. The bones ``name``,
    ``access`` and ``status`` are read-only unless the current user passes :meth:`is_admin`.
    """
    skel = super().editSkel().clone()

    if "password" in skel:
        password = skel.password
        password.required = False
        password.visible = True
        password.readOnly = False

    locked = not self.is_admin(current.user.get())
    for bone in (skel.name, skel.access, skel.status):
        bone.readOnly = locked

    return skel

1394 

def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication:
    """
    Looks up the mounted second factor provider instance that belongs to the class *cls*.

    The instance is read from the attribute ``f2_<classname>`` (class name in lower case).
    """
    attr_name = f"f2_{cls.__name__.lower()}"
    return getattr(self, attr_name)

1397 

def getCurrentUser(self):
    """
    Returns a skeleton for the user stored in the current session.

    A user is only returned when the session is loaded (or the request is deferred)
    and a "user" entry exists in the session; otherwise None.
    """
    session = current.session.get()
    req = current.request.get()

    if not session:
        return None

    if not (session.loaded or req.is_deferred):
        return None

    user = session.get("user")
    if not user:
        return None

    skel = self.baseSkel()
    skel.setEntity(user)
    return skel

1408 

def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key):
    """
    Continue authentication flow when primary authentication succeeded.

    Stores the pending user and the start time of the second factor step in the session,
    then determines from ``validAuthenticationMethods`` which second factors are usable for
    this user and primary provider. Depending on the result the user is authenticated
    directly, the single second factor is started, or a selection is rendered.

    :param provider: The primary authentication provider instance that succeeded.
    :param user_key: Key of the user that passed primary authentication.
    :raises errors.NotFound: When the user cannot be read.
    :raises errors.Forbidden: When the user may not use this primary provider.
    :raises errors.NotAcceptable: When a second factor is required but none is usable.
    """
    skel = self.baseSkel()

    if not skel.read(user_key):
        raise errors.NotFound("User was not found.")

    if not provider.can_handle(skel):
        raise errors.Forbidden("User is not allowed to use this primary login method.")

    session = current.session.get()
    session["possible_user_key"] = user_key.id_or_name
    session["_secondFactorStart"] = utils.utcNow()
    session.markChanged()

    second_factor_providers = []
    # Tracked as a flag rather than as None entries in the list, so that any number of
    # (provider, None) combinations is handled and no None can reach the selection below.
    allow_without_second_factor = False

    for auth_provider, second_factor in self.validAuthenticationMethods:
        if not isinstance(provider, auth_provider):
            continue

        if second_factor is None:
            allow_without_second_factor = True
            continue

        second_factor_provider_instance = self.secondFactorProviderByClass(second_factor)
        if second_factor_provider_instance.can_handle(skel):
            second_factor_providers.append(second_factor_provider_instance)

    if not second_factor_providers:
        if allow_without_second_factor:
            # We allow sign-in without a second factor
            return self.authenticateUser(user_key)

        raise errors.NotAcceptable(self.msg_missing_second_factor)

    if len(second_factor_providers) == 1:
        # We have only one second factor we don't need the choice template
        return second_factor_providers[0].start(user_key)

    # In case there is more than one second factor provider remaining, let the user decide!
    current.session.get()["_secondfactor_providers"] = {
        second_factor.start_url: second_factor.NAME
        for second_factor in second_factor_providers
        if second_factor.VISIBLE
    }

    return self.select_secondfactor_provider()

1458 

def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key):
    """
    Finishes the login after a second factor was verified successfully.

    :param provider: The second factor provider that succeeded.
    :param user_key: Key of the user that passed the second factor.
    :raises errors.Forbidden: When *user_key* is not the user pending in this session.
    :raises errors.RequestTimeout: When the step took longer than ``secondFactorTimeWindow``.
    """
    session = current.session.get()

    pending_id = session["possible_user_key"]
    if pending_id != user_key.id_or_name:
        raise errors.Forbidden()

    # Assert that the second factor verification finished in time
    elapsed = utils.utcNow() - session["_secondFactorStart"]
    if elapsed > self.secondFactorTimeWindow:
        raise errors.RequestTimeout()

    return self.authenticateUser(user_key)

1472 

def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None:
    """
    Hookable check if a user is defined as "active" and can login.

    A status that is neither a :class:`Status` nor an int is converted with ``int()``;
    values that cannot be converted (for example ``None`` or a non-numeric string)
    count as ``Status.UNSET`` and therefore as not active.

    :param skel: The UserSkel of the user who wants to login.
    :returns: Returns True or False when the result is unambiguous and the user is active or not. \
        Returns None when the provided skel doesn't provide enough information for determination.
    """
    if not skel or "status" not in skel:
        return None

    status = skel["status"]
    if not isinstance(status, (Status, int)):
        try:
            status = int(status)
        except (TypeError, ValueError):
            # int(None) raises TypeError, int("abc") raises ValueError
            status = Status.UNSET

    return status >= Status.ACTIVE.value

1492 

def is_admin(self, skel: skeleton.SkeletonInstance) -> bool | None:
    """
    Hookable check if a user counts as "admin", i.e. may edit or log into other users.
    By default only users with the "root" access right qualify.

    :param skel: The UserSkel of the user to check for user admin privileges.
    :returns: True or False when the skel carries an "access" entry, depending on whether \
        "root" is contained. None when the skel is empty or has no "access" entry.
    """
    if not skel or "access" not in skel:
        return None

    return "root" in skel["access"]

1506 

def authenticateUser(self, key: db.Key, **kwargs):
    """
    Logs the user with the given key into the current session.

    The session is reset in this process: only the fields listed in
    conf.user.session_persistent_fields_on_login survive.

    :param key: The (DB-)Key of the user we shall authenticate
    :param kwargs: Passed on to the renderer of the "login_success" result.
    :raises ValueError: When no user exists for *key*.
    :raises errors.Forbidden: When :meth:`is_active` does not confirm the user as active.
    """
    skel = self.baseSkel()
    if not skel.read(key):
        raise ValueError(f"Unable to authenticate unknown user {key}")

    # Verify that this user account is active
    if not self.is_active(skel):
        raise errors.Forbidden("The user is disabled and cannot be authenticated.")

    session = current.session.get()

    # Remember the persistent fields, reset the session, then copy them back
    preserved = {
        name: value
        for name, value in session.items()
        if name in conf.user.session_persistent_fields_on_login
    }
    session.reset()
    session |= preserved

    # Update session, user and request
    session["user"] = skel.dbEntity

    response = current.request.get().response
    response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key
    current.user.set(self.getCurrentUser())

    self.onLogin(skel)

    return self.render.render("login_success", skel, **kwargs)

1541 

1542 

1543 # Action for primary authentication selection 

1544 

def SelectAuthenticationProviderSkel(self):
    """
    Builds a skeleton instance for choosing one of the visible primary authentication providers.

    The selectable values map each provider's ``start_url`` to its ``NAME``;
    the first visible provider is preselected.
    """
    visible_classes = [cls for cls in self.authenticationProviders if cls.VISIBLE]
    mounted = [getattr(self, f"auth_{cls.__name__.lower()}") for cls in visible_classes]

    providers = {}
    for instance in mounted:
        providers[instance.start_url] = instance.NAME

    # dicts keep insertion order, so the first key belongs to the first visible provider
    first = next(iter(providers), None)

    class SelectAuthenticationProviderSkel(skeleton.RelSkel):
        provider = SelectBone(
            descr="Authentication method",
            required=True,
            values=providers,
            defaultValue=first,
        )

    return SelectAuthenticationProviderSkel()

1567 

@exposed
def select_authentication_provider(self, **kwargs):
    """
    Lets the client choose the primary authentication provider.

    The selection is rendered as long as more than one provider is available and no valid
    choice was submitted. Otherwise the success result is rendered with the chosen
    provider value as ``next_url``.
    """
    skel = self.SelectAuthenticationProviderSkel()

    has_choice = len(skel.provider.values) > 1
    # Read required bones from client
    if has_choice and not (kwargs and skel.fromClient(kwargs)):
        return self.render.render("select_authentication_provider", skel)

    return self.render.render("select_authentication_provider_success", skel, next_url=skel["provider"])

1577 

1578 # Action for second factor select 

1579 

class SelectSecondFactorProviderSkel(skeleton.RelSkel):
    """
    Skeleton for choosing one of the second factor providers offered in the current session.
    """
    provider = SelectBone(
        descr="Second factor",
        required=True,
        # The session entry only exists while a selection is pending. Read it with .get(),
        # so that a request outside of a login flow sees no values instead of a KeyError.
        values=lambda: current.session.get().get("_secondfactor_providers") or (),
    )

1586 

@exposed
def select_secondfactor_provider(self, **kwargs):
    """
    Lets the client choose one of the second factor providers stored in the session.

    Once a valid choice was submitted, the list of offered providers is removed from the
    session and the success result is rendered with the chosen provider value as ``next_url``.
    """
    skel = self.SelectSecondFactorProviderSkel()

    # Read required bones from client
    if kwargs and skel.fromClient(kwargs):
        del current.session.get()["_secondfactor_providers"]
        return self.render.render("select_secondfactor_provider_success", skel, next_url=skel["provider"])

    return self.render.render("select_secondfactor_provider", skel)

1598 

@exposed
@skey
def logout(self, **kwargs):
    """
    Logs the current user out and terminates the session.

    Only session fields listed in conf.user.session_persistent_fields_on_logout are kept;
    when none of them is present, the session is cleared entirely.

    :raises errors.Unauthorized: When nobody is logged in.
    """
    user = current.user.get()
    if not user:
        raise errors.Unauthorized()

    self.onLogout(user)

    session = current.session.get()
    preserved = {
        name: value
        for name, value in session.items()
        if name in conf.user.session_persistent_fields_on_logout
    }

    if not preserved:
        session.clear()
    else:
        session.reset()
        session |= preserved

    current.user.set(None)  # set user to none in context var

    return self.render.render("logout_success")

1622 

@exposed
def login(self, *args, **kwargs):
    """
    Entry point of the login flow: delegates to the selection of the primary authentication provider.

    Positional and keyword arguments are accepted, but not passed on.
    """
    result = self.select_authentication_provider()
    return result

1626 

def onLogin(self, skel: skeleton.SkeletonInstance):
    """
    Hook that is called whenever a user logged in.

    Refreshes the ``lastlogin`` timestamp (when the skeleton has such a bone) and logs the login.
    """
    if "lastlogin" in skel:
        now = utils.utcNow()
        previous = skel["lastlogin"]

        # Conserve DB-Writes: the timestamp is written at most once in 30 minutes
        outdated = not previous or (now - previous) > datetime.timedelta(minutes=30)
        if outdated:
            skel["lastlogin"] = now
            skel.write(update_relations=False)

    logging.info(f"""User {skel["name"]} logged in""")

1641 

def onLogout(self, skel: skeleton.SkeletonInstance):
    """
    Hook that is called whenever a user logs out; writes a log entry with the user's name.
    """
    message = f"""User {skel["name"]} logged out"""
    logging.info(message)

1647 

@exposed
def view(self, key: db.Key | int | str = "self", *args, **kwargs):
    """
    Views a user entry; the special key "self" refers to the current user.

    Whether an entry may be viewed is decided by the customized `canView`
    function of the User-module.

    The rendered skeleton can be modified or restricted by specifying
    a customized view-skeleton.

    :raises errors.Unauthorized: When "self" is requested but nobody is logged in.
    """
    if key == "self":
        user = current.user.get()
        if not user:
            raise errors.Unauthorized("Cannot view 'self' with unknown user")

        key = user["key"]

    return super().view(key, *args, **kwargs)

1668 

def canView(self, skel) -> bool:
    """
    Decides whether the current user may view the given user entry.

    Access is granted for the user's own entry, for users passing :meth:`is_admin`
    and for users holding the "user-view" access right.
    """
    user = current.user.get()
    if not user:
        return False

    return bool(
        skel["key"] == user["key"]
        or self.is_admin(user)
        or "user-view" in user["access"]
    )

1678 

@exposed
@skey(allow_empty=True)
def edit(self, key: db.Key | int | str = "self", *args, **kwargs):
    """
    Edits a user entry; the special key "self" refers to the current user.

    This only resolves "self" to the key of the current user. The access right
    that lets a user edit itself must still be customized.

    The rendered and editable skeleton can be modified or restricted
    by specifying a customized edit-skeleton.

    :raises errors.Unauthorized: When "self" is requested but nobody is logged in.
    """
    if key == "self":
        user = current.user.get()
        if not user:
            raise errors.Unauthorized("Cannot edit 'self' with unknown user")

        key = user["key"]

    return super().edit(key, *args, **kwargs)

1699 

@exposed
def getAuthMethods(self, *args, **kwargs):
    """Legacy method prior < viur-core 3.8: Inform tools like Admin which authentication to use

    Returns a JSON list of [primary method name, secondary method name or null] pairs,
    one for each entry of ``validAuthenticationMethods``.
    """
    logging.warning("DEPRECATED!!! Use '/user/login'-method for this, or update your admin version!")

    pairs = []
    for primary, secondary in self.validAuthenticationMethods:
        primary_name = primary.METHOD_NAME
        secondary_name = secondary.METHOD_NAME if secondary else None
        pairs.append((primary_name, secondary_name))

    return json.dumps(pairs)

1711 

@exposed
def trigger(self, action: str, key: str):
    """
    Runs one of the custom admin actions ("takeover" or "kick") on a user.

    :param action: Name of the action, without the "trigger_" prefix.
    :param key: Key of the target user; when no entry can be read by it, the user is looked up by name.
    :raises errors.Unauthorized: When the current user is neither admin nor holds one of the
        access rights configured for the action.
    :raises errors.NotFound: When the target user does not exist.
    :raises errors.NotImplemented: When the action is unknown.
    """
    # Check for provided access right definition (equivalent to client-side check), fallback to root!
    custom_actions = self.adminInfo().get("customActions", {})
    access = custom_actions.get(f"trigger_{action}", {}).get("access") or ()

    cuser = current.user.get()
    if not cuser:
        raise errors.Unauthorized()

    if not (any(role in cuser["access"] for role in access) or self.is_admin(cuser)):
        raise errors.Unauthorized()

    skel = self.skel()
    if not skel.read(key):
        # Not a readable key: try to find the user by its name instead
        skel = skel.all().mergeExternalFilter({"name": key}).getSkel()
        if not skel:
            raise errors.NotFound("The provided user does not exist.")

    if action == "takeover":
        self.authenticateUser(skel["key"])
    elif action == "kick":
        session.killSessionByUser(skel["key"])
    else:
        raise errors.NotImplemented(f"Action {action!r} not implemented")

    return self.render.render(f"trigger/{action}Success", skel)

1740 

@exposed
@access("admin", "root", offer_login=True)
def get_cookie_for_app(self, redirect_to: str = None):
    """
    Generates a session cookie for the currently logged-in user and hands it to an external
    client (script, native app, or WebView).

    This endpoint is the entry point of an *App Login Flow*. A privileged user
    (admin/root) authenticates normally in the browser and then opens this URL.
    The backend creates a fresh ViUR session (see :meth:`_get_cookie_for_app`) and
    delivers the resulting ``Set-Cookie`` string to the caller.

    **Typical usage — local Python client / script:**

    The caller spins up a temporary local HTTP server (e.g. on ``http://localhost:60000``)
    and passes its address as *redirect_to*::

        /vi/user/get_cookie_for_app?redirect_to=http://localhost:60000

    After the user authenticates in the browser, the backend redirects to::

        http://localhost:60000?cookie=<url-encoded Set-Cookie string>&app=<project_id>

    The local server can then extract only the ``name=value`` part of the cookie string
    (everything before the first ``;``) and use it for subsequent API calls::

        cookie_str = qs["cookie"][0]  # full Set-Cookie value
        key, value = cookie_str.split(";", 1)[0].split("=")
        session.cookies.update({key: value})

    The ``app`` query parameter is set by the server to ``conf.instance.project_id`` and
    lets the client distinguish between multiple backends / cache credentials per project.

    **Alternative — WebView / browser redirect:**

    When the receiving side is a browser or WebView the full ``Set-Cookie`` string can be
    forwarded to :meth:`apply_login_cookie` to let the framework activate the session.

    :param redirect_to: Optional callback URL. When provided the caller is redirected to
        that URL with two query parameters appended automatically:

        - ``cookie`` – URL-encoded ``Set-Cookie`` string (``name=value;flags…``).
        - ``app`` – the URL-encoded ``conf.instance.project_id``.

        The parameters are joined with ``?`` if the URL does not contain one yet,
        with ``&`` if it already carries a query string, and with nothing if the
        URL already ends in ``?`` or ``&``.
        When omitted, the raw ``Set-Cookie`` string is returned as ``text/plain``
        (useful for debugging or direct API calls).

    .. warning:: **Open-redirect / session-hijacking risk**

        Because the session cookie is appended as a plain query parameter, an
        attacker who can convince an authenticated admin to click a crafted link
        (e.g. via phishing) could redirect the browser to an evil server that
        simply harvests the ``cookie`` parameter and gains full session access.

        To mitigate this, all ``redirect_to`` values are validated against
        :attr:`conf.user.redirect_whitelist` using :func:`fnmatch.fnmatch`.
        Only explicitly whitelisted URL patterns are accepted;
        anything else is rejected with ``403 Forbidden``.
        Configure the whitelist in your project to include every legitimate
        callback origin (local scripts, internal tooling, etc.).

    :raises errors.Forbidden: When *redirect_to* does not match any pattern in
        :attr:`conf.user.redirect_whitelist`.
    :raises errors.Redirect: Always raised when *redirect_to* is supplied and allowed.
    """
    if not redirect_to:
        # No callback given: hand the raw Set-Cookie string back as plain text.
        current.request.get().response.headers["Content-Type"] = "text/plain"
        return self._get_cookie_for_app()

    # The whitelist check must happen before any session is created.
    whitelist = utils.ensure_iterable(conf.user.redirect_whitelist)
    if not any(fnmatch.fnmatch(redirect_to, pat) for pat in whitelist):
        raise errors.Forbidden("Redirect target is not whitelisted")

    # Pick the separator so that no stray "?&" / "&&" ends up in the target URL.
    if "?" not in redirect_to:
        separator = "?"
    elif redirect_to.endswith(("?", "&")):
        separator = ""
    else:
        separator = "&"

    # urlencode() applies quote_plus() to every value, including the project id.
    query = urllib.parse.urlencode({
        "cookie": self._get_cookie_for_app(),
        "app": conf.instance.project_id,
    })
    raise errors.Redirect(f"{redirect_to}{separator}{query}")

1820 

def _get_cookie_for_app(self) -> str:
    """
    Persist a standalone session entity for the current user and return the matching
    ``Set-Cookie`` header value.

    The session is written directly via :func:`db.put` under a freshly generated random
    key; nothing is attached to the current HTTP request. The stored entity carries:

    - ``data["user"]`` – the current user's ``dbEntity``.
    - ``data["is_app_session"]`` – always ``True``.
    - ``static_security_key`` – a random 42-character string.
    - ``lastseen`` – the current timestamp (``time.time()``).
    - ``user`` – the stringified key of the current user.

    ``data`` is excluded from indexing.

    :returns: A string of the form ``<Session.cookie_name>=<key>;<flags>``, where
        ``<flags>`` is whatever :meth:`Session.build_flags` returns.
    """
    session_id = utils.string.random(42)
    user = current.user.get()

    # Payload stored in the session's "data" property
    payload = db.Entity()
    payload["user"] = user.dbEntity
    payload["is_app_session"] = True

    # The session entity itself, keyed by the cookie value
    entity = db.Entity(db.Key(Session.kindName, session_id))
    entity["data"] = db.fix_unindexable_properties(payload)
    entity["static_security_key"] = utils.string.random(42)
    entity["lastseen"] = time.time()
    entity["user"] = str(user["key"])
    entity.exclude_from_indexes = {"data"}
    db.put(entity)

    # Assemble the Set-Cookie header entry with the configured properties
    flags = Session.build_flags()
    return f"{Session.cookie_name}={session_id};{flags}"

1857 

@exposed
def apply_login_cookie(self, cookie: str):
    """
    Redirect endpoint to load a session from the given cookie.

    This is the second half of the *App Login Flow*. A native app or WebView that received a
    ``Set-Cookie`` string from :meth:`get_cookie_for_app` (typically via a redirect URL
    parameter) calls this endpoint to activate the embedded session for its own HTTP context.

    The flow is:

    1. Parse the raw ``Set-Cookie`` string with :class:`http.cookies.SimpleCookie`.
    2. Look for the expected session cookie name (:attr:`Session.cookie_name`).
    3. Reset the caller's current session.
    4. Inject the cookie value into the current request's cookie jar and call
       :meth:`Session.load` on the current session.
    5. Redirect to ``/``.

    :param cookie: A raw ``Set-Cookie`` header value as produced by :meth:`_get_cookie_for_app`,
        e.g. ``viur_cookie_myproject=<key>;Path=/;HttpOnly;…``.
    :raises errors.Redirect: On success – redirects to ``/``.
    :raises errors.BadRequest: When the cookie string cannot be parsed, or does not contain
        a recognisable session cookie (i.e. :attr:`Session.cookie_name` is absent after parsing).
    """
    # Local import: only SimpleCookie is imported at file level.
    from http.cookies import CookieError

    cookies = SimpleCookie()
    try:
        cookies.load(cookie)
    except CookieError:
        # The string comes straight from the caller; an illegal cookie key must
        # yield a client error, not an unhandled exception.
        raise errors.BadRequest from None

    if Session.cookie_name not in cookies:
        raise errors.BadRequest

    session_cookie = cookies[Session.cookie_name]
    current.session.get().reset()
    # Make the transferred cookie visible to the session loader of this request.
    current.request.get().request.cookies[session_cookie.key] = session_cookie.value
    current.session.get().load()
    raise errors.Redirect("/")

1892 

def onEdited(self, skel):
    """
    Runs the parent ``onEdited`` handling and then synchronises the user's sessions.

    - If :meth:`is_active` returns ``False`` for *skel*, all sessions of that user are
      killed via :func:`session.killSessionByUser`.
    - Otherwise, the ``data["user"]`` entry of every stored session of that user is
      replaced by the freshly written ``skel.dbEntity`` and the session is written back.

    :param skel: The user skeleton that has just been edited.
    """
    super().onEdited(skel)

    user_key = skel["key"]

    # In case the user is set to inactive, kill all sessions
    if self.is_active(skel) is False:
        session.killSessionByUser(user_key)
        # Do not touch the sessions any further: re-writing an entity that was
        # just deleted would bring the killed session back.
        return

    # Update user setting in all sessions.
    # Session entities are stored under Session.kindName with the stringified
    # user key in their "user" property (see _get_cookie_for_app); the changed
    # entity has to be written back, otherwise the update is lost.
    for session_obj in db.Query(Session.kindName).filter("user =", str(user_key)).iter():
        session_obj["data"]["user"] = skel.dbEntity
        session_obj["data"] = db.fix_unindexable_properties(session_obj["data"])
        db.put(session_obj)

1903 

def onDeleted(self, skel):
    """
    Runs the parent ``onDeleted`` handling, then kills every session of the given user.

    :param skel: The user skeleton that has been deleted.
    """
    super().onDeleted(skel)

    # Sessions of a removed user must not stay usable
    user_key = skel["key"]
    session.killSessionByUser(user_key)

1908 

1909 

@tasks.StartupTask
def createNewUserIfNotExists():
    """
    Startup task: create an initial root user when the user kind holds no entry yet.

    Nothing happens unless ``conf.main_app.vi.user`` is a :class:`User` module that
    provides ``addSkel`` and ``validAuthenticationMethods`` and lists a
    :class:`UserPassword` subclass as first element of one of its authentication
    method entries.

    On success the generated credentials are logged as a warning and sent to the
    admins by e-mail. If writing the skeleton fails, the error is logged and the
    task returns without sending anything.
    """
    user_module = getattr(conf.main_app.vi, "user", None)
    if not (user_module and isinstance(user_module, User)):
        return

    members = dir(user_module)
    if "addSkel" not in members or "validAuthenticationMethods" not in members:
        return

    # UserPassword must be one of the primary login methods
    has_password_login = any(
        issubclass(provider[0], UserPassword)
        for provider in user_module.validAuthenticationMethods
    )
    if not has_password_login:
        return

    if db.Query(user_module.addSkel().kindName).getEntry():
        return  # At least one user exists already

    # Resolve the skeleton by its kind to ensure we have the full skeleton
    admin_skel = skeleton.skeletonByKind(user_module.addSkel().kindName)()
    username = f"""admin@{conf.instance.project_id}.appspot.com"""
    password = utils.string.random(13)

    admin_skel["name"] = username
    admin_skel["status"] = Status.ACTIVE  # Ensure it's enabled right away
    admin_skel["access"] = ["root"]
    admin_skel["password"] = password

    try:
        admin_skel.write()
    except Exception as e:
        logging.critical(
            f"Something went wrong when trying to add admin user {username!r} with Password {password!r}"
        )
        logging.exception(e)
        return

    msg = f"ViUR created a new admin-user for you!\nUsername: {username}\nPassword: {password}"

    logging.warning(msg)
    email.send_email_to_admins("New ViUR password", msg)

1946 

1947 

1948# DEPRECATED ATTRIBUTES HANDLING 

1949 

def __getattr__(attr):
    """
    Module-level attribute hook (PEP 562) that serves deprecated names.

    ``userSkel`` is still resolved to :class:`UserSkel`, but emits a
    :class:`DeprecationWarning` and a log warning on every access.

    :param attr: Name of the attribute that was not found in the module namespace.
    :returns: The replacement object for a known deprecated name.
    :raises AttributeError: For every other name, so that ``hasattr()``, ``getattr()``
        with a default and ``from … import *`` behave as for a regular module.
    """
    if attr == "userSkel":
        msg = "Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
        # stacklevel=2 attributes the warning to the code accessing the attribute
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
        logging.warning(msg)
        return UserSkel

    # A module has no parent __getattr__ to delegate to; report the missing
    # attribute with the same wording Python uses for modules.
    raise AttributeError(f"module {__name__!r} has no attribute {attr!r}")