Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%

716 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-27 07:59 +0000

1import abc 

2import datetime 

3import enum 

4import functools 

5import hashlib 

6import hmac 

7import json 

8import logging 

9import secrets 

10import warnings 

11import user_agents 

12 

13import pyotp 

14import base64 

15import dataclasses 

16import typing as t 

17from google.auth.transport import requests 

18from google.oauth2 import id_token 

19 

20from viur.core import ( 

21 conf, current, db, email, errors, i18n, 

22 securitykey, session, skeleton, tasks, utils, Module 

23) 

24from viur.core.decorators import * 

25from viur.core.bones import * 

26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password 

27from viur.core.prototypes.list import List 

28from viur.core.ratelimit import RateLimit 

29from viur.core.securityheaders import extendCsp 

30 

31 

@functools.total_ordering
class Status(enum.Enum):
    """Status enum for a user.

    Has backwards compatibility to be comparable with non-enum values
    (e.g. ``Status.ACTIVE == 10`` or ``Status.ACTIVE > 5``).
    Will be removed with viur-core 4.0.0
    """

    UNSET = 0  # Status is unset
    WAITING_FOR_EMAIL_VERIFICATION = 1  # Waiting for email verification
    WAITING_FOR_ADMIN_VERIFICATION = 2  # Waiting for verification through admin
    DISABLED = 5  # Account disabled
    ACTIVE = 10  # Active

    def __eq__(self, other):
        """Compare by identity against other members, by value against anything else."""
        if isinstance(other, Status):
            return self is other
        return self.value == other

    def __hash__(self):
        # Defining __eq__ without __hash__ would implicitly set __hash__ to None and
        # make the members unhashable. Hashing the value keeps the eq/hash contract
        # intact for the value comparison above (Status.ACTIVE == 10).
        return hash(self.value)

    def __lt__(self, other):
        """Order by numeric value; total_ordering derives the remaining comparisons."""
        if isinstance(other, Status):
            # enum.Enum provides no ordering, so delegating to super() would yield
            # NotImplemented and end in a TypeError for Status-vs-Status comparisons.
            return self.value < other.value
        return self.value < other

55 

56 

class UserSkel(skeleton.Skeleton):
    """
    Skeleton describing a user account.

    Bones required by the configured authentication providers are patched
    into the class whenever the skeleton is instantiated (see ``__new__``).
    """

    # Explicit assignment is required, as this Skeleton is defined in viur-core (see #604)
    kindName = "user"

    name = EmailBone(
        descr="E-Mail",
        required=True,
        readOnly=True,
        caseSensitive=False,
        searchable=True,
        unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
    )

    firstname = StringBone(descr="Firstname", searchable=True)

    lastname = StringBone(descr="Lastname", searchable=True)

    roles = SelectBone(
        descr=i18n.translate("viur.user.bone.roles", defaultText="Roles"),
        values=conf.user.roles,
        required=True,
        multiple=True,
        # fixme: This is generally broken in VIUR! See #776 for details.
        # vfunc=lambda values:
        #    i18n.translate(
        #        "user.bone.roles.invalid",
        #        defaultText="Invalid role setting: 'custom' can only be set alone.")
        #    if "custom" in values and len(values) > 1 else None,
        defaultValue=list(conf.user.roles.keys())[:1],
    )

    access = SelectBone(
        descr=i18n.translate("viur.user.bone.access", defaultText="Access rights"),
        type_suffix="access",
        values=lambda: {
            right: i18n.translate(f"viur.modules.user.accessright.{right}", defaultText=right)
            for right in sorted(conf.user.access_rights)
        },
        multiple=True,
        params={
            # unless "custom" is among the roles, "access" is managed by the role system
            "readonlyIf": "'custom' not in roles",
        },
    )

    status = SelectBone(
        descr="Account status",
        values=Status,
        defaultValue=Status.ACTIVE,
        required=True,
    )

    lastlogin = DateBone(descr="Last Login", readOnly=True)

    # Stores settings from the vi
    admin_config = JsonBone(descr="Config for the User", visible=False)

    def __new__(cls, *args, **kwargs):
        """
        Create a UserSkel, first letting every configured primary and second
        factor authentication provider add the bones it requires, then
        regenerating the bone map so that the patched bones are included.
        """
        user_module = conf.main_app.vi.user

        for provider in user_module.authenticationProviders:
            assert issubclass(provider, UserPrimaryAuthentication)
            provider.patch_user_skel(cls)

        for provider in user_module.secondFactorProviders:
            assert issubclass(provider, UserSecondFactorAuthentication)
            provider.patch_user_skel(cls)

        cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
        return super().__new__(cls, *args, **kwargs)

    @classmethod
    def write(cls, skel, *args, **kwargs):
        """
        Write the skeleton; for role-managed users (roles set, "custom" not
        among them) the "access" bone is recomputed from the role settings first.
        """
        assigned_roles = skel["roles"]

        # Users with the "custom" role (or without any role) keep their access as is.
        if not assigned_roles or "custom" in assigned_roles:
            return super().write(skel, *args, **kwargs)

        vi = conf.main_app.vi
        granted = set()

        for role in assigned_roles:
            # Start with the defaults the user module defines for this role
            granted |= vi.user.get_role_defaults(role)

            # Then evaluate the role settings of every module registered in the vi
            for name in dir(vi):
                if name.startswith("_"):
                    continue

                module = getattr(vi, name)
                if not isinstance(module, Module):
                    continue

                role_settings = getattr(module, "roles", None) or {}
                rights = role_settings.get(role, role_settings.get("*", ()))

                # A single right may be given without a surrounding tuple/list
                if not isinstance(rights, (tuple, list)):
                    rights = (rights, )

                if "*" in rights:
                    # Wildcard: grant everything the module offers
                    granted.update(f"{name}-{right}" for right in module.accessRights)
                    continue

                for right in rights:
                    if right not in module.accessRights:
                        continue

                    granted.add(f"{name}-{right}")

                    # special case: "edit" and "delete" actions require "view" as well!
                    if right in ("edit", "delete") and "view" in module.accessRights:
                        granted.add(f"{name}-view")

        skel["access"] = list(granted)
        return super().write(skel, *args, **kwargs)

182 

183 

class UserAuthentication(Module, abc.ABC):
    """Abstract base for all authentication handlers attached to a user module."""

    @property
    @abc.abstractmethod
    def METHOD_NAME(self) -> str:
        """
        Define a unique method name for this authentication.

        Concrete handlers provide it as a plain class attribute.
        """
        # abc.abstractstaticmethod is deprecated; property + abstractmethod is the
        # same pattern used for NAME / ACTION_NAME of the second factor handlers.
        ...

    def __init__(self, moduleName, modulePath, userModule):
        """
        :param moduleName: Passed on to the Module constructor.
        :param modulePath: Passed on to the Module constructor.
        :param userModule: The user module this handler belongs to; kept as ``self._user_module``.
        """
        super().__init__(moduleName, modulePath)
        self._user_module = userModule

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """Tell whether this handler is usable for the given user; True by default."""
        return True

    @classmethod
    def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
        """
        Allows for an UserAuthentication to patch the UserSkel
        class with additional bones which are required for
        the implemented authentication method.

        The default implementation adds nothing.
        """
        ...

208 

209 

