Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/db/transport.py: 19%

131 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1from __future__ import annotations 

2 

3import logging 

4import time 

5import typing as t 

6 

7from deprecated.sphinx import deprecated 

8from google.cloud import datastore, exceptions 

9 

10from .overrides import entity_from_protobuf, key_from_protobuf 

11from .types import Entity, Key, QueryDefinition, SortOrder, current_db_access_log 

12from viur.core.config import conf 

13from viur.core.errors import HTTPException 

14 

# Patch the google-cloud-datastore helpers with our own key and entity factories (imported from
# .overrides above), so protobuf decoding goes through these replacements instead of the library defaults.
datastore.helpers.key_from_protobuf = key_from_protobuf
datastore.helpers.entity_from_protobuf = entity_from_protobuf

# Module-wide datastore client used by every function in this module; it is created at import time.
__client__ = datastore.Client()

20 

21 

def allocate_ids(kind_name: str, num_ids: int = 1, retry=None, timeout=None) -> list[Key]:
    """
    Allocates one or more new ids for the given kind.

    :param kind_name: Name of the kind the ids are allocated for.
    :param num_ids: How many ids should be allocated.
    :param retry: Handed through unchanged to the datastore client's ``allocate_ids`` call.
    :param timeout: Handed through unchanged to the datastore client's ``allocate_ids`` call.
    :return: Whatever the datastore client returns for the allocation (annotated as a list of keys).
    :raises TypeError: If kind_name is not a string.
    """
    # isinstance() instead of an exact type comparison, so that str subclasses are accepted as well.
    if not isinstance(kind_name, str):
        raise TypeError("kind_name must be a string")

    # retry/timeout are passed by keyword so they cannot silently shift position.
    return __client__.allocate_ids(Key(kind_name), num_ids, retry=retry, timeout=timeout)

26 

27 

@deprecated(version="3.8.0", reason="Use 'db.allocate_ids' instead")
def AllocateIDs(kind_name):
    """
    Allocates a single new, free unique id for a given kind_name.

    :param kind_name: The kind name; a Key is tolerated as well, in which case its kind is used.
    :return: The first (and only requested) key returned by :func:`allocate_ids`.
    """
    # Legacy quirk (the original author called it a botch): callers may hand in a Key instead of a name.
    kind = kind_name.kind if isinstance(kind_name, Key) else kind_name
    allocated = allocate_ids(kind)
    return allocated[0]

37 

38 

def get(keys: t.Union[Key, t.List[Key]]) -> t.Union[t.List[Entity], Entity, None]:
    """
    Retrieves an entity (or a list thereof) from datastore.

    If only a single key has been given we'll return the entity or None in case the key has not been found,
    otherwise a list of all entities that have been looked up (which may be empty). The list is sorted
    by the position of each entity's key in *keys*.

    :param keys: A datastore key (or a list, tuple or set thereof) to lookup
    :return: The entity (or None if it has not been found), or a list of entities.
    """
    _write_to_access_log(keys)

    if isinstance(keys, (list, set, tuple)):
        # Record the first position of every requested key once. This replaces a per-entity
        # ``keys.index()`` scan, which is quadratic and does not exist on sets at all.
        positions = {}
        for position, key in enumerate(keys):
            positions.setdefault(key, position)

        res_list = list(__client__.get_multi(keys))
        # Compare against None explicitly: a truthiness test would push an entity that evaluates as
        # false (presumably one without any properties) to the front.
        res_list.sort(key=lambda entity: positions.get(entity.key, -1) if entity is not None else -1)
        return res_list

    return __client__.get(keys)

55 

56 

@deprecated(version="3.8.0", reason="Use 'db.get' instead")
def Get(keys: t.Union[Key, t.List[Key]]) -> t.Union[t.List[Entity], Entity, None]:
    """Deprecated alias of :func:`get`; forwards *keys* unchanged and returns its result."""
    return get(keys=keys)

60 

61 

def put(entities: t.Union[Entity, t.List[Entity]]):
    """
    Saves a single entity or a collection of entities in the Cloud Datastore.

    The entities are recorded in the access log (if enabled) before they are written. A single
    Entity is written with the client's ``put``, anything else with ``put_multi``.

    :param entities: The entity or the entities to be saved to the datastore.
    :return: Whatever the underlying client call returns.
    """
    _write_to_access_log(entities)

    if not isinstance(entities, Entity):
        return __client__.put_multi(entities=entities)

    return __client__.put(entities)

