Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/decorators.py: 19%

107 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-27 07:59 +0000

1import typing as t 

2import logging 

3from viur.core import current, errors 

4from viur.core.config import conf 

5from viur.core.module import Method 

6 

# Names exported by this module on a star-import; each entry is a decorator defined below.
__all__ = [
    "access",
    "exposed",
    "force_post",
    "force_ssl",
    "internal_exposed",
    "skey",
    "cors",
]

16 

17 

def exposed(func: t.Callable) -> Method:
    """
    Flag a function as exposed; only exposed functions are callable by http-requests.

    The decorator supports two usages:

    - ``@exposed``: the function is passed through ``Method.ensure`` and its ``exposed`` attribute is set to True.
    - ``@exposed({...})``: a dict of language->translated name is given instead of a function. In this case a
      decorator is returned, which additionally stores the dict as ``seo_language_map`` on the method, to make
      the function available under different names.

    :param func: The function to expose, or the language->translated name dict described above.
    :return: The exposed method, or a decorator producing it when a dict was passed.
    """
    if not isinstance(func, dict):
        # Bare usage: the function itself was handed in, mark it right away.
        method = Method.ensure(func)
        method.exposed = True
        return method

    # Dict usage: remember the translations and wait for the actual function.
    translations = func

    def expose_with_translations(func: t.Callable) -> Method:
        method = Method.ensure(func)
        method.exposed = True
        method.seo_language_map = translations
        return method

    return expose_with_translations

41 

42 

def internal_exposed(func: t.Callable) -> Method:
    """
    Flag a function as internal exposed.

    The function is passed through ``Method.ensure`` and its ``exposed`` attribute is set to False.

    :param func: The function to mark.
    :return: The resulting method.
    """
    method = Method.ensure(func)
    method.exposed = False
    return method

50 

51 

def force_ssl(func: t.Callable) -> Method:
    """
    Require an encrypted channel for the decorated resource.

    This decorator only sets the ``ssl`` attribute of the method to True; the flag is evaluated elsewhere.
    Has no effect on development-servers.

    :param func: The function to mark.
    :return: The resulting method.
    """
    method = Method.ensure(func)
    method.ssl = True
    return method

60 

61 

def force_post(func: t.Callable) -> Method:
    """
    Require a http post request for the decorated resource.

    This decorator restricts the ``methods`` attribute of the method to ``("POST",)``.

    :param func: The function to mark.
    :return: The resulting method.
    """
    method = Method.ensure(func)
    method.methods = ("POST",)
    return method

69 

70 

71def access( 

72 *access: str | list[str] | tuple[str] | set[str] | t.Callable, 

73 offer_login: bool | str = False, 

74 message: str | None = None, 

75) -> t.Callable: 

76 """ 

77 Decorator, which performs an authentication and authorization check primarily based on the current user's access, 

78 which is defined via the `UserSkel.access`-bone. Additionally, a callable for individual access checking can be 

79 provided. 

80 

81 In case no user is logged in, the decorator enforces to raise an HTTP error 401 - Unauthorized in case no user is 

82 logged in, otherwise it returns an HTTP error 403 - Forbidden when the specified access parameters prohibit to call 

83 the decorated method. 

84 

85 :params access: Access configuration, either names of access rights or a callable for verification. 

86 :params offer_login: Offers a way to login; Either set it to True, to automatically redirect to /user/login, 

87 or set it to any other URL. 

88 :params message: A custom message to be printed when access is denied or unauthorized. 

89 

90 To check on authenticated users with the access "root" or ("admin" and "file-edit") or "maintainer" use the 

91 decorator like this: 

92 

93 .. code-block:: python 

94 from viur.core.decorators import access 

95 @access("root", ["admin", "file-edit"], ["maintainer"]) 

96 def my_method(self): 

97 return "You're allowed!" 

98 

99 Furthermore, instead of a list/tuple/set/str, a callable can be provided which performs custom access checking, 

100 and directly is checked on True for access grant. 

101 """ 

102 access_config = locals() 

103 

104 def validate(*args, **kwargs): 

105 # evaluate access guard setting? 

106 user = current.user.get() 

107 

108 if trace := conf.debug.trace: 

109 logging.debug(f"@access {user=} {access_config=}") 

110 

111 if not user: 

112 if offer_login := access_config["offer_login"]: 

113 raise errors.Redirect(offer_login if isinstance(offer_login, str) else "/user/login") 

114 

115 raise errors.Unauthorized(access_config["message"]) if access_config["message"] else errors.Unauthorized() 

116 

117 ok = "root" in user["access"] 

118 

119 if not ok and access_config["access"]: 

120 for acc in access_config["access"]: 

121 if trace: 

122 logging.debug(f"@access checking {acc=}") 

123 

124 # Callable directly tests access 

125 if callable(acc): 

126 if acc(): 

127 ok = True 

128 break 

129 

130 continue 

131 

132 # Otherwise, check for access rights 

133 if isinstance(acc, str): 

134 acc = (acc,) 

135 

136 assert isinstance(acc, (tuple, list, set)) 

137 

