Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%

722 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-03 12:27 +0000

1import abc 

2import datetime 

3import enum 

4import functools 

5import hashlib 

6import hmac 

7import json 

8import logging 

9import secrets 

10import warnings 

11import user_agents 

12 

13import pyotp 

14import base64 

15import dataclasses 

16import typing as t 

17from google.auth.transport import requests 

18from google.oauth2 import id_token 

19 

20from viur.core import ( 

21 conf, current, db, email, errors, i18n, 

22 securitykey, session, skeleton, tasks, utils, Module 

23) 

24from viur.core.decorators import * 

25from viur.core.bones import * 

26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password 

27from viur.core.prototypes.list import List 

28from viur.core.ratelimit import RateLimit 

29from viur.core.securityheaders import extendCsp 

30 

31 

32@functools.total_ordering 

33class Status(enum.Enum): 

34 """Status enum for a user 

35 

36 Has backwards compatibility to be comparable with non-enum values. 

37 Will be removed with viur-core 4.0.0 

38 """ 

39 

40 UNSET = 0 # Status is unset 

41 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification 

42 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin 

43 DISABLED = 5 # Account disabled 

44 ACTIVE = 10 # Active 

45 

46 def __eq__(self, other): 

47 if isinstance(other, Status): 

48 return super().__eq__(other) 

49 return self.value == other 

50 

51 def __lt__(self, other): 

52 if isinstance(other, Status): 

53 return super().__lt__(other) 

54 return self.value < other 

55 

56 

57class UserSkel(skeleton.Skeleton): 

58 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604) 

59 

60 name = EmailBone( 

61 descr="E-Mail", 

62 required=True, 

63 readOnly=True, 

64 caseSensitive=False, 

65 searchable=True, 

66 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"), 

67 ) 

68 

69 firstname = StringBone( 

70 descr="Firstname", 

71 searchable=True, 

72 ) 

73 

74 lastname = StringBone( 

75 descr="Lastname", 

76 searchable=True, 

77 ) 

78 

79 roles = SelectBone( 

80 descr=i18n.translate("viur.core.modules.user.bone.roles", defaultText="Roles"), 

81 values=conf.user.roles, 

82 required=True, 

83 multiple=True, 

84 # fixme: This is generally broken in VIUR! See #776 for details. 

85 # vfunc=lambda values: 

86 # i18n.translate( 

87 # "user.bone.roles.invalid", 

88 # defaultText="Invalid role setting: 'custom' can only be set alone.") 

89 # if "custom" in values and len(values) > 1 else None, 

90 defaultValue=list(conf.user.roles.keys())[:1], 

91 ) 

92 

93 access = SelectBone( 

94 descr=i18n.translate("viur.core.modules.user.bone.access", defaultText="Access rights"), 

95 type_suffix="access", 

96 values=lambda: { 

97 right: i18n.translate(f"viur.core.modules.user.accessright.{right}", defaultText=right) 

98 for right in sorted(conf.user.access_rights) 

99 }, 

100 multiple=True, 

101 params={ 

102 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system 

103 } 

104 ) 

105 

106 status = SelectBone( 

107 descr=i18n.translate("viur.core.modules.user.bone.status", "Account status"), 

108 values=Status, 

109 translation_key_prefix="viur.core.user.status.", 

110 defaultValue=Status.ACTIVE, 

111 required=True, 

112 ) 

113 

114 lastlogin = DateBone( 

115 descr="Last Login", 

116 readOnly=True, 

117 ) 

118 

119 admin_config = JsonBone( # This bone stores settings from the vi 

120 descr="Config for the User", 

121 visible=False 

122 ) 

123 

124 def __new__(cls, *args, **kwargs): 

125 """ 

126 Constructor for the UserSkel-class, with the capability 

127 to dynamically add bones required for the configured 

128 authentication methods. 

129 """ 

130 for provider in conf.main_app.vi.user.authenticationProviders: 

131 assert issubclass(provider, UserPrimaryAuthentication) 

132 provider.patch_user_skel(cls) 

133 

134 for provider in conf.main_app.vi.user.secondFactorProviders: 

135 assert issubclass(provider, UserSecondFactorAuthentication) 

136 provider.patch_user_skel(cls) 

137 

138 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls) 

139 return super().__new__(cls, *args, **kwargs) 

140 

141 @classmethod 

142 def write(cls, skel, *args, **kwargs): 

143 # Roles 

144 if skel["roles"] and "custom" not in skel["roles"]: 

145 # Collect access rights through rules 

146 access = set() 

147 

148 for role in skel["roles"]: 

149 # Get default access for this role 

150 access |= conf.main_app.vi.user.get_role_defaults(role) 

151 

152 # Go through all modules and evaluate available role-settings 

153 for name in dir(conf.main_app.vi): 

154 if name.startswith("_"): 

155 continue 

156 

157 module = getattr(conf.main_app.vi, name) 

158 if not isinstance(module, Module): 

159 continue 

160 

161 roles = getattr(module, "roles", None) or {} 

162 rights = roles.get(role, roles.get("*", ())) 

163 

164 # Convert role into tuple if it's not 

165 if not isinstance(rights, (tuple, list)): 

166 rights = (rights, ) 

167 

168 if "*" in rights: 

169 for right in module.accessRights: 

170 access.add(f"{name}-{right}") 

171 else: 

172 for right in rights: 

173 if right in module.accessRights: 

174 access.add(f"{name}-{right}") 

175 

176 # special case: "edit" and "delete" actions require "view" as well! 

177 if right in ("edit", "delete") and "view" in module.accessRights: 

178 access.add(f"{name}-view") 

179 

180 skel["access"] = list(access) 

181 

182 return super().write(skel, *args, **kwargs) 

183 

184 

185class UserAuthentication(Module, abc.ABC): 

186 @property 

187 @abc.abstractstaticmethod 

188 def METHOD_NAME() -> str: 

189 """ 

190 Define a unique method name for this authentication. 

191 """ 

192 ... 

193 

194 def __init__(self, moduleName, modulePath, userModule): 

195 super().__init__(moduleName, modulePath) 

196 self._user_module = userModule 

197 

198 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

199 return True 

200 

201 @classmethod 

202 def patch_user_skel(cls, skel_cls: skeleton.Skeleton): 

203 """ 

204 Allows for an UserAuthentication to patch the UserSkel 

205 class with additional bones which are required for 

206 the implemented authentication method. 

207 """ 

208 ... 

209 

210 

211class UserPrimaryAuthentication(UserAuthentication, abc.ABC): 

212 """Abstract class for all primary authentication methods.""" 

213 registrationEnabled = False 

214 

215 @abc.abstractmethod 

216 def login(self, *args, **kwargs): 

217 ... 

218 

219 def next_or_finish(self, skel: skeleton.SkeletonInstance): 

220 """ 

221 Hook that is called whenever a part of the authentication was successful. 

222 It allows to perform further steps in custom authentications, 

223 e.g. change a password after first login. 

224 """ 

