Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/string.py: 39%

163 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import datetime 

2import functools 

3import logging 

4import string 

5import typing as t 

6import warnings 

7from numbers import Number 

8 

9from viur.core import current, db, utils 

10from .base import ReadFromClientError, ReadFromClientErrorSeverity 

11from .raw import RawBone 

12 

13if t.TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true

14 from ..skeleton import SkeletonInstance 

15 

# Shape of the dict that StringBone.singleValueSerialize stores for indexed
# values: "val" holds the string itself, "idx" its lower-cased form (only set
# for case-insensitive bones) and "sort_idx" the natural-sorting key (only set
# when natural sorting is enabled).
DB_TYPE_INDEXED: t.TypeAlias = dict[t.Literal["val", "idx", "sort_idx"], str]

17 

18 

19class StringBone(RawBone): 

20 """ 

21 The "StringBone" represents a data field that contains text values. 

22 """ 

23 type = "str" 

24 

def __init__(
    self,
    *,
    caseSensitive: bool = True,
    max_length: int | None = 254,
    min_length: int | None = None,
    natural_sorting: bool | t.Callable = False,
    escape_html: bool = True,
    **kwargs
):
    """
    Set up a new StringBone.

    :param caseSensitive: Whether filtering on this bone distinguishes upper and lower case.
    :param max_length: Upper bound for the length of a value, or None for no limit.
    :param min_length: Lower bound for the length of a value, or None for no limit.
    :param natural_sorting: Enables sorting on the ``.sort_idx`` property instead of the plain value.
        ``True`` selects the built-in DIN 5007 Variant 2 transformer,
        a ``callable`` is used as custom transformer that produces the index value,
        ``False`` disables natural sorting.
    :param escape_html: Replace some characters with HTML-safe sequences
        using :meth:`utils.string.escape`.
    :param kwargs: Inherited arguments, handed through to the parent bone.
    :raises ValueError: If a length limit is not positive, or min_length exceeds max_length.
    :raises TypeError: If natural_sorting is neither a callable nor a boolean.
    """
    # fixme: Remove in viur-core >= 4
    if "maxLength" in kwargs:
        warnings.warn(
            "maxLength parameter is deprecated, please use max_length",
            DeprecationWarning,
            stacklevel=2,
        )
        max_length = kwargs.pop("maxLength")

    super().__init__(**kwargs)

    has_max = max_length is not None
    has_min = min_length is not None
    if has_max and max_length <= 0:
        raise ValueError("max_length must be a positive integer or None")
    if has_min and min_length <= 0:
        raise ValueError("min_length must be a positive integer or None")
    if has_min and has_max and min_length > max_length:
        raise ValueError("min_length can't be greater than max_length")

    self.caseSensitive = caseSensitive
    self.max_length = max_length
    self.min_length = min_length

    if not callable(natural_sorting):
        if not isinstance(natural_sorting, bool):
            raise TypeError("natural_sorting must be a callable or boolean!")
        if not natural_sorting:
            # Disabled: shadow the default transformer method on this instance
            self.natural_sorting = None
        # True: nothing to assign, the class-level natural_sorting method stays in effect
    else:
        self.natural_sorting = natural_sorting

    self.escape_html = escape_html

75 

def type_coerce_single_value(self, value: t.Any) -> str:
    """Turn a simple value into its string representation.

    Strings pass through unchanged, numbers are converted with ``str()``,
    date/time objects with ``isoformat()`` and database keys with their
    legacy url-safe encoding. Any other falsy value becomes the empty value.

    :param value: The value to convert.
    :return: The string form of the value.
    :raises ValueError: If a truthy value of an unsupported type is given.
    """
    if isinstance(value, str):
        return value

    if isinstance(value, Number):
        return str(value)

    temporal_types = (datetime.datetime, datetime.date, datetime.time)
    if isinstance(value, temporal_types):
        return value.isoformat()

    if isinstance(value, db.Key):
        return value.to_legacy_urlsafe().decode("ASCII")

    if value:
        raise ValueError(
            f"Value {value} of type {type(value)} cannot be coerced for {type(self).__name__} {self.name}"
        )

    # None or any other falsy value
    return self.getEmptyValue()