138 if all(a in user["access"] for a in acc): 

139 ok = True 

140 break 

141 

142 if trace: 

143 logging.debug(f"@access {ok=}") 

144 

145 if not ok: 

146 raise errors.Forbidden(access_config["message"]) if access_config["message"] else errors.Forbidden() 

147 

148 def decorator(func): 

149 meth = Method.ensure(func) 

150 meth.guards.append(validate) 

151 

152 # extend additional access descr, must be a list to be JSON-serializable 

153 meth.additional_descr["access"] = [str(access) for access in access_config["access"]] 

154 

155 return meth 

156 

157 return decorator 

158 

159 

def skey(
    func: t.Callable | None = None,
    *,
    allow_empty: bool | list[str] | tuple[str] | t.Callable = False,
    forward_payload: str | None = None,
    message: str | None = None,
    name: str = "skey",
    validate: t.Callable | None = None,
    **extra_kwargs: t.Any,
) -> Method | t.Callable:
    """
    Decorator, which configures an exposed method for requiring a CSRF-security-key.
    The decorator enforces a raise of `errors.PreconditionFailed` in case the security-key is not provided
    or became invalid.

    It can be used both as ``@skey`` and as ``@skey(...)`` with keyword arguments.

    :param func: The function to decorate; None when the decorator is used with arguments.
    :param allow_empty: Allows to call the method without a security-key when no other parameters were provided.
        This can also be a tuple or list of keys which are being ignored, or a callable taking args and kwargs, and
        programmatically decide whether security-key is required or not.
    :param forward_payload: Forwards the extracted payload of the security-key to the method under the key specified
        here as a value in kwargs.
    :param message: Allows to specify a custom error message in case the `errors.PreconditionFailed` is raised.
    :param name: Defaults to "skey", but allows also for another name passed to the method.
    :param validate: Allows to specify a Callable used to further evaluate the payload of the security-key.
        Security-keys can be equipped with further data, see the securitykey-module for details.
    :param extra_kwargs: Any provided extra_kwargs are being passed to securitykey.validate as kwargs.
    :return: The decorated method when `func` is given, otherwise a decorator.
    """
    # Take a *copy* of the arguments. The guard function below shadows the `validate` parameter; on CPython <= 3.12
    # the dict returned by locals() is shared with the frame and gets refreshed whenever frame.f_locals is read
    # (debuggers, tracers), which would replace the user's `validate` callable by the guard itself.
    skey_config = dict(locals())

    def validate(args, kwargs, varargs, varkwargs):
        # evaluate skey guard setting?
        if current.request.get().skey_checked:  # skey guardiance is only required once per request
            return

        if conf.debug.trace:
            logging.debug(f"@skey {skey_config=}")

        # The security-key is always removed from the kwargs, it is not passed on under its own name
        security_key = kwargs.pop(skey_config["name"], "")

        # validation is necessary?
        if allow_empty := skey_config["allow_empty"]:
            # allow_empty can be callable, to detect programmatically
            if callable(allow_empty):
                required = not allow_empty(args, kwargs)
            # or allow_empty can be a sequence of allowed keys
            elif isinstance(allow_empty, (list, tuple)):
                # required as soon as any key is given which is not in the ignore-list
                required = any(k not in allow_empty for k in kwargs)
            # otherwise, varargs or varkwargs may not be empty.
            else:
                required = varargs or varkwargs or security_key
                if conf.debug.trace:
                    logging.debug(f"@skey {required=} because either {varargs=} or {varkwargs=} or {security_key=}")
        else:
            required = True

        if not required:
            return

        if conf.debug.trace:
            logging.debug(f"@skey wanted, validating {security_key!r}")

        # imported locally, as in the original code
        from viur.core import securitykey
        payload = securitykey.validate(security_key, **skey_config["extra_kwargs"])
        current.request.get().skey_checked = True

        # A falsy payload means failure; a custom validator may additionally reject the payload
        if not payload or (skey_config["validate"] and not skey_config["validate"](payload)):
            raise errors.PreconditionFailed(
                skey_config["message"] or f"Missing or invalid parameter {skey_config['name']!r}"
            )

        if skey_config["forward_payload"]:
            # in-place update, so the caller's kwargs dict receives the payload
            kwargs |= {skey_config["forward_payload"]: payload}

    def decorator(func):
        meth = Method.ensure(func)
        meth.skey = skey_config
        meth.guards.append(validate)

        # extend additional descr by the name of the security-key parameter
        meth.additional_descr["skey"] = skey_config["name"]

        return meth

    # Used as @skey(...): return the decorator; used as bare @skey: decorate right away
    if func is None:
        return decorator

    return decorator(func)

242 

243 

def cors(
    allow_headers: t.Iterable[str] = (),
) -> t.Callable:
    """
    Add additional CORS setting for a decorated :meth:`exposed` method.

    :param allow_headers: Header names, stored as ``cors_allow_headers`` on the decorated method.
    :return: A decorator applying the setting.
    """

    def decorator(func):
        method = Method.ensure(func)
        method.cors_allow_headers = allow_headers
        return method

    return decorator