225 return self._user_module.continueAuthenticationFlow(self, skel["key"]) 

226 

227 

228class UserPassword(UserPrimaryAuthentication): 

229 METHOD_NAME = "X-VIUR-AUTH-User-Password" 

230 

231 registrationEmailVerificationRequired = True 

232 registrationAdminVerificationRequired = True 

233 

234 verifySuccessTemplate = "user_verify_success" 

235 verifyEmailAddressMail = "user_verify_address" 

236 verifyFailedTemplate = "user_verify_failed" 

237 passwordRecoveryTemplate = "user_passwordrecover" 

238 passwordRecoveryMail = "user_password_recovery" 

239 passwordRecoverySuccessTemplate = "user_passwordrecover_success" 

240 passwordRecoveryStep1Template = "user_passwordrecover_step1" 

241 passwordRecoveryStep2Template = "user_passwordrecover_step2" 

242 passwordRecoveryStep3Template = "user_passwordrecover_step3" 

243 

244 # The default rate-limit for password recovery (10 tries each 15 minutes) 

245 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip") 

246 

247 # Limit (invalid) login-retries to once per 5 seconds 

248 loginRateLimit = RateLimit("user.login", 12, 1, "ip") 

249 

250 @classmethod 

251 def patch_user_skel(cls, skel_cls): 

252 """ 

253 Modifies the UserSkel to be equipped by a PasswordBone. 

254 """ 

255 skel_cls.password = PasswordBone( 

256 readOnly=True, 

257 visible=False, 

258 params={ 

259 "category": "Authentication", 

260 } 

261 ) 

262 

263 class LoginSkel(skeleton.RelSkel): 

264 name = EmailBone( 

265 descr="E-Mail", 

266 required=True, 

267 caseSensitive=False, 

268 ) 

269 password = PasswordBone( 

270 required=True, 

271 test_threshold=0, 

272 tests=(), 

273 raw=True, 

274 ) 

275 

276 class LostPasswordStep1Skel(skeleton.RelSkel): 

277 name = EmailBone( 

278 descr="E-Mail", 

279 required=True, 

280 ) 

281 

282 class LostPasswordStep2Skel(skeleton.RelSkel): 

283 recovery_key = StringBone( 

284 descr="Recovery Key", 

285 required=True, 

286 params={ 

287 "tooltip": i18n.translate( 

288 key="viur.core.modules.user.userpassword.lostpasswordstep2.recoverykey", 

289 defaultText="Please enter the validation key you've received via e-mail.", 

290 hint="Shown when the user needs more than 15 minutes to paste the key", 

291 ), 

292 } 

293 ) 

294 

295 class LostPasswordStep3Skel(skeleton.RelSkel): 

296 # send the recovery key again, in case the password is rejected by some reason. 

297 recovery_key = StringBone( 

298 descr="Recovery Key", 

299 visible=False, 

300 readOnly=True, 

301 ) 

302 

303 password = PasswordBone( 

304 descr="New Password", 

305 required=True, 

306 params={ 

307 "tooltip": i18n.translate( 

308 key="viur.core.modules.user.userpassword.lostpasswordstep3.password", 

309 defaultText="Please enter a new password for your account.", 

310 ), 

311 } 

312 ) 

313 

314 @exposed 

315 @force_ssl 

316 @skey(allow_empty=True) 

317 def login(self, **kwargs): 

318 # Obtain a fresh login skel 

319 skel = self.LoginSkel() 

320 

321 # Read required bones from client 

322 if not skel.fromClient(kwargs): 

323 return self._user_module.render.login(skel, action="login") 

324 

325 self.loginRateLimit.assertQuotaIsAvailable() 

326 

327 # query for the username. The query might find another user, but the name is being checked for equality below 

328 name = skel["name"].lower().strip() 

329 user_skel = self._user_module.baseSkel() 

330 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel 

331 

332 # extract password hash from raw database entity (skeleton access blocks it) 

333 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {} 

334 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001 

335 password_hash = encode_password(skel["password"], password_data.get("salt", "-invalid-"), iterations)["pwhash"] 

336 

337 # now check if the username matches 

338 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode()) 

339 

340 # next, check if the password hash matches 

341 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash) 

342 

343 if not is_okay: 

344 # Set error to all required fields 

345 for name, bone in skel.items(): 

346 if bone.required: 

347 skel.errors.append( 

348 ReadFromClientError( 

349 ReadFromClientErrorSeverity.Invalid, 

350 i18n.translate( 

351 key="viur.core.modules.user.userpassword.login.failed", 

352 defaultText="Invalid username or password provided", 

353 ), 

354 name, 

355 ) 

356 ) 

357 

358 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota 

359 return self._user_module.render.login(skel, action="login") 

360 

361 # check if iterations are below current security standards, and update if necessary. 

362 if iterations < PBKDF2_DEFAULT_ITERATIONS: 

363 logging.info(f"Update password hash for user {name}.") 

364 # re-hash the password with more iterations 

365 # FIXME: This must be done within a transaction! 

366 user_skel["password"] = password # will be hashed on serialize 

367 user_skel.write(update_relations=False) 

368 

369 return self.next_or_finish(user_skel) 

370 

371 @exposed 

372 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs): 

373 """ 

374 This implements a password recovery process which lets users set a new password for their account, 

375 after validating a recovery key sent by email. 

376 

377 The process is as following: 

378 

379 - The user enters his email adress 

380 - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode 

381 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name 

382 and send a link with the code . It runs as a deferredTask so we don't leak the information if a user 

383 account exists. 

384 - If the user received his email, he can click on the link and set a new password for his account. 

385 

386 To prevent automated attacks, the fist step is guarded by a captcha and we limited calls to this function 

387 to 10 actions per 15 minutes. (One complete recovery process consists of two calls). 

388 """ 

389 self.passwordRecoveryRateLimit.assertQuotaIsAvailable() 

390 current_request = current.request.get() 

391 

392 if recovery_key is None: 

393 # This is the first step, where we ask for the username of the account we'll going to reset the password on 

394 skel = self.LostPasswordStep1Skel() 

395 

396 if not current_request.isPostRequest or not skel.fromClient(kwargs): 

397 return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template) 

398 

399 # validate security key 

400 if not securitykey.validate(skey): 

401 raise errors.PreconditionFailed() 

402 

403 self.passwordRecoveryRateLimit.decrementQuota() 

404 

405 recovery_key = securitykey.create( 

406 duration=datetime.timedelta(minutes=15), 

407 key_length=conf.security.password_recovery_key_length, 

408 user_name=skel["name"].lower(), 

409 session_bound=False, 

410 ) 

411 

412 # Send the code in background 

413 self.sendUserPasswordRecoveryCode( 

414 skel["name"], recovery_key, current_request.request.headers["User-Agent"] 

415 ) 

416 

417 # step 2 is only an action-skel, and can be ignored by a direct link in the 

418 # e-mail previously sent. It depends on the implementation of the specific project. 

