Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/string.py: 39%
163 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1import datetime
2import functools
3import logging
4import string
5import typing as t
6import warnings
7from numbers import Number
9from viur.core import current, db, utils
10from .base import ReadFromClientError, ReadFromClientErrorSeverity
11from .raw import RawBone
13if t.TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true
14 from ..skeleton import SkeletonInstance
16DB_TYPE_INDEXED: t.TypeAlias = dict[t.Literal["val", "idx", "sort_idx"], str]
19class StringBone(RawBone):
20 """
21 The "StringBone" represents a data field that contains text values.
22 """
23 type = "str"
25 def __init__(
26 self,
27 *,
28 caseSensitive: bool = True,
29 max_length: int | None = 254,
30 min_length: int | None = None,
31 natural_sorting: bool | t.Callable = False,
32 escape_html: bool = True,
33 **kwargs
34 ):
35 """
36 Initializes a new StringBone.
38 :param caseSensitive: When filtering for values in this bone, should it be case-sensitive?
39 :param max_length: The maximum length allowed for values of this bone. Set to None for no limitation.
40 :param min_length: The minimum length allowed for values of this bone. Set to None for no limitation.
41 :param natural_sorting: Allows a more natural sorting
42 than the default sorting on the plain values.
43 This uses the .sort_idx property.
44 `True` enables sorting according to DIN 5007 Variant 2.
45 With passing a `callable`, a custom transformer method can be set
46 that creates the value for the index property.
47 :param escape_html: Replace some characters in the string with HTML-safe sequences with
48 using :meth:`utils.string.escape` for safe use in HTML.
49 :param kwargs: Inherited arguments from the BaseBone.
50 """
51 # fixme: Remove in viur-core >= 4
52 if "maxLength" in kwargs: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true
53 warnings.warn("maxLength parameter is deprecated, please use max_length",
54 DeprecationWarning, stacklevel=2)
55 max_length = kwargs.pop("maxLength")
56 super().__init__(**kwargs)
57 if max_length is not None and max_length <= 0: 57 ↛ 58line 57 didn't jump to line 58 because the condition on line 57 was never true
58 raise ValueError("max_length must be a positive integer or None")
59 if min_length is not None and min_length <= 0: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true
60 raise ValueError("min_length must be a positive integer or None")
61 if min_length is not None and max_length is not None: 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true
62 if min_length > max_length:
63 raise ValueError("min_length can't be greater than max_length")
64 self.caseSensitive = caseSensitive
65 self.max_length = max_length
66 self.min_length = min_length
67 if callable(natural_sorting): 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true
68 self.natural_sorting = natural_sorting
69 elif not isinstance(natural_sorting, bool): 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true
70 raise TypeError("natural_sorting must be a callable or boolean!")
71 elif not natural_sorting: 71 ↛ 74line 71 didn't jump to line 74 because the condition on line 71 was always true
72 self.natural_sorting = None
73 # else: keep self.natural_sorting as is
74 self.escape_html = escape_html
76 def type_coerce_single_value(self, value: t.Any) -> str:
77 """Convert a value to a string (if not already)
79 Converts a value that is not a string into a string
80 if a meaningful conversion is possible (simple data types only).
81 """
82 if isinstance(value, str):
83 return value
84 elif isinstance(value, Number): 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true
85 return str(value)
86 elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)): 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true
87 return value.isoformat()
88 elif isinstance(value, db.Key): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 return value.to_legacy_urlsafe().decode("ASCII")
90 elif not value: # None or any other falsy value 90 ↛ 93line 90 didn't jump to line 93 because the condition on line 90 was always true
91 return self.getEmptyValue()
92 else:
93 raise ValueError(
94 f"Value {value} of type {type(value)} cannot be coerced for {type(self).__name__} {self.name}"
95 )
97 def singleValueSerialize(
98 self,
99 value: t.Any,
100 skel: "SkeletonInstance",
101 name: str,
102 parentIndexed: bool,
103 ) -> str | DB_TYPE_INDEXED:
104 """
105 Serializes a single value of this data field for storage in the database.
107 :param value: The value to serialize.
108 It should be a str value, if not it is forced with :meth:`type_coerce_single_value`.
109 :param skel: The skeleton instance that this data field belongs to.
110 :param name: The name of this data field.
111 :param parentIndexed: A boolean value indicating whether the parent object has an index on
112 this data field or not.
113 :return: The serialized value.
114 """
115 value = self.type_coerce_single_value(value)
116 if (not self.caseSensitive or self.natural_sorting) and parentIndexed:
117 serialized: DB_TYPE_INDEXED = {"val": value}
118 if not self.caseSensitive: 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was always true
119 serialized["idx"] = value.lower()
120 if self.natural_sorting: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true
121 serialized["sort_idx"] = self.natural_sorting(value)
122 return serialized
123 return value
125 def singleValueUnserialize(self, value: str | DB_TYPE_INDEXED) -> str:
126 """
127 Unserializes a single value of this data field from the database.
129 :param value: The serialized value to unserialize.
130 :return: The unserialized value.
131 """
132 if isinstance(value, dict) and "val" in value:
133 value = value["val"] # Process with the raw value
134 if value:
135 return str(value)
136 else:
137 return self.getEmptyValue()
139 def getEmptyValue(self) -> str:
140 """
141 Returns the empty value for this data field.
143 :return: An empty string.
144 """
145 return ""
147 def isEmpty(self, value):
148 """
149 Determines whether a value for this data field is empty or not.
151 :param value: The value to check for emptiness.
152 :return: A boolean value indicating whether the value is empty or not.
153 """
154 if not value:
155 return True
157 return not bool(str(value).strip())
159 def isInvalid(self, value: t.Any) -> str | None:
160 """
161 Returns None if the value would be valid for
162 this bone, an error-message otherwise.
163 """
164 if self.max_length is not None and len(value) > self.max_length: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 return "Maximum length exceeded"
166 if self.min_length is not None and len(value) < self.min_length: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true
167 return "Minimum length not reached"
168 return None
170 def singleValueFromClient(self, value, skel, bone_name, client_data):
171 """
172 Returns None and the escaped value if the value would be valid for
173 this bone, otherwise the empty value and an error-message.
174 """
175 if not (err := self.isInvalid(str(value))):
176 if self.escape_html: 176 ↛ 178line 176 didn't jump to line 178 because the condition on line 176 was always true
177 return utils.string.escape(value, self.max_length), None
178 elif self.max_length:
179 return value[:self.max_length], None
180 return value, None
182 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]
184 def buildDBFilter(
185 self,
186 name: str,
187 skel: "SkeletonInstance",
188 dbFilter: db.Query,
189 rawFilter: dict,
190 prefix: t.Optional[str] = None
191 ) -> db.Query:
192 """
193 Builds and returns a database filter for this data field based on the provided raw filter data.
195 :param name: The name of this data field.
196 :param skel: The skeleton instance that this data field belongs to.
197 :param dbFilter: The database filter to add query clauses to.
198 :param rawFilter: A dictionary containing the raw filter data for this data field.
199 :param prefix: An optional prefix to add to the query clause.
200 :return: The database filter with the added query clauses.
201 """
202 if name not in rawFilter and not any(
203 [(x.startswith(name + "$") or x.startswith(name + ".")) for x in rawFilter.keys()]
204 ):
205 return super().buildDBFilter(name, skel, dbFilter, rawFilter, prefix)
207 if not self.languages:
208 namefilter = name
209 else:
210 lang = None
211 for key in rawFilter.keys():
212 if key.startswith(f"{name}."):
213 langStr = key.replace(f"{name}.", "")
214 if langStr in self.languages:
215 lang = langStr
216 break
217 if not lang:
218 lang = current.language.get() # currentSession.getLanguage()
219 if not lang or not lang in self.languages:
220 lang = self.languages[0]
221 namefilter = f"{name}.{lang}"
223 if name + "$lk" in rawFilter: # Do a prefix-match
224 if not self.caseSensitive:
225 dbFilter.filter((prefix or "") + namefilter + ".idx >=", str(rawFilter[name + "$lk"]).lower())
226 dbFilter.filter((prefix or "") + namefilter + ".idx <",
227 str(rawFilter[name + "$lk"] + u"\ufffd").lower())
228 else:
229 dbFilter.filter((prefix or "") + namefilter + " >=", str(rawFilter[name + "$lk"]))
230 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lk"] + u"\ufffd"))
232 if name + "$gt" in rawFilter: # All entries after
233 if not self.caseSensitive:
234 dbFilter.filter((prefix or "") + namefilter + ".idx >", str(rawFilter[name + "$gt"]).lower())
235 else:
236 dbFilter.filter((prefix or "") + namefilter + " >", str(rawFilter[name + "$gt"]))
238 if name + "$lt" in rawFilter: # All entries before
239 if not self.caseSensitive:
240 dbFilter.filter((prefix or "") + namefilter + ".idx <", str(rawFilter[name + "$lt"]).lower())
241 else:
242 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lt"]))
244 if name in rawFilter: # Normal, strict match
245 if not self.caseSensitive:
246 dbFilter.filter((prefix or "") + namefilter + ".idx", str(rawFilter[name]).lower())
247 else:
248 dbFilter.filter((prefix or "") + namefilter, str(rawFilter[name]))
250 return dbFilter
252 def buildDBSort(
253 self,
254 name: str,
255 skel: 'SkeletonInstance',
256 query: db.Query,
257 params: dict,
258 postfix: str = "",
259 ) -> t.Optional[db.Query]:
260 return super().buildDBSort(
261 name, skel, query, params,
262 postfix=".sort_idx" if self.natural_sorting else ".idx" if not self.caseSensitive else postfix
263 )
265 def natural_sorting(self, value: str | None) -> str | None:
266 """Implements a default natural sorting transformer.
268 The sorting is according to DIN 5007 Variant 2
269 and sets ö and oe, etc. equal.
270 """
271 if value is None:
272 return None
273 assert isinstance(value, str)
274 if not self.caseSensitive:
275 value = value.lower()
277 # DIN 5007 Variant 2
278 return value.translate(str.maketrans({
279 "ö": "oe",
280 "Ö": "Oe",
281 "ü": "ue",
282 "Ü": "Ue",
283 "ä": "ae",
284 "Ä": "Ae",
285 "ẞ": "SS",
286 }))
288 def getUniquePropertyIndexValues(self, skel: "SkeletonInstance", name: str) -> list[str]:
289 """
290 Returns a list of unique index values for a given property name.
292 :param skel: The skeleton instance.
293 :param name: The name of the property.
294 :return: A list of unique index values for the property.
295 :raises NotImplementedError: If the StringBone has languages and the implementation
296 for this case is not yet defined.
297 """
298 if self.languages:
299 # Not yet implemented as it's unclear if we should keep each language distinct or not
300 raise NotImplementedError()
302 if not self.caseSensitive and (value := skel[name]) is not None:
303 if self.multiple:
304 value = [v.lower() for v in value]
305 else:
306 value = value.lower()
307 return self._hashValueForUniquePropertyIndex(value)
309 return super().getUniquePropertyIndexValues(skel, name)
311 def refresh(self, skel: "SkeletonInstance", bone_name: str) -> None:
312 super().refresh(skel, bone_name)
314 # TODO: duplicate code, this is the same iteration logic as in NumericBone
315 new_value = {}
316 for _, lang, value in self.iter_bone_value(skel, bone_name):
317 value = self.type_coerce_single_value(value)
318 if self.escape_html:
319 value = utils.string.escape(value)
320 else:
321 value = utils.string.unescape(value)
322 new_value.setdefault(lang, []).append(value)
324 if not self.multiple:
325 # take the first one
326 new_value = {lang: values[0] for lang, values in new_value.items() if values}
328 if self.languages:
329 skel[bone_name] = new_value
330 elif not self.languages:
331 # just the value(s) with None language
332 skel[bone_name] = new_value.get(None, [] if self.multiple else self.getEmptyValue())
334 def structure(self) -> dict:
335 ret = super().structure() | {
336 "maxlength": self.max_length,
337 "minlength": self.min_length
338 }
339 return ret
341 @classmethod
342 def v_func_valid_chars(cls, valid_chars: t.Iterable = string.printable) -> t.Callable:
343 """
344 Returns a function that takes a string and checks whether it contains valid characters.
345 If all characters of the string are valid, it returns None, and succeeds.
346 If invalid characters are present, it returns an appropriate error message.
348 :param valid_chars: An iterable of valid characters.
349 :return: A function that takes a string and check whether it contains valid characters.
351 Example for digits only:
352 .. code-block:: python
353 str_bone = StringBone(vfunc=StringBone.v_func_valid_chars(string.digits))
354 """
356 def v_func(valid_chars_intern, value):
357 if any(char not in valid_chars_intern for char in value):
358 return "Not all letters are available in the charset"
360 return functools.partial(v_func, valid_chars)