Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%

733 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import abc 

2import datetime 

3import enum 

4import functools 

5import hashlib 

6import hmac 

7import json 

8import logging 

9import secrets 

10import warnings 

11import user_agents 

12 

13import pyotp 

14import base64 

15import dataclasses 

16import typing as t 

17from google.auth.transport import requests 

18from google.oauth2 import id_token 

19 

20from viur.core import ( 

21 conf, current, db, email, errors, i18n, 

22 securitykey, session, skeleton, tasks, utils, Module 

23) 

24from viur.core.decorators import * 

25from viur.core.bones import * 

26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password 

27from viur.core.prototypes.list import List 

28from viur.core.ratelimit import RateLimit 

29from viur.core.securityheaders import extendCsp 

30 

31 

32@functools.total_ordering 

33class Status(enum.Enum): 

34 """Status enum for a user 

35 

36 Has backwards compatibility to be comparable with non-enum values. 

37 Will be removed with viur-core 4.0.0 

38 """ 

39 

40 UNSET = 0 # Status is unset 

41 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification 

42 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin 

43 DISABLED = 5 # Account disabled 

44 ACTIVE = 10 # Active 

45 

46 def __eq__(self, other): 

47 if isinstance(other, Status): 

48 return super().__eq__(other) 

49 return self.value == other 

50 

51 def __lt__(self, other): 

52 if isinstance(other, Status): 

53 return super().__lt__(other) 

54 return self.value < other 

55 

56 

57class UserSkel(skeleton.Skeleton): 

58 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604) 

59 

60 name = EmailBone( 

61 descr="E-Mail", 

62 required=True, 

63 readOnly=True, 

64 caseSensitive=False, 

65 searchable=True, 

66 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"), 

67 ) 

68 

69 firstname = StringBone( 

70 descr="Firstname", 

71 searchable=True, 

72 ) 

73 

74 lastname = StringBone( 

75 descr="Lastname", 

76 searchable=True, 

77 ) 

78 

79 roles = SelectBone( 

80 descr=i18n.translate("viur.core.modules.user.bone.roles", defaultText="Roles"), 

81 values=conf.user.roles, 

82 required=True, 

83 multiple=True, 

84 # fixme: This is generally broken in VIUR! See #776 for details. 

85 # vfunc=lambda values: 

86 # i18n.translate( 

87 # "user.bone.roles.invalid", 

88 # defaultText="Invalid role setting: 'custom' can only be set alone.") 

89 # if "custom" in values and len(values) > 1 else None, 

90 defaultValue=list(conf.user.roles.keys())[:1], 

91 ) 

92 

93 access = SelectBone( 

94 descr=i18n.translate("viur.core.modules.user.bone.access", defaultText="Access rights"), 

95 type_suffix="access", 

96 values=lambda: { 

97 right: i18n.translate(f"viur.core.modules.user.accessright.{right}", defaultText=right) 

98 for right in sorted(conf.user.access_rights) 

99 }, 

100 multiple=True, 

101 params={ 

102 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system 

103 } 

104 ) 

105 

106 status = SelectBone( 

107 descr=i18n.translate("viur.core.modules.user.bone.status", "Account status"), 

108 values=Status, 

109 translation_key_prefix="viur.core.user.status.", 

110 defaultValue=Status.ACTIVE, 

111 required=True, 

112 ) 

113 

114 lastlogin = DateBone( 

115 descr="Last Login", 

116 readOnly=True, 

117 ) 

118 

119 admin_config = JsonBone( # This bone stores settings from the vi 

120 descr="Config for the User", 

121 visible=False 

122 ) 

123 

124 def __new__(cls, *args, **kwargs): 

125 """ 

126 Constructor for the UserSkel-class, with the capability 

127 to dynamically add bones required for the configured 

128 authentication methods. 

129 """ 

130 for provider in conf.main_app.vi.user.authenticationProviders: 

131 assert issubclass(provider, UserPrimaryAuthentication) 

132 provider.patch_user_skel(cls) 

133 

134 for provider in conf.main_app.vi.user.secondFactorProviders: 

135 assert issubclass(provider, UserSecondFactorAuthentication) 

136 provider.patch_user_skel(cls) 

137 

138 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls) 

139 return super().__new__(cls, *args, **kwargs) 

140 

141 @classmethod 

142 def write(cls, skel, *args, **kwargs): 

143 # Roles 

144 if skel["roles"] and "custom" not in skel["roles"]: 

145 # Collect access rights through rules 

146 access = set() 

147 

148 for role in skel["roles"]: 

149 # Get default access for this role 

150 access |= conf.main_app.vi.user.get_role_defaults(role) 

151 

152 # Go through all modules and evaluate available role-settings 

153 for name in dir(conf.main_app.vi): 

154 if name.startswith("_"): 

155 continue 

156 

157 module = getattr(conf.main_app.vi, name) 

158 if not isinstance(module, Module): 

159 continue 

160 

161 roles = getattr(module, "roles", None) or {} 

162 rights = roles.get(role, roles.get("*", ())) 

163 

164 # Convert role into tuple if it's not 

165 if not isinstance(rights, (tuple, list)): 

166 rights = (rights, ) 

167 

168 if "*" in rights: 

169 for right in module.accessRights: 

170 access.add(f"{name}-{right}") 

171 else: 

172 for right in rights: 

173 if right in module.accessRights: 

174 access.add(f"{name}-{right}") 

175 

176 # special case: "edit" and "delete" actions require "view" as well! 

177 if right in ("edit", "delete") and "view" in module.accessRights: 

178 access.add(f"{name}-view") 

179 

180 skel["access"] = list(access) 

181 

182 return super().write(skel, *args, **kwargs) 

183 

184 

185class UserAuthentication(Module, abc.ABC): 

186 @property 

187 @abc.abstractstaticmethod 

188 def METHOD_NAME() -> str: 

189 """ 

190 Define a unique method name for this authentication. 

191 """ 

192 ... 

193 

194 @property 

195 @abc.abstractstaticmethod 

196 def NAME() -> str: 

197 """ 

198 Define a descriptive name for this authentication. 

199 """ 

200 ... 

201 

202 @property 

203 @staticmethod 

204 def VISIBLE(cls) -> bool: 

205 """ 

206 Defines if the authentication method is visible to the user. 

207 """ 

208 return True 

209 

210 def __init__(self, moduleName, modulePath, userModule): 

211 super().__init__(moduleName, modulePath) 

212 self._user_module = userModule 

213 self.start_url = f"{self.modulePath}/login" 

214 

215 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

216 return True 

217 

218 @classmethod 

219 def patch_user_skel(cls, skel_cls: skeleton.Skeleton): 

220 """ 

221 Allows for an UserAuthentication to patch the UserSkel 

222 class with additional bones which are required for 

223 the implemented authentication method. 

224 """ 

225 ... 

226 

227 

228class UserPrimaryAuthentication(UserAuthentication, abc.ABC): 

229 """Abstract class for all primary authentication methods.""" 

230 registrationEnabled = False 

231 

232 @abc.abstractmethod 

233 def login(self, *args, **kwargs): 

234 ... 

235 

236 def next_or_finish(self, skel: skeleton.SkeletonInstance): 

237 """ 

238 Hook that is called whenever a part of the authentication was successful. 

239 It allows to perform further steps in custom authentications, 

240 e.g. change a password after first login. 

241 """ 

242 return self._user_module.continueAuthenticationFlow(self, skel["key"]) 

243 

244 

245class UserPassword(UserPrimaryAuthentication): 

246 METHOD_NAME = "X-VIUR-AUTH-User-Password" 

247 NAME = "Username & Password" 

248 

