Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/script.py: 0%
111 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-23 12:32 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-23 12:32 +0000
1import io
2import typing as t
3from viur.core.bones import *
4from viur.core.prototypes.tree import Tree, TreeSkel, SkelType
5from viur.core.modules.file import File
6from viur.core import db, conf, current, skeleton, tasks, errors
7from viur.core.decorators import exposed
8from viur.core.i18n import translate
9import zipfile
class BaseScriptAbstractSkel(TreeSkel):
    """
    Base skeleton shared by the script folder and script file skeletons.

    Adds a read-only ``path`` bone whose value must be unique.
    """

    path = StringBone(
        descr="Path",
        readOnly=True,
        unique=UniqueValue(UniqueLockMethod.SameValue, True, "This path is already taken!"),
    )

    @classmethod
    def fromClient(cls, skel, data, *args, **kwargs):
        """
        Read client data into ``skel`` and keep the ``path`` bone in sync with a submitted name.

        :param skel: The skeleton instance to fill.
        :param data: The client-supplied values; a truthy ``"name"`` entry triggers a path rebuild.
        :return: The result of the parent ``fromClient`` call, unchanged.
        """
        # A submitted name has to be applied up-front, so that the path can be regenerated from it.
        submitted_name = data.get("name")
        if submitted_name:
            skel["name"] = submitted_name
            conf.main_app.script.update_path(skel)

        result = super().fromClient(skel, data, *args, **kwargs)
        if result:
            return result

        # The client never edits "path" directly; when the path was rejected (e.g. the unique value
        # is already taken), report the first such error on the "name" field instead.
        path_error = next(
            (
                entry
                for entry in skel.errors
                if entry.severity == ReadFromClientErrorSeverity.Invalid and entry.fieldPath == ["path"]
            ),
            None,
        )
        if path_error is not None:
            path_error.fieldPath = ["name"]

        return result
class ScriptNodeSkel(BaseScriptAbstractSkel):
    """
    Skeleton for a folder (node) inside the script tree.
    """

    kindName = "viur-script-node"

    # Flag bones; both default to False.
    rootNode = BooleanBone(descr="Is root node?", defaultValue=False)
    plugin = BooleanBone(descr="Is plugin?", defaultValue=False)

    # Folder name; the validator yields an error message for names rejected by File.is_valid_filename().
    name = StringBone(
        descr="Folder",
        required=True,
        vfunc=lambda value: "Foldername is invalid" if not File.is_valid_filename(value) else None,
    )
class ScriptLeafSkel(BaseScriptAbstractSkel):
    """
    Skeleton for a script file (leaf) inside the script tree.
    """

    kindName = "viur-script-leaf"

    # File name; accepted only when it is a valid filename, ends with ".py"
    # and has a non-empty part in front of that suffix.
    name = StringBone(
        descr="Filename",
        required=True,
        vfunc=lambda value: (
            "Filename is invalid or doesn't have a '.py'-suffix"
            if not (File.is_valid_filename(value) and value.endswith(".py") and value.removesuffix(".py"))
            else None
        ),
    )

    # The script's source code, stored unindexed.
    script = RawBone(
        descr="Code",
        type_suffix="code.python",
        indexed=False,
    )

    # Selectable access rights, built lazily from conf.user.access_rights in sorted order.
    access = SelectBone(
        descr="Required access rights to run this Script",
        values=lambda: {
            right: translate(f"viur.core.modules.user.accessright.{right}", defaultText=right)
            for right in sorted(conf.user.access_rights)
        },
        multiple=True,
    )
