Coverage for  / home / runner / work / viur-core / viur-core / viur / src / viur / core / modules / user.py: 0%

733 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 14:23 +0000

1import abc 

2import datetime 

3import enum 

4import functools 

5import hashlib 

6import hmac 

7import json 

8import logging 

9import secrets 

10import warnings 

11import user_agents 

12 

13import pyotp 

14import base64 

15import dataclasses 

16import typing as t 

17from google.auth.transport import requests 

18from google.oauth2 import id_token 

19 

20from viur.core import ( 

21 conf, current, db, email, errors, i18n, 

22 securitykey, session, skeleton, tasks, utils, Module 

23) 

24from viur.core.decorators import * 

25from viur.core.bones import * 

26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password 

27from viur.core.prototypes.list import List 

28from viur.core.ratelimit import RateLimit 

29from viur.core.securityheaders import extendCsp 

30 

31 

@functools.total_ordering
class Status(enum.Enum):
    """Status enum for a user.

    Has backwards compatibility to be comparable with non-enum values
    (e.g. ``Status.ACTIVE == 10`` or ``Status.DISABLED < 10``).
    Will be removed with viur-core 4.0.0
    """

    UNSET = 0  # Status is unset
    WAITING_FOR_EMAIL_VERIFICATION = 1  # Waiting for email verification
    WAITING_FOR_ADMIN_VERIFICATION = 2  # Waiting for verification through admin
    DISABLED = 5  # Account disabled
    ACTIVE = 10  # Active

    def __eq__(self, other):
        """Return True for the identical member, or for a raw value equal to ``self.value``."""
        if isinstance(other, Status):
            # Enum members are singletons, so identity is equality.
            return self is other
        return self.value == other

    def __lt__(self, other):
        """Order by the numeric value, against other members as well as raw values."""
        if isinstance(other, Status):
            # Neither Enum nor object implement an ordering, so delegating to super()
            # would yield NotImplemented and make `Status.A < Status.B` raise TypeError.
            return self.value < other.value
        return self.value < other

    def __hash__(self):
        """Hash like the raw value, staying consistent with ``__eq__``."""
        # Defining __eq__ without __hash__ would implicitly set __hash__ to None
        # and make the members unusable as dict keys or set elements.
        return hash(self.value)

55 

56 

class UserSkel(skeleton.Skeleton):
    """
    Skeleton of a user account.

    Besides the bones declared here, the configured authentication providers
    add their own bones when the skeleton is instantiated (see ``__new__``).
    On ``write``, the ``access`` bone is derived from ``roles`` unless the
    role ``custom`` is assigned.
    """

    kindName = "user"  # this assignment is required, as this Skeleton is defined in viur-core (see #604)

    name = EmailBone(
        descr="E-Mail",
        required=True,
        readOnly=True,
        caseSensitive=False,
        searchable=True,
        unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
    )

    firstname = StringBone(
        descr="Firstname",
        searchable=True,
    )

    lastname = StringBone(
        descr="Lastname",
        searchable=True,
    )

    roles = SelectBone(
        descr=i18n.translate("viur.core.modules.user.bone.roles", defaultText="Roles"),
        values=conf.user.roles,
        required=True,
        multiple=True,
        # fixme: This is generally broken in VIUR! See #776 for details.
        # vfunc=lambda values:
        #   i18n.translate(
        #       "user.bone.roles.invalid",
        #       defaultText="Invalid role setting: 'custom' can only be set alone.")
        #   if "custom" in values and len(values) > 1 else None,
        defaultValue=list(conf.user.roles.keys())[:1],
    )

    access = SelectBone(
        descr=i18n.translate("viur.core.modules.user.bone.access", defaultText="Access rights"),
        type_suffix="access",
        values=lambda: {
            right: i18n.translate(f"viur.core.modules.user.accessright.{right}", defaultText=right)
            for right in sorted(conf.user.access_rights)
        },
        multiple=True,
        params={
            # if "custom" is not in roles, "access" is managed by the role system
            "readonlyIf": "'custom' not in roles"
        }
    )

    status = SelectBone(
        descr=i18n.translate("viur.core.modules.user.bone.status", "Account status"),
        values=Status,
        translation_key_prefix="viur.core.user.status.",
        defaultValue=Status.ACTIVE,
        required=True,
    )

    lastlogin = DateBone(
        descr="Last Login",
        readOnly=True,
    )

    admin_config = JsonBone(  # This bone stores settings from the vi
        descr="Config for the User",
        visible=False
    )

    def __new__(cls, *args, **kwargs):
        """
        Let every configured primary and second factor provider patch its
        bones into the class, rebuild the bone map and create the instance.
        """
        provider_groups = (
            ("authenticationProviders", UserPrimaryAuthentication),
            ("secondFactorProviders", UserSecondFactorAuthentication),
        )

        for attr_name, expected_base in provider_groups:
            for provider in getattr(conf.main_app.vi.user, attr_name):
                assert issubclass(provider, expected_base)
                provider.patch_user_skel(cls)

        cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
        return super().__new__(cls, *args, **kwargs)

    @classmethod
    def write(cls, skel, *args, **kwargs):
        """
        Recompute ``skel["access"]`` from the assigned roles, then write the skeleton.

        When roles are set and ``custom`` is not among them, the access rights are
        the union of the user module's role defaults and the per-module ``roles``
        settings of all modules found on ``conf.main_app.vi``.
        """
        if skel["roles"] and "custom" not in skel["roles"]:
            granted = set()

            for role in skel["roles"]:
                # defaults the user module defines for this role
                granted |= conf.main_app.vi.user.get_role_defaults(role)

                # role settings of every public module
                for mod_name in dir(conf.main_app.vi):
                    if mod_name.startswith("_"):
                        continue

                    module = getattr(conf.main_app.vi, mod_name)
                    if not isinstance(module, Module):
                        continue

                    role_map = getattr(module, "roles", None) or {}
                    rights = role_map.get(role, role_map.get("*", ()))

                    # a single right may be given without a surrounding tuple/list
                    if not isinstance(rights, (tuple, list)):
                        rights = (rights, )

                    if "*" in rights:
                        # wildcard: everything the module offers
                        granted.update(f"{mod_name}-{right}" for right in module.accessRights)
                        continue

                    for right in rights:
                        if right not in module.accessRights:
                            continue

                        granted.add(f"{mod_name}-{right}")

                        # special case: "edit" and "delete" actions require "view" as well!
                        if right in ("edit", "delete") and "view" in module.accessRights:
                            granted.add(f"{mod_name}-view")

            skel["access"] = list(granted)

        return super().write(skel, *args, **kwargs)

183 

184 

class UserAuthentication(Module, abc.ABC):
    """
    Abstract base of all authentication modules attached to the user module.

    Concrete classes provide ``METHOD_NAME`` and ``NAME`` (typically as plain
    class attributes) and may patch additional bones into the user skeleton.
    """

    # `abc.abstractstaticmethod` is deprecated since Python 3.3; stacking
    # `staticmethod` on `abc.abstractmethod` is the documented replacement and
    # keeps the property abstract in the same way.
    @property
    @staticmethod
    @abc.abstractmethod
    def METHOD_NAME() -> str:
        """
        Define a unique method name for this authentication.
        """
        ...

    @property
    @staticmethod
    @abc.abstractmethod
    def NAME() -> str:
        """
        Define a descriptive name for this authentication.
        """
        ...

    @property
    @staticmethod
    def VISIBLE(cls) -> bool:
        """
        Defines if the authentication method is visible to the user.
        """
        return True

    def __init__(self, moduleName, modulePath, userModule):
        """
        :param moduleName: Passed through to ``Module.__init__``.
        :param modulePath: Passed through to ``Module.__init__``.
        :param userModule: The user module this authentication belongs to.
        """
        super().__init__(moduleName, modulePath)
        self._user_module = userModule
        # URL at which this authentication is started
        self.start_url = f"{self.modulePath}/login"

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """Tell whether this authentication is usable for the given user; always True here."""
        return True

    @classmethod
    def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
        """
        Allows for an UserAuthentication to patch the UserSkel
        class with additional bones which are required for
        the implemented authentication method.
        """
        ...

226 

227 

class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
    """Abstract class for all primary authentication methods."""

    # Whether new accounts may be created through this authentication
    registrationEnabled = False

    @abc.abstractmethod
    def login(self, *args, **kwargs):
        """Perform the primary login; must be implemented by subclasses."""
        ...

    def next_or_finish(self, skel: skeleton.SkeletonInstance):
        """
        Hook called once a part of the authentication succeeded.

        The default hands the user's key over to the user module to continue
        the authentication flow. Custom authentications may override it to
        insert further steps, e.g. change a password after first login.
        """
        user_key = skel["key"]
        return self._user_module.continueAuthenticationFlow(self, user_key)