419 return self._user_module.render.edit( 

420 self.LostPasswordStep2Skel(), 

421 tpl=self.passwordRecoveryStep2Template, 

422 ) 

423 

424 # in step 3 

425 skel = self.LostPasswordStep3Skel() 

426 skel["recovery_key"] = recovery_key # resend the recovery key again, in case the fromClient() fails. 

427 

428 # check for any input; Render input-form again when incomplete. 

429 if not skel.fromClient(kwargs) or not current_request.isPostRequest: 

430 return self._user_module.render.edit( 

431 skel=skel, 

432 tpl=self.passwordRecoveryStep3Template, 

433 ) 

434 

435 # validate security key 

436 if not securitykey.validate(skey): 

437 raise errors.PreconditionFailed() 

438 

439 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)): 

440 raise errors.PreconditionFailed( 

441 i18n.translate( 

442 key="viur.core.modules.user.passwordrecovery.keyexpired", 

443 defaultText="The recovery key is expired or invalid. Please start the recovery process again.", 

444 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key." 

445 ) 

446 ) 

447 

448 self.passwordRecoveryRateLimit.decrementQuota() 

449 

450 # If we made it here, the key was correct, so we'd hopefully have a valid user for this 

451 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel() 

452 

453 if not user_skel: 

454 raise errors.NotFound( 

455 i18n.translate( 

456 key="viur.core.modules.user.passwordrecovery.usernotfound", 

457 defaultText="There is no account with this name", 

458 hint="We cant find an account with that name (Should never happen)" 

459 ) 

460 ) 

461 

462 # If the account is locked or not yet validated, abort the process. 

463 if not self._user_module.is_active(user_skel): 

464 raise errors.NotFound( 

465 i18n.translate( 

466 key="viur.core.modules.user.passwordrecovery.accountlocked", 

467 defaultText="This account is currently locked. You cannot change its password.", 

468 hint="Attempted password recovery on a locked account" 

469 ) 

470 ) 

471 

472 # Update the password, save the user, reset his session and show the success-template 

473 user_skel["password"] = skel["password"] 

474 user_skel.write(update_relations=False) 

475 

476 return self._user_module.render.view( 

477 None, 

478 tpl=self.passwordRecoverySuccessTemplate, 

479 ) 

480 

481 @tasks.CallDeferred 

482 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None: 

483 """ 

484 Sends the given recovery code to the user given in userName. This function runs deferred 

485 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the 

486 code by email (assuming we have working email delivery), but this can be overridden to send it 

487 by SMS or other means. We'll also update the changedate for this user, so no more than one code 

488 can be send to any given user in four hours. 

489 """ 

490 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel(): 

491 user_agent = user_agents.parse(user_agent) 

492 email.send_email( 

493 tpl=self.passwordRecoveryMail, 

494 skel=user_skel, 

495 dests=[user_name], 

496 recovery_key=recovery_key, 

497 user_agent={ 

498 "device": user_agent.get_device(), 

499 "os": user_agent.get_os(), 

500 "browser": user_agent.get_browser() 

501 } 

502 ) 

503 

504 @exposed 

505 @skey(forward_payload="data", session_bound=False) 

506 def verify(self, data): 

507 def transact(key): 

508 skel = self._user_module.editSkel() 

509 if not key or not skel.read(key): 

510 return None 

511 

512 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \ 

513 if self.registrationAdminVerificationRequired else Status.ACTIVE 

514 

515 skel.write(update_relations=False) 

516 return skel 

517 

518 if not isinstance(data, dict) or not (skel := db.RunInTransaction(transact, data.get("user_key"))): 

519 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate) 

520 

521 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate) 

522 

523 def canAdd(self) -> bool: 

524 return self.registrationEnabled 

525 

526 def addSkel(self): 

527 """ 

528 Prepare the add-Skel for rendering. 

529 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on 

530 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired 

531 :return: viur.core.skeleton.Skeleton 

532 """ 

533 skel = self._user_module.addSkel() 

534 

535 if self.registrationEmailVerificationRequired: 

536 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION 

537 elif self.registrationAdminVerificationRequired: 

538 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION 

539 else: # No further verification required 

540 defaultStatusValue = Status.ACTIVE 

541 

542 skel.status.readOnly = True 

543 skel["status"] = defaultStatusValue 

544 

545 if "password" in skel: 

546 skel.password.required = True # The user will have to set a password 

547 

548 return skel 

549 

550 @force_ssl 

551 @exposed 

552 @skey(allow_empty=True) 

553 def add(self, *args, **kwargs): 

554 """ 

555 Allows guests to register a new account if self.registrationEnabled is set to true 

556 

557 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd` 

558 

559 :returns: The rendered, added object of the entry, eventually with error hints. 

560 

561 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

562 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

563 """ 

564 if not self.canAdd(): 

565 raise errors.Unauthorized() 

566 skel = self.addSkel() 

567 if ( 

568 not kwargs # no data supplied 

569 or not current.request.get().isPostRequest # bail out if not using POST-method 

570 or not skel.fromClient(kwargs) # failure on reading into the bones 

571 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

572 ): 

573 # render the skeleton in the version it could as far as it could be read. 

574 return self._user_module.render.add(skel) 

575 self._user_module.onAdd(skel) 

576 skel.write() 

577 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION: 

578 # The user will have to verify his email-address. Create a skey and send it to his address 

579 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False, 

580 user_key=utils.normalizeKey(skel["key"]), 

581 name=skel["name"]) 

582 skel.skey = BaseBone(descr="Skey") 

583 skel["skey"] = skey 

584 email.send_email(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel) 

585 self._user_module.onAdded(skel) # Call onAdded on our parent user module 

586 return self._user_module.render.addSuccess(skel) 

587 

588 

589class GoogleAccount(UserPrimaryAuthentication): 

590 METHOD_NAME = "X-VIUR-AUTH-Google-Account" 

591 

592 @classmethod 

593 def patch_user_skel(cls, skel_cls): 

594 """ 

595 Modifies the UserSkel to be equipped by a bones required by Google Auth 

596 """ 

597 skel_cls.uid = StringBone( 

598 descr="Google UserID", 

599 required=False, 

600 readOnly=True, 

601 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"), 

602 params={ 

603 "category": "Authentication", 

604 } 

605 ) 

606 

607 skel_cls.sync = BooleanBone( 

608 descr="Sync user data with OAuth-based services", 

609 defaultValue=True, 

610 params={ 

611 "category": "Authentication", 

612 "tooltip": 

613 "If set, user data like firstname and lastname is automatically kept" 

614 "synchronous with the information stored at the OAuth service provider" 

615 "(e.g. Google Login)." 

616 } 

617 ) 

618 

619 @exposed 

620 @force_ssl 

621 @skey(allow_empty=True) 

622 def login(self, token: str | None = None, *args, **kwargs): 

623 if not conf.user.google_client_id: 

624 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!") 

625 

626 if not token: 

627 request = current.request.get() 

628 request.response.headers["Content-Type"] = "text/html" 

629 if request.response.headers.get("cross-origin-opener-policy") == "same-origin": 

