online_tools_2 / activate.py
admin
upl ksa
751f3b1
raw
history blame
4.74 kB
import json
import time
import requests
import threading
from tqdm import tqdm
from huggingface_hub import HfApi
from smtp import send_email
from config import *
def get_space_status(repo_id: str):
    """Return the runtime stage of a Hugging Face Space.

    Queries ``{HF_DOMAIN}/api/spaces/semantic-search`` with ``repo_id`` as the
    search text and looks only at the first hit.

    Args:
        repo_id: Space identifier to look up (compared against the hit's
            ``"id"`` field).

    Returns:
        The ``["runtime"]["stage"]`` value of the first hit when its id equals
        ``repo_id``; otherwise the string ``"SLEEPING"``.

    Errors raised by ``requests`` or by indexing an unexpected payload are not
    handled here and propagate to the caller.
    """
    hits: list = requests.get(
        url=f"{HF_DOMAIN}/api/spaces/semantic-search",
        params={"q": repo_id},
        headers=HEADER,
        timeout=TIMEOUT,
    ).json()

    if hits and hits[0]["id"] == repo_id:
        return hits[0]["runtime"]["stage"]

    # Catch-all: a Space that is absent from the results, or is not the top
    # hit, is reported as sleeping.
    return "SLEEPING"
def get_spaces(username: str):
    """Classify a user's Hugging Face Spaces into sleeping and errored ones.

    Lists the user's Spaces with ``HfApi().list_spaces(author=username)`` and
    asks ``get_space_status`` for the status of each.

    Args:
        username: Hugging Face account whose Spaces are inspected.

    Returns:
        A ``(sleepings, errors)`` tuple of lists. ``sleepings`` holds the
        ``*.hf.space`` / ``*.static.hf.space`` URL of every Space whose status
        is ``"SLEEPING"``; ``errors`` holds the ``{HF_DOMAIN}/spaces/...`` page
        URL of every Space whose status contains ``"ERROR"``.

        If listing or a status lookup raises, the error is printed, the scan
        for this user stops, and the entries gathered so far are returned
        (previously they were discarded in favour of two empty lists).
    """
    sleepings, errors = [], []
    try:
        for space in HfApi().list_spaces(author=username):
            status = get_space_status(space.id)
            if status == "SLEEPING":
                # Host name is the repo id with "/" and "_" turned into "-",
                # lower-cased.
                space_id = space.id.replace("/", "-").replace("_", "-").lower()
                # NOTE(review): every non-gradio SDK is routed to the static
                # host; confirm this is intended for docker/streamlit Spaces.
                if space.sdk == "gradio":
                    sleepings.append(f"https://{space_id}.hf.space")
                else:
                    sleepings.append(f"https://{space_id}.static.hf.space")
            elif "ERROR" in status:
                errors.append(f"{HF_DOMAIN}/spaces/{space.id}")
    except Exception as e:
        # Best effort: report the failure but keep what was already found so
        # one bad lookup does not hide earlier results.
        print(f"An error occurred in the request: {e}")

    return sleepings, errors
def activate_space(url: str):
    """Send a single GET request to ``url`` and report any failure.

    Args:
        url: Address to request; ``trigger`` passes the URLs of sleeping
            Spaces here.

    Returns:
        None. Any exception, including a non-success HTTP status surfaced by
        ``raise_for_status()``, is printed and swallowed.
    """
    try:
        requests.get(url, headers=HEADER, timeout=TIMEOUT).raise_for_status()
    except Exception as err:
        print(err)
def get_studios(username: str, max_retries: int = 5):
    """List a user's ModelScope studios and pick out expired and failed ones.

    Sends a PUT to ``{MS_DOMAIN}/api/v1/studios/{username}/list`` requesting
    page 1 with a page size of 1000, sorted by ``gmt_modified`` descending,
    and reads the studio list from ``["Data"]["Studios"]`` of the JSON reply.

    Args:
        username: Account name inserted into the list URL.
        max_retries: Number of additional attempts made after a request
            timeout. The previous implementation retried by unbounded
            recursion; this bounds the retries and keeps the stack flat.

    Returns:
        A ``(studios, errors)`` tuple of lists. ``studios`` holds
        ``"{username}/{Name}"`` for each studio whose ``Status`` is
        ``"Expired"``; ``errors`` holds ``{MS_DOMAIN}/studios/{username}/{Name}``
        for each studio whose ``Status`` is ``"Failed"``. A tuple is always
        returned: two empty lists when the account has no studios, when a
        non-timeout error occurs, or when all retries time out.
    """
    list_api = f"{MS_DOMAIN}/api/v1/studios/{username}/list"
    payload = json.dumps(
        {
            "PageNumber": 1,
            "PageSize": 1000,
            "Name": "",
            "SortBy": "gmt_modified",
            "Order": "desc",
        }
    )
    retries = max(max_retries, 0)

    for attempt in range(retries + 1):
        try:
            response = requests.put(
                list_api,
                data=payload,
                headers=HEADER,
                timeout=TIMEOUT,
            )
            response.raise_for_status()
            # A falsy "Studios" value (empty list or null) means nothing to do;
            # normalise it so callers can always unpack the result.
            spaces: list = response.json()["Data"]["Studios"] or []

            studios, errors = [], []
            for space in spaces:
                status = space["Status"]
                if status == "Expired":
                    studios.append(f"{username}/{space['Name']}")
                elif status == "Failed":
                    errors.append(f"{MS_DOMAIN}/studios/{username}/{space['Name']}")

            return studios, errors

        except requests.exceptions.Timeout as e:
            if attempt == retries:
                print(f"Timeout: {e}, giving up after {retries} retries")
                break
            print(f"Timeout: {e}, retrying...")
            time.sleep(DELAY)

        except Exception as e:
            print(f"Requesting error: {e}")
            break

    return [], []
