Coverage for / home / runner / work / viur-core / viur-core / viur / src / viur / core / session.py: 29%
128 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 23:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 23:08 +0000
1import datetime
2import logging
3import time
4import typing as t
6from viur.core import current, db, tasks, utils
7from viur.core.config import conf # this import has to stay alone due partial import
8from viur.core.tasks import DeleteEntitiesIter
10"""
11 Provides the session implementation for the Google AppEngine™ based on the datastore.
12 To access the current session, call current.session.get()
14 Example:
16 .. code-block:: python
18 from viur.core import current
19 sessionData = current.session.get()
20 sessionData["your_key"] = "your_data"
21 data = sessionData["your_key"]
23 A get-method is provided for convenience.
24 It returns None instead of raising an Exception if the key is not found.
25"""
# Unique marker object; Session.pop() uses it as the default of its *default* parameter
# to tell "no default given" apart from an explicit default such as None.
27_SENTINEL: t.Final[object] = object()
# Type variable bound to callables that take a db.Entity and return None.
29TObserver = t.TypeVar("TObserver", bound=t.Callable[[db.Entity], None])
30"""Type of the observer for :meth:`Session.on_delete`"""
33class Session(db.Entity):
34 """
35 Store Sessions inside the datastore.
36 The behaviour of this module can be customized in the following ways:
38 - :prop:same_site can be set to None, "none", "lax" or "strict" to influence the same-site tag on the cookies
39 we set
40 - :prop:use_session_cookie decides whether the cookie is treated as a session cookie (it will be deleted on
41 browser close). It is set to False in this class, so the cookie is emitted with the life-time in
42 conf.user.session_life_time. Set it to True to emit the cookie without a life-time.
43 - The config variable conf.user.session_life_time: Determines, how long (in seconds) a session is valid.
44 Even if :prop:use_session_cookie is set to True, the session is voided server-side after no request has been
45 made within the configured lifetime.
46 - The config variables conf.user.session_persistent_fields_on_login and
47 conf.user.session_persistent_fields_on_logout lists fields, that may survive a login/logout action.
48 For security reasons, we completely destroy a session on login/logout (it will be deleted, a new empty
49 database object will be created and a new cookie with a different key is sent to the browser). This causes
50 all data currently stored to be lost. Only keys listed in these variables will be copied into the new
51 session.
52 """
# Datastore kind under which the session entities are stored
53 kindName = "viur-session"
54 same_site = "lax" # Either None (don't issue same_site header), "none", "lax" or "strict"
55 use_session_cookie = False # If True, issue the cookie without a lifeTime (will disappear on browser close)
# Name of the cookie carrying the session key; it contains the project id
56 cookie_name = f"""viur_cookie_{conf.instance.project_id}"""
# Value written to the "user" field by save() when no current user could be determined
57 GUEST_USER = "__guest__"
# Callables registered through on_delete(); dispatch_on_delete() calls each of them
59 _ON_DELETE_OBSERVER = []
def __init__(self):
    """
    Create an empty session object.

    The session starts out unchanged and not loaded, and it has neither a cookie key
    nor a static security key yet.
    """
    super().__init__()

    # State flags
    self.changed = False

    # Key material, nothing assigned so far
    self.cookie_key = None
    self.static_security_key = None

    self.loaded = False
