Coverage for / home / runner / work / viur-core / viur-core / viur / src / viur / core / modules / user.py: 0%
774 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 23:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 23:08 +0000
1import abc
2import datetime
3import enum
4import fnmatch
5import functools
6import hashlib
7import hmac
8import json
9import logging
10import secrets
11import time
12import urllib.parse
13import warnings
14from http.cookies import SimpleCookie
16import user_agents
18import pyotp
19import base64
20import dataclasses
21import typing as t
22from google.auth.transport import requests
23from google.oauth2 import id_token
25from viur.core import (
26 conf, current, db, email, errors, i18n,
27 securitykey, session, skeleton, tasks, utils, Module
28)
29from viur.core.decorators import *
30from viur.core.bones import *
31from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password
32from viur.core.prototypes.list import List
33from viur.core.ratelimit import RateLimit
34from viur.core.securityheaders import extendCsp
35from viur.core.session import Session
38@functools.total_ordering
39class Status(enum.Enum):
40 """Status enum for a user
42 Has backwards compatibility to be comparable with non-enum values.
43 Will be removed with viur-core 4.0.0
44 """
46 UNSET = 0 # Status is unset
47 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification
48 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin
49 DISABLED = 5 # Account disabled
50 ACTIVE = 10 # Active
52 def __eq__(self, other):
53 if isinstance(other, Status):
54 return super().__eq__(other)
55 return self.value == other
57 def __lt__(self, other):
58 if isinstance(other, Status):
59 return super().__lt__(other)
60 return self.value < other
63class UserSkel(skeleton.Skeleton):
64 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604)
66 name = EmailBone(
67 descr="E-Mail",
68 required=True,
69 readOnly=True,
70 caseSensitive=False,
71 searchable=True,
72 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
73 )
75 firstname = StringBone(
76 descr="Firstname",
77 searchable=True,
78 )
80 lastname = StringBone(
81 descr="Lastname",
82 searchable=True,
83 )
85 roles = SelectBone(
86 descr=i18n.translate("viur.core.modules.user.bone.roles", defaultText="Roles"),
87 values=conf.user.roles,
88 required=True,
89 multiple=True,
90 # fixme: This is generally broken in VIUR! See #776 for details.
91 # vfunc=lambda values:
92 # i18n.translate(
93 # "user.bone.roles.invalid",
94 # defaultText="Invalid role setting: 'custom' can only be set alone.")
95 # if "custom" in values and len(values) > 1 else None,
96 defaultValue=list(conf.user.roles.keys())[:1],
97 )
99 access = SelectBone(
100 descr=i18n.translate("viur.core.modules.user.bone.access", defaultText="Access rights"),
101 type_suffix="access",
102 values=lambda: {
103 right: i18n.translate(f"viur.core.modules.user.accessright.{right}", defaultText=right)
104 for right in sorted(conf.user.access_rights)
105 },
106 multiple=True,
107 params={
108 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system
109 }
110 )
112 status = SelectBone(
113 descr=i18n.translate("viur.core.modules.user.bone.status", "Account status"),
114 values=Status,
115 translation_key_prefix="viur.core.user.status.",
116 defaultValue=Status.ACTIVE,
117 required=True,
118 )
120 lastlogin = DateBone(
121 descr="Last Login",
122 readOnly=True,
123 )
125 admin_config = JsonBone( # This bone stores settings from the vi
126 descr="Config for the User",
127 visible=False
128 )
130 def __new__(cls, *args, **kwargs):
131 """
132 Constructor for the UserSkel-class, with the capability
133 to dynamically add bones required for the configured
134 authentication methods.
135 """
136 for provider in conf.main_app.vi.user.authenticationProviders:
137 assert issubclass(provider, UserPrimaryAuthentication)
138 provider.patch_user_skel(cls)
140 for provider in conf.main_app.vi.user.secondFactorProviders:
141 assert issubclass(provider, UserSecondFactorAuthentication)
142 provider.patch_user_skel(cls)
144 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
145 return super().__new__(cls, *args, **kwargs)
147 @classmethod
148 def write(cls, skel, *args, **kwargs):
149 # Roles
150 if skel["roles"] and "custom" not in skel["roles"]:
151 # Collect access rights through rules
152 access = set()
154 for role in skel["roles"]:
155 # Get default access for this role
156 access |= conf.main_app.vi.user.get_role_defaults(role)
158 # Go through all modules and evaluate available role-settings
159 for name in dir(conf.main_app.vi):
160 if name.startswith("_"):
161 continue
163 module = getattr(conf.main_app.vi, name)
164 if not isinstance(module, Module):
165 continue
167 roles = getattr(module, "roles", None) or {}
168 rights = roles.get(role, roles.get("*", ()))
170 # Convert role into tuple if it's not
171 if not isinstance(rights, (tuple, list)):
172 rights = (rights, )
174 if "*" in rights:
175 for right in module.accessRights:
176 access.add(f"{name}-{right}")
177 else:
178 for right in rights:
179 if right in module.accessRights:
180 access.add(f"{name}-{right}")
182 # special case: "edit" and "delete" actions require "view" as well!
183 if right in ("edit", "delete") and "view" in module.accessRights:
184 access.add(f"{name}-view")
186 skel["access"] = list(access)
188 return super().write(skel, *args, **kwargs)
191class UserAuthentication(Module, abc.ABC):
192 @property
193 @abc.abstractstaticmethod
194 def METHOD_NAME() -> str:
195 """
196 Define a unique method name for this authentication.
197 """
198 ...
200 @property
201 @abc.abstractstaticmethod
202 def NAME() -> str:
203 """
204 Define a descriptive name for this authentication.
205 """
206 ...
208 @property
209 @staticmethod
210 def VISIBLE(cls) -> bool:
211 """
212 Defines if the authentication method is visible to the user.
213 """
214 return True
216 def __init__(self, moduleName, modulePath, userModule):
217 super().__init__(moduleName, modulePath)
218 self._user_module = userModule
219 self.start_url = f"{self.modulePath}/login"
221 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
222 return True
224 @classmethod
225 def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
226 """
227 Allows for an UserAuthentication to patch the UserSkel
228 class with additional bones which are required for
229 the implemented authentication method.
230 """
231 ...
234class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
235 """Abstract class for all primary authentication methods."""
236 registrationEnabled = False
238 @abc.abstractmethod
239 def login(self, *args, **kwargs):
240 ...
242 def next_or_finish(self, skel: skeleton.SkeletonInstance):
243 """
244 Hook that is called whenever a part of the authentication was successful.
245 It allows to perform further steps in custom authentications,
246 e.g. change a password after first login.
247 """
248 return self._user_module.continueAuthenticationFlow(self, skel["key"])
251class UserPassword(UserPrimaryAuthentication):
252 METHOD_NAME = "X-VIUR-AUTH-User-Password"
253 NAME = "Username & Password"
255 registrationEmailVerificationRequired = True
256 registrationAdminVerificationRequired = True
258 verifySuccessTemplate = "user_verify_success"
259 verifyEmailAddressMail = "user_verify_address"
260 verifyFailedTemplate = "user_verify_failed"
261 passwordRecoveryMail = "user_password_recovery"
262 passwordRecoverySuccessTemplate = "user_passwordrecover_success"
263 passwordRecoveryTemplate = "user_passwordrecover"
265 # The default rate-limit for password recovery (10 tries each 15 minutes)
266 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")
268 # Limit (invalid) login-retries to once per 5 seconds
269 loginRateLimit = RateLimit("user.login", 12, 1, "ip")
271 @classmethod
272 def patch_user_skel(cls, skel_cls):
273 """
274 Modifies the UserSkel to be equipped by a PasswordBone.
275 """
276 skel_cls.password = PasswordBone(
277 readOnly=True,
278 visible=False,
279 params={
280 "category": "Authentication",
281 }
282 )
284 class LoginSkel(skeleton.RelSkel):
285 name = EmailBone(
286 descr="E-Mail",
287 required=True,
288 caseSensitive=False,
289 )
290 password = PasswordBone(
291 required=True,
292 test_threshold=0,
293 tests=(),
294 raw=True,
295 )
297 class LostPasswordStep1Skel(skeleton.RelSkel):
298 name = EmailBone(
299 descr="E-Mail",
300 required=True,
301 )
303 class LostPasswordStep2Skel(skeleton.RelSkel):
304 recovery_key = StringBone(
305 descr="Recovery Key",
306 required=True,
307 params={
308 "tooltip": i18n.translate(
309 key="viur.core.modules.user.userpassword.lostpasswordstep2.recoverykey",
310 defaultText="Please enter the validation key you've received via e-mail.",
311 hint="Shown when the user needs more than 15 minutes to paste the key",
312 ),
313 }
314 )
316 class LostPasswordStep3Skel(skeleton.RelSkel):
317 # send the recovery key again, in case the password is rejected by some reason.
318 recovery_key = StringBone(
319 descr="Recovery Key",
320 visible=False,
321 )
323 password = PasswordBone(
324 descr="New Password",
325 required=True,
326 params={
327 "tooltip": i18n.translate(
328 key="viur.core.modules.user.userpassword.lostpasswordstep3.password",
329 defaultText="Please enter a new password for your account.",
330 ),
331 }
332 )
334 @exposed
335 @force_ssl
336 @skey(allow_empty=True)
337 def login(self, **kwargs):
338 # Obtain a fresh login skel
339 skel = self.LoginSkel()
341 # Read required bones from client
342 if not (kwargs and skel.fromClient(kwargs)):
343 return self._user_module.render.render("login", skel)
345 self.loginRateLimit.assertQuotaIsAvailable()
347 # query for the username. The query might find another user, but the name is being checked for equality below
348 name = skel["name"].lower().strip()
349 user_skel = self._user_module.baseSkel()
350 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel
352 # extract password hash from raw database entity (skeleton access blocks it)
353 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {}
354 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001
355 password_hash = encode_password(skel["password"], password_data.get("salt", "-invalid-"), iterations)["pwhash"]
357 # now check if the username matches
358 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode())
360 # next, check if the password hash matches
361 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash)
363 if not is_okay:
364 # Set error to all required fields
365 for name, bone in skel.items():
366 if bone.required:
367 skel.errors.append(
368 ReadFromClientError(
369 ReadFromClientErrorSeverity.Invalid,
370 i18n.translate(
371 key="viur.core.modules.user.userpassword.login.failed",
372 defaultText="Invalid username or password provided",
373 ),
374 name,
375 )
376 )
378 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota
379 return self._user_module.render.render("login", skel)
381 # check if iterations are below current security standards, and update if necessary.
382 if iterations < PBKDF2_DEFAULT_ITERATIONS:
383 logging.info(f"Update password hash for user {name}.")
384 # re-hash the password with more iterations
385 # FIXME: This must be done within a transaction!
386 user_skel["password"] = kwargs["password"] # will be hashed on serialize
387 user_skel.write(update_relations=False)
389 return self.next_or_finish(user_skel)
391 @exposed
392 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, **kwargs):
393 """
394 This implements a password recovery process which lets users set a new password for their account,
395 after validating a recovery key sent by email.
397 The process is as following:
399 - The user enters the registered email adress (not validated here)
400 - A random code is generated and stored as a security-ke, then sendUserPasswordRecoveryCode is called.
401 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name
402 and send a link with the code. It runs as a deferred task so no information if a user account exists
403 is being leaked.
404 - If the user received an email, the link can be clicked to set a new password for the account.
406 To prevent automated attacks, the first step is guarded by limited calls to this function to 10 actions
407 per 15 minutes. (One complete recovery process consists of two calls).
408 """
409 self.passwordRecoveryRateLimit.assertQuotaIsAvailable()
410 current_request = current.request.get()
412 if recovery_key is None:
413 # This is the first step, where we ask for the username of the account we'll going to reset the password on
414 skel = self.LostPasswordStep1Skel()
416 if (
417 not kwargs
418 or not current_request.isPostRequest
419 or not skel.fromClient(kwargs)
420 ):
421 return self._user_module.render.render(
422 "pwrecover", skel,
423 tpl=self.passwordRecoveryTemplate,
424 )
426 # validate security key
427 if not securitykey.validate(skey):
428 raise errors.PreconditionFailed()
430 self.passwordRecoveryRateLimit.decrementQuota()
432 recovery_key = securitykey.create(
433 duration=datetime.timedelta(minutes=15),
434 key_length=conf.security.password_recovery_key_length,
435 user_name=skel["name"].lower(),
436 session_bound=False,
437 )
439 # Send the code in background
440 self.sendUserPasswordRecoveryCode(
441 skel["name"], recovery_key, current_request.request.headers["User-Agent"]
442 )
444 # step 2 is only an action-skel, and can be ignored by a direct link in the
445 # e-mail previously sent. It depends on the implementation of the specific project.
446 return self._user_module.render.render(
447 "pwrecover", self.LostPasswordStep2Skel(),
448 tpl=self.passwordRecoveryTemplate,
449 )
451 # in step 3
452 skel = self.LostPasswordStep3Skel()
454 # reset the recovery key again, in case the fromClient() fails.
455 skel["recovery_key"] = str(recovery_key).strip()
457 # check for any input; Render input-form again when incomplete.
458 if (
459 not kwargs
460 or not current_request.isPostRequest
461 or not skel.fromClient(kwargs, ignore=("recovery_key",))
462 ):
463 return self._user_module.render.render(
464 "pwrecover", skel,
465 tpl=self.passwordRecoveryTemplate,
466 )
468 # validate security key
469 if not securitykey.validate(skey):
470 raise errors.PreconditionFailed()
472 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)):
473 raise errors.PreconditionFailed(
474 i18n.translate(
475 key="viur.core.modules.user.passwordrecovery.keyexpired",
476 defaultText="The recovery key is expired or invalid. Please start the recovery process again.",
477 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key."
478 )
479 )
481 self.passwordRecoveryRateLimit.decrementQuota()
483 # If we made it here, the key was correct, so we'd hopefully have a valid user for this
484 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel()
486 if not user_skel:
487 raise errors.NotFound(
488 i18n.translate(
489 key="viur.core.modules.user.passwordrecovery.usernotfound",
490 defaultText="There is no account with this name",
491 hint="We cant find an account with that name (Should never happen)"
492 )
493 )
495 # If the account is locked or not yet validated, abort the process.
496 if not self._user_module.is_active(user_skel):
497 raise errors.NotFound(
498 i18n.translate(
499 key="viur.core.modules.user.passwordrecovery.accountlocked",
500 defaultText="This account is currently locked. You cannot change its password.",
501 hint="Attempted password recovery on a locked account"
502 )
503 )
505 # Update the password, save the user, reset his session and show the success-template
506 user_skel["password"] = skel["password"]
507 user_skel.write(update_relations=False)
509 return self._user_module.render.render(
510 "pwrecover_success",
511 next_url=self.start_url,
512 tpl=self.passwordRecoverySuccessTemplate
513 )
515 @tasks.CallDeferred
516 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None:
517 """
518 Sends the given recovery code to the user given in userName. This function runs deferred
519 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the
520 code by email (assuming we have working email delivery), but this can be overridden to send it
521 by SMS or other means. We'll also update the changedate for this user, so no more than one code
522 can be send to any given user in four hours.
523 """
524 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel():
525 user_agent = user_agents.parse(user_agent)
526 email.send_email(
527 tpl=self.passwordRecoveryMail,
528 skel=user_skel,
529 dests=[user_name],
530 recovery_key=recovery_key,
531 user_agent={
532 "device": user_agent.get_device(),
533 "os": user_agent.get_os(),
534 "browser": user_agent.get_browser()
535 }
536 )
538 @exposed
539 @skey(forward_payload="data", session_bound=False)
540 def verify(self, data):
541 def transact(key):
542 skel = self._user_module.editSkel()
543 if not key or not skel.read(key):
544 return None
546 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \
547 if self.registrationAdminVerificationRequired else Status.ACTIVE
549 skel.write(update_relations=False)
550 return skel
552 if not isinstance(data, dict) or not (skel := db.run_in_transaction(transact, data.get("user_key"))):
553 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate)
555 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate)
557 def canAdd(self) -> bool:
558 return self.registrationEnabled
560 def addSkel(self):
561 """
562 Prepare the add-Skel for rendering.
563 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on
564 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired
565 :return: viur.core.skeleton.Skeleton
566 """
567 skel = self._user_module.addSkel()
569 if self.registrationEmailVerificationRequired:
570 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION
571 elif self.registrationAdminVerificationRequired:
572 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION
573 else: # No further verification required
574 defaultStatusValue = Status.ACTIVE
576 skel.status.readOnly = True
577 skel["status"] = defaultStatusValue
579 if "password" in skel:
580 skel.password.required = True # The user will have to set a password
582 return skel
584 @force_ssl
585 @exposed
586 @skey(allow_empty=True)
587 def add(self, *, bounce: bool = False, **kwargs):
588 """
589 Allows guests to register a new account if self.registrationEnabled is set to true
591 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd`
593 :returns: The rendered, added object of the entry, eventually with error hints.
595 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
596 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
597 """
598 if not self.canAdd():
599 raise errors.Unauthorized()
601 skel = self.addSkel()
603 if (
604 not kwargs # no data supplied
605 or not current.request.get().isPostRequest # bail out if not using POST-method
606 or not skel.fromClient(kwargs) # failure on reading into the bones
607 or bounce # review before adding
608 ):
609 # render the skeleton in the version it could as far as it could be read.
610 return self._user_module.render.add(skel)
612 self._user_module.onAdd(skel)
613 skel.write()
615 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION:
616 # The user will have to verify his email-address. Create a skey and send it to his address
617 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False,
618 user_key=db.normalize_key(skel["key"]),
619 name=skel["name"])
620 skel.skey = BaseBone(descr="Skey")
621 skel["skey"] = skey
622 email.send_email(dests=[skel["name"]], tpl=self.verifyEmailAddressMail, skel=skel)
624 self._user_module.onAdded(skel) # Call onAdded on our parent user module
625 return self._user_module.render.addSuccess(skel)
628class GoogleAccount(UserPrimaryAuthentication):
629 METHOD_NAME = "X-VIUR-AUTH-Google-Account"
630 NAME = "Google Account"
632 @classmethod
633 def patch_user_skel(cls, skel_cls):
634 """
635 Modifies the UserSkel to be equipped by a bones required by Google Auth
636 """
637 skel_cls.uid = StringBone(
638 descr="Google UserID",
639 required=False,
640 readOnly=True,
641 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
642 params={
643 "category": "Authentication",
644 }
645 )
647 skel_cls.sync = BooleanBone(
648 descr="Sync user data with OAuth-based services",
649 defaultValue=True,
650 params={
651 "category": "Authentication",
652 "tooltip":
653 "If set, user data like firstname and lastname is automatically kept"
654 "synchronous with the information stored at the OAuth service provider"
655 "(e.g. Google Login)."
656 }
657 )
659 @exposed
660 @force_ssl
661 @skey(allow_empty=True)
662 def login(self, token: str | None = None, *args, **kwargs):
663 if not conf.user.google_client_id:
664 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")
666 if not token:
667 request = current.request.get()
668 request.response.headers["Content-Type"] = "text/html"
669 if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
670 # We have to allow popups here
671 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"
673 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
674 with open(file_path) as file:
675 tpl_string = file.read()
677 # FIXME: Use Jinja2 for rendering?
678 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
679 extendCsp({
680 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
681 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
682 })
683 return tpl_string
685 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
686 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
687 raise ValueError("Invalid issuer")
689 # Token looks valid :)
690 uid = user_info["sub"]
691 email = user_info["email"]
693 base_skel = self._user_module.baseSkel()
694 update = False
695 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
696 # We'll try again - checking if there's already an user with that email
697 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()):
698 # Still no luck - it's a completely new user
699 if not self.registrationEnabled:
700 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
701 logging.debug(f"Google user is from allowed {domain} - adding account")
702 else:
703 logging.debug(f"Google user is from {domain} - denying registration")
704 raise errors.Forbidden("Registration for new users is disabled")
706 user_skel = base_skel
707 user_skel["uid"] = uid
708 user_skel["name"] = email
709 update = True
711 # Take user information from Google, if wanted!
712 if user_skel["sync"]:
713 for target, source in {
714 "name": email,
715 "firstname": user_info.get("given_name"),
716 "lastname": user_info.get("family_name"),
717 }.items():
719 if user_skel[target] != source:
720 user_skel[target] = source
721 update = True
723 if update:
724 assert user_skel.write()
726 return self.next_or_finish(user_skel)
729class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
730 """Abstract class for all second factors."""
731 MAX_RETRY = 3
732 second_factor_login_template = "user_login_secondfactor"
733 """Template to enter the TOPT on login"""
735 @property
736 @abc.abstractmethod
737 def NAME(self) -> str:
738 """Name for this factor for templates."""
739 ...
741 @property
742 @abc.abstractmethod
743 def ACTION_NAME(self) -> str:
744 """The action name for this factor, used as path-segment."""
745 ...
747 def __init__(self, moduleName, modulePath, _user_module):
748 super().__init__(moduleName, modulePath, _user_module)
749 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}"
750 self.add_url = f"{self.modulePath}/add"
751 self.start_url = f"{self.modulePath}/start"
754class TimeBasedOTP(UserSecondFactorAuthentication):
755 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP"
756 WINDOW_SIZE = 5
757 ACTION_NAME = "otp"
758 NAME = "Time-based OTP"
759 second_factor_login_template = "user_login_secondfactor"
761 @dataclasses.dataclass
762 class OtpConfig:
763 """
764 This dataclass is used to provide an interface for a OTP token
765 algorithm description that is passed within the TimeBasedOTP
766 class for configuration.
767 """
768 secret: str
769 timedrift: float = 0.0
770 algorithm: t.Literal["sha1", "sha256"] = "sha1"
771 interval: int = 60
773 class OtpSkel(skeleton.RelSkel):
774 """
775 This is the Skeleton used to ask for the OTP token.
776 """
777 otptoken = NumericBone(
778 descr="Token",
779 required=True,
780 max=999999,
781 min=0,
782 )
784 @classmethod
785 def patch_user_skel(cls, skel_cls):
786 """
787 Modifies the UserSkel to be equipped by a bones required by Timebased OTP
788 """
789 # One-Time Password Verification
790 skel_cls.otp_serial = StringBone(
791 descr="OTP serial",
792 searchable=True,
793 params={
794 "category": "Second Factor Authentication",
795 }
796 )
798 skel_cls.otp_secret = CredentialBone(
799 descr="OTP secret",
800 params={
801 "category": "Second Factor Authentication",
802 }
803 )
805 skel_cls.otp_timedrift = NumericBone(
806 descr="OTP time drift",
807 readOnly=True,
808 defaultValue=0,
809 precision=1,
810 params={
811 "category": "Second Factor Authentication",
812 }
813 )
815 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None:
816 """
817 Returns an instance of self.OtpConfig with a provided token configuration,
818 or None when there is no appropriate configuration of this second factor handler available.
819 """
821 if otp_secret := skel.dbEntity.get("otp_secret"):
822 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0)
824 return None
826 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
827 """
828 Specified whether the second factor authentication can be handled by the given user or not.
829 """
830 return bool(self.get_config(skel))
832 @exposed
833 def start(self):
834 """
835 Configures OTP login for the current session.
837 A special otp_user_conf has to be specified as a dict, which is stored into the session.
838 """
839 session = current.session.get()
841 if not (user_key := session.get("possible_user_key")):
842 raise errors.PreconditionFailed(
843 "Second factor can only be triggered after successful primary authentication."
844 )
846 user_skel = self._user_module.baseSkel()
847 if not user_skel.read(user_key):
848 raise errors.NotFound("The previously authenticated user is gone.")
850 if not (otp_user_conf := self.get_config(user_skel)):
851 raise errors.PreconditionFailed("This second factor is not available for the user")
853 otp_user_conf = {
854 "key": str(user_key),
855 } | dataclasses.asdict(otp_user_conf)
857 session = current.session.get()
858 session["_otp_user"] = otp_user_conf
859 session.markChanged()
861 return self._user_module.render.edit(
862 self.OtpSkel(),
863 params={
864 "name": i18n.translate(
865 f"viur.core.modules.user.{self.ACTION_NAME}",
866 default_variables={"name": self.NAME},
867 ),
868 "action_name": self.ACTION_NAME,
869 "action_url": f"{self.modulePath}/{self.ACTION_NAME}",
870 },
871 tpl=self.second_factor_login_template
872 )
874 @exposed
875 @force_ssl
876 @skey(allow_empty=True)
877 def otp(self, *args, **kwargs):
878 """
879 Performs the second factor validation and interaction with the client.
880 """
881 session = current.session.get()
882 if not (otp_user_conf := session.get("_otp_user")):
883 raise errors.PreconditionFailed("No OTP process started in this session")
885 # Check if maximum second factor verification attempts
886 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
887 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
889 # Read the OTP token via the skeleton, to obtain a valid value
890 skel = self.OtpSkel()
891 if skel.fromClient(kwargs):
892 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP.
893 res = self.verify(
894 otp=skel["otptoken"],
895 secret=otp_user_conf["secret"],
896 algorithm=otp_user_conf.get("algorithm") or "sha1",
897 interval=otp_user_conf.get("interval") or 60,
898 timedrift=otp_user_conf.get("timedrift") or 0.0,
899 valid_window=self.WINDOW_SIZE
900 )
901 else:
902 res = None
904 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0!
905 if res is None:
906 otp_user_conf["attempts"] = attempts + 1
907 session.markChanged()
908 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
909 return self._user_module.render.edit(
910 skel,
911 name=i18n.translate(
912 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
913 default_variables={"name": self.NAME},
914 ),
915 action_name=self.ACTION_NAME,
916 action_url=f"{self.modulePath}/{self.ACTION_NAME}",
917 tpl=self.second_factor_login_template
918 )
920 # Remove otp user config from session
921 user_key = db.key_helper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
922 del session["_otp_user"]
923 session.markChanged()
925 # Check if the OTP device has a time drift
927 timedriftchange = float(res) - otp_user_conf["timedrift"]
928 if abs(timedriftchange) > 2:
929 # The time-drift change accumulates to more than 2 minutes (for interval==60):
930 # update clock-drift value accordingly
931 self.updateTimeDrift(user_key, timedriftchange)
933 # Continue with authentication
934 return self._user_module.secondFactorSucceeded(self, user_key)
936 @staticmethod
937 def verify(
938 otp: str | int,
939 secret: str,
940 algorithm: str = "sha1",
941 interval: int = 60,
942 timedrift: float = 0.0,
943 for_time: datetime.datetime | None = None,
944 valid_window: int = 0,
945 ) -> int | None:
946 """
947 Verifies the OTP passed in against the current time OTP.
949 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which
950 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown
951 on the hardware token generator. This can be used to store the time drift of a given token generator.
953 :param otp: the OTP token to check against
954 :param secret: The OTP secret
955 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256)
956 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators).
957 :param timedrift: The known timedrift (old index) of the hardware OTP generator
958 :param for_time: Time to check OTP at (defaults to now)
959 :param valid_window: extends the validity to this many counter ticks before and after the current one
961 :returns: The index where verification succeeded, None otherwise
962 """
963 # get the hashing digest
964 digest = {
965 "sha1": hashlib.sha1,
966 "sha256": hashlib.sha256,
967 }.get(algorithm)
969 if not digest:
970 raise errors.NotImplemented(f"{algorithm=} is not implemented")
972 if for_time is None:
973 for_time = datetime.datetime.now()
975 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index
976 timedrift = round(timedrift)
977 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret
978 otp = str(otp).zfill(6) # fill with zeros in front
980 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}")
981 totp = pyotp.TOTP(secret, digest=digest, interval=interval)
983 if valid_window:
984 for offset in range(timedrift - valid_window, timedrift + valid_window + 1):
985 token = str(totp.at(for_time, offset))
986 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}")
987 if hmac.compare_digest(otp, token):
988 return offset
990 return None
992 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None
994 # FIXME: VIUR4 rename
995 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
996 """
997 Updates the clock-drift value.
999 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew
1000 it out of bounds. Maximum change per call is 0.3 minutes.
1002 :param user_key: For which user should the update occour
1003 :param idx: How many steps before/behind was that token
1004 """
1005 if user_skel := self._user_module.skel().read(user_key):
1006 if otp_skel := self._get_otptoken(user_skel):
1007 otp_skel.patch(
1008 {
1009 "+otp_timedrift": min(max(0.1 * idx, -0.3), 0.3)
1010 },
1011 update_relations=False,
1012 )
1015class AuthenticatorOTP(UserSecondFactorAuthentication):
1016 """
1017 This class handles the second factor for apps like authy and so on
1018 """
1019 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"
1021 # second_factor_add_template = "user_secondfactor_add"
1022 # """Template to configure (add) a new TOPT"""
1024 ACTION_NAME = "authenticator_otp"
1025 """Action name provided for *otp_template* on login"""
1027 NAME = "Authenticator App"
1029 # FIXME: The second factor add has to be rewritten entirely to ActionSkel paradigm.
1030 '''
1031 @exposed
1032 @force_ssl
1033 @skey(allow_empty=True)
1034 def add(self, otp=None):
1035 """
1036 We try to read the otp_app_secret from the current session. When this fails we generate a new one and store
1037 it in the session.
1039 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store
1040 the otp_app_secret from the session in the user entry.
1041 """
1042 current_session = current.session.get()
1044 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
1045 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
1046 current_session["_maybe_otp_app_secret"] = otp_app_secret
1047 current_session.markChanged()
1049 if otp is None:
1050 return self._user_module.render.second_factor_add(
1051 tpl=self.second_factor_add_template,
1052 action_name=self.ACTION_NAME,
1053 name=i18n.translate(
1054 f"viur.core.modules.user.auth{self.ACTION_NAME}",
1055 default_variables={"name": self.NAME},
1056 ),
1057 add_url=self.add_url,
1058 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))
1059 else:
1060 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
1061 return self._user_module.render.second_factor_add(
1062 tpl=self.second_factor_add_template,
1063 action_name=self.ACTION_NAME,
1064 name=i18n.translate(
1065 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
1066 default_variables={"name": self.NAME},
1067 ),
1068 add_url=self.add_url,
1069 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors
1071 # Now we can set the otp_app_secret to the current User and render der Success-template
1072 AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
1073 return self._user_module.render.second_factor_add_success(
1074 action_name=self.ACTION_NAME,
1075 name=i18n.translate(
1076 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
1077 default_variables={"name": self.NAME},
1078 ),
1079 )
1080 '''
1082 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
1083 """
1084 We can only handle the second factor if we have stored an otp_app_secret before.
1085 """
1086 return bool(skel.dbEntity.get("otp_app_secret", ""))
1088 @classmethod
1089 def patch_user_skel(cls, skel_cls):
1090 """
1091 Modifies the UserSkel to be equipped by bones required by Authenticator App
1092 """
1093 # Authenticator OTP Apps (like Authy)
1094 skel_cls.otp_app_secret = CredentialBone(
1095 descr="OTP Secret (App-Key)",
1096 params={
1097 "category": "Second Factor Authentication",
1098 }
1099 )
1101 @classmethod
1102 def set_otp_app_secret(cls, otp_app_secret=None):
1103 """
1104 Write a new OTP Token in the current user entry.
1105 """
1106 if otp_app_secret is None:
1107 logging.error("No 'otp_app_secret' is provided")
1108 raise errors.PreconditionFailed("No 'otp_app_secret' is provided")
1109 if not (cuser := current.user.get()):
1110 raise errors.Unauthorized()
1112 def transaction(user_key):
1113 if not (user := db.get(user_key)):
1114 raise errors.NotFound()
1115 user["otp_app_secret"] = otp_app_secret
1116 db.put(user)
1118 db.run_in_transaction(transaction, cuser["key"])
1120 @classmethod
1121 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
1122 """
1123 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
1124 """
1125 if not (cuser := current.user.get()):
1126 raise errors.Unauthorized()
1127 if not (issuer := conf.user.otp_issuer):
1128 logging.warning(
1129 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
1130 issuer = conf.instance.project_id
1132 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer)
1134 @classmethod
1135 def generate_otp_app_secret(cls) -> str:
1136 """
1137 Generate a new OTP Secret
1138 :return an otp
1139 """
1140 return pyotp.random_base32()
1142 @classmethod
1143 def verify_otp(cls, otp: str | int, secret: str) -> bool:
1144 return pyotp.TOTP(secret).verify(otp)
1146 @exposed
1147 def start(self):
1148 otp_user_conf = {"attempts": 0}
1149 session = current.session.get()
1150 session["_otp_user"] = otp_user_conf
1151 session.markChanged()
1152 return self._user_module.render.edit(
1153 TimeBasedOTP.OtpSkel(),
1154 params={
1155 "name": i18n.translate(
1156 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
1157 default_variables={"name": self.NAME},
1158 ),
1159 "action_name": self.ACTION_NAME,
1160 "action_url": self.action_url,
1161 },
1162 tpl=self.second_factor_login_template,
1163 )
1165 @exposed
1166 @force_ssl
1167 @skey
1168 def authenticator_otp(self, **kwargs):
1169 """
1170 We verify the otp here with the secret we stored before.
1171 """
1172 session = current.session.get()
1173 user_key = db.Key(self._user_module.kindName, session["possible_user_key"])
1175 if not (otp_user_conf := session.get("_otp_user")):
1176 raise errors.PreconditionFailed("No OTP process started in this session")
1178 # Check if maximum second factor verification attempts
1179 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
1180 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
1182 if not (user := db.get(user_key)):
1183 raise errors.NotFound()
1185 skel = TimeBasedOTP.OtpSkel()
1186 if not skel.fromClient(kwargs):
1187 raise errors.PreconditionFailed()
1188 otp_token = str(skel["otptoken"]).zfill(6)
1190 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]):
1191 return self._user_module.secondFactorSucceeded(self, user_key)
1192 otp_user_conf["attempts"] = attempts + 1
1193 session.markChanged()
1194 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
1195 return self._user_module.render.edit(
1196 skel,
1197 name=i18n.translate(
1198 f"viur.core.modules.user.auth.{self.ACTION_NAME}",
1199 default_variables={"name": self.NAME},
1200 ),
1201 action_name=self.ACTION_NAME,
1202 action_url=self.action_url,
1203 tpl=self.second_factor_login_template,
1204 )
1207class User(List):
1208 """
1209 The User module is used to manage and authenticate users in a ViUR system.
1211 It is used in almost any ViUR project, but ViUR can also function without any user capabilites.
1212 """
1214 kindName = "user"
1215 addTemplate = "user_add"
1216 addSuccessTemplate = "user_add_success"
1218 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter(
1219 None, (
1220 UserPassword,
1221 conf.user.google_client_id and GoogleAccount,
1222 )
1223 ))
1224 """
1225 Specifies primary authentication providers that are made available
1226 as sub-modules under `user/auth_<classname>`. They might require
1227 customization or configuration.
1228 """
1230 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = (
1231 TimeBasedOTP,
1232 AuthenticatorOTP,
1233 )
1234 """
1235 Specifies secondary authentication providers that are made available
1236 as sub-modules under `user/f2_<classname>`. They might require
1237 customization or configuration, which is determined during the
1238 login-process depending on the user that wants to login.
1239 """
1241 validAuthenticationMethods = tuple(filter(
1242 None, (
1243 (UserPassword, AuthenticatorOTP),
1244 (UserPassword, TimeBasedOTP),
1245 (UserPassword, None),
1246 (GoogleAccount, None) if conf.user.google_client_id else None,
1247 )
1248 ))
1249 """
1250 Specifies the possible combinations of primary- and secondary factor
1251 login methos.
1253 GoogleLogin defaults to no second factor, as the Google Account can be
1254 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only
1255 handled when there is a user-dependent configuration available.
1256 """
1258 msg_missing_second_factor = "Second factor required but not configured for this user."
1260 secondFactorTimeWindow = datetime.timedelta(minutes=10)
1262 default_order = "name.idx"
1264 roles = {
1265 "admin": "*",
1266 }
1268 def __init__(self, moduleName, modulePath):
1269 for provider in self.authenticationProviders:
1270 assert issubclass(provider, UserPrimaryAuthentication)
1271 name = f"auth_{provider.__name__.lower()}"
1272 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1274 for provider in self.secondFactorProviders:
1275 assert issubclass(provider, UserSecondFactorAuthentication)
1276 name = f"f2_{provider.__name__.lower()}"
1277 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1279 super().__init__(moduleName, modulePath)
1281 def adminInfo(self):
1282 ret = {
1283 "icon": "person-fill",
1284 }
1286 if self.is_admin(current.user.get()):
1287 ret |= {
1288 "actions": [
1289 "trigger_kick",
1290 "trigger_takeover",
1291 ],
1292 "customActions": {
1293 "trigger_kick": {
1294 "name": i18n.translate(
1295 key="viur.core.modules.user.customActions.kick",
1296 defaultText="Kick user",
1297 hint="Title of the kick user function"
1298 ),
1299 "icon": "trash2-fill",
1300 "action": "fetch",
1301 "url": "/vi/{{module}}/trigger/kick/{{key}}",
1302 "confirm": i18n.translate(
1303 key="viur.core.modules.user.customActions.kick.confirm",
1304 defaultText="Do you really want to drop all sessions of the selected user from the system?",
1305 ),
1306 "success": i18n.translate(
1307 key="viur.core.modules.user.customActions.kick.success",
1308 defaultText="Sessions of the user are being invalidated.",
1309 ),
1310 },
1311 "trigger_takeover": {
1312 "name": i18n.translate(
1313 key="viur.core.modules.user.customActions.takeover",
1314 defaultText="Take-over user",
1315 hint="Title of the take user over function"
1316 ),
1317 "icon": "file-person-fill",
1318 "action": "fetch",
1319 "url": "/vi/{{module}}/trigger/takeover/{{key}}",
1320 "confirm": i18n.translate(
1321 key="viur.core.modules.user.customActions.takeover.confirm",
1322 defaultText="Do you really want to replace your current user session by a "
1323 "user session of the selected user?",
1324 ),
1325 "success": i18n.translate(
1326 key="viur.core.modules.user.customActions.takeover.success",
1327 defaultText="You're now know as the selected user!",
1328 ),
1329 "then": "reload-vi",
1330 },
1331 },
1332 }
1334 return ret
1336 def get_role_defaults(self, role: str) -> set[str]:
1337 """
1338 Returns a set of default access rights for a given role.
1340 Defaults to "admin" usage for any role > "user"
1341 and "scriptor" usage for "admin" role.
1342 """
1343 ret = set()
1345 if role in ("viewer", "editor", "admin"):
1346 ret.add("admin")
1348 if role == "admin":
1349 ret.add("scriptor")
1351 return ret
1353 def addSkel(self):
1354 skel = super().addSkel().clone()
1356 if self.is_admin(current.user.get()):
1357 # An admin tries to add a new user.
1358 skel.status.readOnly = False
1359 skel.status.visible = True
1360 skel.access.readOnly = False
1361 skel.access.visible = True
1363 else:
1364 skel.status.readOnly = True
1365 skel["status"] = Status.UNSET
1366 skel.status.visible = False
1367 skel.access.readOnly = True
1368 skel["access"] = []
1369 skel.access.visible = False
1371 if "password" in skel:
1372 # Unlock and require a password
1373 skel.password.required = True
1374 skel.password.visible = True
1375 skel.password.readOnly = False
1377 skel.name.readOnly = False # Don't enforce readonly name in user/add
1378 return skel
1380 def editSkel(self, *args, **kwargs):
1381 skel = super().editSkel().clone()
1383 if "password" in skel:
1384 skel.password.required = False
1385 skel.password.visible = True
1386 skel.password.readOnly = False
1388 lock = not self.is_admin(current.user.get())
1389 skel.name.readOnly = lock
1390 skel.access.readOnly = lock
1391 skel.status.readOnly = lock
1393 return skel
1395 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication:
1396 return getattr(self, f"f2_{cls.__name__.lower()}")
1398 def getCurrentUser(self):
1399 session = current.session.get()
1401 req = current.request.get()
1402 if session and (session.loaded or req.is_deferred) and (user := session.get("user")):
1403 skel = self.baseSkel()
1404 skel.setEntity(user)
1405 return skel
1407 return None
1409 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key):
1410 """
1411 Continue authentication flow when primary authentication succeeded.
1412 """
1413 skel = self.baseSkel()
1415 if not skel.read(user_key):
1416 raise errors.NotFound("User was not found.")
1418 if not provider.can_handle(skel):
1419 raise errors.Forbidden("User is not allowed to use this primary login method.")
1421 session = current.session.get()
1422 session["possible_user_key"] = user_key.id_or_name
1423 session["_secondFactorStart"] = utils.utcNow()
1424 session.markChanged()
1426 second_factor_providers = []
1428 for auth_provider, second_factor in self.validAuthenticationMethods:
1429 if isinstance(provider, auth_provider):
1430 if second_factor is not None:
1431 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor)
1432 if second_factor_provider_instance.can_handle(skel):
1433 second_factor_providers.append(second_factor_provider_instance)
1434 else:
1435 second_factor_providers.append(None)
1437 if len(second_factor_providers) > 1 and None in second_factor_providers:
1438 # We have a second factor. So we can get rid of the None
1439 second_factor_providers.pop(second_factor_providers.index(None))
1441 if len(second_factor_providers) == 0:
1442 raise errors.NotAcceptable(self.msg_missing_second_factor)
1443 elif len(second_factor_providers) == 1:
1444 if second_factor_providers[0] is None:
1445 # We allow sign-in without a second factor
1446 return self.authenticateUser(user_key)
1447 # We have only one second factor we don't need the choice template
1448 return second_factor_providers[0].start(user_key)
1450 # In case there is more than one second factor provider remaining, let the user decide!
1451 current.session.get()["_secondfactor_providers"] = {
1452 second_factor.start_url: second_factor.NAME
1453 for second_factor in second_factor_providers
1454 if second_factor.VISIBLE
1455 }
1457 return self.select_secondfactor_provider()
1459 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key):
1460 """
1461 Continue authentication flow when secondary authentication succeeded.
1462 """
1463 session = current.session.get()
1464 if session["possible_user_key"] != user_key.id_or_name:
1465 raise errors.Forbidden()
1467 # Assert that the second factor verification finished in time
1468 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow:
1469 raise errors.RequestTimeout()
1471 return self.authenticateUser(user_key)
1473 def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None:
1474 """
1475 Hookable check if a user is defined as "active" and can login.
1477 :param skel: The UserSkel of the user who wants to login.
1478 :returns: Returns True or False when the result is unambigous and the user is active or not. \
1479 Returns None when the provided skel doesn't provide enough information for determination.
1480 """
1481 if skel and "status" in skel:
1482 status = skel["status"]
1483 if not isinstance(status, (Status, int)):
1484 try:
1485 status = int(status)
1486 except ValueError:
1487 status = Status.UNSET
1489 return status >= Status.ACTIVE.value
1491 return None
1493 def is_admin(self, skel: skeleton.SkeletonInstance) -> bool | None:
1494 """
1495 Hookable check if a user is defined as "admin" and can edit or log into other users.
1496 Defaults to "root" users only.
1498 :param skel: The UserSkel of the user who wants should be checked for user admin privileges.
1499 :returns: Returns True or False when the result is unambigous and the user is admin or not. \
1500 Returns None when the provided skel doesn't provide enough information for determination.
1501 """
1502 if skel and "access" in skel:
1503 return "root" in skel["access"]
1505 return None
1507 def authenticateUser(self, key: db.Key, **kwargs):
1508 """
1509 Performs Log-In for the current session and the given user key.
1511 This resets the current session: All fields not explicitly marked as persistent
1512 by conf.user.session_persistent_fields_on_login are gone afterwards.
1514 :param key: The (DB-)Key of the user we shall authenticate
1515 """
1516 skel = self.baseSkel()
1517 if not skel.read(key):
1518 raise ValueError(f"Unable to authenticate unknown user {key}")
1520 # Verify that this user account is active
1521 if not self.is_active(skel):
1522 raise errors.Forbidden("The user is disabled and cannot be authenticated.")
1524 # Update session for user
1525 session = current.session.get()
1526 # Remember persistent fields...
1527 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login}
1528 session.reset()
1529 # and copy them over to the new session
1530 session |= take_over
1532 # Update session, user and request
1533 session["user"] = skel.dbEntity
1535 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key
1536 current.user.set(self.getCurrentUser())
1538 self.onLogin(skel)
1540 return self.render.render("login_success", skel, **kwargs)
1543 # Action for primary authentication selection
1545 def SelectAuthenticationProviderSkel(self):
1546 providers = {}
1547 first = None
1548 for provider in self.authenticationProviders:
1549 if not provider.VISIBLE:
1550 continue
1552 provider = getattr(self, f"auth_{provider.__name__.lower()}")
1553 providers[provider.start_url] = provider.NAME
1555 if first is None:
1556 first = provider.start_url
1558 class SelectAuthenticationProviderSkel(skeleton.RelSkel):
1559 provider = SelectBone(
1560 descr="Authentication method",
1561 required=True,
1562 values=providers,
1563 defaultValue=first,
1564 )
1566 return SelectAuthenticationProviderSkel()
1568 @exposed
1569 def select_authentication_provider(self, **kwargs):
1570 skel = self.SelectAuthenticationProviderSkel()
1572 # Read required bones from client
1573 if len(skel.provider.values) > 1 and (not kwargs or not skel.fromClient(kwargs)):
1574 return self.render.render("select_authentication_provider", skel)
1576 return self.render.render("select_authentication_provider_success", skel, next_url=skel["provider"])
1578 # Action for second factor select
1580 class SelectSecondFactorProviderSkel(skeleton.RelSkel):
1581 provider = SelectBone(
1582 descr="Second factor",
1583 required=True,
1584 values=lambda: current.session.get()["_secondfactor_providers"] or (),
1585 )
1587 @exposed
1588 def select_secondfactor_provider(self, **kwargs):
1589 skel = self.SelectSecondFactorProviderSkel()
1591 # Read required bones from client
1592 if not kwargs or not skel.fromClient(kwargs):
1593 return self.render.render("select_secondfactor_provider", skel)
1595 del current.session.get()["_secondfactor_providers"]
1597 return self.render.render("select_secondfactor_provider_success", skel, next_url=skel["provider"])
1599 @exposed
1600 @skey
1601 def logout(self, **kwargs):
1602 """
1603 Implements the logout action. It also terminates the current session (all keys not listed
1604 in viur.session_persistent_fields_on_logout will be lost).
1605 """
1606 if not (user := current.user.get()):
1607 raise errors.Unauthorized()
1609 self.onLogout(user)
1611 session = current.session.get()
1613 if take_over := {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}:
1614 session.reset()
1615 session |= take_over
1616 else:
1617 session.clear()
1619 current.user.set(None) # set user to none in context var
1621 return self.render.render("logout_success")
1623 @exposed
1624 def login(self, *args, **kwargs):
1625 return self.select_authentication_provider()
1627 def onLogin(self, skel: skeleton.SkeletonInstance):
1628 """
1629 Hook to be called on user login.
1630 """
1631 # Update the lastlogin timestamp (if available!)
1632 if "lastlogin" in skel:
1633 now = utils.utcNow()
1635 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??)
1636 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)):
1637 skel["lastlogin"] = now
1638 skel.write(update_relations=False)
1640 logging.info(f"""User {skel["name"]} logged in""")
1642 def onLogout(self, skel: skeleton.SkeletonInstance):
1643 """
1644 Hook to be called on user logout.
1645 """
1646 logging.info(f"""User {skel["name"]} logged out""")
1648 @exposed
1649 def view(self, key: db.Key | int | str = "self", *args, **kwargs):
1650 """
1651 Allow a special key "self" to reference the current user.
1653 By default, any authenticated user can view its own user entry,
1654 to obtain access rights and any specific user information.
1655 This behavior is defined in the customized `canView` function,
1656 which is overwritten by the User-module.
1658 The rendered skeleton can be modified or restriced by specifying
1659 a customized view-skeleton.
1660 """
1661 if key == "self":
1662 if user := current.user.get():
1663 key = user["key"]
1664 else:
1665 raise errors.Unauthorized("Cannot view 'self' with unknown user")
1667 return super().view(key, *args, **kwargs)
1669 def canView(self, skel) -> bool:
1670 if user := current.user.get():
1671 if skel["key"] == user["key"]:
1672 return True
1674 if self.is_admin(user) or "user-view" in user["access"]:
1675 return True
1677 return False
1679 @exposed
1680 @skey(allow_empty=True)
1681 def edit(self, key: db.Key | int | str = "self", *args, **kwargs):
1682 """
1683 Allow a special key "self" to reference the current user.
1685 This modification will only allow to use "self" as a key;
1686 The specific access right to let the user edit itself must
1687 still be customized.
1689 The rendered and editable skeleton can be modified or restriced
1690 by specifying a customized edit-skeleton.
1691 """
1692 if key == "self":
1693 if user := current.user.get():
1694 key = user["key"]
1695 else:
1696 raise errors.Unauthorized("Cannot edit 'self' with unknown user")
1698 return super().edit(key, *args, **kwargs)
1700 @exposed
1701 def getAuthMethods(self, *args, **kwargs):
1702 """Legacy method prior < viur-core 3.8: Inform tools like Admin which authentication to use"""
1703 logging.warning("DEPRECATED!!! Use '/user/login'-method for this, or update your admin version!")
1705 res = [
1706 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1707 for primary, secondary in self.validAuthenticationMethods
1708 ]
1710 return json.dumps(res)
1712 @exposed
1713 def trigger(self, action: str, key: str):
1714 # Check for provided access right definition (equivalent to client-side check), fallback to root!
1715 access = self.adminInfo().get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ()
1716 if not (
1717 (cuser := current.user.get())
1718 and (
1719 any(role in cuser["access"] for role in access)
1720 or self.is_admin(cuser)
1721 )
1722 ):
1723 raise errors.Unauthorized()
1725 skel = self.skel()
1726 if not skel.read(key) and not (skel := skel.all().mergeExternalFilter({"name": key}).getSkel()):
1727 raise errors.NotFound("The provided user does not exist.")
1729 match action:
1730 case "takeover":
1731 self.authenticateUser(skel["key"])
1733 case "kick":
1734 session.killSessionByUser(skel["key"])
1736 case _:
1737 raise errors.NotImplemented(f"Action {action!r} not implemented")
1739 return self.render.render(f"trigger/{action}Success", skel)
1741 @exposed
1742 @access("admin", "root", offer_login=True)
1743 def get_cookie_for_app(self, redirect_to: str = None):
1744 """
1745 Generates a session cookie for the currently logged-in user and hands it to an external
1746 client (script, native app, or WebView).
1748 This endpoint is the entry point of a *App Login Flow*. A privileged user
1749 (admin/root) authenticates normally in the browser and then opens this URL.
1750 The backend creates a fresh ViUR session (see :meth:`_get_cookie_for_app`) and
1751 delivers the resulting ``Set-Cookie`` string to the caller.
1753 **Typical usage — local Python client / script:**
1755 The caller spins up a temporary local HTTP server (e.g. on ``http://localhost:60000``)
1756 and passes its address as *redirect_to*::
1758 /vi/user/get_cookie_for_app?redirect_to=http://localhost:60000
1760 After the user authenticates in the browser, the backend redirects to::
1762 http://localhost:60000?cookie=<url-encoded Set-Cookie string>&app=<project_id>
1764 The local server can then extract only the ``name=value`` part of the cookie string
1765 (everything before the first ``;``) and use it for subsequent API calls::
1767 cookie_str = qs["cookie"][0] # full Set-Cookie value
1768 key, value = cookie_str.split(";", 1)[0].split("=")
1769 session.cookies.update({key: value})
1771 The ``app`` query parameter is set by the server to ``conf.instance.project_id`` and
1772 lets the client distinguish between multiple backends / cache credentials per project.
1774 **Alternative — WebView / browser redirect:**
1776 When the receiving side is a browser or WebView the full ``Set-Cookie`` string can be
1777 forwarded to :meth:`apply_login_cookie` to let the framework activate the session.
1779 :param redirect_to: Optional callback URL. When provided the caller is redirected to
1780 that URL with two query parameters appended automatically:
1782 - ``cookie`` – URL-encoded ``Set-Cookie`` string (``name=value;flags…``).
1783 - ``app`` – the server's GCP project ID (``conf.instance.project_id``).
1785 A ``?`` is appended if the URL does not already contain one.
1786 When omitted, the raw ``Set-Cookie`` string is returned as ``text/plain``
1787 (useful for debugging or direct API calls).
1789 .. warning:: **Open-redirect / session-hijacking risk**
1791 Because the session cookie is appended as a plain query parameter, an
1792 attacker who can convince an authenticated admin to click a crafted link
1793 (e.g. via phishing) could redirect the browser to an evil server that
1794 simply harvests the ``cookie`` parameter and gains full session access.
1796 To mitigate this, all ``redirect_to`` values are validated against
1797 :attr:`conf.user.redirect_whitelist` using :func:`fnmatch.fnmatch`.
1798 Only explicitly whitelisted URL patterns are accepted;
1799 anything else is rejected with ``403 Forbidden``.
1800 Configure the whitelist in your project to include every legitimate
1801 callback origin (local scripts, internal tooling, etc.).
1803 :raises errors.Forbidden: When *redirect_to* does not match any pattern in
1804 :attr:`conf.user.redirect_whitelist`.
1805 :raises errors.Redirect: Always raised when *redirect_to* is supplied and allowed.
1806 """
1807 if redirect_to:
1808 whitelist = utils.ensure_iterable(conf.user.redirect_whitelist)
1809 if not any(fnmatch.fnmatch(redirect_to, pat) for pat in whitelist):
1810 raise errors.Forbidden(f"Redirect target is not whitelisted")
1811 if "?" not in redirect_to:
1812 redirect_to = f"{redirect_to}?"
1813 raise errors.Redirect(
1814 f"{redirect_to}"
1815 f"&cookie={urllib.parse.quote_plus(self._get_cookie_for_app())}"
1816 f"&app={conf.instance.project_id}"
1817 )
1818 current.request.get().response.headers["Content-Type"] = "text/plain"
1819 return self._get_cookie_for_app()
1821 def _get_cookie_for_app(self) -> str:
1822 """
1823 Creates a new, standalone ViUR session for the current user and returns the
1824 corresponding ``Set-Cookie`` header value.
1826 Unlike the regular session created during a normal login, this session is intentionally
1827 **not** attached to the current HTTP request. Instead it is persisted directly in
1828 Datastore so that a different HTTP client can pick it up via :meth:`apply_login_cookie`.
1830 The created session entity mirrors the structure of a regular :class:`Session` entry:
1832 - ``data["user"]`` – the full user ``dbEntity`` (needed by the session loader).
1833 - ``data["is_app_session"]``– flag to distinguish app sessions from regular browser sessions.
1834 - ``static_security_key`` – random value, same role as in normal sessions.
1835 - ``lastseen`` – current timestamp so the session is not immediately garbage-
1836 collected.
1837 - ``user`` – stringified user key for server-side user-based queries.
1839 :returns: A ``Set-Cookie`` header value in the form
1840 ``<cookie_name>=<key>;<flags>`` where ``<flags>`` is produced by
1841 :meth:`Session.build_flags` (``Path=/; HttpOnly; SameSite=…; Secure; Max-Age=…``).
1842 """
1843 cookie_key = utils.string.random(42)
1844 db_session = db.Entity(db.Key(Session.kindName, cookie_key))
1845 data = db.Entity()
1846 data["user"] = current.user.get().dbEntity
1847 data["is_app_session"] = True
1848 db_session["data"] = db.fix_unindexable_properties(data)
1849 db_session["static_security_key"] = utils.string.random(42)
1850 db_session["lastseen"] = time.time()
1851 db_session["user"] = str(current.user.get()["key"])
1852 db_session.exclude_from_indexes = {"data"}
1853 db.put(db_session)
1855 # Provide Set-Cookie header entry with configured properties
1856 return f"{Session.cookie_name}={cookie_key};{Session.build_flags()}"
1858 @exposed
1859 def apply_login_cookie(self, cookie: str):
1860 """
1861 Redirect endpoint to load session from the given cookie.
1863 This is the second half of the *App Login Flow*. A native app or WebView that received a
1864 ``Set-Cookie`` string from :meth:`get_cookie_for_app` (typically via a redirect URL
1865 parameter) calls this endpoint to activate the embedded session for its own HTTP context.
1867 The flow is:
1869 1. Parse the raw ``Set-Cookie`` string with :class:`http.cookies.SimpleCookie`.
1870 2. Look for the expected session cookie name (:attr:`Session.cookie_name`).
1871 3. Reset the caller's current (anonymous) session.
1872 4. Inject the cookie value into the current request's cookie jar so that
1873 :meth:`Session.load` can find the pre-built Datastore session.
1874 5. Redirect to ``/`` – from this point on the caller is fully authenticated.
1876 :param cookie: A raw ``Set-Cookie`` header value as produced by :meth:`_get_cookie_for_app`,
1877 e.g. ``viur_cookie_myproject=<key>;Path=/;HttpOnly;…``.
1878 :raises errors.Redirect: On success – redirects to ``/``.
1879 :raises errors.BadRequest: When the cookie string does not contain a recognisable session
1880 cookie (i.e. :attr:`Session.cookie_name` is absent after parsing).
1881 """
1882 cookies = SimpleCookie()
1883 cookies.load(cookie)
1884 if Session.cookie_name in cookies:
1885 session_cookie = cookies[Session.cookie_name]
1886 current.session.get().reset()
1887 current.request.get().request.cookies[session_cookie.key] = session_cookie.value
1888 current.session.get().load()
1889 raise errors.Redirect("/")
1890 else:
1891 raise errors.BadRequest
1893 def onEdited(self, skel):
1894 super().onEdited(skel)
1896 # In case the user is set to inactive, kill all sessions
1897 if self.is_active(skel) is False:
1898 session.killSessionByUser(skel["key"])
1900 # Update user setting in all sessions
1901 for session_obj in db.Query("user").filter("user =", skel["key"]).iter():
1902 session_obj["data"]["user"] = skel.dbEntity
1904 def onDeleted(self, skel):
1905 super().onDeleted(skel)
1906 # Invalidate all sessions of that user
1907 session.killSessionByUser(skel["key"])
1910@tasks.StartupTask
1911def createNewUserIfNotExists():
1912 """
1913 Create a new Admin user, if the userDB is empty
1914 """
1915 if (
1916 (user_module := getattr(conf.main_app.vi, "user", None))
1917 and isinstance(user_module, User)
1918 and "addSkel" in dir(user_module)
1919 and "validAuthenticationMethods" in dir(user_module)
1920 # UserPassword must be one of the primary login methods
1921 and any(
1922 issubclass(provider[0], UserPassword)
1923 for provider in user_module.validAuthenticationMethods
1924 )
1925 ):
1926 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database
1927 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton
1928 uname = f"""admin@{conf.instance.project_id}.appspot.com"""
1929 pw = utils.string.random(13)
1930 addSkel["name"] = uname
1931 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away
1932 addSkel["access"] = ["root"]
1933 addSkel["password"] = pw
1935 try:
1936 addSkel.write()
1937 except Exception as e:
1938 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
1939 logging.exception(e)
1940 return
1942 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"
1944 logging.warning(msg)
1945 email.send_email_to_admins("New ViUR password", msg)
1948# DEPRECATED ATTRIBUTES HANDLING
1950def __getattr__(attr):
1951 match attr:
1952 case "userSkel":
1953 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
1954 warnings.warn(msg, DeprecationWarning, stacklevel=2)
1955 logging.warning(msg)
1956 return UserSkel
1958 return super(__import__(__name__).__class__).__getattr__(attr)