249 registrationEmailVerificationRequired = True 

250 registrationAdminVerificationRequired = True 

251 

252 verifySuccessTemplate = "user_verify_success" 

253 verifyEmailAddressMail = "user_verify_address" 

254 verifyFailedTemplate = "user_verify_failed" 

255 passwordRecoveryMail = "user_password_recovery" 

256 passwordRecoverySuccessTemplate = "user_passwordrecover_success" 

257 passwordRecoveryTemplate = "user_passwordrecover" 

258 

259 # The default rate-limit for password recovery (10 tries each 15 minutes) 

260 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip") 

261 

262 # Limit (invalid) login-retries to once per 5 seconds 

263 loginRateLimit = RateLimit("user.login", 12, 1, "ip") 

264 

265 @classmethod 

266 def patch_user_skel(cls, skel_cls): 

267 """ 

268 Modifies the UserSkel to be equipped by a PasswordBone. 

269 """ 

270 skel_cls.password = PasswordBone( 

271 readOnly=True, 

272 visible=False, 

273 params={ 

274 "category": "Authentication", 

275 } 

276 ) 

277 

278 class LoginSkel(skeleton.RelSkel): 

279 name = EmailBone( 

280 descr="E-Mail", 

281 required=True, 

282 caseSensitive=False, 

283 ) 

284 password = PasswordBone( 

285 required=True, 

286 test_threshold=0, 

287 tests=(), 

288 raw=True, 

289 ) 

290 

291 class LostPasswordStep1Skel(skeleton.RelSkel): 

292 name = EmailBone( 

293 descr="E-Mail", 

294 required=True, 

295 ) 

296 

297 class LostPasswordStep2Skel(skeleton.RelSkel): 

298 recovery_key = StringBone( 

299 descr="Recovery Key", 

300 required=True, 

301 params={ 

302 "tooltip": i18n.translate( 

303 key="viur.core.modules.user.userpassword.lostpasswordstep2.recoverykey", 

304 defaultText="Please enter the validation key you've received via e-mail.", 

305 hint="Shown when the user needs more than 15 minutes to paste the key", 

306 ), 

307 } 

308 ) 

309 

310 class LostPasswordStep3Skel(skeleton.RelSkel): 

311 # send the recovery key again, in case the password is rejected by some reason. 

312 recovery_key = StringBone( 

313 descr="Recovery Key", 

314 visible=False, 

315 ) 

316 

317 password = PasswordBone( 

318 descr="New Password", 

319 required=True, 

320 params={ 

321 "tooltip": i18n.translate( 

322 key="viur.core.modules.user.userpassword.lostpasswordstep3.password", 

323 defaultText="Please enter a new password for your account.", 

324 ), 

325 } 

326 ) 

327 

328 @exposed 

329 @force_ssl 

330 @skey(allow_empty=True) 

331 def login(self, **kwargs): 

332 # Obtain a fresh login skel 

333 skel = self.LoginSkel() 

334 

335 # Read required bones from client 

336 if not (kwargs and skel.fromClient(kwargs)): 

337 return self._user_module.render.render("login", skel) 

338 

339 self.loginRateLimit.assertQuotaIsAvailable() 

340 

341 # query for the username. The query might find another user, but the name is being checked for equality below 

342 name = skel["name"].lower().strip() 

343 user_skel = self._user_module.baseSkel() 

344 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel 

345 

346 # extract password hash from raw database entity (skeleton access blocks it) 

347 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {} 

348 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001 

349 password_hash = encode_password(skel["password"], password_data.get("salt", "-invalid-"), iterations)["pwhash"] 

350 

351 # now check if the username matches 

352 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode()) 

353 

354 # next, check if the password hash matches 

355 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash) 

356 

357 if not is_okay: 

358 # Set error to all required fields 

359 for name, bone in skel.items(): 

360 if bone.required: 

361 skel.errors.append( 

362 ReadFromClientError( 

363 ReadFromClientErrorSeverity.Invalid, 

364 i18n.translate( 

365 key="viur.core.modules.user.userpassword.login.failed", 

366 defaultText="Invalid username or password provided", 

367 ), 

368 name, 

369 ) 

370 ) 

371 

372 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota 

373 return self._user_module.render.render("login", skel) 

374 

375 # check if iterations are below current security standards, and update if necessary. 

376 if iterations < PBKDF2_DEFAULT_ITERATIONS: 

377 logging.info(f"Update password hash for user {name}.") 

378 # re-hash the password with more iterations 

379 # FIXME: This must be done within a transaction! 

380 user_skel["password"] = kwargs["password"] # will be hashed on serialize 

381 user_skel.write(update_relations=False) 

382 

383 return self.next_or_finish(user_skel) 

384 

385 @exposed 

386 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, **kwargs): 

387 """ 

388 This implements a password recovery process which lets users set a new password for their account, 

389 after validating a recovery key sent by email. 

390 

391 The process is as following: 

392 

393 - The user enters the registered email adress (not validated here) 

394 - A random code is generated and stored as a security-ke, then sendUserPasswordRecoveryCode is called. 

395 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name 

396 and send a link with the code. It runs as a deferred task so no information if a user account exists 

397 is being leaked. 

398 - If the user received an email, the link can be clicked to set a new password for the account. 

399 

400 To prevent automated attacks, the first step is guarded by limited calls to this function to 10 actions 

401 per 15 minutes. (One complete recovery process consists of two calls). 

402 """ 

403 self.passwordRecoveryRateLimit.assertQuotaIsAvailable() 

404 current_request = current.request.get() 

405 

406 if recovery_key is None: 

407 # This is the first step, where we ask for the username of the account we'll going to reset the password on 

408 skel = self.LostPasswordStep1Skel() 

409 

410 if ( 

411 not kwargs 

412 or not current_request.isPostRequest 

413 or not skel.fromClient(kwargs) 

414 ): 

415 return self._user_module.render.render( 

416 "pwrecover", skel, 

417 tpl=self.passwordRecoveryTemplate, 

418 ) 

419 

420 # validate security key 

421 if not securitykey.validate(skey): 

422 raise errors.PreconditionFailed() 

423 

424 self.passwordRecoveryRateLimit.decrementQuota() 

425 

426 recovery_key = securitykey.create( 

427 duration=datetime.timedelta(minutes=15), 

428 key_length=conf.security.password_recovery_key_length, 

429 user_name=skel["name"].lower(), 

430 session_bound=False, 

431 ) 

432 

433 # Send the code in background 

434 self.sendUserPasswordRecoveryCode( 

435 skel["name"], recovery_key, current_request.request.headers["User-Agent"] 

436 ) 

437 

438 # step 2 is only an action-skel, and can be ignored by a direct link in the 

439 # e-mail previously sent. It depends on the implementation of the specific project. 

440 return self._user_module.render.render( 

441 "pwrecover", self.LostPasswordStep2Skel(), 

442 tpl=self.passwordRecoveryTemplate, 

443 ) 

444 

445 # in step 3 

446 skel = self.LostPasswordStep3Skel() 

447 skel["recovery_key"] = recovery_key # resend the recovery key again, in case the fromClient() fails. 

448 

449 # check for any input; Render input-form again when incomplete. 

450 if ( 

451 not kwargs 

452 or not current_request.isPostRequest 

453 or not skel.fromClient(kwargs, ignore=("recovery_key",)) 

454 ): 

455 return self._user_module.render.render( 

456 "pwrecover", skel, 

457 tpl=self.passwordRecoveryTemplate, 

458 ) 

459 

460 # validate security key 

461 if not securitykey.validate(skey): 

462 raise errors.PreconditionFailed() 

463 

464 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)): 

