Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/uri.py: 73%
117 statements
coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
1import fnmatch
2import typing as t
3from . import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity
4from urllib.parse import urlparse, urlunparse
5from collections.abc import Iterable
6from collections import namedtuple
# Inclusive lower and upper bounds for port numbers accepted by ``UriBone(accepted_ports=...)``.
PORT_MIN: t.Final[int] = 1
PORT_MAX: t.Final[int] = 2 ** 16 - 1
class UriBone(BaseBone):
    """
    Bone for storing and validating a URI or URL.

    Validation can restrict the protocol (scheme), the port and the hostname of a value. Optionally the
    query string is removed from accepted values, and values without a protocol can be accepted as
    local paths.
    """
    type = "uri"

    def __init__(
        self,
        *,
        accepted_protocols: str | t.Iterable[str] | None = None,
        accepted_ports: int | str | t.Iterable[int] | t.Iterable[str] | None = None,
        clean_get_params: bool = False,
        domain_allowed_list: t.Iterable[str] | None = None,
        domain_disallowed_list: t.Iterable[str] | None = None,
        local_path_allowed: bool = False,
        **kwargs
    ):
        """
        The UriBone is used for storing URI and URL.

        :param accepted_protocols: The accepted protocols can be set to allow only the provided protocols.
            Either a single protocol or an iterable of protocols; each entry is matched against the
            scheme of the URL as an ``fnmatch`` pattern. An entry ``"*"`` allows every protocol.
        :param accepted_ports: The accepted ports can be set to allow only the provided ports.
            A port is an integer or a numeric string, a range is written as ``"start-end"`` (both
            inclusive), several entries can be given comma-separated or as an iterable, and ``"*"``
            allows every port.

            .. code-block:: python

                # Example
                UriBone(accepted_ports=1)
                UriBone(accepted_ports="2")
                UriBone(accepted_ports="1-4")
                UriBone(accepted_ports=(1,"2","4-10"))

        :param clean_get_params: When set to True, the GET-parameter for the URL will be cleaned.
        :param domain_allowed_list: If set, only the URLs that are matched with an entry of this iterable
            will be accepted.
        :param domain_disallowed_list: If set, only the URLs that are not matched
            with an entry of this iterable will be accepted.
        :param local_path_allowed: If True, values without a protocol are accepted as local paths and
            are prefixed with "/" unless they already start with "/", "?" or "#".

        :raises ValueError: If one of the options has an invalid type or value, or if both domain lists
            are set at the same time.
        :raises TypeError: If ``accepted_protocols`` is neither a string nor an iterable.
        """
        super().__init__(**kwargs)

        if accepted_ports:
            # Deduplicate the ranges and keep them ordered by their first port.
            self.accepted_ports = sorted(
                set(self._build_accepted_ports(accepted_ports)),
                key=lambda rng: rng.start,
            )

            if range(PORT_MIN, PORT_MAX + 1) in self.accepted_ports:
                self.accepted_ports = None  # all allowed
        else:
            self.accepted_ports = None

        if accepted_protocols:
            if isinstance(accepted_protocols, str):
                # A single protocol must stay one entry; set("https") would split it into characters.
                accepted_protocols = {accepted_protocols}
            elif not isinstance(accepted_protocols, Iterable):
                raise TypeError("accepted_protocols must be a string or an iterable of strings")

            # Checked after normalisation, so this is an entry test and "http*" is not mistaken for "*".
            if "*" in accepted_protocols:
                accepted_protocols = None  # all allowed

        self.accepted_protocols = accepted_protocols

        if not isinstance(clean_get_params, bool):
            raise ValueError("clean_get_params must be a boolean")

        if not isinstance(domain_allowed_list, (list, tuple)) and domain_allowed_list is not None:
            raise ValueError("domain_allowed_list must be a list or a tuple or None")

        if not isinstance(domain_disallowed_list, (list, tuple)) and domain_disallowed_list is not None:
            raise ValueError("domain_disallowed_list must be a list or a tuple or None")

        if domain_allowed_list is not None:
            if any(not isinstance(domain, str) for domain in domain_allowed_list):
                raise ValueError("domain_allowed_list must only contain strings")

        if domain_disallowed_list is not None:
            if any(not isinstance(domain, str) for domain in domain_disallowed_list):
                raise ValueError("domain_disallowed_list must only contain strings")

        if domain_allowed_list and domain_disallowed_list:
            raise ValueError("Only one of domain_allowed_list and domain_disallowed_list can be set")

        if not isinstance(local_path_allowed, bool):
            raise ValueError("local_path_allowed must be a boolean")

        self.clean_get_params = clean_get_params
        self.domain_allowed_list = domain_allowed_list
        self.domain_disallowed_list = domain_disallowed_list
        self.local_path_allowed = local_path_allowed

    @classmethod
    def _build_accepted_ports(cls, accepted_ports: str | int | t.Iterable[str | int]) -> list[range]:
        """
        Convert a port specification into a list of ``range`` objects.

        :param accepted_ports: An integer port, a string (``"*"``, a single port, an inclusive
            ``"start-end"`` range or a comma-separated list of those), or an iterable of such values.
        :return: One ``range`` per port or port range; a range ``"a-b"`` becomes ``range(a, b + 1)``.
        :raises ValueError: If a value is not numeric, lies outside ``PORT_MIN``..``PORT_MAX``, a range
            starts after its end, or the specification has an unsupported type.
        """
        if isinstance(accepted_ports, str):
            if accepted_ports == "*":
                return [range(PORT_MIN, PORT_MAX + 1)]

            if "," in accepted_ports:  # list of ranges, values
                return cls._build_accepted_ports([
                    value.strip() for value in accepted_ports.split(",")
                ])

            if "-" in accepted_ports:  # range of ports
                start, end = accepted_ports.split("-", 1)
                start = int(start)
                end = int(end)

                if start > end:
                    raise ValueError("Start value must be less than end value")

                if start < PORT_MIN:
                    raise ValueError("Start value must be greater than zero")

                if end > PORT_MAX:
                    raise ValueError(f"End value must be less or equal than {PORT_MAX}")

                return [range(start, end + 1)]

            # Single port: go through the integer branch so the same bounds check applies.
            return cls._build_accepted_ports(int(accepted_ports))

        if isinstance(accepted_ports, int):
            if accepted_ports < PORT_MIN:
                raise ValueError("Port value must be greater than zero")

            if accepted_ports > PORT_MAX:
                raise ValueError(f"Port value must be less or equal than {PORT_MAX}")

            return [range(accepted_ports, accepted_ports + 1)]

        if isinstance(accepted_ports, Iterable):
            accepted_ports_value = []
            for accepted_port in accepted_ports:
                accepted_ports_value.extend(cls._build_accepted_ports(accepted_port))
            return accepted_ports_value

        raise ValueError("accepted_ports must be a iterable or an integer or string")

    @staticmethod
    def _hostname_matches(hostname: str, domains: t.Iterable[str]) -> bool:
        """
        Return True if ``hostname`` matches at least one entry of ``domains``.

        An entry matches when it matches the hostname as an ``fnmatch`` pattern or when it is contained
        in the hostname as a substring.
        """
        # NOTE(review): the substring test also matches look-alike hosts, e.g. an entry "example.com"
        # matches "example.com.evil.org". It is kept because existing configurations may rely on it;
        # consider an exact / dot-suffix comparison instead.
        return any(fnmatch.fnmatch(hostname, domain) or domain in hostname for domain in domains)

    def isInvalid(self, value) -> str | None:
        """
        Check ``value`` against the restrictions configured for this bone.

        :param value: The URL to check.
        :return: An error message if the value is rejected, otherwise None.
        """
        try:
            parsed_url = urlparse(value)
        except ValueError:
            return "Can't parse URL"

        if not self.local_path_allowed and parsed_url.scheme == "":
            return "No protocol specified"

        if self.accepted_ports:
            try:
                # The port is parsed lazily; a non-numeric or out-of-range port raises ValueError here.
                port = parsed_url.port
            except ValueError:
                return "Can't parse URL"

            # A missing port (None) is never accepted; skipping the ranges avoids scanning them for it.
            if port is None or not any(port in rng for rng in self.accepted_ports):
                return f'"{port}" not in the accepted ports.'

        if self.accepted_protocols:
            if not any(fnmatch.fnmatch(parsed_url.scheme, protocol) for protocol in self.accepted_protocols):
                return f'"{parsed_url.scheme}" not in the accepted protocols.'

        if self.domain_allowed_list is not None:
            if not parsed_url.hostname:
                return "Provided URL has no hostname specified."

            if not self._hostname_matches(parsed_url.hostname, self.domain_allowed_list):
                return "Provided URL is not in the domain allowed list."

        if self.domain_disallowed_list is not None:
            if not parsed_url.hostname:
                return "Provided URL has no hostname specified."

            if self._hostname_matches(parsed_url.hostname, self.domain_disallowed_list):
                return "Provided URL is in the domain disallowed list."

        return None

    def singleValueFromClient(self, value, skel, bone_name, client_data) -> tuple:
        """
        Validate a single client value and normalise it.

        :param value: The URL sent by the client.
        :param skel: Not used by this implementation.
        :param bone_name: Not used by this implementation.
        :param client_data: Not used by this implementation.
        :return: ``(value, None)`` with the normalised value on success, otherwise the unchanged value
            together with a list containing one ``ReadFromClientError`` of severity ``Invalid``.
        """
        if err := self.isInvalid(value):
            return value, [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]

        parsed_url = urlparse(value)
        if self.local_path_allowed and parsed_url.scheme == "":
            # Guard against the empty string, which has no first character to inspect.
            if value and not value.startswith(("?", "#", "/")):
                value = f"/{value}"
                parsed_url = urlparse(value)

        if self.clean_get_params:
            # Rebuild the URL without the query string; the ";params" part of the path is dropped too.
            value = urlunparse(parsed_url._replace(params="", query=""))

        return value, None

    def structure(self) -> dict:
        """
        Return the structure of the parent class extended by the settings of this bone.

        ``accepted_ports`` is rendered as a list of ``(start, stop)`` tuples taken from the stored
        ``range`` objects, so ``stop`` is one above the last accepted port.
        """
        return super().structure() | {
            "accepted_protocols": list(self.accepted_protocols) if self.accepted_protocols else None,
            "accepted_ports": [(rng.start, rng.stop) for rng in self.accepted_ports] if self.accepted_ports else None,
            "clean_get_params": self.clean_get_params,
            "domain_allowed_list": self.domain_allowed_list,
            "domain_disallowed_list": self.domain_disallowed_list,
            "local_path_allowed": self.local_path_allowed,
        }