import json
import random
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import gradio as gr
import requests
from openai import OpenAI

GIPHY_SEARCH_URL = "https://api.giphy.com/v1/gifs/search"
GIPHY_TRENDING_URL = "https://api.giphy.com/v1/gifs/trending"
OPENAI_MODEL = "gpt-4o-mini"

# Marker the model is instructed to emit when it wants a GIF: "[GIF: query]".
GIF_TAG_OPEN = "[GIF:"
GIF_TAG_CLOSE = "]"


class GifChatBot:
    """Chat bot that answers with OpenAI and illustrates replies with GIPHY GIFs.

    The model is prompted to embed ``[GIF: <search terms>]`` tags in its
    replies; :meth:`chat` replaces each tag with a Markdown image whose
    original rendition is at least ``MINIMUM_SIZE_KB`` kilobytes.
    """

    # Prompt sent as the system message on every completion request.
    SYSTEM_PROMPT = (
        "You are a supportive, empathetic friend who uses GIFs naturally in conversation.\n"
        "IMPORTANT: All GIFs must be high file size (>500KB). If a suitable GIF isn't found,\n"
        "the system will keep searching until it finds one. and if still not found, "
        "the system will say sorry I did not find anything below 500kb.\n"
        "\n"
        "Keep search terms simple and universal:\n"
        "When sending an image, say \"here is a gif that is 500kb or more in size\"\n"
        "\n"
        "Examples:\n"
        "- User feeling hungry -> [GIF: hungry]\n"
        "- User feeling sad -> [GIF: comforting hug]\n"
        "- User celebrating -> [GIF: celebration]\n"
        "- User confused -> [GIF: confused]\n"
        "\n"
        "Keep your responses:\n"
        "1. Empathetic and natural\n"
        "2. Context-aware\n"
        "3. Use GIFs that match the emotion\n"
        "4. Simple search terms for better matches\n"
        "\n"
        "The system will automatically ensure all GIFs meet quality requirements."
    )

    def __init__(self):
        """Initialize the chatbot with necessary configurations."""
        self.openai_client = None
        self.giphy_key = None
        self.chat_history = []
        self.is_initialized = False
        self.session = requests.Session()

        # Constants for GIF validation
        self.MINIMUM_SIZE_KB = 500  # Strict 500KB minimum
        # NOTE(review): declared as a 5MB maximum but no code path enforces it.
        self.MAXIMUM_SIZE_MB = 5
        self.MAX_SEARCH_ATTEMPTS = 10
        self.MAX_TRENDING_ATTEMPTS = 3
        self.RESULTS_PER_PAGE = 50
        self.REQUEST_TIMEOUT = 5  # seconds, for GIPHY API calls

        # Send a browser-like User-Agent with every request made on the session.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def setup_keys(self, openai_key: str, giphy_key: str) -> str:
        """Store the user's API keys and validate both with a live request.

        Args:
            openai_key: OpenAI API key.
            giphy_key: GIPHY API key.

        Returns:
            A human-readable status line; ``is_initialized`` is True only
            when both keys passed validation.
        """
        # Pasted keys frequently carry stray whitespace or newlines.
        openai_key = (openai_key or "").strip()
        giphy_key = (giphy_key or "").strip()
        try:
            if not openai_key or not giphy_key:
                raise ValueError("Both the OpenAI and GIPHY API keys are required")
            self.openai_client = OpenAI(api_key=openai_key)
            self.giphy_key = giphy_key
            self._test_giphy_key()
            self._test_openai_key()
            self.is_initialized = True
            return "✅ Setup successful! Let's chat!"
        except Exception as error:  # UI boundary: report instead of crashing
            self.is_initialized = False
            return f"❌ Error setting up: {str(error)}"

    def _test_giphy_key(self):
        """Raise if a one-item trending request with the key does not return 200."""
        response = self.session.get(
            GIPHY_TRENDING_URL,
            params={"api_key": self.giphy_key, "limit": 1},
            timeout=self.REQUEST_TIMEOUT,  # never hang setup on a stalled socket
        )
        if response.status_code != 200:
            raise Exception("Invalid GIPHY API key")

    def _test_openai_key(self):
        """Raise if a minimal chat completion with the key fails."""
        try:
            self.openai_client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=[{"role": "user", "content": "test"}],
                max_tokens=5
            )
        except Exception as error:
            # Chain the cause so the real failure stays visible in tracebacks.
            raise Exception("Invalid OpenAI API key") from error

    def _verify_gif_url(self, url: str) -> bool:
        """Return True when a HEAD request to *url* answers 200 within 3 seconds."""
        try:
            response = self.session.head(url, timeout=3)
        except requests.RequestException:
            return False
        return response.status_code == 200

    def _fetch_gifs(self, endpoint: str, params: dict) -> Optional[list]:
        """Call a GIPHY listing endpoint.

        Returns:
            ``None`` when the response status is not 200, otherwise the
            ``data`` list from the JSON body (empty when missing or malformed).

        Raises:
            requests.RequestException: on transport failures.
            ValueError: when the body is not valid JSON.
        """
        response = self.session.get(endpoint, params=params, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None
        payload = response.json()
        results = payload.get("data") if isinstance(payload, dict) else None
        return results if isinstance(results, list) else []

    def _pick_large_gif(self, gifs: list, label: str = "GIF") -> Optional[Tuple[float, str]]:
        """Pick a random GIF among the five largest that meet the size minimum.

        Args:
            gifs: GIF objects as returned in a GIPHY ``data`` list.
            label: Noun used in the progress message printed per candidate.

        Returns:
            ``(size_kb, url)`` of the chosen original rendition, or ``None``
            when no entry is at least ``MINIMUM_SIZE_KB``.
        """
        candidates = []
        for gif in gifs:
            try:
                original = gif["images"]["original"]
                size_kb = int(original["size"]) / 1024
                url = original["url"]
            except (KeyError, TypeError, ValueError):
                # Skip malformed entries rather than discarding the whole page.
                continue
            if url and size_kb >= self.MINIMUM_SIZE_KB:
                candidates.append((size_kb, url))
                print(f"Found valid {label}: {size_kb:.2f}KB")

        if not candidates:
            return None
        # Largest first, then choose randomly among the top five for variety.
        candidates.sort(key=lambda item: item[0], reverse=True)
        return random.choice(candidates[:5])

    def get_gif(self, search_query: str) -> Optional[str]:
        """Search GIPHY for a reachable GIF of at least ``MINIMUM_SIZE_KB``.

        Pages through search results for up to ``MAX_SEARCH_ATTEMPTS``
        requests. When a page is empty the query is retried once with
        " reaction" appended; if that is empty too the search stops. Falls
        back to trending GIFs when the search yields nothing usable.

        Returns:
            The GIF URL, or ``None`` when neither search nor trending
            produced a suitable GIF.
        """
        offset = 0
        print(f"\nSearching for GIF: {search_query}")

        for attempt in range(self.MAX_SEARCH_ATTEMPTS):
            params = {
                'api_key': self.giphy_key,
                'q': search_query,
                'limit': self.RESULTS_PER_PAGE,
                'offset': offset,
                'rating': 'pg-13'
            }
            print(f"Attempt {attempt + 1} with offset {offset}")
            try:
                results = self._fetch_gifs(GIPHY_SEARCH_URL, params)
            except (requests.RequestException, ValueError) as error:
                print(f"Error in attempt {attempt + 1}: {error}")
                continue

            if results is None:
                # Non-200 response: retry the same page on the next attempt.
                continue

            if not results:
                print("No GIFs found, modifying search...")
                if "reaction" in search_query.lower():
                    break  # already broadened once; give up on search
                search_query += " reaction"
                offset = 0
                continue

            picked = self._pick_large_gif(results, "GIF")
            if picked:
                size_kb, url = picked
                print(f"Selected GIF URL: {url}")
                print(f"Selected GIF size: {size_kb:.2f}KB")
                if self._verify_gif_url(url):
                    return url
                print("GIF URL verification failed, continuing search...")

            # Nothing usable on this page: move on to the next one.
            offset += self.RESULTS_PER_PAGE

        # If we get here, try trending as last resort
        print("No suitable GIFs found in search, trying trending...")
        return self._get_trending_gif()

    def _get_trending_gif(self) -> Optional[str]:
        """Return a reachable trending GIF of at least ``MINIMUM_SIZE_KB``, or None.

        Makes up to ``MAX_TRENDING_ATTEMPTS`` requests, advancing to the next
        page of trending results whenever a page held nothing usable.
        """
        offset = 0
        for attempt in range(self.MAX_TRENDING_ATTEMPTS):
            params = {
                'api_key': self.giphy_key,
                'limit': self.RESULTS_PER_PAGE,
                'offset': offset,
                'rating': 'pg-13'
            }
            print(f"Fetching trending GIFs (attempt {attempt + 1})...")
            try:
                results = self._fetch_gifs(GIPHY_TRENDING_URL, params)
            except (requests.RequestException, ValueError) as error:
                print(f"Trending GIF error: {error}")
                continue

            if not results:
                continue

            picked = self._pick_large_gif(results, "trending GIF")
            if picked:
                size_kb, url = picked
                if self._verify_gif_url(url):
                    print(f"Selected trending GIF: {size_kb:.2f}KB")
                    return url

            # Re-requesting the same page cannot produce new candidates.
            offset += self.RESULTS_PER_PAGE

        print("Failed to find any suitable GIFs")
        return None

    def reset_chat(self) -> Tuple[List[Dict[str, str]], str]:
        """Clear stored history and return empty values for the chat and error boxes."""
        self.chat_history = []
        return [], ""

    def format_message(self, role: str, content: str) -> Dict[str, str]:
        """Build a message dict in Gradio's ``type="messages"`` chat format."""
        return {"role": role, "content": content}

    def _find_gif_with_retries(self, description: str, retries: int = 3) -> Optional[str]:
        """Run :meth:`get_gif` up to *retries* times and return the first URL found."""
        for retry in range(retries):
            gif_url = self.get_gif(description)
            if gif_url:
                return gif_url
            print(f"Retrying GIF search (attempt {retry + 1})...")
        return None

    def _render_gifs(self, ai_message: str) -> str:
        """Replace every ``[GIF: query]`` tag in *ai_message* with a Markdown image.

        Tags whose search finds nothing are removed. A tag that is never
        closed is left in the text verbatim so no model output is lost.
        """
        head, *tagged_parts = ai_message.split(GIF_TAG_OPEN)
        pieces = [head]

        for part in tagged_parts:
            description, closer, remainder = part.partition(GIF_TAG_CLOSE)
            if not closer:
                # Unterminated tag (e.g. truncated completion): keep the raw text.
                pieces.append(GIF_TAG_OPEN + part)
                continue

            description = description.strip()
            if description:
                print(f"\nProcessing GIF request: {description}")
                gif_url = self._find_gif_with_retries(description)
                if gif_url:
                    pieces.append(f"\n![GIF]({gif_url})\n")
                    print(f"Successfully added GIF: {gif_url}")
            pieces.append(remainder)

        return "".join(pieces)

    def chat(self, message: str, history: List[Dict[str, str]]) -> Tuple[str, List[Dict[str, str]], str]:
        """Answer *message* and append the exchange to *history*.

        Args:
            message: The user's new message.
            history: Prior turns as ``{"role", "content"}`` dicts.

        Returns:
            ``(textbox_value, history, error_text)``. On success the textbox
            is cleared and the error text is empty; on failure the message is
            returned untouched so the user can retry.
        """
        if not self.is_initialized:
            return message, history, "Please set up your API keys first!"

        if not message or not message.strip():
            return message, history, ""

        try:
            # Work on a copy so a failure part-way never leaves the caller's
            # list half-updated.
            updated_history = list(history or [])

            messages = [{"role": "system", "content": self.SYSTEM_PROMPT}]
            messages.extend(
                {"role": turn["role"], "content": turn["content"]}
                for turn in updated_history
            )
            messages.append({"role": "user", "content": message})

            response = self.openai_client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=messages,
                temperature=0.9,
                max_tokens=3000
            )

            # ``content`` can be None; treat that as an empty reply.
            ai_message = response.choices[0].message.content or ""
            final_response = self._render_gifs(ai_message)

            updated_history.append(self.format_message("user", message))
            updated_history.append(self.format_message("assistant", final_response))
            return "", updated_history, ""

        except Exception as error:  # UI boundary: surface the error in the error box
            error_message = f"Oops! Something went wrong: {str(error)}"
            print(f"Chat error: {error}")
            return message, history, error_message