465 raise errors.PreconditionFailed( 

466 i18n.translate( 

467 key="viur.core.modules.user.passwordrecovery.keyexpired", 

468 defaultText="The recovery key is expired or invalid. Please start the recovery process again.", 

469 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key." 

470 ) 

471 ) 

472 

473 self.passwordRecoveryRateLimit.decrementQuota() 

474 

475 # If we made it here, the key was correct, so we'd hopefully have a valid user for this 

476 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel() 

477 

478 if not user_skel: 

479 raise errors.NotFound( 

480 i18n.translate( 

481 key="viur.core.modules.user.passwordrecovery.usernotfound", 

482 defaultText="There is no account with this name", 

483 hint="We cant find an account with that name (Should never happen)" 

484 ) 

485 ) 

486 

487 # If the account is locked or not yet validated, abort the process. 

488 if not self._user_module.is_active(user_skel): 

489 raise errors.NotFound( 

490 i18n.translate( 

491 key="viur.core.modules.user.passwordrecovery.accountlocked", 

492 defaultText="This account is currently locked. You cannot change its password.", 

493 hint="Attempted password recovery on a locked account" 

494 ) 

495 ) 

496 

497 # Update the password, save the user, reset his session and show the success-template 

498 user_skel["password"] = skel["password"] 

499 user_skel.write(update_relations=False) 

500 

501 return self._user_module.render.render( 

502 "pwrecover_success", 

503 next_url=self.start_url, 

504 tpl=self.passwordRecoverySuccessTemplate 

505 ) 

506 

507 @tasks.CallDeferred 

508 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None: 

509 """ 

510 Sends the given recovery code to the user given in userName. This function runs deferred 

511 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the 

512 code by email (assuming we have working email delivery), but this can be overridden to send it 

513 by SMS or other means. We'll also update the changedate for this user, so no more than one code 

514 can be send to any given user in four hours. 

515 """ 

516 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel(): 

517 user_agent = user_agents.parse(user_agent) 

518 email.send_email( 

519 tpl=self.passwordRecoveryMail, 

520 skel=user_skel, 

521 dests=[user_name], 

522 recovery_key=recovery_key, 

523 user_agent={ 

524 "device": user_agent.get_device(), 

525 "os": user_agent.get_os(), 

526 "browser": user_agent.get_browser() 

527 } 

528 ) 

529 

530 @exposed 

531 @skey(forward_payload="data", session_bound=False) 

532 def verify(self, data): 

533 def transact(key): 

534 skel = self._user_module.editSkel() 

535 if not key or not skel.read(key): 

536 return None 

537 

538 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \ 

539 if self.registrationAdminVerificationRequired else Status.ACTIVE 

540 

541 skel.write(update_relations=False) 

542 return skel 

543 

544 if not isinstance(data, dict) or not (skel := db.run_in_transaction(transact, data.get("user_key"))): 

545 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate) 

546 

547 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate) 

548 

549 def canAdd(self) -> bool: 

550 return self.registrationEnabled 

551 

552 def addSkel(self): 

553 """ 

554 Prepare the add-Skel for rendering. 

555 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on 

556 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired 

557 :return: viur.core.skeleton.Skeleton 

558 """ 

559 skel = self._user_module.addSkel() 

560 

561 if self.registrationEmailVerificationRequired: 

562 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION 

563 elif self.registrationAdminVerificationRequired: 

564 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION 

565 else: # No further verification required 

566 defaultStatusValue = Status.ACTIVE 

567 

568 skel.status.readOnly = True 

569 skel["status"] = defaultStatusValue 

570 

571 if "password" in skel: 

572 skel.password.required = True # The user will have to set a password 

573 

574 return skel 

575 

576 @force_ssl 

577 @exposed 

578 @skey(allow_empty=True) 

579 def add(self, *, bounce: bool = False, **kwargs): 

580 """ 

581 Allows guests to register a new account if self.registrationEnabled is set to true 

582 

583 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd` 

584 

585 :returns: The rendered, added object of the entry, eventually with error hints. 

586 

587 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

588 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

589 """ 

590 if not self.canAdd(): 

591 raise errors.Unauthorized() 

592 

593 skel = self.addSkel() 

594 

595 if ( 

596 not kwargs # no data supplied 

597 or not current.request.get().isPostRequest # bail out if not using POST-method 

598 or not skel.fromClient(kwargs) # failure on reading into the bones 

599 or bounce # review before adding 

600 ): 

601 # render the skeleton in the version it could as far as it could be read. 

602 return self._user_module.render.add(skel) 

603 

604 self._user_module.onAdd(skel) 

605 skel.write() 

606 

607 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION: 

608 # The user will have to verify his email-address. Create a skey and send it to his address 

609 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False, 

610 user_key=db.normalize_key(skel["key"]), 

611 name=skel["name"]) 

612 skel.skey = BaseBone(descr="Skey") 

613 skel["skey"] = skey 

614 email.send_email(dests=[skel["name"]], tpl=self.verifyEmailAddressMail, skel=skel) 

615 

616 self._user_module.onAdded(skel) # Call onAdded on our parent user module 

617 return self._user_module.render.addSuccess(skel) 

618 

619 

620class GoogleAccount(UserPrimaryAuthentication): 

621 METHOD_NAME = "X-VIUR-AUTH-Google-Account" 

622 NAME = "Google Account" 

623 

624 @classmethod 

625 def patch_user_skel(cls, skel_cls): 

626 """ 

627 Modifies the UserSkel to be equipped by a bones required by Google Auth 

628 """ 

629 skel_cls.uid = StringBone( 

630 descr="Google UserID", 

631 required=False, 

632 readOnly=True, 

633 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"), 

634 params={ 

635 "category": "Authentication", 

636 } 

637 ) 

638 

639 skel_cls.sync = BooleanBone( 

640 descr="Sync user data with OAuth-based services", 

641 defaultValue=True, 

642 params={ 

643 "category": "Authentication", 

644 "tooltip": 

645 "If set, user data like firstname and lastname is automatically kept" 

646 "synchronous with the information stored at the OAuth service provider" 

647 "(e.g. Google Login)." 

648 } 

649 ) 

650 

651 @exposed 

652 @force_ssl 

653 @skey(allow_empty=True) 

654 def login(self, token: str | None = None, *args, **kwargs): 

655 if not conf.user.google_client_id: 

656 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!") 

657 

658 if not token: 

659 request = current.request.get() 

660 request.response.headers["Content-Type"] = "text/html" 

661 if request.response.headers.get("cross-origin-opener-policy") == "same-origin": 

662 # We have to allow popups here 

663 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups" 

664 

665 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html") 

666 with open(file_path) as file: 

667 tpl_string = file.read() 

668 

669 # FIXME: Use Jinja2 for rendering? 

670 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id) 

671 extendCsp({ 

672 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="], 

673 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="] 

674 }) 

675 return tpl_string 

676 

677 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id) 

678 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}: 

679 raise ValueError("Invalid issuer") 

680 

681 # Token looks valid :) 

682 uid = user_info["sub"] 

683 email = user_info["email"] 

684 

685 base_skel = self._user_module.baseSkel() 

686 update = False 

687 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()): 

688 # We'll try again - checking if there's already an user with that email 

689 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()): 

690 # Still no luck - it's a completely new user 

691 if not self.registrationEnabled: 

692 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains: 

693 logging.debug(f"Google user is from allowed {domain} - adding account") 

694 else: 

695 logging.debug(f"Google user is from {domain} - denying registration") 

696 raise errors.Forbidden("Registration for new users is disabled") 

697 

698 user_skel = base_skel 

699 user_skel["uid"] = uid 