630 # We have to allow popups here 

631 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups" 

632 

633 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html") 

634 with open(file_path) as file: 

635 tpl_string = file.read() 

636 

637 # FIXME: Use Jinja2 for rendering? 

638 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id) 

639 extendCsp({ 

640 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="], 

641 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="] 

642 }) 

643 return tpl_string 

644 

645 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id) 

646 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}: 

647 raise ValueError("Invalid issuer") 

648 

649 # Token looks valid :) 

650 uid = user_info["sub"] 

651 email = user_info["email"] 

652 

653 base_skel = self._user_module.baseSkel() 

654 update = False 

655 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()): 

656 # We'll try again - checking if there's already an user with that email 

657 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()): 

658 # Still no luck - it's a completely new user 

659 if not self.registrationEnabled: 

660 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains: 

661 logging.debug(f"Google user is from allowed {domain} - adding account") 

662 else: 

663 logging.debug(f"Google user is from {domain} - denying registration") 

664 raise errors.Forbidden("Registration for new users is disabled") 

665 

666 user_skel = base_skel 

667 user_skel["uid"] = uid 

668 user_skel["name"] = email 

669 update = True 

670 

671 # Take user information from Google, if wanted! 

672 if user_skel["sync"]: 

673 for target, source in { 

674 "name": email, 

675 "firstname": user_info.get("given_name"), 

676 "lastname": user_info.get("family_name"), 

677 }.items(): 

678 

679 if user_skel[target] != source: 

680 user_skel[target] = source 

681 update = True 

682 

683 if update: 

684 assert user_skel.write() 

685 

686 return self.next_or_finish(user_skel) 

687 

688 

689class UserSecondFactorAuthentication(UserAuthentication, abc.ABC): 

690 """Abstract class for all second factors.""" 

691 MAX_RETRY = 3 

692 second_factor_login_template = "user_login_secondfactor" 

693 """Template to enter the TOPT on login""" 

694 

695 @property 

696 @abc.abstractmethod 

697 def NAME(self) -> str: 

698 """Name for this factor for templates.""" 

699 ... 

700 

701 @property 

702 @abc.abstractmethod 

703 def ACTION_NAME(self) -> str: 

704 """The action name for this factor, used as path-segment.""" 

705 ... 

706 

707 def __init__(self, moduleName, modulePath, _user_module): 

708 super().__init__(moduleName, modulePath, _user_module) 

709 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}" 

710 self.add_url = f"{self.modulePath}/add" 

711 self.start_url = f"{self.modulePath}/start" 

712 

713 

714class TimeBasedOTP(UserSecondFactorAuthentication): 

715 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP" 

716 WINDOW_SIZE = 5 

717 ACTION_NAME = "otp" 

718 NAME = "Time based Otp" 

719 second_factor_login_template = "user_login_secondfactor" 

720 

721 @dataclasses.dataclass 

722 class OtpConfig: 

723 """ 

724 This dataclass is used to provide an interface for a OTP token 

725 algorithm description that is passed within the TimeBasedOTP 

726 class for configuration. 

727 """ 

728 secret: str 

729 timedrift: float = 0.0 

730 algorithm: t.Literal["sha1", "sha256"] = "sha1" 

731 interval: int = 60 

732 

733 class OtpSkel(skeleton.RelSkel): 

734 """ 

735 This is the Skeleton used to ask for the OTP token. 

736 """ 

737 otptoken = NumericBone( 

738 descr="Token", 

739 required=True, 

740 max=999999, 

741 min=0, 

742 ) 

743 

744 @classmethod 

745 def patch_user_skel(cls, skel_cls): 

746 """ 

747 Modifies the UserSkel to be equipped by a bones required by Timebased OTP 

748 """ 

749 # One-Time Password Verification 

750 skel_cls.otp_serial = StringBone( 

751 descr="OTP serial", 

752 searchable=True, 

753 params={ 

754 "category": "Second Factor Authentication", 

755 } 

756 ) 

757 

758 skel_cls.otp_secret = CredentialBone( 

759 descr="OTP secret", 

760 params={ 

761 "category": "Second Factor Authentication", 

762 } 

763 ) 

764 

765 skel_cls.otp_timedrift = NumericBone( 

766 descr="OTP time drift", 

767 readOnly=True, 

768 defaultValue=0, 

769 params={ 

770 "category": "Second Factor Authentication", 

771 } 

772 ) 

773 

774 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None: 

775 """ 

776 Returns an instance of self.OtpConfig with a provided token configuration, 

777 or None when there is no appropriate configuration of this second factor handler available. 

778 """ 

779 

780 if otp_secret := skel.dbEntity.get("otp_secret"): 

781 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0) 

782 

783 return None 

784 

785 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

786 """ 

787 Specified whether the second factor authentication can be handled by the given user or not. 

788 """ 

789 return bool(self.get_config(skel)) 

790 

791 @exposed 

792 def start(self): 

793 """ 

794 Configures OTP login for the current session. 

795 

796 A special otp_user_conf has to be specified as a dict, which is stored into the session. 

797 """ 

798 session = current.session.get() 

799 

800 if not (user_key := session.get("possible_user_key")): 

801 raise errors.PreconditionFailed( 

802 "Second factor can only be triggered after successful primary authentication." 

803 ) 

804 

805 user_skel = self._user_module.baseSkel() 

806 if not user_skel.read(user_key): 

807 raise errors.NotFound("The previously authenticated user is gone.") 

808 

809 if not (otp_user_conf := self.get_config(user_skel)): 

810 raise errors.PreconditionFailed("This second factor is not available for the user") 

811 

812 otp_user_conf = { 

813 "key": str(user_key), 

814 } | dataclasses.asdict(otp_user_conf) 

815 

816 session = current.session.get() 

817 session["_otp_user"] = otp_user_conf 

818 session.markChanged() 

819 

820 return self._user_module.render.edit( 

821 self.OtpSkel(), 

822 params={ 

823 "name": i18n.translate(f"viur.core.modules.user.{self.NAME}"), 

824 "action_name": self.ACTION_NAME, 

825 "action_url": f"{self.modulePath}/{self.ACTION_NAME}", 

826 }, 

827 tpl=self.second_factor_login_template 

828 ) 

829 

830 @exposed 

831 @force_ssl 

832 @skey(allow_empty=True) 

833 def otp(self, *args, **kwargs): 

834 """ 

835 Performs the second factor validation and interaction with the client. 

836 """ 

837 session = current.session.get() 

838 if not (otp_user_conf := session.get("_otp_user")): 

839 raise errors.PreconditionFailed("No OTP process started in this session") 

840 

841 # Check if maximum second factor verification attempts 

842 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

843 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

844 

845 # Read the OTP token via the skeleton, to obtain a valid value 

846 skel = self.OtpSkel() 

847 if skel.fromClient(kwargs): 

848 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP. 

