Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/request.py: 6%

436 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-27 07:59 +0000

1""" 

2 This module implements the WSGI (Web Server Gateway Interface) layer for ViUR. This is the main entry 

3 point for incoming HTTP requests. The main class is the :class:`Router`. Each request will get its

4 own instance of that class which then holds the reference to the request and response object. 

5 Additionally, this module defines the RequestValidator interface which provides a very early hook into the 

6 request processing (useful for global ratelimiting, DDoS prevention or access control). 

7""" 

8import datetime 

9import fnmatch 

10import json 

11import logging 

12import os 

13import re 

14import time 

15import traceback 

16import typing as t 

17from abc import ABC, abstractmethod 

18from urllib import parse 

19from urllib.parse import unquote, urljoin, urlparse 

20 

21import unicodedata 

22import webob 

23 

24from viur.core import current, db, errors, session, utils 

25from viur.core.config import conf 

26from viur.core.logging import client as loggingClient, requestLogger, requestLoggingRessource 

27from viur.core.module import Method 

28from viur.core.securityheaders import extendCsp 

29from viur.core.tasks import _appengineServiceIPs 

30 

# Name of the request parameter that selects the template style. `Router._route` stores its
# value in `Router.template_style` and does not forward it as a keyword argument.
TEMPLATE_STYLE_KEY = "style"

32 

33 

class RequestValidator(ABC):
    """
    Interface for validators that inspect a request very early on.

    If the validate method returns a tuple, the request is aborted. Can be used to block
    requests from bots.

    To register or remove a validator, access it in main.py through
    :attr:`viur.core.request.Router.requestValidators`
    """
    # Internal name to trace which validator aborted the request
    name = "RequestValidator"

    @staticmethod
    @abstractmethod
    def validate(request: "Router") -> t.Optional[tuple[int, str, str]]:
        """
        Check the current request.

        If the request is valid, simply return None. If the request should be blocked,
        return a tuple of

            - The HTTP status code (as int)
            - The description of that status code (eg "Forbidden")
            - The response body (can be a simple string or an HTML page)

        :param request: The request instance to check
        :return: None on success, an error tuple otherwise
        """
        raise NotImplementedError()

58 

59 

class FetchMetaDataValidator(RequestValidator):
    """
    This validator examines the headers "Sec-Fetch-Site", "sec-fetch-mode" and "sec-fetch-dest" as
    recommended by https://web.dev/fetch-metadata/
    """
    name = "FetchMetaDataValidator"

    @staticmethod
    def validate(request: "Router") -> t.Optional[tuple[int, str, str]]:
        """
        Accept or reject the request based on its fetch metadata headers.

        :param request: The request instance to check; its ``request.headers`` and
            ``isPostRequest`` attributes are read.
        :return: None if the request is accepted, otherwise a 403 error tuple.
        """
        headers = request.request.headers
        fetch_site = headers.get("sec-fetch-site")

        if not fetch_site:  # These headers are not sent by all browsers
            return None
        if fetch_site in {"same-origin", "none"}:  # A request from our site
            return None
        # Same-site requests are only accepted in local dev mode. `.get()` is used on purpose:
        # an unset GAE_ENV must not turn into a KeyError that aborts request handling.
        if fetch_site == "same-site" and os.environ.get("GAE_ENV") == "localdev":
            return None
        if (
            headers.get("sec-fetch-mode") == "navigate"
            and not request.isPostRequest
            and headers.get("sec-fetch-dest") not in {"object", "embed"}
        ):  # Incoming navigation GET request
            return None
        return 403, "Forbidden", "Request rejected due to fetch metadata"

81 

82 

