Coverage for  / home / runner / work / viur-core / viur-core / viur / src / viur / core / db / utils.py: 17%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 14:41 +0000

1import datetime 

2import sys 

3import typing as t 

4 

5from deprecated.sphinx import deprecated 

6from google.cloud.datastore.transaction import Transaction 

7 

8from viur.core import current 

9from .transport import __client__, get, put, run_in_transaction 

10from .types import Entity, Key, current_db_access_log 

11 

12 

def fix_unindexable_properties(entry: Entity, *, keep_exclusions: bool = True) -> Entity:
    """
    Recursively walk the given Entity and add all properties to the list of unindexed properties if they contain
    a string longer than 1500 bytes (which is maximum size of a string that can be indexed). The datastore would
    return an error otherwise.
    https://cloud.google.com/datastore/docs/concepts/limits?hl=en#limits

    :param entry: The entity to fix (inplace)
    :param keep_exclusions: If true, keep the properties already included in ``exclude_from_indexes``.
        Otherwise, ignore them and exclude only non-indexable properties.
    :return: The fixed entity
    """

    def has_unindexable_property(prop):
        # Containers are unindexable if any nested value is unindexable.
        if isinstance(prop, dict):
            return any(has_unindexable_property(x) for x in prop.values())
        elif isinstance(prop, list):
            return any(has_unindexable_property(x) for x in prop)
        elif isinstance(prop, (str, bytes)):
            # NOTE(review): sys.getsizeof measures the Python object (including
            # interpreter overhead), not the UTF-8 payload the 1500-byte datastore
            # limit refers to -- this over-approximates and may exclude slightly
            # shorter strings than strictly necessary. Confirm if intentional.
            return sys.getsizeof(prop) >= 1500
        else:
            return False

    unindexable_properties = set()
    for key, value in entry.items():
        if not has_unindexable_property(value):
            continue
        if isinstance(value, dict):
            # Nested dicts/entities cannot be excluded by name on the parent;
            # recurse and let the inner entity carry its own exclusions.
            inner_entity = Entity()
            inner_entity.update(value)
            # FIX: propagate keep_exclusions to the recursive call; previously the
            # recursion always used the default (True), silently ignoring the
            # caller's choice for nested entities.
            entry[key] = fix_unindexable_properties(inner_entity, keep_exclusions=keep_exclusions)
            if isinstance(value, Entity):
                inner_entity.key = value.key
        else:
            unindexable_properties.add(key)
    if keep_exclusions:
        entry.exclude_from_indexes.update(unindexable_properties)  # type:ignore
    else:
        entry.exclude_from_indexes = unindexable_properties
    return entry

53 

54 

def normalize_key(key: t.Union[None, Key, str]) -> t.Union[None, Key]:
    """
    Normalizes a datastore key (replacing _application with the current one).

    Accepts ``None`` (passed through), a legacy urlsafe string (decoded first),
    or a :class:`Key`. The key and all of its ancestors are rebuilt so the
    resulting key chain belongs to the current project.

    :param key: Key to be normalized.
    :return: The normalized key, or ``None`` if ``None`` was given.
    """
    if key is None:
        return None
    if isinstance(key, str):
        key = Key.from_legacy_urlsafe(key)
    # Normalize the ancestor chain recursively, if present.
    normalized_parent = normalize_key(key.parent) if key.parent else None
    return Key(key.kind, key.id_or_name, parent=normalized_parent)

71 

72 

@deprecated(version="3.8.0", reason="Use 'db.normalize_key' instead")
def normalizeKey(key: t.Union[None, Key]) -> t.Union[None, Key]:
    """
    Deprecated camelCase alias for :func:`normalize_key`.
    """
    return normalize_key(key)

76 

77 

