Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/session.py: 29%
125 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 11:31 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 11:31 +0000
1import datetime
2import logging
3import time
4import typing as t
6from viur.core import current, db, tasks, utils
7from viur.core.config import conf # this import has to stay alone due partial import
8from viur.core.tasks import DeleteEntitiesIter
10"""
11 Provides the session implementation for the Google AppEngine™ based on the datastore.
12 To access the current session, call current.session.get()
14 Example:
16 .. code-block:: python
18 from viur.core import current
19 sessionData = current.session.get()
20 sessionData["your_key"] = "your_data"
21 data = sessionData["your_key"]
23 A get-method is provided for convenience.
24 It returns None instead of raising an Exception if the key is not found.
25"""
# Unique marker object used as the "no default supplied" value of Session.pop(),
# so that ``None`` remains usable as an explicit default.
_SENTINEL: t.Final[object] = object()

# Type variable for callables that take a db.Entity and return None; it lets
# Session.on_delete hand back the decorated function with its type unchanged.
TObserver = t.TypeVar("TObserver", bound=t.Callable[[db.Entity], None])
"""Type of the observer for :meth:`Session.on_delete`"""
class Session(db.Entity):
    """
    Store sessions inside the datastore.

    The behaviour of this module can be customized in the following ways:

    - :prop:same_site can be set to None, "none", "lax" or "strict" to influence the same-site tag on the
      cookies we set.
    - :prop:use_session_cookie is set to True by default, causing the cookie to be treated as a session cookie
      (it will be deleted on browser close). If set to False, it will be emitted with the life-time in
      conf.user.session_life_time.
    - The config variable conf.user.session_life_time: Determines how long (in seconds) a session is valid.
      Even if :prop:use_session_cookie is set to True, the session is voided server-side after no request has
      been made within the configured lifetime.
    - The config variables conf.user.session_persistent_fields_on_login and
      conf.user.session_persistent_fields_on_logout list fields that may survive a login/logout action.
      For security reasons, we completely destroy a session on login/logout (it will be deleted, a new empty
      database object will be created and a new cookie with a different key is sent to the browser). This
      causes all data currently stored to be lost. Only keys listed in these variables will be copied into
      the new session.
    """
    kindName = "viur-session"
    same_site = "lax"  # Either None (don't issue same_site header), "none", "lax" or "strict"
    use_session_cookie = True  # If True, issue the cookie without a lifeTime (will disappear on browser close)
    cookie_name = f"""viur_cookie_{conf.instance.project_id}"""
    GUEST_USER = "__guest__"

    # Callables registered through on_delete(); shared by the class on purpose.
    _ON_DELETE_OBSERVER = []

    def __init__(self):
        super().__init__()
        self.changed = False  # True once the session must be written by save()
        self.cookie_key = None  # Name of the datastore entity, also the cookie value
        self.static_security_key = None
        self.loaded = False  # True once cookie_key/static_security_key are valid for this session

    def load(self):
        """
        Initializes the session from the cookie supplied by the client.

        If the cookie refers to a stored session that is still within
        conf.user.session_life_time, its data is copied into this object.
        A stored session that is too old causes a :meth:`reset`.

        :return: ``False`` if the stored session was too old and has been reset,
            otherwise ``None``.
        """

        if cookie_key := current.request.get().request.cookies.get(self.cookie_name):
            cookie_key = str(cookie_key)
            if data := db.Get(db.Key(self.kindName, cookie_key)):  # Loaded successfully
                if data["lastseen"] < time.time() - conf.user.session_life_time:
                    # This session is too old.
                    # cookie_key is not assigned yet, so reset() does not delete the stale
                    # entity here; it stays until start_clear_sessions() removes it.
                    self.reset()
                    return False

                self.loaded = True
                self.cookie_key = cookie_key

                # Use the plain mapping operations of the base class so that
                # loading does not flag the session as changed.
                super().clear()
                super().update(data["data"])

                # Accept the legacy camelCase property name as a fallback.
                self.static_security_key = data.get("static_security_key") or data.get("staticSecurityKey")
                if data["lastseen"] < time.time() - 5 * 60:  # Refresh every 5 Minutes
                    self.changed = True

            else:
                # NOTE(review): the indentation of this branch was lost in the listing this
                # file was recovered from. It is attached to the datastore lookup here
                # (cookie sent, but no such session stored) - confirm against upstream
                # whether it belongs to the outer "cookie present" check instead.
                self.reset()

    def save(self):
        """
        Writes the session into the database and emits the Set-Cookie header.

        Does nothing in case the session hasn't been changed in the current request,
        or if the request is neither an SSL connection nor served by the dev server.
        """

        if not self.changed:
            return
        current_request = current.request.get()
        # We will not issue sessions over http anymore
        if not (current_request.isSSLConnection or conf.instance.is_dev_server):
            return

        # Get the current user's key
        try:
            # Check for our custom user-api
            user_key = conf.main_app.vi.user.getCurrentUser()["key"]
        except Exception:
            # Deliberately best-effort: any failure to resolve a user counts as a guest
            user_key = Session.GUEST_USER  # this is a guest

        if not self.loaded:
            # Neither load() nor reset() provided keys for this session yet
            self.cookie_key = utils.string.random(42)
            self.static_security_key = utils.string.random(13)

        dbSession = db.Entity(db.Key(self.kindName, self.cookie_key))

        dbSession["data"] = db.fixUnindexableProperties(self)
        dbSession["static_security_key"] = self.static_security_key
        dbSession["lastseen"] = time.time()
        dbSession["user"] = str(user_key)  # allow filtering for users
        dbSession.exclude_from_indexes = {"data"}

        db.Put(dbSession)

        # Provide Set-Cookie header entry with configured properties.
        # Entries that evaluate to None are dropped when the header is joined below.
        flags = (
            "Path=/",
            "HttpOnly",
            f"SameSite={self.same_site}" if self.same_site and not conf.instance.is_dev_server else None,
            "Secure" if not conf.instance.is_dev_server else None,
            f"Max-Age={conf.user.session_life_time}" if not self.use_session_cookie else None,
        )

        current_request.response.headerlist.append(
            ("Set-Cookie", f"{self.cookie_name}={self.cookie_key};{';'.join([f for f in flags if f])}")
        )

    def __setitem__(self, key: str, item: t.Any):
        """
        Stores a new value under the given key.

        If that key exists before, its value is
        overwritten.
        """
        super().__setitem__(key, item)
        self.changed = True

    def markChanged(self) -> None:
        """
        Explicitly mark the current session as changed.
        This will force save() to write into the datastore,
        even if it believes that this session hasn't changed.
        """
        self.changed = True

    def reset(self) -> None:
        """
        Invalidates the current session and starts a new one.

        This function is especially useful at login, where
        we might need to create an SSL-capable session.

        :warning: Everything is flushed.
        """

        self.clear()
        # clear() dropped the old keys; issue fresh ones and mark the session for saving
        self.cookie_key = utils.string.random(42)
        self.static_security_key = utils.string.random(13)
        self.loaded = True
        self.changed = True

    def __delitem__(self, key: str) -> None:
        """
        Removes a *key* from the session.
        This key must exist.
        """
        super().__delitem__(key)
        self.changed = True

    def __ior__(self, other: dict) -> t.Self:
        """
        Merges the contents of a dict into the session.
        """
        super().__ior__(other)
        self.changed = True
        return self

    def update(self, other: dict) -> None:
        """
        Merges the contents of a dict into the session.
        """
        # Delegates to __ior__, which also marks the session as changed
        self |= other

    def pop(self, key: str, default=_SENTINEL) -> t.Any:
        """
        Delete a specified key from the session.

        If key is in the session, remove it and return its value, else return default.
        If default is not given and key is not in the session, a KeyError is raised.
        """
        if key in self or default is _SENTINEL:
            # For a missing key without default, the base class raises here
            # before the session is marked as changed.
            value = super().pop(key)
            self.changed = True

            return value

        return default

    def clear(self) -> None:
        """
        Removes the stored session and all data held by this object.

        If a cookie_key is set, the datastore entity and the security keys bound to
        it are deleted. The session is not marked as changed by this method.
        """
        if self.cookie_key:
            db.Delete(db.Key(self.kindName, self.cookie_key))
            from viur.core import securitykey
            securitykey.clear_session_skeys(self.cookie_key)

        # NOTE(review): the indentation was lost in the listing this file was recovered
        # from; the cookie is unset unconditionally here - confirm against upstream that
        # this call is not meant to sit inside the cookie_key guard above.
        current.request.get().response.unset_cookie(self.cookie_name, strict=False)

        self.loaded = False
        self.cookie_key = None
        super().clear()

    def popitem(self) -> t.Tuple[t.Any, t.Any]:
        """
        Removes an item from the session and returns it as a ``(key, value)`` pair.

        The session is only marked as changed if an item was actually removed;
        an error raised by the base class (for example on an empty session)
        propagates and leaves the changed-flag untouched.
        """
        item = super().popitem()
        self.changed = True
        return item

    def setdefault(self, key, default=None) -> t.Any:
        """
        Returns the value stored under *key*, storing *default* first if the key is missing.

        The session is only marked as changed when the key was missing.
        """
        if key not in self:
            self.changed = True
        return super().setdefault(key, default)

    @classmethod
    def on_delete(cls, func: TObserver, /) -> TObserver:
        """Decorator to register an observer for the _session delete event_."""
        cls._ON_DELETE_OBSERVER.append(func)
        return func

    @classmethod
    def dispatch_on_delete(cls, entry: db.Entity) -> None:
        """Call the observers for the _session delete event_."""
        # Observers run in registration order; an exception raised by one of them propagates
        for observer in cls._ON_DELETE_OBSERVER:
            observer(entry)