class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
    """Abstract class for all primary authentication methods."""

    # Self-registration is switched off unless a handler enables it.
    registrationEnabled = False

    @abc.abstractmethod
    def login(self, *args, **kwargs):
        """Entry point of the primary authentication; implemented by each handler."""
        ...

    def next_or_finish(self, skel: skeleton.SkeletonInstance):
        """
        Hook that is called whenever a part of the authentication was successful.

        Custom authentications may override it to insert further steps,
        e.g. to change a password after first login. By default, the user
        module is asked to continue the authentication flow for the user's key.
        """
        user_key = skel["key"]
        return self._user_module.continueAuthenticationFlow(self, user_key)

225 

226 

class UserPassword(UserPrimaryAuthentication):
    METHOD_NAME = "X-VIUR-AUTH-User-Password"

    # Verification steps a self-registered account has to pass
    registrationEmailVerificationRequired = True
    registrationAdminVerificationRequired = True

    # Names of the templates and mails used by this handler
    verifySuccessTemplate = "user_verify_success"
    verifyEmailAddressMail = "user_verify_address"
    verifyFailedTemplate = "user_verify_failed"
    passwordRecoveryTemplate = "user_passwordrecover"
    passwordRecoveryMail = "user_password_recovery"
    passwordRecoverySuccessTemplate = "user_passwordrecover_success"
    passwordRecoveryStep1Template = "user_passwordrecover_step1"
    passwordRecoveryStep2Template = "user_passwordrecover_step2"
    passwordRecoveryStep3Template = "user_passwordrecover_step3"

    # Default rate-limit for the password recovery: 10 tries each 15 minutes
    passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")

    # Rate-limit for (invalid) login-retries: 12 per minute, i.e. one every 5 seconds
    loginRateLimit = RateLimit("user.login", 12, 1, "ip")

248 

249 @classmethod 

250 def patch_user_skel(cls, skel_cls): 

251 """ 

252 Modifies the UserSkel to be equipped by a PasswordBone. 

253 """ 

254 skel_cls.password = PasswordBone( 

255 readOnly=True, 

256 visible=False, 

257 params={ 

258 "category": "Authentication", 

259 } 

260 ) 

261 

262 class LoginSkel(skeleton.RelSkel): 

263 name = EmailBone( 

264 descr="E-Mail", 

265 required=True, 

266 caseSensitive=False, 

267 ) 

268 password = PasswordBone( 

269 required=True, 

270 test_threshold=0, 

271 ) 

272 

273 class LostPasswordStep1Skel(skeleton.RelSkel): 

274 name = EmailBone( 

275 descr="E-Mail", 

276 required=True, 

277 ) 

278 

279 class LostPasswordStep2Skel(skeleton.RelSkel): 

280 recovery_key = StringBone( 

281 descr="Recovery Key", 

282 required=True, 

283 params={ 

284 "tooltip": i18n.translate( 

285 key="viur.modules.user.userpassword.lostpasswordstep2.recoverykey", 

286 defaultText="Please enter the validation key you've received via e-mail.", 

287 hint="Shown when the user needs more than 15 minutes to paste the key", 

288 ), 

289 } 

290 ) 

291 

292 class LostPasswordStep3Skel(skeleton.RelSkel): 

293 # send the recovery key again, in case the password is rejected by some reason. 

294 recovery_key = StringBone( 

295 descr="Recovery Key", 

296 visible=False, 

297 readOnly=True, 

298 ) 

299 

300 password = PasswordBone( 

301 descr="New Password", 

302 required=True, 

303 params={ 

304 "tooltip": i18n.translate( 

305 key="viur.modules.user.userpassword.lostpasswordstep3.password", 

306 defaultText="Please enter a new password for your account.", 

307 ), 

308 } 

309 ) 

310 

311 @exposed 

312 @force_ssl 

313 @skey(allow_empty=True) 

314 def login(self, *, name: str | None = None, password: str | None = None, **kwargs): 

315 if not name or not password: 

316 return self._user_module.render.login(self.LoginSkel(), action="login") 

317 

318 self.loginRateLimit.assertQuotaIsAvailable() 

319 

320 # query for the username. The query might find another user, but the name is being checked for equality below 

321 name = name.lower().strip() 

322 user_skel = self._user_module.baseSkel() 

323 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel 

324 

325 # extract password hash from raw database entity (skeleton access blocks it) 

326 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {} 

327 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001 

328 password_hash = encode_password(password, password_data.get("salt", "-invalid-"), iterations)["pwhash"] 

329 

330 # now check if the username matches 

331 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode()) 

332 

333 # next, check if the password hash matches 

334 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash) 

335 

336 if not is_okay: 

337 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota 

338 return self._user_module.render.login(self.LoginSkel(), action="login") 

339 

340 # check if iterations are below current security standards, and update if necessary. 

341 if iterations < PBKDF2_DEFAULT_ITERATIONS: 

342 logging.info(f"Update password hash for user {name}.") 

343 # re-hash the password with more iterations 

344 # FIXME: This must be done within a transaction! 

345 user_skel["password"] = password # will be hashed on serialize 

346 user_skel.write(update_relations=False) 

347 

348 return self.next_or_finish(user_skel) 

349 

350 @exposed 

351 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs): 

352 """ 

353 This implements a password recovery process which lets users set a new password for their account, 

354 after validating a recovery key sent by email. 

355 

356 The process is as following: 

357 

358 - The user enters his email adress 

359 - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode 

360 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name 

361 and send a link with the code . It runs as a deferredTask so we don't leak the information if a user 

362 account exists. 

363 - If the user received his email, he can click on the link and set a new password for his account. 

364 

365 To prevent automated attacks, the fist step is guarded by a captcha and we limited calls to this function 

366 to 10 actions per 15 minutes. (One complete recovery process consists of two calls). 

367 """ 

368 self.passwordRecoveryRateLimit.assertQuotaIsAvailable() 

369 current_request = current.request.get() 

370 

371 if recovery_key is None: 

372 # This is the first step, where we ask for the username of the account we'll going to reset the password on 

373 skel = self.LostPasswordStep1Skel() 

374 

375 if not current_request.isPostRequest or not skel.fromClient(kwargs): 

376 return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template) 

377 

378 # validate security key 

379 if not securitykey.validate(skey): 

380 raise errors.PreconditionFailed() 

381 

382 self.passwordRecoveryRateLimit.decrementQuota() 

383 

384 recovery_key = securitykey.create( 

385 duration=datetime.timedelta(minutes=15), 

386 key_length=conf.security.password_recovery_key_length, 

387 user_name=skel["name"].lower(), 

388 session_bound=False, 

389 ) 

390 

391 # Send the code in background 

392 self.sendUserPasswordRecoveryCode( 

393 skel["name"], recovery_key, current_request.request.headers["User-Agent"] 

394 ) 

395 

396 # step 2 is only an action-skel, and can be ignored by a direct link in the 

397 # e-mail previously sent. It depends on the implementation of the specific project. 

398 return self._user_module.render.edit( 

399 self.LostPasswordStep2Skel(), 

400 tpl=self.passwordRecoveryStep2Template, 

401 ) 

402 

403 # in step 3 

404 skel = self.LostPasswordStep3Skel() 

405 skel["recovery_key"] = recovery_key # resend the recovery key again, in case the fromClient() fails. 

