Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/db/transport.py: 19%
131 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1from __future__ import annotations
3import logging
4import time
5import typing as t
7from deprecated.sphinx import deprecated
8from google.cloud import datastore, exceptions
10from .overrides import entity_from_protobuf, key_from_protobuf
11from .types import Entity, Key, QueryDefinition, SortOrder, current_db_access_log
12from viur.core.config import conf
13from viur.core.errors import HTTPException
# Patch the protobuf helpers of google-cloud-datastore with the overrides imported from .overrides
# (presumably so that decoded keys/entities use this package's own Key and Entity classes - verify
# against .overrides).
datastore.helpers.key_from_protobuf = key_from_protobuf
datastore.helpers.entity_from_protobuf = entity_from_protobuf
# Module-wide datastore client; every function in this module talks to the datastore through it.
__client__ = datastore.Client()
def allocate_ids(kind_name: str, num_ids: int = 1, retry=None, timeout=None) -> list[Key]:
    """
    Allocates new, free ids for the given kind.

    :param kind_name: Name of the kind the ids are allocated for.
    :param num_ids: How many ids should be allocated.
    :param retry: Passed through unchanged to the datastore client's ``allocate_ids``.
    :param timeout: Passed through unchanged to the datastore client's ``allocate_ids``.
    :return: The result of the datastore client's ``allocate_ids`` call (annotated as a list of keys).
    :raises TypeError: If kind_name is not a string.
    """
    # isinstance() rather than an exact type() comparison, so that str subclasses are accepted as well.
    if not isinstance(kind_name, str):
        raise TypeError("kind_name must be a string")
    return __client__.allocate_ids(Key(kind_name), num_ids, retry, timeout)
@deprecated(version="3.8.0", reason="Use 'db.allocate_ids' instead")
def AllocateIDs(kind_name):
    """
    Allocates a single new, free id for the given kind.

    :param kind_name: The kind name, or a Key whose kind is used instead.
    :return: The first element of the result of :func:`allocate_ids`.
    """
    # Legacy callers may hand in a Key instead of a kind name (what a botch...); use its kind then.
    kind = kind_name.kind if isinstance(kind_name, Key) else kind_name
    return allocate_ids(kind)[0]
def get(keys: t.Union[Key, t.List[Key]]) -> t.Union[t.List[Entity], Entity, None]:
    """
    Retrieves an entity (or a list thereof) from datastore.

    If only a single key has been given, the entity is returned (or None in case the key has not been
    found). Otherwise a list of all entities that have been looked up is returned (which may be empty),
    ordered like the given keys.

    :param keys: A datastore key (or a list, tuple or set thereof) to lookup
    :return: The entity (or None if it has not been found), or a list of entities.
    """
    _write_to_access_log(keys)

    if isinstance(keys, (list, set, tuple)):
        # Materialize once: sets are accepted by the check above but offer neither an order nor .index().
        keys = list(keys)
        # Position of the first occurrence of every key; replaces a linear keys.index() per result.
        position = {}
        for idx, key in enumerate(keys):
            position.setdefault(key, idx)

        res_list = list(__client__.get_multi(keys))
        # Compare against None explicitly: datastore entities are dict-like, so an entity without any
        # properties is falsy and must still be sorted by its key.
        res_list.sort(key=lambda entity: position[entity.key] if entity is not None else -1)
        return res_list

    return __client__.get(keys)
@deprecated(version="3.8.0", reason="Use 'db.get' instead")
def Get(keys: t.Union[Key, t.List[Key]]) -> t.Union[t.List[Entity], Entity, None]:
    """
    Deprecated alias that forwards to :func:`get`.

    :param keys: Passed through unchanged to :func:`get`.
    :return: Whatever :func:`get` returns.
    """
    result = get(keys)
    return result
def put(entities: t.Union[Entity, t.List[Entity]]):
    """
    Saves one entity or several entities in the Cloud Datastore.

    The given entities are recorded in the access log first. A single Entity is stored with the client's
    ``put``, anything else is handed to the client's ``put_multi``.

    :param entities: The entity (or the entities) to be saved to the datastore.
    :return: Whatever the datastore client's ``put`` / ``put_multi`` returns.
    """
    _write_to_access_log(entities)

    if not isinstance(entities, Entity):
        # Not a single entity: treat the argument as a collection of entities.
        return __client__.put_multi(entities=entities)

    return __client__.put(entities)
