Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/db/cache.py: 15%
100 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1import datetime
3from google.appengine.ext.testbed import Testbed
5import logging
6import sys
7import typing as t
9from viur.core.config import conf
10from .types import Entity, Key
# Maximum number of keys handed to the memcache client in one *_multi call.
MEMCACHE_MAX_BATCH_SIZE = 30
# Namespace used whenever a caller does not pass one explicitly.
MEMCACHE_NAMESPACE = "viur-datastore"
# Default expiry applied by put(); either a number of seconds or a timedelta.
MEMCACHE_TIMEOUT: int | datetime.timedelta = datetime.timedelta(days=1)
# Values whose get_size() exceeds this limit are skipped by put().
MEMCACHE_MAX_SIZE: t.Final[int] = 1_000_000
# Holds the Testbed instance created by init_testbed(); stays None until then.
TESTBED = None
"""
    This module controls the interaction with Google's Memcache.

    To activate the cache, copy this code into your main.py:

    ..  code-block:: python

        # Example
        from viur.core import conf
        from google.appengine.api.memcache import Client
        conf.db.memcache_client = Client()
"""

# Names exported by ``from ... import *``.
__all__ = [
    "MEMCACHE_MAX_BATCH_SIZE",
    "MEMCACHE_NAMESPACE",
    "MEMCACHE_TIMEOUT",
    "MEMCACHE_MAX_SIZE",
    "get",
    "put",
    "delete",
    "flush",
]
def get(keys: t.Union[Key, list[Key]], namespace: t.Optional[str] = None) -> t.Union[Entity, list[Entity], None]:
    """
    Read one or more entities from the memcache.

    :param keys: A single key, or a list/tuple/set of keys, identifying the entries to read.
    :param namespace: Optional namespace to use; defaults to ``MEMCACHE_NAMESPACE``.
    :return: For a single key, the entity or None if it was not found.
        For several keys, the list of entities that were found, or None if there were none.
        None is also returned when no memcache client is configured.
    """
    if not check_for_memcache():
        return None

    if not namespace:
        namespace = MEMCACHE_NAMESPACE
    is_single = not isinstance(keys, (list, tuple, set))
    # The memcache client is always addressed with string keys.
    pending = [str(key) for key in ([keys] if is_single else keys)]

    hits = {}
    try:
        # Query in slices so no request exceeds the batch limit.
        for start in range(0, len(pending), MEMCACHE_MAX_BATCH_SIZE):
            batch = pending[start:start + MEMCACHE_MAX_BATCH_SIZE]
            hits |= conf.db.memcache_client.get_multi(batch, namespace=namespace)
    except Exception as e:
        # Best effort: whatever was fetched before the failure is still returned.
        logging.error(f"""Failed to get keys form the memcache with {e=}""")

    entities = []
    for cached_key, cached_value in hits.items():
        entity = Entity(Key(cached_key))
        entity |= cached_value
        entities.append(entity)

    if not entities:
        return None
    return entities[0] if is_single else entities
def put(
    data: t.Union[Entity, t.Dict[Key, Entity], t.Iterable[Entity]],
    namespace: t.Optional[str] = None,
    timeout: t.Optional[t.Union[int, datetime.timedelta]] = None
) -> bool:
    """
    Write one or more entities to the memcache.

    :param data: A single entity, a list/tuple/set of entities, or a dict mapping keys to entities.
    :param namespace: Optional namespace to use; defaults to ``MEMCACHE_NAMESPACE``.
    :param timeout: Optional timeout in seconds or a timedelta object; defaults to ``MEMCACHE_TIMEOUT``.
    :return: True if every batch was handed to the memcache client without an exception,
        False if no memcache client is configured or a write failed.
    :raises TypeError: If ``data`` is none of the supported types.
    """
    if not check_for_memcache():
        return False

    namespace = namespace or MEMCACHE_NAMESPACE
    timeout = timeout or MEMCACHE_TIMEOUT
    if isinstance(timeout, datetime.timedelta):
        timeout = timeout.total_seconds()

    # Normalise every accepted input shape into a key -> entity mapping.
    if isinstance(data, (list, tuple, set)):
        entries = {entity.key: entity for entity in data}
    elif isinstance(data, Entity):
        entries = {data.key: data}
    elif isinstance(data, dict):
        entries = data
    else:
        raise TypeError(f"Invalid type {type(data)}. Expected a db.Entity, list or dict.")

    # Only values up to MEMCACHE_MAX_SIZE (1.000.000) are cached; larger ones are skipped.
    cacheable = {}
    for key, value in entries.items():
        if get_size(value) <= MEMCACHE_MAX_SIZE:
            cacheable[str(key)] = value

    names = list(cacheable)
    try:
        # Write in slices so no request exceeds the batch limit.
        for start in range(0, len(names), MEMCACHE_MAX_BATCH_SIZE):
            chunk = {name: cacheable[name] for name in names[start:start + MEMCACHE_MAX_BATCH_SIZE]}
            conf.db.memcache_client.set_multi(chunk, namespace=namespace, time=timeout)
    except Exception as e:
        logging.error(f"""Failed to put data to the memcache with {e=}""")
        return False
    return True
