Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/request.py: 6%

442 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1""" 

2 This module implements the WSGI (Web Server Gateway Interface) layer for ViUR. This is the main entry 

3 point for incoming HTTP requests. The main class is :class:`Router`. Each request will get its 

4 own instance of that class which then holds the reference to the request and response object. 

5 Additionally, this module defines the RequestValidator interface which provides a very early hook into the 

6 request processing (useful for global ratelimiting, DDoS prevention or access control). 

7""" 

8import datetime 

9import fnmatch 

10import json 

11import logging 

12import os 

13import re 

14import time 

15import traceback 

16import typing as t 

17import unicodedata 

18from abc import ABC, abstractmethod 

19from urllib import parse 

20from urllib.parse import quote, unquote, urljoin, urlparse 

21 

22import webob 

23 

24from viur.core import current, db, errors, session, utils 

25from viur.core.config import conf 

26from viur.core.logging import client as loggingClient, requestLogger, requestLoggingRessource 

27from viur.core.module import Method 

28from viur.core.securityheaders import extendCsp 

29from viur.core.tasks import _appengineServiceIPs 

30 

# Name of the request parameter whose value is stored as `Router.template_style`
# (and therefore never forwarded to the exposed function as a keyword argument).
TEMPLATE_STYLE_KEY = "style"

32 

33 

class RequestValidator(ABC):
    """
    RequestValidators can be used to validate a request very early on. If the validate method returns a tuple,
    the request is aborted. Can be used to block requests from bots.

    To register or remove a validator, access it in main.py through
    :attr:`viur.core.request.Router.requestValidators`
    """
    # Internal name to trace which validator aborted the request
    name = "RequestValidator"

    @staticmethod
    @abstractmethod
    def validate(request: "Router") -> t.Optional[tuple[int, str, str]]:
        """
        The function that checks the current request. If the request is valid, simply return None.
        If the request should be blocked, it must return a tuple of

        - The HTTP status code (as int)
        - The description of that status code (eg "Forbidden")
        - The response body (can be a simple string or an HTML-Page)

        :param request: The Request instance to check
        :return: None on success, an Error-Tuple otherwise
        :raises NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError()

58 

59 

60class FetchMetaDataValidator(RequestValidator): 

61 """ 

62 This validator examines the headers "Sec-Fetch-Site", "sec-fetch-mode" and "sec-fetch-dest" as 

63 recommended by https://web.dev/fetch-metadata/ 

64 """ 

65 name = "FetchMetaDataValidator" 

66 

67 @staticmethod 

68 def validate(request: 'BrowseHandler') -> t.Optional[tuple[int, str, str]]: 

69 """ 

70 This validator examines the headers "sec-fetch-site", 

71 "sec-fetch-mode" and "sec-fetch-dest" as recommended 

72 by https://web.dev/fetch-metadata/ 