243 

244 

245class UserPassword(UserPrimaryAuthentication): 

246 METHOD_NAME = "X-VIUR-AUTH-User-Password" 

247 NAME = "Username & Password" 

248 

249 registrationEmailVerificationRequired = True 

250 registrationAdminVerificationRequired = True 

251 

252 verifySuccessTemplate = "user_verify_success" 

253 verifyEmailAddressMail = "user_verify_address" 

254 verifyFailedTemplate = "user_verify_failed" 

255 passwordRecoveryMail = "user_password_recovery" 

256 passwordRecoverySuccessTemplate = "user_passwordrecover_success" 

257 passwordRecoveryTemplate = "user_passwordrecover" 

258 

259 # The default rate-limit for password recovery (10 tries each 15 minutes) 

260 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip") 

261 

262 # Limit (invalid) login-retries to once per 5 seconds 

263 loginRateLimit = RateLimit("user.login", 12, 1, "ip") 

264 

265 @classmethod 

266 def patch_user_skel(cls, skel_cls): 

267 """ 

268 Modifies the UserSkel to be equipped by a PasswordBone. 

269 """ 

270 skel_cls.password = PasswordBone( 

271 readOnly=True, 

272 visible=False, 

273 params={ 

274 "category": "Authentication", 

275 } 

276 ) 

277 

    class LoginSkel(skeleton.RelSkel):
        """Skeleton read from the client by ``login``: account name and password."""
        name = EmailBone(
            descr="E-Mail",
            required=True,
            caseSensitive=False,
        )
        # NOTE(review): raw=True together with empty tests presumably keeps the submitted
        # password unhashed and unchecked, as `login` hashes it itself - confirm against PasswordBone.
        password = PasswordBone(
            required=True,
            test_threshold=0,
            tests=(),
            raw=True,
        )

290 

    class LostPasswordStep1Skel(skeleton.RelSkel):
        """First password recovery step: asks for the e-mail address of the account."""
        name = EmailBone(
            descr="E-Mail",
            required=True,
        )

296 

    class LostPasswordStep2Skel(skeleton.RelSkel):
        """Second password recovery step: asks for the recovery key that was sent to the user."""
        recovery_key = StringBone(
            descr="Recovery Key",
            required=True,
            params={
                "tooltip": i18n.translate(
                    key="viur.core.modules.user.userpassword.lostpasswordstep2.recoverykey",
                    defaultText="Please enter the validation key you've received via e-mail.",
                    hint="Shown when the user needs more than 15 minutes to paste the key",
                ),
            }
        )

309 

    class LostPasswordStep3Skel(skeleton.RelSkel):
        """Third password recovery step: carries the recovery key and asks for the new password."""
        # send the recovery key again, in case the password is rejected by some reason.
        recovery_key = StringBone(
            descr="Recovery Key",
            visible=False,
        )

        password = PasswordBone(
            descr="New Password",
            required=True,
            params={
                "tooltip": i18n.translate(
                    key="viur.core.modules.user.userpassword.lostpasswordstep3.password",
                    defaultText="Please enter a new password for your account.",
                ),
            }
        )

327 

328 @exposed 

329 @force_ssl 

330 @skey(allow_empty=True) 

331 def login(self, **kwargs): 

332 # Obtain a fresh login skel 

333 skel = self.LoginSkel() 

334 

335 # Read required bones from client 

336 if not (kwargs and skel.fromClient(kwargs)): 

337 return self._user_module.render.render("login", skel) 

338 

339 self.loginRateLimit.assertQuotaIsAvailable() 

340 

341 # query for the username. The query might find another user, but the name is being checked for equality below 

342 name = skel["name"].lower().strip() 

343 user_skel = self._user_module.baseSkel() 

344 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel 

345 

346 # extract password hash from raw database entity (skeleton access blocks it) 

347 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {} 

348 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001 

349 password_hash = encode_password(skel["password"], password_data.get("salt", "-invalid-"), iterations)["pwhash"] 

350 

351 # now check if the username matches 

352 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode()) 

353 

354 # next, check if the password hash matches 

355 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash) 

356 

357 if not is_okay: 

358 # Set error to all required fields 

359 for name, bone in skel.items(): 

360 if bone.required: 

361 skel.errors.append( 

362 ReadFromClientError( 

363 ReadFromClientErrorSeverity.Invalid, 

364 i18n.translate( 

365 key="viur.core.modules.user.userpassword.login.failed", 

366 defaultText="Invalid username or password provided", 

367 ), 

368 name, 

369 ) 

370 ) 

371 

372 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota 

373 return self._user_module.render.render("login", skel) 

374 

375 # check if iterations are below current security standards, and update if necessary. 

376 if iterations < PBKDF2_DEFAULT_ITERATIONS: 

377 logging.info(f"Update password hash for user {name}.") 

378 # re-hash the password with more iterations 

379 # FIXME: This must be done within a transaction! 

380 user_skel["password"] = kwargs["password"] # will be hashed on serialize 

381 user_skel.write(update_relations=False) 

382 

383 return self.next_or_finish(user_skel) 

384 

385 @exposed 

386 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, **kwargs): 

387 """ 

388 This implements a password recovery process which lets users set a new password for their account, 

389 after validating a recovery key sent by email. 

390 

391 The process is as following: 

392 

393 - The user enters the registered email adress (not validated here) 

394 - A random code is generated and stored as a security-ke, then sendUserPasswordRecoveryCode is called. 

395 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name 

396 and send a link with the code. It runs as a deferred task so no information if a user account exists 

397 is being leaked. 

398 - If the user received an email, the link can be clicked to set a new password for the account. 

399 

400 To prevent automated attacks, the first step is guarded by limited calls to this function to 10 actions 

401 per 15 minutes. (One complete recovery process consists of two calls). 

402 """ 

403 self.passwordRecoveryRateLimit.assertQuotaIsAvailable() 

404 current_request = current.request.get() 

405 

406 if recovery_key is None: 

407 # This is the first step, where we ask for the username of the account we'll going to reset the password on 

408 skel = self.LostPasswordStep1Skel() 

409 

410 if ( 

411 not kwargs 

412 or not current_request.isPostRequest 

413 or not skel.fromClient(kwargs) 

414 ): 

415 return self._user_module.render.render( 

416 "pwrecover", skel, 

417 tpl=self.passwordRecoveryTemplate, 

418 ) 

419 

420 # validate security key 

421 if not securitykey.validate(skey): 

422 raise errors.PreconditionFailed() 

423 

424 self.passwordRecoveryRateLimit.decrementQuota() 

425 

426 recovery_key = securitykey.create( 

427 duration=datetime.timedelta(minutes=15), 

428 key_length=conf.security.password_recovery_key_length, 

429 user_name=skel["name"].lower(), 

430 session_bound=False, 

431 ) 

432 

433 # Send the code in background 

434 self.sendUserPasswordRecoveryCode( 

435 skel["name"], recovery_key, current_request.request.headers["User-Agent"] 

436 ) 

437 

438 # step 2 is only an action-skel, and can be ignored by a direct link in the 

439 # e-mail previously sent. It depends on the implementation of the specific project. 

440 return self._user_module.render.render( 

441 "pwrecover", self.LostPasswordStep2Skel(), 

442 tpl=self.passwordRecoveryTemplate, 

443 ) 

444 

445 # in step 3 

446 skel = self.LostPasswordStep3Skel() 

447 

448 # reset the recovery key again, in case the fromClient() fails. 

449 skel["recovery_key"] = str(recovery_key).strip() 

450 

451 # check for any input; Render input-form again when incomplete. 

452 if ( 

453 not kwargs 

454 or not current_request.isPostRequest 

455 or not skel.fromClient(kwargs, ignore=("recovery_key",)) 

456 ): 

457 return self._user_module.render.render( 

458 "pwrecover", skel, 

459 tpl=self.passwordRecoveryTemplate, 

460 ) 

461 

462 # validate security key 

463 if not securitykey.validate(skey): 

464 raise errors.PreconditionFailed() 

465 

466 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)): 

467 raise errors.PreconditionFailed( 

468 i18n.translate( 

469 key="viur.core.modules.user.passwordrecovery.keyexpired", 

470 defaultText="The recovery key is expired or invalid. Please start the recovery process again.", 

471 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key." 

472 ) 

473 ) 

474 

475 self.passwordRecoveryRateLimit.decrementQuota() 

476 

477 # If we made it here, the key was correct, so we'd hopefully have a valid user for this 

478 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel() 

479 

480 if not user_skel: 

481 raise errors.NotFound( 

482 i18n.translate( 

483 key="viur.core.modules.user.passwordrecovery.usernotfound", 

484 defaultText="There is no account with this name", 

485 hint="We cant find an account with that name (Should never happen)" 

486 ) 

487 ) 