@deprecated(version="3.8.0", reason="Use 'db.put' instead")
def Put(entities: t.Union[Entity, t.List[Entity]]) -> t.Union[Entity, None]:
    """
    Deprecated alias that forwards to :func:`put`.

    :param entities: Passed through unchanged to :func:`put`.
    :return: Whatever :func:`put` returns.
    """
    result = put(entities)
    return result
def delete(keys: t.Union[Entity, t.List[Entity], Key, t.List[Key]]):
    """
    Removes the entities with the given key(s) from the datastore.

    The argument is recorded in the access log first. A set, list or tuple is handed to the client's
    ``delete_multi``, anything else to the client's ``delete``.

    :param keys: A Key (or a t.List of Keys) to delete
    :return: Whatever the datastore client's ``delete`` / ``delete_multi`` returns.
    """
    _write_to_access_log(keys)

    if isinstance(keys, (set, list, tuple)):
        return __client__.delete_multi(keys)

    return __client__.delete(keys)
@deprecated(version="3.8.0", reason="Use 'db.delete' instead")
def Delete(keys: t.Union[Entity, t.List[Entity], Key, t.List[Key]]):
    """
    Deprecated alias that forwards to :func:`delete`.

    :param keys: Passed through unchanged to :func:`delete`.
    :return: Whatever :func:`delete` returns.
    """
    result = delete(keys)
    return result
def run_in_transaction(func: t.Callable, *args, **kwargs) -> t.Any:
    """
    Runs the function given in ``func`` inside a transaction.

    Inside a transaction it's guaranteed that
    - either all or no changes are written to the datastore
    - no other transaction is currently reading/writing the entities accessed

    See (transactions)[https://cloud.google.com/datastore/docs/concepts/cloud-datastore-transactions] for more
    information.

    If a transaction is already active on the client, ``func`` is simply called inside of it. Otherwise a
    new transaction is opened; on a conflict the call is repeated (up to three attempts in total), waiting
    1 and then 2 seconds between the attempts.

    ..Warning: The datastore may produce unexpected results if an entity that has been written inside a
        transaction is read (or returned in a query) again. In this case you will see the *old* state of that
        entity. Keep that in mind if wrapping functions to run in a transaction that may have not been
        designed to handle this case.

    :param func: The function that will be run inside a transaction
    :param args: All args will be passed into func
    :param kwargs: All kwargs will be passed into func
    :return: Whatever func returned
    :raises RuntimeError: If the maximum transaction retries exceeded
    """
    if __client__.current_transaction:
        # Already inside a transaction: do not open a nested one, just run the function.
        return func(*args, **kwargs)

    max_attempts = 3
    last_conflict = None

    for attempt in range(max_attempts):
        try:
            # The commit happens when the with-block is left, so a conflict raised on commit
            # is caught by the except clause below as well.
            with __client__.transaction():
                res = func(*args, **kwargs)
        except exceptions.Conflict as conflict:
            last_conflict = conflict
            if attempt + 1 < max_attempts:
                # Only announce and wait for a retry if there actually is another attempt.
                delay = 2 ** attempt
                logging.error(f"Transaction failed with a conflict, trying again in {delay} seconds")
                time.sleep(delay)
        else:
            return res

    # Keep the conflict as the cause, so the reason for giving up stays visible in the traceback.
    raise RuntimeError("Maximum transaction retries exceeded") from last_conflict
@deprecated(version="3.8.0", reason="Use 'db.run_in_transaction' instead")
def RunInTransaction(callee: t.Callable, *args, **kwargs) -> t.Any:
    """
    Deprecated alias that forwards to :func:`run_in_transaction`.

    :param callee: The function to run; passed on as first argument.
    :param args: Passed through unchanged.
    :param kwargs: Passed through unchanged.
    :return: Whatever :func:`run_in_transaction` returns.
    """
    outcome = run_in_transaction(callee, *args, **kwargs)
    return outcome
def count(kind: str = None, up_to=2 ** 31 - 1, queryDefinition: QueryDefinition = None) -> int:
    """
    Counts entities using a datastore aggregation query.

    :param kind: The kind to count. If it is empty, the kind of queryDefinition is used instead.
    :param up_to: Passed as ``limit`` to the fetch of the aggregation query.
    :param queryDefinition: Optional query definition; its filters (if any) are applied to the query.
    :return: The value of the "total" count aggregation.
    """
    kind = kind or queryDefinition.kind
    base_query = __client__.query(kind=kind)

    filters = queryDefinition.filters if queryDefinition else None
    if filters:
        for spec, operand in filters.items():
            # Filter names have the form "<property> <operator>".
            prop, operator = spec.split(" ")
            # A list value means several filters on the same property/operator (multi equal filters).
            operands = operand if isinstance(operand, list) else [operand]
            for single in operands:
                base_query.add_filter(filter=datastore.query.PropertyFilter(prop, operator, single))

    aggregation = __client__.aggregation_query(base_query).count(alias="total")
    rows = list(aggregation.fetch(limit=up_to))
    return rows[0][0].value
