import os import logging import asyncio import nest_asyncio import requests from hugchat import hugchat from hugchat.login import Login # Import Telegram bot components from telegram import Update, Bot from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext # ------------------------- # Load environment variables # ------------------------- # Expected environment variables: # TELEGRAM_BOT_TOKEN - Telegram bot token # WEBHOOK_DOMAIN - your-space.hf.space # HF_EMAIL and HF_PASSWORD TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") if not TELEGRAM_BOT_TOKEN: raise ValueError("Missing TELEGRAM_BOT_TOKEN environment variable.") WEBHOOK_DOMAIN = os.getenv("WEBHOOK_DOMAIN") if not WEBHOOK_DOMAIN: raise ValueError("Missing WEBHOOK_DOMAIN environment variable.") HF_EMAIL = os.getenv("HF_EMAIL") if not HF_EMAIL: raise ValueError("Missing HF_EMAIL environment variable.") HF_PASSWORD = os.getenv("HF_PASSWORD") if not HF_PASSWORD: raise ValueError("Missing HF_PASSWORD environment variable.") # ------------------------- # Configure logging # ------------------------- logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO) logger = logging.getLogger(__name__) # ------------------------- # Set up Hugging Chat integration # ------------------------- # Logs into Hugging Face or uses a fallback to get cookies. try: logger.info("Logging into Hugging Face using provided credentials.") sign = Login(HF_EMAIL, HF_PASSWORD) cookies = sign.login() print("cookies: ", cookies) except Exception as e: logger.error(f"Error in Logging-into Hugging Chat: {e}") logger.info("Using fallback method to get cookies.") cookies = requests.get("https://huggingface.co/chat/").cookies # Create a ChatBot instance from the hugchat library. chatbot = hugchat.ChatBot(cookies=cookies.get_dict()) # ------------------------- # Helper Functions # ------------------------- def detect_language(user_input: str) -> str: """ Detect language using langid. Returns 'hebrew' if Hebrew, 'english' if English, otherwise 'unsupported'. """ try: import langid lang, _ = langid.classify(user_input) if lang == "he": return "hebrew" elif lang == "en": return "english" else: return "unsupported" except Exception as e: logger.error(f"Language detection error: {e}") return "unsupported" def generate_response_sync(message: str) -> str: """ Generate a response using Hugging Chat in a synchronous manner. This function is blocking; we'll run it in an executor. """ language = detect_language(message) if language == "hebrew": prompt = "תענה בקצרה אבל תשתף את תהליך קבלת ההחלטות שלך, " + message elif language == "english": prompt = "keep it short but tell your decision making process, " + message else: return "Sorry, I only support Hebrew and English." response_queue = "" full_response = "" try: # The huggingchat.ChatBot.chat() method returns a generator (streaming tokens) for resp in chatbot.chat(prompt, _stream_yield_all=True): if resp and "token" in resp: response_queue += resp["token"] # Flush the response if it's long (optional, here we accumulate) full_response = response_queue return full_response except Exception as e: logger.error(f"Error generating response via Hugging Chat: {e}") return "Error: Could not generate response." async def generate_response(message: str) -> str: """ Asynchronous wrapper around generate_response_sync using run_in_executor. """ loop = asyncio.get_event_loop() response = await loop.run_in_executor(None, generate_response_sync, message) return response # ------------------------- # Telegram Bot Handlers # ------------------------- async def start(update: Update, context: CallbackContext): """ Handler for the /start command. """ await update.message.reply_text("Hello! Tell me your decision-making issue, and I'll try to help.") logger.info("Received /start command.") async def handle_message(update: Update, context: CallbackContext): """ Handler for incoming text messages. It generates a response using the Hugging Chat integration. """ user_text = update.message.text logger.info(f"Received message: {user_text}") response_text = await generate_response(user_text) logger.info(f"Generated response: {response_text}") await update.message.reply_text(response_text) # ------------------------- # Webhook Configuration for Telegram # ------------------------- # Construct the full webhook URL. # We use the bot token as the URL path for uniqueness. WEBHOOK_URL = f"https://{WEBHOOK_DOMAIN}/{TELEGRAM_BOT_TOKEN}" # ------------------------- # Main function to run the Telegram Bot using Webhook mode # ------------------------- async def main(): # Build the Telegram Application. application = Application.builder().token(TELEGRAM_BOT_TOKEN).build() # Add handlers. application.add_handler(CommandHandler("start", start)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) logger.info("Starting bot in webhook mode...") print("Starting bot in webhook mode...") # Run the bot using webhook mode. await application.run_webhook( listen="0.0.0.0", # Listen on all interfaces. port=7860, # Port to listen on – must match the exposed port in Dockerfile. url_path=TELEGRAM_BOT_TOKEN, # Use the bot token as the URL path. webhook_url=WEBHOOK_URL # Full webhook URL that Telegram will call. ) # ------------------------- # Run the main function (handling event loop issues) # ------------------------- if __name__ == "__main__": # Apply nest_asyncio to support nested event loops (useful in HF Spaces). nest_asyncio.apply() loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) except Exception as e: logger.error(f"Error in main loop: {e}") print(f"Error in main loop: {e}")