Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/decorators.py: 19%
107 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
1import typing as t
2import logging
3from viur.core import current, errors
4from viur.core.config import conf
5from viur.core.module import Method
# Public names of this module; each entry is a decorator defined below.
__all__ = [
    "access",
    "exposed",
    "force_post",
    "force_ssl",
    "internal_exposed",
    "skey",
    "cors",
]
def exposed(func: t.Callable) -> Method:
    """
    Decorator, which marks a function as exposed.

    Only exposed functions are callable by http-requests.

    The decorator can be used in two ways: applied directly to a function, or called with a dict
    of language->translated name first, which makes the function available under different names.
    In the latter case, the dict is stored as `seo_language_map` on the resulting method.
    """
    if not isinstance(func, dict):
        # Plain usage: @exposed directly on the function.
        method = Method.ensure(func)
        method.exposed = True
        return method

    # Otherwise we received the translation dictionary and must return the actual decorator.
    seo_language_map = func

    def expose_with_translations(func: t.Callable) -> Method:
        method = Method.ensure(func)
        method.exposed = True
        method.seo_language_map = seo_language_map
        return method

    return expose_with_translations
def internal_exposed(func: t.Callable) -> Method:
    """
    Decorator, which marks a function as internal exposed.

    The function is turned into a `Method` (if it isn't one already) whose `exposed` flag is set to False.
    """
    method = Method.ensure(func)
    method.exposed = False
    return method
def force_ssl(func: t.Callable) -> Method:
    """
    Decorator, which enforces usage of an encrypted channel for a given resource.

    Sets the `ssl` flag on the resulting `Method`.
    Has no effect on development-servers.
    """
    method = Method.ensure(func)
    method.ssl = True
    return method
def force_post(func: t.Callable) -> Method:
    """
    Decorator, which enforces usage of a http post request.

    Restricts the `methods` of the resulting `Method` to "POST" only.
    """
    method = Method.ensure(func)
    method.methods = ("POST",)
    return method
def access(
    *access: str | list[str] | tuple[str] | set[str] | t.Callable,
    offer_login: bool | str = False,
    message: str | None = None,
) -> t.Callable:
    """
    Decorator, which performs an authentication and authorization check primarily based on the current user's
    access, which is defined via the `UserSkel.access`-bone. Additionally, a callable for individual access
    checking can be provided.

    In case no user is logged in, the decorator either redirects to a login page (see `offer_login`) or raises
    an HTTP error 401 - Unauthorized. Otherwise, it raises an HTTP error 403 - Forbidden when the specified access
    parameters prohibit to call the decorated method. Users with the access "root" always pass the check.

    :param access: Access configuration, either names of access rights or a callable for verification.
    :param offer_login: Offers a way to login; Either set it to True, to automatically redirect to /user/login,
        or set it to any other URL.
    :param message: A custom message to be printed when access is denied or unauthorized.

    To check on authenticated users with the access "root" or ("admin" and "file-edit") or "maintainer" use the
    decorator like this:

    .. code-block:: python

        from viur.core.decorators import access

        @access("root", ["admin", "file-edit"], ["maintainer"])
        def my_method(self):
            return "You're allowed!"

    Furthermore, instead of a list/tuple/set/str, a callable can be provided which performs custom access checking,
    and directly is checked on True for access grant.
    """
    access_config = locals()

    def validate(*args, **kwargs):
        # Guard executed before the decorated method is called.
        user = current.user.get()
        trace = conf.debug.trace

        if trace:
            logging.debug(f"@access {user=} {access_config=}")

        if not user:
            # Nobody is logged in: offer a login page if configured, otherwise refuse.
            offer_login = access_config["offer_login"]
            if offer_login:
                raise errors.Redirect(offer_login if isinstance(offer_login, str) else "/user/login")

            if access_config["message"]:
                raise errors.Unauthorized(access_config["message"])

            raise errors.Unauthorized()

        # "root" is always granted, the configured alternatives are only consulted otherwise.
        ok = "root" in user["access"]

        if not ok and access_config["access"]:
            for acc in access_config["access"]:
                if trace:
                    logging.debug(f"@access checking {acc=}")

                if callable(acc):
                    # Callable directly tests access
                    ok = bool(acc())
                else:
                    # Otherwise, all access rights of this alternative must be present
                    if isinstance(acc, str):
                        acc = (acc,)

                    assert isinstance(acc, (tuple, list, set))
                    ok = all(a in user["access"] for a in acc)

                # The first fulfilled alternative is sufficient.
                if ok:
                    break

        if trace:
            logging.debug(f"@access {ok=}")

        if not ok:
            if access_config["message"]:
                raise errors.Forbidden(access_config["message"])

            raise errors.Forbidden()

    def decorator(func):
        method = Method.ensure(func)
        method.guards.append(validate)

        # extend additional access descr, must be a list to be JSON-serializable
        method.additional_descr["access"] = list(map(str, access_config["access"]))

        return method

    return decorator
