Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/script.py: 0%

111 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 11:04 +0000

1import io 

2import typing as t 

3from viur.core.bones import * 

4from viur.core.prototypes.tree import Tree, TreeSkel, SkelType 

5from viur.core.modules.file import File 

6from viur.core import db, conf, current, skeleton, tasks, errors 

7from viur.core.decorators import exposed 

8from viur.core.i18n import translate 

9import zipfile 

10 

11 

class BaseScriptAbstractSkel(TreeSkel):
    """
    Shared skeleton base for script folders and script files.

    Contributes a read-only, unique ``path`` bone. The path is regenerated through the
    script module every time the client submits a name.
    """

    path = StringBone(
        descr="Path",
        readOnly=True,
        unique=UniqueValue(UniqueLockMethod.SameValue, True, "This path is already taken!"),
    )

    @classmethod
    def fromClient(cls, skel, data, *args, **kwargs):
        """
        Read client data into ``skel`` and keep the ``path`` value in sync with the name.

        :param skel: The skeleton instance that receives the values.
        :param data: The mapping of values submitted by the client.
        :return: The result of the parent ``fromClient`` implementation.
        """
        # A submitted name is applied up-front, so the path can be rebuilt from it.
        submitted_name = data.get("name")
        if submitted_name:
            skel["name"] = submitted_name
            conf.main_app.script.update_path(skel)

        success = super().fromClient(skel, data, *args, **kwargs)
        if success:
            return success

        # The path is read-only for clients; when it was rejected (e.g. the unique value is
        # already taken), report the first such error on the name field instead.
        path_error = next(
            (
                candidate for candidate in skel.errors
                if candidate.severity == ReadFromClientErrorSeverity.Invalid and candidate.fieldPath == ["path"]
            ),
            None,
        )
        if path_error is not None:
            path_error.fieldPath = ["name"]

        return success

36 

37 

class ScriptNodeSkel(BaseScriptAbstractSkel):
    """
    Skeleton for a folder (node) inside the script tree.
    """

    kindName = "viur-script-node"

    # Flags the entry as a root node; off by default.
    rootNode = BooleanBone(
        descr="Is root node?",
        defaultValue=False,
    )

    # Flags the folder as a plugin; off by default.
    plugin = BooleanBone(
        descr="Is plugin?",
        defaultValue=False,
    )

    # Folder name; the validator yields an error message unless File accepts it as a filename.
    name = StringBone(
        descr="Folder",
        required=True,
        vfunc=lambda value: "Foldername is invalid" if not File.is_valid_filename(value) else None,
    )

56 

57 

class ScriptLeafSkel(BaseScriptAbstractSkel):
    """
    Skeleton for a script file (leaf) inside the script tree.
    """

    kindName = "viur-script-leaf"

    # File name; must be a valid filename, end with ".py" and have a non-empty stem in front of it.
    name = StringBone(
        descr="Filename",
        required=True,
        vfunc=lambda value: (
            "Filename is invalid or doesn't have a '.py'-suffix"
            if not (File.is_valid_filename(value) and value.endswith(".py") and value.removesuffix(".py"))
            else None
        ),
    )

    # The source code of the script; not indexed.
    script = RawBone(
        descr="Code",
        indexed=False,
    )

    # Selectable values are built lazily from the configured user access rights, in sorted
    # order, each one labelled with its translation (falling back to the raw right name).
    access = SelectBone(
        descr="Required access rights to run this Script",
        values=lambda: {
            right: translate(f"viur.core.modules.user.accessright.{right}", defaultText=right)
            for right in sorted(conf.user.access_rights)
        },
        multiple=True,
    )

82 

83 