class Router:
    """
    This class accepts the request, collects its parameters and routes the request
    to its destination function.
    The basic control flow is
        - Setting up internal variables
        - Running the request validators
        - Emitting the headers (especially the security related ones)
        - Run the TLS check (ensure it's a secure connection or check if the URL is whitelisted)
        - Load or initialize a new session
        - Set up i18n (choosing the language etc)
        - Run the request preprocessor (if any)
        - Normalize & sanity check the parameters
        - Resolve the exposed function and call it
        - Save the session / tear down the request
        - Return the response generated


    :warning: Don't instantiate! Don't subclass! DON'T TOUCH! ;)
    """

    # List of requestValidators used to preflight-check a request before it's being dispatched within ViUR
    requestValidators = [FetchMetaDataValidator]

106 

    def __init__(self, environ: dict):
        """
        Set up the per-request state and process the request immediately.

        The constructor wraps ``environ`` in a ``webob.Request``, creates an empty
        ``webob.Response``, initialises the request attributes and the context variables,
        then runs :meth:`_process` followed by :meth:`_cors`. Afterwards the context
        variables are reset to None.

        :param environ: The environment dictionary handed to ``webob.Request``.
        """
        super().__init__()
        self.startTime = time.time()  # used for the latency value of the dev-server request log

        self.request = webob.Request(environ)
        self.response = webob.Response()

        self.maxLogLevel = logging.DEBUG
        # First part of the X-Cloud-Trace-Context header, or a random string if that is empty
        self._traceID = \
            self.request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or utils.string.random()
        self.is_deferred = False
        self.path = ""
        self.path_list = ()

        self.skey_checked = False  # indicates whether @skey-decorator-check has already performed within a request
        self.internalRequest = False
        self.disableCache = False  # Shall this request bypass the caches?
        self.pendingTasks = []
        self.args = ()
        self.kwargs = {}
        self.context = {}
        self.template_style: str | None = None
        self.cors_headers = ()

        # Check if it's a HTTP-Method we support
        self.method = self.request.method.lower()
        self.isPostRequest = self.method == "post"
        self.isSSLConnection = self.request.host_url.lower().startswith("https://")  # We have an encrypted channel

        db.currentDbAccessLog.set(set())

        # Set context variables
        current.language.set(conf.i18n.default_language)
        current.request.set(self)
        current.session.set(session.Session())
        current.request_data.set({})

        # Process actual request
        self._process()

        self._cors()

        # Unset context variables
        # (not reached if _process() or _cors() raises, as there is no try/finally here)
        current.language.set(None)
        current.request_data.set(None)
        current.session.set(None)
        current.request.set(None)
        current.user.set(None)

155 

156 @property 

157 def isDevServer(self) -> bool: 

158 import warnings 

159 msg = "Use of `isDevServer` is deprecated; Use `conf.instance.is_dev_server` instead!" 

160 warnings.warn(msg, DeprecationWarning, stacklevel=2) 

161 logging.warning(msg) 

162 return conf.instance.is_dev_server 

163 

164 def _select_language(self, path: str) -> str: 

165 """ 

166 Tries to select the best language for the current request. Depending on the value of 

167 conf.i18n.language_method, we'll either try to load it from the session, determine it by the domain 

168 or extract it from the URL. 

169 """ 

170 

171 def get_language_from_header() -> str | None: 

172 if not (accept_language := self.request.headers.get("accept-language")): 

173 return None 

174 languages = accept_language.split(",") 

175 locale_q_pairs = [] 

176 

177 for language in languages: 

178 if language.split(";")[0] == language: 

179 # no q => q = 1 

180 locale_q_pairs.append((language.strip(), "1")) 

181 else: 

182 locale = language.split(";")[0].strip() 

183 q = language.split(";")[1].split("=")[1] 

184 locale_q_pairs.append((locale, q)) 

185 for locale_q_pair in locale_q_pairs: 

186 if "-" in locale_q_pair[0]: # Check for de-DE 

187 lang = locale_q_pair[0].split("-")[0] 

188 else: 

189 lang = locale_q_pair[0] 

190 if lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()): 

191 return lang 

192 return None 

193 

