|
import gradio as gr |
|
import tempfile |
|
from pathlib import Path |
|
import subprocess |
|
import sys |
|
import os |
|
import shutil |
|
import uuid |
|
import json |
|
import io |
|
import zipfile |
|
import time |
|
|
|
from run_script_venv import main as run_in_venv |
|
|
|
|
|
def cleanup_old_runs(base_dir: Path, max_age: int): |
|
now = time.time() |
|
for d in base_dir.glob("run_*/"): |
|
if d.is_dir() and now - d.stat().st_mtime > max_age: |
|
shutil.rmtree(d, ignore_errors=True) |
|
|
|
def zip_artifacts(output_dir: Path, zip_path: Path): |
|
with zipfile.ZipFile(zip_path, "w") as zf: |
|
for file in output_dir.iterdir(): |
|
zf.write(file, arcname=file.name) |
|
|
|
|
|
def run_script(code: str, user_input: str = "", cleanup_enabled: bool = True, cleanup_after: int = 600): |
|
base_dir = Path("./script_runs") |
|
base_dir.mkdir(exist_ok=True) |
|
if cleanup_enabled: |
|
cleanup_old_runs(base_dir, cleanup_after) |
|
|
|
run_id = uuid.uuid4().hex[:8] |
|
run_dir = base_dir / f"run_{run_id}" |
|
run_dir.mkdir() |
|
script_path = run_dir / "script.py" |
|
input_path = run_dir / "input.txt" |
|
output_path = run_dir / "output" |
|
output_path.mkdir() |
|
|
|
|
|
script_path.write_text(code) |
|
|
|
|
|
input_path.write_text(user_input or "") |
|
|
|
|
|
|
|
with tempfile.TemporaryFile(mode="w+") as output: |
|
orig_stdout = sys.stdout |
|
orig_stderr = sys.stderr |
|
sys.stdout = sys.stderr = output |
|
|
|
try: |
|
sys.argv = [ |
|
"run_script_venv.py", |
|
str(script_path), |
|
"--keep-workdir", |
|
"--extra", "pandas", |
|
] |
|
os.environ["SCRIPT_INPUT"] = str(input_path) |
|
os.environ["SCRIPT_OUTPUT"] = str(output_path) |
|
run_in_venv() |
|
except SystemExit: |
|
pass |
|
finally: |
|
sys.stdout = orig_stdout |
|
sys.stderr = orig_stderr |
|
|
|
output.seek(0) |
|
logs = output.read() |
|
|
|
|
|
|
|
artifacts = {} |
|
for item in output_path.iterdir(): |
|
if item.suffix in {".txt", ".csv", ".json", ".md"}: |
|
artifacts[item.name] = item.read_text() |
|
elif item.suffix.lower() in {".png", ".jpg", ".jpeg"}: |
|
artifacts[item.name] = f"[image file: {item.name}]" |
|
else: |
|
artifacts[item.name] = f"[file saved: {item.name}]" |
|
|
|
|
|
zip_path = run_dir / "output.zip" |
|
zip_artifacts(output_path, zip_path) |
|
artifacts["Download All (ZIP)"] = str(zip_path) |
|
|
|
artifacts["__workdir__"] = str(run_dir) |
|
return logs, artifacts, str(zip_path) |
|
|
|
|
|
def launch_ui(): |
|
with gr.Blocks() as app: |
|
gr.Markdown("# π Run My Script") |
|
run_btn = gr.Button("Run My Script") |
|
with gr.Row(): |
|
cleanup_toggle = gr.Checkbox(label="Enable Auto Cleanup", value=True) |
|
cleanup_seconds = gr.Number(label="Cleanup After (seconds)", value=600, precision=0) |
|
with gr.Row(): |
|
editor = gr.Code(label="Your Python Script", language="python") |
|
user_input = gr.Textbox(label="Optional Input", lines=4, placeholder="Text, JSON, Markdown...") |
|
with gr.Row(): |
|
output_log = gr.Textbox(label="Terminal Output", lines=3) |
|
output_files = gr.JSON(label="Output Artifacts") |
|
zip_file = gr.File(label="Download All Artifacts") |
|
run_btn.click( |
|
fn=run_script, |
|
inputs=[editor, user_input, cleanup_toggle, cleanup_seconds], |
|
outputs=[output_log, output_files, zip_file] |
|
) |
|
|
|
app.launch() |
|
|
|
if __name__ == "__main__": |
|
launch_ui() |
|
|