849 res = self.verify( 

850 otp=skel["otptoken"], 

851 secret=otp_user_conf["secret"], 

852 algorithm=otp_user_conf.get("algorithm") or "sha1", 

853 interval=otp_user_conf.get("interval") or 60, 

854 timedrift=otp_user_conf.get("timedrift") or 0.0, 

855 valid_window=self.WINDOW_SIZE 

856 ) 

857 else: 

858 res = None 

859 

860 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0! 

861 if res is None: 

862 otp_user_conf["attempts"] = attempts + 1 

863 session.markChanged() 

864 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

865 return self._user_module.render.edit( 

866 skel, 

867 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"), 

868 action_name=self.ACTION_NAME, 

869 action_url=f"{self.modulePath}/{self.ACTION_NAME}", 

870 tpl=self.second_factor_login_template 

871 ) 

872 

873 # Remove otp user config from session 

874 user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName) 

875 del session["_otp_user"] 

876 session.markChanged() 

877 

878 # Check if the OTP device has a time drift 

879 

880 timedriftchange = float(res) - otp_user_conf["timedrift"] 

881 if abs(timedriftchange) > 2: 

882 # The time-drift change accumulates to more than 2 minutes (for interval==60): 

883 # update clock-drift value accordingly 

884 self.updateTimeDrift(user_key, timedriftchange) 

885 

886 # Continue with authentication 

887 return self._user_module.secondFactorSucceeded(self, user_key) 

888 

889 @staticmethod 

890 def verify( 

891 otp: str | int, 

892 secret: str, 

893 algorithm: str = "sha1", 

894 interval: int = 60, 

895 timedrift: float = 0.0, 

896 for_time: datetime.datetime | None = None, 

897 valid_window: int = 0, 

898 ) -> int | None: 

899 """ 

900 Verifies the OTP passed in against the current time OTP. 

901 

902 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which 

903 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown 

904 on the hardware token generator. This can be used to store the time drift of a given token generator. 

905 

906 :param otp: the OTP token to check against 

907 :param secret: The OTP secret 

908 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256) 

909 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators). In 

910 pyotp, default is 30! 

911 :param timedrift: The known timedrift (old index) of the hardware OTP generator 

912 :param for_time: Time to check OTP at (defaults to now) 

913 :param valid_window: extends the validity to this many counter ticks before and after the current one 

914 :returns: The index where verification succeeded, None otherwise 

915 """ 

916 # get the hashing digest 

917 digest = { 

918 "sha1": hashlib.sha1, 

919 "sha256": hashlib.sha256, 

920 }.get(algorithm) 

921 

922 if not digest: 

923 raise errors.NotImplemented(f"{algorithm=} is not implemented") 

924 

925 if for_time is None: 

926 for_time = datetime.datetime.now() 

927 

928 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index 

929 timedrift = round(timedrift) 

930 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret 

931 otp = str(otp).zfill(6) # fill with zeros in front 

932 

933 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}") 

934 totp = pyotp.TOTP(secret, digest=digest, interval=interval) 

935 

936 if valid_window: 

937 for offset in range(timedrift - valid_window, timedrift + valid_window + 1): 

938 token = str(totp.at(for_time, offset)) 

939 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}") 

940 if hmac.compare_digest(otp, token): 

941 return offset 

942 

943 return None 

944 

945 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None 

946 

947 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None: 

948 """ 

949 Updates the clock-drift value. 

950 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew 

951 it out of bounds. Maximum change per call is 0.3 minutes. 

952 :param user_key: For which user should the update occour 

953 :param idx: How many steps before/behind was that token 

954 :return: 

955 """ 

956 

957 # FIXME: The callback in viur-core must be improved, to accept user_skel 

958 

959 def transaction(user_key, idx): 

960 user = db.Get(user_key) 

961 if not isinstance(user.get("otp_timedrift"), float): 

962 user["otp_timedrift"] = 0.0 

963 user["otp_timedrift"] += min(max(0.1 * idx, -0.3), 0.3) 

964 db.Put(user) 

965 

966 db.RunInTransaction(transaction, user_key, idx) 

967 

968 

969class AuthenticatorOTP(UserSecondFactorAuthentication): 

970 """ 

971 This class handles the second factor for apps like authy and so on 

972 """ 

973 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP" 

974 

975 second_factor_add_template = "user_secondfactor_add" 

976 """Template to configure (add) a new TOPT""" 

977 

978 ACTION_NAME = "authenticator_otp" 

979 """Action name provided for *otp_template* on login""" 

980 

981 NAME = "Authenticator App" 

982 

983 @exposed 

984 @force_ssl 

985 @skey(allow_empty=True) 

986 def add(self, otp=None): 

987 """ 

988 We try to read the otp_app_secret form the current session. When this fails we generate a new one and store 

989 it in the session. 

990 

991 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store 

992 the otp_app_secret from the session in the user entry. 

993 """ 

994 current_session = current.session.get() 

995 

996 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")): 

997 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret() 

998 current_session["_maybe_otp_app_secret"] = otp_app_secret 

999 current_session.markChanged() 

1000 

1001 if otp is None: 

1002 return self._user_module.render.second_factor_add( 

1003 tpl=self.second_factor_add_template, 

1004 action_name=self.ACTION_NAME, 

1005 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"), 

1006 add_url=self.add_url, 

1007 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) 

1008 else: 

1009 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret): 

1010 return self._user_module.render.second_factor_add( 

1011 tpl=self.second_factor_add_template, 

1012 action_name=self.ACTION_NAME, 

1013 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"), 

1014 add_url=self.add_url, 

1015 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors 

1016 

1017 # Now we can set the otp_app_secret to the current User and render der Success-template 

1018 AuthenticatorOTP.set_otp_app_secret(otp_app_secret) 

1019 return self._user_module.render.second_factor_add_success( 

1020 action_name=self.ACTION_NAME, 

1021 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"), 

1022 ) 

1023 

1024 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

1025 """ 

1026 We can only handle the second factor if we have stored an otp_app_secret before. 

1027 """ 

1028 return bool(skel.dbEntity.get("otp_app_secret", "")) 

1029 

1030 @classmethod 

1031 def patch_user_skel(cls, skel_cls): 

1032 """ 

1033 Modifies the UserSkel to be equipped by bones required by Authenticator App 

1034 """ 

1035 # Authenticator OTP Apps (like Authy) 

1036 skel_cls.otp_app_secret = CredentialBone( 

1037 descr="OTP Secret (App-Key)", 

1038 params={ 

1039 "category": "Second Factor Authentication", 

1040 } 

1041 ) 

1042 

1043 @classmethod 

1044 def set_otp_app_secret(cls, otp_app_secret=None): 

1045 """ 

1046 Write a new OTP Token in the current user entry. 

1047 """ 

1048 if otp_app_secret is None: 

1049 logging.error("No 'otp_app_secret' is provided") 

1050 raise errors.PreconditionFailed("No 'otp_app_secret' is provided") 

1051 if not (cuser := current.user.get()): 

1052 raise errors.Unauthorized() 

1053 

1054 def transaction(user_key): 

1055 if not (user := db.Get(user_key)): 

1056 raise errors.NotFound() 

1057 user["otp_app_secret"] = otp_app_secret 

1058 db.Put(user) 

1059 

1060 db.RunInTransaction(transaction, cuser["key"]) 

1061 

1062 @classmethod 

1063 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str: 

1064 """ 

1065 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example 

1066 """ 

1067 if not (cuser := current.user.get()): 

1068 raise errors.Unauthorized() 

1069 if not (issuer := conf.user.otp_issuer): 

1070 logging.warning( 

1071 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}") 

1072 issuer = conf.instance.project_id 

1073 

1074 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer) 

