Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/request.py: 6%
442 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-03 12:27 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-03 12:27 +0000
1"""
2 This module implements the WSGI (Web Server Gateway Interface) layer for ViUR. This is the main entry
3 point for incomming http requests. The main class is the :class:BrowserHandler. Each request will get it's
4 own instance of that class which then holds the reference to the request and response object.
5 Additionally, this module defines the RequestValidator interface which provides a very early hook into the
6 request processing (useful for global ratelimiting, DDoS prevention or access control).
7"""
8import datetime
9import fnmatch
10import json
11import logging
12import os
13import re
14import time
15import traceback
16import typing as t
17import unicodedata
18from abc import ABC, abstractmethod
19from urllib import parse
20from urllib.parse import quote, unquote, urljoin, urlparse
22import webob
24from viur.core import current, db, errors, session, utils
25from viur.core.config import conf
26from viur.core.logging import client as loggingClient, requestLogger, requestLoggingRessource
27from viur.core.module import Method
28from viur.core.securityheaders import extendCsp
29from viur.core.tasks import _appengineServiceIPs
31TEMPLATE_STYLE_KEY = "style"
34class RequestValidator(ABC):
35 """
36 RequestValidators can be used to validate a request very early on. If the validate method returns a tuple,
37 the request is aborted. Can be used to block requests from bots.
39 To register or remove a validator, access it in main.py through
40 :attr: viur.core.request.Router.requestValidators
41 """
42 # Internal name to trace which validator aborted the request
43 name = "RequestValidator"
45 @staticmethod
46 @abstractmethod
47 def validate(request: 'BrowseHandler') -> t.Optional[tuple[int, str, str]]:
48 """
49 The function that checks the current request. If the request is valid, simply return None.
50 If the request should be blocked, it must return a tuple of
51 - The HTTP status code (as int)
52 - The Description of that status code (eg "Forbidden")
53 - The Response Body (can be a simple string or an HTML-Page)
54 :param request: The Request instance to check
55 :return: None on success, an Error-Tuple otherwise
56 """
57 raise NotImplementedError()
60class FetchMetaDataValidator(RequestValidator):
61 """
62 This validator examines the headers "Sec-Fetch-Site", "sec-fetch-mode" and "sec-fetch-dest" as
63 recommended by https://web.dev/fetch-metadata/
64 """
65 name = "FetchMetaDataValidator"
67 @staticmethod
68 def validate(request: 'BrowseHandler') -> t.Optional[tuple[int, str, str]]:
69 headers = request.request.headers
70 if not headers.get("sec-fetch-site"): # These headers are not send by all browsers
71 return None
72 if headers.get('sec-fetch-site') in {"same-origin", "none"}: # A Request from our site
73 return None
74 if os.environ['GAE_ENV'] == "localdev" and headers.get('sec-fetch-site') == "same-site":
75 # We are accepting a request with same-site only in local dev mode
76 return None
77 if headers.get('sec-fetch-mode') == 'navigate' and not request.isPostRequest \
78 and headers.get('sec-fetch-dest') not in {'object', 'embed'}: # Incoming navigation GET request
79 return None
80 return 403, "Forbidden", "Request rejected due to fetch metadata"
83class Router:
84 """
85 This class accepts the requests, collect its parameters and routes the request
86 to its destination function.
87 The basic control flow is
88 - Setting up internal variables
89 - Running the Request validators
90 - Emitting the headers (especially the security related ones)
91 - Run the TLS check (ensure it's a secure connection or check if the URL is whitelisted)
92 - Load or initialize a new session
93 - Set up i18n (choosing the language etc)
94 - Run the request preprocessor (if any)
95 - Normalize & sanity check the parameters
96 - Resolve the exposed function and call it
97 - Save the session / tear down the request
98 - Return the response generated
101 :warning: Don't instantiate! Don't subclass! DON'T TOUCH! ;)
102 """
104 # List of requestValidators used to preflight-check an request before it's being dispatched within ViUR
105 requestValidators = [FetchMetaDataValidator]
107 def __init__(self, environ: dict):
108 super().__init__()
109 self.startTime = time.time()
111 self.request = webob.Request(environ)
112 self.response = webob.Response()
114 self.maxLogLevel = logging.DEBUG
115 self._traceID = \
116 self.request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or utils.string.random()
117 self.is_deferred = False
118 self.path = ""
119 self.path_list = ()
121 self.skey_checked = False # indicates whether @skey-decorator-check has already performed within a request
122 self.internalRequest = False
123 self.disableCache = False # Shall this request bypass the caches?
124 self.pendingTasks = []
125 self.args = ()
126 self.kwargs = {}
127 self.context = {}
128 self.template_style: str | None = None
129 self.cors_headers = ()
131 # Check if it's a HTTP-Method we support
132 self.method = self.request.method.lower()
133 self.isPostRequest = self.method == "post"
134 self.isSSLConnection = self.request.host_url.lower().startswith("https://") # We have an encrypted channel
136 db.currentDbAccessLog.set(set())
138 # Set context variables
139 current.language.set(conf.i18n.default_language)
140 current.request.set(self)
141 current.session.set(session.Session())
142 current.request_data.set({})
144 # Process actual request
145 self._process()
147 self._cors()
149 # Unset context variables
150 current.language.set(None)
151 current.request_data.set(None)
152 current.session.set(None)
153 current.request.set(None)
154 current.user.set(None)
156 @property
157 def isDevServer(self) -> bool:
158 import warnings
159 msg = "Use of `isDevServer` is deprecated; Use `conf.instance.is_dev_server` instead!"
160 warnings.warn(msg, DeprecationWarning, stacklevel=2)
161 logging.warning(msg)
162 return conf.instance.is_dev_server
164 def _select_language(self, path: str) -> str:
165 """
166 Tries to select the best language for the current request. Depending on the value of
167 conf.i18n.language_method, we'll either try to load it from the session, determine it by the domain
168 or extract it from the URL.
169 """
171 def get_language_from_header() -> str | None:
172 if not (accept_language := self.request.headers.get("accept-language")):
173 return None
174 languages = accept_language.split(",")
175 locale_q_pairs = []
177 for language in languages:
178 if language.split(";")[0] == language:
179 # no q => q = 1
180 locale_q_pairs.append((language.strip(), "1"))
181 else:
182 try:
183 locale = language.split(";")[0].strip()
184 q = language.split(";")[1].split("=")[1]
185 locale_q_pairs.append((locale, q))
186 except IndexError:
187 continue # skip language
188 locale_q_pairs.sort(key=lambda pair: pair[1], reverse=True) # sort by Quality values
189 for locale_q_pair in locale_q_pairs:
190 if "-" in locale_q_pair[0]: # Check for de-DE
191 lang = locale_q_pair[0].split("-")[0]
192 else:
193 lang = locale_q_pair[0]
194 if lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()):
195 return lang
196 if lang == "*": # fallback
197 return conf.i18n.available_languages[0]
198 return None
200 if not conf.i18n.available_languages:
201 # This project doesn't use the multi-language feature, nothing to do here
202 return path
203 if conf.i18n.language_method == "session":
204 current_session = current.session.get()
205 lang = conf.i18n.default_language
206 # We save the language in the session, if it exists, and try to load it from there
207 if "lang" in current_session:
208 current.language.set(current_session["lang"])
209 return path
211 if header_lang := get_language_from_header():
212 lang = header_lang
213 current.language.set(lang)
215 elif header_lang := self.request.headers.get("X-Appengine-Country"):
216 header_lang = str(header_lang).lower()
217 if header_lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()):
218 lang = header_lang
220 if current_session.loaded:
221 current_session["lang"] = lang
222 current.language.set(lang)
224 elif conf.i18n.language_method == "domain":
225 host = self.request.host_url.lower()
226 host = host[host.find("://") + 3:].strip(" /") # strip http(s)://
227 if host.startswith("www."):
228 host = host[4:]
229 if lang := conf.i18n.domain_language_mapping.get(host):
230 current.language.set(lang)
231 # We have no language configured for this domain, try to read it from the HTTP Header
232 elif lang := get_language_from_header():
233 current.language.set(lang)
235 elif conf.i18n.language_method == "url":
236 tmppath = urlparse(path).path
237 tmppath = [unquote(x) for x in tmppath.lower().strip("/").split("/")]
238 if (
239 len(tmppath) > 0
240 and tmppath[0] in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys())
241 ):
242 current.language.set(tmppath[0])
243 return path[len(tmppath[0]) + 1:] # Return the path stripped by its language segment
244 else: # This URL doesnt contain an language prefix, try to read it from session
245 if header_lang := get_language_from_header():
246 current.language.set(header_lang)
247 elif header_lang := self.request.headers.get("X-Appengine-Country"):
248 lang = str(header_lang).lower()
249 if lang in conf.i18n.available_languages or lang in conf.i18n.language_alias_map:
250 current.language.set(lang)
251 elif conf.i18n.language_method == "header":
252 if lang := get_language_from_header():
253 current.language.set(lang)
255 return path
257 def _process(self):
258 if self.method not in ("get", "post", "head", "options"):
259 logging.error(f"{self.method=} not supported")
260 return
262 if self.request.headers.get("X-AppEngine-TaskName", None) is not None: # Check if we run in the appengine
263 if self.request.environ.get("HTTP_X_APPENGINE_USER_IP") in _appengineServiceIPs:
264 self.is_deferred = True
265 elif os.getenv("TASKS_EMULATOR") is not None:
266 self.is_deferred = True
268 current.language.set(conf.i18n.default_language)
269 # Check if we should process or abort the request
270 for validator, reqValidatorResult in [(x, x.validate(self)) for x in self.requestValidators]:
271 if reqValidatorResult is not None:
272 logging.warning(f"Request rejected by validator {validator.name}")
273 statusCode, statusStr, statusDescr = reqValidatorResult
274 self.response.status = f"{statusCode} {statusStr}"
275 self.response.write(statusDescr)
276 return
278 path = self.request.path
280 # Add CSP headers early (if any)
281 if conf.security.content_security_policy and conf.security.content_security_policy["_headerCache"]:
282 for k, v in conf.security.content_security_policy["_headerCache"].items():
283 self.response.headers[k] = v
284 if self.isSSLConnection: # Check for HTST and PKP headers only if we have a secure channel.
285 if conf.security.strict_transport_security:
286 self.response.headers["Strict-Transport-Security"] = conf.security.strict_transport_security
287 # Check for X-Security-Headers we shall emit
288 if conf.security.x_content_type_options:
289 self.response.headers["X-Content-Type-Options"] = "nosniff"
290 if conf.security.x_xss_protection is not None:
291 if conf.security.x_xss_protection:
292 self.response.headers["X-XSS-Protection"] = "1; mode=block"
293 elif conf.security.x_xss_protection is False:
294 self.response.headers["X-XSS-Protection"] = "0"
295 if conf.security.x_frame_options is not None and isinstance(conf.security.x_frame_options, tuple):
296 mode, uri = conf.security.x_frame_options
297 if mode in ["deny", "sameorigin"]:
298 self.response.headers["X-Frame-Options"] = mode
299 elif mode == "allow-from":
300 self.response.headers["X-Frame-Options"] = f"allow-from {uri}"
301 if conf.security.x_permitted_cross_domain_policies is not None:
302 self.response.headers["X-Permitted-Cross-Domain-Policies"] = conf.security.x_permitted_cross_domain_policies
303 if conf.security.referrer_policy:
304 self.response.headers["Referrer-Policy"] = conf.security.referrer_policy
305 if conf.security.permissions_policy.get("_headerCache"):
306 self.response.headers["Permissions-Policy"] = conf.security.permissions_policy["_headerCache"]
307 if conf.security.enable_coep:
308 self.response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
309 if conf.security.enable_coop:
310 self.response.headers["Cross-Origin-Opener-Policy"] = conf.security.enable_coop
311 if conf.security.enable_corp:
312 self.response.headers["Cross-Origin-Resource-Policy"] = conf.security.enable_corp
314 # Ensure that TLS is used if required
315 if conf.security.force_ssl and not self.isSSLConnection and not conf.instance.is_dev_server:
316 isWhitelisted = False
317 reqPath = self.request.path
318 for testUrl in conf.security.no_ssl_check_urls:
319 if testUrl.endswith("*"):
320 if reqPath.startswith(testUrl[:-1]):
321 isWhitelisted = True
322 break
323 else:
324 if testUrl == reqPath:
325 isWhitelisted = True
326 break
327 if not isWhitelisted: # Some URLs need to be whitelisted (as f.e. the Tasks-Queue doesn't call using https)
328 # Redirect the user to the startpage (using ssl this time)
329 host = self.request.host_url.lower()
330 host = host[host.find("://") + 3:].strip(" /") # strip http(s)://
331 self.response.status = "302 Found"
332 self.response.headers['Location'] = f"https://{host}/"
333 return
334 if path.startswith("/_ah/warmup"):
335 self.response.write("okay")
336 return
338 try:
339 current.session.get().load()
341 # Load current user into context variable if user module is there.
342 if user_mod := getattr(conf.main_app.vi, "user", None):
343 current.user.set(user_mod.getCurrentUser())
345 path = self._select_language(path)[1:]
347 # Check for closed system
348 if conf.security.closed_system and self.method != "options":
349 if not current.user.get():
350 if not any(fnmatch.fnmatch(path, pat) for pat in conf.security.closed_system_allowed_paths):
351 raise errors.Unauthorized()
353 if conf.request_preprocessor:
354 path = conf.request_preprocessor(path)
356 self._route(path)
358 except errors.Redirect as e:
359 if conf.debug.trace_exceptions:
360 logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
361 raise
362 self.response.status = f"{e.status} {e.name}"
363 url = e.url
364 url = unquote(url) # decode first
365 # safe = https://url.spec.whatwg.org/#url-path-segment-string
366 url = quote(url, encoding="utf-8", safe="!$&'()*+,-./:;=?@_~") # re-encode all in utf-8
367 if url.startswith(('.', '/')):
368 url = str(urljoin(self.request.url, url))
369 self.response.headers['Location'] = url
371 except Exception as e:
372 if conf.debug.trace_exceptions:
373 logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
374 raise
375 self.response.body = b""
376 if isinstance(e, errors.HTTPException):
377 logging.info(f"[{e.status}] {e.name}: {e.descr}", exc_info=conf.debug.trace)
378 self.response.status = f"{e.status} {e.name}"
379 # Set machine-readable x-viur-error response header in case there is an exception description.
380 if e.descr:
381 self.response.headers["x-viur-error"] = e.descr.replace("\n", "")
382 else:
383 self.response.status = 500
384 logging.error("ViUR has caught an unhandled exception!")
385 logging.exception(e)
387 res = None
388 if conf.error_handler:
389 try:
390 res = conf.error_handler(e)
391 except Exception as newE:
392 logging.error("viur.error_handler failed!")
393 logging.exception(newE)
394 res = None
395 if not res:
396 descr = "The server encountered an unexpected error and is unable to process your request."
398 if isinstance(e, errors.HTTPException):
399 error_info = {
400 "status": e.status,
401 "reason": e.name,
402 "title": str(translate(e.name)),
403 "descr": e.descr,
404 }
405 else:
406 error_info = {
407 "status": 500,
408 "reason": "Internal Server Error",
409 "title": str(translate("Internal Server Error")),
410 "descr": descr
411 }
413 if conf.instance.is_dev_server:
414 error_info["traceback"] = traceback.format_exc()
416 error_info["logo"] = conf.error_logo
418 if (len(self.path_list) > 0 and self.path_list[0] in ("vi", "json")) or \
419 current.request.get().response.headers["Content-Type"] == "application/json":
420 current.request.get().response.headers["Content-Type"] = "application/json"
421 res = json.dumps(error_info)
422 else: # We render the error in html
423 # Try to get the template from html/error/
424 if filename := conf.main_app.render.getTemplateFileName((f"{error_info['status']}", "error"),
425 raise_exception=False):
426 template = conf.main_app.render.getEnv().get_template(filename)
427 try:
428 uses_unsafe_inline = \
429 "unsafe-inline" in conf.security.content_security_policy["enforce"]["style-src"]
430 except (KeyError, TypeError): # Not set
431 uses_unsafe_inline = False
432 if uses_unsafe_inline:
433 logging.info("Using style-src:unsafe-inline, don't create a nonce")
434 nonce = None
435 else:
436 nonce = utils.string.random(16)
437 extendCsp({"style-src": [f"nonce-{nonce}"]})
438 res = template.render(error_info, nonce=nonce)
439 else:
440 res = (f'<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">'
441 f'<title>{error_info["status"]} - {error_info["reason"]}</title>'
442 f'</head><body><h1>{error_info["status"]} - {error_info["reason"]}</h1>')
444 self.response.write(res.encode("UTF-8"))
446 finally:
447 current.session.get().save()
448 if conf.instance.is_dev_server and conf.debug.dev_server_cloud_logging:
449 # Emit the outer log only on dev_appserver (we'll use the existing request log when live)
450 SEVERITY = "DEBUG"
451 if self.maxLogLevel >= 50:
452 SEVERITY = "CRITICAL"
453 elif self.maxLogLevel >= 40:
454 SEVERITY = "ERROR"
455 elif self.maxLogLevel >= 30:
456 SEVERITY = "WARNING"
457 elif self.maxLogLevel >= 20:
458 SEVERITY = "INFO"
460 TRACE = "projects/{}/traces/{}".format(loggingClient.project, self._traceID)
462 REQUEST = {
463 'requestMethod': self.request.method,
464 'requestUrl': self.request.url,
465 'status': self.response.status_code,
466 'userAgent': self.request.headers.get('USER-AGENT'),
467 'responseSize': self.response.content_length,
468 'latency': "%0.3fs" % (time.time() - self.startTime),
469 'remoteIp': self.request.environ.get("HTTP_X_APPENGINE_USER_IP")
470 }
471 requestLogger.log_text(
472 "",
473 client=loggingClient,
474 severity=SEVERITY,
475 http_request=REQUEST,
476 trace=TRACE,
477 resource=requestLoggingRessource,
478 operation={
479 "first": True,
480 "last": True,
481 "id": self._traceID
482 }
483 )
485 if conf.instance.is_dev_server:
486 self.is_deferred = True
488 while self.pendingTasks:
489 task = self.pendingTasks.pop()
490 logging.debug(f"Deferred task emulation, executing {task=}")
491 try:
492 task()
493 except Exception: # noqa
494 logging.exception(f"Deferred Task emulation {task} failed")
496 def _route(self, path: str) -> None:
497 """
498 Does the actual work of sanitizing the parameter, determine which exposed-function to call
499 (and with which parameters)
500 """
502 # Parse the URL
503 if path := parse.urlparse(path).path:
504 self.path = path
505 self.path_list = tuple(unicodedata.normalize("NFC", parse.unquote(part))
506 for part in path.strip("/").split("/"))
508 # Prevent Hash-collision attacks
509 if len(self.request.params) > conf.max_post_params_count:
510 raise errors.BadRequest(
511 f"Too many arguments supplied, exceeding maximum"
512 f" of {conf.max_post_params_count} allowed arguments per request"
513 )
515 param_filter = conf.param_filter_function
516 if param_filter and not callable(param_filter):
517 raise ValueError(f"""{param_filter=} is not callable""")
519 for key, value in self.request.params.items():
520 try:
521 key = unicodedata.normalize("NFC", key)
522 value = unicodedata.normalize("NFC", value)
523 except UnicodeError:
524 # We received invalid unicode data (usually happens when
525 # someone tries to exploit unicode normalisation bugs)
526 raise errors.BadRequest()
528 if param_filter and param_filter(key, value):
529 continue
531 if key == TEMPLATE_STYLE_KEY:
532 self.template_style = value
533 continue
535 if key in self.kwargs:
536 if isinstance(self.kwargs[key], list):
537 self.kwargs[key].append(value)
538 else: # Convert that key to a list
539 self.kwargs[key] = [self.kwargs[key], value]
540 else:
541 self.kwargs[key] = value
543 if "self" in self.kwargs or "return" in self.kwargs: # self or return is reserved for bound methods
544 raise errors.BadRequest()
546 caller = conf.main_resolver
547 idx = 0 # Count how may items from *args we'd have consumed (so the rest can go into *args of the called func
548 path_found = True
550 for part in self.path_list:
551 # TODO: Remove canAccess guards... solve differently.
552 if "canAccess" in caller and not caller["canAccess"]():
553 # We have a canAccess function guarding that object,
554 # and it returns False...
555 raise errors.Unauthorized()
557 idx += 1
559 if part not in caller:
560 part = "index"
562 if caller := caller.get(part):
563 if isinstance(caller, Method):
564 if part == "index":
565 idx -= 1
567 self.args = tuple(self.path_list[idx:])
568 break
570 elif part == "index":
571 path_found = False
572 break
574 else:
575 path_found = False
576 break
578 if not path_found:
579 raise errors.NotFound(
580 f"""The path {utils.string.escape("/".join(self.path_list[:idx]))} could not be found""")
582 if not isinstance(caller, Method):
583 # try to find "index" function
584 if (index := caller.get("index")) and isinstance(index, Method):
585 caller = index
586 else:
587 raise errors.MethodNotAllowed()
589 # Check for internal exposed
590 if caller.exposed is False and not self.internalRequest:
591 raise errors.NotFound()
593 # Fill the Allow header of the response with the allowed HTTP methods
594 if self.method == "options":
595 self.response.headers["Allow"] = ", ".join(sorted(caller.methods)).upper()
597 # Register caller specific CORS headers
598 self.cors_headers = [str(header).lower() for header in caller.cors_allow_headers or ()]
600 # Check for @force_ssl flag
601 if not self.internalRequest \
602 and caller.ssl \
603 and not self.request.host_url.lower().startswith("https://") \
604 and not conf.instance.is_dev_server:
605 raise errors.PreconditionFailed("You must use SSL to access this resource!")
607 # Check for @force_post flag
608 if not self.isPostRequest and caller.methods == ("POST",):
609 raise errors.MethodNotAllowed("You must use POST to access this resource!")
611 # Check if this request should bypass the caches
612 if self.request.headers.get("X-Viur-Disable-Cache"):
613 # No cache requested, check if the current user is allowed to do so
614 if (user := current.user.get()) and "root" in user["access"]:
615 logging.debug("Caching disabled by X-Viur-Disable-Cache header")
616 self.disableCache = True
618 # Destill context as self.context, if available
619 if context := {k: v for k, v in self.kwargs.items() if k.startswith("@")}:
620 # Remove context parameters from kwargs
621 kwargs = {k: v for k, v in self.kwargs.items() if k not in context}
622 # Remove leading "@" from context parameters
623 self.context |= {k[1:]: v for k, v in context.items() if len(k) > 1}
624 else:
625 kwargs = self.kwargs
627 if ((self.internalRequest and conf.debug.trace_internal_call_routing)
628 or conf.debug.trace_external_call_routing):
629 logging.debug(
630 f"Calling {caller._func!r} with args={self.args!r}, {kwargs=} within context={self.context!r}"
631 )
633 if self.method == "options":
634 # OPTIONS request doesn't have a body
635 del self.response.app_iter
636 del self.response.content_type
637 self.response.status = "204 No Content"
638 return
640 # Now call the routed method!
641 res = caller(*self.args, **kwargs)
643 if self.method == "options":
644 # OPTIONS request doesn't have a body
645 del self.response.app_iter
646 del self.response.content_type
647 self.response.status = "204 No Content"
648 return
650 if not isinstance(res, bytes): # Convert the result to bytes if it is not already!
651 res = str(res).encode("UTF-8")
652 self.response.write(res)
654 def _cors(self) -> None:
655 """
656 Set CORS headers to the HTTP response.
658 .. seealso::
660 Option :attr:`core.config.Security.cors_origins`, etc.
661 for cors settings.
663 https://fetch.spec.whatwg.org/#http-cors-protocol
665 https://enable-cors.org/server.html
667 https://www.html5rocks.com/static/images/cors_server_flowchart.png
668 """
670 def test_candidates(value: str, *candidates: str | re.Pattern) -> bool:
671 """Test if the value matches the pattern of any candidate"""
672 for candidate in candidates:
673 if isinstance(candidate, re.Pattern):
674 if candidate.match(value):
675 return True
676 elif isinstance(candidate, str):
677 if candidate.lower() == str(value).lower():
678 return True
679 else:
680 raise TypeError(
681 f"Invalid setting {candidate}. "
682 f"Expected a string or a compiled regex."
683 )
684 return False
686 origin = current.request.get().request.headers.get("Origin")
687 if not origin:
688 return
690 # Origin is set --> It's a CORS request
692 any_origin_allowed = (
693 conf.security.cors_origins == "*"
694 or any(_origin == "*" for _origin in conf.security.cors_origins)
695 or any(_origin.pattern == r".*"
696 for _origin in conf.security.cors_origins
697 if isinstance(_origin, re.Pattern))
698 )
700 if any_origin_allowed and conf.security.cors_origins_use_wildcard:
701 if conf.security.cors_allow_credentials:
702 raise RuntimeError(
703 "Invalid CORS config: "
704 "If credentials mode is \"include\", then `Access-Control-Allow-Origin` cannot be `*`. "
705 "See https://fetch.spec.whatwg.org/#cors-protocol-and-credentials"
706 )
707 self.response.headers["Access-Control-Allow-Origin"] = "*"
709 elif test_candidates(origin, *conf.security.cors_origins):
710 self.response.headers["Access-Control-Allow-Origin"] = origin
712 else:
713 logging.warning(f"{origin=} not valid (must be one of {conf.security.cors_origins=})")
714 return
716 if conf.security.cors_allow_credentials:
717 self.response.headers["Access-Control-Allow-Credentials"] = "true"
719 if self.method == "options":
720 method = (self.request.headers.get("Access-Control-Request-Method") or "").lower()
722 if method in conf.security.cors_methods:
723 # It's a CORS-preflight request
724 # - MUST include Access-Control-Request-Method
725 # - CAN include Access-Control-Request-Headers
727 # The response can be cached
728 if conf.security.cors_max_age is not None:
729 assert isinstance(conf.security.cors_max_age, datetime.timedelta)
730 self.response.headers["Access-Control-Max-Age"] = \
731 str(int(conf.security.cors_max_age.total_seconds()))
733 # Allowed methods
734 self.response.headers["Access-Control-Allow-Methods"] = ", ".join(
735 sorted(conf.security.cors_methods)).upper()
737 # Allowed headers
738 request_headers = self.request.headers.get("Access-Control-Request-Headers")
739 request_headers = [h.strip().lower() for h in request_headers.split(",")]
740 if conf.security.cors_allow_headers == "*":
741 # Every header is allowed
742 allow_headers = request_headers[:]
743 else:
744 # There are generally headers allowed and/or from the caller
745 allow_headers = [
746 header
747 for header in request_headers
748 if test_candidates(
749 header,
750 *(self.cors_headers or ()), # caller specific
751 *(conf.security.cors_allow_headers or ()) # generally global
752 )
753 ]
754 if allow_headers:
755 self.response.headers["Access-Control-Allow-Headers"] = ", ".join(sorted(allow_headers))
757 else:
758 logging.warning(
759 f"Access-Control-Request-Method: {method} is NOT a valid method of {conf.security.cors_methods=}. "
760 f"Don't append CORS-preflight request headers"
761 )
763 def saveSession(self) -> None:
764 current.session.get().save()
767from .i18n import translate # noqa: E402