def skey(
    func: t.Callable | None = None,
    *,
    allow_empty: bool | list[str] | tuple[str] | t.Callable = False,
    forward_payload: str | None = None,
    message: str | None = None,
    name: str = "skey",
    validate: t.Callable | None = None,
    **extra_kwargs: t.Any,
) -> Method | t.Callable:
    """
    Decorator, which configures an exposed method for requiring a CSRF-security-key.

    The decorator enforces a raise of :class:`errors.PreconditionFailed` in case the security-key is not
    provided or became invalid. It can be used both bare (``@skey``) and with arguments (``@skey(...)``).

    :param func: The function to decorate when used bare; None when the decorator is called with arguments.
    :param allow_empty: Allows to call the method without a security-key when no other parameters were provided.
        This can also be a tuple or list of keys which are being ignored, or a callable taking args and kwargs, and
        programmatically decide whether security-key is required or not.
    :param forward_payload: Forwards the extracted payload of the security-key to the method under the key specified
        here as a value in kwargs.
    :param message: Allows to specify a custom error message for the raised PreconditionFailed error.
    :param name: Defaults to "skey", but allows also for another name passed to the method.
    :param validate: Allows to specify a Callable used to further evaluate the payload of the security-key.
        Security-keys can be equipped with further data, see the securitykey-module for details.
    :param extra_kwargs: Any provided extra_kwargs are being passed to securitykey.validate as kwargs.

    :return: The decorated method when used bare, otherwise the decorator to apply.
    """
    # The configuration is collected explicitly instead of via `locals()`: the guard below used to be named
    # `validate` as well, and a `locals()` snapshot may be re-synchronized with the frame afterwards
    # (e.g. when a trace function is active on CPython < 3.13), which would replace the user-supplied
    # `validate` callable by the guard itself.
    skey_config = {
        "func": func,
        "allow_empty": allow_empty,
        "forward_payload": forward_payload,
        "message": message,
        "name": name,
        "validate": validate,
        "extra_kwargs": extra_kwargs,
    }

    def validate_skey(args, kwargs, varargs, varkwargs):
        # Guard executed before the decorated method is called.
        request = current.request.get()

        if request.skey_checked:  # skey guardiance is only required once per request
            return

        if conf.debug.trace:
            logging.debug(f"@skey {skey_config=}")

        # The security-key is always removed from kwargs, so the method never receives it.
        security_key = kwargs.pop(skey_config["name"], "")

        # validation is necessary?
        if allow_empty := skey_config["allow_empty"]:
            # allow_empty can be callable, to detect programmatically
            if callable(allow_empty):
                required = not allow_empty(args, kwargs)
            # or allow_empty can be a sequence of allowed keys
            elif isinstance(allow_empty, (list, tuple)):
                # required as soon as any key is passed that is not explicitly ignored
                required = any(k not in allow_empty for k in kwargs)
            # otherwise, varargs or varkwargs may not be empty.
            else:
                required = varargs or varkwargs or security_key
                if conf.debug.trace:
                    logging.debug(f"@skey {required=} because either {varargs=} or {varkwargs=} or {security_key=}")
        else:
            required = True

        if not required:
            return

        if conf.debug.trace:
            logging.debug(f"@skey wanted, validating {security_key!r}")

        # imported here as in the original code (kept local instead of at module level)
        from viur.core import securitykey
        payload = securitykey.validate(security_key, **skey_config["extra_kwargs"])
        request.skey_checked = True

        if not payload or (skey_config["validate"] and not skey_config["validate"](payload)):
            raise errors.PreconditionFailed(
                skey_config["message"] or f"Missing or invalid parameter {skey_config['name']!r}"
            )

        if skey_config["forward_payload"]:
            # in-place update, so the caller's kwargs dict receives the payload
            kwargs[skey_config["forward_payload"]] = payload

    def decorator(func):
        meth = Method.ensure(func)
        meth.skey = skey_config
        meth.guards.append(validate_skey)

        # extend additional descr with the name of the security-key parameter
        meth.additional_descr["skey"] = skey_config["name"]

        return meth

    # Used as @skey(...): hand back the decorator; used as bare @skey: decorate right away.
    if func is None:
        return decorator

    return decorator(func)
def cors(
    allow_headers: t.Iterable[str] = (),
) -> t.Callable:
    """
    Add additional CORS setting for a decorated :meth:`exposed` method.

    :param allow_headers: Header names, stored as `cors_allow_headers` on the decorated method.
    """

    def decorator(func):
        method = Method.ensure(func)
        method.cors_allow_headers = allow_headers
        return method

    return decorator