Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/securitykey.py: 0%
59 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-27 07:59 +0000
1"""
2 Implementation of one-time CSRF-security-keys.
4 CSRF-security-keys (Cross-Site Request Forgery) are used mostly to make requests unique and non-reproducible.
5 Doing the same request again requires to obtain a fresh security key first.
6 Furthermore, security keys can be used to implement credential-reset mechanisms or similar features, where a
7 URL is only valid for one call.
9 .. note::
10 There's also a hidden 3rd type of security-key: The session's static security key.
12 This key is only revealed once during login, as the protected header "Sec-X-ViUR-StaticSessionKey".
14 This can be used instead of the one-time sessions security key by sending it back as the same protected HTTP
15 header and setting the skey value to "STATIC_SESSION_KEY". This is only intended for non-web-browser,
16 programmatic access (admin tools, import tools etc.) where CSRF attacks are not applicable.
18 Therefore that header is prefixed with "Sec-" - so it cannot be read or set using JavaScript.
19"""
20import typing as t
21import datetime
22import hmac
23from viur.core import conf, utils, current, db, tasks
# Datastore kind name used for the keys and queries of security-key entities.
SECURITYKEY_KINDNAME = "viur-securitykey"

# Fallback validity in seconds (one day) for keys that are not bound to a session.
SECURITYKEY_DURATION = 60 * 60 * 24

SECURITYKEY_STATIC_HEADER: t.Final[str] = "Sec-X-ViUR-StaticSessionKey"
"""The name of the header in which the static session key is provided at login
and must be specified in requests that require a skey."""

SECURITYKEY_STATIC_SKEY: t.Final[str] = "STATIC_SESSION_KEY"
"""Value that must be used as a marker in the payload (key: skey) to indicate
that the session key from the headers should be used."""
35def create(
36 duration: None | int | datetime.timedelta = None,
37 session_bound: bool = True,
38 key_length: int = 13,
39 indexed: bool = True,
40 **custom_data) -> str:
41 """
42 Creates a new one-time CSRF-security-key.
44 The custom data (given as **custom_data) that can be stored with the key.
45 Any data provided must be serializable by the datastore.
47 :param duration: Make this CSRF-token valid for a fixed timeframe.
48 :param session_bound: Bind this CSRF-token to the current session.
49 :param indexed: Indexes all values stored with the security-key (default), set False to not index.
50 :param custom_data: Any other data is stored with the CSRF-token, for later re-use.
52 :returns: The new one-time key, which is a randomized string.
53 """
54 if any(k.startswith("viur_") for k in custom_data):
55 raise ValueError("custom_data keys with a 'viur_'-prefix are reserved.")
57 if not duration:
58 duration = conf.user.session_life_time if session_bound else SECURITYKEY_DURATION
59 key = utils.string.random(key_length)
61 entity = db.Entity(db.Key(SECURITYKEY_KINDNAME, key))
62 entity |= custom_data
63 if session_bound:
64 session = current.session.get()
65 if not session.loaded:
66 session.reset()
67 entity["viur_session"] = session.cookie_key
69 else:
70 entity["viur_session"] = None
72 entity["viur_until"] = utils.utcNow() + utils.parse.timedelta(duration)
75 if not indexed:
76 entity.exclude_from_indexes = [k for k in entity.keys() if not k.startswith("viur_")]
78 db.Put(entity)
80 return key
def validate(key: str, session_bound: bool = True) -> bool | db.Entity:
    """
    Validates a CSRF-security-key.

    A stored one-time key is deleted right after it has been fetched, before any further check,
    so it cannot be used a second time even if validation fails.

    :param key: The CSRF-token to be validated.
    :param session_bound: If True, make sure the CSRF-token is created inside the current session.
        If False, a key that is bound to a session is rejected.
    :returns: False if the key was not valid for whatever reason. For a valid one-time key, the
        entity holding the custom data given during :meth:`create`, or True if no custom data
        was stored. If ``key`` is the static-session-key marker (and ``session_bound`` is True),
        the boolean result of comparing the request header with the session's static key.
    """
    if session_bound and key == SECURITYKEY_STATIC_SKEY:
        if skey_header_value := current.request.get().request.headers.get(SECURITYKEY_STATIC_HEADER):
            static_key = current.session.get().static_security_key
            try:
                return hmac.compare_digest(static_key, skey_header_value)
            except TypeError:
                # hmac.compare_digest() raises TypeError for str arguments containing non-ASCII
                # characters and for mismatched argument types. The header value is
                # client-controlled, so such input is an invalid key, not a server error.
                return False

        return False

    if not key or not (entity := db.Get(db.Key(SECURITYKEY_KINDNAME, key))):
        return False

    # First of all, delete the entity, validation is done afterward.
    db.Delete(entity)

    # Key has expired?
    if entity["viur_until"] < utils.utcNow():
        return False

    del entity["viur_until"]

    # Key is session bound?
    if session_bound:
        if entity["viur_session"] != current.session.get().cookie_key:
            return False
    elif entity["viur_session"]:
        # An unbound validation must not accept a key that was bound to a session.
        return False

    del entity["viur_session"]

    # With both bookkeeping properties removed, an entity without custom data is falsy.
    return entity or True
@tasks.PeriodicTask(interval=datetime.timedelta(hours=4))
def periodic_clear_skeys():
    """
    Removes expired CSRF-security-keys periodically.

    Queries all security-keys whose ``viur_until`` lies more than 300 seconds in the past
    and passes that query to ``tasks.DeleteEntitiesIter.startIterOnQuery``.
    """
    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_until <", utils.utcNow() - datetime.timedelta(seconds=300))
    tasks.DeleteEntitiesIter.startIterOnQuery(query)
@tasks.CallDeferred
def clear_session_skeys(session_key):
    """
    Removes any CSRF-security-keys bound to a specific session.
    This function is called by the Session-module based on reset-actions.

    :param session_key: Value matched against the ``viur_session`` property of the stored keys.
    """
    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_session", session_key)
    tasks.DeleteEntitiesIter.startIterOnQuery(query)