96 

def singleValueSerialize(
    self,
    value: t.Any,
    skel: "SkeletonInstance",
    name: str,
    parentIndexed: bool,
) -> str | DB_TYPE_INDEXED:
    """
    Prepare a single value of this bone for storage in the database.

    :param value: The value to store. Anything that is not a str is
        converted with :meth:`type_coerce_single_value` first.
    :param skel: The skeleton instance this bone belongs to.
    :param name: The name of this bone.
    :param parentIndexed: Whether the parent has an index on this bone.
    :return: The plain string, or, for indexed bones that are case-insensitive
        or naturally sorted, a dict with "val" plus "idx" and/or "sort_idx".
    """
    text = self.type_coerce_single_value(value)

    wants_index_props = not self.caseSensitive or self.natural_sorting
    if not (wants_index_props and parentIndexed):
        return text

    serialized: DB_TYPE_INDEXED = {"val": text}
    if not self.caseSensitive:
        serialized["idx"] = text.lower()
    if self.natural_sorting:
        serialized["sort_idx"] = self.natural_sorting(text)
    return serialized

124 

def singleValueUnserialize(self, value: str | DB_TYPE_INDEXED) -> str:
    """
    Restore a single value of this bone from its database representation.

    :param value: The stored value, either a plain string or a dict
        carrying the actual string under the "val" key.
    :return: The string value, or the empty value if nothing truthy was stored.
    """
    # Indexed bones store a dict; only its raw "val" entry is of interest here
    is_indexed_form = isinstance(value, dict) and "val" in value
    raw = value["val"] if is_indexed_form else value
    return str(raw) if raw else self.getEmptyValue()

138 

def getEmptyValue(self) -> str:
    """
    Provide the value that represents "nothing" for this bone.

    :return: The empty string.
    """
    empty: str = ""
    return empty

146 

def isEmpty(self, value):
    """
    Check whether a value counts as empty for this bone.

    A value is empty if it is falsy, or if its string form consists of
    whitespace only.

    :param value: The value to inspect.
    :return: True if the value is empty, False otherwise.
    """
    return not value or not str(value).strip()

158 

def isInvalid(self, value: t.Any) -> str | None:
    """
    Validate the length of a value against the limits of this bone.

    :param value: The value to check; its ``len()`` is compared with the limits.
    :return: None if the value is acceptable, an error message otherwise.
    """
    upper_limit = self.max_length
    if upper_limit is not None and len(value) > upper_limit:
        return "Maximum length exceeded"

    lower_limit = self.min_length
    if lower_limit is not None and len(value) < lower_limit:
        return "Minimum length not reached"

    return None

169 

def singleValueFromClient(self, value, skel, bone_name, client_data):
    """
    Read a single value submitted by the client.

    The value is converted to a string once, validated with :meth:`isInvalid`
    and then escaped (if ``escape_html`` is set) or cut to ``max_length``.

    :param value: The raw value received from the client.
    :param skel: The skeleton instance this bone belongs to (not used here).
    :param bone_name: The name of this bone (not used here).
    :param client_data: The complete data sent by the client (not used here).
    :return: A tuple ``(value, errors)``: the accepted string and None on success,
        otherwise the empty value and a list with one ReadFromClientError.
    """
    # Coerce up front so validation, slicing and the returned value all operate
    # on the same string; slicing the raw value fails for non-str input such as int.
    value = str(value)

    if err := self.isInvalid(value):
        return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]

    if self.escape_html:
        return utils.string.escape(value, self.max_length), None

    if self.max_length:
        return value[:self.max_length], None

    return value, None

183 

