Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/securitykey.py: 0%

59 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-27 07:59 +0000

1""" 

2 Implementation of one-time CSRF-security-keys. 

3 

4 CSRF-security-keys (Cross-Site Request Forgery) are used mostly to make requests unique and non-reproducible. 

5 Doing the same request again requires to obtain a fresh security key first. 

6 Furthermore, security keys can be used to implement credential-reset mechanisms or similar features, where a 

7 URL is only valid for one call. 

8 

9 .. note:: 

10 There's also a hidden 3rd type of security-key: The session's static security key. 

11 

12 This key is only revealed once during login, as the protected header "Sec-X-ViUR-StaticSessionKey". 

13 

14 This can be used instead of the one-time sessions security key by sending it back as the same protected HTTP 

15 header and setting the skey value to "STATIC_SESSION_KEY". This is only intended for non-web-browser, 

16 programmatic access (admin tools, import tools etc.) where CSRF attacks are not applicable. 

17 

18 Therefore that header is prefixed with "Sec-" - so it cannot be read or set using JavaScript. 

19""" 

20import typing as t 

21import datetime 

22import hmac 

23from viur.core import conf, utils, current, db, tasks 

24 

# Datastore kind under which every security-key entity is stored.
SECURITYKEY_KINDNAME = "viur-securitykey"

# Fallback lifetime (in seconds) for keys that are not bound to a session.
SECURITYKEY_DURATION = 60 * 60 * 24  # one day

SECURITYKEY_STATIC_HEADER: t.Final[str] = "Sec-X-ViUR-StaticSessionKey"
"""Name of the HTTP header that carries the static session key. It is handed
out at login and has to be sent along with requests that require a skey."""

SECURITYKEY_STATIC_SKEY: t.Final[str] = "STATIC_SESSION_KEY"
"""Marker value for the ``skey`` payload field, signalling that the static
session key from the request headers should be checked instead."""

33 

34 

35def create( 

36 duration: None | int | datetime.timedelta = None, 

37 session_bound: bool = True, 

38 key_length: int = 13, 

39 indexed: bool = True, 

40 **custom_data) -> str: 

41 """ 

42 Creates a new one-time CSRF-security-key. 

43 

44 The custom data (given as **custom_data) that can be stored with the key. 

45 Any data provided must be serializable by the datastore. 

46 

47 :param duration: Make this CSRF-token valid for a fixed timeframe. 

48 :param session_bound: Bind this CSRF-token to the current session. 

49 :param indexed: Indexes all values stored with the security-key (default), set False to not index. 

50 :param custom_data: Any other data is stored with the CSRF-token, for later re-use. 

51 

52 :returns: The new one-time key, which is a randomized string. 

53 """ 

54 if any(k.startswith("viur_") for k in custom_data): 

55 raise ValueError("custom_data keys with a 'viur_'-prefix are reserved.") 

56 

57 if not duration: 

58 duration = conf.user.session_life_time if session_bound else SECURITYKEY_DURATION 

59 key = utils.string.random(key_length) 

60 

61 entity = db.Entity(db.Key(SECURITYKEY_KINDNAME, key)) 

62 entity |= custom_data 

63 if session_bound: 

64 session = current.session.get() 

65 if not session.loaded: 

66 session.reset() 

67 entity["viur_session"] = session.cookie_key 

68 

69 else: 

70 entity["viur_session"] = None 

71 

72 entity["viur_until"] = utils.utcNow() + utils.parse.timedelta(duration) 

73 

74 

75 if not indexed: 

76 entity.exclude_from_indexes = [k for k in entity.keys() if not k.startswith("viur_")] 

77 

78 db.Put(entity) 

79 

80 return key 

81 

82 

def validate(key: str, session_bound: bool = True) -> bool | db.Entity:
    """
    Check a CSRF-security-key and consume it.

    :param key: The CSRF-token to be checked.
    :param session_bound: If True, the CSRF-token must have been created inside the current session.
    :returns: False when the key is rejected for any reason. For the static session key marker,
        the boolean result of comparing the request header with the session's static key.
        Otherwise the data stored with the key (given during :meth:`create`), or True when no
        such data exists.
    """
    # Static session key: compare the protected header against the session's key.
    if session_bound and key == SECURITYKEY_STATIC_SKEY:
        header_value = current.request.get().request.headers.get(SECURITYKEY_STATIC_HEADER)
        if not header_value:
            return False

        return hmac.compare_digest(current.session.get().static_security_key, header_value)

    if not key:
        return False

    entity = db.Get(db.Key(SECURITYKEY_KINDNAME, key))
    if not entity:
        return False

    # The key is consumed up-front; every check below runs on the already deleted entity.
    db.Delete(entity)

    # Reject keys whose lifetime is over.
    if entity["viur_until"] < utils.utcNow():
        return False

    del entity["viur_until"]

    # Session-bound keys must match the current session; unbound keys must not carry one.
    if session_bound:
        if entity["viur_session"] != current.session.get().cookie_key:
            return False
    elif entity["viur_session"]:
        return False

    del entity["viur_session"]

    # An entity without remaining data is falsy, so fall back to a plain True.
    return entity or True

120 

121 

@tasks.PeriodicTask(interval=datetime.timedelta(hours=4))
def periodic_clear_skeys():
    """
    Removes expired CSRF-security-keys periodically.

    Registered through ``tasks.PeriodicTask`` with an interval of four hours.
    Only keys whose ``viur_until`` lies more than 300 seconds in the past are
    selected; the matching entities are handed to ``tasks.DeleteEntitiesIter``.
    """
    # The docstring has to be the first statement of the body, otherwise it is
    # just a discarded string expression and ``__doc__`` stays None.
    # NOTE(review): this function-local import looks redundant, since the
    # decorator above already uses the module-level ``tasks`` - confirm before removing.
    from viur.core import tasks

    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_until <", utils.utcNow() - datetime.timedelta(seconds=300))
    tasks.DeleteEntitiesIter.startIterOnQuery(query)

130 

131 

@tasks.CallDeferred
def clear_session_skeys(session_key):
    """
    Removes any CSRF-security-keys bound to a specific session.
    This function is called by the Session-module based on reset-actions.

    :param session_key: Value matched against the ``viur_session`` field of the stored keys.
    """
    # The docstring has to be the first statement of the body, otherwise it is
    # just a discarded string expression and ``__doc__`` stays None.
    # NOTE(review): this function-local import looks redundant, since the
    # decorator above already uses the module-level ``tasks`` - confirm before removing.
    from viur.core import tasks

    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_session", session_key)
    tasks.DeleteEntitiesIter.startIterOnQuery(query)