@deprecated(version="3.8.0", reason="Use 'db.count' instead")
def Count(kind: str = None, up_to=2 ** 31 - 1, queryDefinition: QueryDefinition = None) -> int:
    """
    Deprecated alias that forwards to :func:`count`.

    :param kind: Passed through unchanged to :func:`count`.
    :param up_to: Passed through unchanged to :func:`count`.
    :param queryDefinition: Passed through unchanged to :func:`count`.
    :return: Whatever :func:`count` returns.
    """
    return count(kind=kind, up_to=up_to, queryDefinition=queryDefinition)
def run_single_filter(query: QueryDefinition, limit: int) -> t.List[Entity]:
    """
    Internal helper that executes one query definition against the datastore and collects the entities
    found. As a side effect, the next page token of the fetch is stored in ``query.currentCursor``.

    :param query: The querydefinition (filters, orders, distinct etc.) to run against the datastore
    :param limit: How many results should at most be returned
    :return: The first *limit* entities that matches this query
    """
    ds_query = __client__.query(kind=query.kind)
    cursor_start = cursor_end = None
    reverse_results = None

    if query:
        if query.filters:
            for spec, operand in query.filters.items():
                # Filter names have the form "<property> <operator>".
                prop, operator = spec.split(" ")
                # A list value means several filters on the same property/operator (multi equal filters).
                operands = operand if isinstance(operand, list) else [operand]
                for single in operands:
                    ds_query.add_filter(filter=datastore.query.PropertyFilter(prop, operator, single))

        if query.orders:
            inverted = (SortOrder.InvertedAscending, SortOrder.InvertedDescending)
            plain_name = (SortOrder.Ascending, SortOrder.InvertedDescending)
            # Inverted orderings are fetched in the opposite direction and flipped after fetching.
            reverse_results = any(order[1] in inverted for order in query.orders)
            ds_query.order = [
                order[0] if order[1] in plain_name else f"-{order[0]}"
                for order in query.orders
            ]

        if query.distinct:
            ds_query.distinct_on = query.distinct

        cursor_start, cursor_end = query.startCursor, query.endCursor

    result_iter = ds_query.fetch(limit=limit, start_cursor=cursor_start, end_cursor=cursor_end)
    entities = list(result_iter)
    # Read the token only after the iterator has been consumed by list() above.
    query.currentCursor = result_iter.next_page_token

    if reverse_results:
        entities.reverse()
    return entities
@deprecated(version="3.8.0", reason="Use 'run_single_filter' instead")
def runSingleFilter(query: QueryDefinition, limit: int) -> t.List[Entity]:
    """
    Deprecated alias that forwards to :func:`run_single_filter`.

    :param query: Passed through unchanged to :func:`run_single_filter`.
    :param limit: Passed through unchanged to :func:`run_single_filter`.
    :return: The entities returned by :func:`run_single_filter`.
    """
    # The result has to be returned; without the return this alias always yielded None.
    return run_single_filter(query, limit)
# helper function for access log
def _write_to_access_log(data: t.Union[Key, list[Key], Entity, list[Entity]]) -> None:
    """
    Records the keys of the given keys/entities in the current db access log.

    Nothing is recorded if ``conf.db.create_access_log`` is disabled or if the value of
    ``current_db_access_log`` is not a set.

    :param data: A Key, an Entity, or an iterable of those. Entries of an iterable that are neither a Key
        nor an Entity are ignored.
    """
    if not conf.db.create_access_log:
        return

    access_log = current_db_access_log.get()
    if not isinstance(access_log, set):
        return  # no access log available

    # The type checks come before the emptiness check: datastore entities are dict-like, so an entity
    # without any properties is falsy and would otherwise never be recorded.
    if isinstance(data, Entity):
        access_log.add(data.key)
    elif isinstance(data, Key):
        access_log.add(data)
    elif data:  # skips None and empty collections
        for entry in data:
            if isinstance(entry, Entity):
                access_log.add(entry.key)
            elif isinstance(entry, Key):
                access_log.add(entry)
# __all__ has to contain the *names* as strings; listing the function objects themselves makes
# "from ... import *" fail with a TypeError.
__all__ = ["allocate_ids", "delete", "get", "put", "run_in_transaction", "count"]