def key_helper(
    in_key: t.Union[Key, str, int],
    target_kind: str,
    additional_allowed_kinds: t.Union[t.List[str], t.Tuple[str]] = (),
    adjust_kind: bool = False,
) -> Key:
    """
    Coerces *in_key* into a :class:`Key` of *target_kind*.

    A :class:`Key` is validated (or rewritten, with ``adjust_kind``) against the
    allowed kinds; a string is first decoded as a legacy urlsafe key and falls
    back to being used as a name/numeric id; an int becomes the key's id.

    :param in_key: Key object, urlsafe/plain string, or integer id.
    :param target_kind: The kind the resulting key must have.
    :param additional_allowed_kinds: Extra kinds accepted without adjustment.
    :param adjust_kind: Rewrite a mismatching kind instead of raising.
    :return: A key of the target (or otherwise allowed) kind.
    :raises ValueError: On kind mismatch when ``adjust_kind`` is False.
    :raises NotImplementedError: For unsupported ``in_key`` types.
    """
    if isinstance(in_key, Key):
        if in_key.kind == target_kind or in_key.kind in additional_allowed_kinds:
            return in_key
        if not adjust_kind:
            raise ValueError(
                f"Kind mismatch: {in_key.kind!r} != {target_kind!r} (or in {additional_allowed_kinds!r})")
        # Rebuild the key with the desired kind, keeping id/name and ancestors.
        return Key(target_kind, in_key.id_or_name, parent=in_key.parent)

    if isinstance(in_key, int):
        return Key(target_kind, in_key)

    if isinstance(in_key, str):
        # Attempt to interpret the string as an encoded key first.
        try:
            parsed_key = normalize_key(in_key)
        except Exception:
            parsed_key = None
        if parsed_key:
            # Re-run validation with the decoded Key object.
            return key_helper(
                parsed_key,
                target_kind=target_kind,
                additional_allowed_kinds=additional_allowed_kinds,
                adjust_kind=adjust_kind,
            )
        # Plain string: digit-only strings become numeric ids, others key names.
        return Key(target_kind, int(in_key) if in_key.isdigit() else in_key)

    raise NotImplementedError(f"Unsupported key type {type(in_key)}")

116 

117 

def keyHelper(
    inKey: t.Union[Key, str, int],
    targetKind: str,
    additionalAllowedKinds: t.Union[t.List[str], t.Tuple[str]] = (),
    adjust_kind: bool = False,
) -> Key:
    """
    Legacy camelCase alias for :func:`key_helper`.

    NOTE(review): unlike the other legacy aliases in this module
    (``normalizeKey``, ``GetOrInsert``, ...) this one carries no
    ``@deprecated`` decorator -- confirm whether that is intentional.
    """
    return key_helper(
        in_key=inKey,
        target_kind=targetKind,
        additional_allowed_kinds=additionalAllowedKinds,
        adjust_kind=adjust_kind
    )

130 

131 

def is_in_transaction() -> bool:
    """
    Reports whether the datastore client currently runs inside a transaction.

    :return: ``True`` if a transaction is active, ``False`` otherwise.
    """
    active_txn = __client__.current_transaction
    return active_txn is not None

134 

135 

@deprecated(version="3.8.0", reason="Use 'db.utils.is_in_transaction' instead")
def IsInTransaction() -> bool:
    """
    Deprecated PascalCase alias for :func:`is_in_transaction`.
    """
    return is_in_transaction()

139 

140 

def get_or_insert(key: Key, **kwargs) -> Entity:
    """
    Either creates a new entity with the given key, or returns the existing one.

    Its guaranteed that there is no race-condition here; it will never overwrite a
    previously created entity. Extra keyword arguments passed to this function will be
    used to populate the entity if it has to be created; otherwise they are ignored.

    :param key: The key which will be fetched or created.
    :returns: Returns the fetched or newly created Entity.
    """

    def fetch_or_create(txn_key, defaults):
        # Inside a transaction the get/put pair is atomic.
        entity = get(txn_key)
        if not entity:
            entity = Entity(txn_key)
            entity.update(defaults)
            put(entity)
        return entity

    # Reuse an already running transaction instead of nesting a new one.
    if is_in_transaction():
        return fetch_or_create(key, kwargs)
    return run_in_transaction(fetch_or_create, key, kwargs)

165 

166 