73 

74 

@deprecated(version="3.8.0", reason="Use 'db.put' instead")
def Put(entities: t.Union[Entity, t.List[Entity]]) -> t.Union[Entity, None]:
    """Deprecated alias of :func:`put`; forwards *entities* unchanged and returns its result."""
    return put(entities=entities)

78 

79 

def delete(keys: t.Union[Entity, t.List[Entity], Key, t.List[Key]]):
    """
    Removes the entities identified by the given key(s) from the datastore.

    The keys are recorded in the access log (if enabled) before the deletion. A set, list or tuple
    is deleted with the client's ``delete_multi``, anything else with ``delete``.

    :param keys: A Key (or a collection of Keys) to delete
    :return: Whatever the underlying client call returns.
    """
    _write_to_access_log(keys)

    if isinstance(keys, (set, list, tuple)):
        return __client__.delete_multi(keys)

    return __client__.delete(keys)

91 

92 

@deprecated(version="3.8.0", reason="Use 'db.delete' instead")
def Delete(keys: t.Union[Entity, t.List[Entity], Key, t.List[Key]]):
    """Deprecated alias of :func:`delete`; forwards *keys* unchanged and returns its result."""
    return delete(keys=keys)

96 

97 

def run_in_transaction(func: t.Callable, *args, **kwargs) -> t.Any:
    """
    Runs the function given in ``func`` inside a transaction.

    Inside a transaction it's guaranteed that
    - either all or no changes are written to the datastore
    - no other transaction is currently reading/writing the entities accessed

    See `transactions <https://cloud.google.com/datastore/docs/concepts/cloud-datastore-transactions>`_
    for more information.

    If a transaction is already active, ``func`` is simply called within it. Otherwise a new
    transaction is opened; on a conflict it is retried (up to three attempts in total) with an
    exponentially growing pause between the attempts.

    .. Warning:: The datastore may produce unexpected results if an entity that has been written inside a
        transaction is read (or returned in a query) again. In this case you will see the *old* state of
        that entity. Keep that in mind if wrapping functions to run in a transaction that may have not
        been designed to handle this case.

    :param func: The function that will be run inside a transaction
    :param args: All args will be passed into the callee
    :param kwargs: All kwargs will be passed into the callee
    :return: Whatever the callee function returned
    :raises RuntimeError: If the maximum transaction retries exceeded
    """
    if __client__.current_transaction:
        # Already inside a transaction: just join it.
        return func(*args, **kwargs)

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            with __client__.transaction():
                res = func(*args, **kwargs)
        except exceptions.Conflict as conflict:
            if attempt + 1 >= max_attempts:
                # No further attempt follows, so neither announce one nor wait for it;
                # keep the conflict as the cause for easier debugging.
                raise RuntimeError("Maximum transaction retries exceeded") from conflict

            delay = 2 ** attempt
            logging.error(f"Transaction failed with a conflict, trying again in {delay} seconds")
            time.sleep(delay)
        else:
            return res

135 

136 

@deprecated(version="3.8.0", reason="Use 'db.run_in_transaction' instead")
def RunInTransaction(callee: t.Callable, *args, **kwargs) -> t.Any:
    """Deprecated alias of :func:`run_in_transaction`; runs *callee* with the given arguments."""
    outcome = run_in_transaction(callee, *args, **kwargs)
    return outcome

140 

141 

def count(kind: str = None, up_to=2 ** 31 - 1, queryDefinition: QueryDefinition = None) -> int:
    """
    Counts entities of a kind using a datastore aggregation query.

    :param kind: The kind to count. If empty, the kind of *queryDefinition* is used.
    :param up_to: Passed as ``limit`` to the fetch of the aggregation query.
    :param queryDefinition: Optional query definition; its filters (if any) restrict the counted entities.
    :return: The value of the count aggregation.
    :raises ValueError: If neither kind nor queryDefinition is given.
    """
    if not kind:
        if queryDefinition is None:
            # Without this guard the attribute access below fails with an opaque AttributeError.
            raise ValueError("Either kind or queryDefinition must be given")
        kind = queryDefinition.kind

    query = __client__.query(kind=kind)
    if queryDefinition and queryDefinition.filters:
        for filter_spec, filter_value in queryDefinition.filters.items():
            # Filter keys have the form "<property> <operator>".
            prop_name, operator = filter_spec.split(" ")
            # A list value stands for several filters on the same property (multi equal filters).
            values = filter_value if isinstance(filter_value, list) else [filter_value]
            for value in values:
                query.add_filter(filter=datastore.query.PropertyFilter(prop_name, operator, value))

    aggregation_query = __client__.aggregation_query(query)
    result = aggregation_query.count(alias="total").fetch(limit=up_to)
    return list(result)[0][0].value

