Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/script.py: 0%
111 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 11:04 +0000
1import io
2import typing as t
3from viur.core.bones import *
4from viur.core.prototypes.tree import Tree, TreeSkel, SkelType
5from viur.core.modules.file import File
6from viur.core import db, conf, current, skeleton, tasks, errors
7from viur.core.decorators import exposed
8from viur.core.i18n import translate
9import zipfile
class BaseScriptAbstractSkel(TreeSkel):
    """
    Shared base skeleton for script folders and script files.

    Adds a read-only ``path`` bone with a uniqueness constraint and regenerates
    that path whenever the client submits a ``name``.
    """

    # Path of the entry; not editable by the client and required to be unique.
    path = StringBone(
        descr="Path",
        readOnly=True,
        unique=UniqueValue(UniqueLockMethod.SameValue, True, "This path is already taken!"),
    )

    @classmethod
    def fromClient(cls, skel, data, *args, **kwargs):
        """
        Read client data into ``skel``, refreshing the path from a submitted name first.

        :param skel: The skeleton instance being filled.
        :param data: Values submitted by the client; only ``data.get("name")`` is inspected here.
        :param args: Passed through to the parent implementation.
        :param kwargs: Passed through to the parent implementation.
        :return: Whatever the parent's ``fromClient`` returned.
        """
        # A submitted name has to be applied up front, so the path can be rebuilt from it.
        submitted_name = data.get("name")
        if submitted_name:
            skel["name"] = submitted_name
            conf.main_app.script.update_path(skel)

        success = super().fromClient(skel, data, *args, **kwargs)
        if success:
            return success

        # The client cannot edit "path" itself: the first invalid-error reported for the path
        # (e.g. because the unique value is already taken) is re-targeted to the "name" field.
        path_error = next(
            (
                err for err in skel.errors
                if err.severity == ReadFromClientErrorSeverity.Invalid and err.fieldPath == ["path"]
            ),
            None,
        )
        if path_error is not None:
            path_error.fieldPath = ["name"]

        return success
class ScriptNodeSkel(BaseScriptAbstractSkel):
    """
    Skeleton describing a folder (node) of the script tree.
    """

    kindName = "viur-script-node"

    # Boolean flag; off by default.
    rootNode = BooleanBone(
        descr="Is root node?",
        defaultValue=False,
    )

    # Boolean flag; off by default.
    plugin = BooleanBone(
        descr="Is plugin?",
        defaultValue=False,
    )

    # Folder name; the validator yields an error message for names rejected by
    # File.is_valid_filename() and None otherwise.
    name = StringBone(
        descr="Folder",
        required=True,
        vfunc=lambda value: "Foldername is invalid" if not File.is_valid_filename(value) else None,
    )
class ScriptLeafSkel(BaseScriptAbstractSkel):
    """
    Skeleton describing a script file (leaf) of the script tree.
    """

    kindName = "viur-script-leaf"

    # File name; accepted only when File.is_valid_filename() approves it, it ends with ".py"
    # and something remains in front of that suffix. The validator yields None when accepted.
    name = StringBone(
        descr="Filename",
        required=True,
        vfunc=lambda value: (
            "Filename is invalid or doesn't have a '.py'-suffix"
            if not (File.is_valid_filename(value) and value.endswith(".py") and value.removesuffix(".py"))
            else None
        ),
    )

    # The script's source code, stored unindexed.
    script = RawBone(
        descr="Code",
        indexed=False,
    )

    # Selectable values are computed lazily from conf.user.access_rights (sorted),
    # each one labelled with its translation and falling back to the raw right name.
    access = SelectBone(
        descr="Required access rights to run this Script",
        values=lambda: {
            access_right: translate(
                f"viur.core.modules.user.accessright.{access_right}",
                defaultText=access_right,
            )
            for access_right in sorted(conf.user.access_rights)
        },
        multiple=True,
    )
