Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/request.py: 6%
436 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 11:31 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 11:31 +0000
1"""
2 This module implements the WSGI (Web Server Gateway Interface) layer for ViUR. This is the main entry
3 point for incoming HTTP requests. The main class is the :class:`Router`. Each request will get its
4 own instance of that class which then holds the reference to the request and response object.
5 Additionally, this module defines the RequestValidator interface which provides a very early hook into the
6 request processing (useful for global ratelimiting, DDoS prevention or access control).
7"""
8import datetime
9import fnmatch
10import json
11import logging
12import os
13import re
14import time
15import traceback
16import typing as t
17import unicodedata
18from abc import ABC, abstractmethod
19from urllib import parse
20from urllib.parse import quote, unquote, urljoin, urlparse
22import webob
24from viur.core import current, db, errors, session, utils
25from viur.core.config import conf
26from viur.core.logging import client as loggingClient, requestLogger, requestLoggingRessource
27from viur.core.module import Method
28from viur.core.securityheaders import extendCsp
29from viur.core.tasks import _appengineServiceIPs
# Name of the request parameter whose value is stored as the request's template style
# instead of being forwarded as a keyword argument to the routed function.
TEMPLATE_STYLE_KEY = "style"
class RequestValidator(ABC):
    """
    RequestValidators can be used to validate a request very early on. If the validate method returns a tuple,
    the request is aborted. Can be used to block requests from bots.

    To register or remove a validator, access it in main.py through
    :attr:`viur.core.request.Router.requestValidators`
    """
    # Internal name to trace which validator aborted the request
    name = "RequestValidator"

    @staticmethod
    @abstractmethod
    def validate(request: "Router") -> t.Optional[tuple[int, str, str]]:
        """
        The function that checks the current request. If the request is valid, simply return None.
        If the request should be blocked, it must return a tuple of

        - The HTTP status code (as int)
        - The Description of that status code (eg "Forbidden")
        - The Response Body (can be a simple string or an HTML-Page)

        :param request: The Request instance to check
        :return: None on success, an Error-Tuple otherwise
        """
        raise NotImplementedError()
class FetchMetaDataValidator(RequestValidator):
    """
    This validator examines the headers "Sec-Fetch-Site", "sec-fetch-mode" and "sec-fetch-dest" as
    recommended by https://web.dev/fetch-metadata/
    """
    name = "FetchMetaDataValidator"

    @staticmethod
    def validate(request: "Router") -> t.Optional[tuple[int, str, str]]:
        """
        Accept or reject the request based on its fetch metadata headers.

        :param request: The Request instance to check
        :return: None if the request may proceed, otherwise the tuple
            ``(403, "Forbidden", <body>)``
        """
        headers = request.request.headers
        fetch_site = headers.get("sec-fetch-site")

        if not fetch_site:  # These headers are not send by all browsers
            return None

        if fetch_site in {"same-origin", "none"}:  # A Request from our site
            return None

        # We are accepting a request with same-site only in local dev mode.
        # GAE_ENV is read with .get() so a missing variable means "not localdev"
        # instead of aborting the request with a KeyError.
        if fetch_site == "same-site" and os.environ.get("GAE_ENV") == "localdev":
            return None

        if (
            headers.get("sec-fetch-mode") == "navigate"
            and not request.isPostRequest
            and headers.get("sec-fetch-dest") not in {"object", "embed"}
        ):  # Incoming navigation GET request
            return None

        return 403, "Forbidden", "Request rejected due to fetch metadata"