160 

161 

@deprecated(version="3.8.0", reason="Use 'db.count' instead")
def Count(kind: str = None, up_to=2 ** 31 - 1, queryDefinition: QueryDefinition = None) -> int:
    """Deprecated alias of :func:`count`; forwards all arguments unchanged and returns its result."""
    return count(kind=kind, up_to=up_to, queryDefinition=queryDefinition)

165 

166 

def run_single_filter(query: QueryDefinition, limit: int) -> t.List[Entity]:
    """
    Internal helper that executes one query definition against the datastore and collects the
    entities found.

    As a side effect, the next page token of the fetch is stored in ``query.currentCursor``.

    :param query: The querydefinition (filters, orders, distinct etc.) to run against the datastore
    :param limit: How many results should at most be returned
    :return: The fetched entities; the list is reversed if any of the orderings is an inverted one
    """
    ds_query = __client__.query(kind=query.kind)
    start_cursor = end_cursor = None
    reverse_result = None

    if query:
        if query.filters:
            for filter_spec, filter_value in query.filters.items():
                # Filter keys have the form "<property> <operator>".
                prop_name, operator = filter_spec.split(" ")
                # A list value stands for several filters on the same property (multi equal filters).
                values = filter_value if isinstance(filter_value, list) else [filter_value]
                for value in values:
                    prop_filter = datastore.query.PropertyFilter(prop_name, operator, value)
                    ds_query.add_filter(filter=prop_filter)

        if query.orders:
            inverted_directions = (SortOrder.InvertedAscending, SortOrder.InvertedDescending)
            # Directions that are sent to the datastore as a plain (non "-" prefixed) property name.
            unprefixed_directions = (SortOrder.Ascending, SortOrder.InvertedDescending)

            inverted_flags = [order[1] in inverted_directions for order in query.orders]
            reverse_result = any(inverted_flags)
            ds_query.order = [
                order[0] if order[1] in unprefixed_directions else f"-{order[0]}"
                for order in query.orders
            ]

        if query.distinct:
            ds_query.distinct_on = query.distinct

        start_cursor = query.startCursor
        end_cursor = query.endCursor

    fetched = ds_query.fetch(limit=limit, start_cursor=start_cursor, end_cursor=end_cursor)
    entities = list(fetched)

    # The token is read only after the iterator has been consumed, as in the original ordering.
    query.currentCursor = fetched.next_page_token
    if reverse_result:
        entities.reverse()
    return entities

216 

217 

@deprecated(version="3.8.0", reason="Use 'run_single_filter' instead")
def runSingleFilter(query: QueryDefinition, limit: int) -> t.List[Entity]:
    """
    Deprecated alias of :func:`run_single_filter`.

    :param query: The querydefinition to run against the datastore
    :param limit: How many results should at most be returned
    :return: The entities returned by :func:`run_single_filter`
    """
    # The result has to be handed back; without the return this wrapper always yielded None.
    return run_single_filter(query, limit)

221 

222 

# helper function for access log
def _write_to_access_log(data: t.Union[Key, list[Key], Entity, list[Entity]]) -> None:
    """
    Records the keys of the given keys/entities in the current database access log.

    Nothing is recorded if the access log is disabled in the configuration, if the current access
    log is not a set, or if *data* is falsy. Inside a collection, entries that are neither an Entity
    nor a Key are ignored.

    :param data: A Key, an Entity, or an iterable of these.
    """
    if not conf.db.create_access_log:
        return

    log = current_db_access_log.get()
    # Same check order as before: the log has to exist, then data has to be truthy.
    if not isinstance(log, set) or not data:
        return

    # Treat a single key/entity like a one-element collection.
    entries = (data,) if isinstance(data, (Entity, Key)) else data
    for item in entries:
        if isinstance(item, Entity):
            log.add(item.key)
        elif isinstance(item, Key):
            log.add(item)

242 

243 

# Public API of this module. ``__all__`` must list the exported names as strings; listing the
# function objects themselves makes ``from <module> import *`` fail with a TypeError.
__all__ = ["allocate_ids", "delete", "get", "put", "run_in_transaction", "count"]