406 

407 # check for any input; Render input-form again when incomplete. 

408 if not skel.fromClient(kwargs) or not current_request.isPostRequest: 

409 return self._user_module.render.edit( 

410 skel=skel, 

411 tpl=self.passwordRecoveryStep3Template, 

412 ) 

413 

414 # validate security key 

415 if not securitykey.validate(skey): 

416 raise errors.PreconditionFailed() 

417 

418 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)): 

419 raise errors.PreconditionFailed( 

420 i18n.translate( 

421 key="viur.modules.user.passwordrecovery.keyexpired", 

422 defaultText="The recovery key is expired or invalid. Please start the recovery process again.", 

423 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key." 

424 ) 

425 ) 

426 

427 self.passwordRecoveryRateLimit.decrementQuota() 

428 

429 # If we made it here, the key was correct, so we'd hopefully have a valid user for this 

430 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel() 

431 

432 if not user_skel: 

433 raise errors.NotFound( 

434 i18n.translate( 

435 key="viur.modules.user.passwordrecovery.usernotfound", 

436 defaultText="There is no account with this name", 

437 hint="We cant find an account with that name (Should never happen)" 

438 ) 

439 ) 

440 

441 # If the account is locked or not yet validated, abort the process. 

442 if not self._user_module.is_active(user_skel): 

443 raise errors.NotFound( 

444 i18n.translate( 

445 key="viur.modules.user.passwordrecovery.accountlocked", 

446 defaultText="This account is currently locked. You cannot change its password.", 

447 hint="Attempted password recovery on a locked account" 

448 ) 

449 ) 

450 

451 # Update the password, save the user, reset his session and show the success-template 

452 user_skel["password"] = skel["password"] 

453 user_skel.write(update_relations=False) 

454 

455 return self._user_module.render.view( 

456 None, 

457 tpl=self.passwordRecoverySuccessTemplate, 

458 ) 

459 

460 @tasks.CallDeferred 

461 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None: 

462 """ 

463 Sends the given recovery code to the user given in userName. This function runs deferred 

464 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the 

465 code by email (assuming we have working email delivery), but this can be overridden to send it 

466 by SMS or other means. We'll also update the changedate for this user, so no more than one code 

467 can be send to any given user in four hours. 

468 """ 

469 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel(): 

470 user_agent = user_agents.parse(user_agent) 

471 email.send_email( 

472 tpl=self.passwordRecoveryMail, 

473 skel=user_skel, 

474 dests=[user_name], 

475 recovery_key=recovery_key, 

476 user_agent={ 

477 "device": user_agent.get_device(), 

478 "os": user_agent.get_os(), 

479 "browser": user_agent.get_browser() 

480 } 

481 ) 

482 

483 @exposed 

484 @skey(forward_payload="data", session_bound=False) 

485 def verify(self, data): 

486 def transact(key): 

487 skel = self._user_module.editSkel() 

488 if not key or not skel.read(key): 

489 return None 

490 

491 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \ 

492 if self.registrationAdminVerificationRequired else Status.ACTIVE 

493 

494 skel.write(update_relations=False) 

495 return skel 

496 

497 if not isinstance(data, dict) or not (skel := db.RunInTransaction(transact, data.get("user_key"))): 

498 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate) 

499 

500 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate) 

501 

502 def canAdd(self) -> bool: 

503 return self.registrationEnabled 

504 

505 def addSkel(self): 

506 """ 

507 Prepare the add-Skel for rendering. 

508 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on 

509 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired 

510 :return: viur.core.skeleton.Skeleton 

511 """ 

512 skel = self._user_module.addSkel() 

513 

514 if self.registrationEmailVerificationRequired: 

515 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION 

516 elif self.registrationAdminVerificationRequired: 

517 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION 

518 else: # No further verification required 

519 defaultStatusValue = Status.ACTIVE 

520 

521 skel.status.readOnly = True 

522 skel["status"] = defaultStatusValue 

523 

524 if "password" in skel: 

525 skel.password.required = True # The user will have to set a password 

526 

527 return skel 

528 

529 @force_ssl 

530 @exposed 

531 @skey(allow_empty=True) 

532 def add(self, *args, **kwargs): 

533 """ 

534 Allows guests to register a new account if self.registrationEnabled is set to true 

535 

536 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd` 

537 

538 :returns: The rendered, added object of the entry, eventually with error hints. 

539 

540 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

541 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

542 """ 

543 if not self.canAdd(): 

544 raise errors.Unauthorized() 

545 skel = self.addSkel() 

546 if ( 

547 not kwargs # no data supplied 

548 or not current.request.get().isPostRequest # bail out if not using POST-method 

549 or not skel.fromClient(kwargs) # failure on reading into the bones 

550 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

551 ): 

552 # render the skeleton in the version it could as far as it could be read. 

553 return self._user_module.render.add(skel) 

554 self._user_module.onAdd(skel) 

555 skel.write() 

556 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION: 

557 # The user will have to verify his email-address. Create a skey and send it to his address 

558 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False, 

559 user_key=utils.normalizeKey(skel["key"]), 

560 name=skel["name"]) 

561 skel.skey = BaseBone(descr="Skey") 

562 skel["skey"] = skey 

563 email.send_email(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel) 

564 self._user_module.onAdded(skel) # Call onAdded on our parent user module 

565 return self._user_module.render.addSuccess(skel) 

566 

567 

