Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/db/cache.py: 14%

100 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 21:16 +0000

1import datetime 

2import logging 

3import sys 

4import typing as t 

5 

6from viur.core.config import conf 

7from .types import Entity, Key 

8 

# Upper bound on the number of keys handed to a single
# get_multi / set_multi / delete_multi call; larger requests are sliced.
MEMCACHE_MAX_BATCH_SIZE = 30
# Namespace used by get/put/delete when the caller does not pass one.
MEMCACHE_NAMESPACE = "viur-datastore"
# Default lifetime used by put() when no timeout is given (seconds or timedelta).
MEMCACHE_TIMEOUT: int | datetime.timedelta = datetime.timedelta(days=1)
# put() silently skips values whose get_size() result exceeds this limit.
MEMCACHE_MAX_SIZE: t.Final[int] = 1_000_000
# Holds the Testbed instance created by init_testbed(); stays None until then.
TESTBED = None
# NOTE: the string below follows the imports and assignments, so Python does
# not bind it to this module's ``__doc__``; it is kept here as usage notes.
"""

    This Module controls the Interaction with the Memcache from Google
    To activate the cache copy this code in your main.py
    ..  code-block:: python
        # Example
        from viur.core import conf
        from google.appengine.api.memcache import Client
        conf.db.memcache_client = Client()
"""

# Public API of this module; get_size, check_for_memcache and init_testbed
# are intentionally not listed here.
__all__ = [
    "MEMCACHE_MAX_BATCH_SIZE",
    "MEMCACHE_NAMESPACE",
    "MEMCACHE_TIMEOUT",
    "MEMCACHE_MAX_SIZE",
    "get",
    "put",
    "delete",
    "flush",
]

35 

36 

def get(keys: t.Union[Key, list[Key]], namespace: t.Optional[str] = None) -> t.Union[Entity, list[Entity], None]:
    """
    Reads one or more entries from the memcache.

    The keys are converted to strings and fetched in slices of at most
    ``MEMCACHE_MAX_BATCH_SIZE``. If the client raises, the error is logged and
    whatever had been fetched up to that point is still returned.

    :param keys: A single key, or a list/tuple/set of keys. Any other type is
        treated as one single key.
    :param namespace: Optional namespace to use; a falsy value falls back to
        ``MEMCACHE_NAMESPACE``.
    :return: For a single key the entity or None if it was not found. For a
        collection of keys a list holding the entities that were found (it may
        be shorter than the request), or None if nothing was found or no
        memcache client is configured.
    """
    if not check_for_memcache():
        return None

    namespace = namespace or MEMCACHE_NAMESPACE
    single_request = not isinstance(keys, (list, tuple, set))
    # Enforce that all keys are strings
    pending = [str(key) for key in ([keys] if single_request else keys)]

    cached_data = {}
    try:
        while pending:
            batch, pending = pending[:MEMCACHE_MAX_BATCH_SIZE], pending[MEMCACHE_MAX_BATCH_SIZE:]
            cached_data.update(conf.db.memcache_client.get_multi(batch, namespace=namespace))
    except Exception as e:
        logging.error(f"""Failed to get keys form the memcache with {e=}""")

    # Rebuild an Entity per hit: the cached key string becomes the Key and the
    # cached value is merged into the fresh entity.
    result = []
    for cached_key, cached_value in cached_data.items():
        entity = Entity(Key(cached_key))
        entity |= cached_value
        result.append(entity)

    if not result:
        return None
    return result[0] if single_request else result

67 

68 