700 user_skel["name"] = email 

701 update = True 

702 

703 # Take user information from Google, if wanted! 

704 if user_skel["sync"]: 

705 for target, source in { 

706 "name": email, 

707 "firstname": user_info.get("given_name"), 

708 "lastname": user_info.get("family_name"), 

709 }.items(): 

710 

711 if user_skel[target] != source: 

712 user_skel[target] = source 

713 update = True 

714 

715 if update: 

716 assert user_skel.write() 

717 

718 return self.next_or_finish(user_skel) 

719 

720 

721class UserSecondFactorAuthentication(UserAuthentication, abc.ABC): 

722 """Abstract class for all second factors.""" 

723 MAX_RETRY = 3 

724 second_factor_login_template = "user_login_secondfactor" 

725 """Template to enter the TOPT on login""" 

726 

727 @property 

728 @abc.abstractmethod 

729 def NAME(self) -> str: 

730 """Name for this factor for templates.""" 

731 ... 

732 

733 @property 

734 @abc.abstractmethod 

735 def ACTION_NAME(self) -> str: 

736 """The action name for this factor, used as path-segment.""" 

737 ... 

738 

739 def __init__(self, moduleName, modulePath, _user_module): 

740 super().__init__(moduleName, modulePath, _user_module) 

741 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}" 

742 self.add_url = f"{self.modulePath}/add" 

743 self.start_url = f"{self.modulePath}/start" 

744 

745 

746class TimeBasedOTP(UserSecondFactorAuthentication): 

747 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP" 

748 WINDOW_SIZE = 5 

749 ACTION_NAME = "otp" 

750 NAME = "Time-based OTP" 

751 second_factor_login_template = "user_login_secondfactor" 

752 

753 @dataclasses.dataclass 

754 class OtpConfig: 

755 """ 

756 This dataclass is used to provide an interface for a OTP token 

757 algorithm description that is passed within the TimeBasedOTP 

758 class for configuration. 

759 """ 

760 secret: str 

761 timedrift: float = 0.0 

762 algorithm: t.Literal["sha1", "sha256"] = "sha1" 

763 interval: int = 60 

764 

765 class OtpSkel(skeleton.RelSkel): 

766 """ 

767 This is the Skeleton used to ask for the OTP token. 

768 """ 

769 otptoken = NumericBone( 

770 descr="Token", 

771 required=True, 

772 max=999999, 

773 min=0, 

774 ) 

775 

776 @classmethod 

777 def patch_user_skel(cls, skel_cls): 

778 """ 

779 Modifies the UserSkel to be equipped by a bones required by Timebased OTP 

780 """ 

781 # One-Time Password Verification 

782 skel_cls.otp_serial = StringBone( 

783 descr="OTP serial", 

784 searchable=True, 

785 params={ 

786 "category": "Second Factor Authentication", 

787 } 

788 ) 

789 

790 skel_cls.otp_secret = CredentialBone( 

791 descr="OTP secret", 

792 params={ 

793 "category": "Second Factor Authentication", 

794 } 

795 ) 

796 

797 skel_cls.otp_timedrift = NumericBone( 

798 descr="OTP time drift", 

799 readOnly=True, 

800 defaultValue=0, 

801 precision=1, 

802 params={ 

803 "category": "Second Factor Authentication", 

804 } 

805 ) 

806 

807 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None: 

808 """ 

809 Returns an instance of self.OtpConfig with a provided token configuration, 

810 or None when there is no appropriate configuration of this second factor handler available. 

811 """ 

812 

813 if otp_secret := skel.dbEntity.get("otp_secret"): 

814 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0) 

815 

816 return None 

817 

818 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

819 """ 

820 Specified whether the second factor authentication can be handled by the given user or not. 

821 """ 

822 return bool(self.get_config(skel)) 

823 

824 @exposed 

825 def start(self): 

826 """ 

827 Configures OTP login for the current session. 

828 

829 A special otp_user_conf has to be specified as a dict, which is stored into the session. 

830 """ 

831 session = current.session.get() 

832 

833 if not (user_key := session.get("possible_user_key")): 

834 raise errors.PreconditionFailed( 

835 "Second factor can only be triggered after successful primary authentication." 

836 ) 

837 

838 user_skel = self._user_module.baseSkel() 

839 if not user_skel.read(user_key): 

840 raise errors.NotFound("The previously authenticated user is gone.") 

841 

842 if not (otp_user_conf := self.get_config(user_skel)): 

843 raise errors.PreconditionFailed("This second factor is not available for the user") 

844 

845 otp_user_conf = { 

846 "key": str(user_key), 

847 } | dataclasses.asdict(otp_user_conf) 

848 

849 session = current.session.get() 

850 session["_otp_user"] = otp_user_conf 

851 session.markChanged() 

852 

853 return self._user_module.render.edit( 

854 self.OtpSkel(), 

855 params={ 

856 "name": i18n.translate( 

857 f"viur.core.modules.user.{self.ACTION_NAME}", 

858 default_variables={"name": self.NAME}, 

859 ), 

860 "action_name": self.ACTION_NAME, 

861 "action_url": f"{self.modulePath}/{self.ACTION_NAME}", 

862 }, 

863 tpl=self.second_factor_login_template 

864 ) 

865 

866 @exposed 

867 @force_ssl 

868 @skey(allow_empty=True) 

869 def otp(self, *args, **kwargs): 

870 """ 

871 Performs the second factor validation and interaction with the client. 

872 """ 

873 session = current.session.get() 

874 if not (otp_user_conf := session.get("_otp_user")): 

875 raise errors.PreconditionFailed("No OTP process started in this session") 

876 

877 # Check if maximum second factor verification attempts 

878 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

879 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

880 

881 # Read the OTP token via the skeleton, to obtain a valid value 

882 skel = self.OtpSkel() 

883 if skel.fromClient(kwargs): 

884 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP. 

885 res = self.verify( 

886 otp=skel["otptoken"], 

887 secret=otp_user_conf["secret"], 

888 algorithm=otp_user_conf.get("algorithm") or "sha1", 

889 interval=otp_user_conf.get("interval") or 60, 

890 timedrift=otp_user_conf.get("timedrift") or 0.0, 

891 valid_window=self.WINDOW_SIZE 

892 ) 

893 else: 

894 res = None 

895 

896 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0! 

897 if res is None: 

898 otp_user_conf["attempts"] = attempts + 1 

899 session.markChanged() 

900 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

901 return self._user_module.render.edit( 

902 skel, 

903 name=i18n.translate( 

904 f"viur.core.modules.user.auth.{self.ACTION_NAME}", 

905 default_variables={"name": self.NAME}, 

906 ), 

907 action_name=self.ACTION_NAME, 

908 action_url=f"{self.modulePath}/{self.ACTION_NAME}", 

909 tpl=self.second_factor_login_template 

910 ) 

911 

912 # Remove otp user config from session 

913 user_key = db.key_helper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName) 

914 del session["_otp_user"] 

915 session.markChanged() 

916 

917 # Check if the OTP device has a time drift 

918 

919 timedriftchange = float(res) - otp_user_conf["timedrift"] 

920 if abs(timedriftchange) > 2: 

921 # The time-drift change accumulates to more than 2 minutes (for interval==60): 

922 # update clock-drift value accordingly 

923 self.updateTimeDrift(user_key, timedriftchange) 

924 

925 # Continue with authentication 

926 return self._user_module.secondFactorSucceeded(self, user_key) 

927 

928 @staticmethod 

929 def verify( 

930 otp: str | int, 

931 secret: str, 

932 algorithm: str = "sha1", 

933 interval: int = 60, 

934 timedrift: float = 0.0, 

935 for_time: datetime.datetime | None = None, 

936 valid_window: int = 0, 

937 ) -> int | None: 

