Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/key.py: 13%
90 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1import copy
2import logging
3import typing as t
4from viur.core import db, i18n
5from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity
8class KeyBone(BaseBone):
9 """
10 The KeyBone is used for managing keys in the database. It provides various methods for validating,
11 converting, and storing key values, as well as querying the database.
12 Key management is crucial for maintaining relationships between entities in the database, and the
13 KeyBone class helps ensure that keys are handled correctly and efficiently throughout the system.
15 :param descr: The description of the KeyBone.
16 :param readOnly: Whether the KeyBone is read-only.
17 :param visible: Whether the KeyBone is visible.
18 :param allowed_kinds: The allowed entity kinds for the KeyBone.
19 :param check: Whether to check for entity existence.
20 """
21 type = "key"
23 def __init__(
24 self,
25 *,
26 descr: str = "Key",
27 readOnly: bool = True, # default is readonly
28 visible: bool = False, # default is invisible
29 allowed_kinds: t.Optional[t.Iterable[str]] = None, # None allows for any kind
30 check: bool = False, # check for entity existence
31 **kwargs
32 ):
33 super().__init__(descr=descr, readOnly=readOnly, visible=visible, defaultValue=None, **kwargs)
34 self.allowed_kinds = tuple(allowed_kinds) if allowed_kinds else None
35 self.check = check
37 def singleValueFromClient(self, value, skel=None, bone_name=None, client_data=None, parse_only: bool = False):
38 # check for correct key
39 if isinstance(value, str):
40 value = value.strip()
42 if self.allowed_kinds:
43 try:
44 key = db.key_helper(value, self.allowed_kinds[0], self.allowed_kinds[1:])
45 except ValueError as e:
46 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, e.args[0])]
47 else:
48 try:
49 key = db.normalize_key(value)
50 except Exception as exc:
51 logging.exception(f"Failed to normalize {value}: {exc}")
52 return self.getEmptyValue(), [
53 ReadFromClientError(
54 ReadFromClientErrorSeverity.Invalid,
55 i18n.translate("core.bones.error.invalidkey", "No valid database key could be parsed")
56 )
57 ]
59 if not parse_only:
60 # Check custom validity
61 if err := self.isInvalid(key):
62 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]
64 if self.check:
65 if db.get(key) is None:
66 return self.getEmptyValue(), [
67 ReadFromClientError(
68 ReadFromClientErrorSeverity.Invalid,
69 i18n.translate("core.bones.error.keynotfound", "The provided database key does not exist")
70 )
71 ]
73 return key, None
75 def singleValueUnserialize(self, val):
76 if not val:
77 rval = None
78 elif isinstance(val, db.Key):
79 rval = db.normalize_key(val)
80 else:
81 rval, err = self.singleValueFromClient(val, parse_only=True)
82 if err:
83 raise ValueError(err[0].errorMessage)
85 return rval
87 def unserialize(self, skel: 'SkeletonInstance', name: str) -> bool:
88 if (
89 name == "key"
90 and isinstance(skel.dbEntity, db.Entity)
91 and skel.dbEntity.key
92 and not skel.dbEntity.key.is_partial
93 ):
94 skel.accessedValues[name] = skel.dbEntity.key
95 return True
96 return super().unserialize(skel, name)
98 def serialize(self, skel: 'SkeletonInstance', name: str, parentIndexed: bool) -> bool:
99 if name not in skel.accessedValues:
100 return False
101 if name == "key":
102 skel.dbEntity.key = skel.accessedValues["key"]
103 return True
105 return super().serialize(skel, name, parentIndexed=parentIndexed)
107 def buildDBFilter(
108 self,
109 name: str,
110 skel: 'viur.core.skeleton.SkeletonInstance',
111 dbFilter: db.Query,
112 rawFilter: dict,
113 prefix: t.Optional[str] = None
114 ) -> db.Query:
115 """
116 This method parses the search filter specified by the client in their request and converts
117 it into a format that can be understood by the datastore. It takes care of ignoring filters
118 that do not target this bone and safely handles malformed data in the raw filter.
120 :param name: The property name of this bone in the Skeleton (not the description).
121 :param skel: The :class:viur.core.skeleton.SkeletonInstance this bone is a part of.
122 :param dbFilter: The current :class:viur.core.db.Query instance the filters should be
123 applied to.
124 :param rawFilter: The dictionary of filters the client wants to have applied.
125 :param prefix: An optional string to prepend to the filter key. Defaults to None.
127 :return: The modified :class:viur.core.db.Query.
129 The method takes the following steps:
131 #. Decodes the provided key(s) from the raw filter.
132 #. If the filter contains a list of keys, it iterates through the list, creating a new
133 filter for each key and appending it to the list of queries.
134 #. If the filter contains a single key, it applies the filter directly to the query.
135 #. In case of any invalid key or other issues, it raises a RuntimeError.
136 """
138 def _decodeKey(key):
139 if isinstance(key, db.Key):
140 return key
141 else:
142 try:
143 return db.Key.from_legacy_urlsafe(key)
144 except Exception as e:
145 logging.exception(e)
146 logging.warning(f"Could not decode key {key}")
147 raise RuntimeError()
149 if name in rawFilter:
150 if isinstance(rawFilter[name], list):
151 if isinstance(dbFilter.queries, list):
152 raise ValueError("In-Filter already used!")
153 elif dbFilter.queries is None:
154 return dbFilter # Query is already unsatisfiable
155 oldFilter = dbFilter.queries
156 dbFilter.queries = []
157 for key in rawFilter[name]:
158 newFilter = copy.deepcopy(oldFilter)
159 try:
160 if name == "key":
161 newFilter.filters[f"{prefix or ''}{db.KEY_SPECIAL_PROPERTY} ="] = _decodeKey(key)
162 else:
163 newFilter.filters[f"{prefix or ''}{name} ="] = _decodeKey(key)
164 except: # Invalid key or something
165 raise RuntimeError()
166 dbFilter.queries.append(newFilter)
167 else:
168 try:
169 if name == "key":
170 dbFilter.filter(f"""{prefix or ""}{db.KEY_SPECIAL_PROPERTY} =""", _decodeKey(rawFilter[name]))
171 else:
172 dbFilter.filter(f"""{prefix or ""}{name} =""", _decodeKey(rawFilter[name]))
173 except: # Invalid key or something
174 raise RuntimeError()
175 return dbFilter
177 def _atomic_dump(self, value):
178 if not value:
179 return None
181 return str(value)