Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/db/utils.py: 17%
111 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1import datetime
2from deprecated.sphinx import deprecated
3import typing as t
4from .transport import get, put, run_in_transaction, __client__
5from .types import Entity, Key, current_db_access_log
6from google.cloud.datastore.transaction import Transaction
7from viur.core import current
10def fix_unindexable_properties(entry: Entity) -> Entity:
11 """
12 Recursively walk the given Entity and add all properties to the list of unindexed properties if they contain
13 a string longer than 1500 bytes (which is maximum size of a string that can be indexed). The datastore would
14 return an error otherwise.
15 https://cloud.google.com/datastore/docs/concepts/limits?hl=en#limits
16 :param entry: The entity to fix (inplace)
17 :return: The fixed entity
18 """
20 def has_unindexable_property(prop):
21 if isinstance(prop, dict):
22 return any([has_unindexable_property(x) for x in prop.values()])
23 elif isinstance(prop, list):
24 return any([has_unindexable_property(x) for x in prop])
25 elif isinstance(prop, (str, bytes)):
26 return len(prop) >= 1500
27 else:
28 return False
30 unindexable_properties = set()
31 for key, value in entry.items():
32 if not has_unindexable_property(value):
33 continue
34 if isinstance(value, dict):
35 inner_entity = Entity()
36 inner_entity.update(value)
37 entry[key] = fix_unindexable_properties(inner_entity)
38 if isinstance(value, Entity):
39 inner_entity.key = value.key
40 else:
41 unindexable_properties.add(key)
42 entry.exclude_from_indexes = unindexable_properties
43 return entry
46def normalize_key(key: t.Union[None, Key, str]) -> t.Union[None, Key]:
47 """
48 Normalizes a datastore key (replacing _application with the current one)
50 :param key: Key to be normalized.
51 :return: Normalized key in string representation.
52 """
53 if key is None:
54 return None
55 if isinstance(key, str):
56 key = Key.from_legacy_urlsafe(key)
57 if key.parent:
58 parent = normalize_key(key.parent)
59 else:
60 parent = None
61 return Key(key.kind, key.id_or_name, parent=parent)
64@deprecated(version="3.8.0", reason="Use 'db.normalize_key' instead")
65def normalizeKey(key: t.Union[None, Key]) -> t.Union[None, Key]:
66 return normalize_key(key)
69def key_helper(
70 in_key: t.Union[Key, str, int],
71 target_kind: str,
72 additional_allowed_kinds: t.Union[t.List[str], t.Tuple[str]] = (),
73 adjust_kind: bool = False,
74) -> Key:
75 if isinstance(in_key, Key):
76 if in_key.kind != target_kind and in_key.kind not in additional_allowed_kinds:
77 if not adjust_kind:
78 raise ValueError(
79 f"Kind mismatch: {in_key.kind!r} != {target_kind!r} (or in {additional_allowed_kinds!r})")
80 in_key = Key(target_kind, in_key.id_or_name, parent=in_key.parent)
81 return in_key
82 elif isinstance(in_key, str):
83 # Try to parse key from str
84 try:
85 decoded_key = normalize_key(in_key)
86 except Exception:
87 decoded_key = None
89 # If it did decode, recall keyHelper with Key object
90 if decoded_key:
91 return key_helper(
92 decoded_key,
93 target_kind=target_kind,
94 additional_allowed_kinds=additional_allowed_kinds,
95 adjust_kind=adjust_kind
96 )
98 # otherwise, construct key from str or int
99 if in_key.isdigit():
100 in_key = int(in_key)
102 return Key(target_kind, in_key)
103 elif isinstance(in_key, int):
104 return Key(target_kind, in_key)
106 raise NotImplementedError(f"Unsupported key type {type(in_key)}")
109def keyHelper(
110 inKey: t.Union[Key, str, int],
111 targetKind: str,
112 additionalAllowedKinds: t.Union[t.List[str], t.Tuple[str]] = (),
113 adjust_kind: bool = False,
114) -> Key:
115 return key_helper(
116 in_key=inKey,
117 target_kind=targetKind,
118 additional_allowed_kinds=additionalAllowedKinds,
119 adjust_kind=adjust_kind
120 )
123def is_in_transaction() -> bool:
124 return __client__.current_transaction is not None
127@deprecated(version="3.8.0", reason="Use 'db.utils.is_in_transaction' instead")
128def IsInTransaction() -> bool:
129 return is_in_transaction()
132def get_or_insert(key: Key, **kwargs) -> Entity:
133 """
134 Either creates a new entity with the given key, or returns the existing one.
136 Its guaranteed that there is no race-condition here; it will never overwrite a
137 previously created entity. Extra keyword arguments passed to this function will be
138 used to populate the entity if it has to be created; otherwise they are ignored.
140 :param key: The key which will be fetched or created.
141 :returns: Returns the fetched or newly created Entity.
142 """
144 def txn(key, kwargs):
145 obj = get(key)
146 if not obj:
147 obj = Entity(key)
148 for k, v in kwargs.items():
149 obj[k] = v
150 put(obj)
151 return obj
153 if is_in_transaction():
154 return txn(key, kwargs)
155 return run_in_transaction(txn, key, kwargs)
158@deprecated(version="3.8.0", reason="Use 'db.get_or_insert' instead")
159def GetOrInsert(key: Key, **kwargs: t.Any) -> Entity:
160 return get_or_insert(key, **kwargs)
163@deprecated(version="3.8.0", reason="Use 'str(key)' instead")
164def encodeKey(key: Key) -> str:
165 """
166 Return the given key encoded as string (mimicking the old str() behaviour of keys)
167 """
168 return str(key)
171def acquire_transaction_success_marker() -> str:
172 """
173 Generates a token that will be written to the datastore (under "viur-transactionmarker") if the transaction
174 completes successfully. Currently only used by deferredTasks to check if the task should actually execute
175 or if the transaction it was created in failed.
176 :return: Name of the entry in viur-transactionmarker
177 """
178 txn: Transaction | None = __client__.current_transaction
179 assert txn, "acquire_transaction_success_marker cannot be called outside an transaction"
180 marker = str(txn.id)
181 request_data = current.request_data.get()
182 if not request_data.get("__viur-transactionmarker__"):
183 db_obj = Entity(Key("viur-transactionmarker", marker))
184 db_obj["creationdate"] = datetime.datetime.now(datetime.timezone.utc)
185 put(db_obj)
186 request_data["__viur-transactionmarker__"] = True
187 return marker
190def start_data_access_log() -> t.Set[t.Union[Key, str]]:
191 """
192 Clears our internal access log (which keeps track of which entries have been accessed in the current
193 request). The old set of accessed entries is returned so that it can be restored with
194 :func:`server.db.popAccessData` in case of nested caching. You must call popAccessData afterwards, otherwise
195 we'll continue to log all entries accessed in subsequent request on the same thread!
196 :return: t.Set of old accessed entries
197 """
198 old = current_db_access_log.get(set())
199 current_db_access_log.set(set())
200 return old
203def startDataAccessLog() -> t.Set[t.Union[Key, str]]:
204 return start_data_access_log()
207def end_data_access_log(
208 outer_access_log: t.Optional[t.Set[t.Union[Key, str]]] = None,
209) -> t.Optional[t.Set[t.Union[Key, str]]]:
210 """
211 Retrieves the set of entries accessed so far.
213 To clean up and restart the log, call :func:`viur.datastore.startAccessDataLog`.
215 If you called :func:`server.db.startAccessDataLog` before, you can re-apply the old log using
216 the outerAccessLog param. Otherwise, it will disable the access log.
218 :param outerAccessLog: State of your log returned by :func:`server.db.startAccessDataLog`
219 :return: t.Set of entries accessed
220 """
221 res = current_db_access_log.get()
222 if isinstance(outer_access_log, set):
223 current_db_access_log.set((outer_access_log or set()).union(res))
224 else:
225 current_db_access_log.set(None)
226 return res
229def endDataAccessLog(
230 outerAccessLog: t.Optional[t.Set[t.Union[Key, str]]] = None,
231) -> t.Optional[t.Set[t.Union[Key, str]]]:
232 return end_data_access_log(outer_access_log=outerAccessLog)