938 """ 

939 Verifies the OTP passed in against the current time OTP. 

940 

941 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which 

942 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown 

943 on the hardware token generator. This can be used to store the time drift of a given token generator. 

944 

945 :param otp: the OTP token to check against 

946 :param secret: The OTP secret 

947 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256) 

948 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators). 

949 :param timedrift: The known timedrift (old index) of the hardware OTP generator 

950 :param for_time: Time to check OTP at (defaults to now) 

951 :param valid_window: extends the validity to this many counter ticks before and after the current one 

952 

953 :returns: The index where verification succeeded, None otherwise 

954 """ 

955 # get the hashing digest 

956 digest = { 

957 "sha1": hashlib.sha1, 

958 "sha256": hashlib.sha256, 

959 }.get(algorithm) 

960 

961 if not digest: 

962 raise errors.NotImplemented(f"{algorithm=} is not implemented") 

963 

964 if for_time is None: 

965 for_time = datetime.datetime.now() 

966 

967 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index 

968 timedrift = round(timedrift) 

969 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret 

970 otp = str(otp).zfill(6) # fill with zeros in front 

971 

972 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}") 

973 totp = pyotp.TOTP(secret, digest=digest, interval=interval) 

974 

975 if valid_window: 

976 for offset in range(timedrift - valid_window, timedrift + valid_window + 1): 

977 token = str(totp.at(for_time, offset)) 

978 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}") 

979 if hmac.compare_digest(otp, token): 

980 return offset 

981 

982 return None 

983 

984 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None 

985 

986 # FIXME: VIUR4 rename 

987 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None: 

988 """ 

989 Updates the clock-drift value. 

990 

991 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew 

992 it out of bounds. Maximum change per call is 0.3 minutes. 

993 

994 :param user_key: For which user should the update occour 

995 :param idx: How many steps before/behind was that token 

996 """ 

997 if user_skel := self._user_module.skel().read(user_key): 

998 if otp_skel := self._get_otptoken(user_skel): 

999 otp_skel.patch( 

1000 { 

1001 "+otp_timedrift": min(max(0.1 * idx, -0.3), 0.3) 

1002 }, 

1003 update_relations=False, 

1004 ) 

1005 

1006 

1007class AuthenticatorOTP(UserSecondFactorAuthentication): 

1008 """ 

1009 This class handles the second factor for apps like authy and so on 

1010 """ 

1011 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP" 

1012 

1013 # second_factor_add_template = "user_secondfactor_add" 

1014 # """Template to configure (add) a new TOPT""" 

1015 

1016 ACTION_NAME = "authenticator_otp" 

1017 """Action name provided for *otp_template* on login""" 

1018 

1019 NAME = "Authenticator App" 

1020 

1021 # FIXME: The second factor add has to be rewritten entirely to ActionSkel paradigm. 

1022 ''' 

1023 @exposed 

1024 @force_ssl 

1025 @skey(allow_empty=True) 

1026 def add(self, otp=None): 

1027 """ 

1028 We try to read the otp_app_secret from the current session. When this fails we generate a new one and store 

1029 it in the session. 

1030 

1031 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store 

1032 the otp_app_secret from the session in the user entry. 

1033 """ 

1034 current_session = current.session.get() 

1035 

1036 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")): 

1037 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret() 

1038 current_session["_maybe_otp_app_secret"] = otp_app_secret 

1039 current_session.markChanged() 

1040 

1041 if otp is None: 

1042 return self._user_module.render.second_factor_add( 

1043 tpl=self.second_factor_add_template, 

1044 action_name=self.ACTION_NAME, 

1045 name=i18n.translate( 

1046 f"viur.core.modules.user.auth{self.ACTION_NAME}", 

1047 default_variables={"name": self.NAME}, 

1048 ), 

1049 add_url=self.add_url, 

1050 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) 

1051 else: 

1052 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret): 

1053 return self._user_module.render.second_factor_add( 

1054 tpl=self.second_factor_add_template, 

1055 action_name=self.ACTION_NAME, 

1056 name=i18n.translate( 

1057 f"viur.core.modules.user.auth.{self.ACTION_NAME}", 

1058 default_variables={"name": self.NAME}, 

1059 ), 

1060 add_url=self.add_url, 

1061 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors 

1062 

1063 # Now we can set the otp_app_secret to the current User and render der Success-template 

1064 AuthenticatorOTP.set_otp_app_secret(otp_app_secret) 

1065 return self._user_module.render.second_factor_add_success( 

1066 action_name=self.ACTION_NAME, 

1067 name=i18n.translate( 

1068 f"viur.core.modules.user.auth.{self.ACTION_NAME}", 

1069 default_variables={"name": self.NAME}, 

1070 ), 

1071 ) 

1072 ''' 

1073 

1074 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

1075 """ 

1076 We can only handle the second factor if we have stored an otp_app_secret before. 

1077 """ 

1078 return bool(skel.dbEntity.get("otp_app_secret", "")) 

1079 

1080 @classmethod 

1081 def patch_user_skel(cls, skel_cls): 

1082 """ 

1083 Modifies the UserSkel to be equipped by bones required by Authenticator App 

1084 """ 

1085 # Authenticator OTP Apps (like Authy) 

1086 skel_cls.otp_app_secret = CredentialBone( 

1087 descr="OTP Secret (App-Key)", 

1088 params={ 

1089 "category": "Second Factor Authentication", 

1090 } 

1091 ) 

1092 

1093 @classmethod 

1094 def set_otp_app_secret(cls, otp_app_secret=None): 

1095 """ 

1096 Write a new OTP Token in the current user entry. 

1097 """ 

1098 if otp_app_secret is None: 

1099 logging.error("No 'otp_app_secret' is provided") 

1100 raise errors.PreconditionFailed("No 'otp_app_secret' is provided") 

1101 if not (cuser := current.user.get()): 

1102 raise errors.Unauthorized() 

1103 

1104 def transaction(user_key): 

1105 if not (user := db.get(user_key)): 

1106 raise errors.NotFound() 

1107 user["otp_app_secret"] = otp_app_secret 

1108 db.put(user) 

1109 

1110 db.run_in_transaction(transaction, cuser["key"]) 

1111 

1112 @classmethod 

1113 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str: 

1114 """ 

1115 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example 

1116 """ 

1117 if not (cuser := current.user.get()): 

1118 raise errors.Unauthorized() 

1119 if not (issuer := conf.user.otp_issuer): 

1120 logging.warning( 

1121 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}") 

1122 issuer = conf.instance.project_id 

1123 

1124 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer) 

1125 

1126 @classmethod 

1127 def generate_otp_app_secret(cls) -> str: 

1128 """ 

1129 Generate a new OTP Secret 

1130 :return an otp 

1131 """ 

1132 return pyotp.random_base32() 

1133 

1134 @classmethod 

1135 def verify_otp(cls, otp: str | int, secret: str) -> bool: 

1136 return pyotp.TOTP(secret).verify(otp) 

1137 

1138 @exposed 

1139 def start(self): 

1140 otp_user_conf = {"attempts": 0} 

1141 session = current.session.get() 

1142 session["_otp_user"] = otp_user_conf 

1143 session.markChanged() 

1144 return self._user_module.render.edit( 

1145 TimeBasedOTP.OtpSkel(), 

1146 params={ 

1147 "name": i18n.translate( 

1148 f"viur.core.modules.user.auth.{self.ACTION_NAME}", 

1149 default_variables={"name": self.NAME}, 

1150 ), 

1151 "action_name": self.ACTION_NAME, 

1152 "action_url": self.action_url, 

1153 }, 

1154 tpl=self.second_factor_login_template, 

1155 ) 