194 if not conf.i18n.available_languages: 

195 # This project doesn't use the multi-language feature, nothing to do here 

196 return path 

197 if conf.i18n.language_method == "session": 

198 current_session = current.session.get() 

199 lang = conf.i18n.default_language 

200 # We save the language in the session, if it exists, and try to load it from there 

201 if "lang" in current_session: 

202 current.language.set(current_session["lang"]) 

203 return path 

204 

205 if header_lang := get_language_from_header(): 

206 lang = header_lang 

207 current.language.set(lang) 

208 

209 elif header_lang := self.request.headers.get("X-Appengine-Country"): 

210 header_lang = str(header_lang).lower() 

211 if header_lang in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()): 

212 lang = header_lang 

213 

214 if current_session.loaded: 

215 current_session["lang"] = lang 

216 current.language.set(lang) 

217 

218 elif conf.i18n.language_method == "domain": 

219 host = self.request.host_url.lower() 

220 host = host[host.find("://") + 3:].strip(" /") # strip http(s):// 

221 if host.startswith("www."): 

222 host = host[4:] 

223 if lang := conf.i18n.domain_language_mapping.get(host): 

224 current.language.set(lang) 

225 # We have no language configured for this domain, try to read it from the HTTP Header 

226 elif lang := get_language_from_header(): 

227 current.language.set(lang) 

228 

229 elif conf.i18n.language_method == "url": 

230 tmppath = urlparse(path).path 

231 tmppath = [unquote(x) for x in tmppath.lower().strip("/").split("/")] 

232 if ( 

233 len(tmppath) > 0 

234 and tmppath[0] in conf.i18n.available_languages + list(conf.i18n.language_alias_map.keys()) 

235 ): 

236 current.language.set(tmppath[0]) 

237 return path[len(tmppath[0]) + 1:] # Return the path stripped by its language segment 

238 else: # This URL doesnt contain an language prefix, try to read it from session 

239 if header_lang := get_language_from_header(): 

240 current.language.set(header_lang) 

241 elif header_lang := self.request.headers.get("X-Appengine-Country"): 

242 lang = str(header_lang).lower() 

243 if lang in conf.i18n.available_languages or lang in conf.i18n.language_alias_map: 

244 current.language.set(lang) 

245 elif conf.i18n.language_method == "header": 

246 if lang := get_language_from_header(): 

247 current.language.set(lang) 

248 

249 return path 

