import glob
import json
from dataclasses import dataclass
from typing import Optional

from huggingface_hub import HfApi, snapshot_download

from src.envs import TOKEN
from src.logging import setup_logger

logger = setup_logger(__name__)

PENDING_STATUS = "PENDING"
RUNNING_STATUS = "RUNNING"
FINISHED_STATUS = "FINISHED"
FAILED_STATUS = "FAILED"
@dataclass
class EvalRequest:
    """This class represents one evaluation request file."""

    model: str
    status: str
    json_filepath: str
    weight_type: str = "Original"
    model_type: Optional[str] = None  # pretrained, fine-tuned, etc. - define your own categories
    precision: str = ""  # float16, bfloat16
    revision: str = "main"  # commit hash
    submitted_time: Optional[str] = (
        "2022-05-18T11:40:22.519222"  # random date just so that we can still order requests by date
    )
    likes: Optional[int] = 0
    params: Optional[int] = None
    license: Optional[str] = ""
    base_model: Optional[str] = ""
    private: Optional[bool] = False
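
    # Hedged sketch (not in the original): the request JSON this dataclass mirrors
    # might look roughly like the following; values are illustrative only, and
    # `json_filepath` is filled in by get_eval_requests() rather than stored in the file.
    #   {"model": "org/model-name", "status": "PENDING", "precision": "float16",
    #    "revision": "main", "weight_type": "Original", "likes": 0, "private": false}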
    def get_model_args(self):
        """Edit this function if you want to manage more complex quantization issues. You'll need to map it to
        the evaluation suite you chose.
        """
        model_args = f"pretrained={self.model},revision={self.revision}"

        if self.precision in ["float16", "bfloat16"]:
            model_args += f",dtype={self.precision}"
        # Quantized models need some added config, the install of bitsandbytes, etc.
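        # Hedged example (assumption, not in the original): with lm-eval-harness's
        # HF model backend, a quantized checkpoint could pass extra keys such as
        #   model_args += ",load_in_4bit=True"
        # but the exact argument names depend on the evaluation suite you wired up.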
        else:
            raise Exception(f"Unknown precision {self.precision}.")

        return model_args
def set_eval_request(api: HfApi, eval_request: EvalRequest, set_to_status: str, hf_repo: str, local_dir: str):
    """Updates a given eval request with its new status on the hub (running, completed, failed, ...)"""
    json_filepath = eval_request.json_filepath

    with open(json_filepath) as fp:
        data = json.load(fp)

    data["status"] = set_to_status

    with open(json_filepath, "w") as f:
        f.write(json.dumps(data))

    api.upload_file(
        path_or_fileobj=json_filepath,
        path_in_repo=json_filepath.replace(local_dir, ""),
        repo_id=hf_repo,
        repo_type="dataset",
    )
def get_eval_requests(job_status: list, local_dir: str, hf_repo: str) -> list[EvalRequest]:
    """Gets all evaluation requests whose status is in `job_status` from the requests dataset.

    Returns:
        `list[EvalRequest]`: a list of evaluation requests.
    """
    snapshot_download(
        repo_id=hf_repo, revision="main", local_dir=local_dir, repo_type="dataset", max_workers=60, token=TOKEN
    )
    json_files = glob.glob(f"{local_dir}/**/*.json", recursive=True)

    eval_requests = []
    for json_filepath in json_files:
        with open(json_filepath) as fp:
            data = json.load(fp)
        if data["status"] in job_status:
            data["json_filepath"] = json_filepath
            eval_request = EvalRequest(**data)
            eval_requests.append(eval_request)

    return eval_requests
def eval_was_running(eval_request: EvalRequest):
    """Checks whether a request file still says RUNNING, to decide whether it should be marked as FAILED."""
    json_filepath = eval_request.json_filepath

    with open(json_filepath) as fp:
        data = json.load(fp)

    status = data["status"]
    return status == RUNNING_STATUS
def check_completed_evals(
    api: HfApi,
    hf_repo: str,
    local_dir: str,
    checked_status: str,
    completed_status: str,
    failed_status: str,
    hf_repo_results: str,
    local_dir_results: str,
):
    """Checks if the currently running evals are completed; if yes, updates their status on the hub."""
    snapshot_download(
        repo_id=hf_repo_results,
        revision="main",
        local_dir=local_dir_results,
        repo_type="dataset",
        max_workers=60,
        token=TOKEN,
    )
    running_evals = get_eval_requests([checked_status], hf_repo=hf_repo, local_dir=local_dir)
    for eval_request in running_evals:
        model = eval_request.model
        logger.info("====================================")
        logger.info(f"Checking {model}")

        output_path = model
        output_file = f"{local_dir_results}/{output_path}/results*.json"
        output_file_exists = len(glob.glob(output_file)) > 0

        if output_file_exists:
            logger.info(f"Found a results file for {model}, setting it to {completed_status}")
            set_eval_request(api, eval_request, completed_status, hf_repo, local_dir)
        else:
            if eval_was_running(eval_request=eval_request):
                logger.info(f"No results file found for {model}, setting it to {failed_status}")
                set_eval_request(api, eval_request, failed_status, hf_repo, local_dir)
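

# ---------------------------------------------------------------------------
# Hedged usage sketch (not part of the original module). It shows how a backend
# loop might tie these helpers together. The names QUEUE_REPO, RESULTS_REPO,
# EVAL_REQUESTS_PATH_BACKEND and EVAL_RESULTS_PATH_BACKEND are assumptions about
# what src.envs exposes in this template; adjust them to your own setup.
#
#   api = HfApi(token=TOKEN)
#
#   # 1. Reconcile requests that were marked RUNNING against the results repo.
#   check_completed_evals(
#       api,
#       hf_repo=QUEUE_REPO,
#       local_dir=EVAL_REQUESTS_PATH_BACKEND,
#       checked_status=RUNNING_STATUS,
#       completed_status=FINISHED_STATUS,
#       failed_status=FAILED_STATUS,
#       hf_repo_results=RESULTS_REPO,
#       local_dir_results=EVAL_RESULTS_PATH_BACKEND,
#   )
#
#   # 2. Pick the next PENDING request, mark it RUNNING, then hand it to the eval suite.
#   pending = get_eval_requests([PENDING_STATUS], EVAL_REQUESTS_PATH_BACKEND, QUEUE_REPO)
#   if pending:
#       request = pending[0]
#       set_eval_request(api, request, RUNNING_STATUS, QUEUE_REPO, EVAL_REQUESTS_PATH_BACKEND)
#       model_args = request.get_model_args()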