1156 

1157 @exposed 

1158 @force_ssl 

1159 @skey 

1160 def authenticator_otp(self, **kwargs): 

1161 """ 

1162 We verify the otp here with the secret we stored before. 

1163 """ 

1164 session = current.session.get() 

1165 user_key = db.Key(self._user_module.kindName, session["possible_user_key"]) 

1166 

1167 if not (otp_user_conf := session.get("_otp_user")): 

1168 raise errors.PreconditionFailed("No OTP process started in this session") 

1169 

1170 # Check if maximum second factor verification attempts 

1171 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

1172 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

1173 

1174 if not (user := db.get(user_key)): 

1175 raise errors.NotFound() 

1176 

1177 skel = TimeBasedOTP.OtpSkel() 

1178 if not skel.fromClient(kwargs): 

1179 raise errors.PreconditionFailed() 

1180 otp_token = str(skel["otptoken"]).zfill(6) 

1181 

1182 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]): 

1183 return self._user_module.secondFactorSucceeded(self, user_key) 

1184 otp_user_conf["attempts"] = attempts + 1 

1185 session.markChanged() 

1186 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

1187 return self._user_module.render.edit( 

1188 skel, 

1189 name=i18n.translate( 

1190 f"viur.core.modules.user.auth.{self.ACTION_NAME}", 

1191 default_variables={"name": self.NAME}, 

1192 ), 

1193 action_name=self.ACTION_NAME, 

1194 action_url=self.action_url, 

1195 tpl=self.second_factor_login_template, 

1196 ) 

1197 

1198 

1199class User(List): 

1200 """ 

1201 The User module is used to manage and authenticate users in a ViUR system. 

1202 

1203 It is used in almost any ViUR project, but ViUR can also function without any user capabilites. 

1204 """ 

1205 

1206 kindName = "user" 

1207 addTemplate = "user_add" 

1208 addSuccessTemplate = "user_add_success" 

1209 

1210 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter( 

1211 None, ( 

1212 UserPassword, 

1213 conf.user.google_client_id and GoogleAccount, 

1214 ) 

1215 )) 

1216 """ 

1217 Specifies primary authentication providers that are made available 

1218 as sub-modules under `user/auth_<classname>`. They might require 

1219 customization or configuration. 

1220 """ 

1221 

1222 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = ( 

1223 TimeBasedOTP, 

1224 AuthenticatorOTP, 

1225 ) 

1226 """ 

1227 Specifies secondary authentication providers that are made available 

1228 as sub-modules under `user/f2_<classname>`. They might require 

1229 customization or configuration, which is determined during the 

1230 login-process depending on the user that wants to login. 

1231 """ 

1232 

1233 validAuthenticationMethods = tuple(filter( 

1234 None, ( 

1235 (UserPassword, AuthenticatorOTP), 

1236 (UserPassword, TimeBasedOTP), 

1237 (UserPassword, None), 

1238 (GoogleAccount, None) if conf.user.google_client_id else None, 

1239 ) 

1240 )) 

1241 """ 

1242 Specifies the possible combinations of primary- and secondary factor 

1243 login methos. 

1244 

1245 GoogleLogin defaults to no second factor, as the Google Account can be 

1246 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only 

1247 handled when there is a user-dependent configuration available. 

1248 """ 

1249 

1250 msg_missing_second_factor = "Second factor required but not configured for this user." 

1251 

1252 secondFactorTimeWindow = datetime.timedelta(minutes=10) 

1253 

1254 default_order = "name.idx" 

1255 

1256 roles = { 

1257 "admin": "*", 

1258 } 

1259 

1260 def __init__(self, moduleName, modulePath): 

1261 for provider in self.authenticationProviders: 

1262 assert issubclass(provider, UserPrimaryAuthentication) 

1263 name = f"auth_{provider.__name__.lower()}" 

1264 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1265 

1266 for provider in self.secondFactorProviders: 

1267 assert issubclass(provider, UserSecondFactorAuthentication) 

1268 name = f"f2_{provider.__name__.lower()}" 

1269 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1270 

1271 super().__init__(moduleName, modulePath) 

1272 

1273 def adminInfo(self): 

1274 ret = { 

1275 "icon": "person-fill", 

1276 } 

1277 

1278 if self.is_admin(current.user.get()): 

1279 ret |= { 

1280 "actions": [ 

1281 "trigger_kick", 

1282 "trigger_takeover", 

1283 ], 

1284 "customActions": { 

1285 "trigger_kick": { 

1286 "name": i18n.translate( 

1287 key="viur.core.modules.user.customActions.kick", 

1288 defaultText="Kick user", 

1289 hint="Title of the kick user function" 

1290 ), 

1291 "icon": "trash2-fill", 

1292 "action": "fetch", 

1293 "url": "/vi/{{module}}/trigger/kick/{{key}}", 

1294 "confirm": i18n.translate( 

1295 key="viur.core.modules.user.customActions.kick.confirm", 

1296 defaultText="Do you really want to drop all sessions of the selected user from the system?", 

1297 ), 

1298 "success": i18n.translate( 

1299 key="viur.core.modules.user.customActions.kick.success", 

1300 defaultText="Sessions of the user are being invalidated.", 

1301 ), 

1302 }, 

1303 "trigger_takeover": { 

1304 "name": i18n.translate( 

1305 key="viur.core.modules.user.customActions.takeover", 

1306 defaultText="Take-over user", 

1307 hint="Title of the take user over function" 

1308 ), 

1309 "icon": "file-person-fill", 

1310 "action": "fetch", 

1311 "url": "/vi/{{module}}/trigger/takeover/{{key}}", 

1312 "confirm": i18n.translate( 

1313 key="viur.core.modules.user.customActions.takeover.confirm", 

1314 defaultText="Do you really want to replace your current user session by a " 

1315 "user session of the selected user?", 

1316 ), 

1317 "success": i18n.translate( 

1318 key="viur.core.modules.user.customActions.takeover.success", 

1319 defaultText="You're now know as the selected user!", 

1320 ), 

1321 "then": "reload-vi", 

1322 }, 

1323 }, 

1324 } 

1325 

1326 return ret 

1327 

1328 def get_role_defaults(self, role: str) -> set[str]: 

1329 """ 

1330 Returns a set of default access rights for a given role. 

1331 

1332 Defaults to "admin" usage for any role > "user" 

1333 and "scriptor" usage for "admin" role. 

1334 """ 

1335 ret = set() 

1336 

1337 if role in ("viewer", "editor", "admin"): 

1338 ret.add("admin") 

1339 

1340 if role == "admin": 

1341 ret.add("scriptor") 

1342 

1343 return ret 

1344 

1345 def addSkel(self): 

1346 skel = super().addSkel().clone() 

1347 

1348 if self.is_admin(current.user.get()): 

1349 # An admin tries to add a new user. 

1350 skel.status.readOnly = False 

1351 skel.status.visible = True 

1352 skel.access.readOnly = False 

1353 skel.access.visible = True 

1354 

1355 else: 

1356 skel.status.readOnly = True 

1357 skel["status"] = Status.UNSET 

1358 skel.status.visible = False 

1359 skel.access.readOnly = True 

1360 skel["access"] = [] 

1361 skel.access.visible = False 

1362 

1363 if "password" in skel: 

1364 # Unlock and require a password 

1365 skel.password.required = True 

1366 skel.password.visible = True 

1367 skel.password.readOnly = False 

1368 

1369 skel.name.readOnly = False # Don't enforce readonly name in user/add 

1370 return skel 

