Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/db/utils.py: 17%

111 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import datetime 

2from deprecated.sphinx import deprecated 

3import typing as t 

4from .transport import get, put, run_in_transaction, __client__ 

5from .types import Entity, Key, current_db_access_log 

6from google.cloud.datastore.transaction import Transaction 

7from viur.core import current 

8 

9 

def fix_unindexable_properties(entry: Entity) -> Entity:
    """
    Exclude every property from indexing whose value is too large to be indexed.

    The datastore rejects indexed string values whose UTF-8 encoding exceeds 1500 bytes
    (https://cloud.google.com/datastore/docs/concepts/limits?hl=en#limits), so each
    top-level property that contains such a value (directly or nested inside lists/dicts)
    is put on the entity's ``exclude_from_indexes`` set. Dict values are converted into
    nested entities and fixed recursively instead of being excluded as a whole.

    Any previous ``exclude_from_indexes`` of ``entry`` is replaced by the computed set.

    :param entry: The entity to fix (inplace)
    :return: The fixed entity (the same object that was passed in)
    """
    max_indexed_bytes = 1500

    def has_unindexable_property(prop) -> bool:
        """Return True if ``prop`` is, or contains, a string/bytes value that is too large."""
        if isinstance(prop, dict):
            return any(has_unindexable_property(x) for x in prop.values())
        if isinstance(prop, list):
            return any(has_unindexable_property(x) for x in prop)
        if isinstance(prop, bytes):
            return len(prop) >= max_indexed_bytes
        if isinstance(prop, str):
            # Every character needs at least one byte, so long strings need no encoding.
            if len(prop) >= max_indexed_bytes:
                return True
            # The limit counts UTF-8 bytes, not characters: multi-byte characters can
            # push a string over the limit although it has fewer than 1500 characters.
            # "surrogatepass" keeps this size check from raising on lone surrogates.
            return len(prop.encode("utf-8", "surrogatepass")) >= max_indexed_bytes
        return False

    unindexable_properties = set()
    # Only values of already existing keys are replaced below, so the mapping does not
    # change size while it is being iterated.
    for key, value in entry.items():
        if not has_unindexable_property(value):
            continue
        if isinstance(value, dict):
            inner_entity = Entity()
            inner_entity.update(value)
            if isinstance(value, Entity):
                # Keep the key of an embedded entity
                inner_entity.key = value.key
            entry[key] = fix_unindexable_properties(inner_entity)
        else:
            unindexable_properties.add(key)
    entry.exclude_from_indexes = unindexable_properties
    return entry

44 

45 

def normalize_key(key: t.Union[None, Key, str]) -> t.Union[None, Key]:
    """
    Normalizes a datastore key by rebuilding it (and, recursively, all of its parents)
    from nothing but kind and id/name. According to the original note on this function,
    this replaces the key's application with the current one.

    :param key: Key to be normalized. May be a Key, a legacy urlsafe encoded key string or None.
    :return: The normalized Key, or None if None was passed in.
    """
    if key is None:
        return None

    # Strings are expected to be legacy urlsafe encoded keys
    source = Key.from_legacy_urlsafe(key) if isinstance(key, str) else key
    normalized_parent = normalize_key(source.parent) if source.parent else None
    return Key(source.kind, source.id_or_name, parent=normalized_parent)

62 

63 

@deprecated(version="3.8.0", reason="Use 'db.normalize_key' instead")
def normalizeKey(key: t.Union[None, Key]) -> t.Union[None, Key]:
    """
    Deprecated camelCase alias of :func:`normalize_key`.

    :param key: Key to be normalized.
    :return: Whatever :func:`normalize_key` returns for the given key.
    """
    normalized = normalize_key(key)
    return normalized

67 

68 