73 """ 

74 headers = request.request.headers 

75 

76 match headers.get("sec-fetch-site"): 

77 case None | "same-origin" | "none": 

78 # A Request from our site, or browser didn't send "sec-fetch-site" 

79 return None 

80 case "same-site": 

81 # We are accepting a request with same-site only in local dev mode 

82 if conf.instance.is_dev_server: 

83 return None 

84 case _: 

85 # Incoming navigation GET request 

86 if ( 

87 not request.isPostRequest 

88 and headers.get("sec-fetch-mode") == "navigate" 

89 and headers.get('sec-fetch-dest') not in ("object", "embed") 

90 ): 

91 return None 

92 

93 return 403, "Forbidden", "Request rejected due to fetch metadata" 

94 

95 

class Router:
    """
    This class accepts the requests, collect its parameters and routes the request
    to its destination function.
    The basic control flow is

    - Setting up internal variables
    - Running the Request validators
    - Emitting the headers (especially the security related ones)
    - Run the TLS check (ensure it's a secure connection or check if the URL is whitelisted)
    - Load or initialize a new session
    - Set up i18n (choosing the language etc)
    - Run the request preprocessor (if any)
    - Normalize & sanity check the parameters
    - Resolve the exposed function and call it
    - Save the session / tear down the request
    - Return the response generated


    :warning: Don't instantiate! Don't subclass! DON'T TOUCH! ;)
    """

    # List of requestValidators used to preflight-check a request before it's being dispatched within ViUR
    requestValidators = [FetchMetaDataValidator]

    def __init__(self, environ: dict):
        """
        Build request/response objects from the WSGI environment and process the request right away.

        The whole request is handled inside the constructor: context variables are set, `_process` and
        `_cors` are run, and the context variables are cleared again. Afterwards, the result can be
        read from `self.response`.

        :param environ: The WSGI environment of the incoming request
        """
        super().__init__()
        self.startTime = time.time()

        self.request = webob.Request(environ)
        self.response = webob.Response()

        self.maxLogLevel = logging.DEBUG
        # Use the trace id from the "X-Cloud-Trace-Context" header if present, otherwise a random one
        self._traceID = \
            self.request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or utils.string.random()
        self.is_deferred = False
        self.path = ""
        self.path_list = ()

        self.skey_checked = False  # indicates whether @skey-decorator-check has already performed within a request
        self.internalRequest = False
        self.disableCache = False  # Shall this request bypass the caches?
        self.pendingTasks = []
        self.args = ()
        self.kwargs = {}
        self.context = {}
        self.template_style: str | None = None
        self.cors_headers = ()

        # Check if it's a HTTP-Method we support
        self.method = self.request.method.lower()
        self.isPostRequest = self.method == "post"
        self.isSSLConnection = self.request.host_url.lower().startswith("https://")  # We have an encrypted channel

        db.current_db_access_log.set(set())

        # Set context variables
        current.language.set(conf.i18n.default_language)
        current.request.set(self)
        current.session.set(session.Session())
        current.request_data.set({})

        # Process actual request
        self._process()

        self._cors()

        # Unset context variables
        current.language.set(None)
        current.request_data.set(None)
        current.session.set(None)
        current.request.set(None)
        current.user.set(None)

    @property
    def isDevServer(self) -> bool:
        """
        Deprecated alias for `conf.instance.is_dev_server`.

        Emits a `DeprecationWarning` and a log warning on every access.
        """
        import warnings
        msg = "Use of `isDevServer` is deprecated; Use `conf.instance.is_dev_server` instead!"
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
        logging.warning(msg)
        return conf.instance.is_dev_server

    def _select_language(self, path: str) -> str:
        """
        Tries to select the best language for the current request. Depending on the value of
        conf.i18n.language_method, we'll either try to load it from the session, determine it by the domain
        or extract it from the URL.

        The selected language is stored in `current.language`.

        :param path: The requested path
        :return: The path to route. It is returned unchanged, except for language method "url",
            where a leading language segment is stripped.
        """

        def is_supported(lang: str) -> bool:
            """Tell whether `lang` is an available language or a configured language alias."""
            return lang in conf.i18n.available_languages or lang in conf.i18n.language_alias_map

        def get_language_from_header() -> str | None:
            """Return the best supported language from the "accept-language" header, if any."""
            if not (accept_language := self.request.headers.get("accept-language")):
                return None

            locale_q_pairs = []
            for language in accept_language.split(","):
                locale, has_params, params = language.partition(";")
                locale = locale.strip()
                if not has_params:
                    # no q => q = 1
                    locale_q_pairs.append((locale, 1.0))
                    continue
                try:
                    # Quality values are compared as numbers, not as strings
                    q = float(params.split(";")[0].split("=")[1])
                except (IndexError, ValueError):
                    continue  # malformed quality value => skip language
                if not q > 0:
                    continue  # q=0 means "not acceptable" (this also drops NaN and negative values)
                locale_q_pairs.append((locale, q))

            # Sort by quality values; the sort is stable, so equal weights keep their header order
            locale_q_pairs.sort(key=lambda pair: pair[1], reverse=True)
            for locale, _ in locale_q_pairs:
                lang = locale.split("-")[0]  # reduce e.g. de-DE to de
                if is_supported(lang):
                    return lang
                if lang == "*":  # fallback
                    return conf.i18n.available_languages[0]
            return None

        if not conf.i18n.available_languages:
            # This project doesn't use the multi-language feature, nothing to do here
            return path

        if conf.i18n.language_method == "session":
            current_session = current.session.get()
            lang = conf.i18n.default_language
            # We save the language in the session, if it exists, and try to load it from there
            if "lang" in current_session:
                current.language.set(current_session["lang"])
                return path

            if header_lang := get_language_from_header():
                lang = header_lang
                current.language.set(lang)

            elif header_lang := self.request.headers.get("X-Appengine-Country"):
                header_lang = str(header_lang).lower()
                if is_supported(header_lang):
                    lang = header_lang

            if current_session.loaded:
                current_session["lang"] = lang
            current.language.set(lang)

        elif conf.i18n.language_method == "domain":
            host = self.request.host_url.lower()
            host = host[host.find("://") + 3:].strip(" /")  # strip http(s)://
            if host.startswith("www."):
                host = host[4:]
            if lang := conf.i18n.domain_language_mapping.get(host):
                current.language.set(lang)
            # We have no language configured for this domain, try to read it from the HTTP Header
            elif lang := get_language_from_header():
                current.language.set(lang)

        elif conf.i18n.language_method == "url":
            tmppath = urlparse(path).path
            tmppath = [unquote(x) for x in tmppath.lower().strip("/").split("/")]
            if len(tmppath) > 0 and is_supported(tmppath[0]):
                current.language.set(tmppath[0])
                return path[len(tmppath[0]) + 1:]  # Return the path stripped by its language segment

            # This URL doesn't contain a language prefix, try to read it from the request headers
            if header_lang := get_language_from_header():
                current.language.set(header_lang)
            elif header_lang := self.request.headers.get("X-Appengine-Country"):
                lang = str(header_lang).lower()
                if is_supported(lang):
                    current.language.set(lang)

        elif conf.i18n.language_method == "header":
            if lang := get_language_from_header():
                current.language.set(lang)

        return path

    def _process(self):
        """
        Runs the request through all stages and fills `self.response`.

        The stages are: method check, request validators, security headers, TLS enforcement,
        session/user/language setup, closed-system check, request preprocessor and routing.
        Redirects and all other exceptions raised on the way are turned into a response here, unless
        `conf.debug.trace_exceptions` is set (then they are re-raised). The session is always saved.
        On the dev server, pending deferred tasks are executed afterwards.
        """
        if self.method not in ("get", "post", "head", "options"):
            logging.error(f"{self.method=} not supported")
            # Tell the client explicitly instead of answering with an empty "200 OK"
            self.response.status = "405 Method Not Allowed"
            self.response.headers["Allow"] = "GET, HEAD, OPTIONS, POST"
            return

        if self.request.headers.get("X-AppEngine-TaskName", None) is not None:  # Check if we run in the appengine
            if self.request.environ.get("HTTP_X_APPENGINE_USER_IP") in _appengineServiceIPs:
                self.is_deferred = True
            elif os.getenv("TASKS_EMULATOR") is not None:
                self.is_deferred = True

        # Check if we should process or abort the request
        for validator, reqValidatorResult in [(x, x.validate(self)) for x in self.requestValidators]:
            if reqValidatorResult is not None:
                logging.warning(f"Request rejected by validator {validator.name}")
                statusCode, statusStr, statusDescr = reqValidatorResult
                self.response.status = f"{statusCode} {statusStr}"
                self.response.write(statusDescr)
                return

        path = self.request.path

        # Add CSP headers early (if any)
        if conf.security.content_security_policy and conf.security.content_security_policy["_headerCache"]:
            for k, v in conf.security.content_security_policy["_headerCache"].items():
                self.response.headers[k] = v
        if self.isSSLConnection:  # Check for HSTS headers only if we have a secure channel.
            if conf.security.strict_transport_security:
                self.response.headers["Strict-Transport-Security"] = conf.security.strict_transport_security
        # Check for X-Security-Headers we shall emit
        if conf.security.x_content_type_options:
            self.response.headers["X-Content-Type-Options"] = "nosniff"
        if conf.security.x_xss_protection is not None:
            if conf.security.x_xss_protection:
                self.response.headers["X-XSS-Protection"] = "1; mode=block"
            elif conf.security.x_xss_protection is False:
                self.response.headers["X-XSS-Protection"] = "0"
        if conf.security.x_frame_options is not None and isinstance(conf.security.x_frame_options, tuple):
            mode, uri = conf.security.x_frame_options
            if mode in ["deny", "sameorigin"]:
                self.response.headers["X-Frame-Options"] = mode
            elif mode == "allow-from":
                self.response.headers["X-Frame-Options"] = f"allow-from {uri}"
        if conf.security.x_permitted_cross_domain_policies is not None:
            self.response.headers["X-Permitted-Cross-Domain-Policies"] = conf.security.x_permitted_cross_domain_policies
        if conf.security.referrer_policy:
            self.response.headers["Referrer-Policy"] = conf.security.referrer_policy
        if conf.security.permissions_policy.get("_headerCache"):
            self.response.headers["Permissions-Policy"] = conf.security.permissions_policy["_headerCache"]
        if conf.security.enable_coep:
            self.response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
        if conf.security.enable_coop:
            self.response.headers["Cross-Origin-Opener-Policy"] = conf.security.enable_coop
        if conf.security.enable_corp:
            self.response.headers["Cross-Origin-Resource-Policy"] = conf.security.enable_corp

        # Ensure that TLS is used if required
        if conf.security.force_ssl and not self.isSSLConnection and not conf.instance.is_dev_server:
            isWhitelisted = False
            reqPath = self.request.path
            for testUrl in conf.security.no_ssl_check_urls:
                if testUrl.endswith("*"):
                    # A trailing "*" whitelists every path starting with that prefix
                    if reqPath.startswith(testUrl[:-1]):
                        isWhitelisted = True
                        break
                else:
                    if testUrl == reqPath:
                        isWhitelisted = True
                        break
            if not isWhitelisted:  # Some URLs need to be whitelisted (as f.e. the Tasks-Queue doesn't call using https)
                # Redirect the user to the startpage (using ssl this time)
                host = self.request.host_url.lower()
                host = host[host.find("://") + 3:].strip(" /")  # strip http(s)://
                self.response.status = "302 Found"
                self.response.headers['Location'] = f"https://{host}/"
                return
        if path.startswith("/_ah/warmup"):
            self.response.write("okay")
            return

        try:
            current.session.get().load()

            # Load current user into context variable if user module is there.
            if user_mod := getattr(conf.main_app.vi, "user", None):
                current.user.set(user_mod.getCurrentUser())

            path = self._select_language(path)[1:]  # [1:] drops the leading slash

            # Check for closed system
            if conf.security.closed_system and self.method != "options":
                if not current.user.get():
                    if not any(fnmatch.fnmatch(path, pat) for pat in conf.security.closed_system_allowed_paths):
                        raise errors.Unauthorized()

            if conf.request_preprocessor:
                path = conf.request_preprocessor(path)

            self._route(path)

        except errors.Redirect as e:
            if conf.debug.trace_exceptions:
                logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
                raise
            self.response.status = f"{e.status} {e.name}"
            url = e.url
            url = unquote(url)  # decode first
            # safe = https://url.spec.whatwg.org/#url-path-segment-string
            url = quote(url, encoding="utf-8", safe="!$&'()*+,-./:;=?@_~#")  # re-encode all in utf-8
            if url.startswith(('.', '/')):
                # Relative target; make it absolute based on the current request URL
                url = str(urljoin(self.request.url, url))
            self.response.headers['Location'] = url

        except Exception as e:
            if conf.debug.trace_exceptions:
                logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
                raise
            self.response.body = b""
            if isinstance(e, errors.HTTPException):
                logging.info(f"[{e.status}] {e.name}: {e.descr}", exc_info=conf.debug.trace)
                self.response.status = f"{e.status} {e.name}"
                # Set machine-readable x-viur-error response header in case there is an exception description.
                if e.descr:
                    self.response.headers["x-viur-error"] = e.descr.replace("\n", "")
            else:
                self.response.status = 500
                logging.error("ViUR has caught an unhandled exception!")
                logging.exception(e)

            # Give the project's error handler the first chance to render the error
            res = None
            if conf.error_handler:
                try:
                    res = conf.error_handler(e)
                except Exception as newE:
                    logging.error("viur.error_handler failed!")
                    logging.exception(newE)
                    res = None
            if not res:
                descr = "The server encountered an unexpected error and is unable to process your request."

                if isinstance(e, errors.HTTPException):
                    error_info = {
                        "status": e.status,
                        "reason": e.name,
                        "title": str(translate(e.name)),
                        "descr": e.descr,
                    }
                else:
                    error_info = {
                        "status": 500,
                        "reason": "Internal Server Error",
                        "title": str(translate("Internal Server Error")),
                        "descr": descr
                    }

                if conf.instance.is_dev_server:
                    error_info["traceback"] = traceback.format_exc()

                error_info["logo"] = conf.error_logo

                # `.get` avoids a KeyError in case the Content-Type header has been removed
                if (len(self.path_list) > 0 and self.path_list[0] in ("vi", "json")) or \
                        current.request.get().response.headers.get("Content-Type") == "application/json":
                    current.request.get().response.headers["Content-Type"] = "application/json"
                    res = json.dumps(error_info)
                else:  # We render the error in html
                    # Try to get the template from html/error/
                    if filename := conf.main_app.render.getTemplateFileName((f"{error_info['status']}", "error"),
                                                                            raise_exception=False):
                        template = conf.main_app.render.getEnv().get_template(filename)
                        try:
                            uses_unsafe_inline = \
                                "unsafe-inline" in conf.security.content_security_policy["enforce"]["style-src"]
                        except (KeyError, TypeError):  # Not set
                            uses_unsafe_inline = False
                        if uses_unsafe_inline:
                            logging.info("Using style-src:unsafe-inline, don't create a nonce")
                            nonce = None
                        else:
                            nonce = utils.string.random(16)
                            extendCsp({"style-src": [f"nonce-{nonce}"]})
                        res = template.render(error_info, nonce=nonce)
                    else:
                        res = (f'<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">'
                               f'<title>{error_info["status"]} - {error_info["reason"]}</title>'
                               f'</head><body><h1>{error_info["status"]} - {error_info["reason"]}</h1>'
                               f'</body></html>')

            # The error handler may return str or bytes; convert the same way `_route` does
            if not isinstance(res, bytes):
                res = str(res).encode("UTF-8")
            self.response.write(res)

        finally:
            current.session.get().save()
            if conf.instance.is_dev_server and conf.debug.dev_server_cloud_logging:
                # Emit the outer log only on dev_appserver (we'll use the existing request log when live)
                SEVERITY = "DEBUG"
                if self.maxLogLevel >= 50:
                    SEVERITY = "CRITICAL"
                elif self.maxLogLevel >= 40:
                    SEVERITY = "ERROR"
                elif self.maxLogLevel >= 30:
                    SEVERITY = "WARNING"
                elif self.maxLogLevel >= 20:
                    SEVERITY = "INFO"

                TRACE = "projects/{}/traces/{}".format(loggingClient.project, self._traceID)

                REQUEST = {
                    'requestMethod': self.request.method,
                    'requestUrl': self.request.url,
                    'status': self.response.status_code,
                    'userAgent': self.request.headers.get('USER-AGENT'),
                    'responseSize': self.response.content_length,
                    'latency': "%0.3fs" % (time.time() - self.startTime),
                    'remoteIp': self.request.environ.get("HTTP_X_APPENGINE_USER_IP")
                }
                requestLogger.log_text(
                    "",
                    client=loggingClient,
                    severity=SEVERITY,
                    http_request=REQUEST,
                    trace=TRACE,
                    resource=requestLoggingRessource,
                    operation={
                        "first": True,
                        "last": True,
                        "id": self._traceID
                    }
                )

        if conf.instance.is_dev_server:
            self.is_deferred = True

            # Deferred task emulation: run everything that has been queued during this request
            while self.pendingTasks:
                task = self.pendingTasks.pop()
                logging.debug(f"Deferred task emulation, executing {task=}")
                try:
                    task()
                except Exception:  # noqa
                    logging.exception(f"Deferred Task emulation {task} failed")

    def _route(self, path: str) -> None:
        """
        Does the actual work of sanitizing the parameter, determine which exposed-function to call
        (and with which parameters)

        :param path: The path to route (language prefix already removed)
        :raises errors.BadRequest: On too many parameters, invalid unicode data or reserved parameter names
        :raises errors.Unauthorized: If a `canAccess` guard on the way denies access
        :raises errors.NotFound: If the path can't be resolved or the target is not exposed
        :raises errors.MethodNotAllowed: If the target is not callable or requires POST
        :raises errors.PreconditionFailed: If the target requires SSL, but the connection is not encrypted
        :raises ValueError: If `conf.param_filter_function` is set, but not callable
        """

        # Parse the URL
        if path := parse.urlparse(path).path:
            self.path = path
            self.path_list = tuple(unicodedata.normalize("NFC", parse.unquote(part))
                                   for part in path.strip("/").split("/"))

        # Prevent Hash-collision attacks
        if len(self.request.params) > conf.max_post_params_count:
            raise errors.BadRequest(
                f"Too many arguments supplied, exceeding maximum"
                f" of {conf.max_post_params_count} allowed arguments per request"
            )

        param_filter = conf.param_filter_function
        if param_filter and not callable(param_filter):
            raise ValueError(f"""{param_filter=} is not callable""")

        for key, value in self.request.params.items():
            try:
                key = unicodedata.normalize("NFC", key)
                value = unicodedata.normalize("NFC", value)
            except UnicodeError:
                # We received invalid unicode data (usually happens when
                # someone tries to exploit unicode normalisation bugs)
                raise errors.BadRequest()

            if param_filter and param_filter(key, value):
                continue

            if key == TEMPLATE_STYLE_KEY:
                self.template_style = value
                continue

            if key in self.kwargs:
                if isinstance(self.kwargs[key], list):
                    self.kwargs[key].append(value)
                else:  # Convert that key to a list
                    self.kwargs[key] = [self.kwargs[key], value]
            else:
                self.kwargs[key] = value

        if "self" in self.kwargs or "return" in self.kwargs:  # self or return is reserved for bound methods
            raise errors.BadRequest()

        caller = conf.main_resolver
        idx = 0  # Count how many items from *args we'd have consumed (so the rest can go into *args of the called func
        path_found = True

        for part in self.path_list:
            # TODO: Remove canAccess guards... solve differently.
            if "canAccess" in caller and not caller["canAccess"]():
                # We have a canAccess function guarding that object,
                # and it returns False...
                raise errors.Unauthorized()

            idx += 1

            if part not in caller:
                part = "index"

            if caller := caller.get(part):
                if isinstance(caller, Method):
                    if part == "index":
                        # The current path segment was not consumed by the lookup, so it becomes an argument
                        idx -= 1

                    self.args = tuple(self.path_list[idx:])
                    break

                elif part == "index":
                    path_found = False
                    break

            else:
                path_found = False
                break

        if not path_found:
            raise errors.NotFound(
                f"""The path {utils.string.escape("/".join(self.path_list[:idx]))} could not be found""")

        if not isinstance(caller, Method):
            # try to find "index" function
            if (index := caller.get("index")) and isinstance(index, Method):
                caller = index
            else:
                raise errors.MethodNotAllowed()

        # Check for internal exposed
        if caller.exposed is False and not self.internalRequest:
            raise errors.NotFound()

        # Fill the Allow header of the response with the allowed HTTP methods
        if self.method == "options":
            self.response.headers["Allow"] = ", ".join(sorted(caller.methods)).upper()

        # Register caller specific CORS headers
        self.cors_headers = [str(header).lower() for header in caller.cors_allow_headers or ()]

        # Check for @force_ssl flag
        if not self.internalRequest \
                and caller.ssl \
                and not self.isSSLConnection \
                and not conf.instance.is_dev_server:
            raise errors.PreconditionFailed("You must use SSL to access this resource!")

        # Check for @force_post flag
        if not self.isPostRequest and caller.methods == ("POST",):
            raise errors.MethodNotAllowed("You must use POST to access this resource!")

        # Check if this request should bypass the caches
        if self.request.headers.get("X-Viur-Disable-Cache"):
            # No cache requested, check if the current user is allowed to do so
            if (user := current.user.get()) and "root" in user["access"]:
                logging.debug("Caching disabled by X-Viur-Disable-Cache header")
                self.disableCache = True

        # Destill context as self.context, if available
        if context := {k: v for k, v in self.kwargs.items() if k.startswith("@")}:
            # Remove context parameters from kwargs
            kwargs = {k: v for k, v in self.kwargs.items() if k not in context}
            # Remove leading "@" from context parameters
            self.context |= {k[1:]: v for k, v in context.items() if len(k) > 1}
        else:
            kwargs = self.kwargs

        if ((self.internalRequest and conf.debug.trace_internal_call_routing)
                or conf.debug.trace_external_call_routing):
            logging.debug(
                f"Calling {caller._func!r} with args={self.args!r}, {kwargs=} within context={self.context!r}"
            )

        if self.method == "options":
            # OPTIONS request doesn't have a body; the routed method is not called at all
            del self.response.app_iter
            del self.response.content_type
            self.response.status = "204 No Content"
            return

        # Now call the routed method!
        res = caller(*self.args, **kwargs)

        if not isinstance(res, bytes):  # Convert the result to bytes if it is not already!
            res = str(res).encode("UTF-8")
        self.response.write(res)

    def _cors(self) -> None:
        """
        Set CORS headers to the HTTP response.

        Nothing is done for requests without an "Origin" header, or if the origin is not allowed.

        :raises RuntimeError: If the wildcard origin is combined with `cors_allow_credentials`
        :raises TypeError: If a configured candidate is neither a string nor a compiled regex

        .. seealso::

            Option :attr:`core.config.Security.cors_origins`, etc.
            for cors settings.

            https://fetch.spec.whatwg.org/#http-cors-protocol

            https://enable-cors.org/server.html

            https://www.html5rocks.com/static/images/cors_server_flowchart.png
        """

        def test_candidates(value: str, *candidates: str | re.Pattern) -> bool:
            """Test if the value matches the pattern of any candidate"""
            for candidate in candidates:
                if isinstance(candidate, re.Pattern):
                    if candidate.match(value):
                        return True
                elif isinstance(candidate, str):
                    if candidate.lower() == str(value).lower():
                        return True
                else:
                    raise TypeError(
                        f"Invalid setting {candidate}. "
                        f"Expected a string or a compiled regex."
                    )
            return False

        origin = current.request.get().request.headers.get("Origin")
        if not origin:
            return

        # Origin is set --> It's a CORS request

        any_origin_allowed = (
            conf.security.cors_origins == "*"
            or any(_origin == "*" for _origin in conf.security.cors_origins)
            or any(_origin.pattern == r".*"
                   for _origin in conf.security.cors_origins
                   if isinstance(_origin, re.Pattern))
        )

        if any_origin_allowed and conf.security.cors_origins_use_wildcard:
            if conf.security.cors_allow_credentials:
                raise RuntimeError(
                    "Invalid CORS config: "
                    "If credentials mode is \"include\", then `Access-Control-Allow-Origin` cannot be `*`. "
                    "See https://fetch.spec.whatwg.org/#cors-protocol-and-credentials"
                )
            self.response.headers["Access-Control-Allow-Origin"] = "*"

        elif test_candidates(origin, *conf.security.cors_origins):
            self.response.headers["Access-Control-Allow-Origin"] = origin

        else:
            logging.warning(f"{origin=} not valid (must be one of {conf.security.cors_origins=})")
            return

        if conf.security.cors_allow_credentials:
            self.response.headers["Access-Control-Allow-Credentials"] = "true"

        if self.method == "options":
            method = (self.request.headers.get("Access-Control-Request-Method") or "").lower()

            if method in conf.security.cors_methods:
                # It's a CORS-preflight request
                # - MUST include Access-Control-Request-Method
                # - CAN include Access-Control-Request-Headers

                # The response can be cached
                if conf.security.cors_max_age is not None:
                    assert isinstance(conf.security.cors_max_age, datetime.timedelta)
                    self.response.headers["Access-Control-Max-Age"] = \
                        str(int(conf.security.cors_max_age.total_seconds()))

                # Allowed methods
                self.response.headers["Access-Control-Allow-Methods"] = ", ".join(
                    sorted(conf.security.cors_methods)).upper()

                # Allowed headers
                # Access-Control-Request-Headers is optional, so a missing header must not break the preflight
                raw_request_headers = self.request.headers.get("Access-Control-Request-Headers") or ""
                request_headers = [
                    name
                    for raw_name in raw_request_headers.split(",")
                    if (name := raw_name.strip().lower())
                ]
                if conf.security.cors_allow_headers == "*":
                    # Every header is allowed
                    allow_headers = request_headers[:]
                else:
                    # There are generally headers allowed and/or from the caller
                    allow_headers = [
                        header
                        for header in request_headers
                        if test_candidates(
                            header,
                            *(self.cors_headers or ()),  # caller specific
                            *(conf.security.cors_allow_headers or ())  # generally global
                        )
                    ]
                if allow_headers:
                    self.response.headers["Access-Control-Allow-Headers"] = ", ".join(sorted(allow_headers))

            else:
                logging.warning(
                    f"Access-Control-Request-Method: {method} is NOT a valid method of {conf.security.cors_methods=}. "
                    f"Don't append CORS-preflight request headers"
                )

    def saveSession(self) -> None:
        """Save the session of the current request."""
        current.session.get().save()

777 

778 

779from .i18n import translate # noqa: E402