def buildDBFilter(
    self,
    name: str,
    skel: "SkeletonInstance",
    dbFilter: db.Query,
    rawFilter: dict,
    prefix: t.Optional[str] = None
) -> db.Query:
    """
    Add the query clauses for this bone to a database filter.

    Supported keys in ``rawFilter`` are ``<name>`` (exact match), ``<name>$lk``
    (prefix match), ``<name>$gt`` and ``<name>$lt``. Case-insensitive bones are
    queried on their lower-cased ``.idx`` sub-property.

    :param name: The name of this bone.
    :param skel: The skeleton instance this bone belongs to.
    :param dbFilter: The database filter to add query clauses to.
    :param rawFilter: The raw filter data, e.g. as sent by the client.
    :param prefix: An optional prefix put in front of the property name.
    :return: The database filter with the added query clauses.
    """
    if name not in rawFilter and not any(
        key.startswith((f"{name}$", f"{name}.")) for key in rawFilter
    ):
        # Nothing in the raw filter addresses this bone
        return super().buildDBFilter(name, skel, dbFilter, rawFilter, prefix)

    if not self.languages:
        namefilter = name
    else:
        # Prefer a language that is explicitly addressed as "<name>.<lang>" ...
        lang = None
        for key in rawFilter:
            if key.startswith(f"{name}."):
                lang_str = key.removeprefix(f"{name}.")
                if lang_str in self.languages:
                    lang = lang_str
                    break
        if not lang:
            # ... then the language of the current request, and finally
            # the first language configured on this bone.
            lang = current.language.get()
            if not lang or lang not in self.languages:
                lang = self.languages[0]
        namefilter = f"{name}.{lang}"

    prop = f"{prefix or ''}{namefilter}"
    if self.caseSensitive:
        normalize = str
    else:
        prop += ".idx"

        def normalize(raw):
            return str(raw).lower()

    if (key := f"{name}$lk") in rawFilter:  # Do a prefix-match
        needle = normalize(rawFilter[key])
        dbFilter.filter(f"{prop} >=", needle)
        # Coerce to str *before* appending the sentinel, so that non-string
        # filter values (e.g. int) don't raise a TypeError.
        dbFilter.filter(f"{prop} <", needle + "\ufffd")

    if (key := f"{name}$gt") in rawFilter:  # All entries after
        dbFilter.filter(f"{prop} >", normalize(rawFilter[key]))

    if (key := f"{name}$lt") in rawFilter:  # All entries before
        dbFilter.filter(f"{prop} <", normalize(rawFilter[key]))

    if name in rawFilter:  # Normal, strict match
        dbFilter.filter(prop, normalize(rawFilter[name]))

    return dbFilter

251 

def buildDBSort(
    self,
    name: str,
    skel: 'SkeletonInstance',
    query: db.Query,
    params: dict,
    postfix: str = "",
) -> t.Optional[db.Query]:
    """
    Delegate sorting to the parent bone, selecting the property to sort on.

    Naturally sorted bones sort on ``.sort_idx``, case-insensitive bones on
    ``.idx``; otherwise the given postfix is passed on unchanged.

    :param name: The name of this bone.
    :param skel: The skeleton instance this bone belongs to.
    :param query: The query to apply the sort order to.
    :param params: The raw request parameters.
    :param postfix: Postfix for the property name, used if no index property applies.
    :return: Whatever the parent implementation returns.
    """
    if self.natural_sorting:
        sort_postfix = ".sort_idx"
    elif not self.caseSensitive:
        sort_postfix = ".idx"
    else:
        sort_postfix = postfix

    return super().buildDBSort(name, skel, query, params, postfix=sort_postfix)

264 

