Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/db/cache.py: 15%

100 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import datetime 

2 

3from google.appengine.ext.testbed import Testbed 

4 

5import logging 

6import sys 

7import typing as t 

8 

9from viur.core.config import conf 

10from .types import Entity, Key 

11 

# Maximum number of keys handed to the memcache client per get_multi/set_multi/delete_multi call.
MEMCACHE_MAX_BATCH_SIZE = 30
# Namespace used by get/put/delete when the caller does not pass one.
MEMCACHE_NAMESPACE = "viur-datastore"
# Expiry used by put when the caller does not pass a timeout (a timedelta is converted to seconds there).
MEMCACHE_TIMEOUT: int | datetime.timedelta = datetime.timedelta(days=1)
# Upper bound (as measured by get_size) for a value to be written by put; larger values are skipped.
MEMCACHE_MAX_SIZE: t.Final[int] = 1_000_000
# Holds the Testbed instance once init_testbed has created it; None until then.
TESTBED = None

17""" 

18 

19 This module controls the interaction with the Memcache from Google.

20 To activate the cache, copy this code into your main.py:

21 .. code-block:: python 

22 # Example 

23 from viur.core import conf 

24 from google.appengine.api.memcache import Client 

25 conf.db.memcache_client = Client() 

26""" 

27 

# Names exported via ``from ... import *``.
# The helpers get_size, check_for_memcache and init_testbed are not listed here.
__all__ = [
    "MEMCACHE_MAX_BATCH_SIZE",
    "MEMCACHE_NAMESPACE",
    "MEMCACHE_TIMEOUT",
    "MEMCACHE_MAX_SIZE",
    "get",
    "put",
    "delete",
    "flush",
]

38 

39 

def get(keys: t.Union[Key, list[Key]], namespace: t.Optional[str] = None) -> t.Union[Entity, list[Entity], None]:
    """
    Fetch one or more entries from the memcache.

    :param keys: A single key, or a list/tuple/set of keys, identifying the entries to read.
    :param namespace: Optional namespace to use; ``MEMCACHE_NAMESPACE`` is used when it is falsy.
    :return: For a single key, the entity or None if it was not found. For a collection of keys,
        a list of the entities that were found, or None if none were found. None is also
        returned when no memcache client is configured.
    """
    if not check_for_memcache():
        return None

    if not namespace:
        namespace = MEMCACHE_NAMESPACE

    is_single = not isinstance(keys, (list, tuple, set))
    # The memcache client is always addressed with string keys.
    pending = [str(key) for key in ([keys] if is_single else keys)]

    hits = {}
    try:
        # Query in slices of at most MEMCACHE_MAX_BATCH_SIZE keys.
        for offset in range(0, len(pending), MEMCACHE_MAX_BATCH_SIZE):
            batch = pending[offset:offset + MEMCACHE_MAX_BATCH_SIZE]
            hits |= conf.db.memcache_client.get_multi(batch, namespace=namespace)
    except Exception as e:
        # Best effort: whatever was fetched before the failure is still returned below.
        logging.error(f"""Failed to get keys form the memcache with {e=}""")

    entities = []
    for raw_key, payload in hits.items():
        entity = Entity(Key(raw_key))
        entity |= payload
        entities.append(entity)

    if not entities:
        return None
    return entities[0] if is_single else entities

70 

71 

def put(
    data: t.Union[Entity, t.Dict[Key, Entity], t.Iterable[Entity]],
    namespace: t.Optional[str] = None,
    timeout: t.Optional[t.Union[int, datetime.timedelta]] = None
) -> bool:
    """
    Writes data to the memcache.

    Values whose size (as computed by :func:`get_size`) exceeds ``MEMCACHE_MAX_SIZE`` are
    skipped and not written.

    :param data: A single entity, a list/tuple/set of entities, or a dict mapping keys to entities.
    :param namespace: Optional namespace to use; ``MEMCACHE_NAMESPACE`` is used when it is falsy.
    :param timeout: Optional timeout in seconds or a timedelta object;
        ``MEMCACHE_TIMEOUT`` is used when it is falsy.
    :return: True if every batch was handed to the memcache client without an exception and the
        client did not report any key as not stored; False otherwise (including when no
        memcache client is configured).
    :raises TypeError: If ``data`` is neither an entity, a list/tuple/set nor a dict.
    """
    if not check_for_memcache():
        return False

    namespace = namespace or MEMCACHE_NAMESPACE
    timeout = timeout or MEMCACHE_TIMEOUT
    if isinstance(timeout, datetime.timedelta):
        timeout = timeout.total_seconds()

    if isinstance(data, (list, tuple, set)):
        data = {item.key: item for item in data}
    elif isinstance(data, Entity):
        data = {data.key: data}
    elif not isinstance(data, dict):
        raise TypeError(f"Invalid type {type(data)}. Expected a db.Entity, list or dict.")
    # Only cache values whose size is <= MEMCACHE_MAX_SIZE; the client is addressed with string keys.
    data = {str(key): value for key, value in data.items() if get_size(value) <= MEMCACHE_MAX_SIZE}

    keys = list(data)
    not_stored = []
    try:
        # Write in slices of at most MEMCACHE_MAX_BATCH_SIZE entries.
        for start in range(0, len(keys), MEMCACHE_MAX_BATCH_SIZE):
            data_batch = {key: data[key] for key in keys[start:start + MEMCACHE_MAX_BATCH_SIZE]}
            # set_multi reports the keys it could NOT store; an empty/None result means success.
            rejected = conf.db.memcache_client.set_multi(data_batch, namespace=namespace, time=timeout)
            if rejected:
                not_stored.extend(rejected)
    except Exception as e:
        logging.error(f"""Failed to put data to the memcache with {e=}""")
        return False

    if not_stored:
        logging.error(f"Failed to put {len(not_stored)} key(s) to the memcache")
        return False
    return True