250 

    def _process(self):
        """
        Run the request through validation, header emission, TLS check, session and
        language setup, routing and error handling.

        Returns early (without routing) for unsupported HTTP methods, requests rejected by
        a request validator, non-TLS requests that get redirected and warmup requests.
        Exceptions raised while routing are turned into an error response, unless
        ``conf.debug.trace_exceptions`` is set, in which case they are re-raised.
        """
        if self.method not in ("get", "post", "head", "options"):
            logging.error(f"{self.method=} not supported")
            return

        if self.request.headers.get("X-AppEngine-TaskName", None) is not None:  # Check if we run in the appengine
            if self.request.environ.get("HTTP_X_APPENGINE_USER_IP") in _appengineServiceIPs:
                self.is_deferred = True
            elif os.getenv("TASKS_EMULATOR") is not None:
                self.is_deferred = True

        current.language.set(conf.i18n.default_language)
        # Check if we should process or abort the request
        # (all validators are evaluated first; the first non-None result aborts the request)
        for validator, reqValidatorResult in [(x, x.validate(self)) for x in self.requestValidators]:
            if reqValidatorResult is not None:
                logging.warning(f"Request rejected by validator {validator.name}")
                statusCode, statusStr, statusDescr = reqValidatorResult
                self.response.status = f"{statusCode} {statusStr}"
                self.response.write(statusDescr)
                return

        path = self.request.path

        # Add CSP headers early (if any)
        if conf.security.content_security_policy and conf.security.content_security_policy["_headerCache"]:
            for k, v in conf.security.content_security_policy["_headerCache"].items():
                self.response.headers[k] = v
        if self.isSSLConnection:  # Check for HTST and PKP headers only if we have a secure channel.
            if conf.security.strict_transport_security:
                self.response.headers["Strict-Transport-Security"] = conf.security.strict_transport_security
        # Check for X-Security-Headers we shall emit
        if conf.security.x_content_type_options:
            self.response.headers["X-Content-Type-Options"] = "nosniff"
        if conf.security.x_xss_protection is not None:
            if conf.security.x_xss_protection:
                self.response.headers["X-XSS-Protection"] = "1; mode=block"
            elif conf.security.x_xss_protection is False:
                self.response.headers["X-XSS-Protection"] = "0"
        if conf.security.x_frame_options is not None and isinstance(conf.security.x_frame_options, tuple):
            mode, uri = conf.security.x_frame_options
            if mode in ["deny", "sameorigin"]:
                self.response.headers["X-Frame-Options"] = mode
            elif mode == "allow-from":
                self.response.headers["X-Frame-Options"] = f"allow-from {uri}"
        if conf.security.x_permitted_cross_domain_policies is not None:
            self.response.headers["X-Permitted-Cross-Domain-Policies"] = conf.security.x_permitted_cross_domain_policies
        if conf.security.referrer_policy:
            self.response.headers["Referrer-Policy"] = conf.security.referrer_policy
        if conf.security.permissions_policy.get("_headerCache"):
            self.response.headers["Permissions-Policy"] = conf.security.permissions_policy["_headerCache"]
        if conf.security.enable_coep:
            self.response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
        if conf.security.enable_coop:
            self.response.headers["Cross-Origin-Opener-Policy"] = conf.security.enable_coop
        if conf.security.enable_corp:
            self.response.headers["Cross-Origin-Resource-Policy"] = conf.security.enable_corp

        # Ensure that TLS is used if required
        if conf.security.force_ssl and not self.isSSLConnection and not conf.instance.is_dev_server:
            isWhitelisted = False
            reqPath = self.request.path
            for testUrl in conf.security.no_ssl_check_urls:
                if testUrl.endswith("*"):
                    # A trailing "*" makes the entry a prefix match
                    if reqPath.startswith(testUrl[:-1]):
                        isWhitelisted = True
                        break
                else:
                    if testUrl == reqPath:
                        isWhitelisted = True
                        break
            if not isWhitelisted:  # Some URLs need to be whitelisted (as f.e. the Tasks-Queue doesn't call using https)
                # Redirect the user to the startpage (using ssl this time)
                host = self.request.host_url.lower()
                host = host[host.find("://") + 3:].strip(" /")  # strip http(s)://
                self.response.status = "302 Found"
                self.response.headers['Location'] = f"https://{host}/"
                return
        if path.startswith("/_ah/warmup"):
            self.response.write("okay")
            return

        try:
            current.session.get().load()

            # Load current user into context variable if user module is there.
            if user_mod := getattr(conf.main_app.vi, "user", None):
                current.user.set(user_mod.getCurrentUser())

            # [1:] drops the first character of the path returned by _select_language
            path = self._select_language(path)[1:]

            # Check for closed system
            if conf.security.closed_system and self.method != "options":
                if not current.user.get():
                    if not any(fnmatch.fnmatch(path, pat) for pat in conf.security.closed_system_allowed_paths):
                        raise errors.Unauthorized()

            if conf.request_preprocessor:
                path = conf.request_preprocessor(path)

            self._route(path)

        except errors.Redirect as e:
            if conf.debug.trace_exceptions:
                logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
                raise
            self.response.status = f"{e.status} {e.name}"
            url = e.url
            # Targets starting with "." or "/" are resolved against the URL of the current request
            if url.startswith(('.', '/')):
                url = str(urljoin(self.request.url, url))
            self.response.headers['Location'] = url

        except Exception as e:
            if conf.debug.trace_exceptions:
                logging.warning("""conf.debug.trace_exceptions is set, won't handle this exception""")
                raise
            # Discard anything written to the response body so far
            self.response.body = b""
            if isinstance(e, errors.HTTPException):
                logging.info(f"[{e.status}] {e.name}: {e.descr}", exc_info=conf.debug.trace)
                self.response.status = f"{e.status} {e.name}"
                # Set machine-readable x-viur-error response header in case there is an exception description.
                if e.descr:
                    self.response.headers["x-viur-error"] = e.descr.replace("\n", "")
            else:
                self.response.status = 500
                logging.error("ViUR has caught an unhandled exception!")
                logging.exception(e)

            # Give the project's error handler the first chance to render the error
            res = None
            if conf.error_handler:
                try:
                    res = conf.error_handler(e)
                except Exception as newE:
                    logging.error("viur.error_handler failed!")
                    logging.exception(newE)
                    res = None
            if not res:
                # No (usable) result from the error handler: build the error page here
                descr = "The server encountered an unexpected error and is unable to process your request."

                # `translate` is imported at the bottom of this module
                if isinstance(e, errors.HTTPException):
                    error_info = {
                        "status": e.status,
                        "reason": e.name,
                        "title": str(translate(e.name)),
                        "descr": e.descr,
                    }
                else:
                    error_info = {
                        "status": 500,
                        "reason": "Internal Server Error",
                        "title": str(translate("Internal Server Error")),
                        "descr": descr
                    }

                # The traceback is only exposed on the dev server
                if conf.instance.is_dev_server:
                    error_info["traceback"] = traceback.format_exc()

                error_info["logo"] = conf.error_logo

                # JSON output for paths starting with "vi" or "json", or if the response is already JSON
                if (len(self.path_list) > 0 and self.path_list[0] in ("vi", "json")) or \
                        current.request.get().response.headers["Content-Type"] == "application/json":
                    current.request.get().response.headers["Content-Type"] = "application/json"
                    res = json.dumps(error_info)
                else:  # We render the error in html
                    # Try to get the template from html/error/
                    if filename := conf.main_app.render.getTemplateFileName((f"{error_info['status']}", "error"),
                                                                            raise_exception=False):
                        template = conf.main_app.render.getEnv().get_template(filename)
                        try:
                            uses_unsafe_inline = \
                                "unsafe-inline" in conf.security.content_security_policy["enforce"]["style-src"]
                        except (KeyError, TypeError):  # Not set
                            uses_unsafe_inline = False
                        if uses_unsafe_inline:
                            logging.info("Using style-src:unsafe-inline, don't create a nonce")
                            nonce = None
                        else:
                            nonce = utils.string.random(16)
                            extendCsp({"style-src": [f"nonce-{nonce}"]})
                        res = template.render(error_info, nonce=nonce)
                    else:
                        # No error template found: fall back to a minimal hard-coded page
                        res = (f'<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">'
                               f'<title>{error_info["status"]} - {error_info["reason"]}</title>'
                               f'</head><body><h1>{error_info["status"]} - {error_info["reason"]}</h1>')

            self.response.write(res.encode("UTF-8"))

        finally:
            current.session.get().save()
            if conf.instance.is_dev_server and conf.debug.dev_server_cloud_logging:
                # Emit the outer log only on dev_appserver (we'll use the existing request log when live)
                # The thresholds below equal the numeric levels of the logging module
                SEVERITY = "DEBUG"
                if self.maxLogLevel >= 50:
                    SEVERITY = "CRITICAL"
                elif self.maxLogLevel >= 40:
                    SEVERITY = "ERROR"
                elif self.maxLogLevel >= 30:
                    SEVERITY = "WARNING"
                elif self.maxLogLevel >= 20:
                    SEVERITY = "INFO"

                TRACE = "projects/{}/traces/{}".format(loggingClient.project, self._traceID)

                REQUEST = {
                    'requestMethod': self.request.method,
                    'requestUrl': self.request.url,
                    'status': self.response.status_code,
                    'userAgent': self.request.headers.get('USER-AGENT'),
                    'responseSize': self.response.content_length,
                    'latency': "%0.3fs" % (time.time() - self.startTime),
                    'remoteIp': self.request.environ.get("HTTP_X_APPENGINE_USER_IP")
                }
                requestLogger.log_text(
                    "",
                    client=loggingClient,
                    severity=SEVERITY,
                    http_request=REQUEST,
                    trace=TRACE,
                    resource=requestLoggingRessource,
                    operation={
                        "first": True,
                        "last": True,
                        "id": self._traceID
                    }
                )

        if conf.instance.is_dev_server:
            self.is_deferred = True

            # Execute the tasks collected in self.pendingTasks right here; a failing task is
            # logged and does not stop the remaining ones
            while self.pendingTasks:
                task = self.pendingTasks.pop()
                logging.debug(f"Deferred task emulation, executing {task=}")
                try:
                    task()
                except Exception:  # noqa
                    logging.exception(f"Deferred Task emulation {task} failed")

