Coverage for / home / runner / work / viur-core / viur-core / viur / src / viur / core / bones / key.py: 13%
90 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 20:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 20:18 +0000
1import copy
2import logging
3import typing as t
4from viur.core import db, i18n
5from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity
8class KeyBone(BaseBone):
9 """
10 The KeyBone is used for managing keys in the database. It provides various methods for validating,
11 converting, and storing key values, as well as querying the database.
12 Key management is crucial for maintaining relationships between entities in the database, and the
13 KeyBone class helps ensure that keys are handled correctly and efficiently throughout the system.
15 :param descr: The description of the KeyBone.
16 :param readOnly: Whether the KeyBone is read-only.
17 :param visible: Whether the KeyBone is visible.
18 :param allowed_kinds: The allowed entity kinds for the KeyBone.
19 :param check: Whether to check for entity existence.
20 """
21 type = "key"
23 def __init__(
24 self,
25 *,
26 descr: str = "Key",
27 readOnly: bool = True, # default is readonly
28 visible: bool = False, # default is invisible
29 allowed_kinds: t.Optional[t.Iterable[str]] = None, # None allows for any kind
30 check: bool = False, # check for entity existence
31 **kwargs
32 ):
33 super().__init__(descr=descr, readOnly=readOnly, visible=visible, defaultValue=None, **kwargs)
34 self.allowed_kinds = tuple(allowed_kinds) if allowed_kinds else None
35 self.check = check
37 def singleValueFromClient(self, value, skel=None, bone_name=None, client_data=None, parse_only: bool = False):
38 # check for correct key
39 if isinstance(value, str):
40 value = value.strip()
42 if self.allowed_kinds:
43 try:
44 key = db.key_helper(value, self.allowed_kinds[0], self.allowed_kinds[1:])
45 except ValueError as e:
46 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, e.args[0])]
47 else:
48 try:
49 key = db.normalize_key(value)
50 except Exception as exc:
51 logging.exception(f"Failed to normalize {value}: {exc}")
52 return self.getEmptyValue(), [
53 ReadFromClientError(
54 ReadFromClientErrorSeverity.Invalid,
55 i18n.translate("core.bones.error.invalidkey", "No valid database key could be parsed")
56 )
57 ]
59 if not parse_only:
60 # Check custom validity
61 if err := self.isInvalid(key):
62 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]
64 if self.check:
65 if db.get(key) is None:
66 return self.getEmptyValue(), [
67 ReadFromClientError(
68 ReadFromClientErrorSeverity.Invalid,
69 i18n.translate("core.bones.error.keynotfound", "The provided database key does not exist")
70 )
71 ]
73 return key, None
75 def singleValueUnserialize(self, val):
76 if not val:
77 rval = None
78 elif isinstance(val, db.Key):
79 rval = db.normalize_key(val)
80 else:
81 rval, err = self.singleValueFromClient(val, parse_only=True)
82 if err:
83 raise ValueError(err[0].errorMessage)
85 return rval
87 def unserialize(self, skel: 'SkeletonInstance', name: str) -> bool:
88 if (
89 name == "key"
90 and isinstance(skel.dbEntity, db.Entity)
91 and skel.dbEntity.key
92 and not skel.dbEntity.key.is_partial
93 ):
94 skel.accessedValues[name] = skel.dbEntity.key
95 return True
96 return super().unserialize(skel, name)
98 def serialize(self, skel: 'SkeletonInstance', name: str, parentIndexed: bool) -> bool:
99 if name == "key":
100 if name not in skel.accessedValues:
101 return False
103 skel.dbEntity.key = skel.accessedValues[name]
104 return True
106 return super().serialize(skel, name, parentIndexed=parentIndexed)
108 def buildDBFilter(
109 self,
110 name: str,
111 skel: 'viur.core.skeleton.SkeletonInstance',
112 dbFilter: db.Query,
113 rawFilter: dict,
114 prefix: t.Optional[str] = None
115 ) -> db.Query:
116 """
117 This method parses the search filter specified by the client in their request and converts
118 it into a format that can be understood by the datastore. It takes care of ignoring filters
119 that do not target this bone and safely handles malformed data in the raw filter.
121 :param name: The property name of this bone in the Skeleton (not the description).
122 :param skel: The :class:viur.core.skeleton.SkeletonInstance this bone is a part of.
123 :param dbFilter: The current :class:viur.core.db.Query instance the filters should be
124 applied to.
125 :param rawFilter: The dictionary of filters the client wants to have applied.
126 :param prefix: An optional string to prepend to the filter key. Defaults to None.
128 :return: The modified :class:viur.core.db.Query.
130 The method takes the following steps:
132 #. Decodes the provided key(s) from the raw filter.
133 #. If the filter contains a list of keys, it iterates through the list, creating a new
134 filter for each key and appending it to the list of queries.
135 #. If the filter contains a single key, it applies the filter directly to the query.
136 #. In case of any invalid key or other issues, it raises a RuntimeError.
137 """
139 def _decodeKey(key):
140 if isinstance(key, db.Key):
141 return key
142 else:
143 try:
144 return db.Key.from_legacy_urlsafe(key)
145 except Exception as e:
146 logging.exception(e)
147 logging.warning(f"Could not decode key {key}")
148 raise RuntimeError()
150 if name in rawFilter:
151 if isinstance(rawFilter[name], list):
152 if isinstance(dbFilter.queries, list):
153 raise ValueError("In-Filter already used!")
154 elif dbFilter.queries is None:
155 return dbFilter # Query is already unsatisfiable
156 oldFilter = dbFilter.queries
157 dbFilter.queries = []
158 for key in rawFilter[name]:
159 newFilter = copy.deepcopy(oldFilter)
160 try:
161 if name == "key":
162 newFilter.filters[f"{prefix or ''}{db.KEY_SPECIAL_PROPERTY} ="] = _decodeKey(key)
163 else:
164 newFilter.filters[f"{prefix or ''}{name} ="] = _decodeKey(key)
165 except: # Invalid key or something
166 raise RuntimeError()
167 dbFilter.queries.append(newFilter)
168 else:
169 try:
170 if name == "key":
171 dbFilter.filter(f"""{prefix or ""}{db.KEY_SPECIAL_PROPERTY} =""", _decodeKey(rawFilter[name]))
172 else:
173 dbFilter.filter(f"""{prefix or ""}{name} =""", _decodeKey(rawFilter[name]))
174 except: # Invalid key or something
175 raise RuntimeError()
176 return dbFilter
178 def _atomic_dump(self, value):
179 if not value:
180 return None
182 return str(value)