Coverage for / home / runner / work / viur-core / viur-core / viur / src / viur / core / modules / user.py: 0%
733 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 14:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 14:23 +0000
1import abc
2import datetime
3import enum
4import functools
5import hashlib
6import hmac
7import json
8import logging
9import secrets
10import warnings
11import user_agents
13import pyotp
14import base64
15import dataclasses
16import typing as t
17from google.auth.transport import requests
18from google.oauth2 import id_token
20from viur.core import (
21 conf, current, db, email, errors, i18n,
22 securitykey, session, skeleton, tasks, utils, Module
23)
24from viur.core.decorators import *
25from viur.core.bones import *
26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password
27from viur.core.prototypes.list import List
28from viur.core.ratelimit import RateLimit
29from viur.core.securityheaders import extendCsp
32@functools.total_ordering
33class Status(enum.Enum):
34 """Status enum for a user
36 Has backwards compatibility to be comparable with non-enum values.
37 Will be removed with viur-core 4.0.0
38 """
40 UNSET = 0 # Status is unset
41 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification
42 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin
43 DISABLED = 5 # Account disabled
44 ACTIVE = 10 # Active
46 def __eq__(self, other):
47 if isinstance(other, Status):
48 return super().__eq__(other)
49 return self.value == other
51 def __lt__(self, other):
52 if isinstance(other, Status):
53 return super().__lt__(other)
54 return self.value < other
57class UserSkel(skeleton.Skeleton):
58 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604)
60 name = EmailBone(
61 descr="E-Mail",
62 required=True,
63 readOnly=True,
64 caseSensitive=False,
65 searchable=True,
66 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
67 )
69 firstname = StringBone(
70 descr="Firstname",
71 searchable=True,
72 )
74 lastname = StringBone(
75 descr="Lastname",
76 searchable=True,
77 )
79 roles = SelectBone(
80 descr=i18n.translate("viur.core.modules.user.bone.roles", defaultText="Roles"),
81 values=conf.user.roles,
82 required=True,
83 multiple=True,
84 # fixme: This is generally broken in VIUR! See #776 for details.
85 # vfunc=lambda values:
86 # i18n.translate(
87 # "user.bone.roles.invalid",
88 # defaultText="Invalid role setting: 'custom' can only be set alone.")
89 # if "custom" in values and len(values) > 1 else None,
90 defaultValue=list(conf.user.roles.keys())[:1],
91 )
93 access = SelectBone(
94 descr=i18n.translate("viur.core.modules.user.bone.access", defaultText="Access rights"),
95 type_suffix="access",
96 values=lambda: {
97 right: i18n.translate(f"viur.core.modules.user.accessright.{right}", defaultText=right)
98 for right in sorted(conf.user.access_rights)
99 },
100 multiple=True,
101 params={
102 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system
103 }
104 )
106 status = SelectBone(
107 descr=i18n.translate("viur.core.modules.user.bone.status", "Account status"),
108 values=Status,
109 translation_key_prefix="viur.core.user.status.",
110 defaultValue=Status.ACTIVE,
111 required=True,
112 )
114 lastlogin = DateBone(
115 descr="Last Login",
116 readOnly=True,
117 )
119 admin_config = JsonBone( # This bone stores settings from the vi
120 descr="Config for the User",
121 visible=False
122 )
124 def __new__(cls, *args, **kwargs):
125 """
126 Constructor for the UserSkel-class, with the capability
127 to dynamically add bones required for the configured
128 authentication methods.
129 """
130 for provider in conf.main_app.vi.user.authenticationProviders:
131 assert issubclass(provider, UserPrimaryAuthentication)
132 provider.patch_user_skel(cls)
134 for provider in conf.main_app.vi.user.secondFactorProviders:
135 assert issubclass(provider, UserSecondFactorAuthentication)
136 provider.patch_user_skel(cls)
138 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
139 return super().__new__(cls, *args, **kwargs)
141 @classmethod
142 def write(cls, skel, *args, **kwargs):
143 # Roles
144 if skel["roles"] and "custom" not in skel["roles"]:
145 # Collect access rights through rules
146 access = set()
148 for role in skel["roles"]:
149 # Get default access for this role
150 access |= conf.main_app.vi.user.get_role_defaults(role)
152 # Go through all modules and evaluate available role-settings
153 for name in dir(conf.main_app.vi):
154 if name.startswith("_"):
155 continue
157 module = getattr(conf.main_app.vi, name)
158 if not isinstance(module, Module):
159 continue
161 roles = getattr(module, "roles", None) or {}
162 rights = roles.get(role, roles.get("*", ()))
164 # Convert role into tuple if it's not
165 if not isinstance(rights, (tuple, list)):
166 rights = (rights, )
168 if "*" in rights:
169 for right in module.accessRights:
170 access.add(f"{name}-{right}")
171 else:
172 for right in rights:
173 if right in module.accessRights:
174 access.add(f"{name}-{right}")
176 # special case: "edit" and "delete" actions require "view" as well!
177 if right in ("edit", "delete") and "view" in module.accessRights:
178 access.add(f"{name}-view")
180 skel["access"] = list(access)
182 return super().write(skel, *args, **kwargs)
185class UserAuthentication(Module, abc.ABC):
186 @property
187 @abc.abstractstaticmethod
188 def METHOD_NAME() -> str:
189 """
190 Define a unique method name for this authentication.
191 """
192 ...
194 @property
195 @abc.abstractstaticmethod
196 def NAME() -> str:
197 """
198 Define a descriptive name for this authentication.
199 """
200 ...
202 @property
203 @staticmethod
204 def VISIBLE(cls) -> bool:
205 """
206 Defines if the authentication method is visible to the user.
207 """
208 return True
210 def __init__(self, moduleName, modulePath, userModule):
211 super().__init__(moduleName, modulePath)
212 self._user_module = userModule
213 self.start_url = f"{self.modulePath}/login"
215 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
216 return True
218 @classmethod
219 def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
220 """
221 Allows for an UserAuthentication to patch the UserSkel
222 class with additional bones which are required for
223 the implemented authentication method.
224 """
225 ...
228class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
229 """Abstract class for all primary authentication methods."""
230 registrationEnabled = False
232 @abc.abstractmethod
233 def login(self, *args, **kwargs):
234 ...
236 def next_or_finish(self, skel: skeleton.SkeletonInstance):
237 """
238 Hook that is called whenever a part of the authentication was successful.
239 It allows to perform further steps in custom authentications,
240 e.g. change a password after first login.
241 """
242 return self._user_module.continueAuthenticationFlow(self, skel["key"])
245class UserPassword(UserPrimaryAuthentication):
246 METHOD_NAME = "X-VIUR-AUTH-User-Password"
247 NAME = "Username & Password"
249 registrationEmailVerificationRequired = True
250 registrationAdminVerificationRequired = True
252 verifySuccessTemplate = "user_verify_success"
253 verifyEmailAddressMail = "user_verify_address"
254 verifyFailedTemplate = "user_verify_failed"
255 passwordRecoveryMail = "user_password_recovery"
256 passwordRecoverySuccessTemplate = "user_passwordrecover_success"
257 passwordRecoveryTemplate = "user_passwordrecover"
259 # The default rate-limit for password recovery (10 tries each 15 minutes)
260 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")
262 # Limit (invalid) login-retries to once per 5 seconds
263 loginRateLimit = RateLimit("user.login", 12, 1, "ip")
265 @classmethod
266 def patch_user_skel(cls, skel_cls):
267 """
268 Modifies the UserSkel to be equipped by a PasswordBone.
269 """
270 skel_cls.password = PasswordBone(
271 readOnly=True,
272 visible=False,
273 params={
274 "category": "Authentication",
275 }
276 )
278 class LoginSkel(skeleton.RelSkel):
279 name = EmailBone(
280 descr="E-Mail",
281 required=True,
282 caseSensitive=False,
283 )
284 password = PasswordBone(
285 required=True,
286 test_threshold=0,
287 tests=(),
288 raw=True,
289 )
291 class LostPasswordStep1Skel(skeleton.RelSkel):
292 name = EmailBone(
293 descr="E-Mail",
294 required=True,
295 )
297 class LostPasswordStep2Skel(skeleton.RelSkel):
298 recovery_key = StringBone(
299 descr="Recovery Key",
300 required=True,
301 params={
302 "tooltip": i18n.translate(
303 key="viur.core.modules.user.userpassword.lostpasswordstep2.recoverykey",
304 defaultText="Please enter the validation key you've received via e-mail.",
305 hint="Shown when the user needs more than 15 minutes to paste the key",
306 ),
307 }
308 )
310 class LostPasswordStep3Skel(skeleton.RelSkel):
311 # send the recovery key again, in case the password is rejected by some reason.
312 recovery_key = StringBone(
313 descr="Recovery Key",
314 visible=False,
315 )
317 password = PasswordBone(
318 descr="New Password",
319 required=True,
320 params={
321 "tooltip": i18n.translate(
322 key="viur.core.modules.user.userpassword.lostpasswordstep3.password",
323 defaultText="Please enter a new password for your account.",
324 ),
325 }
326 )
328 @exposed
329 @force_ssl
330 @skey(allow_empty=True)
331 def login(self, **kwargs):
332 # Obtain a fresh login skel
333 skel = self.LoginSkel()
335 # Read required bones from client
336 if not (kwargs and skel.fromClient(kwargs)):
337 return self._user_module.render.render("login", skel)
339 self.loginRateLimit.assertQuotaIsAvailable()
341 # query for the username. The query might find another user, but the name is being checked for equality below
342 name = skel["name"].lower().strip()
343 user_skel = self._user_module.baseSkel()
344 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel
346 # extract password hash from raw database entity (skeleton access blocks it)
347 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {}
348 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001
349 password_hash = encode_password(skel["password"], password_data.get("salt", "-invalid-"), iterations)["pwhash"]
351 # now check if the username matches
352 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode())
354 # next, check if the password hash matches
355 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash)
357 if not is_okay:
358 # Set error to all required fields
359 for name, bone in skel.items():
360 if bone.required:
361 skel.errors.append(
362 ReadFromClientError(
363 ReadFromClientErrorSeverity.Invalid,
364 i18n.translate(
365 key="viur.core.modules.user.userpassword.login.failed",
366 defaultText="Invalid username or password provided",
367 ),
368 name,
369 )
370 )
372 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota
373 return self._user_module.render.render("login", skel)
375 # check if iterations are below current security standards, and update if necessary.
376 if iterations < PBKDF2_DEFAULT_ITERATIONS:
377 logging.info(f"Update password hash for user {name}.")
378 # re-hash the password with more iterations
379 # FIXME: This must be done within a transaction!
380 user_skel["password"] = kwargs["password"] # will be hashed on serialize
381 user_skel.write(update_relations=False)
383 return self.next_or_finish(user_skel)
385 @exposed
386 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, **kwargs):
387 """
388 This implements a password recovery process which lets users set a new password for their account,
389 after validating a recovery key sent by email.
391 The process is as following:
393 - The user enters the registered email adress (not validated here)
394 - A random code is generated and stored as a security-ke, then sendUserPasswordRecoveryCode is called.
395 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name
396 and send a link with the code. It runs as a deferred task so no information if a user account exists
397 is being leaked.
398 - If the user received an email, the link can be clicked to set a new password for the account.
400 To prevent automated attacks, the first step is guarded by limited calls to this function to 10 actions
401 per 15 minutes. (One complete recovery process consists of two calls).
402 """
403 self.passwordRecoveryRateLimit.assertQuotaIsAvailable()
404 current_request = current.request.get()
406 if recovery_key is None:
407 # This is the first step, where we ask for the username of the account we'll going to reset the password on
408 skel = self.LostPasswordStep1Skel()
410 if (
411 not kwargs
412 or not current_request.isPostRequest
413 or not skel.fromClient(kwargs)
414 ):
415 return self._user_module.render.render(
416 "pwrecover", skel,
417 tpl=self.passwordRecoveryTemplate,
418 )
420 # validate security key
421 if not securitykey.validate(skey):
422 raise errors.PreconditionFailed()
424 self.passwordRecoveryRateLimit.decrementQuota()
426 recovery_key = securitykey.create(
427 duration=datetime.timedelta(minutes=15),
428 key_length=conf.security.password_recovery_key_length,
429 user_name=skel["name"].lower(),
430 session_bound=False,
431 )
433 # Send the code in background
434 self.sendUserPasswordRecoveryCode(
435 skel["name"], recovery_key, current_request.request.headers["User-Agent"]
436 )
438 # step 2 is only an action-skel, and can be ignored by a direct link in the
439 # e-mail previously sent. It depends on the implementation of the specific project.
440 return self._user_module.render.render(
441 "pwrecover", self.LostPasswordStep2Skel(),
442 tpl=self.passwordRecoveryTemplate,
443 )
445 # in step 3
446 skel = self.LostPasswordStep3Skel()
448 # reset the recovery key again, in case the fromClient() fails.
449 skel["recovery_key"] = str(recovery_key).strip()
451 # check for any input; Render input-form again when incomplete.
452 if (
453 not kwargs
454 or not current_request.isPostRequest
455 or not skel.fromClient(kwargs, ignore=("recovery_key",))
456 ):
457 return self._user_module.render.render(
458 "pwrecover", skel,
459 tpl=self.passwordRecoveryTemplate,
460 )
462 # validate security key
463 if not securitykey.validate(skey):
464 raise errors.PreconditionFailed()
466 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)):
467 raise errors.PreconditionFailed(
468 i18n.translate(
469 key="viur.core.modules.user.passwordrecovery.keyexpired",
470 defaultText="The recovery key is expired or invalid. Please start the recovery process again.",
471 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key."
472 )
473 )
475 self.passwordRecoveryRateLimit.decrementQuota()
477 # If we made it here, the key was correct, so we'd hopefully have a valid user for this
478 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel()
480 if not user_skel:
481 raise errors.NotFound(
482 i18n.translate(
483 key="viur.core.modules.user.passwordrecovery.usernotfound",
484 defaultText="There is no account with this name",
485 hint="We cant find an account with that name (Should never happen)"
486 )
487 )
489 # If the account is locked or not yet validated, abort the process.
490 if not self._user_module.is_active(user_skel):
491 raise errors.NotFound(
492 i18n.translate(
493 key="viur.core.modules.user.passwordrecovery.accountlocked",
494 defaultText="This account is currently locked. You cannot change its password.",
495 hint="Attempted password recovery on a locked account"
496 )
497 )
499 # Update the password, save the user, reset his session and show the success-template
500 user_skel["password"] = skel["password"]
501 user_skel.write(update_relations=False)
503 return self._user_module.render.render(
504 "pwrecover_success",
505 next_url=self.start_url,
506 tpl=self.passwordRecoverySuccessTemplate
507 )
509 @tasks.CallDeferred
510 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None:
511 """
512 Sends the given recovery code to the user given in userName. This function runs deferred
513 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the
514 code by email (assuming we have working email delivery), but this can be overridden to send it
515 by SMS or other means. We'll also update the changedate for this user, so no more than one code
516 can be send to any given user in four hours.
517 """
518 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel():
519 user_agent = user_agents.parse(user_agent)
520 email.send_email(
521 tpl=self.passwordRecoveryMail,
522 skel=user_skel,
523 dests=[user_name],
524 recovery_key=recovery_key,
525 user_agent={
526 "device": user_agent.get_device(),
527 "os": user_agent.get_os(),
528 "browser": user_agent.get_browser()
529 }
530 )
532 @exposed
533 @skey(forward_payload="data", session_bound=False)
534 def verify(self, data):
535 def transact(key):
536 skel = self._user_module.editSkel()
537 if not key or not skel.read(key):
538 return None
540 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \
541 if self.registrationAdminVerificationRequired else Status.ACTIVE
543 skel.write(update_relations=False)
544 return skel
546 if not isinstance(data, dict) or not (skel := db.run_in_transaction(transact, data.get("user_key"))):
547 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate)
549 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate)
551 def canAdd(self) -> bool:
552 return self.registrationEnabled
554 def addSkel(self):
555 """
556 Prepare the add-Skel for rendering.
557 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on
558 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired
559 :return: viur.core.skeleton.Skeleton
560 """
561 skel = self._user_module.addSkel()
563 if self.registrationEmailVerificationRequired:
564 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION
565 elif self.registrationAdminVerificationRequired:
566 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION
567 else: # No further verification required
568 defaultStatusValue = Status.ACTIVE
570 skel.status.readOnly = True
571 skel["status"] = defaultStatusValue
573 if "password" in skel:
574 skel.password.required = True # The user will have to set a password
576 return skel
578 @force_ssl
579 @exposed
580 @skey(allow_empty=True)
581 def add(self, *, bounce: bool = False, **kwargs):
582 """
583 Allows guests to register a new account if self.registrationEnabled is set to true
585 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd`
587 :returns: The rendered, added object of the entry, eventually with error hints.
589 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
590 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
591 """
592 if not self.canAdd():
593 raise errors.Unauthorized()
595 skel = self.addSkel()
597 if (
598 not kwargs # no data supplied
599 or not current.request.get().isPostRequest # bail out if not using POST-method
600 or not skel.fromClient(kwargs) # failure on reading into the bones
601 or bounce # review before adding
602 ):
603 # render the skeleton in the version it could as far as it could be read.
604 return self._user_module.render.add(skel)
606 self._user_module.onAdd(skel)
607 skel.write()
609 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION:
610 # The user will have to verify his email-address. Create a skey and send it to his address
611 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False,
612 user_key=db.normalize_key(skel["key"]),
613 name=skel["name"])
614 skel.skey = BaseBone(descr="Skey")
615 skel["skey"] = skey
616 email.send_email(dests=[skel["name"]], tpl=self.verifyEmailAddressMail, skel=skel)
618 self._user_module.onAdded(skel) # Call onAdded on our parent user module
619 return self._user_module.render.addSuccess(skel)
622class GoogleAccount(UserPrimaryAuthentication):
623 METHOD_NAME = "X-VIUR-AUTH-Google-Account"
624 NAME = "Google Account"
626 @classmethod
627 def patch_user_skel(cls, skel_cls):
628 """
629 Modifies the UserSkel to be equipped by a bones required by Google Auth
630 """
631 skel_cls.uid = StringBone(
632 descr="Google UserID",
633 required=False,
634 readOnly=True,
635 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
636 params={
637 "category": "Authentication",
638 }
639 )
641 skel_cls.sync = BooleanBone(
642 descr="Sync user data with OAuth-based services",
643 defaultValue=True,
644 params={
645 "category": "Authentication",
646 "tooltip":
647 "If set, user data like firstname and lastname is automatically kept"
648 "synchronous with the information stored at the OAuth service provider"
649 "(e.g. Google Login)."
650 }
651 )
653 @exposed
654 @force_ssl
655 @skey(allow_empty=True)
656 def login(self, token: str | None = None, *args, **kwargs):
657 if not conf.user.google_client_id:
658 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")
660 if not token:
661 request = current.request.get()
662 request.response.headers["Content-Type"] = "text/html"
663 if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
664 # We have to allow popups here
665 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"
667 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
668 with open(file_path) as file:
669 tpl_string = file.read()
671 # FIXME: Use Jinja2 for rendering?
672 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
673 extendCsp({
674 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
675 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
676 })
677 return tpl_string
679 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
680 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
681 raise ValueError("Invalid issuer")
683 # Token looks valid :)
684 uid = user_info["sub"]
685 email = user_info["email"]
687 base_skel = self._user_module.baseSkel()
688 update = False
689 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
690 # We'll try again - checking if there's already an user with that email
691 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()):
692 # Still no luck - it's a completely new user
693 if not self.registrationEnabled:
694 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
695 logging.debug(f"Google user is from allowed {domain} - adding account")
696 else:
697 logging.debug(f"Google user is from {domain} - denying registration")
698 raise errors.Forbidden("Registration for new users is disabled")
700 user_skel = base_skel
701 user_skel["uid"] = uid
702 user_skel["name"] = email
703 update = True
705 # Take user information from Google, if wanted!
706 if user_skel["sync"]:
707 for target, source in {
708 "name": email,
709 "firstname": user_info.get("given_name"),
710 "lastname": user_info.get("family_name"),
711 }.items():
713 if user_skel[target] != source:
714 user_skel[target] = source
715 update = True
717 if update:
718 assert user_skel.write()
720 return self.next_or_finish(user_skel)
723class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
724 """Abstract class for all second factors."""
725 MAX_RETRY = 3
726 second_factor_login_template = "user_login_secondfactor"
727 """Template to enter the TOPT on login"""
729 @property
730 @abc.abstractmethod
731 def NAME(self) -> str:
732 """Name for this factor for templates."""
733 ...
735 @property
736 @abc.abstractmethod
737 def ACTION_NAME(self) -> str:
738 """The action name for this factor, used as path-segment."""
739 ...
741 def __init__(self, moduleName, modulePath, _user_module):
742 super().__init__(moduleName, modulePath, _user_module)
743 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}"
744 self.add_url = f"{self.modulePath}/add"
745 self.start_url = f"{self.modulePath}/start"
748class TimeBasedOTP(UserSecondFactorAuthentication):
749 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP"
750 WINDOW_SIZE = 5
751 ACTION_NAME = "otp"
752 NAME = "Time-based OTP"
753 second_factor_login_template = "user_login_secondfactor"
755 @dataclasses.dataclass
756 class OtpConfig:
757 """
758 This dataclass is used to provide an interface for a OTP token
759 algorithm description that is passed within the TimeBasedOTP
760 class for configuration.
761 """
762 secret: str
763 timedrift: float = 0.0
764 algorithm: t.Literal["sha1", "sha256"] = "sha1"
765 interval: int = 60
767 class OtpSkel(skeleton.RelSkel):
768 """
769 This is the Skeleton used to ask for the OTP token.
770 """
771 otptoken = NumericBone(
772 descr="Token",
773 required=True,
774 max=999999,
775 min=0,
776 )
778 @classmethod
779 def patch_user_skel(cls, skel_cls):
780 """
781 Modifies the UserSkel to be equipped by a bones required by Timebased OTP
782 """
783 # One-Time Password Verification
784 skel_cls.otp_serial = StringBone(
785 descr="OTP serial",
786 searchable=True,
787 params={
788 "category": "Second Factor Authentication",
789 }
790 )
792 skel_cls.otp_secret = CredentialBone(
793 descr="OTP secret",
794 params={
795 "category": "Second Factor Authentication",
796 }
797 )
799 skel_cls.otp_timedrift = NumericBone(
800 descr="OTP time drift",
801 readOnly=True,
802 defaultValue=0,
803 precision=1,
804 params={
805 "category": "Second Factor Authentication",
806 }
807 )
809 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None:
810 """
811 Returns an instance of self.OtpConfig with a provided token configuration,
812 or None when there is no appropriate configuration of this second factor handler available.
813 """
815 if otp_secret := skel.dbEntity.get("otp_secret"):
816 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0)
818 return None
820 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
821 """
822 Specified whether the second factor authentication can be handled by the given user or not.
823 """
824 return bool(self.get_config(skel))
826 @exposed
827 def start(self):
828 """
829 Configures OTP login for the current session.
831 A special otp_user_conf has to be specified as a dict, which is stored into the session.
832 """
833 session = current.session.get()
835 if not (user_key := session.get("possible_user_key")):
836 raise errors.PreconditionFailed(
837 "Second factor can only be triggered after successful primary authentication."
838 )
840 user_skel = self._user_module.baseSkel()
841 if not user_skel.read(user_key):
842 raise errors.NotFound("The previously authenticated user is gone.")
844 if not (otp_user_conf := self.get_config(user_skel)):
845 raise errors.PreconditionFailed("This second factor is not available for the user")
847 otp_user_conf = {
848 "key": str(user_key),
849 } | dataclasses.asdict(otp_user_conf)
851 session = current.session.get()
852 session["_otp_user"] = otp_user_conf
853 session.markChanged()
855 return self._user_module.render.edit(
856 self.OtpSkel(),
857 params={
858 "name": i18n.translate(
859 f"viur.core.modules.user.{self.ACTION_NAME}",
860 default_variables={"name": self.NAME},
861 ),
862 "action_name": self.ACTION_NAME,
863 "action_url": f"{self.modulePath}/{self.ACTION_NAME}",
864 },
865 tpl=self.second_factor_login_template
866 )
868 @exposed
869 @force_ssl
870 @skey(allow_empty=True)
871 def otp(self, *args, **kwargs):
872 """
873 Performs the second factor validation and interaction with the client.
874 """
875 session = current.session.get()
876 if not (otp_user_conf := session.get("_otp_user")):
877 raise errors.PreconditionFailed("No OTP process started in this session")
879 # Check if maximum second factor verification attempts
880 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
881 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
883 # Read the OTP token via the skeleton, to obtain a valid value
884 skel = self.OtpSkel()
885 if skel.fromClient(kwargs):
886 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP.
887 res = self.verify(
888 otp=skel["otptoken"],
889 secret=otp_user_conf["secret"],
890 algorithm=otp_user_conf.get("algorithm") or "sha1",
891 interval=otp_user_conf.get("interval") or 60,
892 timedrift=otp_user_conf.get("timedrift") or 0.0,
893 valid_window=self.WINDOW_SIZE
894 )
895 else:
896 res = None
898 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0!
899 if res is None:
900 otp_user_conf["attempts"] = attempts + 1
901 session.markChanged()
902 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
903 return self._user_module.render.edit(
904 skel,
905 name=i18n.translate(
906 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
907 default_variables={"name": self.NAME},
908 ),
909 action_name=self.ACTION_NAME,
910 action_url=f"{self.modulePath}/{self.ACTION_NAME}",
911 tpl=self.second_factor_login_template
912 )
914 # Remove otp user config from session
915 user_key = db.key_helper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
916 del session["_otp_user"]
917 session.markChanged()
919 # Check if the OTP device has a time drift
921 timedriftchange = float(res) - otp_user_conf["timedrift"]
922 if abs(timedriftchange) > 2:
923 # The time-drift change accumulates to more than 2 minutes (for interval==60):
924 # update clock-drift value accordingly
925 self.updateTimeDrift(user_key, timedriftchange)
927 # Continue with authentication
928 return self._user_module.secondFactorSucceeded(self, user_key)
930 @staticmethod
931 def verify(
932 otp: str | int,
933 secret: str,
934 algorithm: str = "sha1",
935 interval: int = 60,
936 timedrift: float = 0.0,
937 for_time: datetime.datetime | None = None,
938 valid_window: int = 0,
939 ) -> int | None:
940 """
941 Verifies the OTP passed in against the current time OTP.
943 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which
944 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown
945 on the hardware token generator. This can be used to store the time drift of a given token generator.
947 :param otp: the OTP token to check against
948 :param secret: The OTP secret
949 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256)
950 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators).
951 :param timedrift: The known timedrift (old index) of the hardware OTP generator
952 :param for_time: Time to check OTP at (defaults to now)
953 :param valid_window: extends the validity to this many counter ticks before and after the current one
955 :returns: The index where verification succeeded, None otherwise
956 """
957 # get the hashing digest
958 digest = {
959 "sha1": hashlib.sha1,
960 "sha256": hashlib.sha256,
961 }.get(algorithm)
963 if not digest:
964 raise errors.NotImplemented(f"{algorithm=} is not implemented")
966 if for_time is None:
967 for_time = datetime.datetime.now()
969 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index
970 timedrift = round(timedrift)
971 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret
972 otp = str(otp).zfill(6) # fill with zeros in front
974 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}")
975 totp = pyotp.TOTP(secret, digest=digest, interval=interval)
977 if valid_window:
978 for offset in range(timedrift - valid_window, timedrift + valid_window + 1):
979 token = str(totp.at(for_time, offset))
980 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}")
981 if hmac.compare_digest(otp, token):
982 return offset
984 return None
986 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None
988 # FIXME: VIUR4 rename
989 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
990 """
991 Updates the clock-drift value.
993 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew
994 it out of bounds. Maximum change per call is 0.3 minutes.
996 :param user_key: For which user should the update occour
997 :param idx: How many steps before/behind was that token
998 """
999 if user_skel := self._user_module.skel().read(user_key):
1000 if otp_skel := self._get_otptoken(user_skel):
1001 otp_skel.patch(
1002 {
1003 "+otp_timedrift": min(max(0.1 * idx, -0.3), 0.3)
1004 },
1005 update_relations=False,
1006 )
1009class AuthenticatorOTP(UserSecondFactorAuthentication):
1010 """
1011 This class handles the second factor for apps like authy and so on
1012 """
1013 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"
1015 # second_factor_add_template = "user_secondfactor_add"
1016 # """Template to configure (add) a new TOPT"""
1018 ACTION_NAME = "authenticator_otp"
1019 """Action name provided for *otp_template* on login"""
1021 NAME = "Authenticator App"
1023 # FIXME: The second factor add has to be rewritten entirely to ActionSkel paradigm.
1024 '''
1025 @exposed
1026 @force_ssl
1027 @skey(allow_empty=True)
1028 def add(self, otp=None):
1029 """
1030 We try to read the otp_app_secret from the current session. When this fails we generate a new one and store
1031 it in the session.
1033 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store
1034 the otp_app_secret from the session in the user entry.
1035 """
1036 current_session = current.session.get()
1038 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
1039 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
1040 current_session["_maybe_otp_app_secret"] = otp_app_secret
1041 current_session.markChanged()
1043 if otp is None:
1044 return self._user_module.render.second_factor_add(
1045 tpl=self.second_factor_add_template,
1046 action_name=self.ACTION_NAME,
1047 name=i18n.translate(
1048 f"viur.core.modules.user.auth{self.ACTION_NAME}",
1049 default_variables={"name": self.NAME},
1050 ),
1051 add_url=self.add_url,
1052 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))
1053 else:
1054 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
1055 return self._user_module.render.second_factor_add(
1056 tpl=self.second_factor_add_template,
1057 action_name=self.ACTION_NAME,
1058 name=i18n.translate(
1059 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
1060 default_variables={"name": self.NAME},
1061 ),
1062 add_url=self.add_url,
1063 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors
1065 # Now we can set the otp_app_secret to the current User and render der Success-template
1066 AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
1067 return self._user_module.render.second_factor_add_success(
1068 action_name=self.ACTION_NAME,
1069 name=i18n.translate(
1070 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
1071 default_variables={"name": self.NAME},
1072 ),
1073 )
1074 '''
1076 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
1077 """
1078 We can only handle the second factor if we have stored an otp_app_secret before.
1079 """
1080 return bool(skel.dbEntity.get("otp_app_secret", ""))
1082 @classmethod
1083 def patch_user_skel(cls, skel_cls):
1084 """
1085 Modifies the UserSkel to be equipped by bones required by Authenticator App
1086 """
1087 # Authenticator OTP Apps (like Authy)
1088 skel_cls.otp_app_secret = CredentialBone(
1089 descr="OTP Secret (App-Key)",
1090 params={
1091 "category": "Second Factor Authentication",
1092 }
1093 )
1095 @classmethod
1096 def set_otp_app_secret(cls, otp_app_secret=None):
1097 """
1098 Write a new OTP Token in the current user entry.
1099 """
1100 if otp_app_secret is None:
1101 logging.error("No 'otp_app_secret' is provided")
1102 raise errors.PreconditionFailed("No 'otp_app_secret' is provided")
1103 if not (cuser := current.user.get()):
1104 raise errors.Unauthorized()
1106 def transaction(user_key):
1107 if not (user := db.get(user_key)):
1108 raise errors.NotFound()
1109 user["otp_app_secret"] = otp_app_secret
1110 db.put(user)
1112 db.run_in_transaction(transaction, cuser["key"])
1114 @classmethod
1115 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
1116 """
1117 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
1118 """
1119 if not (cuser := current.user.get()):
1120 raise errors.Unauthorized()
1121 if not (issuer := conf.user.otp_issuer):
1122 logging.warning(
1123 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
1124 issuer = conf.instance.project_id
1126 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer)
1128 @classmethod
1129 def generate_otp_app_secret(cls) -> str:
1130 """
1131 Generate a new OTP Secret
1132 :return an otp
1133 """
1134 return pyotp.random_base32()
1136 @classmethod
1137 def verify_otp(cls, otp: str | int, secret: str) -> bool:
1138 return pyotp.TOTP(secret).verify(otp)
1140 @exposed
1141 def start(self):
1142 otp_user_conf = {"attempts": 0}
1143 session = current.session.get()
1144 session["_otp_user"] = otp_user_conf
1145 session.markChanged()
1146 return self._user_module.render.edit(
1147 TimeBasedOTP.OtpSkel(),
1148 params={
1149 "name": i18n.translate(
1150 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
1151 default_variables={"name": self.NAME},
1152 ),
1153 "action_name": self.ACTION_NAME,
1154 "action_url": self.action_url,
1155 },
1156 tpl=self.second_factor_login_template,
1157 )
1159 @exposed
1160 @force_ssl
1161 @skey
1162 def authenticator_otp(self, **kwargs):
1163 """
1164 We verify the otp here with the secret we stored before.
1165 """
1166 session = current.session.get()
1167 user_key = db.Key(self._user_module.kindName, session["possible_user_key"])
1169 if not (otp_user_conf := session.get("_otp_user")):
1170 raise errors.PreconditionFailed("No OTP process started in this session")
1172 # Check if maximum second factor verification attempts
1173 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
1174 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
1176 if not (user := db.get(user_key)):
1177 raise errors.NotFound()
1179 skel = TimeBasedOTP.OtpSkel()
1180 if not skel.fromClient(kwargs):
1181 raise errors.PreconditionFailed()
1182 otp_token = str(skel["otptoken"]).zfill(6)
1184 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]):
1185 return self._user_module.secondFactorSucceeded(self, user_key)
1186 otp_user_conf["attempts"] = attempts + 1
1187 session.markChanged()
1188 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
1189 return self._user_module.render.edit(
1190 skel,
1191 name=i18n.translate(
1192 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
1193 default_variables={"name": self.NAME},
1194 ),
1195 action_name=self.ACTION_NAME,
1196 action_url=self.action_url,
1197 tpl=self.second_factor_login_template,
1198 )
1201class User(List):
1202 """
1203 The User module is used to manage and authenticate users in a ViUR system.
1205 It is used in almost any ViUR project, but ViUR can also function without any user capabilites.
1206 """
1208 kindName = "user"
1209 addTemplate = "user_add"
1210 addSuccessTemplate = "user_add_success"
1212 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter(
1213 None, (
1214 UserPassword,
1215 conf.user.google_client_id and GoogleAccount,
1216 )
1217 ))
1218 """
1219 Specifies primary authentication providers that are made available
1220 as sub-modules under `user/auth_<classname>`. They might require
1221 customization or configuration.
1222 """
1224 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = (
1225 TimeBasedOTP,
1226 AuthenticatorOTP,
1227 )
1228 """
1229 Specifies secondary authentication providers that are made available
1230 as sub-modules under `user/f2_<classname>`. They might require
1231 customization or configuration, which is determined during the
1232 login-process depending on the user that wants to login.
1233 """
1235 validAuthenticationMethods = tuple(filter(
1236 None, (
1237 (UserPassword, AuthenticatorOTP),
1238 (UserPassword, TimeBasedOTP),
1239 (UserPassword, None),
1240 (GoogleAccount, None) if conf.user.google_client_id else None,
1241 )
1242 ))
1243 """
1244 Specifies the possible combinations of primary- and secondary factor
1245 login methos.
1247 GoogleLogin defaults to no second factor, as the Google Account can be
1248 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only
1249 handled when there is a user-dependent configuration available.
1250 """
1252 msg_missing_second_factor = "Second factor required but not configured for this user."
1254 secondFactorTimeWindow = datetime.timedelta(minutes=10)
1256 default_order = "name.idx"
1258 roles = {
1259 "admin": "*",
1260 }
1262 def __init__(self, moduleName, modulePath):
1263 for provider in self.authenticationProviders:
1264 assert issubclass(provider, UserPrimaryAuthentication)
1265 name = f"auth_{provider.__name__.lower()}"
1266 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1268 for provider in self.secondFactorProviders:
1269 assert issubclass(provider, UserSecondFactorAuthentication)
1270 name = f"f2_{provider.__name__.lower()}"
1271 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1273 super().__init__(moduleName, modulePath)
1275 def adminInfo(self):
1276 ret = {
1277 "icon": "person-fill",
1278 }
1280 if self.is_admin(current.user.get()):
1281 ret |= {
1282 "actions": [
1283 "trigger_kick",
1284 "trigger_takeover",
1285 ],
1286 "customActions": {
1287 "trigger_kick": {
1288 "name": i18n.translate(
1289 key="viur.core.modules.user.customActions.kick",
1290 defaultText="Kick user",
1291 hint="Title of the kick user function"
1292 ),
1293 "icon": "trash2-fill",
1294 "action": "fetch",
1295 "url": "/vi/{{module}}/trigger/kick/{{key}}",
1296 "confirm": i18n.translate(
1297 key="viur.core.modules.user.customActions.kick.confirm",
1298 defaultText="Do you really want to drop all sessions of the selected user from the system?",
1299 ),
1300 "success": i18n.translate(
1301 key="viur.core.modules.user.customActions.kick.success",
1302 defaultText="Sessions of the user are being invalidated.",
1303 ),
1304 },
1305 "trigger_takeover": {
1306 "name": i18n.translate(
1307 key="viur.core.modules.user.customActions.takeover",
1308 defaultText="Take-over user",
1309 hint="Title of the take user over function"
1310 ),
1311 "icon": "file-person-fill",
1312 "action": "fetch",
1313 "url": "/vi/{{module}}/trigger/takeover/{{key}}",
1314 "confirm": i18n.translate(
1315 key="viur.core.modules.user.customActions.takeover.confirm",
1316 defaultText="Do you really want to replace your current user session by a "
1317 "user session of the selected user?",
1318 ),
1319 "success": i18n.translate(
1320 key="viur.core.modules.user.customActions.takeover.success",
1321 defaultText="You're now know as the selected user!",
1322 ),
1323 "then": "reload-vi",
1324 },
1325 },
1326 }
1328 return ret
1330 def get_role_defaults(self, role: str) -> set[str]:
1331 """
1332 Returns a set of default access rights for a given role.
1334 Defaults to "admin" usage for any role > "user"
1335 and "scriptor" usage for "admin" role.
1336 """
1337 ret = set()
1339 if role in ("viewer", "editor", "admin"):
1340 ret.add("admin")
1342 if role == "admin":
1343 ret.add("scriptor")
1345 return ret
1347 def addSkel(self):
1348 skel = super().addSkel().clone()
1350 if self.is_admin(current.user.get()):
1351 # An admin tries to add a new user.
1352 skel.status.readOnly = False
1353 skel.status.visible = True
1354 skel.access.readOnly = False
1355 skel.access.visible = True
1357 else:
1358 skel.status.readOnly = True
1359 skel["status"] = Status.UNSET
1360 skel.status.visible = False
1361 skel.access.readOnly = True
1362 skel["access"] = []
1363 skel.access.visible = False
1365 if "password" in skel:
1366 # Unlock and require a password
1367 skel.password.required = True
1368 skel.password.visible = True
1369 skel.password.readOnly = False
1371 skel.name.readOnly = False # Don't enforce readonly name in user/add
1372 return skel
1374 def editSkel(self, *args, **kwargs):
1375 skel = super().editSkel().clone()
1377 if "password" in skel:
1378 skel.password.required = False
1379 skel.password.visible = True
1380 skel.password.readOnly = False
1382 lock = not self.is_admin(current.user.get())
1383 skel.name.readOnly = lock
1384 skel.access.readOnly = lock
1385 skel.status.readOnly = lock
1387 return skel
1389 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication:
1390 return getattr(self, f"f2_{cls.__name__.lower()}")
1392 def getCurrentUser(self):
1393 session = current.session.get()
1395 req = current.request.get()
1396 if session and (session.loaded or req.is_deferred) and (user := session.get("user")):
1397 skel = self.baseSkel()
1398 skel.setEntity(user)
1399 return skel
1401 return None
1403 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key):
1404 """
1405 Continue authentication flow when primary authentication succeeded.
1406 """
1407 skel = self.baseSkel()
1409 if not skel.read(user_key):
1410 raise errors.NotFound("User was not found.")
1412 if not provider.can_handle(skel):
1413 raise errors.Forbidden("User is not allowed to use this primary login method.")
1415 session = current.session.get()
1416 session["possible_user_key"] = user_key.id_or_name
1417 session["_secondFactorStart"] = utils.utcNow()
1418 session.markChanged()
1420 second_factor_providers = []
1422 for auth_provider, second_factor in self.validAuthenticationMethods:
1423 if isinstance(provider, auth_provider):
1424 if second_factor is not None:
1425 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor)
1426 if second_factor_provider_instance.can_handle(skel):
1427 second_factor_providers.append(second_factor_provider_instance)
1428 else:
1429 second_factor_providers.append(None)
1431 if len(second_factor_providers) > 1 and None in second_factor_providers:
1432 # We have a second factor. So we can get rid of the None
1433 second_factor_providers.pop(second_factor_providers.index(None))
1435 if len(second_factor_providers) == 0:
1436 raise errors.NotAcceptable(self.msg_missing_second_factor)
1437 elif len(second_factor_providers) == 1:
1438 if second_factor_providers[0] is None:
1439 # We allow sign-in without a second factor
1440 return self.authenticateUser(user_key)
1441 # We have only one second factor we don't need the choice template
1442 return second_factor_providers[0].start(user_key)
1444 # In case there is more than one second factor provider remaining, let the user decide!
1445 current.session.get()["_secondfactor_providers"] = {
1446 second_factor.start_url: second_factor.NAME
1447 for second_factor in second_factor_providers
1448 if second_factor.VISIBLE
1449 }
1451 return self.select_secondfactor_provider()
1453 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key):
1454 """
1455 Continue authentication flow when secondary authentication succeeded.
1456 """
1457 session = current.session.get()
1458 if session["possible_user_key"] != user_key.id_or_name:
1459 raise errors.Forbidden()
1461 # Assert that the second factor verification finished in time
1462 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow:
1463 raise errors.RequestTimeout()
1465 return self.authenticateUser(user_key)
1467 def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None:
1468 """
1469 Hookable check if a user is defined as "active" and can login.
1471 :param skel: The UserSkel of the user who wants to login.
1472 :returns: Returns True or False when the result is unambigous and the user is active or not. \
1473 Returns None when the provided skel doesn't provide enough information for determination.
1474 """
1475 if skel and "status" in skel:
1476 status = skel["status"]
1477 if not isinstance(status, (Status, int)):
1478 try:
1479 status = int(status)
1480 except ValueError:
1481 status = Status.UNSET
1483 return status >= Status.ACTIVE.value
1485 return None
1487 def is_admin(self, skel: skeleton.SkeletonInstance) -> bool | None:
1488 """
1489 Hookable check if a user is defined as "admin" and can edit or log into other users.
1490 Defaults to "root" users only.
1492 :param skel: The UserSkel of the user who wants should be checked for user admin privileges.
1493 :returns: Returns True or False when the result is unambigous and the user is admin or not. \
1494 Returns None when the provided skel doesn't provide enough information for determination.
1495 """
1496 if skel and "access" in skel:
1497 return "root" in skel["access"]
1499 return None
1501 def authenticateUser(self, key: db.Key, **kwargs):
1502 """
1503 Performs Log-In for the current session and the given user key.
1505 This resets the current session: All fields not explicitly marked as persistent
1506 by conf.user.session_persistent_fields_on_login are gone afterwards.
1508 :param key: The (DB-)Key of the user we shall authenticate
1509 """
1510 skel = self.baseSkel()
1511 if not skel.read(key):
1512 raise ValueError(f"Unable to authenticate unknown user {key}")
1514 # Verify that this user account is active
1515 if not self.is_active(skel):
1516 raise errors.Forbidden("The user is disabled and cannot be authenticated.")
1518 # Update session for user
1519 session = current.session.get()
1520 # Remember persistent fields...
1521 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login}
1522 session.reset()
1523 # and copy them over to the new session
1524 session |= take_over
1526 # Update session, user and request
1527 session["user"] = skel.dbEntity
1529 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key
1530 current.user.set(self.getCurrentUser())
1532 self.onLogin(skel)
1534 return self.render.render("login_success", skel, **kwargs)
1537 # Action for primary authentication selection
1539 def SelectAuthenticationProviderSkel(self):
1540 providers = {}
1541 first = None
1542 for provider in self.authenticationProviders:
1543 if not provider.VISIBLE:
1544 continue
1546 provider = getattr(self, f"auth_{provider.__name__.lower()}")
1547 providers[provider.start_url] = provider.NAME
1549 if first is None:
1550 first = provider.start_url
1552 class SelectAuthenticationProviderSkel(skeleton.RelSkel):
1553 provider = SelectBone(
1554 descr="Authentication method",
1555 required=True,
1556 values=providers,
1557 defaultValue=first,
1558 )
1560 return SelectAuthenticationProviderSkel()
1562 @exposed
1563 def select_authentication_provider(self, **kwargs):
1564 skel = self.SelectAuthenticationProviderSkel()
1566 # Read required bones from client
1567 if len(skel.provider.values) > 1 and (not kwargs or not skel.fromClient(kwargs)):
1568 return self.render.render("select_authentication_provider", skel)
1570 return self.render.render("select_authentication_provider_success", skel, next_url=skel["provider"])
1572 # Action for second factor select
1574 class SelectSecondFactorProviderSkel(skeleton.RelSkel):
1575 provider = SelectBone(
1576 descr="Second factor",
1577 required=True,
1578 values=lambda: current.session.get()["_secondfactor_providers"] or (),
1579 )
1581 @exposed
1582 def select_secondfactor_provider(self, **kwargs):
1583 skel = self.SelectSecondFactorProviderSkel()
1585 # Read required bones from client
1586 if not kwargs or not skel.fromClient(kwargs):
1587 return self.render.render("select_secondfactor_provider", skel)
1589 del current.session.get()["_secondfactor_providers"]
1591 return self.render.render("select_secondfactor_provider_success", skel, next_url=skel["provider"])
1593 @exposed
1594 @skey
1595 def logout(self, **kwargs):
1596 """
1597 Implements the logout action. It also terminates the current session (all keys not listed
1598 in viur.session_persistent_fields_on_logout will be lost).
1599 """
1600 if not (user := current.user.get()):
1601 raise errors.Unauthorized()
1603 self.onLogout(user)
1605 session = current.session.get()
1607 if take_over := {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}:
1608 session.reset()
1609 session |= take_over
1610 else:
1611 session.clear()
1613 current.user.set(None) # set user to none in context var
1615 return self.render.render("logout_success")
1617 @exposed
1618 def login(self, *args, **kwargs):
1619 return self.select_authentication_provider()
1621 def onLogin(self, skel: skeleton.SkeletonInstance):
1622 """
1623 Hook to be called on user login.
1624 """
1625 # Update the lastlogin timestamp (if available!)
1626 if "lastlogin" in skel:
1627 now = utils.utcNow()
1629 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??)
1630 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)):
1631 skel["lastlogin"] = now
1632 skel.write(update_relations=False)
1634 logging.info(f"""User {skel["name"]} logged in""")
1636 def onLogout(self, skel: skeleton.SkeletonInstance):
1637 """
1638 Hook to be called on user logout.
1639 """
1640 logging.info(f"""User {skel["name"]} logged out""")
1642 @exposed
1643 def view(self, key: db.Key | int | str = "self", *args, **kwargs):
1644 """
1645 Allow a special key "self" to reference the current user.
1647 By default, any authenticated user can view its own user entry,
1648 to obtain access rights and any specific user information.
1649 This behavior is defined in the customized `canView` function,
1650 which is overwritten by the User-module.
1652 The rendered skeleton can be modified or restriced by specifying
1653 a customized view-skeleton.
1654 """
1655 if key == "self":
1656 if user := current.user.get():
1657 key = user["key"]
1658 else:
1659 raise errors.Unauthorized("Cannot view 'self' with unknown user")
1661 return super().view(key, *args, **kwargs)
1663 def canView(self, skel) -> bool:
1664 if user := current.user.get():
1665 if skel["key"] == user["key"]:
1666 return True
1668 if self.is_admin(user) or "user-view" in user["access"]:
1669 return True
1671 return False
1673 @exposed
1674 @skey(allow_empty=True)
1675 def edit(self, key: db.Key | int | str = "self", *args, **kwargs):
1676 """
1677 Allow a special key "self" to reference the current user.
1679 This modification will only allow to use "self" as a key;
1680 The specific access right to let the user edit itself must
1681 still be customized.
1683 The rendered and editable skeleton can be modified or restriced
1684 by specifying a customized edit-skeleton.
1685 """
1686 if key == "self":
1687 if user := current.user.get():
1688 key = user["key"]
1689 else:
1690 raise errors.Unauthorized("Cannot edit 'self' with unknown user")
1692 return super().edit(key, *args, **kwargs)
1694 @exposed
1695 def getAuthMethods(self, *args, **kwargs):
1696 """Legacy method prior < viur-core 3.8: Inform tools like Admin which authentication to use"""
1697 logging.warning("DEPRECATED!!! Use '/user/login'-method for this, or update your admin version!")
1699 res = [
1700 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1701 for primary, secondary in self.validAuthenticationMethods
1702 ]
1704 return json.dumps(res)
1706 @exposed
1707 def trigger(self, action: str, key: str):
1708 # Check for provided access right definition (equivalent to client-side check), fallback to root!
1709 access = self.adminInfo().get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ()
1710 if not (
1711 (cuser := current.user.get())
1712 and (
1713 any(role in cuser["access"] for role in access)
1714 or self.is_admin(cuser)
1715 )
1716 ):
1717 raise errors.Unauthorized()
1719 skel = self.skel()
1720 if not skel.read(key) and not (skel := skel.all().mergeExternalFilter({"name": key}).getSkel()):
1721 raise errors.NotFound("The provided user does not exist.")
1723 match action:
1724 case "takeover":
1725 self.authenticateUser(skel["key"])
1727 case "kick":
1728 session.killSessionByUser(skel["key"])
1730 case _:
1731 raise errors.NotImplemented(f"Action {action!r} not implemented")
1733 return self.render.render(f"trigger/{action}Success", skel)
1735 def onEdited(self, skel):
1736 super().onEdited(skel)
1738 # In case the user is set to inactive, kill all sessions
1739 if self.is_active(skel) is False:
1740 session.killSessionByUser(skel["key"])
1742 # Update user setting in all sessions
1743 for session_obj in db.Query("user").filter("user =", skel["key"]).iter():
1744 session_obj["data"]["user"] = skel.dbEntity
1746 def onDeleted(self, skel):
1747 super().onDeleted(skel)
1748 # Invalidate all sessions of that user
1749 session.killSessionByUser(skel["key"])
1752@tasks.StartupTask
1753def createNewUserIfNotExists():
1754 """
1755 Create a new Admin user, if the userDB is empty
1756 """
1757 if (
1758 (user_module := getattr(conf.main_app.vi, "user", None))
1759 and isinstance(user_module, User)
1760 and "addSkel" in dir(user_module)
1761 and "validAuthenticationMethods" in dir(user_module)
1762 # UserPassword must be one of the primary login methods
1763 and any(
1764 issubclass(provider[0], UserPassword)
1765 for provider in user_module.validAuthenticationMethods
1766 )
1767 ):
1768 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database
1769 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton
1770 uname = f"""admin@{conf.instance.project_id}.appspot.com"""
1771 pw = utils.string.random(13)
1772 addSkel["name"] = uname
1773 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away
1774 addSkel["access"] = ["root"]
1775 addSkel["password"] = pw
1777 try:
1778 addSkel.write()
1779 except Exception as e:
1780 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
1781 logging.exception(e)
1782 return
1784 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"
1786 logging.warning(msg)
1787 email.send_email_to_admins("New ViUR password", msg)
1790# DEPRECATED ATTRIBUTES HANDLING
1792def __getattr__(attr):
1793 match attr:
1794 case "userSkel":
1795 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
1796 warnings.warn(msg, DeprecationWarning, stacklevel=2)
1797 logging.warning(msg)
1798 return UserSkel
1800 return super(__import__(__name__).__class__).__getattr__(attr)