Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/request.py: 6%
442 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1"""
2 This module implements the WSGI (Web Server Gateway Interface) layer for ViUR. This is the main entry
3 point for incoming HTTP requests. The main class is the :class:`Router`. Each request will get its
4 own instance of that class which then holds the reference to the request and response object.
5 Additionally, this module defines the RequestValidator interface which provides a very early hook into the
6 request processing (useful for global ratelimiting, DDoS prevention or access control).
7"""
8import datetime
9import fnmatch
10import json
11import logging
12import os
13import re
14import time
15import traceback
16import typing as t
17import unicodedata
18from abc import ABC, abstractmethod
19from urllib import parse
20from urllib.parse import quote, unquote, urljoin, urlparse
22import webob
24from viur.core import current, db, errors, session, utils
25from viur.core.config import conf
26from viur.core.logging import client as loggingClient, requestLogger, requestLoggingRessource
27from viur.core.module import Method
28from viur.core.securityheaders import extendCsp
29from viur.core.tasks import _appengineServiceIPs
31TEMPLATE_STYLE_KEY = "style"
class RequestValidator(ABC):
    """
    Interface for validators that inspect a request very early during processing.

    If :meth:`validate` returns a tuple, the request is aborted and that tuple is
    used to build the response. Can be used to block requests from bots.

    To register or remove a validator, access it in main.py through
    :attr:`viur.core.request.Router.requestValidators`
    """
    # Internal name to trace which validator aborted the request
    name = "RequestValidator"

    @staticmethod
    @abstractmethod
    def validate(request: "Router") -> t.Optional[tuple[int, str, str]]:
        """
        Check the current request.

        If the request is valid, simply return None.
        If the request should be blocked, return a tuple of

        - The HTTP status code (as int)
        - The description of that status code (eg "Forbidden")
        - The response body (can be a simple string or an HTML page)

        :param request: The Router instance handling the request to check
        :return: None on success, an error tuple otherwise
        """
        raise NotImplementedError()