def key_helper(
    in_key: t.Union[Key, str, int],
    target_kind: str,
    additional_allowed_kinds: t.Union[t.List[str], t.Tuple[str]] = (),
    adjust_kind: bool = False,
) -> Key:
    """
    Turn a key given as Key, str or int into a Key of the expected kind.

    - A Key is returned unchanged if its kind is ``target_kind`` or one of
      ``additional_allowed_kinds``. Otherwise it is either rebuilt with ``target_kind``
      (keeping id/name and parent) if ``adjust_kind`` is set, or rejected.
    - A str is first tried to be decoded with :func:`normalize_key`; if that yields a key,
      it is handled like a Key. Otherwise the string is used as the name of a new key, or
      as its numeric id if it consists of decimal digits only.
    - An int is used as the id of a new key.

    :param in_key: The key, encoded key, name or id to convert.
    :param target_kind: The kind the resulting key should have.
    :param additional_allowed_kinds: Further kinds that are accepted for Key inputs.
    :param adjust_kind: Rewrite a mismatching kind to ``target_kind`` instead of raising.
    :return: The resulting Key.
    :raises ValueError: If the kind of a Key does not match and ``adjust_kind`` is not set.
    :raises NotImplementedError: If ``in_key`` is neither Key, str nor int.
    """
    if isinstance(in_key, Key):
        if in_key.kind == target_kind or in_key.kind in additional_allowed_kinds:
            return in_key
        if not adjust_kind:
            raise ValueError(
                f"Kind mismatch: {in_key.kind!r} != {target_kind!r} (or in {additional_allowed_kinds!r})")
        return Key(target_kind, in_key.id_or_name, parent=in_key.parent)

    if isinstance(in_key, str):
        # Try to parse key from str; a failure only means that the string is not an
        # encoded key, so it is deliberately not treated as an error.
        try:
            decoded_key = normalize_key(in_key)
        except Exception:
            decoded_key = None

        # If it did decode, recall key_helper with the Key object
        if decoded_key:
            return key_helper(
                decoded_key,
                target_kind=target_kind,
                additional_allowed_kinds=additional_allowed_kinds,
                adjust_kind=adjust_kind,
            )

        # Otherwise, construct the key from an id (decimal digits only) or a name.
        # isdecimal() instead of isdigit(): isdigit() also accepts characters such as
        # superscript digits, which int() cannot parse.
        if in_key.isdecimal():
            return Key(target_kind, int(in_key))
        return Key(target_kind, in_key)

    if isinstance(in_key, int):
        return Key(target_kind, in_key)

    raise NotImplementedError(f"Unsupported key type {type(in_key)}")

107 

108 

def keyHelper(
    inKey: t.Union[Key, str, int],
    targetKind: str,
    additionalAllowedKinds: t.Union[t.List[str], t.Tuple[str]] = (),
    adjust_kind: bool = False,
) -> Key:
    """
    camelCase alias of :func:`key_helper`; all arguments are passed through unchanged.

    :param inKey: Passed as ``in_key``.
    :param targetKind: Passed as ``target_kind``.
    :param additionalAllowedKinds: Passed as ``additional_allowed_kinds``.
    :param adjust_kind: Passed as ``adjust_kind``.
    :return: The Key returned by :func:`key_helper`.
    """
    return key_helper(inKey, targetKind, additionalAllowedKinds, adjust_kind)

121 

122 

def is_in_transaction() -> bool:
    """
    Tell whether a transaction is currently active on the datastore client.

    :return: True if the client's ``current_transaction`` is set, False otherwise.
    """
    active_transaction = __client__.current_transaction
    return active_transaction is not None

125 

126 

@deprecated(version="3.8.0", reason="Use 'db.utils.is_in_transaction' instead")
def IsInTransaction() -> bool:
    """
    Deprecated alias of :func:`is_in_transaction`.

    :return: The result of :func:`is_in_transaction`.
    """
    in_transaction = is_in_transaction()
    return in_transaction

130 

131 

def get_or_insert(key: Key, **kwargs) -> Entity:
    """
    Fetch the entity stored under the given key, creating it first if it is missing.

    Lookup and creation run inside one transaction: the currently active one if there
    is any, otherwise a new one that is opened just for this call. The keyword arguments
    become the properties of a newly created entity; they are ignored if the entity
    already exists.

    :param key: The key which will be fetched or created.
    :returns: Returns the fetched or newly created Entity.
    """

    def fetch_or_create(target_key, initial_values):
        entity = get(target_key)
        # NOTE(review): this is a truthiness check - presumably `get` yields a falsy value
        # for a missing key; verify how an existing entity without properties behaves.
        if entity:
            return entity
        entity = Entity(target_key)
        for name, value in initial_values.items():
            entity[name] = value
        put(entity)
        return entity

    if not is_in_transaction():
        return run_in_transaction(fetch_or_create, key, kwargs)
    # Already inside a transaction: take part in it instead of opening another one
    return fetch_or_create(key, kwargs)

156 

157 

