Coverage for  / home / runner / work / viur-core / viur-core / viur / src / viur / core / securitykey.py: 0%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 14:23 +0000

1""" 

2 Implementation of one-time CSRF-security-keys. 

3 

4 CSRF-security-keys (Cross-Site Request Forgery) are used mostly to make requests unique and non-reproducible. 

5 Doing the same request again requires to obtain a fresh security key first. 

6 Furthermore, security keys can be used to implement credential-reset mechanisms or similar features, where a 

7 URL is only valid for one call. 

8 

9 .. note:: 

10 There's also a hidden 3rd type of security-key: The session's static security key. 

11 

12 This key is only revealed once during login, as the protected header "Sec-X-ViUR-StaticSessionKey". 

13 

14 This can be used instead of the one-time sessions security key by sending it back as the same protected HTTP 

15 header and setting the skey value to "STATIC_SESSION_KEY". This is only intended for non-web-browser, 

16 programmatic access (admin tools, import tools etc.) where CSRF attacks are not applicable. 

17 

18 Therefore that header is prefixed with "Sec-" - so it cannot be read or set using JavaScript. 

19""" 

20import typing as t 

21import datetime 

22import hmac 

23from viur.core import conf, utils, current, db, tasks 

24 

SECURITYKEY_KINDNAME: t.Final[str] = "viur-securitykey"
"""The datastore kind under which one-time security keys are stored."""

SECURITYKEY_DURATION: t.Final[int] = 24 * 60 * 60  # one day
"""Default lifetime (in seconds) of a security key that is not bound to a session."""

SECURITYKEY_STATIC_HEADER: t.Final[str] = "Sec-X-ViUR-StaticSessionKey"
"""The name of the header in which the static session key is provided at login
and must be specified in requests that require a skey."""

SECURITYKEY_STATIC_SKEY: t.Final[str] = "STATIC_SESSION_KEY"
"""Value that must be used as a marker in the payload (key: skey) to indicate
that the session key from the headers should be used."""

33 

34 

35def create( 

36 duration: None | int | datetime.timedelta = None, 

37 session_bound: bool = True, 

38 key_length: int = 13, 

39 indexed: bool = True, 

40 **custom_data, 

41) -> str: 

42 """ 

43 Creates a new one-time CSRF-security-key. 

44 

45 The custom data (given as **custom_data) that can be stored with the key. 

46 Any data provided must be serializable by the datastore. 

47 

48 :param duration: Make this CSRF-token valid for a fixed timeframe. 

49 :param session_bound: Bind this CSRF-token to the current session. 

50 :param indexed: Indexes all values stored with the security-key (default), set False to not index. 

51 :param key_length: Allows to modify the length of the generated randomized key 

52 :param custom_data: Any other data is stored with the CSRF-token, for later re-use. 

53 

54 :returns: The new one-time key, which is a randomized string. 

55 """ 

56 if any(k.startswith("viur_") for k in custom_data): 

57 raise ValueError("custom_data keys with a 'viur_'-prefix are reserved.") 

58 

59 if not duration: 

60 duration = conf.user.session_life_time if session_bound else SECURITYKEY_DURATION 

61 

62 key = utils.string.random(key_length) 

63 

64 entity = db.Entity(db.Key(SECURITYKEY_KINDNAME, key)) 

65 entity |= custom_data 

66 if session_bound: 

67 session = current.session.get() 

68 if not session.loaded: 

69 session.reset() 

70 entity["viur_session"] = session.cookie_key 

71 

72 else: 

73 entity["viur_session"] = None 

74 

75 entity["viur_until"] = utils.utcNow() + utils.parse.timedelta(duration) 

76 

77 

78 if not indexed: 

79 entity.exclude_from_indexes = [k for k in entity.keys() if not k.startswith("viur_")] 

80 

81 db.put(entity) 

82 

83 return key 

84 

85 

def validate(key: str, session_bound: bool = True) -> bool | db.Entity:
    """
    Checks a CSRF-security-key and consumes it.

    A stored one-time key is deleted as soon as it has been fetched, regardless
    of whether the following checks succeed.

    If ``session_bound`` is set and ``key`` equals ``SECURITYKEY_STATIC_SKEY``, the value of
    the ``SECURITYKEY_STATIC_HEADER`` request header is compared against the session's
    static security key instead, and no datastore entity is involved.

    :param key: The CSRF-token to be validated.
    :param session_bound: If True, the key must have been created for the current session;
        if False, the key must not be bound to any session.
    :returns: False if the key is missing, unknown, expired or fails the session check.
        Otherwise the entity holding the data given during :meth:`create`, or True
        if no such data was stored. The static-session-key path returns a plain bool.
    """
    # Static session key: compare header value and session secret in constant time.
    if session_bound and key == SECURITYKEY_STATIC_SKEY:
        header_value = current.request.get().request.headers.get(SECURITYKEY_STATIC_HEADER)
        if not header_value:
            return False
        return hmac.compare_digest(current.session.get().static_security_key, header_value)

    if not key:
        return False

    entity = db.get(db.Key(SECURITYKEY_KINDNAME, key))
    if not entity:
        return False

    # The key is consumed first; validation of its content follows afterwards.
    # NOTE(review): get and delete are separate calls here - confirm whether concurrent use is a concern.
    db.delete(entity)

    # Expired keys are rejected.
    if entity["viur_until"] < utils.utcNow():
        return False

    del entity["viur_until"]

    stored_session = entity["viur_session"]
    if session_bound:
        # A session-bound check requires the key to belong to the current session.
        if stored_session != current.session.get().cookie_key:
            return False
    elif stored_session:
        # An unbound check must not accept a key that was bound to some session.
        return False

    del entity["viur_session"]

    # What is left is the custom data; an empty entity collapses to True.
    return entity or True

123 

124 

@tasks.PeriodicTask(interval=datetime.timedelta(hours=4))
def periodic_clear_skeys() -> None:
    """
    Removes expired CSRF-security-keys periodically.

    Registered as a periodic task with an interval of four hours. Only keys whose
    ``viur_until`` lies more than 300 seconds in the past are selected for deletion.
    """
    # The docstring must be the first statement to be picked up as ``__doc__``; the former
    # function-local re-import of ``tasks`` was redundant, as the decorator above already
    # requires the module-level name.
    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_until <", utils.utcNow() - datetime.timedelta(seconds=300))
    tasks.DeleteEntitiesIter.startIterOnQuery(query)

133 

134 

@tasks.CallDeferred
def clear_session_skeys(session_key) -> None:
    """
    Removes any CSRF-security-keys bound to a specific session.
    This function is called by the Session-module based on reset-actions.

    :param session_key: The value stored as ``viur_session`` on the keys to be removed.
    """
    # The docstring must be the first statement to be picked up as ``__doc__``; the former
    # function-local re-import of ``tasks`` was redundant, as the decorator above already
    # requires the module-level name.
    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_session", session_key)
    tasks.DeleteEntitiesIter.startIterOnQuery(query)

143 tasks.DeleteEntitiesIter.startIterOnQuery(query)