Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/key.py: 10%

88 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-27 07:59 +0000

1import copy 

2import logging 

3import typing as t 

4from viur.core import db 

5from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity 

6 

7 

class KeyBone(BaseBone):
    """
    The KeyBone is used for managing keys in the database. It provides various methods for validating,
    converting, and storing key values, as well as querying the database.
    Key management is crucial for maintaining relationships between entities in the database, and the
    KeyBone class helps ensure that keys are handled correctly and efficiently throughout the system.

    :param descr: The description of the KeyBone.
    :param readOnly: Whether the KeyBone is read-only.
    :param visible: Whether the KeyBone is visible.
    :param allowed_kinds: The allowed entity kinds for the KeyBone. ``None`` (or any empty value)
        allows every kind. A single kind may also be passed as a plain string.
    :param check: Whether to check for entity existence.
    """
    type = "key"

    def __init__(
        self,
        *,
        descr: str = "Key",
        readOnly: bool = True,  # default is readonly
        visible: bool = False,  # default is invisible
        allowed_kinds: t.Optional[t.Iterable[str]] = None,  # None allows for any kind
        check: bool = False,  # check for entity existence
        **kwargs
    ):
        """
        Initialize the bone; all remaining keyword arguments are forwarded to the base class.

        The bone's ``defaultValue`` is always forced to ``None``.
        """
        super().__init__(descr=descr, readOnly=readOnly, visible=visible, defaultValue=None, **kwargs)

        # A plain string is itself an iterable of characters; without this guard
        # ``allowed_kinds="user"`` would silently become ("u", "s", "e", "r").
        if isinstance(allowed_kinds, str) and allowed_kinds:
            allowed_kinds = (allowed_kinds,)

        self.allowed_kinds = tuple(allowed_kinds) if allowed_kinds else None
        self.check = check

    def singleValueFromClient(self, value, skel=None, bone_name=None, client_data=None, parse_only: bool = False):
        """
        Convert a single client-supplied value into a database key.

        :param value: The raw value; strings are stripped of surrounding whitespace first.
        :param skel: Not used by this implementation.
        :param bone_name: Not used by this implementation.
        :param client_data: Not used by this implementation.
        :param parse_only: If True, only parse the value and skip the custom validity check
            (``isInvalid``) and the optional existence check.

        :return: A tuple ``(key, None)`` on success, or ``(empty value, [ReadFromClientError])``
            when the value cannot be parsed or fails validation.
        """
        # check for correct key
        if isinstance(value, str):
            value = value.strip()

        if self.allowed_kinds:
            # First allowed kind is the target kind, the rest are accepted alternatives.
            try:
                key = db.keyHelper(value, self.allowed_kinds[0], self.allowed_kinds[1:])
            except ValueError as e:
                # str(e) instead of e.args[0]: does not fail on an argument-less ValueError.
                return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, str(e))]
        else:
            try:
                if isinstance(value, db.Key):
                    key = db.normalizeKey(value)
                else:
                    key = db.normalizeKey(db.Key.from_legacy_urlsafe(value))
            except Exception as exc:
                logging.exception(f"Failed to normalize {value}: {exc}")
                return self.getEmptyValue(), [
                    ReadFromClientError(
                        ReadFromClientErrorSeverity.Invalid,
                        "The provided key is not a valid database key"
                    )
                ]

        if not parse_only:
            # Check custom validity
            if err := self.isInvalid(key):
                return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]

            # Optional existence check against the datastore
            if self.check and db.Get(key) is None:
                return self.getEmptyValue(), [
                    ReadFromClientError(
                        ReadFromClientErrorSeverity.Invalid,
                        "The provided key does not exist"
                    )
                ]

        return key, None

    def singleValueUnserialize(self, val):
        """
        Turn a stored value back into a key.

        :param val: The stored value. Falsy values yield ``None``, ``db.Key`` instances are
            normalized, anything else is parsed via :meth:`singleValueFromClient` in parse-only mode.

        :return: The resulting key or ``None``.

        :raises ValueError: If the stored value cannot be parsed into a key.
        """
        if not val:
            return None

        if isinstance(val, db.Key):
            return db.normalizeKey(val)

        rval, err = self.singleValueFromClient(val, parse_only=True)
        if err:
            raise ValueError(err[0].errorMessage)

        return rval

    def unserialize(self, skel: 'SkeletonInstance', name: str) -> bool:
        """
        Load this bone's value from ``skel.dbEntity`` into ``skel.accessedValues``.

        For the bone named ``"key"`` the value is taken directly from the entity's own key,
        provided the entity is a ``db.Entity`` with a complete (non-partial) key. Every other
        case is delegated to the base class.

        :param skel: The skeleton instance this bone belongs to.
        :param name: The name of this bone inside the skeleton.

        :return: True if the entity key was used, otherwise the result of the base implementation.
        """
        if (
            name == "key"
            and isinstance(skel.dbEntity, db.Entity)
            and skel.dbEntity.key
            and not skel.dbEntity.key.is_partial
        ):
            skel.accessedValues[name] = skel.dbEntity.key
            return True

        return super().unserialize(skel, name)

    def serialize(self, skel: 'SkeletonInstance', name: str, parentIndexed: bool) -> bool:
        """
        Write this bone's value from ``skel.accessedValues`` to ``skel.dbEntity``.

        The bone named ``"key"`` is written to the entity's key itself instead of a property;
        every other bone is delegated to the base class.

        :param skel: The skeleton instance this bone belongs to.
        :param name: The name of this bone inside the skeleton.
        :param parentIndexed: Passed through to the base implementation.

        :return: False if the bone has no accessed value, True if the entity key was set,
            otherwise the result of the base implementation.
        """
        if name not in skel.accessedValues:
            return False

        if name == "key":
            skel.dbEntity.key = skel.accessedValues["key"]
            return True

        return super().serialize(skel, name, parentIndexed=parentIndexed)

    def buildDBFilter(
        self,
        name: str,
        skel: 'viur.core.skeleton.SkeletonInstance',
        dbFilter: db.Query,
        rawFilter: dict,
        prefix: t.Optional[str] = None
    ) -> db.Query:
        """
        This method parses the search filter specified by the client in their request and converts
        it into a format that can be understood by the datastore. It takes care of ignoring filters
        that do not target this bone and safely handles malformed data in the raw filter.

        :param name: The property name of this bone in the Skeleton (not the description).
        :param skel: The :class:`viur.core.skeleton.SkeletonInstance` this bone is a part of.
        :param dbFilter: The current :class:`viur.core.db.Query` instance the filters should be
            applied to.
        :param rawFilter: The dictionary of filters the client wants to have applied.
        :param prefix: An optional string to prepend to the filter key. Defaults to None.

        :return: The modified :class:`viur.core.db.Query`.

        :raises ValueError: If a list of keys is given but the query already holds a list of
            sub-queries ("In-Filter already used!").
        :raises RuntimeError: If a key cannot be decoded or the filter cannot be applied.

        The method takes the following steps:

        #. Decodes the provided key(s) from the raw filter.
        #. If the filter contains a list of keys, it creates one copy of the current query per
            key, each with an equality filter on that key, and replaces the query's sub-queries
            with these copies. The query is only modified once every key has been decoded and
            every sub-query has been built.
        #. If the filter contains a single key, it applies the filter directly to the query.
        #. In case of any invalid key or other issues, it raises a RuntimeError.
        """

        def _decodeKey(key):
            """Return *key* as ``db.Key``; raise RuntimeError if it cannot be decoded."""
            if isinstance(key, db.Key):
                return key

            try:
                return db.Key.from_legacy_urlsafe(key)
            except Exception as e:
                logging.exception(e)
                logging.warning(f"Could not decode key {key}")
                raise RuntimeError(f"Could not decode key {key!r}") from e

        if name not in rawFilter:
            return dbFilter  # Filter does not target this bone

        raw_value = rawFilter[name]

        # The bone named "key" filters on the entity key itself, not on a property.
        filter_property = db.KEY_SPECIAL_PROPERTY if name == "key" else name
        filter_name = f"{prefix or ''}{filter_property} ="

        if isinstance(raw_value, list):
            if isinstance(dbFilter.queries, list):
                raise ValueError("In-Filter already used!")
            elif dbFilter.queries is None:
                return dbFilter  # Query is already unsatisfiable

            # Decode everything up-front so that an invalid key cannot leave the
            # query with a half-built list of sub-queries.
            decoded_keys = [_decodeKey(key) for key in raw_value]

            base_query = dbFilter.queries
            sub_queries = []
            for decoded in decoded_keys:
                sub_query = copy.deepcopy(base_query)
                try:
                    sub_query.filters[filter_name] = decoded
                except Exception as exc:
                    raise RuntimeError(f"Could not apply key filter for {name!r}") from exc
                sub_queries.append(sub_query)

            dbFilter.queries = sub_queries
        else:
            decoded = _decodeKey(raw_value)
            try:
                dbFilter.filter(filter_name, decoded)
            except Exception as exc:  # Invalid key or something
                raise RuntimeError(f"Could not apply key filter for {name!r}") from exc

        return dbFilter