1075 

1076 @classmethod 

1077 def generate_otp_app_secret(cls) -> str: 

1078 """ 

1079 Generate a new OTP Secret 

1080 :return an otp 

1081 """ 

1082 return pyotp.random_base32() 

1083 

1084 @classmethod 

1085 def verify_otp(cls, otp: str | int, secret: str) -> bool: 

1086 return pyotp.TOTP(secret).verify(otp) 

1087 

1088 @exposed 

1089 def start(self): 

1090 otp_user_conf = {"attempts": 0} 

1091 session = current.session.get() 

1092 session["_otp_user"] = otp_user_conf 

1093 session.markChanged() 

1094 return self._user_module.render.edit( 

1095 TimeBasedOTP.OtpSkel(), 

1096 params={ 

1097 "name": i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"), 

1098 "action_name": self.ACTION_NAME, 

1099 "action_url": self.action_url, 

1100 }, 

1101 tpl=self.second_factor_login_template, 

1102 ) 

1103 

1104 @exposed 

1105 @force_ssl 

1106 @skey 

1107 def authenticator_otp(self, **kwargs): 

1108 """ 

1109 We verify the otp here with the secret we stored before. 

1110 """ 

1111 session = current.session.get() 

1112 user_key = db.Key(self._user_module.kindName, session["possible_user_key"]) 

1113 

1114 if not (otp_user_conf := session.get("_otp_user")): 

1115 raise errors.PreconditionFailed("No OTP process started in this session") 

1116 

1117 # Check if maximum second factor verification attempts 

1118 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

1119 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

1120 

1121 if not (user := db.Get(user_key)): 

1122 raise errors.NotFound() 

1123 

1124 skel = TimeBasedOTP.OtpSkel() 

1125 if not skel.fromClient(kwargs): 

1126 raise errors.PreconditionFailed() 

1127 otp_token = str(skel["otptoken"]).zfill(6) 

1128 

1129 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]): 

1130 return self._user_module.secondFactorSucceeded(self, user_key) 

1131 otp_user_conf["attempts"] = attempts + 1 

1132 session.markChanged() 

1133 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

1134 return self._user_module.render.edit( 

1135 skel, 

1136 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"), 

1137 action_name=self.ACTION_NAME, 

1138 action_url=self.action_url, 

1139 tpl=self.second_factor_login_template, 

1140 ) 

1141 

1142 

1143class User(List): 

1144 """ 

1145 The User module is used to manage and authenticate users in a ViUR system. 

1146 

1147 It is used in almost any ViUR project, but ViUR can also function without any user capabilites. 

1148 """ 

1149 

1150 kindName = "user" 

1151 addTemplate = "user_add" 

1152 addSuccessTemplate = "user_add_success" 

1153 lostPasswordTemplate = "user_lostpassword" 

1154 verifyEmailAddressMail = "user_verify_address" 

1155 passwordRecoveryMail = "user_password_recovery" 

1156 

1157 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter( 

1158 None, ( 

1159 UserPassword, 

1160 conf.user.google_client_id and GoogleAccount, 

1161 ) 

1162 )) 

1163 """ 

1164 Specifies primary authentication providers that are made available 

1165 as sub-modules under `user/auth_<classname>`. They might require 

1166 customization or configuration. 

1167 """ 

1168 

1169 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = ( 

1170 TimeBasedOTP, 

1171 AuthenticatorOTP, 

1172 ) 

1173 """ 

1174 Specifies secondary authentication providers that are made available 

1175 as sub-modules under `user/f2_<classname>`. They might require 

1176 customization or configuration, which is determined during the 

1177 login-process depending on the user that wants to login. 

1178 """ 

1179 

1180 validAuthenticationMethods = tuple(filter( 

1181 None, ( 

1182 (UserPassword, AuthenticatorOTP), 

1183 (UserPassword, TimeBasedOTP), 

1184 (UserPassword, None), 

1185 (GoogleAccount, None) if conf.user.google_client_id else None, 

1186 ) 

1187 )) 

1188 """ 

1189 Specifies the possible combinations of primary- and secondary factor 

1190 login methos. 

1191 

1192 GoogleLogin defaults to no second factor, as the Google Account can be 

1193 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only 

1194 handled when there is a user-dependent configuration available. 

1195 """ 

1196 

1197 msg_missing_second_factor = "Second factor required but not configured for this user." 

1198 

1199 secondFactorTimeWindow = datetime.timedelta(minutes=10) 

1200 

1201 default_order = "name.idx" 

1202 

1203 adminInfo = { 

1204 "icon": "person-fill", 

1205 "actions": [ 

1206 "trigger_kick", 

1207 "trigger_takeover", 

1208 ], 

1209 "customActions": { 

1210 "trigger_kick": { 

1211 "name": i18n.translate( 

1212 key="viur.core.modules.user.customActions.kick", 

1213 defaultText="Kick user", 

1214 hint="Title of the kick user function" 

1215 ), 

1216 "icon": "trash2-fill", 

1217 "access": ["root"], 

1218 "action": "fetch", 

1219 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}", 

1220 "confirm": i18n.translate( 

1221 key="viur.core.modules.user.customActions.kick.confirm", 

1222 defaultText="Do you really want to drop all sessions of the selected user from the system?", 

1223 ), 

1224 "success": i18n.translate( 

1225 key="viur.core.modules.user.customActions.kick.success", 

1226 defaultText="Sessions of the user are being invalidated.", 

1227 ), 

1228 }, 

1229 "trigger_takeover": { 

1230 "name": i18n.translate( 

1231 key="viur.core.modules.user.customActions.takeover", 

1232 defaultText="Take-over user", 

1233 hint="Title of the take user over function" 

1234 ), 

1235 "icon": "file-person-fill", 

1236 "access": ["root"], 

1237 "action": "fetch", 

1238 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}", 

1239 "confirm": i18n.translate( 

1240 key="viur.core.modules.user.customActions.takeover.confirm", 

1241 defaultText="Do you really want to replace your current user session by a " 

1242 "user session of the selected user?", 

1243 ), 

1244 "success": i18n.translate( 

1245 key="viur.core.modules.user.customActions.takeover.success", 

1246 defaultText="You're now know as the selected user!", 

1247 ), 

1248 "then": "reload-vi", 

1249 }, 

1250 }, 

1251 } 

1252 

1253 roles = { 

1254 "admin": "*", 

1255 } 

1256 