class GoogleAccount(UserPrimaryAuthentication):
    """Primary authentication using a Google account (ID token sign-in)."""

    METHOD_NAME = "X-VIUR-AUTH-Google-Account"

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by the bones required by Google Auth
        """
        skel_cls.uid = StringBone(
            descr="Google UserID",
            required=False,
            readOnly=True,
            unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
            params={
                "category": "Authentication",
            }
        )

        skel_cls.sync = BooleanBone(
            descr="Sync user data with OAuth-based services",
            defaultValue=True,
            params={
                "category": "Authentication",
                # The parts are joined by implicit concatenation, so each one
                # has to carry its own separating space.
                "tooltip":
                    "If set, user data like firstname and lastname is automatically kept "
                    "synchronous with the information stored at the OAuth service provider "
                    "(e.g. Google Login)."
            }
        )

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def login(self, token: str | None = None, *args, **kwargs):
        """
        Log a user in by a Google ID token.

        Without a token, the HTML page performing the Google sign-in is returned.
        With a token, the token is verified, the matching user is looked up by
        Google user id or e-mail address (or created, if permitted), optionally
        synchronized with the data provided by Google, and the authentication
        continues via ``next_or_finish``.

        :param token: The ID token handed out by Google.
        :raises: :exc:`viur.core.errors.PreconditionFailed`, if conf.user.google_client_id is not set.
        :raises: :exc:`viur.core.errors.Forbidden`, if the user is unknown and must not be registered.
        :raises: :exc:`ValueError`, if the token was not issued by Google.
        :raises: :exc:`RuntimeError`, if the user could not be written.
        """
        if not conf.user.google_client_id:
            raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")

        if not token:
            request = current.request.get()
            request.response.headers["Content-Type"] = "text/html"
            if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
                # We have to allow popups here
                request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"

            file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
            # Read with an explicit encoding instead of the platform default
            with open(file_path, encoding="utf-8") as file:
                tpl_string = file.read()

            # FIXME: Use Jinja2 for rendering?
            tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
            extendCsp({
                "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
                "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
            })
            return tpl_string

        user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
        if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
            raise ValueError("Invalid issuer")

        # Token looks valid :)
        uid = user_info["sub"]
        # Not named "email", which would shadow the imported viur.core.email module.
        email_address = user_info["email"]

        base_skel = self._user_module.baseSkel()
        update = False
        if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
            # We'll try again - checking if there's already an user with that email
            if not (user_skel := base_skel.all().filter("name.idx =", email_address.lower()).getSkel()):
                # Still no luck - it's a completely new user
                if not self.registrationEnabled:
                    if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
                        logging.debug(f"Google user is from allowed {domain} - adding account")
                    else:
                        logging.debug(f"Google user is from {domain} - denying registration")
                        raise errors.Forbidden("Registration for new users is disabled")

                user_skel = base_skel
                user_skel["uid"] = uid
                user_skel["name"] = email_address
                update = True

        # Take user information from Google, if wanted!
        if user_skel["sync"]:
            for target, source in {
                "name": email_address,
                "firstname": user_info.get("given_name"),
                "lastname": user_info.get("family_name"),
            }.items():

                if user_skel[target] != source:
                    user_skel[target] = source
                    update = True

        if update:
            # The write must not live inside an assert statement: asserts are
            # removed when Python runs with -O, and the user would never be saved.
            if not user_skel.write():
                raise RuntimeError("Writing the Google user failed")

        return self.next_or_finish(user_skel)

666 

667 

class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
    """Abstract class for all second factors."""

    MAX_RETRY = 3

    # Template to enter the TOTP on login
    second_factor_login_template = "user_login_secondfactor"

    @property
    @abc.abstractmethod
    def NAME(self) -> str:
        """Name for this factor for templates."""
        ...

    @property
    @abc.abstractmethod
    def ACTION_NAME(self) -> str:
        """The action name for this factor, used as path-segment."""
        ...

    def __init__(self, moduleName, modulePath, _user_module):
        """Set up the handler and derive its action, add and start URLs from the module path."""
        super().__init__(moduleName, modulePath, _user_module)

        base_path = self.modulePath
        self.action_url = f"{base_path}/{self.ACTION_NAME}"
        self.add_url = f"{base_path}/add"
        self.start_url = f"{base_path}/start"

691 

692 

class TimeBasedOTP(UserSecondFactorAuthentication):
    METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP"

    # Passed as valid_window to verify(): counter ticks accepted before and after the known time drift
    WINDOW_SIZE = 5

    ACTION_NAME = "otp"
    NAME = "Time based Otp"
    second_factor_login_template = "user_login_secondfactor"

699 

700 @dataclasses.dataclass 

701 class OtpConfig: 

702 """ 

703 This dataclass is used to provide an interface for a OTP token 

704 algorithm description that is passed within the TimeBasedOTP 

705 class for configuration. 

706 """ 

707 secret: str 

708 timedrift: float = 0.0 

709 algorithm: t.Literal["sha1", "sha256"] = "sha1" 

710 interval: int = 60 

711 

712 class OtpSkel(skeleton.RelSkel): 

713 """ 

714 This is the Skeleton used to ask for the OTP token. 

715 """ 

716 otptoken = NumericBone( 

717 descr="Token", 

718 required=True, 

719 max=999999, 

720 min=0, 

721 ) 

722 

723 @classmethod 

724 def patch_user_skel(cls, skel_cls): 

725 """ 

726 Modifies the UserSkel to be equipped by a bones required by Timebased OTP 

727 """ 

728 # One-Time Password Verification 

729 skel_cls.otp_serial = StringBone( 

730 descr="OTP serial", 

731 searchable=True, 

732 params={ 

733 "category": "Second Factor Authentication", 

734 } 

735 ) 

736 

737 skel_cls.otp_secret = CredentialBone( 

738 descr="OTP secret", 

739 params={ 

740 "category": "Second Factor Authentication", 

741 } 

742 ) 

743 

744 skel_cls.otp_timedrift = NumericBone( 

745 descr="OTP time drift", 

746 readOnly=True, 

747 defaultValue=0, 

748 params={ 

749 "category": "Second Factor Authentication", 

750 } 

751 ) 

752 

753 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None: 

754 """ 

755 Returns an instance of self.OtpConfig with a provided token configuration, 

756 or None when there is no appropriate configuration of this second factor handler available. 

757 """ 

758 

759 if otp_secret := skel.dbEntity.get("otp_secret"): 

760 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0) 

761 

762 return None 

763 

764 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

765 """ 

766 Specified whether the second factor authentication can be handled by the given user or not. 

767 """ 

768 return bool(self.get_config(skel)) 

769 

770 @exposed 

771 def start(self): 

772 """ 

773 Configures OTP login for the current session. 

774 

775 A special otp_user_conf has to be specified as a dict, which is stored into the session. 

776 """ 

777 session = current.session.get() 

778 

779 if not (user_key := session.get("possible_user_key")): 

780 raise errors.PreconditionFailed( 

781 "Second factor can only be triggered after successful primary authentication." 

782 ) 

783 

784 user_skel = self._user_module.baseSkel() 

785 if not user_skel.read(user_key): 

786 raise errors.NotFound("The previously authenticated user is gone.") 

787 

788 if not (otp_user_conf := self.get_config(user_skel)): 

789 raise errors.PreconditionFailed("This second factor is not available for the user") 

790 

791 otp_user_conf = { 

792 "key": str(user_key), 

793 } | dataclasses.asdict(otp_user_conf) 

794 

795 session = current.session.get() 

796 session["_otp_user"] = otp_user_conf 

797 session.markChanged() 

798 

799 return self._user_module.render.edit( 

800 self.OtpSkel(), 

801 params={ 

802 "name": i18n.translate(self.NAME), 

803 "action_name": self.ACTION_NAME, 

804 "action_url": f"{self.modulePath}/{self.ACTION_NAME}", 

805 }, 

806 tpl=self.second_factor_login_template 

807 ) 

808 

809 @exposed 

810 @force_ssl 

811 @skey(allow_empty=True) 

812 def otp(self, *args, **kwargs): 

813 """ 

814 Performs the second factor validation and interaction with the client. 

