"""Wake sleeping Hugging Face Spaces and expired studios, then report failures.

``trigger`` is the entry point: for every configured user it collects the
Spaces reported as sleeping and the studios (served under ``MS_DOMAIN``)
reported as expired, pings/restarts them, and emails the list of
repositories found in an error state.

``HF_DOMAIN``, ``MS_DOMAIN``, ``HEADER``, ``TIMEOUT``, ``DELAY`` and ``USERS``
are expected to come from the star import of ``config``.
"""

import json
import threading
import time

import requests
from huggingface_hub import HfApi
from tqdm import tqdm

from smtp import send_email
from config import *


def get_space_status(repo_id: str):
    """Return the runtime stage of the Space ``repo_id``.

    The Space is looked up through the ``semantic-search`` endpoint. Every
    returned hit is inspected (not only the first one), so an exact match
    that is not ranked first is still recognised.

    Args:
        repo_id: Full Space identifier as exposed by ``space.id``.

    Returns:
        The ``runtime.stage`` string of the matching hit, or ``"SLEEPING"``
        when no hit has exactly this id.

    Raises:
        requests.RequestException: On network failure or an HTTP error status.
        ValueError: If the response body is not valid JSON.
        KeyError: If the matching hit carries no ``runtime.stage``.
    """
    response = requests.get(
        url=f"{HF_DOMAIN}/api/spaces/semantic-search",
        params={"q": repo_id},
        headers=HEADER,
        timeout=TIMEOUT,
    )
    response.raise_for_status()
    hits = response.json()
    if isinstance(hits, list):
        for hit in hits:
            if isinstance(hit, dict) and hit.get("id") == repo_id:
                return hit["runtime"]["stage"]

    # Catch-all verdict: anything the search does not return is treated as
    # sleeping, so it simply gets pinged by the caller.
    return "SLEEPING"


def get_spaces(username: str):
    """Collect the sleeping and errored Spaces owned by ``username``.

    Args:
        username: Author name passed to ``HfApi().list_spaces``.

    Returns:
        A ``(sleepings, errors)`` tuple of lists. ``sleepings`` holds the
        public URLs to ping, ``errors`` holds the ``HF_DOMAIN`` page URLs of
        Spaces whose status contains ``"ERROR"``. Failures are printed, never
        raised; whatever was collected before a failure is still returned.
    """
    sleepings, errors = [], []
    try:
        for space in HfApi().list_spaces(author=username):
            try:
                status = get_space_status(space.id)
            except Exception as e:
                # One failed lookup must not discard the results already
                # gathered for this user; skip just this Space.
                print(f"An error occurred in the request: {e}")
                continue

            if status == "SLEEPING":
                space_id = space.id.replace("/", "-").replace("_", "-").lower()
                # NOTE(review): every non-gradio SDK is routed to the
                # ``static`` host - confirm this is intended for other SDKs.
                if space.sdk == "gradio":
                    sleepings.append(f"https://{space_id}.hf.space")
                else:
                    sleepings.append(f"https://{space_id}.static.hf.space")

            elif "ERROR" in status:
                errors.append(f"{HF_DOMAIN}/spaces/{space.id}")

    except Exception as e:
        # Listing (or paging through the listing) failed: report and return
        # the partial result.
        print(f"An error occurred in the request: {e}")

    return sleepings, errors


def activate_space(url: str):
    """Send one GET request to ``url`` to wake the Space behind it.

    Best effort: any request failure or HTTP error status is printed and
    swallowed.
    """
    try:
        response = requests.get(url, headers=HEADER, timeout=TIMEOUT)
        response.raise_for_status()

    except Exception as e:
        print(e)


def get_studios(username: str, max_retries: int = 3):
    """Collect the expired and failed studios owned by ``username``.

    Args:
        username: Owner whose studio list is requested.
        max_retries: How many times a timed-out request is retried before
            giving up. Replaces the former unbounded recursive retry.

    Returns:
        A ``(studios, errors)`` tuple of lists. ``studios`` holds
        ``"<username>/<name>"`` for every studio whose status is
        ``"Expired"``; ``errors`` holds the ``MS_DOMAIN`` page URL of every
        studio whose status is ``"Failed"``. Both lists are empty when the
        request fails or the user has no studios.
    """
    payload = json.dumps(
        {
            "PageNumber": 1,
            "PageSize": 1000,
            "Name": "",
            "SortBy": "gmt_modified",
            "Order": "desc",
        }
    )
    retries_left = max(0, max_retries)
    while True:
        try:
            response = requests.put(
                f"{MS_DOMAIN}/api/v1/studios/{username}/list",
                data=payload,
                headers=HEADER,
                timeout=TIMEOUT,
            )
            response.raise_for_status()
            studios, errors = [], []
            # ``or []`` covers a null/empty "Studios" field.
            for studio in response.json()["Data"]["Studios"] or []:
                status = studio["Status"]
                if status == "Expired":
                    studios.append(f"{username}/{studio['Name']}")
                elif status == "Failed":
                    errors.append(f"{MS_DOMAIN}/studios/{username}/{studio['Name']}")

            return studios, errors

        except requests.exceptions.Timeout as e:
            if retries_left <= 0:
                print(f"Timeout: {e}, giving up.")
                return [], []

            retries_left -= 1
            print(f"Timeout: {e}, retrying...")
            time.sleep(DELAY)

        except Exception as e:
            print(f"Requesting error: {e}")
            return [], []