1257 def __init__(self, moduleName, modulePath): 

1258 for provider in self.authenticationProviders: 

1259 assert issubclass(provider, UserPrimaryAuthentication) 

1260 name = f"auth_{provider.__name__.lower()}" 

1261 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1262 

1263 for provider in self.secondFactorProviders: 

1264 assert issubclass(provider, UserSecondFactorAuthentication) 

1265 name = f"f2_{provider.__name__.lower()}" 

1266 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1267 

1268 super().__init__(moduleName, modulePath) 

1269 

1270 def get_role_defaults(self, role: str) -> set[str]: 

1271 """ 

1272 Returns a set of default access rights for a given role. 

1273 

1274 Defaults to "admin" usage for any role > "user" 

1275 and "scriptor" usage for "admin" role. 

1276 """ 

1277 ret = set() 

1278 

1279 if role in ("viewer", "editor", "admin"): 

1280 ret.add("admin") 

1281 

1282 if role == "admin": 

1283 ret.add("scriptor") 

1284 

1285 return ret 

1286 

1287 def addSkel(self): 

1288 skel = super().addSkel().clone() 

1289 user = current.user.get() 

1290 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])): 

1291 skel.status.readOnly = True 

1292 skel["status"] = Status.UNSET 

1293 skel.status.visible = False 

1294 skel.access.readOnly = True 

1295 skel["access"] = [] 

1296 skel.access.visible = False 

1297 else: 

1298 # An admin tries to add a new user. 

1299 skel.status.readOnly = False 

1300 skel.status.visible = True 

1301 skel.access.readOnly = False 

1302 skel.access.visible = True 

1303 

1304 if "password" in skel: 

1305 # Unlock and require a password 

1306 skel.password.required = True 

1307 skel.password.visible = True 

1308 skel.password.readOnly = False 

1309 

1310 skel.name.readOnly = False # Don't enforce readonly name in user/add 

1311 return skel 

1312 

1313 def editSkel(self, *args, **kwargs): 

1314 skel = super().editSkel().clone() 

1315 

1316 if "password" in skel: 

1317 skel.password.required = False 

1318 skel.password.visible = True 

1319 skel.password.readOnly = False 

1320 

1321 user = current.user.get() 

1322 

1323 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only 

1324 skel.name.readOnly = lockFields 

1325 skel.access.readOnly = lockFields 

1326 skel.status.readOnly = lockFields 

1327 

1328 return skel 

1329 

1330 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication: 

1331 return getattr(self, f"f2_{cls.__name__.lower()}") 

1332 

1333 def getCurrentUser(self): 

1334 session = current.session.get() 

1335 

1336 req = current.request.get() 

1337 if session and (session.loaded or req.is_deferred) and (user := session.get("user")): 

1338 skel = self.baseSkel() 

1339 skel.setEntity(user) 

1340 return skel 

1341 

1342 return None 

1343 

1344 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key): 

1345 """ 

1346 Continue authentication flow when primary authentication succeeded. 

1347 """ 

1348 skel = self.baseSkel() 

1349 

1350 if not skel.read(user_key): 

1351 raise errors.NotFound("User was not found.") 

1352 

1353 if not provider.can_handle(skel): 

1354 raise errors.Forbidden("User is not allowed to use this primary login method.") 

1355 

1356 session = current.session.get() 

1357 session["possible_user_key"] = user_key.id_or_name 

1358 session["_secondFactorStart"] = utils.utcNow() 

1359 session.markChanged() 

1360 

1361 second_factor_providers = [] 

1362 

1363 for auth_provider, second_factor in self.validAuthenticationMethods: 

1364 if isinstance(provider, auth_provider): 

1365 if second_factor is not None: 

1366 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor) 

1367 if second_factor_provider_instance.can_handle(skel): 

1368 second_factor_providers.append(second_factor_provider_instance) 

1369 else: 

1370 second_factor_providers.append(None) 

1371 

1372 if len(second_factor_providers) > 1 and None in second_factor_providers: 

1373 # We have a second factor. So we can get rid of the None 

1374 second_factor_providers.pop(second_factor_providers.index(None)) 

1375 

1376 if len(second_factor_providers) == 0: 

1377 raise errors.NotAcceptable(self.msg_missing_second_factor) 

1378 elif len(second_factor_providers) == 1: 

1379 if second_factor_providers[0] is None: 

1380 # We allow sign-in without a second factor 

1381 return self.authenticateUser(user_key) 

1382 # We have only one second factor we don't need the choice template 

1383 return second_factor_providers[0].start(user_key) 

1384 

1385 # In case there is more than one second factor, let the user select a method. 

1386 return self.render.second_factor_choice(second_factors=second_factor_providers) 

1387 

1388 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key): 

1389 """ 

1390 Continue authentication flow when secondary authentication succeeded. 

1391 """ 

1392 session = current.session.get() 

1393 if session["possible_user_key"] != user_key.id_or_name: 

1394 raise errors.Forbidden() 

1395 

1396 # Assert that the second factor verification finished in time 

1397 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow: 

1398 raise errors.RequestTimeout() 

1399 

1400 return self.authenticateUser(user_key) 

1401 

1402 def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None: 

1403 """ 

1404 Hookable check if a user is defined as "active" and can login. 

1405 

1406 :param skel: The UserSkel of the user who wants to login. 

1407 """ 

1408 if "status" in skel: 

1409 status = skel["status"] 

1410 if not isinstance(status, (Status, int)): 

1411 try: 

1412 status = int(status) 

1413 except ValueError: 

1414 status = Status.UNSET 

1415 

1416 return status >= Status.ACTIVE.value 

1417 

1418 return None 

1419 

1420 def authenticateUser(self, key: db.Key, **kwargs): 

1421 """ 

1422 Performs Log-In for the current session and the given user key. 

1423 

1424 This resets the current session: All fields not explicitly marked as persistent 

1425 by conf.user.session_persistent_fields_on_login are gone afterwards. 

1426 

1427 :param key: The (DB-)Key of the user we shall authenticate 

1428 """ 

1429 skel = self.baseSkel() 

1430 if not skel.read(key): 

1431 raise ValueError(f"Unable to authenticate unknown user {key}") 

1432 

1433 # Verify that this user account is active 

1434 if not self.is_active(skel): 

1435 raise errors.Forbidden("The user is disabled and cannot be authenticated.") 

1436 

1437 # Update session for user 

1438 session = current.session.get() 

1439 # Remember persistent fields... 

1440 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login} 

1441 session.reset() 

1442 # and copy them over to the new session 

1443 session |= take_over 

1444 

1445 # Update session, user and request 

1446 session["user"] = skel.dbEntity 

1447 

1448 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key 

1449 current.user.set(self.getCurrentUser()) 

1450 

1451 self.onLogin(skel) 

1452 

1453 return self.render.loginSucceeded(**kwargs) 

1454 

1455 @exposed 

1456 @skey 

1457 def logout(self, *args, **kwargs): 

