"""Webhook receiver that relays selected repository webhook events to Slack.

Two POST endpoints are exposed, ``/webhook/model-catalog`` and
``/webhook/simship``. Each one authenticates the caller through the
``X-Webhook-Secret`` header, checks the webhook ID carried in the payload,
and posts a Slack message for the events it is interested in.
"""

import hmac
import json
import logging
import os
from typing import Any, Callable, Optional

import requests
from fastapi import FastAPI, Request, Response
from fastapi.concurrency import run_in_threadpool

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configurations
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")

MODEL_CATALOG_WEBHOOK = os.environ.get("MODEL_CATALOG_WEBHOOK")
MODEL_CATALOG_WEBHOOK_SECRET = os.environ.get("MODEL_CATALOG_WEBHOOK_SECRET")

SIMSHIP_WEBHOOK = os.environ.get("SIMSHIP_WEBHOOK")
SIMSHIP_WEBHOOK_SECRET = os.environ.get("SIMSHIP_WEBHOOK_SECRET")

# Upper bound, in seconds, on the Slack POST. Without a timeout `requests`
# waits indefinitely on an unresponsive endpoint.
SLACK_REQUEST_TIMEOUT = 10


def _as_dict(value: Any) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict.

    ``data.get(key, {})`` only falls back when the key is absent; a key that
    is present with a ``null`` (or otherwise non-object) value would make the
    following ``.get`` calls raise ``AttributeError``.
    """
    return value if isinstance(value, dict) else {}


def _secret_matches(provided: Optional[str], expected: Optional[str]) -> bool:
    """Compare the received secret with the configured one in constant time.

    Returns ``False`` when either side is missing or empty, so an
    unconfigured secret can never be matched by a request that simply omits
    the header.
    """
    if not provided or not expected:
        return False
    # Compare bytes: compare_digest() rejects str arguments containing
    # non-ASCII characters, which a hostile header value could include.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


def send_slack_message(message: str):
    """Send a message to Slack using webhook URL.

    Delivery is best-effort: when no webhook URL is configured the message is
    only logged, and request failures are logged rather than raised.

    Args:
        message: Text to post to Slack.
    """
    if not SLACK_WEBHOOK_URL:
        logger.warning("No Slack webhook URL configured. Message: %s", message)
        return

    payload = {"text": message}
    try:
        response = requests.post(
            SLACK_WEBHOOK_URL, json=payload, timeout=SLACK_REQUEST_TIMEOUT
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error("Failed to send Slack message: %s", e)
    else:
        logger.info("Slack message sent successfully: %s", message)


def process_model_catalog_webhook(data: dict):
    """Process model catalog webhook.

    Sends a Slack alert when a repo of type ``model`` is moved (renamed) or
    deleted. Every other event is ignored.

    Args:
        data: Decoded webhook payload.
    """
    event = _as_dict(data.get("event"))
    repo = _as_dict(data.get("repo"))
    moved_to = _as_dict(data.get("movedTo"))

    # Only repo-scoped events on model repos are of interest here.
    if repo.get("type") != "model" or event.get("scope") != "repo":
        return

    action = event.get("action")

    # repo name changed
    if action == "move":
        message = (
            "📚 Model Catalog Sync Alert 📚\n\n"
            "Model in the catalog renamed 🔄 \n"
            f"{repo.get('name', '')} → https://hf.co/{moved_to.get('name', '')}"
        )
        send_slack_message(message)

    # repo deleted
    elif action == "delete":
        message = (
            "📚 Model Catalog Sync Alert 📚\n\n"
            "Model in the catalog deleted 🗑️\n"
            f"https://hf.co/{repo.get('name', '')}"
        )
        send_slack_message(message)


def process_simship_webhook(data: dict):
    """Process simship webhook.

    Sends a Slack alert when a repo of type ``model`` is created, or when its
    config is updated with ``private`` set to ``False``. Every other event is
    ignored.

    Args:
        data: Decoded webhook payload.
    """
    event = _as_dict(data.get("event"))
    repo = _as_dict(data.get("repo"))
    updated_config = _as_dict(data.get("updatedConfig"))

    if repo.get("type") != "model":
        return

    scope = event.get("scope")
    action = event.get("action")

    # repo creation
    if scope == "repo" and action == "create":
        message = (
            "🛥️ SimShip Provider Alert 🛥️\n\n"
            "SimShip Model created 🆕\n"
            f"https://hf.co/{repo.get('name', '')}"
        )
        send_slack_message(message)

    # repo visibility update; `is False` deliberately excludes a missing key
    elif (
        scope == "repo.config"
        and action == "update"
        and updated_config.get("private") is False
    ):
        message = (
            "🛥️ SimShip Provider Alert 🛥️\n\n"
            "SimShip Model made public 🆕\n"
            f"https://hf.co/{repo.get('name', '')}"
        )
        send_slack_message(message)


async def _handle_webhook(
    request: Request,
    *,
    label: str,
    expected_secret: Optional[str],
    expected_id: Optional[str],
    processor: Callable[[dict], None],
) -> Response:
    """Authenticate, validate and dispatch one incoming webhook request.

    Args:
        request: Incoming request.
        label: Lower-case endpoint name used in log and response messages.
        expected_secret: Configured value for the ``X-Webhook-Secret`` header.
        expected_id: Configured webhook ID the payload must carry.
        processor: Function invoked with the decoded payload.

    Returns:
        401 when the secret or webhook ID is wrong or not configured, 400 when
        the body is not a JSON object, and 200 once the payload was processed.
    """
    # request.client is None when the server supplies no peer address.
    client_host = request.client.host if request.client else "unknown"
    logger.info("Received %s webhook request from %s", label, client_host)

    # Fail closed: with a plain `!=`, an unset secret and an absent header
    # are both None and would compare equal, accepting the request.
    if not expected_secret:
        logger.error("No webhook secret configured for %s; rejecting request", label)
        return Response("Invalid secret", status_code=401)
    if not _secret_matches(request.headers.get("X-Webhook-Secret"), expected_secret):
        logger.warning("Invalid webhook secret received for %s", label)
        return Response("Invalid secret", status_code=401)

    try:
        data = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Malformed JSON payload received for %s", label)
        return Response("Invalid JSON payload", status_code=400)
    if not isinstance(data, dict):
        logger.warning("Non-object JSON payload received for %s", label)
        return Response("Invalid JSON payload", status_code=400)

    logger.info(
        "%s webhook payload: %s", label.capitalize(), json.dumps(data, indent=2)
    )

    webhook_id = _as_dict(data.get("webhook")).get("id")
    # Same fail-closed rule as for the secret: an unset expected ID must not
    # match a payload that carries no ID.
    if not expected_id or webhook_id != expected_id:
        logger.warning("Invalid webhook ID received for %s", label)
        return Response("Invalid webhook ID", status_code=401)

    logger.info("Processing %s webhook", label)
    # The processor performs a blocking HTTP call; run it off the event loop.
    await run_in_threadpool(processor, data)
    return Response(
        f"{label.capitalize()} webhook notification received and processed!",
        status_code=200,
    )


app = FastAPI()


@app.post("/webhook/model-catalog")
async def model_catalog_webhook(request: Request):
    """Receive a model catalog webhook and forward relevant events to Slack."""
    return await _handle_webhook(
        request,
        label="model catalog",
        expected_secret=MODEL_CATALOG_WEBHOOK_SECRET,
        expected_id=MODEL_CATALOG_WEBHOOK,
        processor=process_model_catalog_webhook,
    )


@app.post("/webhook/simship")
async def simship_webhook(request: Request):
    """Receive a simship webhook and forward relevant events to Slack."""
    return await _handle_webhook(
        request,
        label="simship",
        expected_secret=SIMSHIP_WEBHOOK_SECRET,
        expected_id=SIMSHIP_WEBHOOK,
        processor=process_simship_webhook,
    )