486 

487 def _route(self, path: str) -> None: 

488 """ 

489 Does the actual work of sanitizing the parameter, determine which exposed-function to call 

490 (and with which parameters) 

491 """ 

492 

493 # Parse the URL 

494 if path := parse.urlparse(path).path: 

495 self.path = path 

496 self.path_list = tuple(unicodedata.normalize("NFC", parse.unquote(part)) 

497 for part in path.strip("/").split("/")) 

498 

499 # Prevent Hash-collision attacks 

500 if len(self.request.params) > conf.max_post_params_count: 

501 raise errors.BadRequest( 

502 f"Too many arguments supplied, exceeding maximum" 

503 f" of {conf.max_post_params_count} allowed arguments per request" 

504 ) 

505 

506 param_filter = conf.param_filter_function 

507 if param_filter and not callable(param_filter): 

508 raise ValueError(f"""{param_filter=} is not callable""") 

509 

510 for key, value in self.request.params.items(): 

511 try: 

512 key = unicodedata.normalize("NFC", key) 

513 value = unicodedata.normalize("NFC", value) 

514 except UnicodeError: 

515 # We received invalid unicode data (usually happens when 

516 # someone tries to exploit unicode normalisation bugs) 

517 raise errors.BadRequest() 

518 

519 if param_filter and param_filter(key, value): 