68 def load(self):
69 """
70 Initializes the Session.
72 If the client supplied a valid Cookie, the session is read from the datastore, otherwise a new,
73 empty session will be initialized.
:returns: False if the stored session is older than conf.user.session_life_time (the session is reset
before returning); None in every other case.
74 """
# The session key is taken from the request cookie named by cookie_name
76 if cookie_key := current.request.get().request.cookies.get(self.cookie_name):
77 cookie_key = str(cookie_key)
78 if data := db.get(db.Key(self.kindName, cookie_key)): # Loaded successfully
79 if data["lastseen"] < time.time() - conf.user.session_life_time.total_seconds():
80 # This session is too old
81 self.reset()
82 return False
84 self.loaded = True
85 self.cookie_key = cookie_key
# The parent-class clear()/update() are called directly instead of Session.clear()/Session.update(),
# which would delete the stored session and set the changed flag.
87 super().clear()
88 super().update(data["data"])
# Falls back to the "staticSecurityKey" field when "static_security_key" is missing or empty
# (presumably an older field name - confirm).
90 self.static_security_key = data.get("static_security_key") or data.get("staticSecurityKey")
91 if data["lastseen"] < time.time() - 5 * 60: # Refresh every 5 Minutes
# Marking the session as changed makes save() write it again, which updates "lastseen"
92 self.changed = True
# NOTE(review): this listing lost its indentation, so it cannot be told from here whether this else
# belongs to `if data := ...` or to `if cookie_key := ...` - confirm against the original file.
94 else:
95 self.reset()
def save(self):
    """
    Persist the session in the datastore and announce its cookie to the client.

    Nothing happens when the session was not changed during the current request, or when the
    request is neither an SSL connection nor served by the dev server.
    """
    if not self.changed:
        return

    request = current.request.get()

    # We will not issue sessions over http anymore
    if not request.isSSLConnection and not conf.instance.is_dev_server:
        return

    # Determine the key of the current user; any failure is treated as "guest"
    try:
        # Check for our custom user-api
        owner = conf.main_app.vi.user.getCurrentUser()["key"]
    except Exception:
        owner = Session.GUEST_USER

    # A session that has not been loaded does not own any keys yet
    if not self.loaded:
        self.cookie_key = utils.string.random(42)
        self.static_security_key = utils.string.random(13)

    entity = db.Entity(db.Key(self.kindName, self.cookie_key))
    entity["data"] = db.fix_unindexable_properties(self)
    entity["static_security_key"] = self.static_security_key
    entity["lastseen"] = time.time()
    entity["user"] = str(owner)  # allow filtering for users
    entity.exclude_from_indexes = {"data"}
    db.put(entity)

    # Provide Set-Cookie header entry with configured properties
    cookie = f"{self.cookie_name}={self.cookie_key};{self.build_flags()}"
    request.response.headerlist.append(("Set-Cookie", cookie))
@classmethod
def build_flags(cls) -> str:
    """
    Build the attribute part of a ``Set-Cookie`` header for ViUR sessions.

    This lives outside of :meth:`save` so that out-of-band session cookies (e.g. for the
    App Login Flow in :meth:`~viur.core.modules.user.User._get_cookie_for_app`) can reuse
    exactly the same flags.

    Emitted flags, in this order:

    - ``Path=/`` - always.
    - ``HttpOnly`` - always.
    - ``SameSite=...`` - only if :attr:`same_site` is set and we are not on the dev server.
    - ``Secure`` - only if we are not on the dev server.
    - ``Max-Age=...`` - only if :attr:`use_session_cookie` is ``False``; the value is
      ``conf.user.session_life_time`` in whole seconds. Without it, the cookie carries no
      life-time.

    :returns: The flags joined by ``;``, ready to follow ``<cookie_name>=<value>;``,
        e.g. ``Path=/;HttpOnly;SameSite=lax;Secure;Max-Age=86400``.
    """
    on_dev_server = conf.instance.is_dev_server

    flags = ["Path=/", "HttpOnly"]

    if cls.same_site and not on_dev_server:
        flags.append(f"SameSite={cls.same_site}")

    if not on_dev_server:
        flags.append("Secure")

    if not cls.use_session_cookie:
        flags.append(f"Max-Age={int(conf.user.session_life_time.total_seconds())}")

    return ";".join(flags)
def __setitem__(self, key: str, item: t.Any):
    """
    Assign *item* to *key* and flag the session as changed.

    A value that already exists under *key* is replaced.
    """
    super().__setitem__(key, item)
    # Only reached when the assignment itself did not raise
    self.changed = True
def markChanged(self) -> None:
    """
    Flag the session as changed by hand.

    save() skips sessions that are not flagged as changed; calling this
    makes sure the next save() does not skip this one for that reason.
    """
    self.changed = True
def reset(self) -> None:
    """
    Throw away the current session and begin a new one.

    Especially useful at login, where an SSL-capable session might have to be created.

    :warning: Everything is flushed.
    """
    # Drop whatever the old session held
    self.clear()

    # Fresh key material for the new session
    self.cookie_key = utils.string.random(42)
    self.static_security_key = utils.string.random(13)

    # The new session counts as loaded and has to be written by save()
    self.loaded = self.changed = True
def __delitem__(self, key: str) -> None:
    """
    Delete *key* from the session and flag the session as changed.

    The key has to exist.
    """
    super().__delitem__(key)
    # Only reached when the deletion itself did not raise
    self.changed = True
def __ior__(self, other: dict) -> t.Self:
    """
    Implement ``session |= other``: merge *other* into the session.

    The session is flagged as changed afterwards and returned itself.
    """
    super().__ior__(other)
    self.changed = True
    return self
def update(self, other: dict) -> None:
    """
    Merge the entries of *other* into the session.

    This is the in-place ``|=`` merge under a method name.
    """
    self |= other
