Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/string.py: 37%

167 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-27 07:59 +0000

1import datetime 

2import functools 

3import logging 

4import string 

5import typing as t 

6import warnings 

7from numbers import Number 

8 

9from viur.core import current, db, utils 

10from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity 

11 

12if t.TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true

13 from ..skeleton import SkeletonInstance 

14 

15DB_TYPE_INDEXED: t.TypeAlias = dict[t.Literal["val", "idx", "sort_idx"], str] 

16 

17 

18class StringBone(BaseBone): 

19 """ 

20 The "StringBone" represents a data field that contains text values. 

21 """ 

22 type = "str" 

23 

24 def __init__( 

25 self, 

26 *, 

27 caseSensitive: bool = True, 

28 max_length: int | None = 254, 

29 min_length: int | None = None, 

30 natural_sorting: bool | t.Callable = False, 

31 escape_html: bool = True, 

32 **kwargs 

33 ): 

34 """ 

35 Initializes a new StringBone. 

36 

37 :param caseSensitive: When filtering for values in this bone, should it be case-sensitive? 

38 :param max_length: The maximum length allowed for values of this bone. Set to None for no limitation. 

39 :param min_length: The minimum length allowed for values of this bone. Set to None for no limitation. 

40 :param natural_sorting: Allows a more natural sorting 

41 than the default sorting on the plain values. 

42 This uses the .sort_idx property. 

43 `True` enables sorting according to DIN 5007 Variant 2. 

44 With passing a `callable`, a custom transformer method can be set 

45 that creates the value for the index property. 

46 :param escape_html: Replace some characters in the string with HTML-safe sequences with 

47 using :meth:`utils.string.escape` for safe use in HTML. 

48 :param kwargs: Inherited arguments from the BaseBone. 

49 """ 

50 # fixme: Remove in viur-core >= 4 

51 if "maxLength" in kwargs: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true

52 warnings.warn("maxLength parameter is deprecated, please use max_length", 

53 DeprecationWarning, stacklevel=2) 

54 max_length = kwargs.pop("maxLength") 

55 super().__init__(**kwargs) 

56 if max_length is not None and max_length <= 0: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true

57 raise ValueError("max_length must be a positive integer or None") 

58 if min_length is not None and min_length <= 0: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true

59 raise ValueError("min_length must be a positive integer or None") 

60 if min_length is not None and max_length is not None: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true

61 if min_length > max_length: 

62 raise ValueError("min_length can't be greater than max_length") 

63 self.caseSensitive = caseSensitive 

64 self.max_length = max_length 

65 self.min_length = min_length 

66 if callable(natural_sorting): 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true

67 self.natural_sorting = natural_sorting 

68 elif not isinstance(natural_sorting, bool): 68 ↛ 69line 68 didn't jump to line 69 because the condition on line 68 was never true

69 raise TypeError("natural_sorting must be a callable or boolean!") 

70 elif not natural_sorting: 70 ↛ 73line 70 didn't jump to line 73 because the condition on line 70 was always true

71 self.natural_sorting = None 

72 # else: keep self.natural_sorting as is 

73 self.escape_html = escape_html 

74 

75 def type_coerce_single_value(self, value: t.Any) -> str: 

76 """Convert a value to a string (if not already) 

77 

78 Converts a value that is not a string into a string 

79 if a meaningful conversion is possible (simple data types only). 

80 """ 

81 if isinstance(value, str): 

82 return value 

83 elif isinstance(value, Number): 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true

84 return str(value) 

85 elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)): 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 return value.isoformat() 

87 elif isinstance(value, db.Key): 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true

88 return value.to_legacy_urlsafe().decode("ASCII") 

89 elif not value: # None or any other falsy value 89 ↛ 92line 89 didn't jump to line 92 because the condition on line 89 was always true

90 return self.getEmptyValue() 

91 else: 

92 raise ValueError( 

93 f"Value {value} of type {type(value)} cannot be coerced for {type(self).__name__} {self.name}" 

94 ) 

95 

96 def singleValueSerialize( 

97 self, 

98 value: t.Any, 

99 skel: "SkeletonInstance", 

100 name: str, 

101 parentIndexed: bool, 

102 ) -> str | DB_TYPE_INDEXED: 

103 """ 

104 Serializes a single value of this data field for storage in the database. 

105 

106 :param value: The value to serialize. 

107 It should be a str value, if not it is forced with :meth:`type_coerce_single_value`. 

108 :param skel: The skeleton instance that this data field belongs to. 

109 :param name: The name of this data field. 

110 :param parentIndexed: A boolean value indicating whether the parent object has an index on 

111 this data field or not. 

112 :return: The serialized value. 

113 """ 

114 value = self.type_coerce_single_value(value) 

115 if (not self.caseSensitive or self.natural_sorting) and parentIndexed: 

116 serialized: DB_TYPE_INDEXED = {"val": value} 

117 if not self.caseSensitive: 117 ↛ 119line 117 didn't jump to line 119 because the condition on line 117 was always true