60class FetchMetaDataValidator(RequestValidator):
61 """
62 This validator examines the headers "Sec-Fetch-Site", "sec-fetch-mode" and "sec-fetch-dest" as
63 recommended by https://web.dev/fetch-metadata/
64 """
65 name = "FetchMetaDataValidator"
67 @staticmethod
68 def validate(request: 'BrowseHandler') -> t.Optional[tuple[int, str, str]]:
69 """
70 This validator examines the headers "sec-fetch-site",
71 "sec-fetch-mode" and "sec-fetch-dest" as recommended
72 by https://web.dev/fetch-metadata/
73 """
74 headers = request.request.headers
76 match headers.get("sec-fetch-site"):
77 case None | "same-origin" | "none":
78 # A Request from our site, or browser didn't send "sec-fetch-site"
79 return None
80 case "same-site":
81 # We are accepting a request with same-site only in local dev mode
82 if conf.instance.is_dev_server:
83 return None
84 case _:
85 # Incoming navigation GET request
86 if (
87 not request.isPostRequest
88 and headers.get("sec-fetch-mode") == "navigate"
89 and headers.get('sec-fetch-dest') not in ("object", "embed")
90 ):
91 return None
93 return 403, "Forbidden", "Request rejected due to fetch metadata"
96class Router:
97 """
98 This class accepts the requests, collect its parameters and routes the request
99 to its destination function.
100 The basic control flow is
101 - Setting up internal variables
102 - Running the Request validators
103 - Emitting the headers (especially the security related ones)
104 - Run the TLS check (ensure it's a secure connection or check if the URL is whitelisted)
105 - Load or initialize a new session
106 - Set up i18n (choosing the language etc)
107 - Run the request preprocessor (if any)
108 - Normalize & sanity check the parameters
109 - Resolve the exposed function and call it
110 - Save the session / tear down the request
111 - Return the response generated
114 :warning: Don't instantiate! Don't subclass! DON'T TOUCH! ;)
115 """
117 # List of requestValidators used to preflight-check a request before it's being dispatched within ViUR
118 requestValidators = [FetchMetaDataValidator]
120 def __init__(self, environ: dict):
121 super().__init__()
122 self.startTime = time.time()
124 self.request = webob.Request(environ)
125 self.response = webob.Response()
127 self.maxLogLevel = logging.DEBUG
128 self._traceID = \
129 self.request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or utils.string.random()
130 self.is_deferred = False
131 self.path = ""
132 self.path_list = ()
134 self.skey_checked = False # indicates whether @skey-decorator-check has already performed within a request
135 self.internalRequest = False
136 self.disableCache = False # Shall this request bypass the caches?
137 self.pendingTasks = []
138 self.args = ()
139 self.kwargs = {}
140 self.context = {}
141 self.template_style: str | None = None
142 self.cors_headers = ()
144 # Check if it's a HTTP-Method we support
145 self.method = self.request.method.lower()
146 self.isPostRequest = self.method == "post"
147 self.isSSLConnection = self.request.host_url.lower().startswith("https://") # We have an encrypted channel
149 db.current_db_access_log.set(set())
151 # Set context variables
152 current.language.set(conf.i18n.default_language)
153 current.request.set(self)
154 current.session.set(session.Session())
155 current.request_data.set({})
157 # Process actual request
158 self._process()
160 self._cors()
162 # Unset context variables
163 current.language.set(None)
164 current.request_data.set(None)
165 current.session.set(None)
166 current.request.set(None)
167 current.user.set(None)
169 @property
170 def isDevServer(self) -> bool:
171 import warnings
172 msg = "Use of `isDevServer` is deprecated; Use `conf.instance.is_dev_server` instead!"
173 warnings.warn(msg, DeprecationWarning, stacklevel=2)
174 logging.warning(msg)
175 return conf.instance.is_dev_server
177 def _select_language(self, path: str) -> str:
178 """
179 Tries to select the best language for the current request. Depending on the value of
180 conf.i18n.language_method, we'll either try to load it from the session, determine it by the domain
181 or extract it from the URL.
182 """
184 def get_language_from_header() -> str | None:
185 if not (accept_language := self.request.headers.get("accept-language")):
186 return None
187 languages = accept_language.split(",")
188 locale_q_pairs = []
190 for language in languages:
191 if language.split(";")[0] == language:
192 # no q => q = 1
193 locale_q_pairs.append((language.strip(), "1"))
194 else:
195 try:
196 locale = language.split(";")[0].strip()
197 q = language.split(";")[1].split("=")[1]
198 locale_q_pairs.append((locale, q))
199 except IndexError:
200 continue # skip language
201 locale_q_pairs.sort(key=lambda pair: pair[1], reverse=True) # sort by Quality values
202 for locale_q_pair in locale_q_pairs:
203 if "-" in locale_q_pair[0]: # Check for de-DE
204 lang = locale_q_pair[0].split("-")[0]
205 else:
206 lang = locale_q_pair[0]
207 if lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()):
208 return lang
209 if lang == "*": # fallback
210 return conf.i18n.available_languages[0]
211 return None
213 if not conf.i18n.available_languages:
214 # This project doesn't use the multi-language feature, nothing to do here
215 return path
216 if conf.i18n.language_method == "session":
217 current_session = current.session.get()
218 lang = conf.i18n.default_language
219 # We save the language in the session, if it exists, and try to load it from there
220 if "lang" in current_session:
221 current.language.set(current_session["lang"])
222 return path
224 if header_lang := get_language_from_header():
225 lang = header_lang
226 current.language.set(lang)
228 elif header_lang := self.request.headers.get("X-Appengine-Country"):
229 header_lang = str(header_lang).lower()
230 if header_lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()):
231 lang = header_lang
233 if current_session.loaded:
234 current_session["lang"] = lang
235 current.language.set(lang)
237 elif conf.i18n.language_method == "domain":
238 host = self.request.host_url.lower()
239 host = host[host.find("://") + 3:].strip(" /") # strip http(s)://
240 if host.startswith("www."):
241 host = host[4:]
242 if lang := conf.i18n.domain_language_mapping.get(host):
243 current.language.set(lang)
244 # We have no language configured for this domain, try to read it from the HTTP Header
245 elif lang := get_language_from_header():
246 current.language.set(lang)
248 elif conf.i18n.language_method == "url":
249 tmppath = urlparse(path).path
250 tmppath = [unquote(x) for x in tmppath.lower().strip("/").split("/")]
251 if (
252 len(tmppath) > 0
253 and tmppath[0] in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys())
254 ):
255 current.language.set(tmppath[0])
256 return path[len(tmppath[0]) + 1:] # Return the path stripped by its language segment
257 else: # This URL doesnt contain an language prefix, try to read it from session
258 if header_lang := get_language_from_header():
259 current.language.set(header_lang)
260 elif header_lang := self.request.headers.get("X-Appengine-Country"):
261 lang = str(header_lang).lower()
262 if lang in conf.i18n.available_languages or lang in conf.i18n.language_alias_map:
263 current.language.set(lang)
264 elif conf.i18n.language_method == "header":
265 if lang := get_language_from_header():
266 current.language.set(lang)
268 return path
270 def _process(self):
271 if self.method not in ("get", "post", "head", "options"):
272 logging.error(f"{self.method=} not supported")
273 return
275 if self.request.headers.get("X-AppEngine-TaskName", None) is not None: # Check if we run in the appengine
276 if self.request.environ.get("HTTP_X_APPENGINE_USER_IP") in _appengineServiceIPs:
277 self.is_deferred = True
278 elif os.getenv("TASKS_EMULATOR") is not None:
279 self.is_deferred = True
281 # Check if we should process or abort the request
282 for validator, reqValidatorResult in [(x, x.validate(self)) for x in self.requestValidators]:
283 if reqValidatorResult is not None:
284 logging.warning(f"Request rejected by validator {validator.name}")
285 statusCode, statusStr, statusDescr = reqValidatorResult
286 self.response.status = f"{statusCode} {statusStr}"
287 self.response.write(statusDescr)
288 return
290 path = self.request.path
292 # Add CSP headers early (if any)
293 if conf.security.content_security_policy and conf.security.content_security_policy["_headerCache"]:
294 for k, v in conf.security.content_security_policy["_headerCache"].items():
295 self.response.headers[k] = v
296 if self.isSSLConnection: # Check for HTST and PKP headers only if we have a secure channel.
297 if conf.security.strict_transport_security:
298 self.response.headers["Strict-Transport-Security"] = conf.security.strict_transport_security
299 # Check for X-Security-Headers we shall emit
300 if conf.security.x_content_type_options:
301 self.response.headers["X-Content-Type-Options"] = "nosniff"
302 if conf.security.x_xss_protection is not None:
303 if conf.security.x_xss_protection:
304 self.response.headers["X-XSS-Protection"] = "1; mode=block"
305 elif conf.security.x_xss_protection is False:
306 self.response.headers["X-XSS-Protection"] = "0"
307 if conf.security.x_frame_options is not None and isinstance(conf.security.x_frame_options, tuple):
308 mode, uri = conf.security.x_frame_options
309 if mode in ["deny", "sameorigin"]:
310 self.response.headers["X-Frame-Options"] = mode
311 elif mode == "allow-from":
312 self.response.headers["X-Frame-Options"] = f"allow-from {uri}"
313 if conf.security.x_permitted_cross_domain_policies is not None:
314 self.response.headers["X-Permitted-Cross-Domain-Policies"] = conf.security.x_permitted_cross_domain_policies
315 if conf.security.referrer_policy:
316 self.response.headers["Referrer-Policy"] = conf.security.referrer_policy
317 if conf.security.permissions_policy.get("_headerCache"):
318 self.response.headers["Permissions-Policy"] = conf.security.permissions_policy["_headerCache"]
319 if conf.security.enable_coep:
320 self.response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
321 if conf.security.enable_coop:
322 self.response.headers["Cross-Origin-Opener-Policy"] = conf.security.enable_coop
323 if conf.security.enable_corp:
324 self.response.headers["Cross-Origin-Resource-Policy"] = conf.security.enable_corp
326 # Ensure that TLS is used if required
327 if conf.security.force_ssl and not self.isSSLConnection and not conf.instance.is_dev_server:
328 isWhitelisted = False
329 reqPath = self.request.path
330 for testUrl in conf.security.no_ssl_check_urls:
331 if testUrl.endswith("*"):
332 if reqPath.startswith(testUrl[:-1]):
333 isWhitelisted = True
334 break
335 else:
336 if testUrl == reqPath:
337 isWhitelisted = True
338 break
339 if not isWhitelisted: # Some URLs need to be whitelisted (as f.e. the Tasks-Queue doesn't call using https)
340 # Redirect the user to the startpage (using ssl this time)
341 host = self.request.host_url.lower()
342 host = host[host.find("://") + 3:].strip(" /") # strip http(s)://
343 self.response.status = "302 Found"
344 self.response.headers['Location'] = f"https://{host}/"
345 return
346 if path.startswith("/_ah/warmup"):
347 self.response.write("okay")
348 return
350 try:
351 current.session.get().load()
353 # Load current user into context variable if user module is there.
354 if user_mod := getattr(conf.main_app.vi, "user", None):
355 current.user.set(user_mod.getCurrentUser())
357 path = self._select_language(path)[1:]
359 # Check for closed system
360 if conf.security.closed_system and self.method != "options":
361 if not current.user.get():
362 if not any(fnmatch.fnmatch(path, pat) for pat in conf.security.closed_system_allowed_paths):
363 raise errors.Unauthorized()
365 if conf.request_preprocessor:
366 path = conf.request_preprocessor(path)
368 self._route(path)
370 except errors.Redirect as e:
371 if conf.debug.trace_exceptions:
372 logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
373 raise
374 self.response.status = f"{e.status} {e.name}"
375 url = e.url
376 url = unquote(url) # decode first
377 # safe = https://url.spec.whatwg.org/#url-path-segment-string
378 url = quote(url, encoding="utf-8", safe="!$&'()*+,-./:;=?@_~#") # re-encode all in utf-8
379 if url.startswith(('.', '/')):
380 url = str(urljoin(self.request.url, url))
381 self.response.headers['Location'] = url
383 except Exception as e:
384 if conf.debug.trace_exceptions:
385 logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
386 raise
387 self.response.body = b""
388 if isinstance(e, errors.HTTPException):
389 logging.info(f"[{e.status}] {e.name}: {e.descr}", exc_info=conf.debug.trace)
390 self.response.status = f"{e.status} {e.name}"
391 # Set machine-readable x-viur-error response header in case there is an exception description.
392 if e.descr:
393 self.response.headers["x-viur-error"] = e.descr.replace("\n", "")
394 else:
395 self.response.status = 500
396 logging.error("ViUR has caught an unhandled exception!")
397 logging.exception(e)
399 res = None
400 if conf.error_handler:
401 try:
402 res = conf.error_handler(e)
403 except Exception as newE:
404 logging.error("viur.error_handler failed!")
405 logging.exception(newE)
406 res = None
407 if not res:
408 descr = "The server encountered an unexpected error and is unable to process your request."
410 if isinstance(e, errors.HTTPException):
411 error_info = {
412 "status": e.status,
413 "reason": e.name,
414 "title": str(translate(e.name)),
415 "descr": e.descr,
416 }
417 else:
418 error_info = {
419 "status": 500,
420 "reason": "Internal Server Error",
421 "title": str(translate("Internal Server Error")),
422 "descr": descr
423 }
425 if conf.instance.is_dev_server:
426 error_info["traceback"] = traceback.format_exc()
428 error_info["logo"] = conf.error_logo
430 if (len(self.path_list) > 0 and self.path_list[0] in ("vi", "json")) or \
431 current.request.get().response.headers["Content-Type"] == "application/json":
432 current.request.get().response.headers["Content-Type"] = "application/json"
433 res = json.dumps(error_info)
434 else: # We render the error in html
435 # Try to get the template from html/error/
436 if filename := conf.main_app.render.getTemplateFileName((f"{error_info['status']}", "error"),
437 raise_exception=False):
438 template = conf.main_app.render.getEnv().get_template(filename)
439 try:
440 uses_unsafe_inline = \
441 "unsafe-inline" in conf.security.content_security_policy["enforce"]["style-src"]
442 except (KeyError, TypeError): # Not set
443 uses_unsafe_inline = False
444 if uses_unsafe_inline:
445 logging.info("Using style-src:unsafe-inline, don't create a nonce")
446 nonce = None
447 else:
448 nonce = utils.string.random(16)
449 extendCsp({"style-src": [f"nonce-{nonce}"]})
450 res = template.render(error_info, nonce=nonce)
451 else:
452 res = (f'<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">'
453 f'<title>{error_info["status"]} - {error_info["reason"]}</title>'
454 f'</head><body><h1>{error_info["status"]} - {error_info["reason"]}</h1>')
456 self.response.write(res.encode("UTF-8"))
458 finally:
459 current.session.get().save()
460 if conf.instance.is_dev_server and conf.debug.dev_server_cloud_logging:
461 # Emit the outer log only on dev_appserver (we'll use the existing request log when live)
462 SEVERITY = "DEBUG"
463 if self.maxLogLevel >= 50:
464 SEVERITY = "CRITICAL"
465 elif self.maxLogLevel >= 40:
466 SEVERITY = "ERROR"
467 elif self.maxLogLevel >= 30:
468 SEVERITY = "WARNING"
469 elif self.maxLogLevel >= 20:
470 SEVERITY = "INFO"
472 TRACE = "projects/{}/traces/{}".format(loggingClient.project, self._traceID)
474 REQUEST = {
475 'requestMethod': self.request.method,
476 'requestUrl': self.request.url,
477 'status': self.response.status_code,
478 'userAgent': self.request.headers.get('USER-AGENT'),
479 'responseSize': self.response.content_length,
480 'latency': "%0.3fs" % (time.time() - self.startTime),
481 'remoteIp': self.request.environ.get("HTTP_X_APPENGINE_USER_IP")
482 }
483 requestLogger.log_text(
484 "",
485 client=loggingClient,
486 severity=SEVERITY,
487 http_request=REQUEST,
488 trace=TRACE,
489 resource=requestLoggingRessource,
490 operation={
491 "first": True,
492 "last": True,
493 "id": self._traceID
494 }
495 )
497 if conf.instance.is_dev_server:
498 self.is_deferred = True
500 while self.pendingTasks:
501 task = self.pendingTasks.pop()
502 logging.debug(f"Deferred task emulation, executing {task=}")
503 try:
504 task()
505 except Exception: # noqa
506 logging.exception(f"Deferred Task emulation {task} failed")
508 def _route(self, path: str) -> None:
509 """
510 Does the actual work of sanitizing the parameter, determine which exposed-function to call
511 (and with which parameters)
512 """
514 # Parse the URL
515 if path := parse.urlparse(path).path:
516 self.path = path
517 self.path_list = tuple(unicodedata.normalize("NFC", parse.unquote(part))
518 for part in path.strip("/").split("/"))
520 # Prevent Hash-collision attacks
521 if len(self.request.params) > conf.max_post_params_count:
522 raise errors.BadRequest(
523 f"Too many arguments supplied, exceeding maximum"
524 f" of {conf.max_post_params_count} allowed arguments per request"
525 )
527 param_filter = conf.param_filter_function
528 if param_filter and not callable(param_filter):
529 raise ValueError(f"""{param_filter=} is not callable""")
531 for key, value in self.request.params.items():
532 try:
533 key = unicodedata.normalize("NFC", key)
534 value = unicodedata.normalize("NFC", value)
535 except UnicodeError:
536 # We received invalid unicode data (usually happens when
537 # someone tries to exploit unicode normalisation bugs)
538 raise errors.BadRequest()
540 if param_filter and param_filter(key, value):
541 continue
543 if key == TEMPLATE_STYLE_KEY:
544 self.template_style = value
545 continue
547 if key in self.kwargs:
548 if isinstance(self.kwargs[key], list):
549 self.kwargs[key].append(value)
550 else: # Convert that key to a list
551 self.kwargs[key] = [self.kwargs[key], value]
552 else:
553 self.kwargs[key] = value
555 if "self" in self.kwargs or "return" in self.kwargs: # self or return is reserved for bound methods
556 raise errors.BadRequest()
558 caller = conf.main_resolver
559 idx = 0 # Count how may items from *args we'd have consumed (so the rest can go into *args of the called func
560 path_found = True
562 for part in self.path_list:
563 # TODO: Remove canAccess guards... solve differently.
564 if "canAccess" in caller and not caller["canAccess"]():
565 # We have a canAccess function guarding that object,
566 # and it returns False...
567 raise errors.Unauthorized()
569 idx += 1
571 if part not in caller:
572 part = "index"
574 if caller := caller.get(part):
575 if isinstance(caller, Method):
576 if part == "index":
577 idx -= 1
579 self.args = tuple(self.path_list[idx:])
580 break
582 elif part == "index":
583 path_found = False
584 break
586 else:
587 path_found = False
588 break
590 if not path_found:
591 raise errors.NotFound(
592 f"""The path {utils.string.escape("/".join(self.path_list[:idx]))} could not be found""")
594 if not isinstance(caller, Method):
595 # try to find "index" function
596 if (index := caller.get("index")) and isinstance(index, Method):
597 caller = index
598 else:
599 raise errors.MethodNotAllowed()
601 # Check for internal exposed
602 if caller.exposed is False and not self.internalRequest:
603 raise errors.NotFound()
605 # Fill the Allow header of the response with the allowed HTTP methods
606 if self.method == "options":
607 self.response.headers["Allow"] = ", ".join(sorted(caller.methods)).upper()
609 # Register caller specific CORS headers
610 self.cors_headers = [str(header).lower() for header in caller.cors_allow_headers or ()]
612 # Check for @force_ssl flag
613 if not self.internalRequest \
614 and caller.ssl \
615 and not self.request.host_url.lower().startswith("https://") \
616 and not conf.instance.is_dev_server:
617 raise errors.PreconditionFailed("You must use SSL to access this resource!")
619 # Check for @force_post flag
620 if not self.isPostRequest and caller.methods == ("POST",):
621 raise errors.MethodNotAllowed("You must use POST to access this resource!")
623 # Check if this request should bypass the caches
624 if self.request.headers.get("X-Viur-Disable-Cache"):
625 # No cache requested, check if the current user is allowed to do so
626 if (user := current.user.get()) and "root" in user["access"]:
627 logging.debug("Caching disabled by X-Viur-Disable-Cache header")
628 self.disableCache = True
630 # Destill context as self.context, if available
631 if context := {k: v for k, v in self.kwargs.items() if k.startswith("@")}:
632 # Remove context parameters from kwargs
633 kwargs = {k: v for k, v in self.kwargs.items() if k not in context}
634 # Remove leading "@" from context parameters
635 self.context |= {k[1:]: v for k, v in context.items() if len(k) > 1}
636 else:
637 kwargs = self.kwargs
639 if ((self.internalRequest and conf.debug.trace_internal_call_routing)
640 or conf.debug.trace_external_call_routing):
641 logging.debug(
642 f"Calling {caller._func!r} with args={self.args!r}, {kwargs=} within context={self.context!r}"
643 )
645 if self.method == "options":
646 # OPTIONS request doesn't have a body
647 del self.response.app_iter
648 del self.response.content_type
649 self.response.status = "204 No Content"
650 return
652 # Now call the routed method!
653 res = caller(*self.args, **kwargs)
655 if self.method == "options":
656 # OPTIONS request doesn't have a body
657 del self.response.app_iter
658 del self.response.content_type
659 self.response.status = "204 No Content"
660 return
662 if not isinstance(res, bytes): # Convert the result to bytes if it is not already!
663 res = str(res).encode("UTF-8")
664 self.response.write(res)
666 def _cors(self) -> None:
667 """
668 Set CORS headers to the HTTP response.
670 .. seealso::
672 Option :attr:`core.config.Security.cors_origins`, etc.
673 for cors settings.
675 https://fetch.spec.whatwg.org/#http-cors-protocol
677 https://enable-cors.org/server.html
679 https://www.html5rocks.com/static/images/cors_server_flowchart.png
680 """
682 def test_candidates(value: str, *candidates: str | re.Pattern) -> bool:
683 """Test if the value matches the pattern of any candidate"""
684 for candidate in candidates:
685 if isinstance(candidate, re.Pattern):
686 if candidate.match(value):
687 return True
688 elif isinstance(candidate, str):
689 if candidate.lower() == str(value).lower():
690 return True
691 else:
692 raise TypeError(
693 f"Invalid setting {candidate}. "
694 f"Expected a string or a compiled regex."
695 )
696 return False
698 origin = current.request.get().request.headers.get("Origin")
699 if not origin:
700 return
702 # Origin is set --> It's a CORS request
704 any_origin_allowed = (
705 conf.security.cors_origins == "*"
706 or any(_origin == "*" for _origin in conf.security.cors_origins)
707 or any(_origin.pattern == r".*"
708 for _origin in conf.security.cors_origins
709 if isinstance(_origin, re.Pattern))
710 )
712 if any_origin_allowed and conf.security.cors_origins_use_wildcard:
713 if conf.security.cors_allow_credentials:
714 raise RuntimeError(
715 "Invalid CORS config: "
716 "If credentials mode is \"include\", then `Access-Control-Allow-Origin` cannot be `*`. "
717 "See https://fetch.spec.whatwg.org/#cors-protocol-and-credentials"
718 )
719 self.response.headers["Access-Control-Allow-Origin"] = "*"
721 elif test_candidates(origin, *conf.security.cors_origins):
722 self.response.headers["Access-Control-Allow-Origin"] = origin
724 else:
725 logging.warning(f"{origin=} not valid (must be one of {conf.security.cors_origins=})")
726 return
728 if conf.security.cors_allow_credentials:
729 self.response.headers["Access-Control-Allow-Credentials"] = "true"
731 if self.method == "options":
732 method = (self.request.headers.get("Access-Control-Request-Method") or "").lower()
734 if method in conf.security.cors_methods:
735 # It's a CORS-preflight request
736 # - MUST include Access-Control-Request-Method
737 # - CAN include Access-Control-Request-Headers
739 # The response can be cached
740 if conf.security.cors_max_age is not None:
741 assert isinstance(conf.security.cors_max_age, datetime.timedelta)
742 self.response.headers["Access-Control-Max-Age"] = \
743 str(int(conf.security.cors_max_age.total_seconds()))
745 # Allowed methods
746 self.response.headers["Access-Control-Allow-Methods"] = ", ".join(
747 sorted(conf.security.cors_methods)).upper()
749 # Allowed headers
750 request_headers = self.request.headers.get("Access-Control-Request-Headers")
751 request_headers = [h.strip().lower() for h in request_headers.split(",")]
752 if conf.security.cors_allow_headers == "*":
753 # Every header is allowed
754 allow_headers = request_headers[:]
755 else:
756 # There are generally headers allowed and/or from the caller
757 allow_headers = [
758 header
759 for header in request_headers
760 if test_candidates(
761 header,
762 *(self.cors_headers or ()), # caller specific
763 *(conf.security.cors_allow_headers or ()) # generally global
764 )
765 ]
766 if allow_headers:
767 self.response.headers["Access-Control-Allow-Headers"] = ", ".join(sorted(allow_headers))
769 else:
770 logging.warning(
771 f"Access-Control-Request-Method: {method} is NOT a valid method of {conf.security.cors_methods=}. "
772 f"Don't append CORS-preflight request headers"
773 )
775 def saveSession(self) -> None:
776 current.session.get().save()
779from .i18n import translate # noqa: E402