def natural_sorting(self, value: str | None) -> str | None:
    """Implements a default natural sorting transformer.

    The sorting is according to DIN 5007 Variant 2
    and sets ö and oe, etc. equal. Both the lowercase "ß" and the
    capital "ẞ" are expanded, so the sharp s is also handled after
    the value was lower-cased for case-insensitive bones.

    :param value: The string to build the sort key for, or None.
    :return: The transliterated sort key, or None if value is None.
    """
    if value is None:
        return None
    assert isinstance(value, str)
    if not self.caseSensitive:
        value = value.lower()

    # DIN 5007 Variant 2
    return value.translate(str.maketrans({
        "ö": "oe",
        "Ö": "Oe",
        "ü": "ue",
        "Ü": "Ue",
        "ä": "ae",
        "Ä": "Ae",
        "ß": "ss",
        "ẞ": "SS",
    }))

287 

def getUniquePropertyIndexValues(self, skel: "SkeletonInstance", name: str) -> list[str]:
    """
    Compute the unique index values for this bone.

    For case-insensitive bones the value is lower-cased before hashing, so
    values that differ only in case collide in the unique index.

    :param skel: The skeleton instance.
    :param name: The name of the property.
    :return: A list of unique index values for the property.
    :raises NotImplementedError: If the bone has languages, as it is unclear
        whether each language should be kept distinct or not.
    """
    if self.languages:
        # Not yet implemented as it's unclear if we should keep each language distinct or not
        raise NotImplementedError()

    if self.caseSensitive:
        return super().getUniquePropertyIndexValues(skel, name)

    value = skel[name]
    if value is None:
        return super().getUniquePropertyIndexValues(skel, name)

    lowered = [entry.lower() for entry in value] if self.multiple else value.lower()
    return self._hashValueForUniquePropertyIndex(lowered)

310 

def refresh(self, skel: "SkeletonInstance", bone_name: str) -> None:
    """
    Re-normalise the stored value(s) of this bone.

    After the parent's refresh, every value is coerced to a string and then
    escaped or unescaped, depending on ``escape_html``, and written back to
    the skeleton.

    :param skel: The skeleton instance holding the value.
    :param bone_name: The name of this bone inside the skeleton.
    """
    super().refresh(skel, bone_name)

    convert = utils.string.escape if self.escape_html else utils.string.unescape

    # TODO: duplicate code, this is the same iteration logic as in NumericBone
    per_language = {}
    for _, lang, raw in self.iter_bone_value(skel, bone_name):
        converted = convert(self.type_coerce_single_value(raw))
        per_language.setdefault(lang, []).append(converted)

    if not self.multiple:
        # take the first one
        per_language = {lang: entries[0] for lang, entries in per_language.items() if entries}

    if self.languages:
        skel[bone_name] = per_language
    else:
        # just the value(s) with None language
        fallback = [] if self.multiple else self.getEmptyValue()
        skel[bone_name] = per_language.get(None, fallback)

333 

def structure(self) -> dict:
    """
    Describe this bone, extending the parent's description with the length limits.

    :return: The parent's structure dict plus "maxlength" and "minlength".
    """
    base = super().structure()
    return base | {
        "maxlength": self.max_length,
        "minlength": self.min_length,
    }

340 

@classmethod
def v_func_valid_chars(cls, valid_chars: t.Iterable = string.printable) -> t.Callable:
    """
    Returns a function that takes a string and checks whether it contains valid characters.
    If all characters of the string are valid, it returns None, and succeeds.
    If invalid characters are present, it returns an appropriate error message.

    :param valid_chars: An iterable of valid characters. It is consumed once,
        so one-shot iterators and generators are supported as well.
    :return: A function that takes a string and check whether it contains valid characters.

    Example for digits only:
    .. code-block:: python
        str_bone = StringBone(vfunc=StringBone.v_func_valid_chars(string.digits))
    """
    # Materialise the charset once: repeated `in` checks against a one-shot
    # iterator would consume it, and a set gives constant-time lookups.
    allowed_chars = frozenset(valid_chars)

    def v_func(valid_chars_intern, value):
        if any(char not in valid_chars_intern for char in value):
            return "Not all letters are available in the charset"
        return None

    return functools.partial(v_func, allowed_chars)