Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/request.py: 6%
436 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
1"""
2 This module implements the WSGI (Web Server Gateway Interface) layer for ViUR. This is the main entry
3 point for incomming http requests. The main class is the :class:BrowserHandler. Each request will get it's
4 own instance of that class which then holds the reference to the request and response object.
5 Additionally, this module defines the RequestValidator interface which provides a very early hook into the
6 request processing (useful for global ratelimiting, DDoS prevention or access control).
7"""
8import datetime
9import fnmatch
10import json
11import logging
12import os
13import re
14import time
15import traceback
16import typing as t
17from abc import ABC, abstractmethod
18from urllib import parse
19from urllib.parse import unquote, urljoin, urlparse
21import unicodedata
22import webob
24from viur.core import current, db, errors, session, utils
25from viur.core.config import conf
26from viur.core.logging import client as loggingClient, requestLogger, requestLoggingRessource
27from viur.core.module import Method
28from viur.core.securityheaders import extendCsp
29from viur.core.tasks import _appengineServiceIPs
31TEMPLATE_STYLE_KEY = "style"
34class RequestValidator(ABC):
35 """
36 RequestValidators can be used to validate a request very early on. If the validate method returns a tuple,
37 the request is aborted. Can be used to block requests from bots.
39 To register or remove a validator, access it in main.py through
40 :attr: viur.core.request.Router.requestValidators
41 """
42 # Internal name to trace which validator aborted the request
43 name = "RequestValidator"
45 @staticmethod
46 @abstractmethod
47 def validate(request: 'BrowseHandler') -> t.Optional[tuple[int, str, str]]:
48 """
49 The function that checks the current request. If the request is valid, simply return None.
50 If the request should be blocked, it must return a tuple of
51 - The HTTP status code (as int)
52 - The Description of that status code (eg "Forbidden")
53 - The Response Body (can be a simple string or an HTML-Page)
54 :param request: The Request instance to check
55 :return: None on success, an Error-Tuple otherwise
56 """
57 raise NotImplementedError()
60class FetchMetaDataValidator(RequestValidator):
61 """
62 This validator examines the headers "Sec-Fetch-Site", "sec-fetch-mode" and "sec-fetch-dest" as
63 recommended by https://web.dev/fetch-metadata/
64 """
65 name = "FetchMetaDataValidator"
67 @staticmethod
68 def validate(request: 'BrowseHandler') -> t.Optional[tuple[int, str, str]]:
69 headers = request.request.headers
70 if not headers.get("sec-fetch-site"): # These headers are not send by all browsers
71 return None
72 if headers.get('sec-fetch-site') in {"same-origin", "none"}: # A Request from our site
73 return None
74 if os.environ['GAE_ENV'] == "localdev" and headers.get('sec-fetch-site') == "same-site":
75 # We are accepting a request with same-site only in local dev mode
76 return None
77 if headers.get('sec-fetch-mode') == 'navigate' and not request.isPostRequest \
78 and headers.get('sec-fetch-dest') not in {'object', 'embed'}: # Incoming navigation GET request
79 return None
80 return 403, "Forbidden", "Request rejected due to fetch metadata"
83class Router:
84 """
85 This class accepts the requests, collect its parameters and routes the request
86 to its destination function.
87 The basic control flow is
88 - Setting up internal variables
89 - Running the Request validators
90 - Emitting the headers (especially the security related ones)
91 - Run the TLS check (ensure it's a secure connection or check if the URL is whitelisted)
92 - Load or initialize a new session
93 - Set up i18n (choosing the language etc)
94 - Run the request preprocessor (if any)
95 - Normalize & sanity check the parameters
96 - Resolve the exposed function and call it
97 - Save the session / tear down the request
98 - Return the response generated
101 :warning: Don't instantiate! Don't subclass! DON'T TOUCH! ;)
102 """
104 # List of requestValidators used to preflight-check an request before it's being dispatched within ViUR
105 requestValidators = [FetchMetaDataValidator]
107 def __init__(self, environ: dict):
108 super().__init__()
109 self.startTime = time.time()
111 self.request = webob.Request(environ)
112 self.response = webob.Response()
114 self.maxLogLevel = logging.DEBUG
115 self._traceID = \
116 self.request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or utils.string.random()
117 self.is_deferred = False
118 self.path = ""
119 self.path_list = ()
121 self.skey_checked = False # indicates whether @skey-decorator-check has already performed within a request
122 self.internalRequest = False
123 self.disableCache = False # Shall this request bypass the caches?
124 self.pendingTasks = []
125 self.args = ()
126 self.kwargs = {}
127 self.context = {}
128 self.template_style: str | None = None
129 self.cors_headers = ()
131 # Check if it's a HTTP-Method we support
132 self.method = self.request.method.lower()
133 self.isPostRequest = self.method == "post"
134 self.isSSLConnection = self.request.host_url.lower().startswith("https://") # We have an encrypted channel
136 db.currentDbAccessLog.set(set())
138 # Set context variables
139 current.language.set(conf.i18n.default_language)
140 current.request.set(self)
141 current.session.set(session.Session())
142 current.request_data.set({})
144 # Process actual request
145 self._process()
147 self._cors()
149 # Unset context variables
150 current.language.set(None)
151 current.request_data.set(None)
152 current.session.set(None)
153 current.request.set(None)
154 current.user.set(None)
156 @property
157 def isDevServer(self) -> bool:
158 import warnings
159 msg = "Use of `isDevServer` is deprecated; Use `conf.instance.is_dev_server` instead!"
160 warnings.warn(msg, DeprecationWarning, stacklevel=2)
161 logging.warning(msg)
162 return conf.instance.is_dev_server
164 def _select_language(self, path: str) -> str:
165 """
166 Tries to select the best language for the current request. Depending on the value of
167 conf.i18n.language_method, we'll either try to load it from the session, determine it by the domain
168 or extract it from the URL.
169 """
171 def get_language_from_header() -> str | None:
172 if not (accept_language := self.request.headers.get("accept-language")):
173 return None
174 languages = accept_language.split(",")
175 locale_q_pairs = []
177 for language in languages:
178 if language.split(";")[0] == language:
179 # no q => q = 1
180 locale_q_pairs.append((language.strip(), "1"))
181 else:
182 locale = language.split(";")[0].strip()
183 q = language.split(";")[1].split("=")[1]
184 locale_q_pairs.append((locale, q))
185 for locale_q_pair in locale_q_pairs:
186 if "-" in locale_q_pair[0]: # Check for de-DE
187 lang = locale_q_pair[0].split("-")[0]
188 else:
189 lang = locale_q_pair[0]
190 if lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()):
191 return lang
192 return None
194 if not conf.i18n.available_languages:
195 # This project doesn't use the multi-language feature, nothing to do here
196 return path
197 if conf.i18n.language_method == "session":
198 current_session = current.session.get()
199 lang = conf.i18n.default_language
200 # We save the language in the session, if it exists, and try to load it from there
201 if "lang" in current_session:
202 current.language.set(current_session["lang"])
203 return path
205 if header_lang := get_language_from_header():
206 lang = header_lang
207 current.language.set(lang)
209 elif header_lang := self.request.headers.get("X-Appengine-Country"):
210 header_lang = str(header_lang).lower()
211 if header_lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()):
212 lang = header_lang
214 if current_session.loaded:
215 current_session["lang"] = lang
216 current.language.set(lang)
218 elif conf.i18n.language_method == "domain":
219 host = self.request.host_url.lower()
220 host = host[host.find("://") + 3:].strip(" /") # strip http(s)://
221 if host.startswith("www."):
222 host = host[4:]
223 if lang := conf.i18n.domain_language_mapping.get(host):
224 current.language.set(lang)
225 # We have no language configured for this domain, try to read it from the HTTP Header
226 elif lang := get_language_from_header():
227 current.language.set(lang)
229 elif conf.i18n.language_method == "url":
230 tmppath = urlparse(path).path
231 tmppath = [unquote(x) for x in tmppath.lower().strip("/").split("/")]
232 if (
233 len(tmppath) > 0
234 and tmppath[0] in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys())
235 ):
236 current.language.set(tmppath[0])
237 return path[len(tmppath[0]) + 1:] # Return the path stripped by its language segment
238 else: # This URL doesnt contain an language prefix, try to read it from session
239 if header_lang := get_language_from_header():
240 current.language.set(header_lang)
241 elif header_lang := self.request.headers.get("X-Appengine-Country"):
242 lang = str(header_lang).lower()
243 if lang in conf.i18n.available_languages or lang in conf.i18n.language_alias_map:
244 current.language.set(lang)
245 elif conf.i18n.language_method == "header":
246 if lang := get_language_from_header():
247 current.language.set(lang)
249 return path
251 def _process(self):
252 if self.method not in ("get", "post", "head", "options"):
253 logging.error(f"{self.method=} not supported")
254 return
256 if self.request.headers.get("X-AppEngine-TaskName", None) is not None: # Check if we run in the appengine
257 if self.request.environ.get("HTTP_X_APPENGINE_USER_IP") in _appengineServiceIPs:
258 self.is_deferred = True
259 elif os.getenv("TASKS_EMULATOR") is not None:
260 self.is_deferred = True
262 current.language.set(conf.i18n.default_language)
263 # Check if we should process or abort the request
264 for validator, reqValidatorResult in [(x, x.validate(self)) for x in self.requestValidators]:
265 if reqValidatorResult is not None:
266 logging.warning(f"Request rejected by validator {validator.name}")
267 statusCode, statusStr, statusDescr = reqValidatorResult
268 self.response.status = f"{statusCode} {statusStr}"
269 self.response.write(statusDescr)
270 return
272 path = self.request.path
274 # Add CSP headers early (if any)
275 if conf.security.content_security_policy and conf.security.content_security_policy["_headerCache"]:
276 for k, v in conf.security.content_security_policy["_headerCache"].items():
277 self.response.headers[k] = v
278 if self.isSSLConnection: # Check for HTST and PKP headers only if we have a secure channel.
279 if conf.security.strict_transport_security:
280 self.response.headers["Strict-Transport-Security"] = conf.security.strict_transport_security
281 # Check for X-Security-Headers we shall emit
282 if conf.security.x_content_type_options:
283 self.response.headers["X-Content-Type-Options"] = "nosniff"
284 if conf.security.x_xss_protection is not None:
285 if conf.security.x_xss_protection:
286 self.response.headers["X-XSS-Protection"] = "1; mode=block"
287 elif conf.security.x_xss_protection is False:
288 self.response.headers["X-XSS-Protection"] = "0"
289 if conf.security.x_frame_options is not None and isinstance(conf.security.x_frame_options, tuple):
290 mode, uri = conf.security.x_frame_options
291 if mode in ["deny", "sameorigin"]:
292 self.response.headers["X-Frame-Options"] = mode
293 elif mode == "allow-from":
294 self.response.headers["X-Frame-Options"] = f"allow-from {uri}"
295 if conf.security.x_permitted_cross_domain_policies is not None:
296 self.response.headers["X-Permitted-Cross-Domain-Policies"] = conf.security.x_permitted_cross_domain_policies
297 if conf.security.referrer_policy:
298 self.response.headers["Referrer-Policy"] = conf.security.referrer_policy
299 if conf.security.permissions_policy.get("_headerCache"):
300 self.response.headers["Permissions-Policy"] = conf.security.permissions_policy["_headerCache"]
301 if conf.security.enable_coep:
302 self.response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
303 if conf.security.enable_coop:
304 self.response.headers["Cross-Origin-Opener-Policy"] = conf.security.enable_coop
305 if conf.security.enable_corp:
306 self.response.headers["Cross-Origin-Resource-Policy"] = conf.security.enable_corp
308 # Ensure that TLS is used if required
309 if conf.security.force_ssl and not self.isSSLConnection and not conf.instance.is_dev_server:
310 isWhitelisted = False
311 reqPath = self.request.path
312 for testUrl in conf.security.no_ssl_check_urls:
313 if testUrl.endswith("*"):
314 if reqPath.startswith(testUrl[:-1]):
315 isWhitelisted = True
316 break
317 else:
318 if testUrl == reqPath:
319 isWhitelisted = True
320 break
321 if not isWhitelisted: # Some URLs need to be whitelisted (as f.e. the Tasks-Queue doesn't call using https)
322 # Redirect the user to the startpage (using ssl this time)
323 host = self.request.host_url.lower()
324 host = host[host.find("://") + 3:].strip(" /") # strip http(s)://
325 self.response.status = "302 Found"
326 self.response.headers['Location'] = f"https://{host}/"
327 return
328 if path.startswith("/_ah/warmup"):
329 self.response.write("okay")
330 return
332 try:
333 current.session.get().load()
335 # Load current user into context variable if user module is there.
336 if user_mod := getattr(conf.main_app.vi, "user", None):
337 current.user.set(user_mod.getCurrentUser())
339 path = self._select_language(path)[1:]
341 # Check for closed system
342 if conf.security.closed_system and self.method != "options":
343 if not current.user.get():
344 if not any(fnmatch.fnmatch(path, pat) for pat in conf.security.closed_system_allowed_paths):
345 raise errors.Unauthorized()
347 if conf.request_preprocessor:
348 path = conf.request_preprocessor(path)
350 self._route(path)
352 except errors.Redirect as e:
353 if conf.debug.trace_exceptions:
354 logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
355 raise
356 self.response.status = f"{e.status} {e.name}"
357 url = e.url
358 if url.startswith(('.', '/')):
359 url = str(urljoin(self.request.url, url))
360 self.response.headers['Location'] = url
362 except Exception as e:
363 if conf.debug.trace_exceptions:
364 logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
365 raise
366 self.response.body = b""
367 if isinstance(e, errors.HTTPException):
368 logging.info(f"[{e.status}] {e.name}: {e.descr}", exc_info=conf.debug.trace)
369 self.response.status = f"{e.status} {e.name}"
370 # Set machine-readable x-viur-error response header in case there is an exception description.
371 if e.descr:
372 self.response.headers["x-viur-error"] = e.descr.replace("\n", "")
373 else:
374 self.response.status = 500
375 logging.error("ViUR has caught an unhandled exception!")
376 logging.exception(e)
378 res = None
379 if conf.error_handler:
380 try:
381 res = conf.error_handler(e)
382 except Exception as newE:
383 logging.error("viur.error_handler failed!")
384 logging.exception(newE)
385 res = None
386 if not res:
387 descr = "The server encountered an unexpected error and is unable to process your request."
389 if isinstance(e, errors.HTTPException):
390 error_info = {
391 "status": e.status,
392 "reason": e.name,
393 "title": str(translate(e.name)),
394 "descr": e.descr,
395 }
396 else:
397 error_info = {
398 "status": 500,
399 "reason": "Internal Server Error",
400 "title": str(translate("Internal Server Error")),
401 "descr": descr
402 }
404 if conf.instance.is_dev_server:
405 error_info["traceback"] = traceback.format_exc()
407 error_info["logo"] = conf.error_logo
409 if (len(self.path_list) > 0 and self.path_list[0] in ("vi", "json")) or \
410 current.request.get().response.headers["Content-Type"] == "application/json":
411 current.request.get().response.headers["Content-Type"] = "application/json"
412 res = json.dumps(error_info)
413 else: # We render the error in html
414 # Try to get the template from html/error/
415 if filename := conf.main_app.render.getTemplateFileName((f"{error_info['status']}", "error"),
416 raise_exception=False):
417 template = conf.main_app.render.getEnv().get_template(filename)
418 try:
419 uses_unsafe_inline = \
420 "unsafe-inline" in conf.security.content_security_policy["enforce"]["style-src"]
421 except (KeyError, TypeError): # Not set
422 uses_unsafe_inline = False
423 if uses_unsafe_inline:
424 logging.info("Using style-src:unsafe-inline, don't create a nonce")
425 nonce = None
426 else:
427 nonce = utils.string.random(16)
428 extendCsp({"style-src": [f"nonce-{nonce}"]})
429 res = template.render(error_info, nonce=nonce)
430 else:
431 res = (f'<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">'
432 f'<title>{error_info["status"]} - {error_info["reason"]}</title>'
433 f'</head><body><h1>{error_info["status"]} - {error_info["reason"]}</h1>')
435 self.response.write(res.encode("UTF-8"))
437 finally:
438 current.session.get().save()
439 if conf.instance.is_dev_server and conf.debug.dev_server_cloud_logging:
440 # Emit the outer log only on dev_appserver (we'll use the existing request log when live)
441 SEVERITY = "DEBUG"
442 if self.maxLogLevel >= 50:
443 SEVERITY = "CRITICAL"
444 elif self.maxLogLevel >= 40:
445 SEVERITY = "ERROR"
446 elif self.maxLogLevel >= 30:
447 SEVERITY = "WARNING"
448 elif self.maxLogLevel >= 20:
449 SEVERITY = "INFO"
451 TRACE = "projects/{}/traces/{}".format(loggingClient.project, self._traceID)
453 REQUEST = {
454 'requestMethod': self.request.method,
455 'requestUrl': self.request.url,
456 'status': self.response.status_code,
457 'userAgent': self.request.headers.get('USER-AGENT'),
458 'responseSize': self.response.content_length,
459 'latency': "%0.3fs" % (time.time() - self.startTime),
460 'remoteIp': self.request.environ.get("HTTP_X_APPENGINE_USER_IP")
461 }
462 requestLogger.log_text(
463 "",
464 client=loggingClient,
465 severity=SEVERITY,
466 http_request=REQUEST,
467 trace=TRACE,
468 resource=requestLoggingRessource,
469 operation={
470 "first": True,
471 "last": True,
472 "id": self._traceID
473 }
474 )
476 if conf.instance.is_dev_server:
477 self.is_deferred = True
479 while self.pendingTasks:
480 task = self.pendingTasks.pop()
481 logging.debug(f"Deferred task emulation, executing {task=}")
482 try:
483 task()
484 except Exception: # noqa
485 logging.exception(f"Deferred Task emulation {task} failed")
487 def _route(self, path: str) -> None:
488 """
489 Does the actual work of sanitizing the parameter, determine which exposed-function to call
490 (and with which parameters)
491 """
493 # Parse the URL
494 if path := parse.urlparse(path).path:
495 self.path = path
496 self.path_list = tuple(unicodedata.normalize("NFC", parse.unquote(part))
497 for part in path.strip("/").split("/"))
499 # Prevent Hash-collision attacks
500 if len(self.request.params) > conf.max_post_params_count:
501 raise errors.BadRequest(
502 f"Too many arguments supplied, exceeding maximum"
503 f" of {conf.max_post_params_count} allowed arguments per request"
504 )
506 param_filter = conf.param_filter_function
507 if param_filter and not callable(param_filter):
508 raise ValueError(f"""{param_filter=} is not callable""")
510 for key, value in self.request.params.items():
511 try:
512 key = unicodedata.normalize("NFC", key)
513 value = unicodedata.normalize("NFC", value)
514 except UnicodeError:
515 # We received invalid unicode data (usually happens when
516 # someone tries to exploit unicode normalisation bugs)
517 raise errors.BadRequest()
519 if param_filter and param_filter(key, value):
520 continue
522 if key == TEMPLATE_STYLE_KEY:
523 self.template_style = value
524 continue
526 if key in self.kwargs:
527 if isinstance(self.kwargs[key], list):
528 self.kwargs[key].append(value)
529 else: # Convert that key to a list
530 self.kwargs[key] = [self.kwargs[key], value]
531 else:
532 self.kwargs[key] = value
534 if "self" in self.kwargs or "return" in self.kwargs: # self or return is reserved for bound methods
535 raise errors.BadRequest()
537 caller = conf.main_resolver
538 idx = 0 # Count how may items from *args we'd have consumed (so the rest can go into *args of the called func
539 path_found = True
541 for part in self.path_list:
542 # TODO: Remove canAccess guards... solve differently.
543 if "canAccess" in caller and not caller["canAccess"]():
544 # We have a canAccess function guarding that object,
545 # and it returns False...
546 raise errors.Unauthorized()
548 idx += 1
550 if part not in caller:
551 part = "index"
553 if caller := caller.get(part):
554 if isinstance(caller, Method):
555 if part == "index":
556 idx -= 1
558 self.args = tuple(self.path_list[idx:])
559 break
561 elif part == "index":
562 path_found = False
563 break
565 else:
566 path_found = False
567 break
569 if not path_found:
570 raise errors.NotFound(
571 f"""The path {utils.string.escape("/".join(self.path_list[:idx]))} could not be found""")
573 if not isinstance(caller, Method):
574 # try to find "index" function
575 if (index := caller.get("index")) and isinstance(index, Method):
576 caller = index
577 else:
578 raise errors.MethodNotAllowed()
580 # Check for internal exposed
581 if caller.exposed is False and not self.internalRequest:
582 raise errors.NotFound()
584 # Fill the Allow header of the response with the allowed HTTP methods
585 if self.method == "options":
586 self.response.headers["Allow"] = ", ".join(sorted(caller.methods)).upper()
588 # Register caller specific CORS headers
589 self.cors_headers = [str(header).lower() for header in caller.cors_allow_headers or ()]
591 # Check for @force_ssl flag
592 if not self.internalRequest \
593 and caller.ssl \
594 and not self.request.host_url.lower().startswith("https://") \
595 and not conf.instance.is_dev_server:
596 raise errors.PreconditionFailed("You must use SSL to access this resource!")
598 # Check for @force_post flag
599 if not self.isPostRequest and caller.methods == ("POST",):
600 raise errors.MethodNotAllowed("You must use POST to access this resource!")
602 # Check if this request should bypass the caches
603 if self.request.headers.get("X-Viur-Disable-Cache"):
604 # No cache requested, check if the current user is allowed to do so
605 if (user := current.user.get()) and "root" in user["access"]:
606 logging.debug("Caching disabled by X-Viur-Disable-Cache header")
607 self.disableCache = True
609 # Destill context as self.context, if available
610 if context := {k: v for k, v in self.kwargs.items() if k.startswith("@")}:
611 # Remove context parameters from kwargs
612 kwargs = {k: v for k, v in self.kwargs.items() if k not in context}
613 # Remove leading "@" from context parameters
614 self.context |= {k[1:]: v for k, v in context.items() if len(k) > 1}
615 else:
616 kwargs = self.kwargs
618 if ((self.internalRequest and conf.debug.trace_internal_call_routing)
619 or conf.debug.trace_external_call_routing):
620 logging.debug(
621 f"Calling {caller._func!r} with args={self.args!r}, {kwargs=} within context={self.context!r}"
622 )
624 if self.method == "options":
625 # OPTIONS request doesn't have a body
626 del self.response.app_iter
627 del self.response.content_type
628 self.response.status = "204 No Content"
629 return
631 # Now call the routed method!
632 res = caller(*self.args, **kwargs)
634 if self.method == "options":
635 # OPTIONS request doesn't have a body
636 del self.response.app_iter
637 del self.response.content_type
638 self.response.status = "204 No Content"
639 return
641 if not isinstance(res, bytes): # Convert the result to bytes if it is not already!
642 res = str(res).encode("UTF-8")
643 self.response.write(res)
645 def _cors(self) -> None:
646 """
647 Set CORS headers to the HTTP response.
649 .. seealso::
651 Option :attr:`core.config.Security.cors_origins`, etc.
652 for cors settings.
654 https://fetch.spec.whatwg.org/#http-cors-protocol
656 https://enable-cors.org/server.html
658 https://www.html5rocks.com/static/images/cors_server_flowchart.png
659 """
661 def test_candidates(value: str, *candidates: str | re.Pattern) -> bool:
662 """Test if the value matches the pattern of any candidate"""
663 for candidate in candidates:
664 if isinstance(candidate, re.Pattern):
665 if candidate.match(value):
666 return True
667 elif isinstance(candidate, str):
668 if candidate.lower() == str(value).lower():
669 return True
670 else:
671 raise TypeError(
672 f"Invalid setting {candidate}. "
673 f"Expected a string or a compiled regex."
674 )
675 return False
677 origin = current.request.get().request.headers.get("Origin")
678 if not origin:
679 logging.debug(f"Origin header is not set")
680 return
682 # Origin is set --> It's a CORS request
683 logging.debug(f"Got CORS request from {origin=}")
685 any_origin_allowed = (
686 conf.security.cors_origins == "*"
687 or any(_origin == "*" for _origin in conf.security.cors_origins)
688 or any(_origin.pattern == r".*"
689 for _origin in conf.security.cors_origins
690 if isinstance(_origin, re.Pattern))
691 )
693 if any_origin_allowed and conf.security.cors_origins_use_wildcard:
694 if conf.security.cors_allow_credentials:
695 raise RuntimeError(
696 "Invalid CORS config: "
697 "If credentials mode is \"include\", then `Access-Control-Allow-Origin` cannot be `*`. "
698 "See https://fetch.spec.whatwg.org/#cors-protocol-and-credentials"
699 )
700 self.response.headers["Access-Control-Allow-Origin"] = "*"
702 elif test_candidates(origin, *conf.security.cors_origins):
703 self.response.headers["Access-Control-Allow-Origin"] = origin
705 else:
706 logging.warning(f"{origin=} not valid (must be one of {conf.security.cors_origins=})")
707 return
709 if conf.security.cors_allow_credentials:
710 self.response.headers["Access-Control-Allow-Credentials"] = "true"
712 if self.method == "options":
713 method = (self.request.headers.get("Access-Control-Request-Method") or "").lower()
715 if method in conf.security.cors_methods:
716 # It's a CORS-preflight request
717 # - MUST include Access-Control-Request-Method
718 # - CAN include Access-Control-Request-Headers
720 # The response can be cached
721 if conf.security.cors_max_age is not None:
722 assert isinstance(conf.security.cors_max_age, datetime.timedelta)
723 self.response.headers["Access-Control-Max-Age"] = \
724 str(int(conf.security.cors_max_age.total_seconds()))
726 # Allowed methods
727 self.response.headers["Access-Control-Allow-Methods"] = ", ".join(
728 sorted(conf.security.cors_methods)).upper()
730 # Allowed headers
731 request_headers = self.request.headers.get("Access-Control-Request-Headers")
732 request_headers = [h.strip().lower() for h in request_headers.split(",")]
733 if conf.security.cors_allow_headers == "*":
734 # Every header is allowed
735 allow_headers = request_headers[:]
736 else:
737 # There are generally headers allowed and/or from the caller
738 allow_headers = [
739 header
740 for header in request_headers
741 if test_candidates(
742 header,
743 *(self.cors_headers or ()), # caller specific
744 *(conf.security.cors_allow_headers or ()) # generally global
745 )
746 ]
747 if allow_headers:
748 self.response.headers["Access-Control-Allow-Headers"] = ", ".join(sorted(allow_headers))
750 else:
751 logging.warning(
752 f"Access-Control-Request-Method: {method} is NOT a valid method of {conf.security.cors_methods=}. "
753 f"Don't append CORS-preflight request headers"
754 )
756 def saveSession(self) -> None:
757 current.session.get().save()
760from .i18n import translate # noqa: E402