def create_interface():
    """Create the Gradio interface and wire it to a :class:`GifChatBot`.

    Returns:
        The ``gr.Blocks`` app, ready for ``launch()``.
    """
    # NOTE(review): one bot instance (and therefore one set of API keys) is
    # shared by every visitor of the app; per-session state would need gr.State.
    bot = GifChatBot()

    with gr.Blocks(theme=gr.themes.Soft()) as interface:
        gr.Markdown(
            "# 🎭 Friendly Chat Bot with GIFs\n"
            "Chat with an empathetic AI friend who expresses themselves through GIFs!\n"
            "Enter your API keys below to start.\n"
        )

        with gr.Row():
            with gr.Column(scale=1):
                openai_key = gr.Textbox(
                    label="OpenAI API Key",
                    placeholder="sk-...",
                    type="password",
                    scale=2
                )
            with gr.Column(scale=1):
                giphy_key = gr.Textbox(
                    label="GIPHY API Key",
                    placeholder="Enter your GIPHY API key",
                    type="password",
                    scale=2
                )

        setup_button = gr.Button("Set up Keys", variant="primary")
        setup_status = gr.Textbox(label="Setup Status")

        chatbot = gr.Chatbot(
            label="Chat",
            bubble_full_width=False,
            height=450,
            type="messages"
        )

        with gr.Row():
            with gr.Column(scale=4):
                message_box = gr.Textbox(
                    label="Type your message",
                    placeholder="Say something...",
                    show_label=False,
                    container=False
                )
            with gr.Column(scale=1):
                clear_button = gr.Button("Clear Chat", variant="secondary")

        error_box = gr.Textbox(label="Error Messages", visible=True)

        # Set up event handlers
        setup_button.click(
            bot.setup_keys,
            inputs=[openai_key, giphy_key],
            outputs=setup_status
        )

        message_box.submit(
            bot.chat,
            inputs=[message_box, chatbot],
            outputs=[message_box, chatbot, error_box]
        )

        clear_button.click(
            bot.reset_chat,
            outputs=[chatbot, error_box]
        )

        gr.Markdown(
            "### Tips:\n"
            "- 🤝 Share how you're feeling - the AI responds empathetically\n"
            "- 💭 The conversation is context-aware\n"
            "- 🎯 GIFs are chosen to match the emotion\n"
            "- 🔄 Use 'Clear Chat' to start fresh\n"
            "\n"
            "Note: All GIFs are strictly validated to be above 500KB for quality assurance.\n"
        )

    return interface


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()