815 """ 

816 session = current.session.get() 

817 if not (otp_user_conf := session.get("_otp_user")): 

818 raise errors.PreconditionFailed("No OTP process started in this session") 

819 

820 # Check if maximum second factor verification attempts 

821 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

822 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

823 

824 # Read the OTP token via the skeleton, to obtain a valid value 

825 skel = self.OtpSkel() 

826 if skel.fromClient(kwargs): 

827 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP. 

828 res = self.verify( 

829 otp=skel["otptoken"], 

830 secret=otp_user_conf["secret"], 

831 algorithm=otp_user_conf.get("algorithm") or "sha1", 

832 interval=otp_user_conf.get("interval") or 60, 

833 timedrift=otp_user_conf.get("timedrift") or 0.0, 

834 valid_window=self.WINDOW_SIZE 

835 ) 

836 else: 

837 res = None 

838 

839 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0! 

840 if res is None: 

841 otp_user_conf["attempts"] = attempts + 1 

842 session.markChanged() 

843 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

844 return self._user_module.render.edit( 

845 skel, 

846 name=i18n.translate(self.NAME), 

847 action_name=self.ACTION_NAME, 

848 action_url=f"{self.modulePath}/{self.ACTION_NAME}", 

849 tpl=self.second_factor_login_template 

850 ) 

851 

852 # Remove otp user config from session 

853 user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName) 

854 del session["_otp_user"] 

855 session.markChanged() 

856 

857 # Check if the OTP device has a time drift 

858 

859 timedriftchange = float(res) - otp_user_conf["timedrift"] 

860 if abs(timedriftchange) > 2: 

861 # The time-drift change accumulates to more than 2 minutes (for interval==60): 

862 # update clock-drift value accordingly 

863 self.updateTimeDrift(user_key, timedriftchange) 

864 

865 # Continue with authentication 

866 return self._user_module.secondFactorSucceeded(self, user_key) 

867 

@staticmethod
def verify(
    otp: str | int,
    secret: str,
    algorithm: str = "sha1",
    interval: int = 60,
    timedrift: float = 0.0,
    for_time: datetime.datetime | None = None,
    valid_window: int = 0,
) -> int | None:
    """
    Check a one-time password against the time-based token derived from *secret*.

    This is a fork of pyotp's verification. Instead of a plain true/false, the counter offset at
    which the token matched is reported when a validity window is used, so that callers can keep
    track of the clock drift of a token generator.

    :param otp: The token to check; it is stringified and left-padded with zeros to 6 characters.
    :param secret: The OTP secret as a hex string (it is re-encoded to base32 before use).
    :param algorithm: Name of the digest used for the HMAC, either ``"sha1"`` or ``"sha256"``.
    :param interval: Token lifetime in seconds. Defaults to 60 here.
    :param timedrift: Known drift (previous index) of the generator; rounded to an integer index.
    :param for_time: Point in time to verify for; defaults to ``datetime.datetime.now()``.
    :param valid_window: Number of counter ticks before and after the drifted position which
        are accepted as well.
    :returns: With a validity window, the counter offset that matched. Without one, ``0`` if the
        token at the drifted position matched. ``None`` if nothing matched.
    :raises errors.NotImplemented: If *algorithm* is not one of the supported digests.
    """
    # Resolve the hashing digest by name
    supported_digests = {
        "sha1": hashlib.sha1,
        "sha256": hashlib.sha256,
    }

    if algorithm not in supported_digests:
        raise errors.NotImplemented(f"{algorithm=} is not implemented")

    digest = supported_digests[algorithm]

    if for_time is None:
        for_time = datetime.datetime.now()

    # The drift is stored in fractions, but an integer counter index is required here
    timedrift = round(timedrift)

    # pyotp expects a base32 secret, the configured one is hex-encoded
    raw_secret = bytes.fromhex(secret)
    secret = base64.b32encode(raw_secret).decode()

    # Tokens are compared as 6-character strings, zero-padded in front
    otp = str(otp).zfill(6)

    totp = pyotp.TOTP(secret, digest=digest, interval=interval)

    if not valid_window:
        # Without a window, only the token at the drifted position is accepted
        if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))):
            return 0
        return None

    # Probe every counter offset inside the window around the known drift, lowest first
    candidates = range(timedrift - valid_window, timedrift + valid_window + 1)
    return next(
        (
            candidate
            for candidate in candidates
            if hmac.compare_digest(otp, str(totp.at(for_time, candidate)))
        ),
        None,
    )

925 

def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
    """
    Adjust the stored clock-drift value of a user's OTP generator.

    The drift moves by a tenth of *idx* per call, limited to the range -0.3 .. 0.3, so that a
    single late submit by a user cannot skew the value out of bounds.

    :param user_key: Key of the user entry whose drift is updated.
    :param idx: How many steps before/behind the accepted token was.
    """

    # FIXME: The callback in viur-core must be improved, to accept user_skel

    def _apply_drift(key, steps):
        entity = db.Get(key)

        # Anything that is not a float yet is treated as "no drift recorded"
        known_drift = entity.get("otp_timedrift")
        if not isinstance(known_drift, float):
            known_drift = 0.0

        # A tenth of the observed steps, clamped to +/- 0.3
        adjustment = min(max(0.1 * steps, -0.3), 0.3)

        entity["otp_timedrift"] = known_drift + adjustment
        db.Put(entity)

    db.RunInTransaction(_apply_drift, user_key, idx)

946 

947 

class AuthenticatorOTP(UserSecondFactorAuthentication):
    """
    This class handles the second factor for authenticator apps like Authy and so on.

    The shared secret is generated by :meth:`add`, kept in the session until the user has proven
    possession with a valid token, and then stored in the ``otp_app_secret`` field of the user entry.
    """
    METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"

    second_factor_add_template = "user_secondfactor_add"
    """Template to configure (add) a new TOTP"""

    ACTION_NAME = "authenticator_otp"
    """Action name provided for *otp_template* on login"""

    NAME = "Authenticator App"

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def add(self, otp=None):
        """
        Configure a new authenticator app for the current user.

        We try to read the otp_app_secret from the current session. When this fails we generate a new one and
        store it in the session.

        If an otp and a skey are provided, we validate the skey and the otp. If both are valid, we store
        the otp_app_secret from the session in the user entry.

        :param otp: The token shown by the authenticator app, or None to only render the setup form.
        """
        current_session = current.session.get()

        if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
            otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
            current_session["_maybe_otp_app_secret"] = otp_app_secret
            current_session.markChanged()

        # No token yet, or a wrong one: (re-)render the setup form with the provisioning URI.
        # TODO: a wrong token should additionally report an error to the template.
        if otp is None or not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
            return self._user_module.render.second_factor_add(
                tpl=self.second_factor_add_template,
                action_name=self.ACTION_NAME,
                name=i18n.translate(self.NAME),
                add_url=self.add_url,
                otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))

        # Now we can set the otp_app_secret to the current user and render the success template
        AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
        return self._user_module.render.second_factor_add_success(
            action_name=self.ACTION_NAME,
            name=i18n.translate(self.NAME),
        )

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """
        We can only handle the second factor if we have stored an otp_app_secret before.

        :param skel: The skeleton of the user who wants to log in.
        :returns: True if the user's entity holds a non-empty ``otp_app_secret``.
        """
        return bool(skel.dbEntity.get("otp_app_secret", ""))

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by bones required by Authenticator App

        :param skel_cls: The skeleton class that receives the ``otp_app_secret`` bone.
        """
        # Authenticator OTP Apps (like Authy)
        skel_cls.otp_app_secret = CredentialBone(
            descr="OTP Secret (App-Key)",
            params={
                "category": "Second Factor Authentication",
            }
        )

    @classmethod
    def set_otp_app_secret(cls, otp_app_secret=None):
        """
        Write a new OTP secret into the entry of the current user.

        :param otp_app_secret: The secret to store.
        :raises errors.PreconditionFailed: If no secret is given.
        :raises errors.Unauthorized: If there is no current user.
        :raises errors.NotFound: If the user entry cannot be read inside the transaction.
        """
        if otp_app_secret is None:
            logging.error("No 'otp_app_secret' is provided")
            raise errors.PreconditionFailed("No 'otp_app_secret' is provided")
        if not (cuser := current.user.get()):
            raise errors.Unauthorized()

        def transaction(user_key):
            if not (user := db.Get(user_key)):
                raise errors.NotFound()
            user["otp_app_secret"] = otp_app_secret
            db.Put(user)

        db.RunInTransaction(transaction, cuser["key"])

    @classmethod
    def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
        """
        Build the provisioning URI for the given secret and the current user.

        :param otp_app_secret: The secret to encode into the URI.
        :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
        :raises errors.Unauthorized: If there is no current user.
        """
        if not (cuser := current.user.get()):
            raise errors.Unauthorized()
        if not (issuer := conf.user.otp_issuer):
            # Fall back to the project id when no issuer is configured
            logging.warning(
                f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
            issuer = conf.instance.project_id

        return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer)

    @classmethod
    def generate_otp_app_secret(cls) -> str:
        """
        Generate a new OTP Secret

        :return a random base32 secret as produced by ``pyotp.random_base32()``
        """
        return pyotp.random_base32()

    @classmethod
    def verify_otp(cls, otp: str | int, secret: str) -> bool:
        """
        Check a token against the TOTP derived from *secret*, using pyotp's default settings.

        :param otp: The token to check.
        :param secret: The secret the token was generated from.
        :returns: The result of ``pyotp.TOTP(secret).verify(otp)``.
        """
        return pyotp.TOTP(secret).verify(otp)

    @exposed
    def start(self):
        """
        Start the second factor login: reset the attempt counter in the session and render the token form.
        """
        otp_user_conf = {"attempts": 0}
        session = current.session.get()
        session["_otp_user"] = otp_user_conf
        session.markChanged()
        return self._user_module.render.edit(
            TimeBasedOTP.OtpSkel(),
            params={
                "name": i18n.translate(self.NAME),
                "action_name": self.ACTION_NAME,
                "action_url": self.action_url,
            },
            tpl=self.second_factor_login_template,
        )

    @exposed
    @force_ssl
    @skey
    def authenticator_otp(self, **kwargs):
        """
        We verify the otp here with the secret we stored before.

        :raises errors.PreconditionFailed: If the session holds no user from a primary authentication,
            no OTP process was started in this session, or the submitted data cannot be read.
        :raises errors.Forbidden: If the maximum amount of retries is exceeded.
        :raises errors.NotFound: If the user entry does not exist.
        """
        session = current.session.get()

        # Without a preceding primary authentication there is no user to verify the token for.
        # Report this as a failed precondition instead of failing on the missing session key.
        if not (possible_user_key := session.get("possible_user_key")):
            raise errors.PreconditionFailed(
                "Second factor can only be triggered after successful primary authentication"
            )

        user_key = db.Key(self._user_module.kindName, possible_user_key)

        if not (otp_user_conf := session.get("_otp_user")):
            raise errors.PreconditionFailed("No OTP process started in this session")

        # Check if maximum second factor verification attempts
        if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
            raise errors.Forbidden("Maximum amount of authentication retries exceeded")

        if not (user := db.Get(user_key)):
            raise errors.NotFound()

        skel = TimeBasedOTP.OtpSkel()
        if not skel.fromClient(kwargs):
            raise errors.PreconditionFailed()
        otp_token = str(skel["otptoken"]).zfill(6)

        if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]):
            return self._user_module.secondFactorSucceeded(self, user_key)

        # Wrong token: count the attempt and render the form again with an error
        otp_user_conf["attempts"] = attempts + 1
        session.markChanged()
        skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
        return self._user_module.render.edit(
            skel,
            name=i18n.translate(self.NAME),
            action_name=self.ACTION_NAME,
            action_url=self.action_url,
            tpl=self.second_factor_login_template,
        )