118 serialized["idx"] = value.lower() 

119 if self.natural_sorting: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true

120 serialized["sort_idx"] = self.natural_sorting(value) 

121 return serialized 

122 return value 

123 

124 def singleValueUnserialize(self, value: str | DB_TYPE_INDEXED) -> str: 

125 """ 

126 Unserializes a single value of this data field from the database. 

127 

128 :param value: The serialized value to unserialize. 

129 :return: The unserialized value. 

130 """ 

131 if isinstance(value, dict) and "val" in value: 

132 value = value["val"] # Process with the raw value 

133 if value: 

134 return str(value) 

135 else: 

136 return self.getEmptyValue() 

137 

138 def getEmptyValue(self) -> str: 

139 """ 

140 Returns the empty value for this data field. 

141 

142 :return: An empty string. 

143 """ 

144 return "" 

145 

146 def isEmpty(self, value): 

147 """ 

148 Determines whether a value for this data field is empty or not. 

149 

150 :param value: The value to check for emptiness. 

151 :return: A boolean value indicating whether the value is empty or not. 

152 """ 

153 if not value: 

154 return True 

155 

156 return not bool(str(value).strip()) 

157 

158 def isInvalid(self, value: t.Any) -> str | None: 

159 """ 

160 Returns None if the value would be valid for 

161 this bone, an error-message otherwise. 

162 """ 

163 if self.max_length is not None and len(value) > self.max_length: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true

164 return "Maximum length exceeded" 

165 if self.min_length is not None and len(value) < self.min_length: 165 ↛ 166line 165 didn't jump to line 166 because the condition on line 165 was never true

166 return "Minimum length not reached" 

167 return None 

168 

169 def singleValueFromClient(self, value, skel, bone_name, client_data): 

170 """ 

171 Returns None and the escaped value if the value would be valid for 

172 this bone, otherwise the empty value and an error-message. 

173 """ 

174 if not (err := self.isInvalid(str(value))): 

175 if self.escape_html: 175 ↛ 177line 175 didn't jump to line 177 because the condition on line 175 was always true

176 return utils.string.escape(value, self.max_length), None 

177 elif self.max_length: 

178 return value[:self.max_length], None 

179 return value, None 

180 

181 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)] 

182 

183 def buildDBFilter( 

184 self, 

185 name: str, 

186 skel: "SkeletonInstance", 

187 dbFilter: db.Query, 

188 rawFilter: dict, 

189 prefix: t.Optional[str] = None 

190 ) -> db.Query: 

191 """ 

192 Builds and returns a database filter for this data field based on the provided raw filter data. 

193 

194 :param name: The name of this data field. 

195 :param skel: The skeleton instance that this data field belongs to. 

196 :param dbFilter: The database filter to add query clauses to. 

197 :param rawFilter: A dictionary containing the raw filter data for this data field. 

198 :param prefix: An optional prefix to add to the query clause. 

199 :return: The database filter with the added query clauses. 

200 """ 

201 if name not in rawFilter and not any( 

202 [(x.startswith(name + "$") or x.startswith(name + ".")) for x in rawFilter.keys()] 

203 ): 

204 return super().buildDBFilter(name, skel, dbFilter, rawFilter, prefix) 

205 

206 if not self.languages: 

207 namefilter = name 

208 else: 

209 lang = None 

210 for key in rawFilter.keys(): 

211 if key.startswith(f"{name}."): 

212 langStr = key.replace(f"{name}.", "") 

213 if langStr in self.languages: 

214 lang = langStr 

215 break 

216 if not lang: 

217 lang = current.language.get() # currentSession.getLanguage() 

218 if not lang or not lang in self.languages: 

219 lang = self.languages[0] 

220 namefilter = f"{name}.{lang}" 

221 

222 if name + "$lk" in rawFilter: # Do a prefix-match 

223 if not self.caseSensitive: 

224 dbFilter.filter((prefix or "") + namefilter + ".idx >=", str(rawFilter[name + "$lk"]).lower()) 

225 dbFilter.filter((prefix or "") + namefilter + ".idx <", 

226 str(rawFilter[name + "$lk"] + u"\ufffd").lower()) 

227 else: 

228 dbFilter.filter((prefix or "") + namefilter + " >=", str(rawFilter[name + "$lk"])) 

229 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lk"] + u"\ufffd")) 

230 

231 if name + "$gt" in rawFilter: # All entries after 

232 if not self.caseSensitive: 

233 dbFilter.filter((prefix or "") + namefilter + ".idx >", str(rawFilter[name + "$gt"]).lower()) 

234 else: 

235 dbFilter.filter((prefix or "") + namefilter + " >", str(rawFilter[name + "$gt"])) 

236 

237 if name + "$lt" in rawFilter: # All entries before 

238 if not self.caseSensitive: 

239 dbFilter.filter((prefix or "") + namefilter + ".idx <", str(rawFilter[name + "$lt"]).lower()) 

240 else: 

241 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lt"])) 