def put(
    data: t.Union[Entity, t.Dict[Key, Entity], t.Iterable[Entity]],
    namespace: t.Optional[str] = None,
    timeout: t.Optional[t.Union[int, datetime.timedelta]] = None
) -> bool:
    """
    Writes data to the memcache.

    Values whose ``get_size()`` result exceeds ``MEMCACHE_MAX_SIZE`` are
    skipped silently. The remaining entries are written in slices of at most
    ``MEMCACHE_MAX_BATCH_SIZE``.

    :param data: A single entity, a list/tuple/set of entities (keyed by their
        ``.key``), or a dict mapping keys to entities.
    :param namespace: Optional namespace to use; a falsy value falls back to
        ``MEMCACHE_NAMESPACE``.
    :param timeout: Optional timeout in seconds or a timedelta object; a falsy
        value (including 0) falls back to ``MEMCACHE_TIMEOUT``.
    :return: True if every batch was handed to the client without an error and
        the client did not report any key as not stored; False otherwise (also
        when no memcache client is configured).
    :raises TypeError: If ``data`` is neither an entity, list, tuple, set nor dict.
    """
    if not check_for_memcache():
        return False

    namespace = namespace or MEMCACHE_NAMESPACE
    timeout = timeout or MEMCACHE_TIMEOUT
    if isinstance(timeout, datetime.timedelta):
        timeout = timeout.total_seconds()

    if isinstance(data, (list, tuple, set)):
        data = {item.key: item for item in data}
    elif isinstance(data, Entity):
        data = {data.key: data}
    elif not isinstance(data, dict):
        raise TypeError(f"Invalid type {type(data)}. Expected a db.Entity, list or dict.")
    # Only cache values that are not larger than MEMCACHE_MAX_SIZE; keys are
    # always sent to the client as strings.
    data = {str(key): value for key, value in data.items() if get_size(value) <= MEMCACHE_MAX_SIZE}

    keys = list(data)
    success = True
    try:
        while keys:
            data_batch = {key: data[key] for key in keys[:MEMCACHE_MAX_BATCH_SIZE]}
            not_stored = conf.db.memcache_client.set_multi(data_batch, namespace=namespace, time=timeout)
            # Memcache clients return the keys that could NOT be stored. Only a
            # non-empty collection counts as a failure, so clients that return
            # something else (None, True, ...) keep the previous behaviour.
            if isinstance(not_stored, (list, tuple, set)) and not_stored:
                logging.error(f"Failed to put some keys to the memcache: {not_stored=}")
                success = False
            keys = keys[MEMCACHE_MAX_BATCH_SIZE:]
    except Exception as e:
        logging.error(f"""Failed to put data to the memcache with {e=}""")
        return False
    return success

108 

109 

def delete(keys: t.Union[Key, list[Key]], namespace: t.Optional[str] = None) -> None:
    """
    Deletes one or more entries from the memcache.

    The keys are converted to strings and removed in slices of at most
    ``MEMCACHE_MAX_BATCH_SIZE``. If the client raises, the error is logged and
    the remaining keys are not processed.

    :param keys: A single key, or a list/tuple/set of keys. Any other type is
        treated as one single key.
    :param namespace: Optional namespace to use; a falsy value falls back to
        ``MEMCACHE_NAMESPACE``.
    """
    if not check_for_memcache():
        return None

    namespace = namespace or MEMCACHE_NAMESPACE
    # Accept the same collection types as get(). Checking for `list` only would
    # turn a tuple or set of keys into one bogus key via str() below.
    if not isinstance(keys, (list, tuple, set)):
        keys = [keys]
    keys = [str(key) for key in keys]  # Enforce that all keys are strings
    try:
        while keys:
            conf.db.memcache_client.delete_multi(keys[:MEMCACHE_MAX_BATCH_SIZE], namespace=namespace)
            keys = keys[MEMCACHE_MAX_BATCH_SIZE:]
    except Exception as e:
        logging.error(f"""Failed to delete keys form the memcache with {e=}""")

129 

130 

def flush() -> bool:
    """
    Removes every entry from the memcache.

    :return: True if the client's ``flush_all()`` ran without raising; False if
        it raised (the error is logged) or no memcache client is configured.
    """
    if not check_for_memcache():
        return False

    try:
        conf.db.memcache_client.flush_all()
    except Exception as e:
        logging.error(f"""Failed to flush the memcache with {e=}""")
        flushed = False
    else:
        flushed = True
    return flushed

144 

145 

def get_size(obj: t.Any) -> int:
    """
    Utility function that estimates the size of an object in bytes.

    Lists and dicts are walked recursively and contribute only the sizes of
    their elements (for dicts: keys and values), not the container itself.
    Everything else is measured with ``sys.getsizeof``, which is shallow.

    :param obj: The object to measure.
    :return: The summed size in bytes.
    """
    if isinstance(obj, list):
        return sum(map(get_size, obj))
    if isinstance(obj, dict):
        return sum(get_size(name) + get_size(value) for name, value in obj.items())
    return sys.getsizeof(obj)

156 

157 

def check_for_memcache() -> bool:
    """
    Tells whether a memcache client is configured in ``conf.db.memcache_client``.

    When a client is present, ``init_testbed()`` is called before returning.
    When it is missing, a warning is logged.

    :return: True if a client is configured, otherwise False.
    """
    client_available = conf.db.memcache_client is not None
    if client_available:
        init_testbed()
    else:
        logging.warning(f"""conf.db.memcache_client is 'None'. It can not be used.""")
    return client_available

164 

165 

def init_testbed() -> None:
    """
    Activates the App Engine testbed with a memcache stub, at most once.

    Nothing happens if ``TESTBED`` is already set, if the instance is not a
    dev server, or if no memcache client is configured.
    """
    global TESTBED
    if TESTBED is not None:
        return
    if not (conf.instance.is_dev_server and conf.db.memcache_client):
        return

    # Imported lazily so the testbed module is only loaded when it is needed.
    from google.appengine.ext.testbed import Testbed

    # Bind the global before activating, so a failing activation is not retried.
    TESTBED = Testbed()
    TESTBED.activate()
    TESTBED.init_memcache_stub()