@deprecated(version="3.8.0", reason="Use 'db.get_or_insert' instead")
def GetOrInsert(key: Key, **kwargs: t.Any) -> Entity:
    """
    Deprecated PascalCase alias for :func:`get_or_insert`.
    """
    return get_or_insert(key, **kwargs)

170 

171 

@deprecated(version="3.8.0", reason="Use 'str(key)' instead")
def encodeKey(key: Key) -> str:
    """
    Return the given key encoded as string (mimicking the old str() behaviour of keys)

    :param key: The key to encode.
    :return: String representation of the key, as produced by ``str(key)``.
    """
    return str(key)

178 

179 

def acquire_transaction_success_marker() -> str:
    """
    Generates a token that will be written to the datastore (under "viur-transactionmarker") if the transaction
    completes successfully. Currently only used by deferredTasks to check if the task should actually execute
    or if the transaction it was created in failed.

    :return: Name of the entry in viur-transactionmarker
    :raises AssertionError: If called outside of a running transaction.
        NOTE(review): ``assert`` is stripped under ``python -O``; consider an
        explicit ``raise`` if this guard must always hold.
    """
    txn: Transaction | None = __client__.current_transaction
    # FIX: grammar in the assertion message ("an transaction" -> "a transaction").
    assert txn, "acquire_transaction_success_marker cannot be called outside a transaction"
    marker = str(txn.id)
    request_data = current.request_data.get()
    # Write the marker entity only once per request, even when this function is
    # called multiple times within the same transaction.
    if not request_data.get("__viur-transactionmarker__"):
        db_obj = Entity(Key("viur-transactionmarker", marker))
        db_obj["creationdate"] = datetime.datetime.now(datetime.timezone.utc)
        put(db_obj)
        request_data["__viur-transactionmarker__"] = True
    return marker

197 

198 

def start_data_access_log() -> t.Set[t.Union[Key, str]]:
    """
    Resets the per-request access log and hands back its previous contents.

    The access log keeps track of which entries have been accessed in the
    current request. The returned set allows restoring the outer log with
    :func:`end_data_access_log` in case of nested caching. You must call
    :func:`end_data_access_log` afterwards, otherwise logging continues for
    all subsequent requests handled on the same thread!

    :return: The set of entries that were logged before the reset.
    """
    previous_log = current_db_access_log.get(set())
    current_db_access_log.set(set())
    return previous_log

210 

211 

def startDataAccessLog() -> t.Set[t.Union[Key, str]]:
    """
    Legacy camelCase alias for :func:`start_data_access_log`.

    NOTE(review): not marked ``@deprecated`` although other legacy aliases in
    this module are -- confirm whether that is intentional.
    """
    return start_data_access_log()

214 

215 

def end_data_access_log(
    outer_access_log: t.Optional[t.Set[t.Union[Key, str]]] = None,
) -> t.Optional[t.Set[t.Union[Key, str]]]:
    """
    Retrieves the set of entries accessed so far.

    To clean up and restart the log, call :func:`start_data_access_log`.

    If you called :func:`start_data_access_log` before, you can re-apply the old log using
    the ``outer_access_log`` param. Otherwise, it will disable the access log.

    :param outer_access_log: State of your log returned by :func:`start_data_access_log`
    :return: t.Set of entries accessed
    """
    res = current_db_access_log.get()
    if isinstance(outer_access_log, set):
        # Merge entries recorded in the inner scope back into the outer log.
        # FIX: the former ``(outer_access_log or set())`` guard was redundant --
        # union with an empty set is already a no-op.
        current_db_access_log.set(outer_access_log | res)
    else:
        # No outer log to restore -- disable access logging entirely.
        current_db_access_log.set(None)
    return res

236 

237 

def endDataAccessLog(
    outerAccessLog: t.Optional[t.Set[t.Union[Key, str]]] = None,
) -> t.Optional[t.Set[t.Union[Key, str]]]:
    """
    Legacy camelCase alias for :func:`end_data_access_log`.

    NOTE(review): not marked ``@deprecated`` although other legacy aliases in
    this module are -- confirm whether that is intentional.
    """
    return end_data_access_log(outer_access_log=outerAccessLog)