class Script(Tree):
    """
    Script is a system module used to serve a filesystem for scripts used by ViUR Scriptor and ViUR CLI.
    """

    leafSkelCls = ScriptLeafSkel
    nodeSkelCls = ScriptNodeSkel

    roles = {
        "admin": "*",
    }

    def adminInfo(self):
        """
        Return the configured ``conf.script_admin_info``, or an empty dict when it is falsy.
        """
        info = conf.script_admin_info
        return info if info else {}

    def getAvailableRootNodes(self):
        """
        List the root nodes offered by this module.

        :return: An empty list when ``current.user.get()`` is falsy, otherwise a single
            entry named "Scripts" carrying the key of the (ensured) root node skeleton.
        """
        if current.user.get():
            root_node = {
                "name": "Scripts",
                "key": self.rootnodeSkel(ensure=True)["key"],
            }
            return [root_node]

        return []

    @exposed
    def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
        """
        View an entry by key, falling back to a lookup by its path.

        :param skelType: Skeleton type forwarded to the parent's ``view``.
        :param key: Key of the entry; when the parent raises ``errors.NotFound`` for it,
            the same value is tried as a ``path`` filter instead.
        :return: The result of the parent's ``view``.
        :raises errors.NotFound: Re-raised when the path lookup yields nothing either.
        """
        try:
            return super().view(skelType, key, *args, **kwargs)
        except errors.NotFound:
            # When key is not found, try to interpret key as path
            path_query = self.viewSkel(skelType).all().mergeExternalFilter({"path": key})
            found = path_query.getSkel()
            if not found:
                raise

            return super().view(skelType, found["key"], *args, **kwargs)

    def onEdit(self, skelType, skel):
        """
        Refresh the path of ``skel`` before handing over to the parent's ``onEdit`` hook.
        """
        self.update_path(skel)
        super().onEdit(skelType, skel)

    def onEdited(self, skelType, skel):
        """
        After a node was edited, trigger path updates for its child nodes and child leafs,
        then hand over to the parent's ``onEdited`` hook.
        """
        if skelType == "node":
            for child_type in ("node", "leaf"):
                self.update_path_recursive(child_type, skel["path"], skel["key"])

        super().onEdited(skelType, skel)

    @tasks.CallDeferred
    def update_path_recursive(self, skel_type, path, parent_key, cursor=None):
        """
        Recursively updates all items under a given parent key.

        :param skel_type: Skeleton type of the children to process ("node" or "leaf" as used by onEdited).
        :param path: Path of the parent; each child's path becomes ``path + "/" + name``.
        :param parent_key: Key the children are filtered by (``parententry``).
        :param cursor: Cursor to continue a previous batch from, if any.
        """
        children = self.editSkel(skel_type).all().filter("parententry", parent_key)
        children.setCursor(cursor)

        # Children are handled in batches of 99 per invocation.
        for child in children.fetch(99):
            expected_path = path + "/" + child["name"]

            # Entries whose path is already correct are left untouched.
            if expected_path == child["path"]:
                continue

            child["path"] = expected_path  # self.onEdit() is NOT required, as it resolves the path again.
            child.write()
            self.onEdited(skel_type, child)  # triggers this recursion for nodes, again.

        # As long as the query reports a cursor, continue with a follow-up invocation.
        next_cursor = children.getCursor()
        if next_cursor:
            self.update_path_recursive(skel_type, path, parent_key, next_cursor)

    def update_path(self, skel):
        """
        Rebuild the ``path`` value of a folder or script file from its chain of parents.

        Starting at the entry's own name, the ``parententry`` chain is followed upwards and each
        parent's name is put in front. The walk ends when there is no further parent key, when a
        parent cannot be read, or when the parent is the entry's ``parentrepo`` (whose name is
        not part of the path). The names are then joined by "/".
        """
        segments = [skel["name"]]

        parent_key = skel["parententry"]
        while parent_key:
            parent = self.viewSkel("node")
            if not parent.read(parent_key):
                break
            if parent["key"] == skel["parentrepo"]:
                break

            segments.insert(0, parent["name"])
            parent_key = parent["parententry"]

        skel["path"] = "/".join(segments)

    @exposed
    def get_importable(self):
        """
        Deliver all scripts below the root-level folder named "importable" as a zip archive.

        :return: The bytes of the zip archive; the response headers are set to offer it
            as attachment "importable.zip".
        :raises errors.Unauthorized: When ``listFilter`` yields a falsy value for a query.
        :raises errors.NotFound: When no "importable" folder exists, or it holds no script with code.
        """

        def collect_scripts(folder_key):
            # Gathers the entries with a non-empty "script" directly inside folder_key first,
            # followed by those of every sub-folder, recursively.
            leaf_query = self.viewSkel("leaf").all().filter("parententry", folder_key)
            leaf_query = self.listFilter(leaf_query)
            if not leaf_query:
                raise errors.Unauthorized()

            scripts = [leaf for leaf in leaf_query.iter() if leaf["script"]]

            folder_query = self.viewSkel("node").all().filter("parententry", folder_key)
            for folder in folder_query.iter():
                scripts.extend(collect_scripts(folder.key))

            return scripts

        # Locate the "importable" folder directly below the root node.
        node_query = self.viewSkel("node").all()
        node_query = node_query.filter("parententry", self.rootnodeSkel(ensure=True)["key"])
        node_query = node_query.filter("name =", "importable")

        node_query = self.listFilter(node_query)
        if not node_query:
            raise errors.Unauthorized()

        entity = node_query.getEntry()
        importable_key = entity and entity.key
        if not importable_key:
            raise errors.NotFound("No importable folder defined")

        scripts = collect_scripts(importable_key)
        if not scripts:
            raise errors.NotFound("Importable folder is empty")

        # Build the archive in memory, one member per script, named by the script's path.
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, "a", compression=zipfile.ZIP_DEFLATED, allowZip64=False) as bundle:
            for script in scripts:
                bundle.writestr(script["path"], script["script"])

        response = current.request.get().response
        response.headers["Content-Disposition"] = "attachment; filename=importable.zip"
        response.headers["Content-Type"] = "application/zip"
        return archive.getvalue()