Coverage for  / home / runner / work / viur-core / viur-core / viur / src / viur / core / utils / __init__.py: 30%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 10:14 +0000

1import datetime 

2import logging 

3import typing as t 

4import urllib.parse 

5import warnings 

6from collections.abc import Iterable 

7 

8from viur.core import current, db 

9from viur.core.config import conf 

10from deprecated.sphinx import deprecated 

11from . import json, parse, string # noqa: used by external imports 

12 

13if t.TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true

14 from viur.core.skeleton import SkeletonInstance 

15 

16 

def utcNow() -> datetime.datetime:
    """
    Provide the current point in time as a timezone-aware datetime.

    :return: The current time with its timezone set to UTC.
    """
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)

22 

23 

def seoUrlToEntry(module: str,
                  entry: t.Optional["SkeletonInstance"] = None,
                  skelType: t.Optional[str] = None,
                  language: t.Optional[str] = None) -> str:
    """
    Return the seo-url to a skeleton instance or the module.

    The path is assembled from (in this order): the language (only if
    ``conf.i18n.language_method`` is ``"url"``), the module name (translated
    through ``conf.i18n.language_module_map`` if a mapping for the language
    exists) and an identifier for the entry.

    The entry identifier is the first match of: the entry's current seo key
    for the language, the id or name of its ``key``, or its ``name`` attribute.

    :param module: The module name.
    :param entry: A skeleton instance or None, to get the path to the module.
    :param skelType: # FIXME: Not used
    :param language: For which language.
        If None, the language of the current request is used.
    :return: The path (with a leading /).
    """
    from viur.core import conf
    pathComponents = [""]  # the empty first element produces the leading "/"
    if language is None:
        language = current.language.get()
    if conf.i18n.language_method == "url":
        pathComponents.append(language)
    if module in conf.i18n.language_module_map and language in conf.i18n.language_module_map[module]:
        module = conf.i18n.language_module_map[module][language]
    pathComponents.append(module)
    if not entry:
        return "/".join(pathComponents)

    try:
        currentSeoKeys = entry["viurCurrentSeoKeys"]
    except Exception:
        # Best effort: without seo keys, fall back to the path of the module.
        # Only `Exception` is caught so KeyboardInterrupt/SystemExit still propagate.
        return "/".join(pathComponents)

    if language in (currentSeoKeys or {}):
        pathComponents.append(str(currentSeoKeys[language]))
    elif "key" in entry:
        key = entry["key"]
        if isinstance(key, str):
            try:
                key = db.Key.from_legacy_urlsafe(key)
            except Exception:
                # Not decodable as a legacy urlsafe key; the plain string is used below.
                pass
        pathComponents.append(str(key.id_or_name) if isinstance(key, db.Key) else str(key))
    elif "name" in dir(entry):
        pathComponents.append(str(entry.name))
    return "/".join(pathComponents)

67 

68 

def seoUrlToFunction(module: str, function: str, render: t.Optional[str] = None) -> str:
    """
    Return the path to a function of a module, using the language of the current request.

    The path starts with the language if ``conf.i18n.language_method`` is ``"url"``.
    Module, render and function are only appended if they can be found while
    descending through ``conf.main_resolver``.

    :param module: The module name; translated through ``conf.i18n.language_module_map``
        if a mapping for the current language exists.
    :param function: The name of the function.
    :param render: Optional name of a render to insert between module and function.
    :return: The path (with a leading /).
    """
    from viur.core import conf
    lang = current.language.get()

    module_map = conf.i18n.language_module_map
    if module in module_map and lang in module_map[module]:
        module = module_map[module][lang]

    # The empty first element produces the leading "/"
    parts = ["", lang] if conf.i18n.language_method == "url" else [""]

    node = conf.main_resolver
    if module in node:
        parts.append(module)
        node = node[module]
    if render and render in node:
        parts.append(render)
        node = node[render]
    if function in node:
        handler = node[function]
        seo_map = handler.seo_language_map
        # Prefer the translated function name for the current language, if there is one
        if seo_map and lang in seo_map:
            parts.append(seo_map[lang])
        else:
            parts.append(function)
    return "/".join(parts)

92 

93 

@deprecated(version="3.8.0", reason="Use 'db.normalize_key' instead")
def normalizeKey(key: t.Union[None, db.Key]) -> t.Union[None, db.Key]:
    """
    Deprecated wrapper which delegates to ``db.normalize_key``.

    :param key: Key to be normalized.

    :return: Whatever ``db.normalize_key`` returns for the given key.
    """
    # The result has to be handed back to the caller;
    # otherwise this wrapper would always return None.
    return db.normalize_key(key)

104 

105 

def get_base_url() -> str:
    """
    Determine the base URL (protocol and host) of the current request.

    Hosts starting with a local address prefix are served with http,
    any other host is forced to https.

    :returns: The hostname, including the protocol, e.g: https://www.example.com
    :rtype: str
    """
    request_url = current.request.get().request.url
    host = urllib.parse.urlparse(request_url).netloc  # host (and port) part of the request URL

    # Always enforce https, except for local hosts
    local_prefixes = ("localhost", "127.0.0.1", "[::1]", "0.0.0.0")
    scheme = "http" if host.startswith(local_prefixes) else "https"

    # Replace non-SSL-ready-"appspot.com"-URLs with their SSL-ready counterpart
    project_id = conf.instance.project_id
    return f"{scheme}://{host}".replace(f".{project_id}.", f"-dot-{project_id}.")

124 

125 