488 

489 # If the account is locked or not yet validated, abort the process. 

490 if not self._user_module.is_active(user_skel): 

491 raise errors.NotFound( 

492 i18n.translate( 

493 key="viur.core.modules.user.passwordrecovery.accountlocked", 

494 defaultText="This account is currently locked. You cannot change its password.", 

495 hint="Attempted password recovery on a locked account" 

496 ) 

497 ) 

498 

499 # Update the password, save the user, reset his session and show the success-template 

500 user_skel["password"] = skel["password"] 

501 user_skel.write(update_relations=False) 

502 

503 return self._user_module.render.render( 

504 "pwrecover_success", 

505 next_url=self.start_url, 

506 tpl=self.passwordRecoverySuccessTemplate 

507 ) 

508 

509 @tasks.CallDeferred 

510 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None: 

511 """ 

512 Sends the given recovery code to the user given in userName. This function runs deferred 

513 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the 

514 code by email (assuming we have working email delivery), but this can be overridden to send it 

515 by SMS or other means. We'll also update the changedate for this user, so no more than one code 

516 can be send to any given user in four hours. 

517 """ 

518 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel(): 

519 user_agent = user_agents.parse(user_agent) 

520 email.send_email( 

521 tpl=self.passwordRecoveryMail, 

522 skel=user_skel, 

523 dests=[user_name], 

524 recovery_key=recovery_key, 

525 user_agent={ 

526 "device": user_agent.get_device(), 

527 "os": user_agent.get_os(), 

528 "browser": user_agent.get_browser() 

529 } 

530 ) 

531 

532 @exposed 

533 @skey(forward_payload="data", session_bound=False) 

534 def verify(self, data): 

535 def transact(key): 

536 skel = self._user_module.editSkel() 

537 if not key or not skel.read(key): 

538 return None 

539 

540 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \ 

541 if self.registrationAdminVerificationRequired else Status.ACTIVE 

542 

543 skel.write(update_relations=False) 

544 return skel 

545 

546 if not isinstance(data, dict) or not (skel := db.run_in_transaction(transact, data.get("user_key"))): 

547 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate) 

548 

549 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate) 

550 

    def canAdd(self) -> bool:
        """Tell whether registration is allowed, which is the case when ``registrationEnabled`` is set."""
        return self.registrationEnabled

553 

554 def addSkel(self): 

555 """ 

556 Prepare the add-Skel for rendering. 

557 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on 

558 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired 

559 :return: viur.core.skeleton.Skeleton 

560 """ 

561 skel = self._user_module.addSkel() 

562 

563 if self.registrationEmailVerificationRequired: 

564 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION 

565 elif self.registrationAdminVerificationRequired: 

566 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION 

567 else: # No further verification required 

568 defaultStatusValue = Status.ACTIVE 

569 

570 skel.status.readOnly = True 

571 skel["status"] = defaultStatusValue 

572 

573 if "password" in skel: 

574 skel.password.required = True # The user will have to set a password 

575 

576 return skel 

577 

578 @force_ssl 

579 @exposed 

580 @skey(allow_empty=True) 

581 def add(self, *, bounce: bool = False, **kwargs): 

582 """ 

583 Allows guests to register a new account if self.registrationEnabled is set to true 

584 

585 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd` 

586 

587 :returns: The rendered, added object of the entry, eventually with error hints. 

588 

589 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

590 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

591 """ 

592 if not self.canAdd(): 

593 raise errors.Unauthorized() 

594 

595 skel = self.addSkel() 

596 

597 if ( 

598 not kwargs # no data supplied 

599 or not current.request.get().isPostRequest # bail out if not using POST-method 

600 or not skel.fromClient(kwargs) # failure on reading into the bones 

601 or bounce # review before adding 

602 ): 

603 # render the skeleton in the version it could as far as it could be read. 

604 return self._user_module.render.add(skel) 

605 

606 self._user_module.onAdd(skel) 

607 skel.write() 

608 

609 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION: 

610 # The user will have to verify his email-address. Create a skey and send it to his address 

611 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False, 

612 user_key=db.normalize_key(skel["key"]), 

613 name=skel["name"]) 

614 skel.skey = BaseBone(descr="Skey") 

615 skel["skey"] = skey 

616 email.send_email(dests=[skel["name"]], tpl=self.verifyEmailAddressMail, skel=skel) 

617 

618 self._user_module.onAdded(skel) # Call onAdded on our parent user module 

619 return self._user_module.render.addSuccess(skel) 

620 

621 

class GoogleAccount(UserPrimaryAuthentication):
    """Primary authentication through a Google account, using a Google ID token."""

    METHOD_NAME = "X-VIUR-AUTH-Google-Account"
    NAME = "Google Account"

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by the bones required by Google Auth
        """
        skel_cls.uid = StringBone(
            descr="Google UserID",
            required=False,
            readOnly=True,
            unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
            params={
                "category": "Authentication",
            }
        )

        skel_cls.sync = BooleanBone(
            descr="Sync user data with OAuth-based services",
            defaultValue=True,
            params={
                "category": "Authentication",
                # Adjacent literals are joined without a separator, so each part
                # carries its own trailing space.
                "tooltip":
                    "If set, user data like firstname and lastname is automatically kept "
                    "synchronous with the information stored at the OAuth service provider "
                    "(e.g. Google Login)."
            }
        )

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def login(self, token: str | None = None, *args, **kwargs):
        """
        Log a user in with a Google ID token.

        Without a token, the Google login page is returned. With a token, the
        token is verified, the matching user is looked up by Google UID and then
        by e-mail address, and created if unknown and permitted.

        :param token: The Google ID token; when empty, the login page is rendered.
        :raises: :exc:`viur.core.errors.PreconditionFailed`, if conf.user.google_client_id is not configured.
        :raises: :exc:`viur.core.errors.Forbidden`, if the user is unknown and may not register.
        :raises: :exc:`ValueError`, if the token was not issued by Google.
        """
        if not conf.user.google_client_id:
            raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")

        if not token:
            request = current.request.get()
            request.response.headers["Content-Type"] = "text/html"
            if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
                # We have to allow popups here
                request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"

            file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
            # Explicit encoding, so reading does not depend on the locale of the host
            with open(file_path, encoding="utf-8") as file:
                tpl_string = file.read()

            # FIXME: Use Jinja2 for rendering?
            tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
            extendCsp({
                "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
                "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
            })
            return tpl_string

        user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
        if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
            raise ValueError("Invalid issuer")

        # Token looks valid :)
        uid = user_info["sub"]
        # Not named `email`, which would shadow the imported viur.core.email module
        user_email = user_info["email"]

        base_skel = self._user_module.baseSkel()
        update = False
        if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
            # We'll try again - checking if there's already an user with that email
            if not (user_skel := base_skel.all().filter("name.idx =", user_email.lower()).getSkel()):
                # Still no luck - it's a completely new user
                if not self.registrationEnabled:
                    if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
                        logging.debug(f"Google user is from allowed {domain} - adding account")
                    else:
                        logging.debug(f"Google user is from {domain} - denying registration")
                        raise errors.Forbidden("Registration for new users is disabled")

                user_skel = base_skel
                user_skel["uid"] = uid
                user_skel["name"] = user_email
                update = True

        # Take user information from Google, if wanted!
        if user_skel["sync"]:
            for target, source in {
                "name": user_email,
                "firstname": user_info.get("given_name"),
                "lastname": user_info.get("family_name"),
            }.items():

                if user_skel[target] != source:
                    user_skel[target] = source
                    update = True

        if update:
            # The write must not live inside the assert statement: asserts are
            # removed under `python -O`, which would silently skip the write.
            written = user_skel.write()
            assert written

        return self.next_or_finish(user_skel)

721 

722 

class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
    """Abstract class for all second factors."""

    MAX_RETRY = 3

    #: Template to enter the TOTP on login
    second_factor_login_template = "user_login_secondfactor"

    @property
    @abc.abstractmethod
    def NAME(self) -> str:
        """Name for this factor for templates."""
        ...

    @property
    @abc.abstractmethod
    def ACTION_NAME(self) -> str:
        """The action name for this factor, used as path-segment."""
        ...

    def __init__(self, moduleName, modulePath, _user_module):
        """Initialize the module and derive the action, add and start URLs from its path."""
        super().__init__(moduleName, modulePath, _user_module)
        self.action_url = "/".join((self.modulePath, self.ACTION_NAME))
        self.add_url = self.modulePath + "/add"
        self.start_url = self.modulePath + "/start"

746 

747 

748class TimeBasedOTP(UserSecondFactorAuthentication): 

749 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP" 

750 WINDOW_SIZE = 5 

751 ACTION_NAME = "otp" 

752 NAME = "Time-based OTP" 

753 second_factor_login_template = "user_login_secondfactor" 

754 

    @dataclasses.dataclass
    class OtpConfig:
        """
        This dataclass is used to provide an interface for a OTP token
        algorithm description that is passed within the TimeBasedOTP
        class for configuration.
        """
        # The OTP secret of the token
        secret: str
        # Known time drift of the token; 0.0 when nothing is known
        timedrift: float = 0.0
        # Name of the digest algorithm
        algorithm: t.Literal["sha1", "sha256"] = "sha1"
        # NOTE(review): presumably the validity period of one token in seconds - confirm against `verify`
        interval: int = 60

766 

    class OtpSkel(skeleton.RelSkel):
        """
        This is the Skeleton used to ask for the OTP token.
        """
        # The token is read as a number between 0 and 999999
        otptoken = NumericBone(
            descr="Token",
            required=True,
            max=999999,
            min=0,
        )

777 

778 @classmethod 

779 def patch_user_skel(cls, skel_cls): 

780 """ 

