Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/request.py: 6%

442 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-03 12:27 +0000

1""" 

2 This module implements the WSGI (Web Server Gateway Interface) layer for ViUR. This is the main entry 

3 point for incoming HTTP requests. The main class is the :class:`Router`. Each request will get its 

4 own instance of that class which then holds the reference to the request and response object. 

5 Additionally, this module defines the RequestValidator interface which provides a very early hook into the 

6 request processing (useful for global ratelimiting, DDoS prevention or access control). 

7""" 

8import datetime 

9import fnmatch 

10import json 

11import logging 

12import os 

13import re 

14import time 

15import traceback 

16import typing as t 

17import unicodedata 

18from abc import ABC, abstractmethod 

19from urllib import parse 

20from urllib.parse import quote, unquote, urljoin, urlparse 

21 

22import webob 

23 

24from viur.core import current, db, errors, session, utils 

25from viur.core.config import conf 

26from viur.core.logging import client as loggingClient, requestLogger, requestLoggingRessource 

27from viur.core.module import Method 

28from viur.core.securityheaders import extendCsp 

29from viur.core.tasks import _appengineServiceIPs 

30 

31TEMPLATE_STYLE_KEY = "style" 

32 

33 

class RequestValidator(ABC):
    """
    RequestValidators can be used to validate a request very early on. If the validate method returns a tuple,
    the request is aborted. Can be used to block requests from bots.

    To register or remove a validator, access it in main.py through
    :attr:`viur.core.request.Router.requestValidators`
    """
    # Internal name to trace which validator aborted the request
    name = "RequestValidator"

    @staticmethod
    @abstractmethod
    def validate(request: "Router") -> t.Optional[tuple[int, str, str]]:
        """
        The function that checks the current request. If the request is valid, simply return None.
        If the request should be blocked, it must return a tuple of

        - The HTTP status code (as int)
        - The description of that status code (eg "Forbidden")
        - The response body (can be a simple string or an HTML page)

        :param request: The request instance (the :class:`Router` handling it) to check
        :return: None on success, an error tuple otherwise
        :raises NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError()

58 

59 

class FetchMetaDataValidator(RequestValidator):
    """
    This validator examines the headers "Sec-Fetch-Site", "sec-fetch-mode" and "sec-fetch-dest" as
    recommended by https://web.dev/fetch-metadata/
    """
    name = "FetchMetaDataValidator"

    @staticmethod
    def validate(request: "Router") -> t.Optional[tuple[int, str, str]]:
        """
        Accept or reject the request based on its fetch metadata headers.

        :param request: The request instance to check; its ``request.headers`` and
            ``isPostRequest`` attributes are read.
        :return: None if the request may pass, otherwise the tuple
            ``(403, "Forbidden", "Request rejected due to fetch metadata")``.
        """
        headers = request.request.headers
        fetch_site = headers.get("sec-fetch-site")

        if not fetch_site:  # These headers are not sent by all browsers
            return None

        if fetch_site in {"same-origin", "none"}:  # A request from our site
            return None

        # We are accepting a request with same-site only in local dev mode.
        # `.get()` is used so that a missing GAE_ENV variable does not raise a KeyError.
        if fetch_site == "same-site" and os.environ.get("GAE_ENV") == "localdev":
            return None

        # Incoming navigation GET request
        if (
            headers.get("sec-fetch-mode") == "navigate"
            and not request.isPostRequest
            and headers.get("sec-fetch-dest") not in {"object", "embed"}
        ):
            return None

        return 403, "Forbidden", "Request rejected due to fetch metadata"

81 

82 

83class Router: 

84 """ 

85 This class accepts the requests, collect its parameters and routes the request 

86 to its destination function. 

87 The basic control flow is 

88 - Setting up internal variables 

89 - Running the Request validators 

90 - Emitting the headers (especially the security related ones) 

