Coverage for / home / runner / work / viur-core / viur-core / viur / src / viur / core / db / cache.py: 14%
100 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 21:16 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 21:16 +0000
1import datetime
2import logging
3import sys
4import typing as t
6from viur.core.config import conf
7from .types import Entity, Key
9MEMCACHE_MAX_BATCH_SIZE = 30
10MEMCACHE_NAMESPACE = "viur-datastore"
11MEMCACHE_TIMEOUT: int | datetime.timedelta = datetime.timedelta(days=1)
12MEMCACHE_MAX_SIZE: t.Final[int] = 1_000_000
13TESTBED = None
14"""
16 This Module controls the Interaction with the Memcache from Google
17 To activate the cache copy this code in your main.py
18 .. code-block:: python
19 # Example
20 from viur.core import conf
21 from google.appengine.api.memcache import Client
22 conf.db.memcache_client = Client()
23"""
25__all__ = [
26 "MEMCACHE_MAX_BATCH_SIZE",
27 "MEMCACHE_NAMESPACE",
28 "MEMCACHE_TIMEOUT",
29 "MEMCACHE_MAX_SIZE",
30 "get",
31 "put",
32 "delete",
33 "flush",
34]
37def get(keys: t.Union[Key, list[Key]], namespace: t.Optional[str] = None) -> t.Union[Entity, list[Entity], None]:
38 """
39 Reads data form the memcache.
40 :param keys: Unique identifier(s) for one or more entry(s).
41 :param namespace: Optional namespace to use.
42 :return: The entity (or None if it has not been found), or a list of entities.
43 """
44 if not check_for_memcache():
45 return None
47 namespace = namespace or MEMCACHE_NAMESPACE
48 single_request = not isinstance(keys, (list, tuple, set))
49 if single_request:
50 keys = [keys]
51 keys = [str(key) for key in keys] # Enforce that all keys are strings
52 cached_data = {}
53 result = []
54 try:
55 while keys:
56 cached_data |= conf.db.memcache_client.get_multi(keys[:MEMCACHE_MAX_BATCH_SIZE], namespace=namespace)
57 keys = keys[MEMCACHE_MAX_BATCH_SIZE:]
58 except Exception as e:
59 logging.error(f"""Failed to get keys form the memcache with {e=}""")
60 for key, value in cached_data.items():
61 entity = Entity(Key(key))
62 entity |= value
63 result.append(entity)
64 if single_request:
65 return result[0] if result else None
66 return result if result else None
69def put(
70 data: t.Union[Entity, t.Dict[Key, Entity], t.Iterable[Entity]],
71 namespace: t.Optional[str] = None,
72 timeout: t.Optional[t.Union[int, datetime.timedelta]] = None
73) -> bool:
74 """
75 Writes Data to the memcache.
76 :param data: Data to write
77 :param namespace: Optional namespace to use.
78 :param timeout: Optional timeout in seconds or a timedelta object.
79 :return: A boolean indicating success.
80 """
81 if not check_for_memcache():
82 return False
84 namespace = namespace or MEMCACHE_NAMESPACE
85 timeout = timeout or MEMCACHE_TIMEOUT
86 if isinstance(timeout, datetime.timedelta):
87 timeout = timeout.total_seconds()
89 if isinstance(data, (list, tuple, set)):
90 data = {item.key: item for item in data}
91 elif isinstance(data, Entity):
92 data = {data.key: data}
93 elif not isinstance(data, dict):
94 raise TypeError(f"Invalid type {type(data)}. Expected a db.Entity, list or dict.")
95 # Add only values to cache <= MEMMAX_SIZE (1.000.000)
96 data = {str(key): value for key, value in data.items() if get_size(value) <= MEMCACHE_MAX_SIZE}
98 keys = list(data.keys())
99 try:
100 while keys:
101 data_batch = {key: data[key] for key in keys[:MEMCACHE_MAX_BATCH_SIZE]}
102 conf.db.memcache_client.set_multi(data_batch, namespace=namespace, time=timeout)
103 keys = keys[MEMCACHE_MAX_BATCH_SIZE:]
104 return True
105 except Exception as e:
106 logging.error(f"""Failed to put data to the memcache with {e=}""")
107 return False
110def delete(keys: t.Union[Key, list[Key]], namespace: t.Optional[str] = None) -> None:
111 """
112 Deletes an Entry form memcache.
113 :param keys: Unique identifier(s) for one or more entry(s).
114 :param namespace: Optional namespace to use.
115 """
116 if not check_for_memcache():
117 return None
119 namespace = namespace or MEMCACHE_NAMESPACE
120 if not isinstance(keys, list):
121 keys = [keys]
122 keys = [str(key) for key in keys] # Enforce that all keys are strings
123 try:
124 while keys:
125 conf.db.memcache_client.delete_multi(keys[:MEMCACHE_MAX_BATCH_SIZE], namespace=namespace)
126 keys = keys[MEMCACHE_MAX_BATCH_SIZE:]
127 except Exception as e:
128 logging.error(f"""Failed to delete keys form the memcache with {e=}""")
131def flush() -> bool:
132 """
133 Deletes everything in memcache.
134 :return: A boolean indicating success.
135 """
136 if not check_for_memcache():
137 return False
138 try:
139 conf.db.memcache_client.flush_all()
140 except Exception as e:
141 logging.error(f"""Failed to flush the memcache with {e=}""")
142 return False
143 return True
146def get_size(obj: t.Any) -> int:
147 """
148 Utility function that counts the size of an object in bytes.
149 """
150 if isinstance(obj, dict):
151 return sum(get_size([k, v]) for k, v in obj.items())
152 elif isinstance(obj, list):
153 return sum(get_size(x) for x in obj)
155 return sys.getsizeof(obj)
158def check_for_memcache() -> bool:
159 if conf.db.memcache_client is None:
160 logging.warning(f"""conf.db.memcache_client is 'None'. It can not be used.""")
161 return False
162 init_testbed()
163 return True
166def init_testbed() -> None:
167 global TESTBED
168 if TESTBED is None and conf.instance.is_dev_server and conf.db.memcache_client:
169 from google.appengine.ext.testbed import Testbed
170 TESTBED = Testbed()
171 TESTBED.activate()
172 TESTBED.init_memcache_stub()