1371 

1372 def editSkel(self, *args, **kwargs): 

1373 skel = super().editSkel().clone() 

1374 

1375 if "password" in skel: 

1376 skel.password.required = False 

1377 skel.password.visible = True 

1378 skel.password.readOnly = False 

1379 

1380 lock = not self.is_admin(current.user.get()) 

1381 skel.name.readOnly = lock 

1382 skel.access.readOnly = lock 

1383 skel.status.readOnly = lock 

1384 

1385 return skel 

1386 

1387 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication: 

1388 return getattr(self, f"f2_{cls.__name__.lower()}") 

1389 

1390 def getCurrentUser(self): 

1391 session = current.session.get() 

1392 

1393 req = current.request.get() 

1394 if session and (session.loaded or req.is_deferred) and (user := session.get("user")): 

1395 skel = self.baseSkel() 

1396 skel.setEntity(user) 

1397 return skel 

1398 

1399 return None 

1400 

1401 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key): 

1402 """ 

1403 Continue authentication flow when primary authentication succeeded. 

1404 """ 

1405 skel = self.baseSkel() 

1406 

1407 if not skel.read(user_key): 

1408 raise errors.NotFound("User was not found.") 

1409 

1410 if not provider.can_handle(skel): 

1411 raise errors.Forbidden("User is not allowed to use this primary login method.") 

1412 

1413 session = current.session.get() 

1414 session["possible_user_key"] = user_key.id_or_name 

1415 session["_secondFactorStart"] = utils.utcNow() 

1416 session.markChanged() 

1417 

1418 second_factor_providers = [] 

1419 

1420 for auth_provider, second_factor in self.validAuthenticationMethods: 

1421 if isinstance(provider, auth_provider): 

1422 if second_factor is not None: 

1423 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor) 

1424 if second_factor_provider_instance.can_handle(skel): 

1425 second_factor_providers.append(second_factor_provider_instance) 

1426 else: 

1427 second_factor_providers.append(None) 

1428 

1429 if len(second_factor_providers) > 1 and None in second_factor_providers: 

1430 # We have a second factor. So we can get rid of the None 

1431 second_factor_providers.pop(second_factor_providers.index(None)) 

1432 

1433 if len(second_factor_providers) == 0: 

1434 raise errors.NotAcceptable(self.msg_missing_second_factor) 

1435 elif len(second_factor_providers) == 1: 

1436 if second_factor_providers[0] is None: 

1437 # We allow sign-in without a second factor 

1438 return self.authenticateUser(user_key) 

1439 # We have only one second factor we don't need the choice template 

1440 return second_factor_providers[0].start(user_key) 

1441 

1442 # In case there is more than one second factor provider remaining, let the user decide! 

1443 current.session.get()["_secondfactor_providers"] = { 

1444 second_factor.start_url: second_factor.NAME 

1445 for second_factor in second_factor_providers 

1446 if second_factor.VISIBLE 

1447 } 

1448 

1449 return self.select_secondfactor_provider() 

1450 

1451 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key): 

1452 """ 

1453 Continue authentication flow when secondary authentication succeeded. 

1454 """ 

1455 session = current.session.get() 

1456 if session["possible_user_key"] != user_key.id_or_name: 

1457 raise errors.Forbidden() 

1458 

1459 # Assert that the second factor verification finished in time 

1460 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow: 

1461 raise errors.RequestTimeout() 

1462 

1463 return self.authenticateUser(user_key) 

1464 

1465 def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None: 

1466 """ 

1467 Hookable check if a user is defined as "active" and can login. 

1468 

1469 :param skel: The UserSkel of the user who wants to login. 

1470 :returns: Returns True or False when the result is unambigous and the user is active or not. \ 

1471 Returns None when the provided skel doesn't provide enough information for determination. 

1472 """ 

1473 if skel and "status" in skel: 

1474 status = skel["status"] 

1475 if not isinstance(status, (Status, int)): 

1476 try: 

1477 status = int(status) 

1478 except ValueError: 

1479 status = Status.UNSET 

1480 

1481 return status >= Status.ACTIVE.value 

1482 

1483 return None 

1484 

1485 def is_admin(self, skel: skeleton.SkeletonInstance) -> bool | None: 

1486 """ 

1487 Hookable check if a user is defined as "admin" and can edit or log into other users. 

1488 Defaults to "root" users only. 

1489 

1490 :param skel: The UserSkel of the user who wants should be checked for user admin privileges. 

1491 :returns: Returns True or False when the result is unambigous and the user is admin or not. \ 

1492 Returns None when the provided skel doesn't provide enough information for determination. 

1493 """ 

1494 if skel and "access" in skel: 

1495 return "root" in skel["access"] 

1496 

1497 return None 

1498 

1499 def authenticateUser(self, key: db.Key, **kwargs): 

1500 """ 

1501 Performs Log-In for the current session and the given user key. 

1502 

1503 This resets the current session: All fields not explicitly marked as persistent 

1504 by conf.user.session_persistent_fields_on_login are gone afterwards. 

1505 

1506 :param key: The (DB-)Key of the user we shall authenticate 

1507 """ 

1508 skel = self.baseSkel() 

1509 if not skel.read(key): 

1510 raise ValueError(f"Unable to authenticate unknown user {key}") 

1511 

1512 # Verify that this user account is active 

1513 if not self.is_active(skel): 

1514 raise errors.Forbidden("The user is disabled and cannot be authenticated.") 

1515 

1516 # Update session for user 

1517 session = current.session.get() 

1518 # Remember persistent fields... 

1519 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login} 

1520 session.reset() 

1521 # and copy them over to the new session 

1522 session |= take_over 

1523 

1524 # Update session, user and request 

1525 session["user"] = skel.dbEntity 

1526 

1527 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key 

1528 current.user.set(self.getCurrentUser()) 

1529 

1530 self.onLogin(skel) 

1531 

1532 return self.render.render("login_success", skel, **kwargs) 

1533 

1534 # Action for primary authentication selection 

1535 

1536 def SelectAuthenticationProviderSkel(self): 

1537 providers = {} 

1538 first = None 

1539 for provider in self.authenticationProviders: 

1540 if not provider.VISIBLE: 

1541 continue 

1542 

1543 provider = getattr(self, f"auth_{provider.__name__.lower()}") 

1544 providers[provider.start_url] = provider.NAME 

1545 

1546 if first is None: 

1547 first = provider.start_url 

1548 

1549 class SelectAuthenticationProviderSkel(skeleton.RelSkel): 

1550 provider = SelectBone( 

1551 descr="Authentication method", 

1552 required=True, 

1553 values=providers, 

1554 defaultValue=first, 

1555 ) 

1556 

1557 return SelectAuthenticationProviderSkel() 

1558 

1559 @exposed 

1560 def select_authentication_provider(self, **kwargs): 

1561 skel = self.SelectAuthenticationProviderSkel() 

1562 

1563 # Read required bones from client 

1564 if len(skel.provider.values) > 1 and (not kwargs or not skel.fromClient(kwargs)): 

1565 return self.render.render("select_authentication_provider", skel) 

1566 

1567 return self.render.render("select_authentication_provider_success", skel, next_url=skel["provider"]) 

1568 

1569 # Action for second factor select 

1570 

1571 class SelectSecondFactorProviderSkel(skeleton.RelSkel): 

1572 provider = SelectBone( 

1573 descr="Second factor", 

1574 required=True, 

1575 values=lambda: current.session.get()["_secondfactor_providers"] or (), 

1576 ) 

1577 

1578 @exposed 

1579 def select_secondfactor_provider(self, **kwargs): 

1580 skel = self.SelectSecondFactorProviderSkel() 