def activate_studio(repo: str, holding_delay=5, max_retries: int = 3):
    """Restart an expired studio and keep visiting it until it is running.

    Issues the ``start_expired`` request, then polls the status endpoint;
    between polls the studio page is fetched and the function sleeps for
    ``holding_delay`` seconds.

    Args:
        repo: Studio identifier in ``"<username>/<name>"`` form.
        holding_delay: Seconds to sleep between two status polls.
        max_retries: How many times the whole sequence is restarted after a
            request timeout. Replaces the former unbounded recursive retry,
            which also reset ``holding_delay`` to its default.

    Returns:
        None. All failures are printed, never raised.
    """
    repo_page = f"{MS_DOMAIN}/studios/{repo}"
    status_api = f"{MS_DOMAIN}/api/v1/studio/{repo}/status"
    start_expired_api = f"{MS_DOMAIN}/api/v1/studio/{repo}/start_expired"
    retries_left = max(0, max_retries)
    while True:
        try:
            response = requests.put(start_expired_api, headers=HEADER, timeout=TIMEOUT)
            response.raise_for_status()
            while True:
                status = requests.get(
                    status_api, headers=HEADER, timeout=TIMEOUT
                ).json()["Data"]["Status"]
                if status == "Running":
                    return

                # "Failed" is the error status used by the studio listing;
                # presumably the status endpoint shares that vocabulary -
                # TODO confirm. Without this exit a failed studio would be
                # polled forever.
                if status == "Failed":
                    print(f"Failed to activate {repo}: status is {status}")
                    return

                requests.get(repo_page, headers=HEADER, timeout=TIMEOUT)
                time.sleep(holding_delay)

        except requests.exceptions.Timeout as e:
            if retries_left <= 0:
                print(f"Failed to activate {repo}: {e}, giving up.")
                return

            retries_left -= 1
            print(f"Failed to activate {repo}: {e}, retrying...")
            time.sleep(DELAY)  # brief pause instead of retrying immediately

        except Exception as e:
            print(e)
            return


def trigger(users=USERS):
    """Wake every sleeping Space and expired studio of the given users.

    Args:
        users: Semicolon-separated user names; blank entries are ignored.

    Side effects:
        Prints the activated targets, starts one daemon thread per expired
        studio, and calls ``send_email`` with the list of repositories found
        in an error state (only when there is at least one).
    """
    usernames = [name.strip() for name in users.split(";")]
    usernames = [name for name in usernames if name]
    spaces, studios, failures = [], [], []

    for username in tqdm(usernames, desc="Collecting spaces"):
        sleeps, errors = get_spaces(username)
        spaces += sleeps
        failures += errors
        time.sleep(DELAY)

    for space in tqdm(spaces, desc="Activating spaces"):
        activate_space(space)
        time.sleep(DELAY)

    for username in tqdm(usernames, desc="Collecting studios"):
        sleeps, errors = get_studios(username)
        studios += sleeps
        failures += errors
        time.sleep(DELAY)

    for studio in tqdm(studios, desc="Activating studios"):
        # Fire-and-forget: the threads are daemonic and never joined, so this
        # function does not wait for the studios to reach "Running".
        threading.Thread(target=activate_studio, args=(studio,), daemon=True).start()
        time.sleep(DELAY)

    print("\n".join(spaces + studios) + "\nActivation complete!")

    report_lines = []
    for failure in failures:
        # Strip the domain, then the leading "/" left in front of the path.
        errepo = failure.replace(HF_DOMAIN, "").replace(MS_DOMAIN, "")
        # NOTE(review): each entry is wrapped in plain newlines - confirm the
        # mail body is rendered as text rather than HTML.
        report_lines.append(f"\n{errepo[1:]}\n")

    content = "".join(report_lines)
    if content:
        send_email(content)