import hmac from hashlib import sha256 from typing import Dict, Any from fastapi import FastAPI, Header, HTTPException, Request, Response from huggingface_hub import HfApi import os # --- Configuration --- # Load from Space secrets WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET") HF_TOKEN = os.environ.get("HF_TOKEN") RESOURCE_GROUP_ID = os.environ.get("RESOURCE_GROUP_ID") if not all([WEBHOOK_SECRET, HF_TOKEN, RESOURCE_GROUP_ID]): raise ValueError("Missing one or more required environment variables: WEBHOOK_SECRET, HF_TOKEN, RESOURCE_GROUP_ID") app = FastAPI() hf_api = HfApi(token=os.getenv("HF_TOKEN")) # --- Webhook validation --- async def validate_webhook(request: Request, x_webhook_secret: str = Header(...)): if not x_webhook_secret: raise HTTPException(status_code=400, detail="X-Webhook-Secret header is missing") # Make sure the secret matches if x_webhook_secret != WEBHOOK_SECRET: raise HTTPException(status_code=401, detail="Invalid webhook secret") # --- Webhook endpoint --- @app.get("/webhook") def webhook_endpoint(): # path_to_call = f"/api/models/{MODEL_REPO_ID}/resource-group" payload = {"resourceGroupId": RESOURCE_GROUP_ID} response = hf_api.get_user_overview(username="CharlieBoyer", token=HF_TOKEN) print(response) return {"status": "event_not_handled"} @app.get("/") def health_check(): return "Webhook listener is running."