class Script(Tree):
    """
    Script is a system module used to serve a filesystem for scripts used by ViUR Scriptor and ViUR CLI.

    Folders are stored as nodes, script files as leafs. Each entry carries a ``path`` value
    that this module keeps in sync with the entry's position inside the tree.
    """

    leafSkelCls = ScriptLeafSkel
    nodeSkelCls = ScriptNodeSkel

    roles = {
        "admin": "*",
    }

    def adminInfo(self):
        """Return the configured script admin info, or an empty dict when none is configured."""
        if info := conf.script_admin_info:
            return info

        return {}

    def getAvailableRootNodes(self):
        """Return the single "Scripts" root node for a logged-in user, otherwise an empty list."""
        if current.user.get():
            root_key = self.rootnodeSkel(ensure=True)["key"]
            return [{"name": "Scripts", "key": root_key}]

        return []

    @exposed
    def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
        """
        View an entry by its key, falling back to a lookup by its path.

        :param skelType: The skeleton type to view.
        :param key: Either the key of the entry, or its path.
        :raises: :exc:`errors.NotFound`, when neither a key nor a path matches.
        """
        try:
            return super().view(skelType, key, *args, **kwargs)
        except errors.NotFound:
            # When key is not found, try to interpret key as path
            path_query = self.viewSkel(skelType).all().mergeExternalFilter({"path": key})
            skel = path_query.getSkel()
            if not skel:
                raise

            return super().view(skelType, skel["key"], *args, **kwargs)

    def onEdit(self, skelType, skel):
        """Refresh the path of the entry before handing over to the parent hook."""
        self.update_path(skel)
        super().onEdit(skelType, skel)

    def onEdited(self, skelType, skel):
        """After a folder was edited, schedule path updates for its child folders and files."""
        if skelType == "node":
            for child_type in ("node", "leaf"):
                self.update_path_recursive(child_type, skel["path"], skel["key"])

        super().onEdited(skelType, skel)

    @tasks.CallDeferred
    def update_path_recursive(self, skel_type, path, parent_key, cursor=None):
        """
        Recursively updates all items under a given parent key.

        :param skel_type: The skeleton type of the children to process.
        :param path: The path of the parent; children get ``<path>/<name>``.
        :param parent_key: The key of the parent entry.
        :param cursor: Cursor to continue a previous batch from.
        """
        query = self.editSkel(skel_type).all().filter("parententry", parent_key)
        query.setCursor(cursor)

        for child in query.fetch(99):
            expected_path = "/".join((path, child["name"]))

            # Entries whose path is already correct are left untouched.
            if expected_path == child["path"]:
                continue

            # self.onEdit() is NOT required here, as it would resolve the path again.
            child["path"] = expected_path
            child.write()
            # For nodes, this triggers the recursion for the next level.
            self.onEdited(skel_type, child)

        next_cursor = query.getCursor()
        if next_cursor:
            self.update_path_recursive(skel_type, path, parent_key, next_cursor)

    def update_path(self, skel):
        """
        Updates the path-value of either a folder or a script file, by resolving the repository's root node.

        :param skel: The skeleton whose ``path`` value is rewritten.
        """
        # Names are collected bottom-up (entry first) and reversed when joining.
        segments = [skel["name"]]
        parent_key = skel["parententry"]

        while parent_key:
            parent_skel = self.viewSkel("node")

            # Stop on unreadable parents.
            if not parent_skel.read(parent_key):
                break

            # Stop as soon as the repository root is reached.
            if parent_skel["key"] == skel["parentrepo"]:
                break

            segments.append(parent_skel["name"])
            parent_key = parent_skel["parententry"]

        skel["path"] = "/".join(reversed(segments))

    @exposed
    def get_importable(self):
        """
        Deliver all scripts below the root-level folder "importable" as a zip archive.

        :return: The bytes of the zip archive.
        :raises: :exc:`errors.Unauthorized`, when the list filter rejects a query.
        :raises: :exc:`errors.NotFound`, when the folder does not exist or holds no scripts.
        """

        def collect_scripts(folder_key):
            # Gathers all non-empty script entries of a folder, then descends into its sub-folders.
            leaf_query = self.listFilter(self.viewSkel("leaf").all().filter("parententry", folder_key))
            if not leaf_query:
                raise errors.Unauthorized()

            collected = [entry for entry in leaf_query.iter() if entry["script"]]

            folder_query = self.viewSkel("node").all().filter("parententry", folder_key)
            for folder in folder_query.iter():
                collected.extend(collect_scripts(folder.key))

            return collected

        # get importable key
        node_query = self.viewSkel("node").all()
        node_query = node_query.filter("parententry", self.rootnodeSkel(ensure=True)["key"])
        node_query = node_query.filter("name =", "importable")

        node_query = self.listFilter(node_query)
        if not node_query:
            raise errors.Unauthorized()

        entity = node_query.getEntry()
        importable_key = entity.key if entity else entity
        if not importable_key:
            raise errors.NotFound("No importable folder defined")

        scripts = collect_scripts(importable_key)
        if not scripts:
            raise errors.NotFound("Importable folder is empty")

        archive = io.BytesIO()
        with zipfile.ZipFile(archive, "a", compression=zipfile.ZIP_DEFLATED, allowZip64=False) as bundle:
            for entry in scripts:
                bundle.writestr(entry["path"], entry["script"])

        headers = current.request.get().response.headers
        headers["Content-Disposition"] = "attachment; filename=importable.zip"
        headers["Content-Type"] = "application/zip"
        return archive.getvalue()