def ensure_iterable(
    obj: t.Any,
    *,
    test: t.Optional[t.Callable[[t.Any], bool]] = None,
    allow_callable: bool = True,
) -> t.Iterable[t.Any]:
    """
    Turn any object into something that can be iterated over.

    - A callable is called first and its result is used instead (if `allow_callable` is set).
    - Strings are treated as single values, not as iterables of characters.
    - None and the empty string result in an empty tuple.
    - An iterable is returned as it is; if `test` is given and rejects it, an empty tuple is returned.
    - Anything else is wrapped into a tuple with one element.

    :param obj: The object to make iterable.
    :param test: Optional check which is only applied to (non-string) iterables.
    :param allow_callable: Whether a callable `obj` is called to obtain the actual object.
    :return: The iterable itself, an empty tuple or a tuple containing `obj`.
    """
    if allow_callable and callable(obj):
        obj = obj()

    # Strings are iterable, but are handled as a single value here
    if isinstance(obj, str):
        return (obj,) if obj else ()

    if obj is None:
        return ()

    if isinstance(obj, Iterable):  # uses collections.abc.Iterable
        accepted = test is None or test(obj)
        return obj if accepted else ()

    return (obj,)

152 

153 

def build_content_disposition_header(
    filename: str,
    *,
    attachment: bool = False,
    inline: bool = False,
) -> str:
    """
    Assemble the value for a `Content-Disposition` header.

    The filename is emitted twice: as an ASCII fallback (`filename=`) and
    percent-encoded from its UTF-8 bytes (`filename*=`, RFC 5987).
    An empty filename emits neither of the two.

    At most one of `attachment` and `inline` may be set; if none is set,
    the value does not contain a disposition type (not recommended).

    :param filename: The desired filename for the content.
    :param attachment: Whether to mark the content as an attachment.
    :param inline: Whether to mark the content as inline.
    :return: A `Content-Disposition` header string.
    :raises ValueError: If both `attachment` and `inline` are True.
    """
    if attachment and inline:
        raise ValueError("Only one of 'attachment' or 'inline' may be True.")

    ascii_name = string.normalize_ascii(filename)
    encoded_name = urllib.parse.quote_from_bytes(filename.encode("utf-8"))

    parts = []
    if attachment:
        parts.append("attachment")
    if inline:
        parts.append("inline")
    if filename:
        parts.append(f'filename="{ascii_name}"')
        parts.append(f"filename*=UTF-8''{encoded_name}")

    return "; ".join(parts)

194 

195 

# DEPRECATED ATTRIBUTES HANDLING

# Maps deprecated `utils.<name>` attributes to the `conf` key replacing them.
# The module-level `__getattr__` returns `conf[<value>]` for these names.
__UTILS_CONF_REPLACEMENT = {
    "projectID": "viur.instance.project_id",
    "isLocalDevelopmentServer": "viur.instance.is_dev_server",
    "projectBasePath": "viur.instance.project_base_path",
    "coreBasePath": "viur.instance.core_base_path"
}

# Maps deprecated `utils.<name>` attributes to a tuple of
# (replacement name shown in the deprecation message, replacement).
# If the replacement is callable, the module-level `__getattr__` calls it without
# arguments and returns the result; the lambdas therefore defer the lookup of the
# actual object until the deprecated name is accessed.
__UTILS_NAME_REPLACEMENT = {
    "currentLanguage": ("current.language", current.language),
    "currentRequest": ("current.request", current.request),
    "currentRequestData": ("current.request_data", current.request_data),
    "currentSession": ("current.session", current.session),
    "downloadUrlFor": ("conf.main_app.file.create_download_url", lambda: conf.main_app.file.create_download_url),
    "escapeString": ("utils.string.escape", lambda: string.escape),
    "generateRandomString": ("utils.string.random", lambda: string.random),
    "getCurrentUser": ("current.user.get", lambda: current.user.get),
    "is_prefix": ("utils.string.is_prefix", lambda: string.is_prefix),
    "parse_bool": ("utils.parse.bool", lambda: parse.bool),
    "srcSetFor": ("conf.main_app.file.create_src_set", lambda: conf.main_app.file.create_src_set),
}

217 

218 

def __getattr__(attr):
    """
    Module-level attribute hook (PEP 562) serving deprecated names of this module.

    Python calls this function only if `attr` was not found in the module by
    the regular lookup. Known deprecated names emit a `DeprecationWarning`
    and a log message, then return their replacement.

    :param attr: Name of the attribute requested on this module.
    :return: The replacement for the deprecated attribute.
    :raises AttributeError: If `attr` is not a known deprecated name.
    """
    if replace := __UTILS_CONF_REPLACEMENT.get(attr):
        msg = f"Use of `utils.{attr}` is deprecated; Use `conf.{replace}` instead!"
        warnings.warn(msg, DeprecationWarning, stacklevel=3)
        logging.warning(msg, stacklevel=3)
        return conf[replace]

    if replace := __UTILS_NAME_REPLACEMENT.get(attr):
        msg = f"Use of `utils.{attr}` is deprecated; Use `{replace[0]}` instead!"
        warnings.warn(msg, DeprecationWarning, stacklevel=3)
        logging.warning(msg, stacklevel=3)
        res = replace[1]
        # Callables are resolvers; calling them yields the actual replacement
        if callable(res):
            res = res()
        return res

    # Unknown name: raise the error expected from a module `__getattr__`,
    # naming this module instead of delegating to an unrelated `super` object.
    raise AttributeError(f"module {__name__!r} has no attribute {attr!r}")