111 

112 

def delete(keys: t.Union[Key, list[Key]], namespace: t.Optional[str] = None) -> None:
    """
    Deletes one or more entries from the memcache.

    :param keys: A single key, or a list/tuple/set of keys, identifying the entries to delete.
    :param namespace: Optional namespace to use; ``MEMCACHE_NAMESPACE`` is used when it is falsy.
    """
    if not check_for_memcache():
        return None

    namespace = namespace or MEMCACHE_NAMESPACE
    # Accept the same collection types as get(); a tuple or set must not be
    # stringified as a whole and treated as one (non-existing) key.
    if not isinstance(keys, (list, tuple, set)):
        keys = [keys]
    keys = [str(key) for key in keys]  # Enforce that all keys are strings
    try:
        # Delete in slices of at most MEMCACHE_MAX_BATCH_SIZE keys.
        while keys:
            conf.db.memcache_client.delete_multi(keys[:MEMCACHE_MAX_BATCH_SIZE], namespace=namespace)
            keys = keys[MEMCACHE_MAX_BATCH_SIZE:]
    except Exception as e:
        # Best effort: a failing cache must not break the caller.
        logging.error(f"""Failed to delete keys form the memcache with {e=}""")

132 

133 

def flush() -> bool:
    """
    Remove every entry from the memcache.

    :return: True if the flush call completed without raising; False if it raised or if no
        memcache client is configured.
    """
    memcache_available = check_for_memcache()
    if not memcache_available:
        return False

    try:
        conf.db.memcache_client.flush_all()
    except Exception as e:
        logging.error(f"""Failed to flush the memcache with {e=}""")
        return False
    else:
        return True

147 

148 

def get_size(obj: t.Any) -> int:
    """
    Utility function that estimates the size of an object in bytes.

    Dicts and lists are walked recursively and contribute only the sizes of their
    keys/values respectively their items; every other object is measured with
    ``sys.getsizeof``.

    :param obj: The object to measure.
    :return: The summed size in bytes.
    """
    if isinstance(obj, dict):
        total = 0
        for key, value in obj.items():
            total += get_size(key) + get_size(value)
        return total

    if isinstance(obj, list):
        total = 0
        for item in obj:
            total += get_size(item)
        return total

    return sys.getsizeof(obj)

159 

160 

def check_for_memcache() -> bool:
    """
    Report whether a memcache client is configured.

    When a client is set, :func:`init_testbed` is called before returning.
    Otherwise a warning is logged.

    :return: True if ``conf.db.memcache_client`` is not None, else False.
    """
    client_configured = conf.db.memcache_client is not None
    if client_configured:
        init_testbed()
    else:
        logging.warning(f"""conf.db.memcache_client is 'None'. It can not be used.""")
    return client_configured

167 

168 

def init_testbed() -> None:
    """
    Create and activate the module-wide ``Testbed`` with a memcache stub.

    Nothing happens if a testbed already exists, if the instance is not a dev server,
    or if ``conf.db.memcache_client`` is falsy.
    """
    global TESTBED

    if TESTBED is not None:
        return
    if not conf.instance.is_dev_server:
        return
    if not conf.db.memcache_client:
        return

    # The global is assigned before activation, so later calls see it as already created.
    TESTBED = Testbed()
    TESTBED.activate()
    TESTBED.init_memcache_stub()