1458 """ 

1459 Implements the logout action. It also terminates the current session (all keys not listed 

1460 in viur.session_persistent_fields_on_logout will be lost). 

1461 """ 

1462 if not (user := current.user.get()): 

1463 raise errors.Unauthorized() 

1464 

1465 self.onLogout(user) 

1466 

1467 session = current.session.get() 

1468 if take_over := {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}: 

1469 session.reset() 

1470 session |= take_over 

1471 else: 

1472 session.clear() 

1473 current.user.set(None) # set user to none in context var 

1474 return self.render.logoutSuccess() 

1475 

1476 @exposed 

1477 def login(self, *args, **kwargs): 

1478 return self.render.loginChoices([ 

1479 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1480 for primary, secondary in self.validAuthenticationMethods 

1481 ]) 

1482 

1483 def onLogin(self, skel: skeleton.SkeletonInstance): 

1484 """ 

1485 Hook to be called on user login. 

1486 """ 

1487 # Update the lastlogin timestamp (if available!) 

1488 if "lastlogin" in skel: 

1489 now = utils.utcNow() 

1490 

1491 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??) 

1492 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)): 

1493 skel["lastlogin"] = now 

1494 skel.write(update_relations=False) 

1495 

1496 logging.info(f"""User {skel["name"]} logged in""") 

1497 

1498 def onLogout(self, skel: skeleton.SkeletonInstance): 

1499 """ 

1500 Hook to be called on user logout. 

1501 """ 

1502 logging.info(f"""User {skel["name"]} logged out""") 

1503 

1504 @exposed 

1505 def view(self, key: db.Key | int | str = "self", *args, **kwargs): 

1506 """ 

1507 Allow a special key "self" to reference the current user. 

1508 

1509 By default, any authenticated user can view its own user entry, 

1510 to obtain access rights and any specific user information. 

1511 This behavior is defined in the customized `canView` function, 

1512 which is overwritten by the User-module. 

1513 

1514 The rendered skeleton can be modified or restriced by specifying 

1515 a customized view-skeleton. 

1516 """ 

1517 if key == "self": 

1518 if user := current.user.get(): 

1519 key = user["key"] 

1520 else: 

1521 raise errors.Unauthorized("Cannot view 'self' with unknown user") 

1522 

1523 return super().view(key, *args, **kwargs) 

1524 

1525 def canView(self, skel) -> bool: 

1526 if user := current.user.get(): 

1527 if skel["key"] == user["key"]: 

1528 return True 

1529 

1530 if "root" in user["access"] or "user-view" in user["access"]: 

1531 return True 

1532 

1533 return False 

1534 

1535 @exposed 

1536 @skey(allow_empty=True) 

1537 def edit(self, key: db.Key | int | str = "self", *args, **kwargs): 

1538 """ 

1539 Allow a special key "self" to reference the current user. 

1540 

1541 This modification will only allow to use "self" as a key; 

1542 The specific access right to let the user edit itself must 

1543 still be customized. 

1544 

1545 The rendered and editable skeleton can be modified or restriced 

1546 by specifying a customized edit-skeleton. 

1547 """ 

1548 if key == "self": 

1549 if user := current.user.get(): 

1550 key = user["key"] 

1551 else: 

1552 raise errors.Unauthorized("Cannot edit 'self' with unknown user") 

1553 

1554 return super().edit(key, *args, **kwargs) 

1555 

1556 @exposed 

1557 def getAuthMethods(self, *args, **kwargs): 

1558 """Inform tools like Viur-Admin which authentication to use""" 

1559 # FIXME: This is almost the same code as in index()... 

1560 # FIXME: VIUR4: The entire function should be removed! 

1561 # TODO: Align result with index(), so that primary and secondary login is presented. 

1562 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!") 

1563 

1564 res = [ 

1565 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1566 for primary, secondary in self.validAuthenticationMethods 

1567 ] 

1568 

1569 return json.dumps(res) 

1570 

1571 @exposed 

1572 @skey 

1573 def trigger(self, action: str, key: str): 

1574 current.request.get().response.headers["Content-Type"] = "application/json" 

1575 

1576 # Check for provided access right definition (equivalent to client-side check), fallback to root! 

1577 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", ) 

1578 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)): 

1579 raise errors.Unauthorized() 

1580 

1581 skel = self.baseSkel() 

1582 if not skel.read(key): 

1583 raise errors.NotFound() 

1584 

1585 match action: 

1586 case "takeover": 

1587 self.authenticateUser(skel["key"]) 

1588 

1589 case "kick": 

1590 session.killSessionByUser(skel["key"]) 

1591 

1592 case _: 

1593 raise errors.NotImplemented(f"Action {action!r} not implemented") 

1594 

1595 return json.dumps("OKAY") 

1596 

1597 def onEdited(self, skel): 

1598 super().onEdited(skel) 

1599 

1600 # In case the user is set to inactive, kill all sessions 

1601 if self.is_active(skel) is False: 

1602 session.killSessionByUser(skel["key"]) 

1603 

1604 # Update user setting in all sessions 

1605 for session_obj in db.Query("user").filter("user =", skel["key"]).iter(): 

1606 session_obj["data"]["user"] = skel.dbEntity 

1607 

1608 

1609 def onDeleted(self, skel): 

1610 super().onDeleted(skel) 

1611 # Invalidate all sessions of that user 

1612 session.killSessionByUser(skel["key"]) 

1613 

1614 

1615@tasks.StartupTask 

1616def createNewUserIfNotExists(): 

1617 """ 

1618 Create a new Admin user, if the userDB is empty 

1619 """ 

1620 if ( 

1621 (user_module := getattr(conf.main_app.vi, "user", None)) 

1622 and isinstance(user_module, User) 

1623 and "addSkel" in dir(user_module) 

1624 and "validAuthenticationMethods" in dir(user_module) 

1625 # UserPassword must be one of the primary login methods 

1626 and any( 

1627 issubclass(provider[0], UserPassword) 

1628 for provider in user_module.validAuthenticationMethods 

1629 ) 

1630 ): 

1631 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database 

1632 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton 

1633 uname = f"""admin@{conf.instance.project_id}.appspot.com""" 

1634 pw = utils.string.random(13) 

1635 addSkel["name"] = uname 

1636 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away 

1637 addSkel["access"] = ["root"] 

1638 addSkel["password"] = pw 

1639 

1640 try: 

1641 addSkel.write() 

1642 except Exception as e: 

1643 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}") 

1644 logging.exception(e) 

1645 return 

1646 

1647 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}" 

1648 

1649 logging.warning(msg) 

1650 email.send_email_to_admins("New ViUR password", msg) 

1651 

1652 

1653# DEPRECATED ATTRIBUTES HANDLING 

1654 

1655def __getattr__(attr): 

1656 match attr: 

1657 case "userSkel": 

1658 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!" 

1659 warnings.warn(msg, DeprecationWarning, stacklevel=2) 

1660 logging.warning(msg) 

1661 return UserSkel 

1662 

1663 return super(__import__(__name__).__class__).__getattr__(attr)