def pop(self, key: str, default=_SENTINEL) -> t.Any:
    """
    Remove *key* from the session and return its value.

    If the key is missing, *default* is returned instead and the session stays untouched.
    If the key is missing and no *default* was given, a KeyError is raised.
    """
    # A missing key only yields the fallback when one was passed explicitly
    if key not in self and default is not _SENTINEL:
        return default

    # Either the key exists, or raising is the documented outcome
    removed = super().pop(key)
    self.changed = True

    return removed
241 def clear(self) -> None:
"""
Drops the stored session and empties this object.

If a cookie key is set, the datastore entity of this session is deleted and
securitykey.clear_session_skeys() is called for that key. The response is asked to unset
the session cookie, the session is marked as not loaded, the cookie key is forgotten and
all data held by this object is removed. The changed flag is not touched here.
"""
242 if self.cookie_key:
243 db.delete(db.Key(self.kindName, self.cookie_key))
# Imported at call time rather than at module level (presumably to avoid an import cycle - confirm)
244 from viur.core import securitykey
245 securitykey.clear_session_skeys(self.cookie_key)
# NOTE(review): this listing lost its indentation, so it cannot be told from here whether the cookie is
# unset only when a cookie key exists or unconditionally - confirm against the original file.
247 current.request.get().response.unset_cookie(self.cookie_name, strict=False)
249 self.loaded = False
250 self.cookie_key = None
# Parent-class clear(): empties the data held by this object
251 super().clear()
def popitem(self) -> t.Tuple[t.Any, t.Any]:
    """
    Remove one ``(key, value)`` pair from the session and return it.

    The session is flagged as changed only after the removal succeeded, in line with
    __setitem__, __delitem__ and pop. A failed call therefore no longer leaves the session
    flagged as changed although nothing was removed.

    :returns: The removed ``(key, value)`` pair.
    :raises KeyError: If the session is empty (assuming the parent class follows
        ``dict.popitem`` semantics).
    """
    removed = super().popitem()
    self.changed = True
    return removed
def setdefault(self, key, default=None) -> t.Any:
    """
    Return the value stored under *key*, storing *default* first if the key is missing.

    The session is flagged as changed only when the key was missing.
    """
    is_missing = key not in self
    if is_missing:
        self.changed = True

    return super().setdefault(key, default)
@classmethod
def on_delete(cls, func: TObserver, /) -> TObserver:
    """
    Decorator to register an observer for the _session delete event_.

    *func* is appended to the class-wide observer list and returned unchanged,
    so the decorated name still refers to the original callable.
    """
    observers = cls._ON_DELETE_OBSERVER
    observers.append(func)
    return func
@classmethod
def dispatch_on_delete(cls, entry: db.Entity) -> None:
    """
    Call the observers for the _session delete event_.

    Every registered observer is called with *entry*, in registration order.
    An exception raised by an observer is not caught here.
    """
    for callback in cls._ON_DELETE_OBSERVER:
        callback(entry)
class DeleteSessionsIter(DeleteEntitiesIter):
    """
    QueryIter which deletes every session entity it encounters.

    For each deleted entity a _session delete event_ is triggered,
    dispatched by :meth:`Session.dispatch_on_delete`.
    """

    @classmethod
    def handleEntry(cls, entry: db.Entity, customData: t.Any) -> None:
        """Delete *entry* from the datastore, then notify the delete observers."""
        session_key = entry.key
        db.delete(session_key)

        Session.dispatch_on_delete(entry)
@tasks.CallDeferred
def killSessionByUser(user: t.Optional[t.Union[str, "db.Key", None]] = None):
    """
    Invalidates all active sessions for the given *user*.

    This means that this user is instantly logged out.
    If no user is given, it tries to invalidate **all** active sessions.

    Use "__guest__" to kill all sessions not associated with a user.

    :param user: UserID, "__guest__" or None.
    """
    logging.info(f"Invalidating all sessions for {user=}")

    query = db.Query(Session.kindName)

    if user is not None:
        # Restrict the query to the sessions of one user. Without this guard, str(None) would
        # filter for the literal "None" instead of selecting all sessions as documented.
        query = query.filter("user =", str(user))

    DeleteSessionsIter.startIterOnQuery(query)
@tasks.PeriodicTask(interval=datetime.timedelta(hours=4))
def start_clear_sessions():
    """
    Removes old (expired) Sessions

    A session is selected once its "lastseen" timestamp is older than
    conf.user.session_life_time plus an extra 300 seconds.
    """
    expired = db.Query(Session.kindName).filter(
        "lastseen <",
        time.time() - (conf.user.session_life_time.total_seconds() + 300),
    )

    DeleteSessionsIter.startIterOnQuery(expired)
314 DeleteSessionsIter.startIterOnQuery(query)