Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%
722 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-03 12:27 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-03 12:27 +0000
1import abc
2import datetime
3import enum
4import functools
5import hashlib
6import hmac
7import json
8import logging
9import secrets
10import warnings
11import user_agents
13import pyotp
14import base64
15import dataclasses
16import typing as t
17from google.auth.transport import requests
18from google.oauth2 import id_token
20from viur.core import (
21 conf, current, db, email, errors, i18n,
22 securitykey, session, skeleton, tasks, utils, Module
23)
24from viur.core.decorators import *
25from viur.core.bones import *
26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password
27from viur.core.prototypes.list import List
28from viur.core.ratelimit import RateLimit
29from viur.core.securityheaders import extendCsp
32@functools.total_ordering
33class Status(enum.Enum):
34 """Status enum for a user
36 Has backwards compatibility to be comparable with non-enum values.
37 Will be removed with viur-core 4.0.0
38 """
40 UNSET = 0 # Status is unset
41 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification
42 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin
43 DISABLED = 5 # Account disabled
44 ACTIVE = 10 # Active
46 def __eq__(self, other):
47 if isinstance(other, Status):
48 return super().__eq__(other)
49 return self.value == other
51 def __lt__(self, other):
52 if isinstance(other, Status):
53 return super().__lt__(other)
54 return self.value < other
57class UserSkel(skeleton.Skeleton):
58 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604)
60 name = EmailBone(
61 descr="E-Mail",
62 required=True,
63 readOnly=True,
64 caseSensitive=False,
65 searchable=True,
66 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
67 )
69 firstname = StringBone(
70 descr="Firstname",
71 searchable=True,
72 )
74 lastname = StringBone(
75 descr="Lastname",
76 searchable=True,
77 )
79 roles = SelectBone(
80 descr=i18n.translate("viur.core.modules.user.bone.roles", defaultText="Roles"),
81 values=conf.user.roles,
82 required=True,
83 multiple=True,
84 # fixme: This is generally broken in VIUR! See #776 for details.
85 # vfunc=lambda values:
86 # i18n.translate(
87 # "user.bone.roles.invalid",
88 # defaultText="Invalid role setting: 'custom' can only be set alone.")
89 # if "custom" in values and len(values) > 1 else None,
90 defaultValue=list(conf.user.roles.keys())[:1],
91 )
93 access = SelectBone(
94 descr=i18n.translate("viur.core.modules.user.bone.access", defaultText="Access rights"),
95 type_suffix="access",
96 values=lambda: {
97 right: i18n.translate(f"viur.core.modules.user.accessright.{right}", defaultText=right)
98 for right in sorted(conf.user.access_rights)
99 },
100 multiple=True,
101 params={
102 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system
103 }
104 )
106 status = SelectBone(
107 descr=i18n.translate("viur.core.modules.user.bone.status", "Account status"),
108 values=Status,
109 translation_key_prefix="viur.core.user.status.",
110 defaultValue=Status.ACTIVE,
111 required=True,
112 )
114 lastlogin = DateBone(
115 descr="Last Login",
116 readOnly=True,
117 )
119 admin_config = JsonBone( # This bone stores settings from the vi
120 descr="Config for the User",
121 visible=False
122 )
124 def __new__(cls, *args, **kwargs):
125 """
126 Constructor for the UserSkel-class, with the capability
127 to dynamically add bones required for the configured
128 authentication methods.
129 """
130 for provider in conf.main_app.vi.user.authenticationProviders:
131 assert issubclass(provider, UserPrimaryAuthentication)
132 provider.patch_user_skel(cls)
134 for provider in conf.main_app.vi.user.secondFactorProviders:
135 assert issubclass(provider, UserSecondFactorAuthentication)
136 provider.patch_user_skel(cls)
138 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
139 return super().__new__(cls, *args, **kwargs)
141 @classmethod
142 def write(cls, skel, *args, **kwargs):
143 # Roles
144 if skel["roles"] and "custom" not in skel["roles"]:
145 # Collect access rights through rules
146 access = set()
148 for role in skel["roles"]:
149 # Get default access for this role
150 access |= conf.main_app.vi.user.get_role_defaults(role)
152 # Go through all modules and evaluate available role-settings
153 for name in dir(conf.main_app.vi):
154 if name.startswith("_"):
155 continue
157 module = getattr(conf.main_app.vi, name)
158 if not isinstance(module, Module):
159 continue
161 roles = getattr(module, "roles", None) or {}
162 rights = roles.get(role, roles.get("*", ()))
164 # Convert role into tuple if it's not
165 if not isinstance(rights, (tuple, list)):
166 rights = (rights, )
168 if "*" in rights:
169 for right in module.accessRights:
170 access.add(f"{name}-{right}")
171 else:
172 for right in rights:
173 if right in module.accessRights:
174 access.add(f"{name}-{right}")
176 # special case: "edit" and "delete" actions require "view" as well!
177 if right in ("edit", "delete") and "view" in module.accessRights:
178 access.add(f"{name}-view")
180 skel["access"] = list(access)
182 return super().write(skel, *args, **kwargs)
185class UserAuthentication(Module, abc.ABC):
186 @property
187 @abc.abstractstaticmethod
188 def METHOD_NAME() -> str:
189 """
190 Define a unique method name for this authentication.
191 """
192 ...
194 def __init__(self, moduleName, modulePath, userModule):
195 super().__init__(moduleName, modulePath)
196 self._user_module = userModule
198 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
199 return True
201 @classmethod
202 def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
203 """
204 Allows for an UserAuthentication to patch the UserSkel
205 class with additional bones which are required for
206 the implemented authentication method.
207 """
208 ...
211class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
212 """Abstract class for all primary authentication methods."""
213 registrationEnabled = False
215 @abc.abstractmethod
216 def login(self, *args, **kwargs):
217 ...
219 def next_or_finish(self, skel: skeleton.SkeletonInstance):
220 """
221 Hook that is called whenever a part of the authentication was successful.
222 It allows to perform further steps in custom authentications,
223 e.g. change a password after first login.
224 """
225 return self._user_module.continueAuthenticationFlow(self, skel["key"])
228class UserPassword(UserPrimaryAuthentication):
229 METHOD_NAME = "X-VIUR-AUTH-User-Password"
231 registrationEmailVerificationRequired = True
232 registrationAdminVerificationRequired = True
234 verifySuccessTemplate = "user_verify_success"
235 verifyEmailAddressMail = "user_verify_address"
236 verifyFailedTemplate = "user_verify_failed"
237 passwordRecoveryTemplate = "user_passwordrecover"
238 passwordRecoveryMail = "user_password_recovery"
239 passwordRecoverySuccessTemplate = "user_passwordrecover_success"
240 passwordRecoveryStep1Template = "user_passwordrecover_step1"
241 passwordRecoveryStep2Template = "user_passwordrecover_step2"
242 passwordRecoveryStep3Template = "user_passwordrecover_step3"
244 # The default rate-limit for password recovery (10 tries each 15 minutes)
245 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")
247 # Limit (invalid) login-retries to once per 5 seconds
248 loginRateLimit = RateLimit("user.login", 12, 1, "ip")
250 @classmethod
251 def patch_user_skel(cls, skel_cls):
252 """
253 Modifies the UserSkel to be equipped by a PasswordBone.
254 """
255 skel_cls.password = PasswordBone(
256 readOnly=True,
257 visible=False,
258 params={
259 "category": "Authentication",
260 }
261 )
263 class LoginSkel(skeleton.RelSkel):
264 name = EmailBone(
265 descr="E-Mail",
266 required=True,
267 caseSensitive=False,
268 )
269 password = PasswordBone(
270 required=True,
271 test_threshold=0,
272 tests=(),
273 raw=True,
274 )
276 class LostPasswordStep1Skel(skeleton.RelSkel):
277 name = EmailBone(
278 descr="E-Mail",
279 required=True,
280 )
282 class LostPasswordStep2Skel(skeleton.RelSkel):
283 recovery_key = StringBone(
284 descr="Recovery Key",
285 required=True,
286 params={
287 "tooltip": i18n.translate(
288 key="viur.core.modules.user.userpassword.lostpasswordstep2.recoverykey",
289 defaultText="Please enter the validation key you've received via e-mail.",
290 hint="Shown when the user needs more than 15 minutes to paste the key",
291 ),
292 }
293 )
295 class LostPasswordStep3Skel(skeleton.RelSkel):
296 # send the recovery key again, in case the password is rejected by some reason.
297 recovery_key = StringBone(
298 descr="Recovery Key",
299 visible=False,
300 readOnly=True,
301 )
303 password = PasswordBone(
304 descr="New Password",
305 required=True,
306 params={
307 "tooltip": i18n.translate(
308 key="viur.core.modules.user.userpassword.lostpasswordstep3.password",
309 defaultText="Please enter a new password for your account.",
310 ),
311 }
312 )
314 @exposed
315 @force_ssl
316 @skey(allow_empty=True)
317 def login(self, **kwargs):
318 # Obtain a fresh login skel
319 skel = self.LoginSkel()
321 # Read required bones from client
322 if not skel.fromClient(kwargs):
323 return self._user_module.render.login(skel, action="login")
325 self.loginRateLimit.assertQuotaIsAvailable()
327 # query for the username. The query might find another user, but the name is being checked for equality below
328 name = skel["name"].lower().strip()
329 user_skel = self._user_module.baseSkel()
330 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel
332 # extract password hash from raw database entity (skeleton access blocks it)
333 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {}
334 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001
335 password_hash = encode_password(skel["password"], password_data.get("salt", "-invalid-"), iterations)["pwhash"]
337 # now check if the username matches
338 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode())
340 # next, check if the password hash matches
341 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash)
343 if not is_okay:
344 # Set error to all required fields
345 for name, bone in skel.items():
346 if bone.required:
347 skel.errors.append(
348 ReadFromClientError(
349 ReadFromClientErrorSeverity.Invalid,
350 i18n.translate(
351 key="viur.core.modules.user.userpassword.login.failed",
352 defaultText="Invalid username or password provided",
353 ),
354 name,
355 )
356 )
358 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota
359 return self._user_module.render.login(skel, action="login")
361 # check if iterations are below current security standards, and update if necessary.
362 if iterations < PBKDF2_DEFAULT_ITERATIONS:
363 logging.info(f"Update password hash for user {name}.")
364 # re-hash the password with more iterations
365 # FIXME: This must be done within a transaction!
366 user_skel["password"] = password # will be hashed on serialize
367 user_skel.write(update_relations=False)
369 return self.next_or_finish(user_skel)
371 @exposed
372 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs):
373 """
374 This implements a password recovery process which lets users set a new password for their account,
375 after validating a recovery key sent by email.
377 The process is as following:
379 - The user enters his email adress
380 - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode
381 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name
382 and send a link with the code . It runs as a deferredTask so we don't leak the information if a user
383 account exists.
384 - If the user received his email, he can click on the link and set a new password for his account.
386 To prevent automated attacks, the fist step is guarded by a captcha and we limited calls to this function
387 to 10 actions per 15 minutes. (One complete recovery process consists of two calls).
388 """
389 self.passwordRecoveryRateLimit.assertQuotaIsAvailable()
390 current_request = current.request.get()
392 if recovery_key is None:
393 # This is the first step, where we ask for the username of the account we'll going to reset the password on
394 skel = self.LostPasswordStep1Skel()
396 if not current_request.isPostRequest or not skel.fromClient(kwargs):
397 return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template)
399 # validate security key
400 if not securitykey.validate(skey):
401 raise errors.PreconditionFailed()
403 self.passwordRecoveryRateLimit.decrementQuota()
405 recovery_key = securitykey.create(
406 duration=datetime.timedelta(minutes=15),
407 key_length=conf.security.password_recovery_key_length,
408 user_name=skel["name"].lower(),
409 session_bound=False,
410 )
412 # Send the code in background
413 self.sendUserPasswordRecoveryCode(
414 skel["name"], recovery_key, current_request.request.headers["User-Agent"]
415 )
417 # step 2 is only an action-skel, and can be ignored by a direct link in the
418 # e-mail previously sent. It depends on the implementation of the specific project.
419 return self._user_module.render.edit(
420 self.LostPasswordStep2Skel(),
421 tpl=self.passwordRecoveryStep2Template,
422 )
424 # in step 3
425 skel = self.LostPasswordStep3Skel()
426 skel["recovery_key"] = recovery_key # resend the recovery key again, in case the fromClient() fails.
428 # check for any input; Render input-form again when incomplete.
429 if not skel.fromClient(kwargs) or not current_request.isPostRequest:
430 return self._user_module.render.edit(
431 skel=skel,
432 tpl=self.passwordRecoveryStep3Template,
433 )
435 # validate security key
436 if not securitykey.validate(skey):
437 raise errors.PreconditionFailed()
439 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)):
440 raise errors.PreconditionFailed(
441 i18n.translate(
442 key="viur.core.modules.user.passwordrecovery.keyexpired",
443 defaultText="The recovery key is expired or invalid. Please start the recovery process again.",
444 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key."
445 )
446 )
448 self.passwordRecoveryRateLimit.decrementQuota()
450 # If we made it here, the key was correct, so we'd hopefully have a valid user for this
451 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel()
453 if not user_skel:
454 raise errors.NotFound(
455 i18n.translate(
456 key="viur.core.modules.user.passwordrecovery.usernotfound",
457 defaultText="There is no account with this name",
458 hint="We cant find an account with that name (Should never happen)"
459 )
460 )
462 # If the account is locked or not yet validated, abort the process.
463 if not self._user_module.is_active(user_skel):
464 raise errors.NotFound(
465 i18n.translate(
466 key="viur.core.modules.user.passwordrecovery.accountlocked",
467 defaultText="This account is currently locked. You cannot change its password.",
468 hint="Attempted password recovery on a locked account"
469 )
470 )
472 # Update the password, save the user, reset his session and show the success-template
473 user_skel["password"] = skel["password"]
474 user_skel.write(update_relations=False)
476 return self._user_module.render.view(
477 None,
478 tpl=self.passwordRecoverySuccessTemplate,
479 )
481 @tasks.CallDeferred
482 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None:
483 """
484 Sends the given recovery code to the user given in userName. This function runs deferred
485 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the
486 code by email (assuming we have working email delivery), but this can be overridden to send it
487 by SMS or other means. We'll also update the changedate for this user, so no more than one code
488 can be send to any given user in four hours.
489 """
490 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel():
491 user_agent = user_agents.parse(user_agent)
492 email.send_email(
493 tpl=self.passwordRecoveryMail,
494 skel=user_skel,
495 dests=[user_name],
496 recovery_key=recovery_key,
497 user_agent={
498 "device": user_agent.get_device(),
499 "os": user_agent.get_os(),
500 "browser": user_agent.get_browser()
501 }
502 )
504 @exposed
505 @skey(forward_payload="data", session_bound=False)
506 def verify(self, data):
507 def transact(key):
508 skel = self._user_module.editSkel()
509 if not key or not skel.read(key):
510 return None
512 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \
513 if self.registrationAdminVerificationRequired else Status.ACTIVE
515 skel.write(update_relations=False)
516 return skel
518 if not isinstance(data, dict) or not (skel := db.RunInTransaction(transact, data.get("user_key"))):
519 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate)
521 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate)
523 def canAdd(self) -> bool:
524 return self.registrationEnabled
526 def addSkel(self):
527 """
528 Prepare the add-Skel for rendering.
529 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on
530 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired
531 :return: viur.core.skeleton.Skeleton
532 """
533 skel = self._user_module.addSkel()
535 if self.registrationEmailVerificationRequired:
536 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION
537 elif self.registrationAdminVerificationRequired:
538 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION
539 else: # No further verification required
540 defaultStatusValue = Status.ACTIVE
542 skel.status.readOnly = True
543 skel["status"] = defaultStatusValue
545 if "password" in skel:
546 skel.password.required = True # The user will have to set a password
548 return skel
550 @force_ssl
551 @exposed
552 @skey(allow_empty=True)
553 def add(self, *args, **kwargs):
554 """
555 Allows guests to register a new account if self.registrationEnabled is set to true
557 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd`
559 :returns: The rendered, added object of the entry, eventually with error hints.
561 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
562 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
563 """
564 if not self.canAdd():
565 raise errors.Unauthorized()
566 skel = self.addSkel()
567 if (
568 not kwargs # no data supplied
569 or not current.request.get().isPostRequest # bail out if not using POST-method
570 or not skel.fromClient(kwargs) # failure on reading into the bones
571 or utils.parse.bool(kwargs.get("bounce")) # review before adding
572 ):
573 # render the skeleton in the version it could as far as it could be read.
574 return self._user_module.render.add(skel)
575 self._user_module.onAdd(skel)
576 skel.write()
577 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION:
578 # The user will have to verify his email-address. Create a skey and send it to his address
579 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False,
580 user_key=utils.normalizeKey(skel["key"]),
581 name=skel["name"])
582 skel.skey = BaseBone(descr="Skey")
583 skel["skey"] = skey
584 email.send_email(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel)
585 self._user_module.onAdded(skel) # Call onAdded on our parent user module
586 return self._user_module.render.addSuccess(skel)
589class GoogleAccount(UserPrimaryAuthentication):
590 METHOD_NAME = "X-VIUR-AUTH-Google-Account"
592 @classmethod
593 def patch_user_skel(cls, skel_cls):
594 """
595 Modifies the UserSkel to be equipped by a bones required by Google Auth
596 """
597 skel_cls.uid = StringBone(
598 descr="Google UserID",
599 required=False,
600 readOnly=True,
601 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
602 params={
603 "category": "Authentication",
604 }
605 )
607 skel_cls.sync = BooleanBone(
608 descr="Sync user data with OAuth-based services",
609 defaultValue=True,
610 params={
611 "category": "Authentication",
612 "tooltip":
613 "If set, user data like firstname and lastname is automatically kept"
614 "synchronous with the information stored at the OAuth service provider"
615 "(e.g. Google Login)."
616 }
617 )
619 @exposed
620 @force_ssl
621 @skey(allow_empty=True)
622 def login(self, token: str | None = None, *args, **kwargs):
623 if not conf.user.google_client_id:
624 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")
626 if not token:
627 request = current.request.get()
628 request.response.headers["Content-Type"] = "text/html"
629 if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
630 # We have to allow popups here
631 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"
633 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
634 with open(file_path) as file:
635 tpl_string = file.read()
637 # FIXME: Use Jinja2 for rendering?
638 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
639 extendCsp({
640 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
641 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
642 })
643 return tpl_string
645 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
646 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
647 raise ValueError("Invalid issuer")
649 # Token looks valid :)
650 uid = user_info["sub"]
651 email = user_info["email"]
653 base_skel = self._user_module.baseSkel()
654 update = False
655 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
656 # We'll try again - checking if there's already an user with that email
657 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()):
658 # Still no luck - it's a completely new user
659 if not self.registrationEnabled:
660 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
661 logging.debug(f"Google user is from allowed {domain} - adding account")
662 else:
663 logging.debug(f"Google user is from {domain} - denying registration")
664 raise errors.Forbidden("Registration for new users is disabled")
666 user_skel = base_skel
667 user_skel["uid"] = uid
668 user_skel["name"] = email
669 update = True
671 # Take user information from Google, if wanted!
672 if user_skel["sync"]:
673 for target, source in {
674 "name": email,
675 "firstname": user_info.get("given_name"),
676 "lastname": user_info.get("family_name"),
677 }.items():
679 if user_skel[target] != source:
680 user_skel[target] = source
681 update = True
683 if update:
684 assert user_skel.write()
686 return self.next_or_finish(user_skel)
689class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
690 """Abstract class for all second factors."""
691 MAX_RETRY = 3
692 second_factor_login_template = "user_login_secondfactor"
693 """Template to enter the TOPT on login"""
695 @property
696 @abc.abstractmethod
697 def NAME(self) -> str:
698 """Name for this factor for templates."""
699 ...
701 @property
702 @abc.abstractmethod
703 def ACTION_NAME(self) -> str:
704 """The action name for this factor, used as path-segment."""
705 ...
707 def __init__(self, moduleName, modulePath, _user_module):
708 super().__init__(moduleName, modulePath, _user_module)
709 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}"
710 self.add_url = f"{self.modulePath}/add"
711 self.start_url = f"{self.modulePath}/start"
714class TimeBasedOTP(UserSecondFactorAuthentication):
715 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP"
716 WINDOW_SIZE = 5
717 ACTION_NAME = "otp"
718 NAME = "Time based Otp"
719 second_factor_login_template = "user_login_secondfactor"
721 @dataclasses.dataclass
722 class OtpConfig:
723 """
724 This dataclass is used to provide an interface for a OTP token
725 algorithm description that is passed within the TimeBasedOTP
726 class for configuration.
727 """
728 secret: str
729 timedrift: float = 0.0
730 algorithm: t.Literal["sha1", "sha256"] = "sha1"
731 interval: int = 60
733 class OtpSkel(skeleton.RelSkel):
734 """
735 This is the Skeleton used to ask for the OTP token.
736 """
737 otptoken = NumericBone(
738 descr="Token",
739 required=True,
740 max=999999,
741 min=0,
742 )
744 @classmethod
745 def patch_user_skel(cls, skel_cls):
746 """
747 Modifies the UserSkel to be equipped by a bones required by Timebased OTP
748 """
749 # One-Time Password Verification
750 skel_cls.otp_serial = StringBone(
751 descr="OTP serial",
752 searchable=True,
753 params={
754 "category": "Second Factor Authentication",
755 }
756 )
758 skel_cls.otp_secret = CredentialBone(
759 descr="OTP secret",
760 params={
761 "category": "Second Factor Authentication",
762 }
763 )
765 skel_cls.otp_timedrift = NumericBone(
766 descr="OTP time drift",
767 readOnly=True,
768 defaultValue=0,
769 params={
770 "category": "Second Factor Authentication",
771 }
772 )
774 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None:
775 """
776 Returns an instance of self.OtpConfig with a provided token configuration,
777 or None when there is no appropriate configuration of this second factor handler available.
778 """
780 if otp_secret := skel.dbEntity.get("otp_secret"):
781 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0)
783 return None
785 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
786 """
787 Specified whether the second factor authentication can be handled by the given user or not.
788 """
789 return bool(self.get_config(skel))
791 @exposed
792 def start(self):
793 """
794 Configures OTP login for the current session.
796 A special otp_user_conf has to be specified as a dict, which is stored into the session.
797 """
798 session = current.session.get()
800 if not (user_key := session.get("possible_user_key")):
801 raise errors.PreconditionFailed(
802 "Second factor can only be triggered after successful primary authentication."
803 )
805 user_skel = self._user_module.baseSkel()
806 if not user_skel.read(user_key):
807 raise errors.NotFound("The previously authenticated user is gone.")
809 if not (otp_user_conf := self.get_config(user_skel)):
810 raise errors.PreconditionFailed("This second factor is not available for the user")
812 otp_user_conf = {
813 "key": str(user_key),
814 } | dataclasses.asdict(otp_user_conf)
816 session = current.session.get()
817 session["_otp_user"] = otp_user_conf
818 session.markChanged()
820 return self._user_module.render.edit(
821 self.OtpSkel(),
822 params={
823 "name": i18n.translate(f"viur.core.modules.user.{self.NAME}"),
824 "action_name": self.ACTION_NAME,
825 "action_url": f"{self.modulePath}/{self.ACTION_NAME}",
826 },
827 tpl=self.second_factor_login_template
828 )
830 @exposed
831 @force_ssl
832 @skey(allow_empty=True)
833 def otp(self, *args, **kwargs):
834 """
835 Performs the second factor validation and interaction with the client.
836 """
837 session = current.session.get()
838 if not (otp_user_conf := session.get("_otp_user")):
839 raise errors.PreconditionFailed("No OTP process started in this session")
841 # Check if maximum second factor verification attempts
842 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
843 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
845 # Read the OTP token via the skeleton, to obtain a valid value
846 skel = self.OtpSkel()
847 if skel.fromClient(kwargs):
848 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP.
849 res = self.verify(
850 otp=skel["otptoken"],
851 secret=otp_user_conf["secret"],
852 algorithm=otp_user_conf.get("algorithm") or "sha1",
853 interval=otp_user_conf.get("interval") or 60,
854 timedrift=otp_user_conf.get("timedrift") or 0.0,
855 valid_window=self.WINDOW_SIZE
856 )
857 else:
858 res = None
860 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0!
861 if res is None:
862 otp_user_conf["attempts"] = attempts + 1
863 session.markChanged()
864 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
865 return self._user_module.render.edit(
866 skel,
867 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"),
868 action_name=self.ACTION_NAME,
869 action_url=f"{self.modulePath}/{self.ACTION_NAME}",
870 tpl=self.second_factor_login_template
871 )
873 # Remove otp user config from session
874 user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
875 del session["_otp_user"]
876 session.markChanged()
878 # Check if the OTP device has a time drift
880 timedriftchange = float(res) - otp_user_conf["timedrift"]
881 if abs(timedriftchange) > 2:
882 # The time-drift change accumulates to more than 2 minutes (for interval==60):
883 # update clock-drift value accordingly
884 self.updateTimeDrift(user_key, timedriftchange)
886 # Continue with authentication
887 return self._user_module.secondFactorSucceeded(self, user_key)
889 @staticmethod
890 def verify(
891 otp: str | int,
892 secret: str,
893 algorithm: str = "sha1",
894 interval: int = 60,
895 timedrift: float = 0.0,
896 for_time: datetime.datetime | None = None,
897 valid_window: int = 0,
898 ) -> int | None:
899 """
900 Verifies the OTP passed in against the current time OTP.
902 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which
903 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown
904 on the hardware token generator. This can be used to store the time drift of a given token generator.
906 :param otp: the OTP token to check against
907 :param secret: The OTP secret
908 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256)
909 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators). In
910 pyotp, default is 30!
911 :param timedrift: The known timedrift (old index) of the hardware OTP generator
912 :param for_time: Time to check OTP at (defaults to now)
913 :param valid_window: extends the validity to this many counter ticks before and after the current one
914 :returns: The index where verification succeeded, None otherwise
915 """
916 # get the hashing digest
917 digest = {
918 "sha1": hashlib.sha1,
919 "sha256": hashlib.sha256,
920 }.get(algorithm)
922 if not digest:
923 raise errors.NotImplemented(f"{algorithm=} is not implemented")
925 if for_time is None:
926 for_time = datetime.datetime.now()
928 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index
929 timedrift = round(timedrift)
930 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret
931 otp = str(otp).zfill(6) # fill with zeros in front
933 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}")
934 totp = pyotp.TOTP(secret, digest=digest, interval=interval)
936 if valid_window:
937 for offset in range(timedrift - valid_window, timedrift + valid_window + 1):
938 token = str(totp.at(for_time, offset))
939 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}")
940 if hmac.compare_digest(otp, token):
941 return offset
943 return None
945 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None
947 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
948 """
949 Updates the clock-drift value.
950 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew
951 it out of bounds. Maximum change per call is 0.3 minutes.
952 :param user_key: For which user should the update occour
953 :param idx: How many steps before/behind was that token
954 :return:
955 """
957 # FIXME: The callback in viur-core must be improved, to accept user_skel
959 def transaction(user_key, idx):
960 user = db.Get(user_key)
961 if not isinstance(user.get("otp_timedrift"), float):
962 user["otp_timedrift"] = 0.0
963 user["otp_timedrift"] += min(max(0.1 * idx, -0.3), 0.3)
964 db.Put(user)
966 db.RunInTransaction(transaction, user_key, idx)
969class AuthenticatorOTP(UserSecondFactorAuthentication):
970 """
971 This class handles the second factor for apps like authy and so on
972 """
973 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"
975 second_factor_add_template = "user_secondfactor_add"
976 """Template to configure (add) a new TOPT"""
978 ACTION_NAME = "authenticator_otp"
979 """Action name provided for *otp_template* on login"""
981 NAME = "Authenticator App"
983 @exposed
984 @force_ssl
985 @skey(allow_empty=True)
986 def add(self, otp=None):
987 """
988 We try to read the otp_app_secret form the current session. When this fails we generate a new one and store
989 it in the session.
991 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store
992 the otp_app_secret from the session in the user entry.
993 """
994 current_session = current.session.get()
996 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
997 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
998 current_session["_maybe_otp_app_secret"] = otp_app_secret
999 current_session.markChanged()
1001 if otp is None:
1002 return self._user_module.render.second_factor_add(
1003 tpl=self.second_factor_add_template,
1004 action_name=self.ACTION_NAME,
1005 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"),
1006 add_url=self.add_url,
1007 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))
1008 else:
1009 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
1010 return self._user_module.render.second_factor_add(
1011 tpl=self.second_factor_add_template,
1012 action_name=self.ACTION_NAME,
1013 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"),
1014 add_url=self.add_url,
1015 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors
1017 # Now we can set the otp_app_secret to the current User and render der Success-template
1018 AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
1019 return self._user_module.render.second_factor_add_success(
1020 action_name=self.ACTION_NAME,
1021 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"),
1022 )
1024 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
1025 """
1026 We can only handle the second factor if we have stored an otp_app_secret before.
1027 """
1028 return bool(skel.dbEntity.get("otp_app_secret", ""))
1030 @classmethod
1031 def patch_user_skel(cls, skel_cls):
1032 """
1033 Modifies the UserSkel to be equipped by bones required by Authenticator App
1034 """
1035 # Authenticator OTP Apps (like Authy)
1036 skel_cls.otp_app_secret = CredentialBone(
1037 descr="OTP Secret (App-Key)",
1038 params={
1039 "category": "Second Factor Authentication",
1040 }
1041 )
1043 @classmethod
1044 def set_otp_app_secret(cls, otp_app_secret=None):
1045 """
1046 Write a new OTP Token in the current user entry.
1047 """
1048 if otp_app_secret is None:
1049 logging.error("No 'otp_app_secret' is provided")
1050 raise errors.PreconditionFailed("No 'otp_app_secret' is provided")
1051 if not (cuser := current.user.get()):
1052 raise errors.Unauthorized()
1054 def transaction(user_key):
1055 if not (user := db.Get(user_key)):
1056 raise errors.NotFound()
1057 user["otp_app_secret"] = otp_app_secret
1058 db.Put(user)
1060 db.RunInTransaction(transaction, cuser["key"])
1062 @classmethod
1063 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
1064 """
1065 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
1066 """
1067 if not (cuser := current.user.get()):
1068 raise errors.Unauthorized()
1069 if not (issuer := conf.user.otp_issuer):
1070 logging.warning(
1071 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
1072 issuer = conf.instance.project_id
1074 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer)
1076 @classmethod
1077 def generate_otp_app_secret(cls) -> str:
1078 """
1079 Generate a new OTP Secret
1080 :return an otp
1081 """
1082 return pyotp.random_base32()
1084 @classmethod
1085 def verify_otp(cls, otp: str | int, secret: str) -> bool:
1086 return pyotp.TOTP(secret).verify(otp)
1088 @exposed
1089 def start(self):
1090 otp_user_conf = {"attempts": 0}
1091 session = current.session.get()
1092 session["_otp_user"] = otp_user_conf
1093 session.markChanged()
1094 return self._user_module.render.edit(
1095 TimeBasedOTP.OtpSkel(),
1096 params={
1097 "name": i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"),
1098 "action_name": self.ACTION_NAME,
1099 "action_url": self.action_url,
1100 },
1101 tpl=self.second_factor_login_template,
1102 )
1104 @exposed
1105 @force_ssl
1106 @skey
1107 def authenticator_otp(self, **kwargs):
1108 """
1109 We verify the otp here with the secret we stored before.
1110 """
1111 session = current.session.get()
1112 user_key = db.Key(self._user_module.kindName, session["possible_user_key"])
1114 if not (otp_user_conf := session.get("_otp_user")):
1115 raise errors.PreconditionFailed("No OTP process started in this session")
1117 # Check if maximum second factor verification attempts
1118 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
1119 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
1121 if not (user := db.Get(user_key)):
1122 raise errors.NotFound()
1124 skel = TimeBasedOTP.OtpSkel()
1125 if not skel.fromClient(kwargs):
1126 raise errors.PreconditionFailed()
1127 otp_token = str(skel["otptoken"]).zfill(6)
1129 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]):
1130 return self._user_module.secondFactorSucceeded(self, user_key)
1131 otp_user_conf["attempts"] = attempts + 1
1132 session.markChanged()
1133 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
1134 return self._user_module.render.edit(
1135 skel,
1136 name=i18n.translate(f"viur.core.modules.user.auth.{self.NAME}"),
1137 action_name=self.ACTION_NAME,
1138 action_url=self.action_url,
1139 tpl=self.second_factor_login_template,
1140 )
1143class User(List):
1144 """
1145 The User module is used to manage and authenticate users in a ViUR system.
1147 It is used in almost any ViUR project, but ViUR can also function without any user capabilites.
1148 """
1150 kindName = "user"
1151 addTemplate = "user_add"
1152 addSuccessTemplate = "user_add_success"
1153 lostPasswordTemplate = "user_lostpassword"
1154 verifyEmailAddressMail = "user_verify_address"
1155 passwordRecoveryMail = "user_password_recovery"
1157 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter(
1158 None, (
1159 UserPassword,
1160 conf.user.google_client_id and GoogleAccount,
1161 )
1162 ))
1163 """
1164 Specifies primary authentication providers that are made available
1165 as sub-modules under `user/auth_<classname>`. They might require
1166 customization or configuration.
1167 """
1169 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = (
1170 TimeBasedOTP,
1171 AuthenticatorOTP,
1172 )
1173 """
1174 Specifies secondary authentication providers that are made available
1175 as sub-modules under `user/f2_<classname>`. They might require
1176 customization or configuration, which is determined during the
1177 login-process depending on the user that wants to login.
1178 """
1180 validAuthenticationMethods = tuple(filter(
1181 None, (
1182 (UserPassword, AuthenticatorOTP),
1183 (UserPassword, TimeBasedOTP),
1184 (UserPassword, None),
1185 (GoogleAccount, None) if conf.user.google_client_id else None,
1186 )
1187 ))
1188 """
1189 Specifies the possible combinations of primary- and secondary factor
1190 login methos.
1192 GoogleLogin defaults to no second factor, as the Google Account can be
1193 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only
1194 handled when there is a user-dependent configuration available.
1195 """
1197 msg_missing_second_factor = "Second factor required but not configured for this user."
1199 secondFactorTimeWindow = datetime.timedelta(minutes=10)
1201 default_order = "name.idx"
1203 adminInfo = {
1204 "icon": "person-fill",
1205 "actions": [
1206 "trigger_kick",
1207 "trigger_takeover",
1208 ],
1209 "customActions": {
1210 "trigger_kick": {
1211 "name": i18n.translate(
1212 key="viur.core.modules.user.customActions.kick",
1213 defaultText="Kick user",
1214 hint="Title of the kick user function"
1215 ),
1216 "icon": "trash2-fill",
1217 "access": ["root"],
1218 "action": "fetch",
1219 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}",
1220 "confirm": i18n.translate(
1221 key="viur.core.modules.user.customActions.kick.confirm",
1222 defaultText="Do you really want to drop all sessions of the selected user from the system?",
1223 ),
1224 "success": i18n.translate(
1225 key="viur.core.modules.user.customActions.kick.success",
1226 defaultText="Sessions of the user are being invalidated.",
1227 ),
1228 },
1229 "trigger_takeover": {
1230 "name": i18n.translate(
1231 key="viur.core.modules.user.customActions.takeover",
1232 defaultText="Take-over user",
1233 hint="Title of the take user over function"
1234 ),
1235 "icon": "file-person-fill",
1236 "access": ["root"],
1237 "action": "fetch",
1238 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}",
1239 "confirm": i18n.translate(
1240 key="viur.core.modules.user.customActions.takeover.confirm",
1241 defaultText="Do you really want to replace your current user session by a "
1242 "user session of the selected user?",
1243 ),
1244 "success": i18n.translate(
1245 key="viur.core.modules.user.customActions.takeover.success",
1246 defaultText="You're now know as the selected user!",
1247 ),
1248 "then": "reload-vi",
1249 },
1250 },
1251 }
1253 roles = {
1254 "admin": "*",
1255 }
1257 def __init__(self, moduleName, modulePath):
1258 for provider in self.authenticationProviders:
1259 assert issubclass(provider, UserPrimaryAuthentication)
1260 name = f"auth_{provider.__name__.lower()}"
1261 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1263 for provider in self.secondFactorProviders:
1264 assert issubclass(provider, UserSecondFactorAuthentication)
1265 name = f"f2_{provider.__name__.lower()}"
1266 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1268 super().__init__(moduleName, modulePath)
1270 def get_role_defaults(self, role: str) -> set[str]:
1271 """
1272 Returns a set of default access rights for a given role.
1274 Defaults to "admin" usage for any role > "user"
1275 and "scriptor" usage for "admin" role.
1276 """
1277 ret = set()
1279 if role in ("viewer", "editor", "admin"):
1280 ret.add("admin")
1282 if role == "admin":
1283 ret.add("scriptor")
1285 return ret
1287 def addSkel(self):
1288 skel = super().addSkel().clone()
1289 user = current.user.get()
1290 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])):
1291 skel.status.readOnly = True
1292 skel["status"] = Status.UNSET
1293 skel.status.visible = False
1294 skel.access.readOnly = True
1295 skel["access"] = []
1296 skel.access.visible = False
1297 else:
1298 # An admin tries to add a new user.
1299 skel.status.readOnly = False
1300 skel.status.visible = True
1301 skel.access.readOnly = False
1302 skel.access.visible = True
1304 if "password" in skel:
1305 # Unlock and require a password
1306 skel.password.required = True
1307 skel.password.visible = True
1308 skel.password.readOnly = False
1310 skel.name.readOnly = False # Don't enforce readonly name in user/add
1311 return skel
1313 def editSkel(self, *args, **kwargs):
1314 skel = super().editSkel().clone()
1316 if "password" in skel:
1317 skel.password.required = False
1318 skel.password.visible = True
1319 skel.password.readOnly = False
1321 user = current.user.get()
1323 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only
1324 skel.name.readOnly = lockFields
1325 skel.access.readOnly = lockFields
1326 skel.status.readOnly = lockFields
1328 return skel
1330 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication:
1331 return getattr(self, f"f2_{cls.__name__.lower()}")
1333 def getCurrentUser(self):
1334 session = current.session.get()
1336 req = current.request.get()
1337 if session and (session.loaded or req.is_deferred) and (user := session.get("user")):
1338 skel = self.baseSkel()
1339 skel.setEntity(user)
1340 return skel
1342 return None
1344 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key):
1345 """
1346 Continue authentication flow when primary authentication succeeded.
1347 """
1348 skel = self.baseSkel()
1350 if not skel.read(user_key):
1351 raise errors.NotFound("User was not found.")
1353 if not provider.can_handle(skel):
1354 raise errors.Forbidden("User is not allowed to use this primary login method.")
1356 session = current.session.get()
1357 session["possible_user_key"] = user_key.id_or_name
1358 session["_secondFactorStart"] = utils.utcNow()
1359 session.markChanged()
1361 second_factor_providers = []
1363 for auth_provider, second_factor in self.validAuthenticationMethods:
1364 if isinstance(provider, auth_provider):
1365 if second_factor is not None:
1366 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor)
1367 if second_factor_provider_instance.can_handle(skel):
1368 second_factor_providers.append(second_factor_provider_instance)
1369 else:
1370 second_factor_providers.append(None)
1372 if len(second_factor_providers) > 1 and None in second_factor_providers:
1373 # We have a second factor. So we can get rid of the None
1374 second_factor_providers.pop(second_factor_providers.index(None))
1376 if len(second_factor_providers) == 0:
1377 raise errors.NotAcceptable(self.msg_missing_second_factor)
1378 elif len(second_factor_providers) == 1:
1379 if second_factor_providers[0] is None:
1380 # We allow sign-in without a second factor
1381 return self.authenticateUser(user_key)
1382 # We have only one second factor we don't need the choice template
1383 return second_factor_providers[0].start(user_key)
1385 # In case there is more than one second factor, let the user select a method.
1386 return self.render.second_factor_choice(second_factors=second_factor_providers)
1388 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key):
1389 """
1390 Continue authentication flow when secondary authentication succeeded.
1391 """
1392 session = current.session.get()
1393 if session["possible_user_key"] != user_key.id_or_name:
1394 raise errors.Forbidden()
1396 # Assert that the second factor verification finished in time
1397 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow:
1398 raise errors.RequestTimeout()
1400 return self.authenticateUser(user_key)
1402 def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None:
1403 """
1404 Hookable check if a user is defined as "active" and can login.
1406 :param skel: The UserSkel of the user who wants to login.
1407 """
1408 if "status" in skel:
1409 status = skel["status"]
1410 if not isinstance(status, (Status, int)):
1411 try:
1412 status = int(status)
1413 except ValueError:
1414 status = Status.UNSET
1416 return status >= Status.ACTIVE.value
1418 return None
1420 def authenticateUser(self, key: db.Key, **kwargs):
1421 """
1422 Performs Log-In for the current session and the given user key.
1424 This resets the current session: All fields not explicitly marked as persistent
1425 by conf.user.session_persistent_fields_on_login are gone afterwards.
1427 :param key: The (DB-)Key of the user we shall authenticate
1428 """
1429 skel = self.baseSkel()
1430 if not skel.read(key):
1431 raise ValueError(f"Unable to authenticate unknown user {key}")
1433 # Verify that this user account is active
1434 if not self.is_active(skel):
1435 raise errors.Forbidden("The user is disabled and cannot be authenticated.")
1437 # Update session for user
1438 session = current.session.get()
1439 # Remember persistent fields...
1440 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login}
1441 session.reset()
1442 # and copy them over to the new session
1443 session |= take_over
1445 # Update session, user and request
1446 session["user"] = skel.dbEntity
1448 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key
1449 current.user.set(self.getCurrentUser())
1451 self.onLogin(skel)
1453 return self.render.loginSucceeded(**kwargs)
1455 @exposed
1456 @skey
1457 def logout(self, *args, **kwargs):
1458 """
1459 Implements the logout action. It also terminates the current session (all keys not listed
1460 in viur.session_persistent_fields_on_logout will be lost).
1461 """
1462 if not (user := current.user.get()):
1463 raise errors.Unauthorized()
1465 self.onLogout(user)
1467 session = current.session.get()
1468 if take_over := {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}:
1469 session.reset()
1470 session |= take_over
1471 else:
1472 session.clear()
1473 current.user.set(None) # set user to none in context var
1474 return self.render.logoutSuccess()
1476 @exposed
1477 def login(self, *args, **kwargs):
1478 return self.render.loginChoices([
1479 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1480 for primary, secondary in self.validAuthenticationMethods
1481 ])
1483 def onLogin(self, skel: skeleton.SkeletonInstance):
1484 """
1485 Hook to be called on user login.
1486 """
1487 # Update the lastlogin timestamp (if available!)
1488 if "lastlogin" in skel:
1489 now = utils.utcNow()
1491 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??)
1492 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)):
1493 skel["lastlogin"] = now
1494 skel.write(update_relations=False)
1496 logging.info(f"""User {skel["name"]} logged in""")
1498 def onLogout(self, skel: skeleton.SkeletonInstance):
1499 """
1500 Hook to be called on user logout.
1501 """
1502 logging.info(f"""User {skel["name"]} logged out""")
1504 @exposed
1505 def view(self, key: db.Key | int | str = "self", *args, **kwargs):
1506 """
1507 Allow a special key "self" to reference the current user.
1509 By default, any authenticated user can view its own user entry,
1510 to obtain access rights and any specific user information.
1511 This behavior is defined in the customized `canView` function,
1512 which is overwritten by the User-module.
1514 The rendered skeleton can be modified or restriced by specifying
1515 a customized view-skeleton.
1516 """
1517 if key == "self":
1518 if user := current.user.get():
1519 key = user["key"]
1520 else:
1521 raise errors.Unauthorized("Cannot view 'self' with unknown user")
1523 return super().view(key, *args, **kwargs)
1525 def canView(self, skel) -> bool:
1526 if user := current.user.get():
1527 if skel["key"] == user["key"]:
1528 return True
1530 if "root" in user["access"] or "user-view" in user["access"]:
1531 return True
1533 return False
1535 @exposed
1536 @skey(allow_empty=True)
1537 def edit(self, key: db.Key | int | str = "self", *args, **kwargs):
1538 """
1539 Allow a special key "self" to reference the current user.
1541 This modification will only allow to use "self" as a key;
1542 The specific access right to let the user edit itself must
1543 still be customized.
1545 The rendered and editable skeleton can be modified or restriced
1546 by specifying a customized edit-skeleton.
1547 """
1548 if key == "self":
1549 if user := current.user.get():
1550 key = user["key"]
1551 else:
1552 raise errors.Unauthorized("Cannot edit 'self' with unknown user")
1554 return super().edit(key, *args, **kwargs)
1556 @exposed
1557 def getAuthMethods(self, *args, **kwargs):
1558 """Inform tools like Viur-Admin which authentication to use"""
1559 # FIXME: This is almost the same code as in index()...
1560 # FIXME: VIUR4: The entire function should be removed!
1561 # TODO: Align result with index(), so that primary and secondary login is presented.
1562 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!")
1564 res = [
1565 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1566 for primary, secondary in self.validAuthenticationMethods
1567 ]
1569 return json.dumps(res)
1571 @exposed
1572 @skey
1573 def trigger(self, action: str, key: str):
1574 current.request.get().response.headers["Content-Type"] = "application/json"
1576 # Check for provided access right definition (equivalent to client-side check), fallback to root!
1577 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", )
1578 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)):
1579 raise errors.Unauthorized()
1581 skel = self.baseSkel()
1582 if not skel.read(key):
1583 raise errors.NotFound()
1585 match action:
1586 case "takeover":
1587 self.authenticateUser(skel["key"])
1589 case "kick":
1590 session.killSessionByUser(skel["key"])
1592 case _:
1593 raise errors.NotImplemented(f"Action {action!r} not implemented")
1595 return json.dumps("OKAY")
1597 def onEdited(self, skel):
1598 super().onEdited(skel)
1600 # In case the user is set to inactive, kill all sessions
1601 if self.is_active(skel) is False:
1602 session.killSessionByUser(skel["key"])
1604 # Update user setting in all sessions
1605 for session_obj in db.Query("user").filter("user =", skel["key"]).iter():
1606 session_obj["data"]["user"] = skel.dbEntity
1609 def onDeleted(self, skel):
1610 super().onDeleted(skel)
1611 # Invalidate all sessions of that user
1612 session.killSessionByUser(skel["key"])
1615@tasks.StartupTask
1616def createNewUserIfNotExists():
1617 """
1618 Create a new Admin user, if the userDB is empty
1619 """
1620 if (
1621 (user_module := getattr(conf.main_app.vi, "user", None))
1622 and isinstance(user_module, User)
1623 and "addSkel" in dir(user_module)
1624 and "validAuthenticationMethods" in dir(user_module)
1625 # UserPassword must be one of the primary login methods
1626 and any(
1627 issubclass(provider[0], UserPassword)
1628 for provider in user_module.validAuthenticationMethods
1629 )
1630 ):
1631 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database
1632 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton
1633 uname = f"""admin@{conf.instance.project_id}.appspot.com"""
1634 pw = utils.string.random(13)
1635 addSkel["name"] = uname
1636 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away
1637 addSkel["access"] = ["root"]
1638 addSkel["password"] = pw
1640 try:
1641 addSkel.write()
1642 except Exception as e:
1643 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
1644 logging.exception(e)
1645 return
1647 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"
1649 logging.warning(msg)
1650 email.send_email_to_admins("New ViUR password", msg)
1653# DEPRECATED ATTRIBUTES HANDLING
1655def __getattr__(attr):
1656 match attr:
1657 case "userSkel":
1658 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
1659 warnings.warn(msg, DeprecationWarning, stacklevel=2)
1660 logging.warning(msg)
1661 return UserSkel
1663 return super(__import__(__name__).__class__).__getattr__(attr)