class Script(Tree):
    """
    Script is a system module used to serve a filesystem for scripts used by ViUR Scriptor and ViUR CLI.
    """

    leafSkelCls = ScriptLeafSkel
    nodeSkelCls = ScriptNodeSkel

    roles = {
        "admin": "*",
    }

    def adminInfo(self):
        """
        Return the configured ``conf.script_admin_info``, or an empty dict when it is falsy.
        """
        info = conf.script_admin_info
        return info if info else {}

    def getAvailableRootNodes(self):
        """
        Return the single "Scripts" root node for a logged-in user, otherwise an empty list.
        """
        if current.user.get():
            return [{
                "name": "Scripts",
                "key": self.rootnodeSkel(ensure=True)["key"],
            }]

        return []

    @exposed
    def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
        """
        View an entry by key; when the key is not found, retry with ``key`` interpreted as a path.

        :raises errors.NotFound: When neither the key nor the path lookup yields an entry.
        """
        try:
            return super().view(skelType, key, *args, **kwargs)
        except errors.NotFound:
            # Fall back to a lookup by the "path" bone.
            by_path = self.viewSkel(skelType).all().mergeExternalFilter({"path": key}).getSkel()
            if not by_path:
                raise

            return super().view(skelType, by_path["key"], *args, **kwargs)

    def onEdit(self, skelType, skel):
        """
        Refresh the path of ``skel`` before handing over to the parent hook.
        """
        self.update_path(skel)
        super().onEdit(skelType, skel)

    def onEdited(self, skelType, skel):
        """
        After a folder was edited, propagate its path to the sub-folders and files below it.
        """
        if skelType == "node":
            for child_type in ("node", "leaf"):
                self.update_path_recursive(child_type, skel["path"], skel["key"])

        super().onEdited(skelType, skel)

    @tasks.CallDeferred
    def update_path_recursive(self, skel_type, path, parent_key, cursor=None):
        """
        Recursively updates all items under a given parent key.

        :param skel_type: Either "node" or "leaf"; selects the kind of children to process.
        :param path: The path of the parent, used as prefix for every child.
        :param parent_key: Key of the parent whose direct children are processed.
        :param cursor: Query cursor to continue from on follow-up calls.
        """
        children = self.editSkel(skel_type).all().filter("parententry", parent_key)
        children.setCursor(cursor)

        for child in children.fetch(99):
            expected_path = path + "/" + child["name"]

            # Children whose path is already correct are left untouched.
            if expected_path == child["path"]:
                continue

            # self.onEdit() is NOT required, as it resolves the path again.
            child["path"] = expected_path
            child.write()
            # Triggers this recursion for nodes, again.
            self.onEdited(skel_type, child)

        next_cursor = children.getCursor()
        if next_cursor:
            self.update_path_recursive(skel_type, path, parent_key, next_cursor)

    def update_path(self, skel):
        """
        Updates the path-value of either a folder or a script file, by resolving the repository's root node.

        Walks up the ``parententry`` chain and stops at the first parent that cannot be read
        or whose key equals ``skel["parentrepo"]``; that parent is not part of the path.
        """
        # Collected from the entry itself upwards, reversed when joining.
        segments = [skel["name"]]

        parent_key = skel["parententry"]
        while parent_key:
            parent = self.viewSkel("node")
            if not parent.read(parent_key) or parent["key"] == skel["parentrepo"]:
                break

            segments.append(parent["name"])
            parent_key = parent["parententry"]

        skel["path"] = "/".join(reversed(segments))

    @exposed
    def get_importable(self):
        """
        Deliver all scripts below the root-level folder named "importable" as a zip archive.

        Files with an empty ``script`` value are skipped; sub-folders are traversed recursively.
        Sets the response headers for a download of ``importable.zip``.

        :return: The bytes of the zip archive.
        :raises errors.Unauthorized: When ``listFilter`` rejects one of the queries.
        :raises errors.NotFound: When there is no "importable" folder or it has no scripts.
        """

        def collect_scripts(folder_key):
            # Gather the non-empty scripts of this folder first, then descend into its sub-folders.
            leaf_query = self.viewSkel("leaf").all().filter("parententry", folder_key)
            leaf_query = self.listFilter(leaf_query)
            if not leaf_query:
                raise errors.Unauthorized()

            collected = [leaf for leaf in leaf_query.iter() if leaf["script"]]

            folder_query = self.viewSkel("node").all().filter("parententry", folder_key)
            for folder in folder_query.iter():
                collected.extend(collect_scripts(folder.key))

            return collected

        # Locate the "importable" folder directly below the root node.
        folder_lookup = self.viewSkel("node").all()
        folder_lookup = folder_lookup.filter("parententry", self.rootnodeSkel(ensure=True)["key"])
        folder_lookup = folder_lookup.filter("name =", "importable")
        folder_lookup = self.listFilter(folder_lookup)
        if not folder_lookup:
            raise errors.Unauthorized()

        folder_entity = folder_lookup.getEntry()
        importable_key = folder_entity.key if folder_entity else None
        if not importable_key:
            raise errors.NotFound("No importable folder defined")

        scripts = collect_scripts(importable_key)
        if not scripts:
            raise errors.NotFound("Importable folder is empty")

        # Build the archive in memory, one member per script, named by its path.
        archive_buffer = io.BytesIO()
        with zipfile.ZipFile(
            archive_buffer,
            mode="a",
            compression=zipfile.ZIP_DEFLATED,
            allowZip64=False,
        ) as archive:
            for entry in scripts:
                archive.writestr(entry["path"], entry["script"])

        headers = current.request.get().response.headers
        headers["Content-Disposition"] = "attachment; filename=importable.zip"
        headers["Content-Type"] = "application/zip"
        return archive_buffer.getvalue()