Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/db/utils.py: 17%
114 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 14:41 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 14:41 +0000
1import datetime
2import sys
3import typing as t
5from deprecated.sphinx import deprecated
6from google.cloud.datastore.transaction import Transaction
8from viur.core import current
9from .transport import __client__, get, put, run_in_transaction
10from .types import Entity, Key, current_db_access_log
def fix_unindexable_properties(entry: Entity, *, keep_exclusions: bool = True) -> Entity:
    """
    Recursively walk the given Entity and exclude every property from indexing that contains a string or
    bytes value too large to be indexed. The datastore would return an error otherwise.

    The datastore limits indexed values to 1500 bytes; for strings this applies to their UTF-8 encoding.
    https://cloud.google.com/datastore/docs/concepts/limits?hl=en#limits

    Dict-like values (including nested entities) that contain such a value are replaced by a new,
    recursively fixed ``Entity``. For any other value (e.g. a long string or a list containing one) the
    name of the top-level property is added to the exclusions.

    :param entry: The entity to fix (inplace)
    :param keep_exclusions: If true, keep the properties already included in ``exclude_from_indexes``.
        Otherwise, ignore them and exclude only non-indexable properties.
    :return: The fixed entity
    """
    max_indexable_bytes = 1500

    def has_unindexable_property(prop) -> bool:
        if isinstance(prop, dict):
            return any(has_unindexable_property(x) for x in prop.values())
        if isinstance(prop, list):
            return any(has_unindexable_property(x) for x in prop)
        if isinstance(prop, str):
            # The limit applies to the encoded size, not to the number of characters and not to the
            # in-memory size of the Python object. "surrogatepass" keeps the size check itself from
            # raising on lone surrogates.
            return len(prop.encode("utf-8", "surrogatepass")) > max_indexable_bytes
        if isinstance(prop, bytes):
            return len(prop) > max_indexable_bytes
        return False

    unindexable_properties = set()
    for key, value in entry.items():
        if not has_unindexable_property(value):
            continue
        if isinstance(value, dict):
            inner_entity = Entity()
            inner_entity.update(value)
            # The recursive call returns the object it was given, so ``inner_entity`` is the value
            # stored on ``entry`` and can still be adjusted below.
            entry[key] = fix_unindexable_properties(inner_entity)
            if isinstance(value, Entity):
                inner_entity.key = value.key
        else:
            unindexable_properties.add(key)

    if keep_exclusions:
        entry.exclude_from_indexes.update(unindexable_properties)  # type:ignore
    else:
        entry.exclude_from_indexes = unindexable_properties
    return entry
def normalize_key(key: t.Union[None, Key, str]) -> t.Union[None, Key]:
    """
    Normalizes a datastore key (replacing _application with the current one).

    A string is first parsed with ``Key.from_legacy_urlsafe``. The key is then rebuilt from its kind and
    id or name only; parent keys are normalized the same way.

    :param key: Key (or its legacy urlsafe string form) to be normalized. May be None.
    :return: The normalized key as a new ``Key`` object, or None if no key was given.
    """
    if key is None:
        return None
    decoded = Key.from_legacy_urlsafe(key) if isinstance(key, str) else key
    ancestor = normalize_key(decoded.parent) if decoded.parent else None
    return Key(decoded.kind, decoded.id_or_name, parent=ancestor)
@deprecated(version="3.8.0", reason="Use 'db.normalize_key' instead")
def normalizeKey(key: t.Union[None, Key]) -> t.Union[None, Key]:
    """
    Deprecated camelCase alias that forwards to :func:`normalize_key`.

    :param key: Key to be normalized.
    :return: Whatever :func:`normalize_key` returns for ``key``.
    """
    normalized = normalize_key(key)
    return normalized
