import fire

# DuckDB config: insertion order is not needed here, which lets DuckDB stream the
# COPY with less memory.
CONFIG = {"preserve_insertion_order": False}

# Ask the datasets-server "compatible-libraries" endpoint for the Hub path of the data
# files of the requested config/split and the Polars function able to read them.
CMD_SRC_KWARGS = """
SELECT ('hf://datasets/{src}/' || lo.arguments['splits']['{split}']) AS path, function
FROM (
    SELECT unnest(li.loading_codes) AS lo, li.function[4:] AS function
    FROM (
        SELECT unnest(libraries) AS li
        FROM read_json('https://datasets-server.huggingface.co/compatible-libraries?dataset={src}')
    )
    WHERE li.function[:3] = 'pl.'
)
WHERE lo.config_name = '{config}';
""".strip()

# Expose the source dataset as a DuckDB view named `src`, so the user query can
# simply SELECT ... FROM src.
CMD_SRC = """
CREATE VIEW src AS SELECT * FROM {function}('{path}');
""".strip()

# Write the result of the user query as Parquet files into the local `tmp` directory.
CMD_DST = """
COPY ({query}) TO 'tmp' (FORMAT PARQUET, ROW_GROUP_SIZE_BYTES '100MB', ROW_GROUPS_PER_FILE 5, PER_THREAD_OUTPUT true);
""".strip()

# Dry-run variants: build the view on a 5-row sample and run the query without COPY.
CMD_SRC_DRY_RUN = CMD_SRC[:-1] + " LIMIT 5;"
CMD_DST_DRY_RUN = "{query};"

DATA_CARD = "# Dataset Card for {dst}\n\nDataset prepared from [{src}](https://huggingface.co/datasets/{src}) using\n\n```\n{query}\n```\n"


def sql(
    src: str,
    dst: str,
    query: str,
    config: str = "default",
    split: str = "train",
    private: bool = False,
    dry_run: bool = False,
):
    import os
    import duckdb
    from huggingface_hub import CommitScheduler, DatasetCard

    class CommitAndCleanScheduler(CommitScheduler):
        def push_to_hub(self):
            # Move only fully written Parquet files from `tmp` into the synced folder:
            # the chained `and` evaluates to the last 4 bytes of the file (short-circuiting
            # on empty files), and a Parquet file is complete once it ends with "PAR1".
            for path in self.folder_path.with_name("tmp").glob("*.parquet"):
                with path.open("rb") as f:
                    footer = f.read(4) and f.seek(-4, os.SEEK_END) and f.read(4)
                if footer == b"PAR1":
                    path.rename(self.folder_path / path.name)
            super().push_to_hub()
            # Remove the local copies of the files that were just committed to the Hub.
            for path in self.last_uploaded:
                path.unlink(missing_ok=True)

    con = duckdb.connect(":memory:", config=CONFIG)

    # Resolve the data files path and reader function for the requested config/split.
    src_kwargs = con.sql(CMD_SRC_KWARGS.format(src=src, config=config, split=split)).df().to_dict(orient="records")
    if not src_kwargs:
        raise ValueError(f'Invalid --config "{config}" for dataset "{src}", please select a valid dataset config/subset.')

    # Create the `src` view (limited to 5 rows in dry-run mode).
    con.sql((CMD_SRC_DRY_RUN if dry_run else CMD_SRC).format(**src_kwargs[0]))

    if dry_run:
        print(f"Sample data from '{src}' that would be written to dataset '{dst}':\n")
        result = con.sql(CMD_DST_DRY_RUN.format(query=query.rstrip("\n ;")))
        print(result.df().to_markdown())
        return

    # While DuckDB writes Parquet files to `tmp`, the scheduler moves the finished ones
    # into `dst` and commits them to the Hub repo every 0.1 minutes.
    with CommitAndCleanScheduler(repo_id=dst, repo_type="dataset", folder_path="dst", path_in_repo="data", every=0.1, private=private):
        con.sql("PRAGMA enable_progress_bar;")
        con.sql(CMD_DST.format(query=query.rstrip("\n ;")))
    DatasetCard(DATA_CARD.format(src=src, dst=dst, query=query)).push_to_hub(repo_id=dst, repo_type="dataset")
    print("done")


if __name__ == "__main__":
    fire.Fire(sql)
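
# Example invocation (an illustrative sketch, not part of the script itself; the file
# name `sql.py`, the dataset ids and the query below are placeholders). Pushing the
# result requires being logged in to the Hugging Face Hub (e.g. `huggingface-cli login`):
#
#   pip install fire duckdb huggingface_hub pandas tabulate  # tabulate only needed for --dry_run output
#   python sql.py "user/source-dataset" "user/destination-dataset" \
#       "SELECT * FROM src LIMIT 1000" --config default --split train --dry_run
#
# The query runs against the `src` view created by CMD_SRC; drop --dry_run to write the
# Parquet files and push them to the destination dataset repo.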