781 Modifies the UserSkel to be equipped by a bones required by Timebased OTP 

782 """ 

783 # One-Time Password Verification 

784 skel_cls.otp_serial = StringBone( 

785 descr="OTP serial", 

786 searchable=True, 

787 params={ 

788 "category": "Second Factor Authentication", 

789 } 

790 ) 

791 

792 skel_cls.otp_secret = CredentialBone( 

793 descr="OTP secret", 

794 params={ 

795 "category": "Second Factor Authentication", 

796 } 

797 ) 

798 

799 skel_cls.otp_timedrift = NumericBone( 

800 descr="OTP time drift", 

801 readOnly=True, 

802 defaultValue=0, 

803 precision=1, 

804 params={ 

805 "category": "Second Factor Authentication", 

806 } 

807 ) 

808 

809 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None: 

810 """ 

811 Returns an instance of self.OtpConfig with a provided token configuration, 

812 or None when there is no appropriate configuration of this second factor handler available. 

813 """ 

814 

815 if otp_secret := skel.dbEntity.get("otp_secret"): 

816 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0) 

817 

818 return None 

819 

820 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

821 """ 

822 Specified whether the second factor authentication can be handled by the given user or not. 

823 """ 

824 return bool(self.get_config(skel)) 

825 

826 @exposed 

827 def start(self): 

828 """ 

829 Configures OTP login for the current session. 

830 

831 A special otp_user_conf has to be specified as a dict, which is stored into the session. 

832 """ 

833 session = current.session.get() 

834 

835 if not (user_key := session.get("possible_user_key")): 

836 raise errors.PreconditionFailed( 

837 "Second factor can only be triggered after successful primary authentication." 

838 ) 

839 

840 user_skel = self._user_module.baseSkel() 

841 if not user_skel.read(user_key): 

842 raise errors.NotFound("The previously authenticated user is gone.") 

843 

844 if not (otp_user_conf := self.get_config(user_skel)): 

845 raise errors.PreconditionFailed("This second factor is not available for the user") 

846 

847 otp_user_conf = { 

848 "key": str(user_key), 

849 } | dataclasses.asdict(otp_user_conf) 

850 

851 session = current.session.get() 

852 session["_otp_user"] = otp_user_conf 

853 session.markChanged() 

854 

855 return self._user_module.render.edit( 

856 self.OtpSkel(), 

857 params={ 

858 "name": i18n.translate( 

859 f"viur.core.modules.user.{self.ACTION_NAME}", 

860 default_variables={"name": self.NAME}, 

861 ), 

862 "action_name": self.ACTION_NAME, 

863 "action_url": f"{self.modulePath}/{self.ACTION_NAME}", 

864 }, 

865 tpl=self.second_factor_login_template 

866 ) 

867 

@exposed
@force_ssl
@skey(allow_empty=True)
def otp(self, *args, **kwargs):
    """
    Performs the second factor validation and interaction with the client.

    The OTP configuration is read from the session key ``_otp_user``. The submitted
    ``otptoken`` is parsed through ``self.OtpSkel`` and checked with :meth:`verify`.
    On failure the attempt counter stored in the session is increased and the form is
    rendered again with an error. On success the session entry is removed and the login
    continues through ``secondFactorSucceeded`` of the user module.

    :raises errors.PreconditionFailed: If no OTP process was started in this session.
    :raises errors.Forbidden: If the stored attempt counter is greater than ``self.MAX_RETRY``.
    """
    session = current.session.get()
    if not (otp_user_conf := session.get("_otp_user")):
        raise errors.PreconditionFailed("No OTP process started in this session")

    # Check if maximum second factor verification attempts
    if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
        raise errors.Forbidden("Maximum amount of authentication retries exceeded")

    # Normalise the stored drift once. It may be missing or None; verification and the
    # drift correction below must work on the same value instead of indexing the dict again.
    known_timedrift = otp_user_conf.get("timedrift") or 0.0

    # Read the OTP token via the skeleton, to obtain a valid value
    skel = self.OtpSkel()
    if skel.fromClient(kwargs):
        # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP.
        res = self.verify(
            otp=skel["otptoken"],
            secret=otp_user_conf["secret"],
            algorithm=otp_user_conf.get("algorithm") or "sha1",
            interval=otp_user_conf.get("interval") or 60,
            timedrift=known_timedrift,
            valid_window=self.WINDOW_SIZE
        )
    else:
        res = None

    # Check if Token is invalid. Caution: 'if not res' would give a false positive for res == 0!
    if res is None:
        otp_user_conf["attempts"] = attempts + 1
        session.markChanged()
        skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
        return self._user_module.render.edit(
            skel,
            name=i18n.translate(
                f"viur.core.modules.user.auth.{self.ACTION_NAME}",
                default_variables={"name": self.NAME},
            ),
            action_name=self.ACTION_NAME,
            action_url=f"{self.modulePath}/{self.ACTION_NAME}",
            tpl=self.second_factor_login_template
        )

    # Remove otp user config from session
    user_key = db.key_helper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
    del session["_otp_user"]
    session.markChanged()

    # Check if the OTP device has a time drift
    timedriftchange = float(res) - known_timedrift
    if abs(timedriftchange) > 2:
        # The time-drift change accumulates to more than 2 minutes (for interval==60):
        # update clock-drift value accordingly
        self.updateTimeDrift(user_key, timedriftchange)

    # Continue with authentication
    return self._user_module.secondFactorSucceeded(self, user_key)

929 

@staticmethod
def verify(
    otp: str | int,
    secret: str,
    algorithm: str = "sha1",
    interval: int = 60,
    timedrift: float = 0.0,
    for_time: datetime.datetime | None = None,
    valid_window: int = 0,
) -> int | None:
    """
    Verifies the OTP passed in against the current time OTP.

    This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which
    the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown
    on the hardware token generator. This can be used to store the time drift of a given token generator.
    With ``valid_window == 0`` only the offset ``round(timedrift)`` is checked and ``0`` is returned on success.

    :param otp: the OTP token to check against
    :param secret: The OTP secret, given as a hex string (it is converted to base32 for pyotp)
    :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256)
    :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators).
    :param timedrift: The known timedrift (old index) of the hardware OTP generator
    :param for_time: Time to check OTP at (defaults to now)
    :param valid_window: extends the validity to this many counter ticks before and after the current one

    :returns: The index where verification succeeded, None otherwise
    :raises errors.NotImplemented: If ``algorithm`` is neither ``"sha1"`` nor ``"sha256"``.
    """
    # get the hashing digest
    digest = {
        "sha1": hashlib.sha1,
        "sha256": hashlib.sha256,
    }.get(algorithm)

    if not digest:
        raise errors.NotImplemented(f"{algorithm=} is not implemented")

    if for_time is None:
        for_time = datetime.datetime.now()

    # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index
    timedrift = round(timedrift)
    secret = bytes.decode(base64.b32encode(bytes.fromhex(secret)))  # hex secret -> base32 as expected by pyotp

    totp = pyotp.TOTP(secret, digest=digest, interval=interval)

    # Fill with zeros in front and compare as bytes: hmac.compare_digest() rejects str arguments
    # that are not pure ASCII with a TypeError, which must not happen for arbitrary client input.
    expected = str(otp).zfill(6).encode("utf-8")

    def _matches(offset: int) -> bool:
        """Constant-time comparison of the submitted token with the token at the given counter offset."""
        return hmac.compare_digest(expected, str(totp.at(for_time, offset)).encode("utf-8"))

    if valid_window:
        for offset in range(timedrift - valid_window, timedrift + valid_window + 1):
            if _matches(offset):
                return offset

        return None

    return 0 if _matches(timedrift) else None

987 

988 # FIXME: VIUR4 rename 

def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
    """
    Adjusts the stored clock-drift value of a user's OTP token.

    Only a tenth of the measured offset is applied, limited to the range -0.3 .. 0.3 per call,
    so that a single late submit by a user doesn't skew the value out of bounds.
    Nothing happens when the user cannot be read or no OTP token skeleton is found.

    :param user_key: For which user the update should occur
    :param idx: How many steps before/behind that token was
    """
    user_skel = self._user_module.skel().read(user_key)
    if not user_skel:
        return

    otp_skel = self._get_otptoken(user_skel)
    if not otp_skel:
        return

    # Dampen to 1/10 of the offset and clamp to +/- 0.3
    adjustment = min(max(0.1 * idx, -0.3), 0.3)
    otp_skel.patch(
        {"+otp_timedrift": adjustment},
        update_relations=False,
    )

1007 

1008 

class AuthenticatorOTP(UserSecondFactorAuthentication):
    """
    Second factor provider for authenticator apps (like Authy and similar).

    The per-user secret is kept in the ``otp_app_secret`` bone that ``patch_user_skel``
    adds to the user skeleton; tokens are verified with pyotp's TOTP implementation.
    """
    # Identifier of this method; it is reported by the legacy ``User.getAuthMethods``.
    METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"

    # second_factor_add_template = "user_secondfactor_add"
    # """Template to configure (add) a new TOTP"""

    ACTION_NAME = "authenticator_otp"
    """Action name provided for *otp_template* on login"""

    # Human-readable name, passed as the "name" variable into the translations of this provider.
    NAME = "Authenticator App"

1022 

1023 # FIXME: The second factor add has to be rewritten entirely to ActionSkel paradigm. 

1024 ''' 

1025 @exposed 

1026 @force_ssl 

1027 @skey(allow_empty=True) 

1028 def add(self, otp=None): 

1029 """ 