def activate_studio(repo: str, holding_delay=5, max_retries=5):
    """Restart an expired ModelScope studio and wait until it reports Running.

    Sends a PUT to ``{MS_DOMAIN}/api/v1/studio/{repo}/start_expired``, then
    polls ``{MS_DOMAIN}/api/v1/studio/{repo}/status``. While the reported
    ``["Data"]["Status"]`` is not ``"Running"`` it requests the studio page
    and sleeps ``holding_delay`` seconds before polling again.

    Args:
        repo: Studio path in ``"{username}/{name}"`` form, as produced by
            ``get_studios``.
        holding_delay: Seconds to wait between status polls.
        max_retries: Number of times the whole sequence is restarted after a
            request timeout. The previous implementation retried by
            recursing without a limit and without forwarding
            ``holding_delay``, so retries silently fell back to the default
            delay.

    Returns:
        None. Timeouts are printed and retried up to ``max_retries`` times;
        any other exception is printed and ends the attempt.
    """
    repo_page = f"{MS_DOMAIN}/studios/{repo}"
    status_api = f"{MS_DOMAIN}/api/v1/studio/{repo}/status"
    start_expired_api = f"{MS_DOMAIN}/api/v1/studio/{repo}/start_expired"
    retries = max(max_retries, 0)

    for attempt in range(retries + 1):
        try:
            response = requests.put(start_expired_api, headers=HEADER, timeout=TIMEOUT)
            response.raise_for_status()
            # NOTE(review): polling has no upper bound; it ends only when the
            # status becomes "Running" or a request raises.
            while (
                requests.get(status_api, headers=HEADER, timeout=TIMEOUT).json()["Data"][
                    "Status"
                ]
                != "Running"
            ):
                requests.get(repo_page, headers=HEADER, timeout=TIMEOUT)
                time.sleep(holding_delay)
            return

        except requests.exceptions.Timeout as e:
            if attempt == retries:
                print(f"Failed to activate {repo}: {e}, giving up after {retries} retries")
                return
            # Restart the whole sequence, keeping the caller's holding_delay.
            print(f"Failed to activate {repo}: {e}, retrying...")

        except Exception as e:
            print(e)
            return
def trigger(users=USERS):
    """Activate sleeping Spaces and expired studios for a list of users.

    Steps, in order:
      1. For each user, collect sleeping/errored Spaces via ``get_spaces``.
      2. Request every sleeping Space URL via ``activate_space``.
      3. For each user, collect expired/failed studios via ``get_studios``.
      4. Start one daemon thread per expired studio running
         ``activate_studio``; the threads are not joined.
      5. Print the activated targets and, if anything was reported as
         errored/failed, pass an HTML list of links to ``send_email``.

    ``DELAY`` seconds are slept after each user lookup and after each
    activation.

    Args:
        users: Semicolon-separated usernames; surrounding whitespace is
            stripped and blank entries are skipped.
    """
    usernames = users.split(";")
    spaces, studios, failures = [], [], []

    for raw_name in tqdm(usernames, desc="Collecting spaces"):
        username = raw_name.strip()
        if not username:
            continue
        sleeps, errors = get_spaces(username)
        spaces.extend(sleeps)
        failures.extend(errors)
        time.sleep(DELAY)

    for space_url in tqdm(spaces, desc="Activating spaces"):
        activate_space(space_url)
        time.sleep(DELAY)

    for raw_name in tqdm(usernames, desc="Collecting studios"):
        username = raw_name.strip()
        if not username:
            continue
        sleeps, errors = get_studios(username)
        studios.extend(sleeps)
        failures.extend(errors)
        time.sleep(DELAY)

    for studio in tqdm(studios, desc="Activating studios"):
        worker = threading.Thread(target=activate_studio, args=(studio,), daemon=True)
        worker.start()
        time.sleep(DELAY)

    print("\n".join(spaces + studios) + "\nActivation complete!")

    # Build one link per failure; the label is the URL with the site domain
    # and the leading character removed.
    links = []
    for failed_url in failures:
        label = failed_url.replace(HF_DOMAIN, "").replace(MS_DOMAIN, "")[1:]
        links.append(f"<br><a href='{failed_url}'>{label}</a><br>")

    content = "".join(links)
    if content:
        send_email(content)