520 continue 

521 

522 if key == TEMPLATE_STYLE_KEY: 

523 self.template_style = value 

524 continue 

525 

526 if key in self.kwargs: 

527 if isinstance(self.kwargs[key], list): 

528 self.kwargs[key].append(value) 

529 else: # Convert that key to a list 

530 self.kwargs[key] = [self.kwargs[key], value] 

531 else: 

532 self.kwargs[key] = value 

533 

534 if "self" in self.kwargs or "return" in self.kwargs: # self or return is reserved for bound methods 

535 raise errors.BadRequest() 

536 

537 caller = conf.main_resolver 

538 idx = 0 # Count how may items from *args we'd have consumed (so the rest can go into *args of the called func 

539 path_found = True 

540 

541 for part in self.path_list: 

542 # TODO: Remove canAccess guards... solve differently. 

543 if "canAccess" in caller and not caller["canAccess"](): 

544 # We have a canAccess function guarding that object, 

545 # and it returns False... 

546 raise errors.Unauthorized() 

547 

548 idx += 1 

549 

550 if part not in caller: 

551 part = "index" 

552 

553 if caller := caller.get(part): 

554 if isinstance(caller, Method): 

555 if part == "index": 

556 idx -= 1 

557 

558 self.args = tuple(self.path_list[idx:]) 

559 break 

560 