@deprecated(version="3.8.0", reason="Use 'db.get_or_insert' instead")
def GetOrInsert(key: Key, **kwargs: t.Any) -> Entity:
    """
    Deprecated alias of :func:`get_or_insert`.

    :param key: The key which will be fetched or created.
    :param kwargs: Passed through to :func:`get_or_insert`.
    :return: The Entity returned by :func:`get_or_insert`.
    """
    entity = get_or_insert(key, **kwargs)
    return entity

161 

162 

@deprecated(version="3.8.0", reason="Use 'str(key)' instead")
def encodeKey(key: Key) -> str:
    """
    Return the given key encoded as string (mimicking the old str() behaviour of keys).

    :param key: The key to encode.
    :return: The result of ``str(key)``.
    """
    encoded = str(key)
    return encoded

169 

170 

def acquire_transaction_success_marker() -> str:
    """
    Generates a token that will be written to the datastore (under "viur-transactionmarker") if the transaction
    completes successfully. Currently only used by deferredTasks to check if the task should actually execute
    or if the transaction it was created in failed.

    The marker entity is written at most once per request data: after the first write, the flag
    "__viur-transactionmarker__" is set in the current request data and later calls only return the marker.

    :return: Name of the entry in viur-transactionmarker (the id of the current transaction as string)
    :raises AssertionError: If called outside of a transaction.
    """
    txn: Transaction | None = __client__.current_transaction
    if not txn:
        # Raised explicitly instead of using `assert`, so that the check is not stripped when
        # Python runs with -O (which would end in an AttributeError on `txn.id` instead).
        raise AssertionError("acquire_transaction_success_marker cannot be called outside an transaction")

    marker = str(txn.id)
    request_data = current.request_data.get()
    if not request_data.get("__viur-transactionmarker__"):
        db_obj = Entity(Key("viur-transactionmarker", marker))
        db_obj["creationdate"] = datetime.datetime.now(datetime.timezone.utc)
        put(db_obj)
        request_data["__viur-transactionmarker__"] = True
    return marker

188 

189 

def start_data_access_log() -> t.Set[t.Union[Key, str]]:
    """
    Replaces the internal access log (which keeps track of which entries have been accessed) with a
    fresh, empty set.

    The previous log is returned so that it can be handed to :func:`end_data_access_log` later, which
    is useful for nested caching. The log is only restored or disabled again by calling
    :func:`end_data_access_log`, so each call of this function should be paired with one.

    :return: Set of previously accessed entries (an empty set if there was no log yet)
    """
    previous_log = current_db_access_log.get(set())
    current_db_access_log.set(set())
    return previous_log

201 

202 

def startDataAccessLog() -> t.Set[t.Union[Key, str]]:
    """
    camelCase alias of :func:`start_data_access_log`.

    :return: The previous access log as returned by :func:`start_data_access_log`.
    """
    previous_log = start_data_access_log()
    return previous_log

205 

206 

def end_data_access_log(
    outer_access_log: t.Optional[t.Set[t.Union[Key, str]]] = None,
) -> t.Optional[t.Set[t.Union[Key, str]]]:
    """
    Retrieves the set of entries accessed so far.

    To clean up and restart the log, call :func:`start_data_access_log`.

    If you called :func:`start_data_access_log` before, you can re-apply the old log using
    the outer_access_log param: the log then continues with the union of the old log and the
    entries accessed so far. Otherwise, it will disable the access log (it is set to None).

    :param outer_access_log: State of your log returned by :func:`start_data_access_log`
    :return: Set of entries accessed, or None if the access log was disabled
    """
    res = current_db_access_log.get()
    if isinstance(outer_access_log, set):
        # `res` is None if the log has been disabled before (see the else branch); in that
        # case there is nothing to merge into the outer log.
        current_db_access_log.set(outer_access_log.union(res or ()))
    else:
        current_db_access_log.set(None)
    return res

227 

228 

def endDataAccessLog(
    outerAccessLog: t.Optional[t.Set[t.Union[Key, str]]] = None,
) -> t.Optional[t.Set[t.Union[Key, str]]]:
    """
    camelCase alias of :func:`end_data_access_log`.

    :param outerAccessLog: Passed as ``outer_access_log``.
    :return: The value returned by :func:`end_data_access_log`.
    """
    accessed = end_data_access_log(outer_access_log=outerAccessLog)
    return accessed