1030 We try to read the otp_app_secret from the current session. When this fails we generate a new one and store 

1031 it in the session. 

1032 

1033 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store 

1034 the otp_app_secret from the session in the user entry. 

1035 """ 

1036 current_session = current.session.get() 

1037 

1038 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")): 

1039 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret() 

1040 current_session["_maybe_otp_app_secret"] = otp_app_secret 

1041 current_session.markChanged() 

1042 

1043 if otp is None: 

1044 return self._user_module.render.second_factor_add( 

1045 tpl=self.second_factor_add_template, 

1046 action_name=self.ACTION_NAME, 

1047 name=i18n.translate( 

1048 f"viur.core.modules.user.auth{self.ACTION_NAME}", 

1049 default_variables={"name": self.NAME}, 

1050 ), 

1051 add_url=self.add_url, 

1052 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) 

1053 else: 

1054 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret): 

1055 return self._user_module.render.second_factor_add( 

1056 tpl=self.second_factor_add_template, 

1057 action_name=self.ACTION_NAME, 

1058 name=i18n.translate( 

1059 f"viur.core.modules.user.auth.{self.ACTION_NAME}", 

1060 default_variables={"name": self.NAME}, 

1061 ), 

1062 add_url=self.add_url, 

1063 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors 

1064 

1065 # Now we can set the otp_app_secret to the current User and render der Success-template 

1066 AuthenticatorOTP.set_otp_app_secret(otp_app_secret) 

1067 return self._user_module.render.second_factor_add_success( 

1068 action_name=self.ACTION_NAME, 

1069 name=i18n.translate( 

1070 f"viur.core.modules.user.auth.{self.ACTION_NAME}", 

1071 default_variables={"name": self.NAME}, 

1072 ), 

1073 ) 

1074 ''' 

1075 

def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
    """
    Tells whether this second factor is usable for the given user.

    That is only the case when a non-empty ``otp_app_secret`` is stored in the user's entity.
    """
    stored_secret = skel.dbEntity.get("otp_app_secret", "")
    return True if stored_secret else False

1081 

@classmethod
def patch_user_skel(cls, skel_cls):
    """
    Equips the given UserSkel class with the bone required by the Authenticator App provider.

    :param skel_cls: The skeleton class that receives the ``otp_app_secret`` bone.
    """
    # Secret shared with authenticator OTP apps (like Authy)
    secret_bone = CredentialBone(
        descr="OTP Secret (App-Key)",
        params={"category": "Second Factor Authentication"},
    )
    skel_cls.otp_app_secret = secret_bone

1094 

@classmethod
def set_otp_app_secret(cls, otp_app_secret=None):
    """
    Stores a new OTP secret in the database entry of the current user.

    :param otp_app_secret: The secret to store; must not be None.
    :raises errors.PreconditionFailed: If no secret was provided.
    :raises errors.Unauthorized: If there is no current user.
    :raises errors.NotFound: If the current user's entity cannot be fetched inside the transaction.
    """
    if otp_app_secret is None:
        logging.error("No 'otp_app_secret' is provided")
        raise errors.PreconditionFailed("No 'otp_app_secret' is provided")

    cuser = current.user.get()
    if not cuser:
        raise errors.Unauthorized()

    def transaction(user_key):
        # Re-read the entity inside the transaction before writing the new secret
        user = db.get(user_key)
        if not user:
            raise errors.NotFound()

        user["otp_app_secret"] = otp_app_secret
        db.put(user)

    db.run_in_transaction(transaction, cuser["key"])

1113 

@classmethod
def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
    """
    Builds the provisioning URI of the given secret for the current user.

    The issuer is taken from ``conf.user.otp_issuer``; when that is empty, a warning is logged
    and ``conf.instance.project_id`` is used instead.

    :param otp_app_secret: The secret to encode into the URI.
    :return: an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
    :raises errors.Unauthorized: If there is no current user.
    """
    cuser = current.user.get()
    if not cuser:
        raise errors.Unauthorized()

    issuer = conf.user.otp_issuer
    if not issuer:
        logging.warning(
            f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
        issuer = conf.instance.project_id

    totp = pyotp.TOTP(otp_app_secret)
    return totp.provisioning_uri(name=cuser["name"], issuer_name=issuer)

1127 

@classmethod
def generate_otp_app_secret(cls) -> str:
    """
    Creates a fresh OTP secret.

    :return: the value produced by ``pyotp.random_base32()``
    """
    new_secret = pyotp.random_base32()
    return new_secret

1135 

@classmethod
def verify_otp(cls, otp: str | int, secret: str) -> bool:
    """
    Checks a token against the TOTP generated from the given secret.

    :param otp: The token entered by the user.
    :param secret: The secret handed to ``pyotp.TOTP``.
    :return: The result of ``pyotp.TOTP(secret).verify(otp)``.
    """
    totp = pyotp.TOTP(secret)
    return totp.verify(otp)

1139 

@exposed
def start(self):
    """
    Starts the authenticator-app second factor for the current session.

    Stores a fresh ``_otp_user`` configuration (attempt counter set to 0) in the session
    and renders the token input form.
    """
    session = current.session.get()
    session["_otp_user"] = {"attempts": 0}
    session.markChanged()

    otp_skel = TimeBasedOTP.OtpSkel()
    template_params = {
        "name": i18n.translate(
            f"viur.core.modules.user.auth.{self.ACTION_NAME}",
            default_variables={"name": self.NAME},
        ),
        "action_name": self.ACTION_NAME,
        "action_url": self.action_url,
    }

    return self._user_module.render.edit(
        otp_skel,
        params=template_params,
        tpl=self.second_factor_login_template,
    )

1158 

@exposed
@force_ssl
@skey
def authenticator_otp(self, **kwargs):
    """
    Verifies the submitted OTP against the secret stored for the user.

    The user is identified by ``possible_user_key`` in the session, which
    ``User.continueAuthenticationFlow`` stores there. On a wrong token the attempt counter in
    the session is increased and the form is rendered again with an error; on success the
    login continues through ``secondFactorSucceeded`` of the user module.

    :raises errors.PreconditionFailed: If no OTP process was started in this session, the
        session holds no user from a primary authentication, the user has no secret stored,
        or the submitted data cannot be read.
    :raises errors.Forbidden: If the stored attempt counter is greater than ``self.MAX_RETRY``.
    :raises errors.NotFound: If the user entity does not exist.
    """
    session = current.session.get()

    if not (otp_user_conf := session.get("_otp_user")):
        raise errors.PreconditionFailed("No OTP process started in this session")

    # Use .get() here: indexing the session directly would surface a missing key as a
    # KeyError (server error) instead of a proper precondition failure.
    if (possible_user_key := session.get("possible_user_key")) is None:
        raise errors.PreconditionFailed("No primary authentication performed in this session")

    user_key = db.Key(self._user_module.kindName, possible_user_key)

    # Check if maximum second factor verification attempts
    if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
        raise errors.Forbidden("Maximum amount of authentication retries exceeded")

    if not (user := db.get(user_key)):
        raise errors.NotFound()

    # Without a stored secret there is nothing to verify against
    if not (secret := user.get("otp_app_secret")):
        raise errors.PreconditionFailed("No authenticator app configured for this user")

    skel = TimeBasedOTP.OtpSkel()
    if not skel.fromClient(kwargs):
        raise errors.PreconditionFailed()
    otp_token = str(skel["otptoken"]).zfill(6)  # fill with zeros in front

    if AuthenticatorOTP.verify_otp(otp=otp_token, secret=secret):
        return self._user_module.secondFactorSucceeded(self, user_key)

    # Wrong token: count the attempt and show the form again
    otp_user_conf["attempts"] = attempts + 1
    session.markChanged()
    skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
    return self._user_module.render.edit(
        skel,
        name=i18n.translate(
            f"viur.core.modules.user.auth.{self.ACTION_NAME}",
            default_variables={"name": self.NAME},
        ),
        action_name=self.ACTION_NAME,
        action_url=self.action_url,
        tpl=self.second_factor_login_template,
    )

1199 

1200 

class User(List):
    """
    The User module is used to manage and authenticate users in a ViUR system.

    It is used in almost any ViUR project, but ViUR can also function without any user capabilities.
    """

    kindName = "user"
    addTemplate = "user_add"
    addSuccessTemplate = "user_add_success"

    # GoogleAccount is only offered when conf.user.google_client_id is set; filter() drops falsy entries.
    authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter(
        None, (
            UserPassword,
            conf.user.google_client_id and GoogleAccount,
        )
    ))
    """
    Specifies primary authentication providers that are made available
    as sub-modules under `user/auth_<classname>`. They might require
    customization or configuration.
    """

    secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = (
        TimeBasedOTP,
        AuthenticatorOTP,
    )
    """
    Specifies secondary authentication providers that are made available
    as sub-modules under `user/f2_<classname>`. They might require
    customization or configuration, which is determined during the
    login-process depending on the user that wants to login.
    """

    # Pairs of (primary provider, second factor provider or None for "no second factor").
    validAuthenticationMethods = tuple(filter(
        None, (
            (UserPassword, AuthenticatorOTP),
            (UserPassword, TimeBasedOTP),
            (UserPassword, None),
            (GoogleAccount, None) if conf.user.google_client_id else None,
        )
    ))
    """
    Specifies the possible combinations of primary- and secondary factor
    login methods.

    GoogleLogin defaults to no second factor, as the Google Account can be
    secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only
    handled when there is a user-dependent configuration available.
    """

    # Message of the NotAcceptable error raised by continueAuthenticationFlow when no
    # second factor option is usable for the user.
    msg_missing_second_factor = "Second factor required but not configured for this user."

    # Maximum time between primary authentication and a successful second factor
    # (checked in secondFactorSucceeded).
    secondFactorTimeWindow = datetime.timedelta(minutes=10)

    default_order = "name.idx"

    # NOTE(review): presumably maps role names to the access rights granted on this module
    # ("*" meaning all) - confirm against the role handling of the module base class.
    roles = {
        "admin": "*",
    }

1261 

def __init__(self, moduleName, modulePath):
    """
    Instantiates every configured authentication provider as a sub-module of this module.

    Primary providers become attributes named ``auth_<classname>``, second factor providers
    ``f2_<classname>`` (class name in lower case); each is created with its own module path
    below ``modulePath`` and a reference to this module.
    """
    provider_groups = (
        ("auth", UserPrimaryAuthentication, self.authenticationProviders),
        ("f2", UserSecondFactorAuthentication, self.secondFactorProviders),
    )

    for prefix, required_base, providers in provider_groups:
        for provider in providers:
            assert issubclass(provider, required_base)
            name = f"{prefix}_{provider.__name__.lower()}"
            setattr(self, name, provider(name, f"{modulePath}/{name}", self))

    super().__init__(moduleName, modulePath)

1274 

def adminInfo(self):
    """
    Returns the admin configuration of the user module.

    Every user gets the module icon. When ``is_admin`` is true for the current user, the
    custom actions ``trigger_kick`` and ``trigger_takeover`` are added as well; their URLs
    point to the ``trigger`` method of this module.
    """
    ret = {
        "icon": "person-fill",
    }

    if self.is_admin(current.user.get()):
        ret |= {
            "actions": [
                "trigger_kick",
                "trigger_takeover",
            ],
            "customActions": {
                "trigger_kick": {
                    "name": i18n.translate(
                        key="viur.core.modules.user.customActions.kick",
                        defaultText="Kick user",
                        hint="Title of the kick user function"
                    ),
                    "icon": "trash2-fill",
                    "action": "fetch",
                    "url": "/vi/{{module}}/trigger/kick/{{key}}",
                    "confirm": i18n.translate(
                        key="viur.core.modules.user.customActions.kick.confirm",
                        defaultText="Do you really want to drop all sessions of the selected user from the system?",
                    ),
                    "success": i18n.translate(
                        key="viur.core.modules.user.customActions.kick.success",
                        defaultText="Sessions of the user are being invalidated.",
                    ),
                },
                "trigger_takeover": {
                    "name": i18n.translate(
                        key="viur.core.modules.user.customActions.takeover",
                        defaultText="Take-over user",
                        hint="Title of the take user over function"
                    ),
                    "icon": "file-person-fill",
                    "action": "fetch",
                    "url": "/vi/{{module}}/trigger/takeover/{{key}}",
                    "confirm": i18n.translate(
                        key="viur.core.modules.user.customActions.takeover.confirm",
                        defaultText="Do you really want to replace your current user session by a "
                                    "user session of the selected user?",
                    ),
                    "success": i18n.translate(
                        key="viur.core.modules.user.customActions.takeover.success",
                        # Fixed wording of the user-facing default text ("know" -> "known")
                        defaultText="You're now known as the selected user!",
                    ),
                    "then": "reload-vi",
                },
            },
        }

    return ret

1329 

def get_role_defaults(self, role: str) -> set[str]:
    """
    Returns the set of default access rights for a given role.

    The roles "viewer", "editor" and "admin" receive "admin" usage;
    the "admin" role additionally receives "scriptor" usage.
    Any other role gets an empty set.
    """
    defaults = set()

    if role == "admin":
        defaults.update(("admin", "scriptor"))
    elif role in ("viewer", "editor"):
        defaults.add("admin")

    return defaults

1346 

def addSkel(self):
    """
    Returns a clone of the add-skeleton, adjusted to the current user.

    When ``is_admin`` is true for the current user, ``status`` and ``access`` are visible and
    editable. Otherwise both are hidden, read-only and reset (``Status.UNSET`` / empty list).
    If the skeleton has a ``password`` bone it is required, visible and editable.
    The ``name`` bone is always editable when adding.
    """
    skel = super().addSkel().clone()
    creator_is_admin = self.is_admin(current.user.get())

    if creator_is_admin:
        # An admin tries to add a new user.
        for bone in (skel.status, skel.access):
            bone.readOnly = False
            bone.visible = True
    else:
        skel.status.readOnly = True
        skel["status"] = Status.UNSET
        skel.status.visible = False

        skel.access.readOnly = True
        skel["access"] = []
        skel.access.visible = False

    if "password" in skel:
        # Unlock and require a password
        password_bone = skel.password
        password_bone.required = True
        password_bone.visible = True
        password_bone.readOnly = False

    skel.name.readOnly = False  # Don't enforce readonly name in user/add
    return skel

1373 

def editSkel(self, *args, **kwargs):
    """
    Returns a clone of the edit-skeleton, adjusted to the current user.

    If present, the ``password`` bone becomes optional, visible and editable.
    ``name``, ``access`` and ``status`` are read-only unless ``is_admin`` is true
    for the current user.
    """
    skel = super().editSkel().clone()

    if "password" in skel:
        password_bone = skel.password
        password_bone.required = False
        password_bone.visible = True
        password_bone.readOnly = False

    locked = not self.is_admin(current.user.get())
    skel.name.readOnly = locked
    skel.access.readOnly = locked
    skel.status.readOnly = locked

    return skel

1388 

def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication:
    """
    Returns the second factor provider instance registered on this module for the given class.

    The instance is looked up by attribute name ``f2_<classname>`` (class name in lower case).
    """
    attribute_name = "f2_" + cls.__name__.lower()
    return getattr(self, attribute_name)

1391 

def getCurrentUser(self):
    """
    Returns a skeleton for the user stored in the current session, or None.

    A user is only returned when a session exists, the session is loaded (or the request is
    a deferred one) and the session contains a non-empty ``user`` entry.
    """
    session = current.session.get()
    req = current.request.get()

    if not session:
        return None

    if not (session.loaded or req.is_deferred):
        return None

    user = session.get("user")
    if not user:
        return None

    skel = self.baseSkel()
    skel.setEntity(user)
    return skel

1402 

def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key):
    """
    Continue authentication flow when primary authentication succeeded.

    Remembers the user key and the start time of the second factor phase in the session and
    then decides how to go on, based on ``validAuthenticationMethods``:

    - exactly one usable second factor provider: start it directly
    - several usable providers: let the user choose
    - no usable provider, but login without second factor is allowed: log the user in
    - nothing of the above: fail

    :param provider: The primary authentication provider instance that authenticated the user.
    :param user_key: Key of the authenticated user.
    :raises errors.NotFound: If the user cannot be read.
    :raises errors.Forbidden: If the primary provider cannot handle this user.
    :raises errors.NotAcceptable: If no valid combination exists for this user.
    """
    skel = self.baseSkel()

    if not skel.read(user_key):
        raise errors.NotFound("User was not found.")

    if not provider.can_handle(skel):
        raise errors.Forbidden("User is not allowed to use this primary login method.")

    session = current.session.get()
    session["possible_user_key"] = user_key.id_or_name
    session["_secondFactorStart"] = utils.utcNow()
    session.markChanged()

    # Collect all second factor options valid for this primary provider;
    # None stands for "login without a second factor is allowed".
    candidates = []
    for auth_provider, second_factor in self.validAuthenticationMethods:
        if not isinstance(provider, auth_provider):
            continue

        if second_factor is None:
            candidates.append(None)
            continue

        second_factor_provider_instance = self.secondFactorProviderByClass(second_factor)
        if second_factor_provider_instance.can_handle(skel):
            candidates.append(second_factor_provider_instance)

    if not candidates:
        raise errors.NotAcceptable(self.msg_missing_second_factor)

    # A usable second factor takes precedence over "no second factor". Drop every None entry
    # (not only the first one), otherwise a None could reach the `.VISIBLE` access below.
    second_factor_providers = [candidate for candidate in candidates if candidate is not None]

    if not second_factor_providers:
        # We allow sign-in without a second factor
        return self.authenticateUser(user_key)

    if len(second_factor_providers) == 1:
        # We have only one second factor we don't need the choice template
        # NOTE(review): start() is called with user_key here - confirm that the exposed
        # start() of every second factor provider accepts this argument.
        return second_factor_providers[0].start(user_key)

    # In case there is more than one second factor provider remaining, let the user decide!
    current.session.get()["_secondfactor_providers"] = {
        second_factor.start_url: second_factor.NAME
        for second_factor in second_factor_providers
        if second_factor.VISIBLE
    }

    return self.select_secondfactor_provider()

1452 

def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key):
    """
    Continue authentication flow when secondary authentication succeeded.

    :param provider: The second factor provider that verified the user.
    :param user_key: Key of the user that passed the second factor.
    :raises errors.Forbidden: If the session holds no user from a primary authentication,
        or a different one than ``user_key``.
    :raises errors.RequestTimeout: If the second factor phase was not started in this session
        or took longer than ``secondFactorTimeWindow``.
    """
    session = current.session.get()

    # Use .get(): a session without primary authentication must be rejected properly
    # instead of failing with a KeyError.
    possible_user_key = session.get("possible_user_key")
    if possible_user_key is None or possible_user_key != user_key.id_or_name:
        raise errors.Forbidden()

    # Assert that the second factor verification finished in time
    started = session.get("_secondFactorStart")
    if started is None or utils.utcNow() - started > self.secondFactorTimeWindow:
        raise errors.RequestTimeout()

    return self.authenticateUser(user_key)

1466 

def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None:
    """
    Hookable check if a user is defined as "active" and can login.

    :param skel: The UserSkel of the user who wants to login.
    :returns: Returns True or False when the result is unambiguous and the user is active or not. \
        Returns None when the provided skel doesn't provide enough information for determination.
    """
    if skel and "status" in skel:
        status = skel["status"]
        if not isinstance(status, (Status, int)):
            try:
                status = int(status)
            except (TypeError, ValueError):
                # int() raises TypeError for None (status never set) and ValueError for
                # non-numeric strings; both mean "no usable status".
                status = Status.UNSET

        return status >= Status.ACTIVE.value

    return None

1486 

def is_admin(self, skel: skeleton.SkeletonInstance) -> bool | None:
    """
    Hookable check if a user is defined as "admin" and can edit or log into other users.
    Defaults to "root" users only.

    :param skel: The UserSkel of the user who should be checked for user admin privileges.
    :returns: Returns True or False when the result is unambiguous and the user is admin or not. \
        Returns None when the provided skel doesn't provide enough information for determination.
    """
    if skel and "access" in skel:
        # An unset access value (None) means "no access rights" and must not raise a TypeError
        return "root" in (skel["access"] or ())

    return None

1500 

def authenticateUser(self, key: db.Key, **kwargs):
    """
    Logs the user with the given key into the current session.

    The session is reset in this process: only the fields listed in
    conf.user.session_persistent_fields_on_login are carried over. Afterwards the user's
    entity is stored in the session, the static security key is sent as response header,
    the current user is updated and ``onLogin`` is called.

    :param key: The (DB-)Key of the user we shall authenticate
    :raises ValueError: If no user exists for the given key.
    :raises errors.Forbidden: If ``is_active`` is not true for the user.
    """
    skel = self.baseSkel()
    if not skel.read(key):
        raise ValueError(f"Unable to authenticate unknown user {key}")

    # Only active accounts may log in
    if not self.is_active(skel):
        raise errors.Forbidden("The user is disabled and cannot be authenticated.")

    session = current.session.get()

    # Save the persistent fields, start a fresh session and restore them
    take_over = {
        field: value
        for field, value in session.items()
        if field in conf.user.session_persistent_fields_on_login
    }
    session.reset()
    session |= take_over

    # Publish the user to session, response and request context
    session["user"] = skel.dbEntity
    current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key
    current.user.set(self.getCurrentUser())

    self.onLogin(skel)

    return self.render.render("login_success", skel, **kwargs)

1535 

1536 

1537 # Action for primary authentication selection 

1538 

def SelectAuthenticationProviderSkel(self):
    """
    Creates the skeleton used to select a primary authentication provider.

    The selectable values map the start URL of every visible provider to its name;
    the first provider with a start URL is preselected.
    """
    visible_instances = [
        getattr(self, f"auth_{provider_cls.__name__.lower()}")
        for provider_cls in self.authenticationProviders
        if provider_cls.VISIBLE
    ]

    providers = {instance.start_url: instance.NAME for instance in visible_instances}
    first = next((url for url in providers if url is not None), None)

    class SelectAuthenticationProviderSkel(skeleton.RelSkel):
        provider = SelectBone(
            descr="Authentication method",
            required=True,
            values=providers,
            defaultValue=first,
        )

    return SelectAuthenticationProviderSkel()

1561 

@exposed
def select_authentication_provider(self, **kwargs):
    """
    Lets the client choose a primary authentication provider.

    The selection form is rendered when more than one provider is available and no valid
    choice was submitted. Otherwise the success template is rendered with the chosen
    provider passed as ``next_url``.
    """
    skel = self.SelectAuthenticationProviderSkel()

    # Read required bones from client
    has_choice = len(skel.provider.values) > 1
    if has_choice and not (kwargs and skel.fromClient(kwargs)):
        return self.render.render("select_authentication_provider", skel)

    return self.render.render("select_authentication_provider_success", skel, next_url=skel["provider"])

1571 

1572 # Action for second factor select 

1573 

class SelectSecondFactorProviderSkel(skeleton.RelSkel):
    """
    Skeleton to select one of the second factor providers offered in the current session.
    """
    provider = SelectBone(
        descr="Second factor",
        required=True,
        # The values are resolved from the session on access. Use .get() so that a session
        # without offered providers yields no values instead of raising a KeyError
        # (the `or ()` fallback never applied with direct indexing).
        values=lambda: current.session.get().get("_secondfactor_providers") or (),
    )

1580 

@exposed
def select_secondfactor_provider(self, **kwargs):
    """
    Lets the client choose one of the second factor providers offered in the session.

    The selection form is rendered until a valid choice is submitted. Then the offered
    providers are removed from the session and the success template is rendered with the
    chosen provider passed as ``next_url``.
    """
    skel = self.SelectSecondFactorProviderSkel()

    # Read required bones from client
    if not (kwargs and skel.fromClient(kwargs)):
        return self.render.render("select_secondfactor_provider", skel)

    session = current.session.get()
    del session["_secondfactor_providers"]

    return self.render.render("select_secondfactor_provider_success", skel, next_url=skel["provider"])

1592 

@exposed
@skey
def logout(self, **kwargs):
    """
    Implements the logout action.

    Calls ``onLogout`` and terminates the current session: all keys not listed in
    conf.user.session_persistent_fields_on_logout are lost.

    :raises errors.Unauthorized: If there is no current user.
    """
    user = current.user.get()
    if not user:
        raise errors.Unauthorized()

    self.onLogout(user)

    session = current.session.get()
    take_over = {
        field: value
        for field, value in session.items()
        if field in conf.user.session_persistent_fields_on_logout
    }

    if take_over:
        # Start a fresh session but keep the persistent fields
        session.reset()
        session |= take_over
    else:
        session.clear()

    current.user.set(None)  # set user to none in context var

    return self.render.render("logout_success")

1616 

@exposed
def login(self, *args, **kwargs):
    """
    Entry point of the login: delegates to the selection of the primary authentication provider.

    Passed arguments are not forwarded.
    """
    result = self.select_authentication_provider()
    return result

1620 

def onLogin(self, skel: skeleton.SkeletonInstance):
    """
    Hook to be called on user login.

    Updates the ``lastlogin`` timestamp of the user (if the skeleton has such a bone)
    and logs the login.
    """
    if "lastlogin" in skel:
        now = utils.utcNow()
        previous = skel["lastlogin"]

        # Conserve DB-Writes: Update the user max once in 30 Minutes
        outdated = not previous or (now - previous) > datetime.timedelta(minutes=30)
        if outdated:
            skel["lastlogin"] = now
            skel.write(update_relations=False)

    logging.info(f"""User {skel["name"]} logged in""")

1635 

def onLogout(self, skel: skeleton.SkeletonInstance):
    """
    Hook to be called on user logout; by default it only logs the event.
    """
    message = f"""User {skel["name"]} logged out"""
    logging.info(message)

1641 

@exposed
def view(self, key: db.Key | int | str = "self", *args, **kwargs):
    """
    Allow a special key "self" to reference the current user.

    By default, any authenticated user can view its own user entry,
    to obtain access rights and any specific user information.
    This behavior is defined in the customized `canView` function,
    which is overwritten by the User-module.

    The rendered skeleton can be modified or restricted by specifying
    a customized view-skeleton.

    :raises errors.Unauthorized: If "self" is requested without a current user.
    """
    if key == "self":
        user = current.user.get()
        if not user:
            raise errors.Unauthorized("Cannot view 'self' with unknown user")

        key = user["key"]

    return super().view(key, *args, **kwargs)

1662 

def canView(self, skel) -> bool:
    """
    Decides whether the current user may view the given user skeleton.

    Viewing is allowed for the user's own entry, for users for which ``is_admin`` is true
    and for users holding the "user-view" access right. Without a current user it is denied.
    """
    user = current.user.get()
    if not user:
        return False

    if skel["key"] == user["key"]:
        return True

    return bool(self.is_admin(user) or "user-view" in user["access"])

1672 

@exposed
@skey(allow_empty=True)
def edit(self, key: db.Key | int | str = "self", *args, **kwargs):
    """
    Allow a special key "self" to reference the current user.

    This modification will only allow to use "self" as a key;
    The specific access right to let the user edit itself must
    still be customized.

    The rendered and editable skeleton can be modified or restricted
    by specifying a customized edit-skeleton.

    :raises errors.Unauthorized: If "self" is requested without a current user.
    """
    if key == "self":
        user = current.user.get()
        if not user:
            raise errors.Unauthorized("Cannot edit 'self' with unknown user")

        key = user["key"]

    return super().edit(key, *args, **kwargs)

1693 

@exposed
def getAuthMethods(self, *args, **kwargs):
    """
    Legacy method prior < viur-core 3.8: Inform tools like Admin which authentication to use.

    :returns: JSON-encoded list of [primary method name, secondary method name or null] pairs,
        one for each entry of ``validAuthenticationMethods``.
    """
    logging.warning("DEPRECATED!!! Use '/user/login'-method for this, or update your admin version!")

    method_names = []
    for primary, secondary in self.validAuthenticationMethods:
        primary_name = primary.METHOD_NAME
        secondary_name = secondary.METHOD_NAME if secondary else None
        method_names.append((primary_name, secondary_name))

    return json.dumps(method_names)

1705 

@exposed
def trigger(self, action: str, key: str):
    """
    Runs a custom admin action ("takeover" or "kick") on a user.

    :param action: Name of the action, without the ``trigger_`` prefix.
    :param key: Key of the target user; when no user can be read by it, it is tried as user name.
    :raises errors.Unauthorized: If the current user is neither admin nor holds one of the
        access rights configured for the action.
    :raises errors.NotFound: If the target user does not exist.
    :raises errors.NotImplemented: If the action is unknown.
    """
    # Check for provided access right definition (equivalent to client-side check), fallback to root!
    action_conf = self.adminInfo().get("customActions", {}).get(f"trigger_{action}", {})
    access = action_conf.get("access") or ()

    cuser = current.user.get()
    permitted = bool(cuser) and (
        any(role in cuser["access"] for role in access)
        or self.is_admin(cuser)
    )
    if not permitted:
        raise errors.Unauthorized()

    skel = self.skel()
    if not skel.read(key):
        # Not a readable key: try to find the user by name instead
        skel = skel.all().mergeExternalFilter({"name": key}).getSkel()
        if not skel:
            raise errors.NotFound("The provided user does not exist.")

    if action == "takeover":
        self.authenticateUser(skel["key"])
    elif action == "kick":
        session.killSessionByUser(skel["key"])
    else:
        raise errors.NotImplemented(f"Action {action!r} not implemented")

    return self.render.render(f"trigger/{action}Success", skel)

1734 

def onEdited(self, skel):
    """
    Hook called after a user was edited.

    Kills all sessions of the user when ``is_active`` explicitly returns False for it,
    then assigns the edited entity to the entries found by the query below.
    """
    super().onEdited(skel)

    user_key = skel["key"]

    # In case the user is set to inactive, kill all sessions
    if self.is_active(skel) is False:
        session.killSessionByUser(user_key)

    # Update user setting in all sessions
    # NOTE(review): this queries kind "user" and never writes the modified entries back -
    # confirm whether the session kind and a db.put() were intended here.
    matching_entries = db.Query("user").filter("user =", user_key).iter()
    for session_obj in matching_entries:
        session_obj["data"]["user"] = skel.dbEntity

1745 

def onDeleted(self, skel):
    """
    Hook called after a user was deleted: invalidates all sessions of that user.
    """
    super().onDeleted(skel)

    deleted_user_key = skel["key"]
    session.killSessionByUser(deleted_user_key)

1750 

1751 

@tasks.StartupTask
def createNewUserIfNotExists():
    """
    Create a new Admin user, if the userDB is empty.

    This only happens when the "user" module of the vi-application is a ``User`` module that
    offers ``UserPassword`` as one of its primary login methods. The generated credentials
    are logged and sent to the admins by e-mail.
    """
    user_module = getattr(conf.main_app.vi, "user", None)
    if not (user_module and isinstance(user_module, User)):
        return

    if "addSkel" not in dir(user_module):
        return

    if "validAuthenticationMethods" not in dir(user_module):
        return

    # UserPassword must be one of the primary login methods
    if not any(
        issubclass(provider[0], UserPassword)
        for provider in user_module.validAuthenticationMethods
    ):
        return

    if db.Query(user_module.addSkel().kindName).getEntry():
        return  # There is at least one user in the database

    addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)()  # Ensure we have the full skeleton
    uname = f"""admin@{conf.instance.project_id}.appspot.com"""
    pw = utils.string.random(13)

    addSkel["name"] = uname
    addSkel["status"] = Status.ACTIVE  # Ensure it's enabled right away
    addSkel["access"] = ["root"]
    addSkel["password"] = pw

    try:
        addSkel.write()
    except Exception as e:
        logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
        logging.exception(e)
        return

    msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"

    logging.warning(msg)
    email.send_email_to_admins("New ViUR password", msg)

1788 

1789 

1790# DEPRECATED ATTRIBUTES HANDLING 

1791 

def __getattr__(attr):
    """
    Module-level attribute hook that serves deprecated names.

    ``userSkel`` is still answered with ``UserSkel``, accompanied by a deprecation warning
    and a log message. Any other unknown name raises a regular AttributeError, as a module
    without this hook would do.

    :param attr: Name of the attribute that was not found in the module.
    :raises AttributeError: If ``attr`` is not a known deprecated name.
    """
    if attr == "userSkel":
        msg = "Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
        logging.warning(msg)
        return UserSkel

    # There is no parent implementation to delegate to; raise the standard error directly
    raise AttributeError(f"module {__name__!r} has no attribute {attr!r}")