1120 

1121 

1122class User(List): 

1123 """ 

1124 The User module is used to manage and authenticate users in a ViUR system. 

1125 

1126 It is used in almost any ViUR project, but ViUR can also function without any user capabilites. 

1127 """ 

1128 

1129 kindName = "user" 

1130 addTemplate = "user_add" 

1131 addSuccessTemplate = "user_add_success" 

1132 lostPasswordTemplate = "user_lostpassword" 

1133 verifyEmailAddressMail = "user_verify_address" 

1134 passwordRecoveryMail = "user_password_recovery" 

1135 

1136 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter( 

1137 None, ( 

1138 UserPassword, 

1139 conf.user.google_client_id and GoogleAccount, 

1140 ) 

1141 )) 

1142 """ 

1143 Specifies primary authentication providers that are made available 

1144 as sub-modules under `user/auth_<classname>`. They might require 

1145 customization or configuration. 

1146 """ 

1147 

1148 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = ( 

1149 TimeBasedOTP, 

1150 AuthenticatorOTP, 

1151 ) 

1152 """ 

1153 Specifies secondary authentication providers that are made available 

1154 as sub-modules under `user/f2_<classname>`. They might require 

1155 customization or configuration, which is determined during the 

1156 login-process depending on the user that wants to login. 

1157 """ 

1158 

1159 validAuthenticationMethods = tuple(filter( 

1160 None, ( 

1161 (UserPassword, AuthenticatorOTP), 

1162 (UserPassword, TimeBasedOTP), 

1163 (UserPassword, None), 

1164 (GoogleAccount, None) if conf.user.google_client_id else None, 

1165 ) 

1166 )) 

1167 """ 

1168 Specifies the possible combinations of primary- and secondary factor 

1169 login methos. 

1170 

1171 GoogleLogin defaults to no second factor, as the Google Account can be 

1172 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only 

1173 handled when there is a user-dependent configuration available. 

1174 """ 

1175 

1176 msg_missing_second_factor = "Second factor required but not configured for this user." 

1177 

1178 secondFactorTimeWindow = datetime.timedelta(minutes=10) 

1179 

1180 default_order = "name.idx" 

1181 

1182 adminInfo = { 

1183 "icon": "person-fill", 

1184 "actions": [ 

1185 "trigger_kick", 

1186 "trigger_takeover", 

1187 ], 

1188 "customActions": { 

1189 "trigger_kick": { 

1190 "name": i18n.translate( 

1191 key="viur.modules.user.customActions.kick", 

1192 defaultText="Kick user", 

1193 hint="Title of the kick user function" 

1194 ), 

1195 "icon": "trash2-fill", 

1196 "access": ["root"], 

1197 "action": "fetch", 

1198 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}", 

1199 "confirm": i18n.translate( 

1200 key="viur.modules.user.customActions.kick.confirm", 

1201 defaultText="Do you really want to drop all sessions of the selected user from the system?", 

1202 ), 

1203 "success": i18n.translate( 

1204 key="viur.modules.user.customActions.kick.success", 

1205 defaultText="Sessions of the user are being invalidated.", 

1206 ), 

1207 }, 

1208 "trigger_takeover": { 

1209 "name": i18n.translate( 

1210 key="viur.modules.user.customActions.takeover", 

1211 defaultText="Take-over user", 

1212 hint="Title of the take user over function" 

1213 ), 

1214 "icon": "file-person-fill", 

1215 "access": ["root"], 

1216 "action": "fetch", 

1217 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}", 

1218 "confirm": i18n.translate( 

1219 key="viur.modules.user.customActions.takeover.confirm", 

1220 defaultText="Do you really want to replace your current user session by a " 

1221 "user session of the selected user?", 

1222 ), 

1223 "success": i18n.translate( 

1224 key="viur.modules.user.customActions.takeover.success", 

1225 defaultText="You're now know as the selected user!", 

1226 ), 

1227 "then": "reload-vi", 

1228 }, 