def delete(keys: t.Union[Key, list[Key]], namespace: t.Optional[str] = None) -> None:
    """
    Delete one or more entries from the memcache.

    Failures reported by the memcache client are logged and otherwise ignored.

    :param keys: A single key, or a list/tuple/set of keys, identifying the entries to delete.
    :param namespace: Optional namespace to use; defaults to ``MEMCACHE_NAMESPACE``.
    """
    if not check_for_memcache():
        return None

    namespace = namespace or MEMCACHE_NAMESPACE
    # Accept the same collection types as get(). Checking for ``list`` only would wrap a
    # tuple or set of keys as one element and stringify it into a single bogus key.
    if not isinstance(keys, (list, tuple, set)):
        keys = [keys]
    keys = [str(key) for key in keys]  # Enforce that all keys are strings
    try:
        # Delete in slices so no request exceeds the batch limit.
        while keys:
            conf.db.memcache_client.delete_multi(keys[:MEMCACHE_MAX_BATCH_SIZE], namespace=namespace)
            keys = keys[MEMCACHE_MAX_BATCH_SIZE:]
    except Exception as e:
        logging.error(f"""Failed to delete keys form the memcache with {e=}""")
def flush() -> bool:
    """
    Remove every entry from the memcache.

    :return: True if the flush call completed without an exception,
        False if no memcache client is configured or the call failed.
    """
    flushed = False
    if check_for_memcache():
        try:
            conf.db.memcache_client.flush_all()
            flushed = True
        except Exception as e:
            logging.error(f"""Failed to flush the memcache with {e=}""")
    return flushed
def get_size(obj: t.Any) -> int:
    """
    Utility function that estimates the size of an object in bytes.

    Containers (dict, list, tuple, set, frozenset) are walked recursively and contribute
    the summed size of their contents; the container's own overhead is not counted.
    Every other object is measured with ``sys.getsizeof``, which is shallow.

    :param obj: The object to measure.
    :return: The accumulated size in bytes.
    """
    if isinstance(obj, dict):
        return sum(get_size(key) + get_size(value) for key, value in obj.items())
    # Tuples and sets must be descended into as well, otherwise a large payload nested
    # inside one of them would be reported with only the container's shallow size.
    if isinstance(obj, (list, tuple, set, frozenset)):
        return sum(get_size(item) for item in obj)
    return sys.getsizeof(obj)
def check_for_memcache() -> bool:
    """
    Report whether a memcache client is configured.

    Logs a warning if ``conf.db.memcache_client`` is None; otherwise calls
    ``init_testbed()`` before reporting success.

    :return: True if a memcache client is configured, False otherwise.
    """
    client_configured = conf.db.memcache_client is not None
    if client_configured:
        init_testbed()
    else:
        logging.warning(f"""conf.db.memcache_client is 'None'. It can not be used.""")
    return client_configured
def init_testbed() -> None:
    """
    Create and activate the module-wide Testbed with a memcache stub.

    Does nothing if a Testbed was already created, if the instance is not a
    dev server, or if no memcache client is configured.
    """
    global TESTBED
    if TESTBED is not None:
        return
    if not (conf.instance.is_dev_server and conf.db.memcache_client):
        return
    TESTBED = Testbed()
    TESTBED.activate()
    TESTBED.init_memcache_stub()