class DeleteSessionsIter(DeleteEntitiesIter):
    """
    QueryIter that removes every session entity it is handed.

    After an entity has been deleted, the _session delete event_ is raised
    for it through :meth:`Session.dispatch_on_delete`.
    """

    @classmethod
    def handleEntry(cls, entry: db.Entity, customData: t.Any) -> None:
        """
        Delete a single session entity and notify the registered observers.

        :param entry: The session entity to remove.
        :param customData: Not used by this implementation.
        """
        session_key = entry.key
        db.Delete(session_key)
        # Observers receive the entity only after it has been removed from the datastore
        Session.dispatch_on_delete(entry)
@tasks.CallDeferred
def killSessionByUser(user: t.Optional[t.Union[str, "db.Key", None]] = None):
    """
    Invalidates all active sessions for the given *user*.

    This means that this user is instantly logged out.
    If no user is given, it tries to invalidate **all** active sessions.

    Use "__guest__" to kill all sessions not associated with a user.

    :param user: UserID, "__guest__" or None.
    """
    logging.info(f"Invalidating all sessions for {user=}")

    query = db.Query(Session.kindName)
    if user is not None:
        # Session.save() stores the owner as ``str(user_key)``, so compare against the
        # string form. Without a user the query stays unfiltered and covers every
        # session; filtering on str(None) would look for the literal "None" instead.
        query = query.filter("user =", str(user))

    DeleteSessionsIter.startIterOnQuery(query)
@tasks.PeriodicTask(interval=datetime.timedelta(hours=4))
def start_clear_sessions():
    """
    Periodic task that deletes sessions which have expired.

    A session is selected once its ``lastseen`` value is older than
    conf.user.session_life_time plus an additional 300 seconds.
    """
    grace_seconds = 300
    expired_before = time.time() - (conf.user.session_life_time + grace_seconds)

    expired_sessions = db.Query(Session.kindName).filter("lastseen <", expired_before)
    DeleteSessionsIter.startIterOnQuery(expired_sessions)