import json import os import re import subprocess import time import yaml import gradio as gr import pandas as pd import requests from huggingface_hub import HfApi, get_token CMD = ["python" ,"run_job.py"] ARG_NAMES = ["", "", "", "[-c config]", "[-s split]", "[-p private]"] SPACE_ID = os.environ.get("SPACE_ID") or "lhoestq/run-duckdb-jobs" CONTENT = """ ## Usage: ```bash curl -L 'https://huggingface.co/api/jobs/' \ -H 'Content-Type: application/json' \ -H 'Authorization: Bearer ' \ -d '{{ "spaceId": "{SPACE_ID}", "command": {CMD}, "arguments": {ARG_NAMES}, "environment": {{"HF_TOKEN": }}, "flavor": "cpu-basic" }}' ``` ## Example: """ with open("README.md") as f: METADATA = yaml.safe_load(f.read().split("---\n")[1]) TITLE = METADATA["title"] SHORT_DESCRIPTION = METADATA.get("short_description") EMOJI = METADATA["emoji"] try: process = subprocess.run(CMD + ["--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) HELP = not process.returncode and (process.stdout or process.stderr).decode() except Exception: HELP = False DRY_RUN = bool(HELP) and bool(m :=re.search("--dry(-|_)run", HELP)) and m.group(0) def parse_log(line: str, pbars: dict[str, float] = None): if line.startswith("data: {"): data = json.loads(line[len("data: "):]) data, timestamp = data["data"], data["timestamp"] if pbars is not None and data.startswith("===== Job started at"): pbars.pop("Starting ⚙️", None) pbars["Running 🏃"] = 0.0 return f"[{timestamp}] {data}\n\n" elif pbars is not None and (percent_match := re.search("\\d+(?:\\.\\d+)?%", data)) and any(c in data.split("%")[1][:10] for c in "|█▌"): pbars.pop("Running 🏃", None) [pbars.pop(desc) for desc, percent in pbars.items() if percent == 1.] percent = float(percent_match.group(0)[:-1]) / 100 desc = data[:percent_match.start()].strip() or "Progress" pbars[desc] = percent else: return f"[{timestamp}] {data}\n\n" return "" def dry_run(src, config, split, dst, query): if not all([src, dst, query]): raise gr.Error("Please fill source, destination and query.") args = ["--src", src] + (["--config", config] if config else []) + (["--split", split] if split else []) + [ "--dst", dst, "--query", query, DRY_RUN] cmd = CMD + args logs = "Job:\n\n```bash\n" + " ".join('"' + arg.replace('"', '\"""') + '"' if " " in arg else arg for arg in cmd) + "\n```\nOutput:\n\n" yield {output_markdown: logs, progress_labels: gr.Label(visible=False), details_accordion: gr.Accordion(open=True)} process = subprocess.Popen(cmd, stdout=subprocess.PIPE) for line in iter(process.stdout.readline, b""): logs += line.decode() yield {output_markdown: logs} def run(src, config, split, dst, query, oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None): if not all([src, dst, query]): raise gr.Error("Please fill source, destination and query.") if oauth_token and profile: token = oauth_token.token username = profile.username elif (token := get_token()): username = HfApi().whoami(token=token)["name"] else: raise gr.Error("Please log in to run the job.") args = ["--src", src] + (["--config", config] if config else []) + (["--split", split] if split else []) + [ "--dst", dst, "--query", query] cmd = CMD + args logs = "Job:\n\n```bash\n" + " ".join('"' + arg.replace('"', '\"""') + '"' if " " in arg else arg for arg in cmd) + "\n```\nOutput:\n\n" pbars = {} yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))} resp = requests.post( f"https://huggingface.co/api/jobs/{username}", json={ "spaceId": SPACE_ID, "arguments": args, "command": CMD, "environment": {"HF_TOKEN": token}, "flavor": "cpu-basic" }, headers={"Authorization": f"Bearer {token}"} ) if resp.status_code != 200: logs += resp.text pbars = {"Finished with an error ❌": 1.0} else: job_id = resp.json()["metadata"]["job_id"] pbars = {"Starting ⚙️": 0.0} yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))} resp = requests.get( f"https://huggingface.co/api/jobs/{username}/{job_id}/logs-stream", headers={"Authorization": f"Bearer {token}"}, stream=True ) for line in resp.iter_lines(): logs += parse_log(line.decode("utf-8"), pbars=pbars) yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))} job_status = {"status": {"stage": "RUNNING"}} while True: job_status = requests.get( f"https://huggingface.co/api/jobs/{username}/{job_id}", headers={"Authorization": f"Bearer {token}"} ).json() if job_status["status"]["stage"] == "RUNNING": time.sleep(1) else: break if job_status["status"]["stage"] == "COMPLETED": pbars = {"Finished ✅": 1.0} else: logs += f'{job_status["status"]["message"]} ({job_status["status"]["error"]})' pbars = {"Finished with an error ❌": 1.0} yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))} READ_FUNCTIONS = ("pl.read_parquet", "pl.read_csv", "pl.read_json") NUM_TRENDING_DATASETS = 10 with gr.Blocks() as demo: with gr.Row(): with gr.Column(scale=10): gr.Markdown(f"# {TITLE} {EMOJI}") if SHORT_DESCRIPTION: gr.Markdown(SHORT_DESCRIPTION) with gr.Column(): gr.LoginButton() gr.Markdown(CONTENT.format(SPACE_ID=SPACE_ID, CMD=json.dumps(CMD), ARG_NAMES=json.dumps(ARG_NAMES))) with gr.Row(): with gr.Column(scale=10): with gr.Row(): loading_codes_json = gr.JSON([], visible=False) dataset_dropdown = gr.Dropdown(label="Source Dataset", allow_custom_value=True, scale=10) subset_dropdown = gr.Dropdown(info="Subset", allow_custom_value=True, show_label=False, visible=False) split_dropdown = gr.Dropdown(info="Split", allow_custom_value=True, show_label=False, visible=False) with gr.Column(min_width=60): gr.HTML("
") with gr.Column(scale=10): dst_dropdown = gr.Dropdown(label="Destination Dataset", allow_custom_value=True) query_textarea = gr.Textbox(label="SQL Query", lines=2, max_lines=300, placeholder="SELECT * FROM src;", value="SELECT * FROM src;") with gr.Row(): run_button = gr.Button("Run", scale=10, variant="primary") if DRY_RUN: dry_run_button = gr.Button("Dry-Run") progress_labels= gr.Label(visible=False, label="Progress") with gr.Accordion("Details", open=False) as details_accordion: output_markdown = gr.Markdown(label="Output logs") run_button.click(run, inputs=[dataset_dropdown, subset_dropdown, split_dropdown, dst_dropdown, query_textarea], outputs=[details_accordion, progress_labels, output_markdown]) if DRY_RUN: dry_run_button.click(dry_run, inputs=[dataset_dropdown, subset_dropdown, split_dropdown, dst_dropdown, query_textarea], outputs=[details_accordion, progress_labels, output_markdown]) def show_subset_dropdown(dataset: str): if dataset and "/" not in dataset.strip().strip("/"): return [] resp = requests.get(f"https://datasets-server.huggingface.co/compatible-libraries?dataset={dataset}", timeout=3).json() loading_codes = ([lib["loading_codes"] for lib in resp.get("libraries", []) if lib["function"] in READ_FUNCTIONS] or [[]])[0] or [] subsets = [loading_code["config_name"] for loading_code in loading_codes] subset = (subsets or [""])[0] return dict(choices=subsets, value=subset, visible=len(subsets) > 1, key=hash(str(loading_codes))), loading_codes def show_split_dropdown(subset: str, loading_codes: list[dict]): splits = ([list(loading_code["arguments"]["splits"]) for loading_code in loading_codes if loading_code["config_name"] == subset] or [[]])[0] split = (splits or [""])[0] return dict(choices=splits, value=split, visible=len(splits) > 1, key=hash(str(loading_codes) + subset)) @demo.load(outputs=[dataset_dropdown, loading_codes_json, subset_dropdown, split_dropdown]) def _fetch_datasets(request: gr.Request): dataset = "CohereForAI/Global-MMLU" datasets = [dataset] + [ds.id for ds in HfApi().list_datasets(limit=NUM_TRENDING_DATASETS, sort="trendingScore", direction=-1) if ds.id != dataset] subsets, loading_codes = show_subset_dropdown(dataset) splits = show_split_dropdown(subsets["value"], loading_codes) return { dataset_dropdown: gr.Dropdown(choices=datasets, value=dataset), loading_codes_json: loading_codes, subset_dropdown: gr.Dropdown(**subsets), split_dropdown: gr.Dropdown(**splits), } @dataset_dropdown.select(inputs=[dataset_dropdown], outputs=[subset_dropdown, split_dropdown]) def _show_subset_dropdown(dataset: str): subsets, loading_codes = show_subset_dropdown(dataset) splits = show_split_dropdown(subsets["value"], loading_codes) return { subset_dropdown: gr.Dropdown(**subsets), split_dropdown: gr.Dropdown(**splits), } @subset_dropdown.select(inputs=[dataset_dropdown, subset_dropdown, loading_codes_json], outputs=[split_dropdown]) def _show_split_dropdown(dataset: str, subset: str, loading_codes: list[dict]): splits = show_split_dropdown(subset, loading_codes) return { split_dropdown: gr.Dropdown(**splits), } if HELP: with demo.route("Help", "/help"): gr.Markdown(f"# Help\n\n```\n{HELP}\n```") with demo.route("Jobs", "/jobs") as page: gr.Markdown("# Jobs") jobs_dataframe = gr.DataFrame(datatype="markdown") @page.load(outputs=[jobs_dataframe]) def list_jobs(oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None): if oauth_token and profile: token = oauth_token.token username = profile.username elif (token := get_token()): username = HfApi().whoami(token=token)["name"] else: return pd.DataFrame({"Log in to see jobs": []}) resp = requests.get( f"https://huggingface.co/api/jobs/{username}", headers={"Authorization": f"Bearer {token}"} ) return pd.DataFrame([ { "id": job["metadata"]["id"], "created_at": job["metadata"]["created_at"], "stage": job["compute"]["status"]["stage"], "output": f'[logs](https://huggingface.co/api/jobs/{username}/{job["metadata"]["id"]}/logs-stream)', "command": str(job["compute"]["spec"]["extra"]["command"]), "args": str(job["compute"]["spec"]["extra"]["args"]), } for job in resp.json() if job["compute"]["spec"]["extra"]["input"]["spaceId"] == SPACE_ID ]) if __name__ == "__main__": demo.launch(server_name="0.0.0.0")