1229 }, 

1230 } 

1231 

1232 roles = { 

1233 "admin": "*", 

1234 } 

1235 

1236 def __init__(self, moduleName, modulePath): 

1237 for provider in self.authenticationProviders: 

1238 assert issubclass(provider, UserPrimaryAuthentication) 

1239 name = f"auth_{provider.__name__.lower()}" 

1240 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1241 

1242 for provider in self.secondFactorProviders: 

1243 assert issubclass(provider, UserSecondFactorAuthentication) 

1244 name = f"f2_{provider.__name__.lower()}" 

1245 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1246 

1247 super().__init__(moduleName, modulePath) 

1248 

1249 def get_role_defaults(self, role: str) -> set[str]: 

1250 """ 

1251 Returns a set of default access rights for a given role. 

1252 

1253 Defaults to "admin" usage for any role > "user" 

1254 and "scriptor" usage for "admin" role. 

1255 """ 

1256 ret = set() 

1257 

1258 if role in ("viewer", "editor", "admin"): 

1259 ret.add("admin") 

1260 

1261 if role == "admin": 

1262 ret.add("scriptor") 

1263 

1264 return ret 

1265 

1266 def addSkel(self): 

1267 skel = super().addSkel().clone() 

1268 user = current.user.get() 

1269 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])): 

1270 skel.status.readOnly = True 

1271 skel["status"] = Status.UNSET 

1272 skel.status.visible = False 

1273 skel.access.readOnly = True 

1274 skel["access"] = [] 

1275 skel.access.visible = False 

1276 else: 

1277 # An admin tries to add a new user. 

1278 skel.status.readOnly = False 

1279 skel.status.visible = True 

1280 skel.access.readOnly = False 

1281 skel.access.visible = True 

1282 

1283 if "password" in skel: 

1284 # Unlock and require a password 

1285 skel.password.required = True 

1286 skel.password.visible = True 

1287 skel.password.readOnly = False 

1288 

1289 skel.name.readOnly = False # Don't enforce readonly name in user/add 

1290 return skel 

1291 

1292 def editSkel(self, *args, **kwargs): 

1293 skel = super().editSkel().clone() 

1294 

1295 if "password" in skel: 

1296 skel.password.required = False 

1297 skel.password.visible = True 

1298 skel.password.readOnly = False 

1299 

1300 user = current.user.get() 

1301 

1302 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only 

1303 skel.name.readOnly = lockFields 

1304 skel.access.readOnly = lockFields 

1305 skel.status.readOnly = lockFields 

1306 

1307 return skel 

1308 

1309 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication: 

1310 return getattr(self, f"f2_{cls.__name__.lower()}") 

1311 

1312 def getCurrentUser(self): 

1313 session = current.session.get() 

1314 

1315 req = current.request.get() 

1316 if session and (session.loaded or req.is_deferred) and (user := session.get("user")): 

1317 skel = self.baseSkel() 

1318 skel.setEntity(user) 

1319 return skel 

1320 

1321 return None 

1322 

1323 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key): 

1324 """ 

1325 Continue authentication flow when primary authentication succeeded. 

1326 """ 

1327 skel = self.baseSkel() 

1328 

1329 if not skel.read(user_key): 

1330 raise errors.NotFound("User was not found.") 

1331 

1332 if not provider.can_handle(skel): 

1333 raise errors.Forbidden("User is not allowed to use this primary login method.") 

1334 

1335 session = current.session.get() 

1336 session["possible_user_key"] = user_key.id_or_name 

1337 session["_secondFactorStart"] = utils.utcNow() 

1338 session.markChanged() 

1339 

1340 second_factor_providers = [] 

1341 

1342 for auth_provider, second_factor in self.validAuthenticationMethods: 

1343 if isinstance(provider, auth_provider): 

1344 if second_factor is not None: 

1345 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor) 

1346 if second_factor_provider_instance.can_handle(skel): 

1347 second_factor_providers.append(second_factor_provider_instance) 

1348 else: 

1349 second_factor_providers.append(None) 

1350 

1351 if len(second_factor_providers) > 1 and None in second_factor_providers: 

1352 # We have a second factor. So we can get rid of the None 

1353 second_factor_providers.pop(second_factor_providers.index(None)) 

1354 

1355 if len(second_factor_providers) == 0: 

1356 raise errors.NotAcceptable(self.msg_missing_second_factor) 

1357 elif len(second_factor_providers) == 1: 

1358 if second_factor_providers[0] is None: 

1359 # We allow sign-in without a second factor 

1360 return self.authenticateUser(user_key) 

1361 # We have only one second factor we don't need the choice template 

1362 return second_factor_providers[0].start(user_key) 

1363 

1364 # In case there is more than one second factor, let the user select a method. 

1365 return self.render.second_factor_choice(second_factors=second_factor_providers) 

1366 

1367 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key): 

1368 """ 

1369 Continue authentication flow when secondary authentication succeeded. 

1370 """ 

1371 session = current.session.get() 

1372 if session["possible_user_key"] != user_key.id_or_name: 

1373 raise errors.Forbidden() 

1374 

1375 # Assert that the second factor verification finished in time 

1376 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow: 

1377 raise errors.RequestTimeout() 

1378 

1379 return self.authenticateUser(user_key) 

1380 

1381 def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None: 

1382 """ 

1383 Hookable check if a user is defined as "active" and can login. 

1384 

1385 :param skel: The UserSkel of the user who wants to login. 

1386 """ 

1387 if "status" in skel: 

1388 status = skel["status"] 

1389 if not isinstance(status, (Status, int)): 

1390 try: 

1391 status = int(status) 

1392 except ValueError: 

1393 status = Status.UNSET 

1394 

1395 return status >= Status.ACTIVE.value 

1396 

1397 return None 

1398 

1399 def authenticateUser(self, key: db.Key, **kwargs): 

1400 """ 

1401 Performs Log-In for the current session and the given user key. 

1402 

1403 This resets the current session: All fields not explicitly marked as persistent 

1404 by conf.user.session_persistent_fields_on_login are gone afterwards. 

1405 

1406 :param key: The (DB-)Key of the user we shall authenticate 

1407 """ 

1408 skel = self.baseSkel() 

1409 if not skel.read(key): 

1410 raise ValueError(f"Unable to authenticate unknown user {key}") 

1411 

1412 # Verify that this user account is active 

1413 if not self.is_active(skel): 

1414 raise errors.Forbidden("The user is disabled and cannot be authenticated.") 

1415 

1416 # Update session for user 

1417 session = current.session.get() 

1418 # Remember persistent fields... 

1419 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login} 

1420 session.reset() 

1421 # and copy them over to the new session 

1422 session |= take_over 

1423 

1424 # Update session, user and request 

1425 session["user"] = skel.dbEntity 

1426 

1427 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key 

1428 current.user.set(self.getCurrentUser()) 

1429 

1430 self.onLogin(skel) 

1431 

1432 return self.render.loginSucceeded(**kwargs) 

1433 

1434 @exposed 

1435 @skey 

