Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/key.py: 13%

90 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import copy 

2import logging 

3import typing as t 

4from viur.core import db, i18n 

5from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity 

6 

7 

8class KeyBone(BaseBone): 

9 """ 

10 The KeyBone is used for managing keys in the database. It provides various methods for validating, 

11 converting, and storing key values, as well as querying the database. 

12 Key management is crucial for maintaining relationships between entities in the database, and the 

13 KeyBone class helps ensure that keys are handled correctly and efficiently throughout the system. 

14 

15 :param descr: The description of the KeyBone. 

16 :param readOnly: Whether the KeyBone is read-only. 

17 :param visible: Whether the KeyBone is visible. 

18 :param allowed_kinds: The allowed entity kinds for the KeyBone. 

19 :param check: Whether to check for entity existence. 

20 """ 

21 type = "key" 

22 

23 def __init__( 

24 self, 

25 *, 

26 descr: str = "Key", 

27 readOnly: bool = True, # default is readonly 

28 visible: bool = False, # default is invisible 

29 allowed_kinds: t.Optional[t.Iterable[str]] = None, # None allows for any kind 

30 check: bool = False, # check for entity existence 

31 **kwargs 

32 ): 

33 super().__init__(descr=descr, readOnly=readOnly, visible=visible, defaultValue=None, **kwargs) 

34 self.allowed_kinds = tuple(allowed_kinds) if allowed_kinds else None 

35 self.check = check 

36 

37 def singleValueFromClient(self, value, skel=None, bone_name=None, client_data=None, parse_only: bool = False): 

38 # check for correct key 

39 if isinstance(value, str): 

40 value = value.strip() 

41 

42 if self.allowed_kinds: 

43 try: 

44 key = db.key_helper(value, self.allowed_kinds[0], self.allowed_kinds[1:]) 

45 except ValueError as e: 

46 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, e.args[0])] 

47 else: 

48 try: 

49 key = db.normalize_key(value) 

50 except Exception as exc: 

51 logging.exception(f"Failed to normalize {value}: {exc}") 

52 return self.getEmptyValue(), [ 

53 ReadFromClientError( 

54 ReadFromClientErrorSeverity.Invalid, 

55 i18n.translate("core.bones.error.invalidkey", "No valid database key could be parsed") 

56 ) 

57 ] 

58 

59 if not parse_only: 

60 # Check custom validity 

61 if err := self.isInvalid(key): 

62 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)] 

63 

64 if self.check: 

65 if db.get(key) is None: 

66 return self.getEmptyValue(), [ 

67 ReadFromClientError( 

68 ReadFromClientErrorSeverity.Invalid, 

69 i18n.translate("core.bones.error.keynotfound", "The provided database key does not exist") 

70 ) 

71 ] 

72 

73 return key, None 

74 

75 def singleValueUnserialize(self, val): 

76 if not val: 

77 rval = None 

78 elif isinstance(val, db.Key): 

79 rval = db.normalize_key(val) 

80 else: 

81 rval, err = self.singleValueFromClient(val, parse_only=True) 

82 if err: 

83 raise ValueError(err[0].errorMessage) 

84 

85 return rval 

86 

87 def unserialize(self, skel: 'SkeletonInstance', name: str) -> bool: 

88 if ( 

89 name == "key" 

90 and isinstance(skel.dbEntity, db.Entity) 

91 and skel.dbEntity.key 

92 and not skel.dbEntity.key.is_partial 

93 ): 

94 skel.accessedValues[name] = skel.dbEntity.key 

95 return True 

96 return super().unserialize(skel, name) 

97 

98 def serialize(self, skel: 'SkeletonInstance', name: str, parentIndexed: bool) -> bool: 

99 if name not in skel.accessedValues: 

100 return False 

101 if name == "key": 

102 skel.dbEntity.key = skel.accessedValues["key"] 

103 return True 

104 

105 return super().serialize(skel, name, parentIndexed=parentIndexed) 

106 

107 def buildDBFilter( 

108 self, 

109 name: str, 

110 skel: 'viur.core.skeleton.SkeletonInstance', 

111 dbFilter: db.Query, 

112 rawFilter: dict, 

113 prefix: t.Optional[str] = None 

114 ) -> db.Query: 

115 """ 

116 This method parses the search filter specified by the client in their request and converts 

117 it into a format that can be understood by the datastore. It takes care of ignoring filters 

118 that do not target this bone and safely handles malformed data in the raw filter. 

119 

120 :param name: The property name of this bone in the Skeleton (not the description). 

121 :param skel: The :class:viur.core.skeleton.SkeletonInstance this bone is a part of. 

122 :param dbFilter: The current :class:viur.core.db.Query instance the filters should be 

123 applied to. 

124 :param rawFilter: The dictionary of filters the client wants to have applied. 

125 :param prefix: An optional string to prepend to the filter key. Defaults to None. 

126 

127 :return: The modified :class:viur.core.db.Query. 

128 

129 The method takes the following steps: 

130 

131 #. Decodes the provided key(s) from the raw filter. 

132 #. If the filter contains a list of keys, it iterates through the list, creating a new 

133 filter for each key and appending it to the list of queries. 

134 #. If the filter contains a single key, it applies the filter directly to the query. 

135 #. In case of any invalid key or other issues, it raises a RuntimeError. 

136 """ 

137 

138 def _decodeKey(key): 

139 if isinstance(key, db.Key): 

140 return key 

141 else: 

142 try: 

143 return db.Key.from_legacy_urlsafe(key) 

144 except Exception as e: 

145 logging.exception(e) 

146 logging.warning(f"Could not decode key {key}") 

147 raise RuntimeError() 

148 

149 if name in rawFilter: 

150 if isinstance(rawFilter[name], list): 

151 if isinstance(dbFilter.queries, list): 

152 raise ValueError("In-Filter already used!") 

153 elif dbFilter.queries is None: 

154 return dbFilter # Query is already unsatisfiable 

155 oldFilter = dbFilter.queries 

156 dbFilter.queries = [] 

157 for key in rawFilter[name]: 

158 newFilter = copy.deepcopy(oldFilter) 

159 try: 

160 if name == "key": 

161 newFilter.filters[f"{prefix or ''}{db.KEY_SPECIAL_PROPERTY} ="] = _decodeKey(key) 

162 else: 

163 newFilter.filters[f"{prefix or ''}{name} ="] = _decodeKey(key) 

164 except: # Invalid key or something 

165 raise RuntimeError() 

166 dbFilter.queries.append(newFilter) 

167 else: 

168 try: 

169 if name == "key": 

170 dbFilter.filter(f"""{prefix or ""}{db.KEY_SPECIAL_PROPERTY} =""", _decodeKey(rawFilter[name])) 

171 else: 

172 dbFilter.filter(f"""{prefix or ""}{name} =""", _decodeKey(rawFilter[name])) 

173 except: # Invalid key or something 

174 raise RuntimeError() 

175 return dbFilter 

176 

177 def _atomic_dump(self, value): 

178 if not value: 

179 return None 

180 

181 return str(value)