Coverage for  / home / runner / work / viur-core / viur-core / viur / src / viur / core / modules / script.py: 0%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-23 12:32 +0000

1import io 

2import typing as t 

3from viur.core.bones import * 

4from viur.core.prototypes.tree import Tree, TreeSkel, SkelType 

5from viur.core.modules.file import File 

6from viur.core import db, conf, current, skeleton, tasks, errors 

7from viur.core.decorators import exposed 

8from viur.core.i18n import translate 

9import zipfile 

10 

11 

12class BaseScriptAbstractSkel(TreeSkel): 

13 path = StringBone( 

14 descr="Path", 

15 readOnly=True, 

16 unique=UniqueValue(UniqueLockMethod.SameValue, True, "This path is already taken!") 

17 ) 

18 

19 @classmethod 

20 def fromClient(cls, skel, data, *args, **kwargs): 

21 # Set script name when provided, so that the path can be regenerated 

22 if name := data.get("name"): 

23 skel["name"] = name 

24 conf.main_app.script.update_path(skel) 

25 

26 ret = super().fromClient(skel, data, *args, **kwargs) 

27 

28 if not ret: 

29 # in case the path failed because the unique value is already taken, rewrite the error for name field 

30 for error in skel.errors: 

31 if error.severity == ReadFromClientErrorSeverity.Invalid and error.fieldPath == ["path"]: 

32 error.fieldPath = ["name"] 

33 break 

34 

35 return ret 

36 

37 

38class ScriptNodeSkel(BaseScriptAbstractSkel): 

39 kindName = "viur-script-node" 

40 

41 rootNode = BooleanBone( 

42 descr="Is root node?", 

43 defaultValue=False, 

44 ) 

45 

46 plugin = BooleanBone( 

47 descr="Is plugin?", 

48 defaultValue=False 

49 ) 

50 

51 name = StringBone( 

52 descr="Folder", 

53 required=True, 

54 vfunc=lambda value: None if File.is_valid_filename(value) else "Foldername is invalid" 

55 ) 

56 

57 

58class ScriptLeafSkel(BaseScriptAbstractSkel): 

59 kindName = "viur-script-leaf" 

60 

61 name = StringBone( 

62 descr="Filename", 

63 required=True, 

64 vfunc=lambda value: 

65 None if File.is_valid_filename(value) and value.endswith(".py") and value.removesuffix(".py") 

66 else "Filename is invalid or doesn't have a '.py'-suffix", 

67 ) 

68 

69 script = RawBone( 

70 descr="Code", 

71 type_suffix="code.python", 

72 indexed=False, 

73 ) 

74 

75 access = SelectBone( 

76 descr="Required access rights to run this Script", 

77 values=lambda: { 

78 right: translate(f"viur.core.modules.user.accessright.{right}", defaultText=right) 

79 for right in sorted(conf.user.access_rights) 

80 }, 

81 multiple=True, 

82 ) 

83 

84 

85class Script(Tree): 

86 """ 

87 Script is a system module used to serve a filesystem for scripts used by ViUR Scriptor and ViUR CLI. 

88 """ 

89 

90 leafSkelCls = ScriptLeafSkel 

91 nodeSkelCls = ScriptNodeSkel 

92 

93 roles = { 

94 "admin": "*", 

95 } 

96 

97 def adminInfo(self): 

98 return conf.script_admin_info or {} 

99 

100 def getAvailableRootNodes(self): 

101 if not current.user.get(): 

102 return [] 

103 

104 return [{ 

105 "name": "Scripts", 

106 "key": self.rootnodeSkel(ensure=True)["key"], 

107 }] 

108 

109 @exposed 

110 def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

111 try: 

112 return super().view(skelType, key, *args, **kwargs) 

113 except errors.NotFound: 

114 # When key is not found, try to interpret key as path 

115 if skel := self.viewSkel(skelType).all().mergeExternalFilter({"path": key}).getSkel(): 

116 return super().view(skelType, skel["key"], *args, **kwargs) 

117 

118 raise 

119 

120 def onEdit(self, skelType, skel): 

121 self.update_path(skel) 

122 super().onEdit(skelType, skel) 

123 

124 def onEdited(self, skelType, skel): 

125 if skelType == "node": 

126 self.update_path_recursive("node", skel["path"], skel["key"]) 

127 self.update_path_recursive("leaf", skel["path"], skel["key"]) 

128 

129 super().onEdited(skelType, skel) 

130 

131 @tasks.CallDeferred 

132 def update_path_recursive(self, skel_type, path, parent_key, cursor=None): 

133 """ 

134 Recursively updates all items under a given parent key. 

135 """ 

136 query = self.editSkel(skel_type).all().filter("parententry", parent_key) 

137 query.setCursor(cursor) 

138 

139 for skel in query.fetch(99): 

140 new_path = path + "/" + skel["name"] 

141 

142 # only update when path changed 

143 if new_path != skel["path"]: 

144 skel["path"] = new_path # self.onEdit() is NOT required, as it resolves the path again. 

145 skel.write() 

146 self.onEdited(skel_type, skel) # triggers this recursion for nodes, again. 

147 

148 if cursor := query.getCursor(): 

149 self.update_path_recursive(skel_type, path, parent_key, cursor) 

150 

151 def update_path(self, skel): 

152 """ 

153 Updates the path-value of a either a folder or a script file, by resolving the repository's root node. 

154 """ 

155 path = [skel["name"]] 

156 

157 key = skel["parententry"] 

158 while key: 

159 parent_skel = self.viewSkel("node") 

160 if not parent_skel.read(key) or parent_skel["key"] == skel["parentrepo"]: 

161 break 

162 

163 path.insert(0, parent_skel["name"]) 

164 key = parent_skel["parententry"] 

165 

166 skel["path"] = "/".join(path) 

167 

168 @exposed 

169 def get_importable(self): 

170 

171 def get_files_recursively(_importable_key): 

172 res = [] 

173 importable_files_query = self.viewSkel("leaf").all().filter("parententry", _importable_key) 

174 if not (importable_files_query := self.listFilter(importable_files_query)): 

175 raise errors.Unauthorized() 

176 for script_entry in importable_files_query.iter(): 

177 if script_entry["script"]: 

178 res.append(script_entry) 

179 importable_files_query = self.viewSkel("node").all().filter("parententry", _importable_key) 

180 for folder_entry in importable_files_query.iter(): 

181 res.extend(get_files_recursively(folder_entry.key)) 

182 return res 

183 

184 # get importable key 

185 qry_importable = (self.viewSkel("node").all() 

186 .filter("parententry", self.rootnodeSkel(ensure=True)["key"]) 

187 .filter("name =", "importable")) 

188 if not (qry_importable := self.listFilter(qry_importable)): 

189 raise errors.Unauthorized() 

190 

191 importable_key = (entity := qry_importable.getEntry()) and entity.key 

192 if not importable_key: 

193 raise errors.NotFound("No importable folder defined") 

194 

195 importable_files = get_files_recursively(importable_key) 

196 if not importable_files: 

197 raise errors.NotFound("Importable folder is empty") 

198 

199 zip_buffer = io.BytesIO() 

200 with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: 

201 for file in importable_files: 

202 zip_file.writestr(file["path"], file["script"]) 

203 

204 current.request.get().response.headers["Content-Disposition"] = "attachment; filename=importable.zip" 

205 current.request.get().response.headers["Content-Type"] = "application/zip" 

206 return zip_buffer.getvalue()