def key_helper(
    in_key: t.Union[Key, str, int],
    target_kind: str,
    additional_allowed_kinds: t.Union[t.List[str], t.Tuple[str]] = (),
    adjust_kind: bool = False,
) -> Key:
    """
    Turn a Key, a string or an integer into a Key of an accepted kind.

    - A ``Key`` is returned as is if its kind is ``target_kind`` or one of ``additional_allowed_kinds``.
      Otherwise it is either rebuilt with ``target_kind`` (``adjust_kind=True``) or rejected.
    - A ``str`` is first passed to :func:`normalize_key`. If that yields a key, the key is handled like a
      ``Key`` above. If decoding fails, the string is used as the name of a new key of ``target_kind``,
      or as its numeric id if the string consists of decimal digits only.
    - An ``int`` is used as the numeric id of a new key of ``target_kind``.

    :param in_key: The key, encoded key, name or id to convert.
    :param target_kind: The kind the resulting key is expected to have.
    :param additional_allowed_kinds: Further kinds accepted for an incoming key without adjustment.
    :param adjust_kind: If true, a key of a non-accepted kind is rebuilt with ``target_kind``
        (keeping id/name and parent) instead of raising.
    :return: The resulting key.
    :raises ValueError: If the kind of the key is not accepted and ``adjust_kind`` is false.
    :raises NotImplementedError: If ``in_key`` is neither a Key, a str nor an int.
    """
    if isinstance(in_key, Key):
        if in_key.kind != target_kind and in_key.kind not in additional_allowed_kinds:
            if not adjust_kind:
                raise ValueError(
                    f"Kind mismatch: {in_key.kind!r} != {target_kind!r} (or in {additional_allowed_kinds!r})")
            in_key = Key(target_kind, in_key.id_or_name, parent=in_key.parent)
        return in_key

    if isinstance(in_key, str):
        # Try to parse key from str. Any failure just means the string is not an encoded key;
        # it is then treated as a plain name or id below.
        try:
            decoded_key = normalize_key(in_key)
        except Exception:
            decoded_key = None

        # If it did decode, call key_helper again with the Key object
        if decoded_key:
            return key_helper(
                decoded_key,
                target_kind=target_kind,
                additional_allowed_kinds=additional_allowed_kinds,
                adjust_kind=adjust_kind
            )

        # Otherwise, construct key from str or int.
        # isdecimal() instead of isdigit(): isdigit() also accepts characters such as superscript
        # digits, which int() rejects with a ValueError.
        if in_key.isdecimal():
            in_key = int(in_key)

        return Key(target_kind, in_key)

    if isinstance(in_key, int):
        return Key(target_kind, in_key)

    raise NotImplementedError(f"Unsupported key type {type(in_key)}")
def keyHelper(
    inKey: t.Union[Key, str, int],
    targetKind: str,
    additionalAllowedKinds: t.Union[t.List[str], t.Tuple[str]] = (),
    adjust_kind: bool = False,
) -> Key:
    """
    CamelCase alias that forwards all arguments to :func:`key_helper`.

    :param inKey: Passed on as ``in_key``.
    :param targetKind: Passed on as ``target_kind``.
    :param additionalAllowedKinds: Passed on as ``additional_allowed_kinds``.
    :param adjust_kind: Passed on unchanged.
    :return: The key returned by :func:`key_helper`.
    """
    return key_helper(inKey, targetKind, additionalAllowedKinds, adjust_kind)
def is_in_transaction() -> bool:
    """
    Tell whether the datastore client currently reports a transaction.

    :return: True if ``__client__.current_transaction`` is not None, False otherwise.
    """
    active_transaction = __client__.current_transaction
    return active_transaction is not None
@deprecated(version="3.8.0", reason="Use 'db.utils.is_in_transaction' instead")
def IsInTransaction() -> bool:
    """
    Deprecated alias that forwards to :func:`is_in_transaction`.

    :return: The result of :func:`is_in_transaction`.
    """
    in_transaction = is_in_transaction()
    return in_transaction
def get_or_insert(key: Key, **kwargs) -> Entity:
    """
    Fetch the entity stored under the given key, creating it first if nothing was found.

    The lookup and the optional creation run inside a transaction: the current one if a transaction is
    already active, a new one otherwise. Extra keyword arguments are only used to populate an entity
    that has to be created; they are ignored if an entity was found.

    :param key: The key which will be fetched or created.
    :returns: Returns the fetched or newly created Entity.
    """

    def txn(entity_key, initial_values):
        entity = get(entity_key)
        if entity:
            return entity
        # NOTE(review): the truthiness check presumably also treats a stored entity without any
        # properties as missing - confirm against the behaviour of ``get`` and ``Entity``.
        entity = Entity(entity_key)
        for name, value in initial_values.items():
            entity[name] = value
        put(entity)
        return entity

    if not is_in_transaction():
        return run_in_transaction(txn, key, kwargs)
    # Already inside a transaction: run directly instead of starting another one.
    return txn(key, kwargs)
