Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/uri.py: 73%

117 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-27 07:59 +0000

1import fnmatch 

2import typing as t 

3from . import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity 

4from urllib.parse import urlparse, urlunparse 

5from collections.abc import Iterable 

6from collections import namedtuple 

7 

# Inclusive bounds of the port numbers UriBone accepts in ``accepted_ports``.
PORT_MIN: t.Final[int] = 1
PORT_MAX: t.Final[int] = 2 ** 16 - 1

10 

11 

class UriBone(BaseBone):
    """
    Bone for storing a URI/URL, optionally restricted by protocol, port and domain.
    """
    type = "uri"

    def __init__(
        self,
        *,
        accepted_protocols: str | t.Iterable[str] | None = None,
        accepted_ports: int | str | t.Iterable[int] | t.Iterable[str] | None = None,
        clean_get_params: bool = False,
        domain_allowed_list: t.Iterable[str] | None = None,
        domain_disallowed_list: t.Iterable[str] | None = None,
        local_path_allowed: bool = False,
        **kwargs
    ):
        """
        The UriBone is used for storing URI and URL.

        :param accepted_protocols: If set, only the provided protocols are accepted. Entries are
            matched with :mod:`fnmatch`; a single string is treated as one protocol and the
            entry ``"*"`` disables the restriction.
        :param accepted_ports: If set, only the provided ports are accepted.
            .. code-block:: python
                # Example
                UriBone(accepted_ports=1)
                UriBone(accepted_ports="2")
                UriBone(accepted_ports="1-4")
                UriBone(accepted_ports=(1,"2","4-10"))
        :param clean_get_params: When set to True, the GET-parameters (query string) are removed from the URL.
        :param domain_allowed_list: If set, only the URLs that are matched with an entry of this iterable
            will be accepted.
        :param domain_disallowed_list: If set, only the URLs that are not matched
            with an entry of this iterable will be accepted.
        :param local_path_allowed: If True, URLs without a protocol are accepted as local paths
            and are prefixed with "/" when they do not already start with "/", "?" or "#".
        :raises ValueError: If one of the arguments has an unsupported type or value.
        :raises TypeError: If ``accepted_protocols`` is neither a string nor an iterable.
        """
        super().__init__(**kwargs)
        if accepted_ports:
            self.accepted_ports = sorted(set(self._build_accepted_ports(accepted_ports)), key=lambda rng: rng.start)

            if range(PORT_MIN, PORT_MAX + 1) in self.accepted_ports:
                self.accepted_ports = None  # all allowed
        else:
            self.accepted_ports = None

        if accepted_protocols:
            if isinstance(accepted_protocols, str):
                # A single protocol: wrap it, otherwise it would be split into its characters
                # and "*" would be tested as a substring instead of as an entry.
                accepted_protocols = [accepted_protocols]
            elif not isinstance(accepted_protocols, Iterable):
                raise TypeError("accepted_protocols must be a string or an iterable of strings")

            if "*" in accepted_protocols:
                accepted_protocols = None  # all allowed
        self.accepted_protocols = accepted_protocols

        if not isinstance(clean_get_params, bool):
            raise ValueError("clean_get_params must be a boolean")

        if domain_allowed_list is not None and not isinstance(domain_allowed_list, (list, tuple)):
            raise ValueError("domain_allowed_list must be a list or a tuple or None")

        if domain_disallowed_list is not None and not isinstance(domain_disallowed_list, (list, tuple)):
            raise ValueError("domain_disallowed_list must be a list or a tuple or None")

        if domain_allowed_list is not None:
            if not all(isinstance(domain, str) for domain in domain_allowed_list):
                raise ValueError("domain_allowed_list must only contain strings")

        if domain_disallowed_list is not None:
            if not all(isinstance(domain, str) for domain in domain_disallowed_list):
                raise ValueError("domain_disallowed_list must only contain strings")

        if domain_allowed_list and domain_disallowed_list:
            raise ValueError("Only one of domain_allowed_list and domain_disallowed_list can be set")

        if not isinstance(local_path_allowed, bool):
            raise ValueError("local_path_allowed must be a boolean")

        self.clean_get_params = clean_get_params
        self.domain_allowed_list = domain_allowed_list
        self.domain_disallowed_list = domain_disallowed_list
        self.local_path_allowed = local_path_allowed

    @classmethod
    def _build_accepted_ports(cls, accepted_ports: str | int | t.Iterable[str | int]) -> list[range]:
        """
        Normalize a port specification into a list of ``range`` objects.

        :param accepted_ports: A port number, a string (``"80"``, ``"1-4"``, ``"80, 443"`` or ``"*"``)
            or an iterable of such values.
        :return: One ``range`` per port or port range; the end of each range is exclusive.
        :raises ValueError: If a value is not a valid port or lies outside PORT_MIN..PORT_MAX.
        """
        if isinstance(accepted_ports, str):
            if accepted_ports == "*":
                return [range(PORT_MIN, PORT_MAX + 1)]

            if "," in accepted_ports:  # list of ranges, values
                return cls._build_accepted_ports([
                    value.strip() for value in accepted_ports.split(",")
                ])

            if "-" in accepted_ports:  # range of ports
                start, end = accepted_ports.split("-", 1)
                start = int(start)
                end = int(end)
                if start > end:
                    raise ValueError("Start value must be less than end value")

                if start < PORT_MIN:
                    raise ValueError("Start value must be greater than zero")

                if end > PORT_MAX:
                    raise ValueError(f"End value must be less or equal than {PORT_MAX}")

                return [range(start, end + 1)]

            # Single port given as string: reuse the integer branch so the same bounds apply.
            return cls._build_accepted_ports(int(accepted_ports))

        if isinstance(accepted_ports, int):
            if accepted_ports < PORT_MIN:
                raise ValueError("Port value must be greater than zero")

            if accepted_ports > PORT_MAX:
                raise ValueError(f"Port value must be less or equal than {PORT_MAX}")

            return [range(accepted_ports, accepted_ports + 1)]

        if isinstance(accepted_ports, Iterable):
            accepted_ports_value = []
            for accepted_port in accepted_ports:
                accepted_ports_value.extend(cls._build_accepted_ports(accepted_port))
            return accepted_ports_value

        raise ValueError("accepted_ports must be a iterable or an integer or string")

    @staticmethod
    def _matches_any_domain(hostname: str, domains: t.Iterable[str]) -> bool:
        """Return True if *hostname* matches one of *domains* by fnmatch pattern or substring."""
        # NOTE(review): the substring test also matches hosts such as "example.com.evil.org"
        # for the entry "example.com". Kept for backward compatibility; consider restricting
        # it to exact/sub-domain matches.
        return any(fnmatch.fnmatch(hostname, domain) or domain in hostname for domain in domains)

    def isInvalid(self, value) -> str | None:
        """
        Check *value* against the restrictions configured on this bone.

        :param value: The URL to check.
        :return: An error message if the value is rejected, otherwise None.
        """
        try:
            parsed_url = urlparse(value)
        except (ValueError, AttributeError, TypeError):
            # AttributeError/TypeError are raised by urlparse for values that are not str/bytes.
            return "Can't parse URL"

        if not self.local_path_allowed and parsed_url.scheme == "":
            return "No protocol specified"

        if self.accepted_ports:
            try:
                # ``port`` is evaluated lazily and raises ValueError for a malformed
                # or out-of-range port.
                port = parsed_url.port
            except ValueError:
                return "Can't parse URL"

            # A URL without an explicit port (None) is never contained in a range.
            if port is None or not any(port in rng for rng in self.accepted_ports):
                return f'"{port}" not in the accepted ports.'

        if self.accepted_protocols:
            if not any(fnmatch.fnmatch(parsed_url.scheme, protocol) for protocol in self.accepted_protocols):
                return f'"{parsed_url.scheme}" not in the accepted protocols.'

        if self.domain_allowed_list is not None:
            if not parsed_url.hostname:
                return "Provided URL has no hostname specified."

            if not self._matches_any_domain(parsed_url.hostname, self.domain_allowed_list):
                return "Provided URL is not in the domain allowed list."

        if self.domain_disallowed_list is not None:
            if not parsed_url.hostname:
                return "Provided URL has no hostname specified."

            if self._matches_any_domain(parsed_url.hostname, self.domain_disallowed_list):
                return "Provided URL is in the domain disallowed list."

        return None

    def singleValueFromClient(self, value, skel, bone_name, client_data) -> tuple:
        """
        Validate and normalize a single value received from the client.

        :return: A tuple of the (possibly normalized) value and either None
            or a list with one ReadFromClientError.
        """
        if err := self.isInvalid(value):
            return value, [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]

        parsed_url = urlparse(value)
        if self.local_path_allowed and parsed_url.scheme == "":
            # Guard against an empty value, which has no first character to inspect.
            if value and value[0] not in "?#/":
                value = f"/{value}"
                parsed_url = urlparse(value)

        if self.clean_get_params:
            # Drop only the query string; path parameters (";params") and the fragment are kept.
            value = parsed_url._replace(query="").geturl()

        return value, None

    def structure(self) -> dict:
        """Return the bone structure of the base class extended by the UriBone settings."""
        return super().structure() | {
            "accepted_protocols": list(self.accepted_protocols) if self.accepted_protocols else None,
            # Each entry is (start, stop) of a range, i.e. stop is exclusive.
            "accepted_ports": [(rng.start, rng.stop) for rng in self.accepted_ports] if self.accepted_ports else None,
            "clean_get_params": self.clean_get_params,
            "domain_allowed_list": self.domain_allowed_list,
            "domain_disallowed_list": self.domain_disallowed_list,
            "local_path_allowed": self.local_path_allowed,
        }

210 "local_path_allowed": self.local_path_allowed, 

211 }