1436 def logout(self, *args, **kwargs): 

1437 """ 

1438 Implements the logout action. It also terminates the current session (all keys not listed 

1439 in viur.session_persistent_fields_on_logout will be lost). 

1440 """ 

1441 if not (user := current.user.get()): 

1442 raise errors.Unauthorized() 

1443 

1444 self.onLogout(user) 

1445 

1446 session = current.session.get() 

1447 if take_over := {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}: 

1448 session.reset() 

1449 session |= take_over 

1450 else: 

1451 session.clear() 

1452 current.user.set(None) # set user to none in context var 

1453 return self.render.logoutSuccess() 

1454 

1455 @exposed 

1456 def login(self, *args, **kwargs): 

1457 return self.render.loginChoices([ 

1458 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1459 for primary, secondary in self.validAuthenticationMethods 

1460 ]) 

1461 

1462 def onLogin(self, skel: skeleton.SkeletonInstance): 

1463 """ 

1464 Hook to be called on user login. 

1465 """ 

1466 # Update the lastlogin timestamp (if available!) 

1467 if "lastlogin" in skel: 

1468 now = utils.utcNow() 

1469 

1470 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??) 

1471 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)): 

1472 skel["lastlogin"] = now 

1473 skel.write(update_relations=False) 

1474 

1475 logging.info(f"""User {skel["name"]} logged in""") 

1476 

1477 def onLogout(self, skel: skeleton.SkeletonInstance): 

1478 """ 

1479 Hook to be called on user logout. 

1480 """ 

1481 logging.info(f"""User {skel["name"]} logged out""") 

1482 

1483 @exposed 

1484 def view(self, key: db.Key | int | str = "self", *args, **kwargs): 

1485 """ 

1486 Allow a special key "self" to reference the current user. 

1487 

1488 By default, any authenticated user can view its own user entry, 

1489 to obtain access rights and any specific user information. 

1490 This behavior is defined in the customized `canView` function, 

1491 which is overwritten by the User-module. 

1492 

1493 The rendered skeleton can be modified or restriced by specifying 

1494 a customized view-skeleton. 

1495 """ 

1496 if key == "self": 

1497 if user := current.user.get(): 

1498 key = user["key"] 

1499 else: 

1500 raise errors.Unauthorized("Cannot view 'self' with unknown user") 

1501 

1502 return super().view(key, *args, **kwargs) 

1503 

1504 def canView(self, skel) -> bool: 

1505 if user := current.user.get(): 

1506 if skel["key"] == user["key"]: 

1507 return True 

1508 

1509 if "root" in user["access"] or "user-view" in user["access"]: 

1510 return True 

1511 

1512 return False 

1513 

1514 @exposed 

1515 @skey(allow_empty=True) 

1516 def edit(self, key: db.Key | int | str = "self", *args, **kwargs): 

1517 """ 

1518 Allow a special key "self" to reference the current user. 

1519 

1520 This modification will only allow to use "self" as a key; 

1521 The specific access right to let the user edit itself must 

1522 still be customized. 

1523 

1524 The rendered and editable skeleton can be modified or restriced 

1525 by specifying a customized edit-skeleton. 

1526 """ 

1527 if key == "self": 

1528 if user := current.user.get(): 

1529 key = user["key"] 

1530 else: 

1531 raise errors.Unauthorized("Cannot edit 'self' with unknown user") 

1532 

1533 return super().edit(key, *args, **kwargs) 

1534 

1535 @exposed 

1536 def getAuthMethods(self, *args, **kwargs): 

1537 """Inform tools like Viur-Admin which authentication to use""" 

1538 # FIXME: This is almost the same code as in index()... 

1539 # FIXME: VIUR4: The entire function should be removed! 

1540 # TODO: Align result with index(), so that primary and secondary login is presented. 

1541 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!") 

1542 

1543 res = [ 

1544 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1545 for primary, secondary in self.validAuthenticationMethods 

1546 ] 

1547 

1548 return json.dumps(res) 

1549 

1550 @exposed 

1551 @skey 

1552 def trigger(self, action: str, key: str): 

1553 current.request.get().response.headers["Content-Type"] = "application/json" 

1554 

1555 # Check for provided access right definition (equivalent to client-side check), fallback to root! 

1556 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", ) 

1557 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)): 

1558 raise errors.Unauthorized() 

1559 

1560 skel = self.baseSkel() 

1561 if not skel.read(key): 

1562 raise errors.NotFound() 

1563 

1564 match action: 

1565 case "takeover": 

1566 self.authenticateUser(skel["key"]) 

1567 

1568 case "kick": 

1569 session.killSessionByUser(skel["key"]) 

1570 

1571 case _: 

1572 raise errors.NotImplemented(f"Action {action!r} not implemented") 

1573 

1574 return json.dumps("OKAY") 

1575 

1576 def onEdited(self, skel): 

1577 super().onEdited(skel) 

1578 

1579 # In case the user is set to inactive, kill all sessions 

1580 if self.is_active(skel) is False: 

1581 session.killSessionByUser(skel["key"]) 

1582 

1583 def onDeleted(self, skel): 

1584 super().onDeleted(skel) 

1585 # Invalidate all sessions of that user 

1586 session.killSessionByUser(skel["key"]) 

1587 

1588 

@tasks.StartupTask
def createNewUserIfNotExists():
    """
    Bootstrap an initial root user when the user kind holds no entry yet.

    Nothing happens unless the ``vi`` app exposes a ``user`` module that is a `User` instance,
    offers ``addSkel`` and ``validAuthenticationMethods``, and lists `UserPassword` as a primary
    login method. The generated credentials are logged and mailed to the admins.
    """
    user_module = getattr(conf.main_app.vi, "user", None)

    if not user_module:
        return

    if not isinstance(user_module, User):
        return

    if "addSkel" not in dir(user_module) or "validAuthenticationMethods" not in dir(user_module):
        return

    # UserPassword must be one of the primary login methods
    if not any(issubclass(method[0], UserPassword) for method in user_module.validAuthenticationMethods):
        return

    if db.Query(user_module.addSkel().kindName).getEntry():
        # There is at least one user in the database already
        return

    # Resolve the skeleton by its kind to ensure we have the full skeleton
    skel_cls = skeleton.skeletonByKind(user_module.addSkel().kindName)
    admin_skel = skel_cls()

    uname = f"""admin@{conf.instance.project_id}.appspot.com"""
    pw = utils.string.random(13)

    initial_values = (
        ("name", uname),
        ("status", Status.ACTIVE),  # Ensure it's enabled right away
        ("access", ["root"]),
        ("password", pw),
    )
    for bone_name, bone_value in initial_values:
        admin_skel[bone_name] = bone_value

    try:
        admin_skel.write()
    except Exception as exc:
        logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
        logging.exception(exc)
    else:
        msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"

        logging.warning(msg)
        email.send_email_to_admins("New ViUR password", msg)

1626 

1627# DEPRECATED ATTRIBUTES HANDLING 

1628 

1629def __getattr__(attr): 

1630 match attr: 

1631 case "userSkel": 

1632 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!" 

1633 warnings.warn(msg, DeprecationWarning, stacklevel=2) 

1634 logging.warning(msg) 

1635 return UserSkel 

1636 

1637 return super(__import__(__name__).__class__).__getattr__(attr)