Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/session.py: 29%

125 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 11:31 +0000

1import datetime 

2import logging 

3import time 

4import typing as t 

5 

6from viur.core import current, db, tasks, utils 

7from viur.core.config import conf # this import has to stay alone due partial import 

8from viur.core.tasks import DeleteEntitiesIter 

9 

10""" 

11 Provides the session implementation for the Google AppEngine™ based on the datastore. 

12 To access the current session, call current.session.get()

13 

14 Example: 

15 

16 .. code-block:: python 

17 

18 from viur.core import current 

19 sessionData = current.session.get() 

20 sessionData["your_key"] = "your_data" 

21 data = sessionData["your_key"] 

22 

23 A get-method is provided for convenience. 

24 It returns None instead of raising an Exception if the key is not found. 

25""" 

26 

# Unique marker object used by Session.pop() to tell "no default was passed"
# apart from an explicit default of None.
_SENTINEL: t.Final[object] = object()

# Type variable for callables that take a db.Entity and return nothing.
TObserver = t.TypeVar("TObserver", bound=t.Callable[[db.Entity], None])
"""Type of the observer for :meth:`Session.on_delete`"""

31 

32 

class Session(db.Entity):
    """
    Store sessions inside the datastore.

    The behaviour of this module can be customized in the following ways:

    - :prop:same_site can be set to None, "none", "lax" or "strict" to influence the same-site tag on the cookies
      we set.
    - :prop:use_session_cookie is set to True by default, causing the cookie to be treated as a session cookie
      (it will be deleted on browser close). If set to False, it will be emitted with the life-time in
      conf.user.session_life_time.
    - The config variable conf.user.session_life_time determines how long (in seconds) a session is valid.
      Even if :prop:use_session_cookie is set to True, the session is voided server-side after no request has
      been made within the configured lifetime.
    - The config variables conf.user.session_persistent_fields_on_login and
      conf.user.session_persistent_fields_on_logout list fields that may survive a login/logout action.
      For security reasons, we completely destroy a session on login/logout (it will be deleted, a new empty
      database object will be created and a new cookie with a different key is sent to the browser). This
      causes all data currently stored to be lost. Only keys listed in these variables will be copied into the
      new session.
    """
    kindName = "viur-session"
    same_site = "lax"  # Either None (don't issue same_site header), "none", "lax" or "strict"
    use_session_cookie = True  # If True, issue the cookie without a lifeTime (will disappear on browser close)
    cookie_name = f"""viur_cookie_{conf.instance.project_id}"""
    GUEST_USER = "__guest__"

    # Observers registered through on_delete(); shared by the whole class on purpose.
    _ON_DELETE_OBSERVER = []

    def __init__(self):
        """Create an empty, unloaded and unchanged session object."""
        super().__init__()
        self.changed = False  # save() only writes when this is True
        self.cookie_key = None  # name of the datastore entity / value of the cookie
        self.static_security_key = None
        self.loaded = False  # False makes save() generate fresh keys

    def load(self) -> t.Optional[bool]:
        """
        Initializes the Session.

        If the client supplied a valid cookie, the session is read from the datastore.
        If the cookie refers to an expired or unknown session, the session is reset.

        :return: False if the stored session was too old and has been reset, otherwise None.
        """
        cookie_key = current.request.get().request.cookies.get(self.cookie_name)
        if not cookie_key:
            # No cookie supplied: keep the empty state set up by __init__().
            return None

        cookie_key = str(cookie_key)
        data = db.Get(db.Key(self.kindName, cookie_key))

        # NOTE(review): the indentation of this file was lost in the reviewed copy; the reset below is
        # assumed to belong to "cookie supplied, but no entity found" - confirm against the repository.
        if not data:
            self.reset()
            return None

        now = time.time()
        if data["lastseen"] < now - conf.user.session_life_time:
            # This session is too old
            self.reset()
            return False

        self.loaded = True
        self.cookie_key = cookie_key

        # Use the parent implementations so that loading does not flag the session as changed.
        super().clear()
        super().update(data["data"])

        # Fall back to the camelCase property name if the snake_case one is missing or empty.
        self.static_security_key = data.get("static_security_key") or data.get("staticSecurityKey")

        if data["lastseen"] < now - 5 * 60:
            # Refresh every 5 minutes: save() rewrites "lastseen" once the session is marked as changed.
            self.changed = True

        return None

    def save(self):
        """
        Writes the session into the database.

        Does nothing in case the session hasn't been changed in the current request, or if the request
        is neither an SSL connection nor served by the development server.
        """
        if not self.changed:
            return

        current_request = current.request.get()

        # We will not issue sessions over http anymore
        if not (current_request.isSSLConnection or conf.instance.is_dev_server):
            return

        # Get the current user's key
        try:
            # Check for our custom user-api
            user_key = conf.main_app.vi.user.getCurrentUser()["key"]
        except Exception:
            # Deliberate best-effort: any failure to determine a user is treated as "guest".
            user_key = Session.GUEST_USER

        if not self.loaded:
            # Nothing was loaded from the datastore, so this session needs fresh keys.
            self.cookie_key = utils.string.random(42)
            self.static_security_key = utils.string.random(13)

        db_session = db.Entity(db.Key(self.kindName, self.cookie_key))
        db_session["data"] = db.fixUnindexableProperties(self)
        db_session["static_security_key"] = self.static_security_key
        db_session["lastseen"] = time.time()
        db_session["user"] = str(user_key)  # allow filtering for users
        db_session.exclude_from_indexes = {"data"}

        db.Put(db_session)

        # Provide Set-Cookie header entry with configured properties; entries that are None are skipped.
        flags = (
            "Path=/",
            "HttpOnly",
            f"SameSite={self.same_site}" if self.same_site and not conf.instance.is_dev_server else None,
            "Secure" if not conf.instance.is_dev_server else None,
            f"Max-Age={conf.user.session_life_time}" if not self.use_session_cookie else None,
        )
        cookie_flags = ";".join(flag for flag in flags if flag)

        current_request.response.headerlist.append(
            ("Set-Cookie", f"{self.cookie_name}={self.cookie_key};{cookie_flags}")
        )

    def __setitem__(self, key: str, item: t.Any):
        """
        Stores a new value under the given key.

        If that key exists before, its value is overwritten.
        """
        super().__setitem__(key, item)
        self.changed = True

    def markChanged(self) -> None:
        """
        Explicitly mark the current session as changed.

        This will force save() to write into the datastore,
        even if it believes that this session hasn't changed.
        """
        self.changed = True

    def reset(self) -> None:
        """
        Invalidates the current session and starts a new one.

        This function is especially useful at login, where
        we might need to create an SSL-capable session.

        :warning: Everything is flushed.
        """
        self.clear()
        self.cookie_key = utils.string.random(42)
        self.static_security_key = utils.string.random(13)
        # Marked as loaded so that save() keeps the keys generated here.
        self.loaded = True
        self.changed = True

    def __delitem__(self, key: str) -> None:
        """
        Removes a *key* from the session.

        This key must exist.
        """
        super().__delitem__(key)
        self.changed = True

    def __ior__(self, other: dict) -> t.Self:
        """
        Merges the contents of a dict into the session.
        """
        super().__ior__(other)
        self.changed = True
        return self

    def update(
        self,
        other: t.Union[t.Mapping[str, t.Any], t.Iterable[t.Tuple[str, t.Any]]] = (),
        **kwargs: t.Any,
    ) -> None:
        """
        Merges the contents of a dict into the session.

        Like :meth:`dict.update`, keyword arguments are merged as additional key/value pairs
        after *other*.

        :param other: Mapping or iterable of key/value pairs to merge.
        :param kwargs: Further key/value pairs to merge.
        """
        self |= other
        if kwargs:
            self |= kwargs

    def pop(self, key: str, default=_SENTINEL) -> t.Any:
        """
        Delete a specified key from the session.

        If key is in the session, remove it and return its value, else return default.
        If default is not given and key is not in the session, a KeyError is raised.
        """
        if key not in self and default is not _SENTINEL:
            # Nothing to remove; the session stays unchanged.
            return default

        value = super().pop(key)  # raises KeyError if the key is missing
        self.changed = True
        return value

    def clear(self) -> None:
        """
        Removes all data from the session.

        If the session has a cookie key, its datastore entity and the security keys bound to it are
        deleted. The session cookie is unset on the current response in any case.
        """
        if self.cookie_key:
            db.Delete(db.Key(self.kindName, self.cookie_key))
            # Imported locally - presumably to avoid a circular import; verify before moving it.
            from viur.core import securitykey
            securitykey.clear_session_skeys(self.cookie_key)

        current.request.get().response.unset_cookie(self.cookie_name, strict=False)

        self.loaded = False
        self.cookie_key = None
        super().clear()

    def popitem(self) -> t.Tuple[t.Any, t.Any]:
        """
        Remove and return a (key, value) pair from the session.

        :raises KeyError: If the session is empty. The session is not marked as changed in that case.
        """
        item = super().popitem()
        # Only flag the session once something has actually been removed.
        self.changed = True
        return item

    def setdefault(self, key, default=None) -> t.Any:
        """
        Return the value stored under *key*; store and return *default* if the key is missing.

        The session is only marked as changed when the key had to be inserted.
        """
        if key not in self:
            self.changed = True

        return super().setdefault(key, default)

    @classmethod
    def on_delete(cls, func: TObserver, /) -> TObserver:
        """Decorator to register an observer for the _session delete event_."""
        cls._ON_DELETE_OBSERVER.append(func)
        return func

    @classmethod
    def dispatch_on_delete(cls, entry: db.Entity) -> None:
        """Call the observers for the _session delete event_."""
        for observer in cls._ON_DELETE_OBSERVER:
            observer(entry)