1581 

1582 # Read required bones from client 

1583 if not kwargs or not skel.fromClient(kwargs): 

1584 return self.render.render("select_secondfactor_provider", skel) 

1585 

1586 del current.session.get()["_secondfactor_providers"] 

1587 

1588 return self.render.render("select_secondfactor_provider_success", skel, next_url=skel["provider"]) 

1589 

1590 @exposed 

1591 @skey 

1592 def logout(self, **kwargs): 

1593 """ 

1594 Implements the logout action. It also terminates the current session (all keys not listed 

1595 in viur.session_persistent_fields_on_logout will be lost). 

1596 """ 

1597 if not (user := current.user.get()): 

1598 raise errors.Unauthorized() 

1599 

1600 self.onLogout(user) 

1601 

1602 session = current.session.get() 

1603 

1604 if take_over := {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}: 

1605 session.reset() 

1606 session |= take_over 

1607 else: 

1608 session.clear() 

1609 

1610 current.user.set(None) # set user to none in context var 

1611 

1612 return self.render.render("logout_success") 

1613 

1614 @exposed 

1615 def login(self, *args, **kwargs): 

1616 return self.select_authentication_provider() 

1617 

1618 def onLogin(self, skel: skeleton.SkeletonInstance): 

1619 """ 

1620 Hook to be called on user login. 

1621 """ 

1622 # Update the lastlogin timestamp (if available!) 

1623 if "lastlogin" in skel: 

1624 now = utils.utcNow() 

1625 

1626 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??) 

1627 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)): 

1628 skel["lastlogin"] = now 

1629 skel.write(update_relations=False) 

1630 

1631 logging.info(f"""User {skel["name"]} logged in""") 

1632 

1633 def onLogout(self, skel: skeleton.SkeletonInstance): 

1634 """ 

1635 Hook to be called on user logout. 

1636 """ 

1637 logging.info(f"""User {skel["name"]} logged out""") 

1638 

1639 @exposed 

1640 def view(self, key: db.Key | int | str = "self", *args, **kwargs): 

1641 """ 

1642 Allow a special key "self" to reference the current user. 

1643 

1644 By default, any authenticated user can view its own user entry, 

1645 to obtain access rights and any specific user information. 

1646 This behavior is defined in the customized `canView` function, 

1647 which is overwritten by the User-module. 

1648 

1649 The rendered skeleton can be modified or restriced by specifying 

1650 a customized view-skeleton. 

1651 """ 

1652 if key == "self": 

1653 if user := current.user.get(): 

1654 key = user["key"] 

1655 else: 

1656 raise errors.Unauthorized("Cannot view 'self' with unknown user") 

1657 

1658 return super().view(key, *args, **kwargs) 

1659 

1660 def canView(self, skel) -> bool: 

1661 if user := current.user.get(): 

1662 if skel["key"] == user["key"]: 

1663 return True 

1664 

1665 if self.is_admin(user) or "user-view" in user["access"]: 

1666 return True 

1667 

1668 return False 

1669 

1670 @exposed 

1671 @skey(allow_empty=True) 

1672 def edit(self, key: db.Key | int | str = "self", *args, **kwargs): 

1673 """ 

1674 Allow a special key "self" to reference the current user. 

1675 

1676 This modification will only allow to use "self" as a key; 

1677 The specific access right to let the user edit itself must 

1678 still be customized. 

1679 

1680 The rendered and editable skeleton can be modified or restriced 

1681 by specifying a customized edit-skeleton. 

1682 """ 

1683 if key == "self": 

1684 if user := current.user.get(): 

1685 key = user["key"] 

1686 else: 

1687 raise errors.Unauthorized("Cannot edit 'self' with unknown user") 

1688 

1689 return super().edit(key, *args, **kwargs) 

1690 

1691 @exposed 

1692 def getAuthMethods(self, *args, **kwargs): 

1693 """Legacy method prior < viur-core 3.8: Inform tools like Admin which authentication to use""" 

1694 logging.warning("DEPRECATED!!! Use '/user/login'-method for this, or update your admin version!") 

1695 

1696 res = [ 

1697 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1698 for primary, secondary in self.validAuthenticationMethods 

1699 ] 

1700 

1701 return json.dumps(res) 

1702 

1703 @exposed 

1704 def trigger(self, action: str, key: str): 

1705 # Check for provided access right definition (equivalent to client-side check), fallback to root! 

1706 access = self.adminInfo().get("customActions", {}).get(f"trigger_{action}", {}).get("access") or () 

1707 if not ( 

1708 (cuser := current.user.get()) 

1709 and ( 

1710 any(role in cuser["access"] for role in access) 

1711 or self.is_admin(cuser) 

1712 ) 

1713 ): 

1714 raise errors.Unauthorized() 

1715 

1716 skel = self.skel() 

1717 if not skel.read(key) and not (skel := skel.all().mergeExternalFilter({"name": key}).getSkel()): 

1718 raise errors.NotFound("The provided user does not exist.") 

1719 

1720 match action: 

1721 case "takeover": 

1722 self.authenticateUser(skel["key"]) 

1723 

1724 case "kick": 

1725 session.killSessionByUser(skel["key"]) 

1726 

1727 case _: 

1728 raise errors.NotImplemented(f"Action {action!r} not implemented") 

1729 

1730 return self.render.render(f"trigger/{action}Success", skel) 

1731 

1732 def onEdited(self, skel): 

1733 super().onEdited(skel) 

1734 

1735 # In case the user is set to inactive, kill all sessions 

1736 if self.is_active(skel) is False: 

1737 session.killSessionByUser(skel["key"]) 

1738 

1739 # Update user setting in all sessions 

1740 for session_obj in db.Query("user").filter("user =", skel["key"]).iter(): 

1741 session_obj["data"]["user"] = skel.dbEntity 

1742 

1743 def onDeleted(self, skel): 

1744 super().onDeleted(skel) 

1745 # Invalidate all sessions of that user 

1746 session.killSessionByUser(skel["key"]) 

1747 

1748 

1749@tasks.StartupTask 

1750def createNewUserIfNotExists(): 

1751 """ 

1752 Create a new Admin user, if the userDB is empty 

1753 """ 

1754 if ( 

1755 (user_module := getattr(conf.main_app.vi, "user", None)) 

1756 and isinstance(user_module, User) 

1757 and "addSkel" in dir(user_module) 

1758 and "validAuthenticationMethods" in dir(user_module) 

1759 # UserPassword must be one of the primary login methods 

1760 and any( 

1761 issubclass(provider[0], UserPassword) 

1762 for provider in user_module.validAuthenticationMethods 

1763 ) 

1764 ): 

1765 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database 

1766 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton 

1767 uname = f"""admin@{conf.instance.project_id}.appspot.com""" 

1768 pw = utils.string.random(13) 

1769 addSkel["name"] = uname 

1770 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away 

1771 addSkel["access"] = ["root"] 

1772 addSkel["password"] = pw 

1773 

1774 try: 

1775 addSkel.write() 

1776 except Exception as e: 

1777 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}") 

1778 logging.exception(e) 

1779 return 

1780 

1781 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}" 

1782 

1783 logging.warning(msg) 

1784 email.send_email_to_admins("New ViUR password", msg) 

1785 

1786 

1787# DEPRECATED ATTRIBUTES HANDLING 

1788 

1789def __getattr__(attr): 

1790 match attr: 

1791 case "userSkel": 

1792 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!" 

1793 warnings.warn(msg, DeprecationWarning, stacklevel=2) 

1794 logging.warning(msg) 

1795 return UserSkel 

1796 

1797 return super(__import__(__name__).__class__).__getattr__(attr)