91 - Run the TLS check (ensure it's a secure connection or check if the URL is whitelisted) 

92 - Load or initialize a new session 

93 - Set up i18n (choosing the language etc) 

94 - Run the request preprocessor (if any) 

95 - Normalize & sanity check the parameters 

96 - Resolve the exposed function and call it 

97 - Save the session / tear down the request 

98 - Return the response generated 

99 

100 

101 :warning: Don't instantiate! Don't subclass! DON'T TOUCH! ;) 

102 """ 

103 

104 # List of requestValidators used to preflight-check a request before it's being dispatched within ViUR 

105 requestValidators = [FetchMetaDataValidator] 

106 

def __init__(self, environ: dict):
    """
    Set up the per-request state, process the request and build the response.

    The whole request is handled inside the constructor: after initialising the
    instance attributes and the context variables, :meth:`_process` and
    :meth:`_cors` are called. The context variables are reset afterwards, even
    if processing raised an exception, so no request state (especially the
    current user) stays behind in the context.

    :param environ: The WSGI environment the ``webob.Request`` is built from.
    """
    super().__init__()
    self.startTime = time.time()

    self.request = webob.Request(environ)
    self.response = webob.Response()

    self.maxLogLevel = logging.DEBUG
    # Use the first part of the trace header if present, a random string otherwise
    self._traceID = \
        self.request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or utils.string.random()
    self.is_deferred = False
    self.path = ""
    self.path_list = ()

    self.skey_checked = False  # indicates whether @skey-decorator-check has already performed within a request
    self.internalRequest = False
    self.disableCache = False  # Shall this request bypass the caches?
    self.pendingTasks = []
    self.args = ()
    self.kwargs = {}
    self.context = {}
    self.template_style: str | None = None
    self.cors_headers = ()

    # Check if it's a HTTP-Method we support
    self.method = self.request.method.lower()
    self.isPostRequest = self.method == "post"
    self.isSSLConnection = self.request.host_url.lower().startswith("https://")  # We have an encrypted channel

    db.currentDbAccessLog.set(set())

    # Set context variables
    current.language.set(conf.i18n.default_language)
    current.request.set(self)
    current.session.set(session.Session())
    current.request_data.set({})

    try:
        # Process actual request
        self._process()

        self._cors()
    finally:
        # Unset context variables, also when processing failed with an exception
        current.language.set(None)
        current.request_data.set(None)
        current.session.set(None)
        current.request.set(None)
        current.user.set(None)

155 

@property
def isDevServer(self) -> bool:
    """
    Deprecated accessor for ``conf.instance.is_dev_server``.

    Emits a :class:`DeprecationWarning` and logs the same text as a warning
    on every access.

    :return: The value of ``conf.instance.is_dev_server``.
    """
    from warnings import warn

    deprecation_text = "Use of `isDevServer` is deprecated; Use `conf.instance.is_dev_server` instead!"
    warn(deprecation_text, DeprecationWarning, stacklevel=2)
    logging.warning(deprecation_text)
    return conf.instance.is_dev_server

163 

def _select_language(self, path: str) -> str:
    """
    Tries to select the best language for the current request. Depending on the value of
    conf.i18n.language_method, we'll either try to load it from the session, determine it by the domain,
    extract it from the URL or read it from the Accept-Language header.

    The selected language is stored in ``current.language``.

    :param path: The request path.
    :return: The path to continue routing with. Only the "url" method changes it, by stripping
        a leading language segment; all other methods return ``path`` unchanged.
    """
    if not conf.i18n.available_languages:
        # This project doesn't use the multi-language feature, nothing to do here
        return path

    # All language codes that may be selected (real languages and aliases), built once per call
    known_languages = [*conf.i18n.available_languages, *conf.i18n.language_alias_map]

    def get_language_from_header() -> str | None:
        """Return the best known language from the Accept-Language header, or None."""
        if not (accept_language := self.request.headers.get("accept-language")):
            return None

        locale_q_pairs = []
        for entry in accept_language.split(","):
            locale = entry.split(";")[0].strip()
            if ";" not in entry:
                # no q => q = 1
                quality = 1.0
            else:
                try:
                    # Compare quality values numerically, not as strings
                    quality = float(entry.split(";")[1].split("=")[1])
                except (IndexError, ValueError):
                    continue  # skip language with a malformed parameter
                if not 0.0 <= quality <= 1.0:
                    continue  # skip out-of-range values (this also rejects NaN)
            locale_q_pairs.append((locale, quality))

        # Sort by quality values; the sort is stable, so equal qualities keep header order
        locale_q_pairs.sort(key=lambda pair: pair[1], reverse=True)

        for locale, _quality in locale_q_pairs:
            lang = locale.split("-")[0]  # reduce e.g. de-DE to de
            if lang in known_languages:
                return lang
            if lang == "*":  # fallback
                return conf.i18n.available_languages[0]
        return None

    if conf.i18n.language_method == "session":
        current_session = current.session.get()
        lang = conf.i18n.default_language
        # We save the language in the session, if it exists, and try to load it from there
        if "lang" in current_session:
            current.language.set(current_session["lang"])
            return path

        if header_lang := get_language_from_header():
            lang = header_lang
            current.language.set(lang)

        elif header_lang := self.request.headers.get("X-Appengine-Country"):
            header_lang = str(header_lang).lower()
            if header_lang in known_languages:
                lang = header_lang

        if current_session.loaded:
            current_session["lang"] = lang
        current.language.set(lang)

    elif conf.i18n.language_method == "domain":
        host = self.request.host_url.lower()
        host = host[host.find("://") + 3:].strip(" /")  # strip http(s)://
        if host.startswith("www."):
            host = host[4:]
        if lang := conf.i18n.domain_language_mapping.get(host):
            current.language.set(lang)
        # We have no language configured for this domain, try to read it from the HTTP Header
        elif lang := get_language_from_header():
            current.language.set(lang)

    elif conf.i18n.language_method == "url":
        tmppath = urlparse(path).path
        tmppath = [unquote(x) for x in tmppath.lower().strip("/").split("/")]
        if len(tmppath) > 0 and tmppath[0] in known_languages:
            current.language.set(tmppath[0])
            return path[len(tmppath[0]) + 1:]  # Return the path stripped by its language segment
        else:  # This URL doesn't contain a language prefix, try to read it from the headers
            if header_lang := get_language_from_header():
                current.language.set(header_lang)
            elif header_lang := self.request.headers.get("X-Appengine-Country"):
                lang = str(header_lang).lower()
                if lang in known_languages:
                    current.language.set(lang)

    elif conf.i18n.language_method == "header":
        if lang := get_language_from_header():
            current.language.set(lang)

    return path

256 

def _process(self):
    """
    Run the complete request life cycle and fill ``self.response``.

    The steps are, in order: check the HTTP method, detect deferred task calls, run the
    request validators, emit the security headers, enforce TLS, answer warmup requests,
    load session and user, select the language, enforce the closed system, run the
    request preprocessor and finally route the request via :meth:`_route`.

    Redirects and other exceptions raised while routing are turned into a matching HTTP
    response, unless ``conf.debug.trace_exceptions`` is set, in which case they are re-raised.
    The session is saved in any case.
    """
    if self.method not in ("get", "post", "head", "options"):
        # NOTE(review): nothing is written to the response here, so its status stays at the
        # default - confirm whether a "405 Method Not Allowed" is intended instead.
        logging.error(f"{self.method=} not supported")
        return

    if self.request.headers.get("X-AppEngine-TaskName", None) is not None:  # Check if we run in the appengine
        if self.request.environ.get("HTTP_X_APPENGINE_USER_IP") in _appengineServiceIPs:
            self.is_deferred = True
        elif os.getenv("TASKS_EMULATOR") is not None:
            self.is_deferred = True

    current.language.set(conf.i18n.default_language)

    # Check if we should process or abort the request.
    # Validators run one after another; the first rejection aborts, later ones are not called.
    for validator in self.requestValidators:
        validator_result = validator.validate(self)
        if validator_result is not None:
            logging.warning(f"Request rejected by validator {validator.name}")
            status_code, status_str, status_descr = validator_result
            self.response.status = f"{status_code} {status_str}"
            self.response.write(status_descr)
            return

    path = self.request.path
    security = conf.security
    response_headers = self.response.headers

    # Add CSP headers early (if any)
    if security.content_security_policy:
        for key, value in (security.content_security_policy.get("_headerCache") or {}).items():
            response_headers[key] = value

    # Emit the HSTS header only if we have a secure channel.
    if self.isSSLConnection and security.strict_transport_security:
        response_headers["Strict-Transport-Security"] = security.strict_transport_security

    # Check for X-Security-Headers we shall emit
    if security.x_content_type_options:
        response_headers["X-Content-Type-Options"] = "nosniff"

    if security.x_xss_protection is not None:
        if security.x_xss_protection:
            response_headers["X-XSS-Protection"] = "1; mode=block"
        elif security.x_xss_protection is False:
            response_headers["X-XSS-Protection"] = "0"

    if security.x_frame_options is not None and isinstance(security.x_frame_options, tuple):
        mode, uri = security.x_frame_options
        if mode in ["deny", "sameorigin"]:
            response_headers["X-Frame-Options"] = mode
        elif mode == "allow-from":
            response_headers["X-Frame-Options"] = f"allow-from {uri}"

    if security.x_permitted_cross_domain_policies is not None:
        response_headers["X-Permitted-Cross-Domain-Policies"] = security.x_permitted_cross_domain_policies
    if security.referrer_policy:
        response_headers["Referrer-Policy"] = security.referrer_policy
    if security.permissions_policy.get("_headerCache"):
        response_headers["Permissions-Policy"] = security.permissions_policy["_headerCache"]
    if security.enable_coep:
        response_headers["Cross-Origin-Embedder-Policy"] = "require-corp"
    if security.enable_coop:
        response_headers["Cross-Origin-Opener-Policy"] = security.enable_coop
    if security.enable_corp:
        response_headers["Cross-Origin-Resource-Policy"] = security.enable_corp

    # Ensure that TLS is used if required
    if security.force_ssl and not self.isSSLConnection and not conf.instance.is_dev_server:
        req_path = self.request.path
        # A trailing "*" makes the entry a prefix match, otherwise the path must match exactly
        is_whitelisted = any(
            req_path.startswith(test_url[:-1]) if test_url.endswith("*") else test_url == req_path
            for test_url in security.no_ssl_check_urls
        )
        if not is_whitelisted:  # Some URLs need to be whitelisted (as f.e. the Tasks-Queue doesn't call using https)
            # Redirect the user to the startpage (using ssl this time)
            host = self.request.host_url.lower()
            host = host[host.find("://") + 3:].strip(" /")  # strip http(s)://
            self.response.status = "302 Found"
            response_headers["Location"] = f"https://{host}/"
            return

    if path.startswith("/_ah/warmup"):
        self.response.write("okay")
        return

    try:
        current.session.get().load()

        # Load current user into context variable if user module is there.
        if user_mod := getattr(conf.main_app.vi, "user", None):
            current.user.set(user_mod.getCurrentUser())

        path = self._select_language(path)[1:]

        # Check for closed system
        if (
            security.closed_system
            and self.method != "options"
            and not current.user.get()
            and not any(fnmatch.fnmatch(path, pat) for pat in security.closed_system_allowed_paths)
        ):
            raise errors.Unauthorized()

        if conf.request_preprocessor:
            path = conf.request_preprocessor(path)

        self._route(path)

    except errors.Redirect as e:
        if conf.debug.trace_exceptions:
            logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
            raise
        self.response.status = f"{e.status} {e.name}"
        url = e.url
        url = unquote(url)  # decode first
        # safe = https://url.spec.whatwg.org/#url-path-segment-string
        url = quote(url, encoding="utf-8", safe="!$&'()*+,-./:;=?@_~")  # re-encode all in utf-8
        if url.startswith(('.', '/')):
            url = str(urljoin(self.request.url, url))
        self.response.headers['Location'] = url

    except Exception as e:
        if conf.debug.trace_exceptions:
            logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
            raise
        self.response.body = b""
        if isinstance(e, errors.HTTPException):
            logging.info(f"[{e.status}] {e.name}: {e.descr}", exc_info=conf.debug.trace)
            self.response.status = f"{e.status} {e.name}"
            # Set machine-readable x-viur-error response header in case there is an exception description.
            if e.descr:
                self.response.headers["x-viur-error"] = e.descr.replace("\n", "")
        else:
            self.response.status = 500
            logging.error("ViUR has caught an unhandled exception!")
            logging.exception(e)

        # Give the project's error handler the first chance to render the error
        res = None
        if conf.error_handler:
            try:
                res = conf.error_handler(e)
            except Exception as newE:
                logging.error("viur.error_handler failed!")
                logging.exception(newE)
                res = None

        if not res:
            if isinstance(e, errors.HTTPException):
                error_info = {
                    "status": e.status,
                    "reason": e.name,
                    "title": str(translate(e.name)),
                    "descr": e.descr,
                }
            else:
                error_info = {
                    "status": 500,
                    "reason": "Internal Server Error",
                    "title": str(translate("Internal Server Error")),
                    "descr": "The server encountered an unexpected error and is unable to process your request.",
                }

            if conf.instance.is_dev_server:
                error_info["traceback"] = traceback.format_exc()

            error_info["logo"] = conf.error_logo

            error_response = current.request.get().response
            # `.get()` avoids a KeyError inside the error handling if the header is not present
            wants_json = (
                (len(self.path_list) > 0 and self.path_list[0] in ("vi", "json"))
                or error_response.headers.get("Content-Type") == "application/json"
            )

            if wants_json:
                error_response.headers["Content-Type"] = "application/json"
                res = json.dumps(error_info)
            else:  # We render the error in html
                # Try to get the template from html/error/
                if filename := conf.main_app.render.getTemplateFileName(
                    (f"{error_info['status']}", "error"),
                    raise_exception=False,
                ):
                    template = conf.main_app.render.getEnv().get_template(filename)
                    try:
                        uses_unsafe_inline = \
                            "unsafe-inline" in conf.security.content_security_policy["enforce"]["style-src"]
                    except (KeyError, TypeError):  # Not set
                        uses_unsafe_inline = False
                    if uses_unsafe_inline:
                        logging.info("Using style-src:unsafe-inline, don't create a nonce")
                        nonce = None
                    else:
                        nonce = utils.string.random(16)
                        extendCsp({"style-src": [f"nonce-{nonce}"]})
                    res = template.render(error_info, nonce=nonce)
                else:
                    # Minimal built-in error page; body and html element are closed properly
                    res = (f'<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">'
                           f'<title>{error_info["status"]} - {error_info["reason"]}</title>'
                           f'</head><body><h1>{error_info["status"]} - {error_info["reason"]}</h1>'
                           f'</body></html>')

        self.response.write(res.encode("UTF-8"))

    finally:
        current.session.get().save()
        if conf.instance.is_dev_server and conf.debug.dev_server_cloud_logging:
            # Emit the outer log only on dev_appserver (we'll use the existing request log when live)
            SEVERITY = "DEBUG"
            if self.maxLogLevel >= 50:
                SEVERITY = "CRITICAL"
            elif self.maxLogLevel >= 40:
                SEVERITY = "ERROR"
            elif self.maxLogLevel >= 30:
                SEVERITY = "WARNING"
            elif self.maxLogLevel >= 20:
                SEVERITY = "INFO"

            TRACE = "projects/{}/traces/{}".format(loggingClient.project, self._traceID)

            REQUEST = {
                'requestMethod': self.request.method,
                'requestUrl': self.request.url,
                'status': self.response.status_code,
                'userAgent': self.request.headers.get('USER-AGENT'),
                'responseSize': self.response.content_length,
                'latency': "%0.3fs" % (time.time() - self.startTime),
                'remoteIp': self.request.environ.get("HTTP_X_APPENGINE_USER_IP")
            }
            requestLogger.log_text(
                "",
                client=loggingClient,
                severity=SEVERITY,
                http_request=REQUEST,
                trace=TRACE,
                resource=requestLoggingRessource,
                operation={
                    "first": True,
                    "last": True,
                    "id": self._traceID
                }
            )

    if conf.instance.is_dev_server:
        self.is_deferred = True

        # Run the tasks collected in self.pendingTasks; a failing task is logged and does not stop the others
        while self.pendingTasks:
            task = self.pendingTasks.pop()
            logging.debug(f"Deferred task emulation, executing {task=}")
            try:
                task()
            except Exception:  # noqa
                logging.exception(f"Deferred Task emulation {task} failed")

495 

def _route(self, path: str) -> None:
    """
    Does the actual work of sanitizing the parameter, determine which exposed-function to call
    (and with which parameters)

    The request parameters are NFC-normalized and collected in ``self.kwargs``; parameters whose
    name starts with "@" are moved into ``self.context``. The path is resolved against
    ``conf.main_resolver`` and the resulting method is called; its result is written to the response.
    For OPTIONS requests the method is resolved but not called.

    :param path: The path to route, without the language prefix.
    :raises errors.BadRequest: On too many parameters, invalid unicode or the reserved
        parameter names "self" and "return".
    :raises errors.Unauthorized: If a ``canAccess`` guard on the path returns a false value.
    :raises errors.NotFound: If the path cannot be resolved or the method is not exposed.
    :raises errors.MethodNotAllowed: If no callable method was found, or POST is required.
    :raises errors.PreconditionFailed: If the method requires SSL and the request is not secure.
    :raises ValueError: If ``conf.param_filter_function`` is set but not callable.
    """

    # Parse the URL
    if path := parse.urlparse(path).path:
        self.path = path
        self.path_list = tuple(unicodedata.normalize("NFC", parse.unquote(part))
                               for part in path.strip("/").split("/"))

    # Prevent Hash-collision attacks
    if len(self.request.params) > conf.max_post_params_count:
        raise errors.BadRequest(
            f"Too many arguments supplied, exceeding maximum"
            f" of {conf.max_post_params_count} allowed arguments per request"
        )

    param_filter = conf.param_filter_function
    if param_filter and not callable(param_filter):
        raise ValueError(f"""{param_filter=} is not callable""")

    for key, value in self.request.params.items():
        try:
            key = unicodedata.normalize("NFC", key)
            value = unicodedata.normalize("NFC", value)
        except UnicodeError:
            # We received invalid unicode data (usually happens when
            # someone tries to exploit unicode normalisation bugs)
            raise errors.BadRequest()

        if param_filter and param_filter(key, value):
            continue

        if key == TEMPLATE_STYLE_KEY:
            self.template_style = value
            continue

        if key in self.kwargs:
            if isinstance(self.kwargs[key], list):
                self.kwargs[key].append(value)
            else:  # Convert that key to a list
                self.kwargs[key] = [self.kwargs[key], value]
        else:
            self.kwargs[key] = value

    if "self" in self.kwargs or "return" in self.kwargs:  # self or return is reserved for bound methods
        raise errors.BadRequest()

    caller = conf.main_resolver
    idx = 0  # Count how many items from *args we'd have consumed (so the rest can go into *args of the called func
    path_found = True

    for part in self.path_list:
        # TODO: Remove canAccess guards... solve differently.
        if "canAccess" in caller and not caller["canAccess"]():
            # We have a canAccess function guarding that object,
            # and it returns False...
            raise errors.Unauthorized()

        idx += 1

        if part not in caller:
            part = "index"

        if caller := caller.get(part):
            if isinstance(caller, Method):
                if part == "index":
                    # The path part was not consumed by the lookup, so it belongs to the arguments
                    idx -= 1

                self.args = tuple(self.path_list[idx:])
                break

            elif part == "index":
                path_found = False
                break

        else:
            path_found = False
            break

    if not path_found:
        raise errors.NotFound(
            f"""The path {utils.string.escape("/".join(self.path_list[:idx]))} could not be found""")

    if not isinstance(caller, Method):
        # try to find "index" function
        if (index := caller.get("index")) and isinstance(index, Method):
            caller = index
        else:
            raise errors.MethodNotAllowed()

    # Check for internal exposed
    if caller.exposed is False and not self.internalRequest:
        raise errors.NotFound()

    # Fill the Allow header of the response with the allowed HTTP methods
    if self.method == "options":
        self.response.headers["Allow"] = ", ".join(sorted(caller.methods)).upper()

    # Register caller specific CORS headers
    self.cors_headers = [str(header).lower() for header in caller.cors_allow_headers or ()]

    # Check for @force_ssl flag
    if not self.internalRequest \
            and caller.ssl \
            and not self.request.host_url.lower().startswith("https://") \
            and not conf.instance.is_dev_server:
        raise errors.PreconditionFailed("You must use SSL to access this resource!")

    # Check for @force_post flag
    if not self.isPostRequest and caller.methods == ("POST",):
        raise errors.MethodNotAllowed("You must use POST to access this resource!")

    # Check if this request should bypass the caches
    if self.request.headers.get("X-Viur-Disable-Cache"):
        # No cache requested, check if the current user is allowed to do so
        if (user := current.user.get()) and "root" in user["access"]:
            logging.debug("Caching disabled by X-Viur-Disable-Cache header")
            self.disableCache = True

    # Destill context as self.context, if available
    if context := {k: v for k, v in self.kwargs.items() if k.startswith("@")}:
        # Remove context parameters from kwargs
        kwargs = {k: v for k, v in self.kwargs.items() if k not in context}
        # Remove leading "@" from context parameters
        self.context |= {k[1:]: v for k, v in context.items() if len(k) > 1}
    else:
        kwargs = self.kwargs

    if ((self.internalRequest and conf.debug.trace_internal_call_routing)
            or conf.debug.trace_external_call_routing):
        logging.debug(
            f"Calling {caller._func!r} with args={self.args!r}, {kwargs=} within context={self.context!r}"
        )

    if self.method == "options":
        # OPTIONS request doesn't have a body; the routed method is not called at all
        del self.response.app_iter
        del self.response.content_type
        self.response.status = "204 No Content"
        return

    # Now call the routed method!
    res = caller(*self.args, **kwargs)

    if not isinstance(res, bytes):  # Convert the result to bytes if it is not already!
        res = str(res).encode("UTF-8")
    self.response.write(res)

653 

def _cors(self) -> None:
    """
    Set CORS headers to the HTTP response.

    Nothing is set if the request has no ``Origin`` header or the origin is not allowed.
    For OPTIONS requests carrying an allowed ``Access-Control-Request-Method``, the preflight
    response headers are added as well.

    .. seealso::

        Option :attr:`core.config.Security.cors_origins`, etc.
        for cors settings.

        https://fetch.spec.whatwg.org/#http-cors-protocol

        https://enable-cors.org/server.html

        https://www.html5rocks.com/static/images/cors_server_flowchart.png

    :raises RuntimeError: If the wildcard origin is configured together with credentials.
    :raises TypeError: If an origin or header setting is neither a string nor a compiled regex.
    """

    def test_candidates(value: str, *candidates: str | re.Pattern) -> bool:
        """Test if the value matches the pattern of any candidate"""
        for candidate in candidates:
            if isinstance(candidate, re.Pattern):
                if candidate.match(value):
                    return True
            elif isinstance(candidate, str):
                if candidate.lower() == str(value).lower():
                    return True
            else:
                raise TypeError(
                    f"Invalid setting {candidate}. "
                    f"Expected a string or a compiled regex."
                )
        return False

    origin = current.request.get().request.headers.get("Origin")
    if not origin:
        return

    # Origin is set --> It's a CORS request

    any_origin_allowed = (
        conf.security.cors_origins == "*"
        or any(_origin == "*" for _origin in conf.security.cors_origins)
        or any(_origin.pattern == r".*"
               for _origin in conf.security.cors_origins
               if isinstance(_origin, re.Pattern))
    )

    if any_origin_allowed and conf.security.cors_origins_use_wildcard:
        if conf.security.cors_allow_credentials:
            raise RuntimeError(
                "Invalid CORS config: "
                "If credentials mode is \"include\", then `Access-Control-Allow-Origin` cannot be `*`. "
                "See https://fetch.spec.whatwg.org/#cors-protocol-and-credentials"
            )
        self.response.headers["Access-Control-Allow-Origin"] = "*"

    elif test_candidates(origin, *conf.security.cors_origins):
        self.response.headers["Access-Control-Allow-Origin"] = origin

    else:
        logging.warning(f"{origin=} not valid (must be one of {conf.security.cors_origins=})")
        return

    if conf.security.cors_allow_credentials:
        self.response.headers["Access-Control-Allow-Credentials"] = "true"

    if self.method != "options":
        return

    method = (self.request.headers.get("Access-Control-Request-Method") or "").lower()

    if method not in conf.security.cors_methods:
        logging.warning(
            f"Access-Control-Request-Method: {method} is NOT a valid method of {conf.security.cors_methods=}. "
            f"Don't append CORS-preflight request headers"
        )
        return

    # It's a CORS-preflight request
    # - MUST include Access-Control-Request-Method
    # - CAN include Access-Control-Request-Headers

    # The response can be cached
    if conf.security.cors_max_age is not None:
        assert isinstance(conf.security.cors_max_age, datetime.timedelta)
        self.response.headers["Access-Control-Max-Age"] = \
            str(int(conf.security.cors_max_age.total_seconds()))

    # Allowed methods
    self.response.headers["Access-Control-Allow-Methods"] = ", ".join(
        sorted(conf.security.cors_methods)).upper()

    # Allowed headers
    # The request header is optional: treat a missing one as "no headers requested"
    # instead of failing on `None.split()`. Empty entries are dropped.
    raw_request_headers = self.request.headers.get("Access-Control-Request-Headers") or ""
    request_headers = [
        header
        for header in (part.strip().lower() for part in raw_request_headers.split(","))
        if header
    ]

    if conf.security.cors_allow_headers == "*":
        # Every header is allowed
        allow_headers = request_headers[:]
    else:
        # There are generally headers allowed and/or from the caller
        allow_headers = [
            header
            for header in request_headers
            if test_candidates(
                header,
                *(self.cors_headers or ()),  # caller specific
                *(conf.security.cors_allow_headers or ())  # generally global
            )
        ]

    if allow_headers:
        self.response.headers["Access-Control-Allow-Headers"] = ", ".join(sorted(allow_headers))

762 

def saveSession(self) -> None:
    """Save the session held in the ``current.session`` context variable."""
    active_session = current.session.get()
    active_session.save()

765 

766 

767from .i18n import translate # noqa: E402