242 

243 if name in rawFilter: # Normal, strict match 

244 if not self.caseSensitive: 

245 dbFilter.filter((prefix or "") + namefilter + ".idx", str(rawFilter[name]).lower()) 

246 else: 

247 dbFilter.filter((prefix or "") + namefilter, str(rawFilter[name])) 

248 

249 return dbFilter 

250 

251 def buildDBSort( 

252 self, 

253 name: str, 

254 skel: 'SkeletonInstance', 

255 query: db.Query, 

256 params: dict, 

257 postfix: str = "", 

258 ) -> t.Optional[db.Query]: 

259 return super().buildDBSort( 

260 name, skel, query, params, 

261 postfix=".sort_idx" if self.natural_sorting else ".idx" if not self.caseSensitive else postfix 

262 ) 

263 

264 def natural_sorting(self, value: str | None) -> str | None: 

265 """Implements a default natural sorting transformer. 

266 

267 The sorting is according to DIN 5007 Variant 2 

268 and sets ö and oe, etc. equal. 

269 """ 

270 if value is None: 

271 return None 

272 assert isinstance(value, str) 

273 if not self.caseSensitive: 

274 value = value.lower() 

275 

276 # DIN 5007 Variant 2 

277 return value.translate(str.maketrans({ 

278 "ö": "oe", 

279 "Ö": "Oe", 

280 "ü": "ue", 

281 "Ü": "Ue", 

282 "ä": "ae", 

283 "Ä": "Ae", 

284 "ẞ": "SS", 

285 })) 

286 

287 def getSearchTags(self, skel: "SkeletonInstance", name: str) -> set[str]: 

288 """ 

289 Returns a set of lowercased words that represent searchable tags for the given bone. 

290 

291 :param skel: The skeleton instance being searched. 

292 :param name: The name of the bone to generate tags for. 

293 

294 :return: A set of lowercased words representing searchable tags. 

295 """ 

296 result = set() 

297 for idx, lang, value in self.iter_bone_value(skel, name): 

298 if value is None: 

299 continue 

300 for line in str(value).splitlines(): # TODO: Can a StringBone be multiline? 

301 for word in line.split(" "): 

302 result.add(word.lower()) 

303 return result 

304 

305 def getUniquePropertyIndexValues(self, skel: "SkeletonInstance", name: str) -> list[str]: 

306 """ 

307 Returns a list of unique index values for a given property name. 

308 

309 :param skel: The skeleton instance. 

310 :param name: The name of the property. 

311 :return: A list of unique index values for the property. 

312 :raises NotImplementedError: If the StringBone has languages and the implementation 

313 for this case is not yet defined. 

314 """ 

315 if self.languages: 

316 # Not yet implemented as it's unclear if we should keep each language distinct or not 

317 raise NotImplementedError() 

318 

319 if not self.caseSensitive and (value := skel[name]) is not None: 

320 if self.multiple: 

321 value = [v.lower() for v in value] 

322 else: 

323 value = value.lower() 

324 return self._hashValueForUniquePropertyIndex(value) 

325 

326 return super().getUniquePropertyIndexValues(skel, name) 

327 

328 def refresh(self, skel: "SkeletonInstance", bone_name: str) -> None: 

329 super().refresh(skel, bone_name) 

330 

331 # TODO: duplicate code, this is the same iteration logic as in NumericBone 

332 new_value = {} 

333 for _, lang, value in self.iter_bone_value(skel, bone_name): 

334 new_value.setdefault(lang, []).append(self.type_coerce_single_value(value)) 

335 

336 if not self.multiple: 

337 # take the first one 

338 new_value = {lang: values[0] for lang, values in new_value.items() if values} 

339 

340 if self.languages: 

341 skel[bone_name] = new_value 

342 elif not self.languages: 

343 # just the value(s) with None language 

344 skel[bone_name] = new_value.get(None, [] if self.multiple else self.getEmptyValue()) 

345 

346 def structure(self) -> dict: 

347 ret = super().structure() | { 

348 "maxlength": self.max_length, 

349 "minlength": self.min_length 

350 } 

351 return ret 

352 

353 @classmethod 

354 def v_func_valid_chars(cls, valid_chars: t.Iterable = string.printable) -> t.Callable: 

355 """ 

356 Returns a function that takes a string and checks whether it contains valid characters. 

357 If all characters of the string are valid, it returns None, and succeeds. 

358 If invalid characters are present, it returns an appropriate error message. 

359 

360 :param valid_chars: An iterable of valid characters. 

361 :return: A function that takes a string and check whether it contains valid characters. 

362 

363 Example for digits only: 

364 .. code-block:: python 

365 str_bone = StringBone(vfunc=StringBone.v_func_valid_chars(string.digits)) 

366 """ 

367 

368 def v_func(valid_chars_intern, value): 

369 if any(char not in valid_chars_intern for char in value): 

370 return "Not all letters are available in the charset" 

371 

372 return functools.partial(v_func, valid_chars)