561 elif part == "index": 

562 path_found = False 

563 break 

564 

565 else: 

566 path_found = False 

567 break 

568 

569 if not path_found: 

570 raise errors.NotFound( 

571 f"""The path {utils.string.escape("/".join(self.path_list[:idx]))} could not be found""") 

572 

573 if not isinstance(caller, Method): 

574 # try to find "index" function 

575 if (index := caller.get("index")) and isinstance(index, Method): 

576 caller = index 

577 else: 

578 raise errors.MethodNotAllowed() 

579 

580 # Check for internal exposed 

581 if caller.exposed is False and not self.internalRequest: 

582 raise errors.NotFound() 

583 

584 # Fill the Allow header of the response with the allowed HTTP methods 

585 if self.method == "options": 

586 self.response.headers["Allow"] = ", ".join(sorted(caller.methods)).upper() 

587 

588 # Register caller specific CORS headers 

589 self.cors_headers = [str(header).lower() for header in caller.cors_allow_headers or ()] 

590 

591 # Check for @force_ssl flag 

592 if not self.internalRequest \ 

593 and caller.ssl \ 

594 and not self.request.host_url.lower().startswith("https://") \ 

595 and not conf.instance.is_dev_server: 

596 raise errors.PreconditionFailed("You must use SSL to access this resource!") 

597 

598 # Check for @force_post flag 

599 if not self.isPostRequest and caller.methods == ("POST",): 

600 raise errors.MethodNotAllowed("You must use POST to access this resource!") 

601 

602 # Check if this request should bypass the caches 

603 if self.request.headers.get("X-Viur-Disable-Cache"): 

604 # No cache requested, check if the current user is allowed to do so 

605 if (user := current.user.get()) and "root" in user["access"]: 

606 logging.debug("Caching disabled by X-Viur-Disable-Cache header") 

607 self.disableCache = True 

608 

609 # Destill context as self.context, if available 

610 if context := {k: v for k, v in self.kwargs.items() if k.startswith("@")}: 

611 # Remove context parameters from kwargs 

612 kwargs = {k: v for k, v in self.kwargs.items() if k not in context} 

613 # Remove leading "@" from context parameters 

614 self.context |= {k[1:]: v for k, v in context.items() if len(k) > 1} 

615 else: 

616 kwargs = self.kwargs 

617 

618 if ((self.internalRequest and conf.debug.trace_internal_call_routing) 

619 or conf.debug.trace_external_call_routing): 

620 logging.debug( 

621 f"Calling {caller._func!r} with args={self.args!r}, {kwargs=} within context={self.context!r}" 

622 ) 

623 

624 if self.method == "options": 

625 # OPTIONS request doesn't have a body 

626 del self.response.app_iter 

627 del self.response.content_type 

628 self.response.status = "204 No Content" 

629 return 

630 

631 # Now call the routed method! 

632 res = caller(*self.args, **kwargs) 

633 

634 if self.method == "options": 

635 # OPTIONS request doesn't have a body 

636 del self.response.app_iter 

637 del self.response.content_type 

638 self.response.status = "204 No Content" 

639 return 

640 

641 if not isinstance(res, bytes): # Convert the result to bytes if it is not already! 

642 res = str(res).encode("UTF-8") 

643 self.response.write(res) 

644 

645 def _cors(self) -> None: 

646 """ 

647 Set CORS headers to the HTTP response. 

648 

649 .. seealso:: 

650 

651 Option :attr:`core.config.Security.cors_origins`, etc. 

652 for cors settings. 

653 

654 https://fetch.spec.whatwg.org/#http-cors-protocol 

655 

656 https://enable-cors.org/server.html 

657 

658 https://www.html5rocks.com/static/images/cors_server_flowchart.png 

659 """ 

660 

661 def test_candidates(value: str, *candidates: str | re.Pattern) -> bool: 

662 """Test if the value matches the pattern of any candidate""" 