248 

249 

class DeleteSessionsIter(DeleteEntitiesIter):
    """
    QueryIter which removes every session entity it is handed.

    After an entity has been deleted, a _session delete event_ is raised
    for it through :meth:`Session.dispatch_on_delete`.
    """

    @classmethod
    def handleEntry(cls, entry: db.Entity, customData: t.Any) -> None:
        """
        Delete a single session entity and notify the registered observers.

        :param entry: The session entity to delete.
        :param customData: Not used by this implementation.
        """
        session_key = entry.key
        db.Delete(session_key)
        # Observers receive the entity that has just been deleted.
        Session.dispatch_on_delete(entry)

262 

263 

@tasks.CallDeferred
def killSessionByUser(user: t.Optional[t.Union[str, "db.Key", None]] = None):
    """
    Invalidates all active sessions for the given *user*.

    This means that this user is instantly logged out.
    If no user is given, it tries to invalidate **all** active sessions.

    Use "__guest__" to kill all sessions not associated with a user.

    :param user: UserID, "__guest__" or None.
    """
    logging.info(f"Invalidating all sessions for {user=}")

    query = db.Query(Session.kindName)
    if user is not None:
        # Only restrict the query when a user was given. Filtering on str(None) would look for the
        # literal user "None" and therefore never invalidate anything.
        query = query.filter("user =", str(user))

    DeleteSessionsIter.startIterOnQuery(query)

280 

281 

@tasks.PeriodicTask(interval=datetime.timedelta(hours=4))
def start_clear_sessions():
    """
    Removes old (expired) sessions.

    A session is selected once its "lastseen" timestamp is older than
    conf.user.session_life_time plus an additional 300 seconds.
    """
    expiry_threshold = time.time() - (conf.user.session_life_time + 300)
    expired_sessions = db.Query(Session.kindName).filter("lastseen <", expiry_threshold)
    DeleteSessionsIter.startIterOnQuery(expired_sessions)