83class Router:
84 """
85 This class accepts a request, collects its parameters and routes the request
86 to its destination function.
87 The basic control flow is
88 - Setting up internal variables
89 - Running the Request validators
90 - Emitting the headers (especially the security related ones)
91 - Run the TLS check (ensure it's a secure connection or check if the URL is whitelisted)
92 - Load or initialize a new session
93 - Set up i18n (choosing the language etc)
94 - Run the request preprocessor (if any)
95 - Normalize & sanity check the parameters
96 - Resolve the exposed function and call it
97 - Save the session / tear down the request
98 - Return the response generated
101 :warning: Don't instantiate! Don't subclass! DON'T TOUCH! ;)
102 """
104 # List of requestValidators used to preflight-check a request before it is dispatched within ViUR
105 requestValidators = [FetchMetaDataValidator]
107 def __init__(self, environ: dict):
108 super().__init__()
109 self.startTime = time.time()
111 self.request = webob.Request(environ)
112 self.response = webob.Response()
114 self.maxLogLevel = logging.DEBUG
115 self._traceID = \
116 self.request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or utils.string.random()
117 self.is_deferred = False
118 self.path = ""
119 self.path_list = ()
121 self.skey_checked = False # indicates whether @skey-decorator-check has already performed within a request
122 self.internalRequest = False
123 self.disableCache = False # Shall this request bypass the caches?
124 self.pendingTasks = []
125 self.args = ()
126 self.kwargs = {}
127 self.context = {}
128 self.template_style: str | None = None
129 self.cors_headers = ()
131 # Check if it's a HTTP-Method we support
132 self.method = self.request.method.lower()
133 self.isPostRequest = self.method == "post"
134 self.isSSLConnection = self.request.host_url.lower().startswith("https://") # We have an encrypted channel
136 db.currentDbAccessLog.set(set())
138 # Set context variables
139 current.language.set(conf.i18n.default_language)
140 current.request.set(self)
141 current.session.set(session.Session())
142 current.request_data.set({})
144 # Process actual request
145 self._process()
147 self._cors()
149 # Unset context variables
150 current.language.set(None)
151 current.request_data.set(None)
152 current.session.set(None)
153 current.request.set(None)
154 current.user.set(None)
156 @property
157 def isDevServer(self) -> bool:
158 import warnings
159 msg = "Use of `isDevServer` is deprecated; Use `conf.instance.is_dev_server` instead!"
160 warnings.warn(msg, DeprecationWarning, stacklevel=2)
161 logging.warning(msg)
162 return conf.instance.is_dev_server
164 def _select_language(self, path: str) -> str:
165 """
166 Tries to select the best language for the current request. Depending on the value of
167 conf.i18n.language_method, we'll either try to load it from the session, determine it by the domain
168 or extract it from the URL.
169 """
171 def get_language_from_header() -> str | None:
172 if not (accept_language := self.request.headers.get("accept-language")):
173 return None
174 languages = accept_language.split(",")
175 locale_q_pairs = []
177 for language in languages:
178 if language.split(";")[0] == language:
179 # no q => q = 1
180 locale_q_pairs.append((language.strip(), "1"))
181 else:
182 locale = language.split(";")[0].strip()
183 q = language.split(";")[1].split("=")[1]
184 locale_q_pairs.append((locale, q))
185 for locale_q_pair in locale_q_pairs:
186 if "-" in locale_q_pair[0]: # Check for de-DE
187 lang = locale_q_pair[0].split("-")[0]
188 else:
189 lang = locale_q_pair[0]
190 if lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()):
191 return lang
192 return None
194 if not conf.i18n.available_languages:
195 # This project doesn't use the multi-language feature, nothing to do here
196 return path
197 if conf.i18n.language_method == "session":
198 current_session = current.session.get()
199 lang = conf.i18n.default_language
200 # We save the language in the session, if it exists, and try to load it from there
201 if "lang" in current_session:
202 current.language.set(current_session["lang"])
203 return path
205 if header_lang := get_language_from_header():
206 lang = header_lang
207 current.language.set(lang)
209 elif header_lang := self.request.headers.get("X-Appengine-Country"):
210 header_lang = str(header_lang).lower()
211 if header_lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()):
212 lang = header_lang
214 if current_session.loaded:
215 current_session["lang"] = lang
216 current.language.set(lang)
218 elif conf.i18n.language_method == "domain":
219 host = self.request.host_url.lower()
220 host = host[host.find("://") + 3:].strip(" /") # strip http(s)://
221 if host.startswith("www."):
222 host = host[4:]
223 if lang := conf.i18n.domain_language_mapping.get(host):
224 current.language.set(lang)
225 # We have no language configured for this domain, try to read it from the HTTP Header
226 elif lang := get_language_from_header():
227 current.language.set(lang)
229 elif conf.i18n.language_method == "url":
230 tmppath = urlparse(path).path
231 tmppath = [unquote(x) for x in tmppath.lower().strip("/").split("/")]
232 if (
233 len(tmppath) > 0
234 and tmppath[0] in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys())
235 ):
236 current.language.set(tmppath[0])
237 return path[len(tmppath[0]) + 1:] # Return the path stripped by its language segment
238 else: # This URL doesnt contain an language prefix, try to read it from session
239 if header_lang := get_language_from_header():
240 current.language.set(header_lang)
241 elif header_lang := self.request.headers.get("X-Appengine-Country"):
242 lang = str(header_lang).lower()
243 if lang in conf.i18n.available_languages or lang in conf.i18n.language_alias_map:
244 current.language.set(lang)
245 elif conf.i18n.language_method == "header":
246 if lang := get_language_from_header():
247 current.language.set(lang)
249 return path
251 def _process(self):
252 if self.method not in ("get", "post", "head", "options"):
253 logging.error(f"{self.method=} not supported")
254 return
256 if self.request.headers.get("X-AppEngine-TaskName", None) is not None: # Check if we run in the appengine
257 if self.request.environ.get("HTTP_X_APPENGINE_USER_IP") in _appengineServiceIPs:
258 self.is_deferred = True
259 elif os.getenv("TASKS_EMULATOR") is not None:
260 self.is_deferred = True
262 current.language.set(conf.i18n.default_language)
263 # Check if we should process or abort the request
264 for validator, reqValidatorResult in [(x, x.validate(self)) for x in self.requestValidators]:
265 if reqValidatorResult is not None:
266 logging.warning(f"Request rejected by validator {validator.name}")
267 statusCode, statusStr, statusDescr = reqValidatorResult
268 self.response.status = f"{statusCode} {statusStr}"
269 self.response.write(statusDescr)
270 return
272 path = self.request.path
274 # Add CSP headers early (if any)
275 if conf.security.content_security_policy and conf.security.content_security_policy["_headerCache"]:
276 for k, v in conf.security.content_security_policy["_headerCache"].items():
277 self.response.headers[k] = v
278 if self.isSSLConnection: # Check for HTST and PKP headers only if we have a secure channel.
279 if conf.security.strict_transport_security:
280 self.response.headers["Strict-Transport-Security"] = conf.security.strict_transport_security
281 # Check for X-Security-Headers we shall emit
282 if conf.security.x_content_type_options:
283 self.response.headers["X-Content-Type-Options"] = "nosniff"
284 if conf.security.x_xss_protection is not None:
285 if conf.security.x_xss_protection:
286 self.response.headers["X-XSS-Protection"] = "1; mode=block"
287 elif conf.security.x_xss_protection is False:
288 self.response.headers["X-XSS-Protection"] = "0"
289 if conf.security.x_frame_options is not None and isinstance(conf.security.x_frame_options, tuple):
290 mode, uri = conf.security.x_frame_options
291 if mode in ["deny", "sameorigin"]:
292 self.response.headers["X-Frame-Options"] = mode
293 elif mode == "allow-from":
294 self.response.headers["X-Frame-Options"] = f"allow-from {uri}"
295 if conf.security.x_permitted_cross_domain_policies is not None:
296 self.response.headers["X-Permitted-Cross-Domain-Policies"] = conf.security.x_permitted_cross_domain_policies
297 if conf.security.referrer_policy:
298 self.response.headers["Referrer-Policy"] = conf.security.referrer_policy
299 if conf.security.permissions_policy.get("_headerCache"):
300 self.response.headers["Permissions-Policy"] = conf.security.permissions_policy["_headerCache"]
301 if conf.security.enable_coep:
302 self.response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
303 if conf.security.enable_coop:
304 self.response.headers["Cross-Origin-Opener-Policy"] = conf.security.enable_coop
305 if conf.security.enable_corp:
306 self.response.headers["Cross-Origin-Resource-Policy"] = conf.security.enable_corp
308 # Ensure that TLS is used if required
309 if conf.security.force_ssl and not self.isSSLConnection and not conf.instance.is_dev_server:
310 isWhitelisted = False
311 reqPath = self.request.path
312 for testUrl in conf.security.no_ssl_check_urls:
313 if testUrl.endswith("*"):
314 if reqPath.startswith(testUrl[:-1]):
315 isWhitelisted = True
316 break
317 else:
318 if testUrl == reqPath:
319 isWhitelisted = True
320 break
321 if not isWhitelisted: # Some URLs need to be whitelisted (as f.e. the Tasks-Queue doesn't call using https)
322 # Redirect the user to the startpage (using ssl this time)
323 host = self.request.host_url.lower()
324 host = host[host.find("://") + 3:].strip(" /") # strip http(s)://
325 self.response.status = "302 Found"
326 self.response.headers['Location'] = f"https://{host}/"
327 return
328 if path.startswith("/_ah/warmup"):
329 self.response.write("okay")
330 return
332 try:
333 current.session.get().load()
335 # Load current user into context variable if user module is there.
336 if user_mod := getattr(conf.main_app.vi, "user", None):
337 current.user.set(user_mod.getCurrentUser())
339 path = self._select_language(path)[1:]
341 # Check for closed system
342 if conf.security.closed_system and self.method != "options":
343 if not current.user.get():
344 if not any(fnmatch.fnmatch(path, pat) for pat in conf.security.closed_system_allowed_paths):
345 raise errors.Unauthorized()
347 if conf.request_preprocessor:
348 path = conf.request_preprocessor(path)
350 self._route(path)
352 except errors.Redirect as e:
353 if conf.debug.trace_exceptions:
354 logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
355 raise
356 self.response.status = f"{e.status} {e.name}"
357 url = e.url
358 url = unquote(url) # decode first
359 # safe = https://url.spec.whatwg.org/#url-path-segment-string
360 url = quote(url, encoding="utf-8", safe="!$&'()*+,-./:;=?@_~") # re-encode all in utf-8
361 if url.startswith(('.', '/')):
362 url = str(urljoin(self.request.url, url))
363 self.response.headers['Location'] = url
365 except Exception as e:
366 if conf.debug.trace_exceptions:
367 logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
368 raise
369 self.response.body = b""
370 if isinstance(e, errors.HTTPException):
371 logging.info(f"[{e.status}] {e.name}: {e.descr}", exc_info=conf.debug.trace)
372 self.response.status = f"{e.status} {e.name}"
373 # Set machine-readable x-viur-error response header in case there is an exception description.
374 if e.descr:
375 self.response.headers["x-viur-error"] = e.descr.replace("\n", "")
376 else:
377 self.response.status = 500
378 logging.error("ViUR has caught an unhandled exception!")
379 logging.exception(e)
381 res = None
382 if conf.error_handler:
383 try:
384 res = conf.error_handler(e)
385 except Exception as newE:
386 logging.error("viur.error_handler failed!")
387 logging.exception(newE)
388 res = None
389 if not res:
390 descr = "The server encountered an unexpected error and is unable to process your request."
392 if isinstance(e, errors.HTTPException):
393 error_info = {
394 "status": e.status,
395 "reason": e.name,
396 "title": str(translate(e.name)),
397 "descr": e.descr,
398 }
399 else:
400 error_info = {
401 "status": 500,
402 "reason": "Internal Server Error",
403 "title": str(translate("Internal Server Error")),
404 "descr": descr
405 }
407 if conf.instance.is_dev_server:
408 error_info["traceback"] = traceback.format_exc()
410 error_info["logo"] = conf.error_logo
412 if (len(self.path_list) > 0 and self.path_list[0] in ("vi", "json")) or \
413 current.request.get().response.headers["Content-Type"] == "application/json":
414 current.request.get().response.headers["Content-Type"] = "application/json"
415 res = json.dumps(error_info)
416 else: # We render the error in html
417 # Try to get the template from html/error/
418 if filename := conf.main_app.render.getTemplateFileName((f"{error_info['status']}", "error"),
419 raise_exception=False):
420 template = conf.main_app.render.getEnv().get_template(filename)
421 try:
422 uses_unsafe_inline = \
423 "unsafe-inline" in conf.security.content_security_policy["enforce"]["style-src"]
424 except (KeyError, TypeError): # Not set
425 uses_unsafe_inline = False
426 if uses_unsafe_inline:
427 logging.info("Using style-src:unsafe-inline, don't create a nonce")
428 nonce = None
429 else:
430 nonce = utils.string.random(16)
431 extendCsp({"style-src": [f"nonce-{nonce}"]})
432 res = template.render(error_info, nonce=nonce)
433 else:
434 res = (f'<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">'
435 f'<title>{error_info["status"]} - {error_info["reason"]}</title>'
436 f'</head><body><h1>{error_info["status"]} - {error_info["reason"]}</h1>')
438 self.response.write(res.encode("UTF-8"))
440 finally:
441 current.session.get().save()
442 if conf.instance.is_dev_server and conf.debug.dev_server_cloud_logging:
443 # Emit the outer log only on dev_appserver (we'll use the existing request log when live)
444 SEVERITY = "DEBUG"
445 if self.maxLogLevel >= 50:
446 SEVERITY = "CRITICAL"
447 elif self.maxLogLevel >= 40:
448 SEVERITY = "ERROR"
449 elif self.maxLogLevel >= 30:
450 SEVERITY = "WARNING"
451 elif self.maxLogLevel >= 20:
452 SEVERITY = "INFO"
454 TRACE = "projects/{}/traces/{}".format(loggingClient.project, self._traceID)
456 REQUEST = {
457 'requestMethod': self.request.method,
458 'requestUrl': self.request.url,
459 'status': self.response.status_code,
460 'userAgent': self.request.headers.get('USER-AGENT'),
461 'responseSize': self.response.content_length,
462 'latency': "%0.3fs" % (time.time() - self.startTime),
463 'remoteIp': self.request.environ.get("HTTP_X_APPENGINE_USER_IP")
464 }
465 requestLogger.log_text(
466 "",
467 client=loggingClient,
468 severity=SEVERITY,
469 http_request=REQUEST,
470 trace=TRACE,
471 resource=requestLoggingRessource,
472 operation={
473 "first": True,
474 "last": True,
475 "id": self._traceID
476 }
477 )
479 if conf.instance.is_dev_server:
480 self.is_deferred = True
482 while self.pendingTasks:
483 task = self.pendingTasks.pop()
484 logging.debug(f"Deferred task emulation, executing {task=}")
485 try:
486 task()
487 except Exception: # noqa
488 logging.exception(f"Deferred Task emulation {task} failed")
490 def _route(self, path: str) -> None:
491 """
492 Does the actual work of sanitizing the parameter, determine which exposed-function to call
493 (and with which parameters)
494 """
496 # Parse the URL
497 if path := parse.urlparse(path).path:
498 self.path = path
499 self.path_list = tuple(unicodedata.normalize("NFC", parse.unquote(part))
500 for part in path.strip("/").split("/"))
502 # Prevent Hash-collision attacks
503 if len(self.request.params) > conf.max_post_params_count:
504 raise errors.BadRequest(
505 f"Too many arguments supplied, exceeding maximum"
506 f" of {conf.max_post_params_count} allowed arguments per request"
507 )
509 param_filter = conf.param_filter_function
510 if param_filter and not callable(param_filter):
511 raise ValueError(f"""{param_filter=} is not callable""")
513 for key, value in self.request.params.items():
514 try:
515 key = unicodedata.normalize("NFC", key)
516 value = unicodedata.normalize("NFC", value)
517 except UnicodeError:
518 # We received invalid unicode data (usually happens when
519 # someone tries to exploit unicode normalisation bugs)
520 raise errors.BadRequest()
522 if param_filter and param_filter(key, value):
523 continue
525 if key == TEMPLATE_STYLE_KEY:
526 self.template_style = value
527 continue
529 if key in self.kwargs:
530 if isinstance(self.kwargs[key], list):
531 self.kwargs[key].append(value)
532 else: # Convert that key to a list
533 self.kwargs[key] = [self.kwargs[key], value]
534 else:
535 self.kwargs[key] = value
537 if "self" in self.kwargs or "return" in self.kwargs: # self or return is reserved for bound methods
538 raise errors.BadRequest()
540 caller = conf.main_resolver
541 idx = 0 # Count how may items from *args we'd have consumed (so the rest can go into *args of the called func
542 path_found = True
544 for part in self.path_list:
545 # TODO: Remove canAccess guards... solve differently.
546 if "canAccess" in caller and not caller["canAccess"]():
547 # We have a canAccess function guarding that object,
548 # and it returns False...
549 raise errors.Unauthorized()
551 idx += 1
553 if part not in caller:
554 part = "index"
556 if caller := caller.get(part):
557 if isinstance(caller, Method):
558 if part == "index":
559 idx -= 1
561 self.args = tuple(self.path_list[idx:])
562 break
564 elif part == "index":
565 path_found = False
566 break
568 else:
569 path_found = False
570 break
572 if not path_found:
573 raise errors.NotFound(
574 f"""The path {utils.string.escape("/".join(self.path_list[:idx]))} could not be found""")
576 if not isinstance(caller, Method):
577 # try to find "index" function
578 if (index := caller.get("index")) and isinstance(index, Method):
579 caller = index
580 else:
581 raise errors.MethodNotAllowed()
583 # Check for internal exposed
584 if caller.exposed is False and not self.internalRequest:
585 raise errors.NotFound()
587 # Fill the Allow header of the response with the allowed HTTP methods
588 if self.method == "options":
589 self.response.headers["Allow"] = ", ".join(sorted(caller.methods)).upper()
591 # Register caller specific CORS headers
592 self.cors_headers = [str(header).lower() for header in caller.cors_allow_headers or ()]
594 # Check for @force_ssl flag
595 if not self.internalRequest \
596 and caller.ssl \
597 and not self.request.host_url.lower().startswith("https://") \
598 and not conf.instance.is_dev_server:
599 raise errors.PreconditionFailed("You must use SSL to access this resource!")
601 # Check for @force_post flag
602 if not self.isPostRequest and caller.methods == ("POST",):
603 raise errors.MethodNotAllowed("You must use POST to access this resource!")
605 # Check if this request should bypass the caches
606 if self.request.headers.get("X-Viur-Disable-Cache"):
607 # No cache requested, check if the current user is allowed to do so
608 if (user := current.user.get()) and "root" in user["access"]:
609 logging.debug("Caching disabled by X-Viur-Disable-Cache header")
610 self.disableCache = True
612 # Destill context as self.context, if available
613 if context := {k: v for k, v in self.kwargs.items() if k.startswith("@")}:
614 # Remove context parameters from kwargs
615 kwargs = {k: v for k, v in self.kwargs.items() if k not in context}
616 # Remove leading "@" from context parameters
617 self.context |= {k[1:]: v for k, v in context.items() if len(k) > 1}
618 else:
619 kwargs = self.kwargs
621 if ((self.internalRequest and conf.debug.trace_internal_call_routing)
622 or conf.debug.trace_external_call_routing):
623 logging.debug(
624 f"Calling {caller._func!r} with args={self.args!r}, {kwargs=} within context={self.context!r}"
625 )
627 if self.method == "options":
628 # OPTIONS request doesn't have a body
629 del self.response.app_iter
630 del self.response.content_type
631 self.response.status = "204 No Content"
632 return
634 # Now call the routed method!
635 res = caller(*self.args, **kwargs)
637 if self.method == "options":
638 # OPTIONS request doesn't have a body
639 del self.response.app_iter
640 del self.response.content_type
641 self.response.status = "204 No Content"
642 return
644 if not isinstance(res, bytes): # Convert the result to bytes if it is not already!
645 res = str(res).encode("UTF-8")
646 self.response.write(res)
648 def _cors(self) -> None:
649 """
650 Set CORS headers to the HTTP response.
652 .. seealso::
654 Option :attr:`core.config.Security.cors_origins`, etc.
655 for cors settings.
657 https://fetch.spec.whatwg.org/#http-cors-protocol
659 https://enable-cors.org/server.html
661 https://www.html5rocks.com/static/images/cors_server_flowchart.png
662 """
664 def test_candidates(value: str, *candidates: str | re.Pattern) -> bool:
665 """Test if the value matches the pattern of any candidate"""
666 for candidate in candidates:
667 if isinstance(candidate, re.Pattern):
668 if candidate.match(value):
669 return True
670 elif isinstance(candidate, str):
671 if candidate.lower() == str(value).lower():
672 return True
673 else:
674 raise TypeError(
675 f"Invalid setting {candidate}. "
676 f"Expected a string or a compiled regex."
677 )
678 return False
680 origin = current.request.get().request.headers.get("Origin")
681 if not origin:
682 return
684 # Origin is set --> It's a CORS request
686 any_origin_allowed = (
687 conf.security.cors_origins == "*"
688 or any(_origin == "*" for _origin in conf.security.cors_origins)
689 or any(_origin.pattern == r".*"
690 for _origin in conf.security.cors_origins
691 if isinstance(_origin, re.Pattern))
692 )
694 if any_origin_allowed and conf.security.cors_origins_use_wildcard:
695 if conf.security.cors_allow_credentials:
696 raise RuntimeError(
697 "Invalid CORS config: "
698 "If credentials mode is \"include\", then `Access-Control-Allow-Origin` cannot be `*`. "
699 "See https://fetch.spec.whatwg.org/#cors-protocol-and-credentials"
700 )
701 self.response.headers["Access-Control-Allow-Origin"] = "*"
703 elif test_candidates(origin, *conf.security.cors_origins):
704 self.response.headers["Access-Control-Allow-Origin"] = origin
706 else:
707 logging.warning(f"{origin=} not valid (must be one of {conf.security.cors_origins=})")
708 return
710 if conf.security.cors_allow_credentials:
711 self.response.headers["Access-Control-Allow-Credentials"] = "true"
713 if self.method == "options":
714 method = (self.request.headers.get("Access-Control-Request-Method") or "").lower()
716 if method in conf.security.cors_methods:
717 # It's a CORS-preflight request
718 # - MUST include Access-Control-Request-Method
719 # - CAN include Access-Control-Request-Headers
721 # The response can be cached
722 if conf.security.cors_max_age is not None:
723 assert isinstance(conf.security.cors_max_age, datetime.timedelta)
724 self.response.headers["Access-Control-Max-Age"] = \
725 str(int(conf.security.cors_max_age.total_seconds()))
727 # Allowed methods
728 self.response.headers["Access-Control-Allow-Methods"] = ", ".join(
729 sorted(conf.security.cors_methods)).upper()
731 # Allowed headers
732 request_headers = self.request.headers.get("Access-Control-Request-Headers")
733 request_headers = [h.strip().lower() for h in request_headers.split(",")]
734 if conf.security.cors_allow_headers == "*":
735 # Every header is allowed
736 allow_headers = request_headers[:]
737 else:
738 # There are generally headers allowed and/or from the caller
739 allow_headers = [
740 header
741 for header in request_headers
742 if test_candidates(
743 header,
744 *(self.cors_headers or ()), # caller specific
745 *(conf.security.cors_allow_headers or ()) # generally global
746 )
747 ]
748 if allow_headers:
749 self.response.headers["Access-Control-Allow-Headers"] = ", ".join(sorted(allow_headers))
751 else:
752 logging.warning(
753 f"Access-Control-Request-Method: {method} is NOT a valid method of {conf.security.cors_methods=}. "
754 f"Don't append CORS-preflight request headers"
755 )
757 def saveSession(self) -> None:
758 current.session.get().save()
761from .i18n import translate # noqa: E402