Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/key.py: 13%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 20:18 +0000

1import copy 

2import logging 

3import typing as t 

4from viur.core import db, i18n 

5from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity 

6 

7 

class KeyBone(BaseBone):
    """
    The KeyBone is used for managing keys in the database. It provides various methods for validating,
    converting, and storing key values, as well as querying the database.
    Key management is crucial for maintaining relationships between entities in the database, and the
    KeyBone class helps ensure that keys are handled correctly and efficiently throughout the system.

    :param descr: The description of the KeyBone.
    :param readOnly: Whether the KeyBone is read-only.
    :param visible: Whether the KeyBone is visible.
    :param allowed_kinds: The allowed entity kinds for the KeyBone.
    :param check: Whether to check for entity existence.
    """
    type = "key"

    def __init__(
        self,
        *,
        descr: str = "Key",
        readOnly: bool = True,  # default is readonly
        visible: bool = False,  # default is invisible
        allowed_kinds: t.Optional[t.Iterable[str]] = None,  # None allows for any kind
        check: bool = False,  # check for entity existence
        **kwargs
    ):
        """
        Set up the bone; see the class docstring for the meaning of the parameters.

        ``allowed_kinds`` is stored as a tuple; an empty or missing iterable is stored as ``None``.
        """
        super().__init__(descr=descr, readOnly=readOnly, visible=visible, defaultValue=None, **kwargs)
        self.allowed_kinds = tuple(allowed_kinds) if allowed_kinds else None
        self.check = check

    def singleValueFromClient(self, value, skel=None, bone_name=None, client_data=None, parse_only: bool = False):
        """
        Parse a single client-supplied value into a database key.

        :param value: The raw value; strings are stripped of surrounding whitespace first.
        :param skel: Not used by this implementation.
        :param bone_name: Not used by this implementation.
        :param client_data: Not used by this implementation.
        :param parse_only: If True, only parse the key; skip the ``isInvalid`` check and the
            optional existence check against the database.

        :return: A tuple ``(key, None)`` on success, otherwise
            ``(self.getEmptyValue(), [ReadFromClientError])``.
        """
        # check for correct key
        if isinstance(value, str):
            value = value.strip()

        if self.allowed_kinds:
            try:
                key = db.key_helper(value, self.allowed_kinds[0], self.allowed_kinds[1:])
            except ValueError as e:
                # A ValueError raised without arguments has an empty ``args`` tuple;
                # fall back to the generic message instead of failing with an IndexError.
                if e.args:
                    message = e.args[0]
                else:
                    message = i18n.translate(
                        "core.bones.error.invalidkey", "No valid database key could be parsed"
                    )
                return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, message)]
        else:
            try:
                key = db.normalize_key(value)
            except Exception as exc:
                logging.exception(f"Failed to normalize {value}: {exc}")
                return self.getEmptyValue(), [
                    ReadFromClientError(
                        ReadFromClientErrorSeverity.Invalid,
                        i18n.translate("core.bones.error.invalidkey", "No valid database key could be parsed")
                    )
                ]

        if not parse_only:
            # Check custom validity
            if err := self.isInvalid(key):
                return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]

            # Optionally make sure the referenced entity actually exists
            if self.check and db.get(key) is None:
                return self.getEmptyValue(), [
                    ReadFromClientError(
                        ReadFromClientErrorSeverity.Invalid,
                        i18n.translate("core.bones.error.keynotfound", "The provided database key does not exist")
                    )
                ]

        return key, None

    def singleValueUnserialize(self, val):
        """
        Convert a single stored value back into a key.

        :param val: The stored value.
        :return: ``None`` for falsy values, a normalized key for :class:`db.Key` instances,
            otherwise the key parsed by :meth:`singleValueFromClient` in parse-only mode.
        :raises ValueError: If the value cannot be parsed into a key.
        """
        if not val:
            return None

        if isinstance(val, db.Key):
            return db.normalize_key(val)

        # Anything else is parsed only; validity and existence checks are skipped here.
        rval, err = self.singleValueFromClient(val, parse_only=True)
        if err:
            raise ValueError(err[0].errorMessage)

        return rval

    def unserialize(self, skel: 'SkeletonInstance', name: str) -> bool:
        """
        Load this bone's value from the skeleton's database entity.

        For the bone named ``"key"``, the value is taken directly from ``skel.dbEntity.key``,
        provided ``skel.dbEntity`` is a :class:`db.Entity` holding a non-partial key.
        Every other case is delegated to the base class.

        :param skel: The skeleton instance this bone belongs to.
        :param name: The name of this bone inside the skeleton.
        :return: True if the key was taken from the entity, otherwise the base class result.
        """
        if (
            name == "key"
            and isinstance(skel.dbEntity, db.Entity)
            and skel.dbEntity.key
            and not skel.dbEntity.key.is_partial
        ):
            skel.accessedValues[name] = skel.dbEntity.key
            return True
        return super().unserialize(skel, name)

    def serialize(self, skel: 'SkeletonInstance', name: str, parentIndexed: bool) -> bool:
        """
        Write this bone's value into the skeleton's database entity.

        For the bone named ``"key"``, the accessed value is assigned to ``skel.dbEntity.key``
        instead of being stored as a property. Every other bone name is delegated to the base class.

        :param skel: The skeleton instance this bone belongs to.
        :param name: The name of this bone inside the skeleton.
        :param parentIndexed: Passed through unchanged to the base class.
        :return: False if the bone is named ``"key"`` but its value was never accessed,
            True if the entity key was set, otherwise the base class result.
        """
        if name != "key":
            return super().serialize(skel, name, parentIndexed=parentIndexed)

        if name not in skel.accessedValues:
            return False

        skel.dbEntity.key = skel.accessedValues[name]
        return True

    def buildDBFilter(
        self,
        name: str,
        skel: 'viur.core.skeleton.SkeletonInstance',
        dbFilter: db.Query,
        rawFilter: dict,
        prefix: t.Optional[str] = None
    ) -> db.Query:
        """
        This method parses the search filter specified by the client in their request and converts
        it into a format that can be understood by the datastore. It takes care of ignoring filters
        that do not target this bone and safely handles malformed data in the raw filter.

        :param name: The property name of this bone in the Skeleton (not the description).
        :param skel: The :class:`viur.core.skeleton.SkeletonInstance` this bone is a part of.
        :param dbFilter: The current :class:`viur.core.db.Query` instance the filters should be
            applied to.
        :param rawFilter: The dictionary of filters the client wants to have applied.
        :param prefix: An optional string to prepend to the filter key. Defaults to None.

        :return: The modified :class:`viur.core.db.Query`.

        :raises ValueError: If a list of keys is given but the query already contains a list of
            sub-queries.
        :raises RuntimeError: If a key cannot be decoded or the filter cannot be applied.

        The method takes the following steps:

        #. Decodes the provided key(s) from the raw filter.
        #. If the filter contains a list of keys, it iterates through the list, creating a new
           filter for each key and appending it to the list of queries.
        #. If the filter contains a single key, it applies the filter directly to the query.
        #. In case of any invalid key or other issues, it raises a RuntimeError.
        """

        def _decodeKey(key):
            """Return ``key`` unchanged if it is a db.Key, else decode it from its urlsafe form."""
            if isinstance(key, db.Key):
                return key

            try:
                return db.Key.from_legacy_urlsafe(key)
            except Exception as e:
                logging.exception(e)
                logging.warning(f"Could not decode key {key}")
                raise RuntimeError(f"Could not decode key {key!r}") from e

        if name not in rawFilter:
            # The client did not filter on this bone
            return dbFilter

        raw_value = rawFilter[name]
        # The bone named "key" filters on the special key property rather than on a regular property
        filter_property = db.KEY_SPECIAL_PROPERTY if name == "key" else name
        filter_key = f"{prefix or ''}{filter_property} ="

        if isinstance(raw_value, list):
            if isinstance(dbFilter.queries, list):
                raise ValueError("In-Filter already used!")
            elif dbFilter.queries is None:
                return dbFilter  # Query is already unsatisfiable

            base_query = dbFilter.queries
            # Collect the sub-queries locally and assign them only once all keys were decoded,
            # so a malformed key does not leave the passed-in query half-modified.
            sub_queries = []
            for key in raw_value:
                sub_query = copy.deepcopy(base_query)
                try:
                    sub_query.filters[filter_key] = _decodeKey(key)
                except RuntimeError:
                    raise  # already carries message and cause
                except Exception as exc:  # Invalid key or something
                    raise RuntimeError(f"Could not apply key filter for {name!r}") from exc
                sub_queries.append(sub_query)
            dbFilter.queries = sub_queries
        else:
            try:
                dbFilter.filter(filter_key, _decodeKey(raw_value))
            except RuntimeError:
                raise  # already carries message and cause
            except Exception as exc:  # Invalid key or something
                raise RuntimeError(f"Could not apply key filter for {name!r}") from exc

        return dbFilter

    def _atomic_dump(self, value):
        """Return the string form of ``value``, or ``None`` if ``value`` is falsy."""
        return str(value) if value else None

181 

182 return str(value)