663 for candidate in candidates: 

664 if isinstance(candidate, re.Pattern): 

665 if candidate.match(value): 

666 return True 

667 elif isinstance(candidate, str): 

668 if candidate.lower() == str(value).lower(): 

669 return True 

670 else: 

671 raise TypeError( 

672 f"Invalid setting {candidate}. " 

673 f"Expected a string or a compiled regex." 

674 ) 

675 return False 

676 

677 origin = current.request.get().request.headers.get("Origin") 

678 if not origin: 

679 logging.debug(f"Origin header is not set") 

680 return 

681 

682 # Origin is set --> It's a CORS request 

683 logging.debug(f"Got CORS request from {origin=}") 

684 

685 any_origin_allowed = ( 

686 conf.security.cors_origins == "*" 

687 or any(_origin == "*" for _origin in conf.security.cors_origins) 

688 or any(_origin.pattern == r".*" 

689 for _origin in conf.security.cors_origins 

690 if isinstance(_origin, re.Pattern)) 

691 ) 

692 

693 if any_origin_allowed and conf.security.cors_origins_use_wildcard: 

694 if conf.security.cors_allow_credentials: 

695 raise RuntimeError( 

696 "Invalid CORS config: " 

697 "If credentials mode is \"include\", then `Access-Control-Allow-Origin` cannot be `*`. " 

698 "See https://fetch.spec.whatwg.org/#cors-protocol-and-credentials" 

699 ) 

700 self.response.headers["Access-Control-Allow-Origin"] = "*" 

701 

702 elif test_candidates(origin, *conf.security.cors_origins): 

703 self.response.headers["Access-Control-Allow-Origin"] = origin 

704 

705 else: 

706 logging.warning(f"{origin=} not valid (must be one of {conf.security.cors_origins=})") 

707 return 

708 

709 if conf.security.cors_allow_credentials: 

710 self.response.headers["Access-Control-Allow-Credentials"] = "true" 

711 

712 if self.method == "options": 

713 method = (self.request.headers.get("Access-Control-Request-Method") or "").lower() 

714 

715 if method in conf.security.cors_methods: 

716 # It's a CORS-preflight request 

717 # - MUST include Access-Control-Request-Method 

718 # - CAN include Access-Control-Request-Headers 

719 

720 # The response can be cached 

721 if conf.security.cors_max_age is not None: 

722 assert isinstance(conf.security.cors_max_age, datetime.timedelta) 

723 self.response.headers["Access-Control-Max-Age"] = \ 

724 str(int(conf.security.cors_max_age.total_seconds())) 

725 

726 # Allowed methods 

727 self.response.headers["Access-Control-Allow-Methods"] = ", ".join( 

728 sorted(conf.security.cors_methods)).upper() 

729 

730 # Allowed headers 

731 request_headers = self.request.headers.get("Access-Control-Request-Headers") 

732 request_headers = [h.strip().lower() for h in request_headers.split(",")] 

733 if conf.security.cors_allow_headers == "*": 

734 # Every header is allowed 

735 allow_headers = request_headers[:] 

736 else: 

737 # There are generally headers allowed and/or from the caller 

738 allow_headers = [ 

739 header 

740 for header in request_headers 

741 if test_candidates( 

742 header, 

743 *(self.cors_headers or ()), # caller specific 

744 *(conf.security.cors_allow_headers or ()) # generally global 

745 ) 

746 ] 

747 if allow_headers: 

748 self.response.headers["Access-Control-Allow-Headers"] = ", ".join(sorted(allow_headers)) 

749 

750 else: 

751 logging.warning( 

752 f"Access-Control-Request-Method: {method} is NOT a valid method of {conf.security.cors_methods=}. " 

753 f"Don't append CORS-preflight request headers" 

754 ) 

755 

756 def saveSession(self) -> None: 

757 current.session.get().save() 

758 

759 

760from .i18n import translate # noqa: E402