@deprecated(version="3.8.0", reason="Use 'db.get_or_insert' instead")
def GetOrInsert(key: Key, **kwargs: t.Any) -> Entity:
    """
    Deprecated alias that forwards to :func:`get_or_insert`.

    :param key: The key which will be fetched or created.
    :param kwargs: Passed on to :func:`get_or_insert` unchanged.
    :return: The entity returned by :func:`get_or_insert`.
    """
    entity = get_or_insert(key, **kwargs)
    return entity
@deprecated(version="3.8.0", reason="Use 'str(key)' instead")
def encodeKey(key: Key) -> str:
    """
    Deprecated: encode the given key as string, which is simply ``str(key)``
    (mimicking the old str() behaviour of keys).

    :param key: The key to encode.
    :return: The string form of the key.
    """
    encoded = str(key)
    return encoded
def acquire_transaction_success_marker() -> str:
    """
    Generates a token that will be written to the datastore (under "viur-transactionmarker") if the
    transaction completes successfully. Currently only used by deferredTasks to check if the task should
    actually execute or if the transaction it was created in failed.

    The marker entity is written only once per request: after the first write a flag is stored in the
    current request data, and later calls just return the marker name.

    Must be called inside a transaction (enforced with an assertion).

    :return: Name of the entry in viur-transactionmarker
    """
    active_txn: Transaction | None = __client__.current_transaction
    assert active_txn, "acquire_transaction_success_marker cannot be called outside an transaction"
    marker_name = str(active_txn.id)

    per_request = current.request_data.get()
    if per_request.get("__viur-transactionmarker__"):
        # Marker entity was already written during this request.
        return marker_name

    marker_entity = Entity(Key("viur-transactionmarker", marker_name))
    marker_entity["creationdate"] = datetime.datetime.now(datetime.timezone.utc)
    put(marker_entity)
    per_request["__viur-transactionmarker__"] = True
    return marker_name
def start_data_access_log() -> t.Set[t.Union[Key, str]]:
    """
    Replaces our internal access log (which keeps track of which entries have been accessed) with a
    fresh, empty set.

    The previous log is returned so that it can be handed to :func:`end_data_access_log` later on,
    e.g. in case of nested caching. If no log was set before, an empty set is returned.

    :return: Set of previously accessed entries
    """
    previous_log = current_db_access_log.get(set())
    current_db_access_log.set(set())
    return previous_log
def startDataAccessLog() -> t.Set[t.Union[Key, str]]:
    """
    CamelCase alias that forwards to :func:`start_data_access_log`.

    :return: The previous access log as returned by :func:`start_data_access_log`.
    """
    previous_log = start_data_access_log()
    return previous_log
def end_data_access_log(
    outer_access_log: t.Optional[t.Set[t.Union[Key, str]]] = None,
) -> t.Optional[t.Set[t.Union[Key, str]]]:
    """
    Retrieves the set of entries accessed so far.

    To clean up and restart the log, call :func:`start_data_access_log`.

    If you called :func:`start_data_access_log` before, you can re-apply the old log using
    the ``outer_access_log`` param: the active log then becomes the union of the old log and the entries
    accessed so far. Otherwise, it will disable the access log (set it to None).

    :param outer_access_log: State of your log returned by :func:`start_data_access_log`
    :return: Set of entries accessed, or None if the access log was disabled
    """
    res = current_db_access_log.get()
    if isinstance(outer_access_log, set):
        # ``res`` is None if the log was disabled (e.g. by an earlier call without an outer log);
        # treat that as "nothing accessed" instead of failing in union().
        current_db_access_log.set(outer_access_log.union(res or ()))
    else:
        current_db_access_log.set(None)
    return res
def endDataAccessLog(
    outerAccessLog: t.Optional[t.Set[t.Union[Key, str]]] = None,
) -> t.Optional[t.Set[t.Union[Key, str]]]:
    """
    CamelCase alias that forwards to :func:`end_data_access_log`.

    :param outerAccessLog: Passed on as ``outer_access_log``.
    :return: The result of :func:`end_data_access_log`.
    """
    accessed = end_data_access_log(outer_access_log=outerAccessLog)
    return accessed