Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/key.py: 10%
88 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
1import copy
2import logging
3import typing as t
4from viur.core import db
5from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity
class KeyBone(BaseBone):
    """
    The KeyBone is used for managing keys in the database. It provides various methods for validating,
    converting, and storing key values, as well as querying the database.

    Key management is crucial for maintaining relationships between entities in the database, and the
    KeyBone class helps ensure that keys are handled correctly and efficiently throughout the system.

    :param descr: The description of the KeyBone.
    :param readOnly: Whether the KeyBone is read-only.
    :param visible: Whether the KeyBone is visible.
    :param allowed_kinds: The allowed entity kinds for the KeyBone. ``None`` or an empty iterable
        allows any kind. A single string is treated as exactly one allowed kind.
    :param check: Whether to check for entity existence.
    """
    type = "key"

    def __init__(
        self,
        *,
        descr: str = "Key",
        readOnly: bool = True,  # default is readonly
        visible: bool = False,  # default is invisible
        allowed_kinds: t.Optional[t.Iterable[str]] = None,  # None allows for any kind
        check: bool = False,  # check for entity existence
        **kwargs
    ):
        super().__init__(descr=descr, readOnly=readOnly, visible=visible, defaultValue=None, **kwargs)

        if not allowed_kinds:
            kinds: t.Tuple[str, ...] = ()
        elif isinstance(allowed_kinds, str):
            # A bare string is itself an iterable of characters; treat it as one kind
            # instead of silently splitting it into single letters.
            kinds = (allowed_kinds,)
        else:
            kinds = tuple(allowed_kinds)

        # Iterators/generators are always truthy, so emptiness can only be judged after
        # materializing them. An empty result means "any kind", stored as None.
        self.allowed_kinds = kinds or None
        self.check = check

    def singleValueFromClient(self, value, skel=None, bone_name=None, client_data=None, parse_only: bool = False):
        """
        Convert a single client-supplied value into a database key.

        :param value: The raw value; strings are stripped of surrounding whitespace first.
        :param skel: Unused by this implementation.
        :param bone_name: Unused by this implementation.
        :param client_data: Unused by this implementation.
        :param parse_only: If True, only parse the key and skip the ``isInvalid`` and
            existence checks.
        :return: A tuple ``(key, None)`` on success, or ``(empty value, [ReadFromClientError])``
            if the value could not be accepted.
        """
        # check for correct key
        if isinstance(value, str):
            value = value.strip()

        if self.allowed_kinds:
            try:
                key = db.keyHelper(value, self.allowed_kinds[0], self.allowed_kinds[1:])
            except ValueError as e:
                # A ValueError raised without arguments has an empty ``args`` tuple;
                # fall back to a generic message instead of failing with IndexError.
                message = e.args[0] if e.args else "The provided key is not a valid database key"
                return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, message)]
        else:
            try:
                if isinstance(value, db.Key):
                    key = db.normalizeKey(value)
                else:
                    key = db.normalizeKey(db.Key.from_legacy_urlsafe(value))
            except Exception as exc:
                logging.exception("Failed to normalize %s: %s", value, exc)
                return self.getEmptyValue(), [
                    ReadFromClientError(
                        ReadFromClientErrorSeverity.Invalid,
                        "The provided key is not a valid database key"
                    )
                ]

        if not parse_only:
            # Check custom validity
            if err := self.isInvalid(key):
                return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]

            if self.check:
                if db.Get(key) is None:
                    return self.getEmptyValue(), [
                        ReadFromClientError(
                            ReadFromClientErrorSeverity.Invalid,
                            "The provided key does not exist"
                        )
                    ]

        return key, None

    def singleValueUnserialize(self, val):
        """
        Convert a single stored value into a normalized key.

        :param val: The stored value. Falsy values yield ``None``; ``db.Key`` instances are
            normalized; anything else is parsed via :meth:`singleValueFromClient` in
            parse-only mode.
        :return: The normalized key or ``None``.
        :raises ValueError: If the value cannot be parsed into a key.
        """
        if not val:
            rval = None
        elif isinstance(val, db.Key):
            rval = db.normalizeKey(val)
        else:
            rval, err = self.singleValueFromClient(val, parse_only=True)
            if err:
                raise ValueError(err[0].errorMessage)

        return rval

    def unserialize(self, skel: 'SkeletonInstance', name: str) -> bool:
        """
        Load this bone's value from ``skel.dbEntity`` into ``skel.accessedValues``.

        For the bone named ``"key"`` the value is taken directly from the entity's own key,
        provided the entity is a ``db.Entity`` with a complete (non-partial) key. Every
        other case is delegated to the base class.

        :param skel: The skeleton instance being populated.
        :param name: The name of this bone inside the skeleton.
        :return: True if the entity key was used, otherwise the result of the base class.
        """
        if (
            name == "key"
            and isinstance(skel.dbEntity, db.Entity)
            and skel.dbEntity.key
            and not skel.dbEntity.key.is_partial
        ):
            skel.accessedValues[name] = skel.dbEntity.key
            return True
        return super().unserialize(skel, name)

    def serialize(self, skel: 'SkeletonInstance', name: str, parentIndexed: bool) -> bool:
        """
        Write this bone's value from ``skel.accessedValues`` to ``skel.dbEntity``.

        For the bone named ``"key"`` the value becomes the entity's own key. Every other
        bone name is delegated to the base class.

        :param skel: The skeleton instance being serialized.
        :param name: The name of this bone inside the skeleton.
        :param parentIndexed: Passed through to the base class implementation.
        :return: False if the bone has no accessed value, True if the entity key was set,
            otherwise the result of the base class.
        """
        if name not in skel.accessedValues:
            return False
        if name == "key":
            skel.dbEntity.key = skel.accessedValues["key"]
            return True

        return super().serialize(skel, name, parentIndexed=parentIndexed)

    def buildDBFilter(
        self,
        name: str,
        skel: 'viur.core.skeleton.SkeletonInstance',
        dbFilter: db.Query,
        rawFilter: dict,
        prefix: t.Optional[str] = None
    ) -> db.Query:
        """
        This method parses the search filter specified by the client in their request and converts
        it into a format that can be understood by the datastore. It takes care of ignoring filters
        that do not target this bone and safely handles malformed data in the raw filter.

        :param name: The property name of this bone in the Skeleton (not the description).
        :param skel: The :class:viur.core.skeleton.SkeletonInstance this bone is a part of.
        :param dbFilter: The current :class:viur.core.db.Query instance the filters should be
            applied to.
        :param rawFilter: The dictionary of filters the client wants to have applied.
        :param prefix: An optional string to prepend to the filter key. Defaults to None.

        :return: The modified :class:viur.core.db.Query.
        :raises ValueError: If a list of keys is given but the query already holds a list of
            sub-queries.
        :raises RuntimeError: If a key cannot be decoded or the filter cannot be applied.

        The method takes the following steps:

        #. Decodes the provided key(s) from the raw filter.
        #. If the filter contains a list of keys, it iterates through the list, creating a new
           filter for each key and appending it to the list of queries.
        #. If the filter contains a single key, it applies the filter directly to the query.
        #. In case of any invalid key or other issues, it raises a RuntimeError.
        """

        def _decode_key(key):
            """Return ``key`` as a ``db.Key``, decoding url-safe strings; raise RuntimeError on failure."""
            if isinstance(key, db.Key):
                return key
            try:
                return db.Key.from_legacy_urlsafe(key)
            except Exception as e:
                logging.exception(e)
                logging.warning("Could not decode key %s", key)
                raise RuntimeError(f"Could not decode key {key!r}") from e

        if name not in rawFilter:
            return dbFilter

        raw_value = rawFilter[name]
        # The bone named "key" filters on the datastore's special key property.
        property_name = db.KEY_SPECIAL_PROPERTY if name == "key" else name
        filter_key = f"{prefix or ''}{property_name} ="

        if isinstance(raw_value, list):
            if isinstance(dbFilter.queries, list):
                raise ValueError("In-Filter already used!")
            elif dbFilter.queries is None:
                return dbFilter  # Query is already unsatisfiable

            # Decode every key before touching the query, so that a malformed key
            # does not leave ``dbFilter`` in a partially rewritten state.
            decoded_keys = [_decode_key(raw_key) for raw_key in raw_value]

            old_filter = dbFilter.queries
            new_queries = []
            for decoded in decoded_keys:
                new_filter = copy.deepcopy(old_filter)
                try:
                    new_filter.filters[filter_key] = decoded
                except Exception as exc:
                    raise RuntimeError(f"Could not apply key filter for {name!r}") from exc
                new_queries.append(new_filter)
            dbFilter.queries = new_queries
        else:
            decoded = _decode_key(raw_value)
            try:
                dbFilter.filter(filter_key, decoded)
            except Exception as exc:
                raise RuntimeError(f"Could not apply key filter for {name!r}") from exc

        return dbFilter