|
import gradio as gr |
|
from openai import OpenAI |
|
import requests |
|
import json |
|
from typing import List, Dict, Optional, Tuple |
|
import random |
|
import time |
|
from datetime import datetime |
|
|
|
class GifChatBot: |
|
def __init__(self): |
|
"""Initialize the chatbot with necessary configurations""" |
|
self.openai_client = None |
|
self.giphy_key = None |
|
self.chat_history = [] |
|
self.is_initialized = False |
|
self.session = requests.Session() |
|
|
|
|
|
self.MINIMUM_SIZE_KB = 500 |
|
self.MAXIMUM_SIZE_MB = 5 |
|
self.MAX_SEARCH_ATTEMPTS = 10 |
|
self.RESULTS_PER_PAGE = 50 |
|
|
|
|
|
self.session.headers.update({ |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
|
}) |
|
|
|
def setup_keys(self, openai_key: str, giphy_key: str) -> str: |
|
"""Initialize API clients with user's keys""" |
|
try: |
|
self.openai_client = OpenAI(api_key=openai_key) |
|
self.giphy_key = giphy_key |
|
self._test_giphy_key() |
|
self._test_openai_key() |
|
self.is_initialized = True |
|
return "β
Setup successful! Let's chat!" |
|
except Exception as error: |
|
self.is_initialized = False |
|
return f"β Error setting up: {str(error)}" |
|
|
|
def _test_giphy_key(self): |
|
"""Test if GIPHY key is valid""" |
|
response = self.session.get( |
|
"https://api.giphy.com/v1/gifs/trending", |
|
params={"api_key": self.giphy_key, "limit": 1} |
|
) |
|
if response.status_code != 200: |
|
raise Exception("Invalid GIPHY API key") |
|
|
|
def _test_openai_key(self): |
|
"""Test if OpenAI key is valid""" |
|
try: |
|
self.openai_client.chat.completions.create( |
|
model="gpt-4o-mini", |
|
messages=[{"role": "user", "content": "test"}], |
|
max_tokens=5 |
|
) |
|
except Exception: |
|
raise Exception("Invalid OpenAI API key") |
|
|
|
def _verify_gif_url(self, url: str) -> bool: |
|
"""Verify GIF URL is accessible""" |
|
try: |
|
response = self.session.head(url, timeout=3) |
|
return response.status_code == 200 |
|
except: |
|
return False |
|
|
|
def get_gif(self, search_query: str) -> Optional[str]: |
|
"""Persistently search for a GIF above 500KB""" |
|
attempt = 0 |
|
offset = 0 |
|
|
|
print(f"\nSearching for GIF: {search_query}") |
|
|
|
while attempt < self.MAX_SEARCH_ATTEMPTS: |
|
try: |
|
params = { |
|
'api_key': self.giphy_key, |
|
'q': search_query, |
|
'limit': self.RESULTS_PER_PAGE, |
|
'offset': offset, |
|
'rating': 'pg-13' |
|
} |
|
|
|
print(f"Attempt {attempt + 1} with offset {offset}") |
|
|
|
response = self.session.get( |
|
"https://api.giphy.com/v1/gifs/search", |
|
params=params, |
|
timeout=5 |
|
) |
|
|
|
if response.status_code == 200: |
|
data = response.json() |
|
if not data.get("data"): |
|
print("No GIFs found, modifying search...") |
|
|
|
if "reaction" not in search_query.lower(): |
|
search_query += " reaction" |
|
offset = 0 |
|
attempt += 1 |
|
continue |
|
break |
|
|
|
|
|
valid_gifs = [] |
|
for gif in data["data"]: |
|
try: |
|
size_str = gif["images"]["original"]["size"] |
|
if size_str and size_str.isdigit(): |
|
size_bytes = int(size_str) |
|
size_kb = size_bytes / 1024 |
|
if size_kb >= self.MINIMUM_SIZE_KB: |
|
valid_gifs.append((size_kb, gif)) |
|
print(f"Found valid GIF: {size_kb:.2f}KB") |
|
except (KeyError, ValueError) as e: |
|
continue |
|
|
|
|
|
valid_gifs.sort(reverse=True, key=lambda x: x[0]) |
|
|
|
if valid_gifs: |
|
|
|
top_gifs = valid_gifs[:5] |
|
size_kb, chosen_gif = random.choice(top_gifs) |
|
url = chosen_gif["images"]["original"]["url"] |
|
print(f"Selected GIF URL: {url}") |
|
print(f"Selected GIF size: {size_kb:.2f}KB") |
|
|
|
if self._verify_gif_url(url): |
|
return url |
|
print("GIF URL verification failed, continuing search...") |
|
|
|
|
|
offset += self.RESULTS_PER_PAGE |
|
attempt += 1 |
|
continue |
|
|
|
attempt += 1 |
|
|
|
except Exception as error: |
|
print(f"Error in attempt {attempt + 1}: {error}") |
|
attempt += 1 |
|
|
|
|
|
print("No suitable GIFs found in search, trying trending...") |
|
return self._get_trending_gif() |
|
|
|
def _get_trending_gif(self) -> Optional[str]: |
|
"""Get a trending GIF ensuring 500KB minimum size""" |
|
attempt = 0 |
|
|
|
while attempt < 3: |
|
try: |
|
params = { |
|
'api_key': self.giphy_key, |
|
'limit': self.RESULTS_PER_PAGE, |
|
'rating': 'pg-13' |
|
} |
|
|
|
print(f"Fetching trending GIFs (attempt {attempt + 1})...") |
|
|
|
response = self.session.get( |
|
"https://api.giphy.com/v1/gifs/trending", |
|
params=params, |
|
timeout=5 |
|
) |
|
|
|
if response.status_code == 200: |
|
data = response.json() |
|
if data.get("data"): |
|
|
|
valid_gifs = [] |
|
for gif in data["data"]: |
|
try: |
|
size_str = gif["images"]["original"]["size"] |
|
if size_str and size_str.isdigit(): |
|
size_bytes = int(size_str) |
|
size_kb = size_bytes / 1024 |
|
if size_kb >= self.MINIMUM_SIZE_KB: |
|
valid_gifs.append((size_kb, gif)) |
|
print(f"Found valid trending GIF: {size_kb:.2f}KB") |
|
except (KeyError, ValueError): |
|
continue |
|
|
|
|
|
valid_gifs.sort(reverse=True, key=lambda x: x[0]) |
|
|
|
if valid_gifs: |
|
|
|
top_gifs = valid_gifs[:5] |
|
size_kb, chosen_gif = random.choice(top_gifs) |
|
url = chosen_gif["images"]["original"]["url"] |
|
if self._verify_gif_url(url): |
|
print(f"Selected trending GIF: {size_kb:.2f}KB") |
|
return url |
|
|
|
attempt += 1 |
|
|
|
except Exception as error: |
|
print(f"Trending GIF error: {error}") |
|
attempt += 1 |
|
|
|
print("Failed to find any suitable GIFs") |
|
return None |
|
|
|
def reset_chat(self) -> Tuple[List[Dict[str, str]], str]: |
|
"""Reset the chat history""" |
|
self.chat_history = [] |
|
return [], "" |
|
|
|
def format_message(self, role: str, content: str) -> Dict[str, str]: |
|
"""Format message in the new Gradio chat format""" |
|
return {"role": role, "content": content} |
|
|
|
def chat(self, message: str, history: List[Dict[str, str]]) -> Tuple[str, List[Dict[str, str]], str]: |
|
"""Main chat function with natural GIF integration""" |
|
if not self.is_initialized: |
|
return message, history, "Please set up your API keys first!" |
|
|
|
if not message.strip(): |
|
return message, history, "" |
|
|
|
try: |
|
|
|
system_message = """You are a supportive, empathetic friend who uses GIFs naturally in conversation. |
|
IMPORTANT: All GIFs must be high file size (>500KB). If a suitable GIF isn't found, the system will |
|
keep searching until it finds one. and if still not found, the system will say sorry I did not find anything below 500kb. Keep search terms simple and universal: |
|
When sending an image, say "here is a gif that is 500kb or more in size" |
|
Examples: |
|
- User feeling hungry -> [GIF: hungry] |
|
- User feeling sad -> [GIF: comforting hug] |
|
- User celebrating -> [GIF: celebration] |
|
- User confused -> [GIF: confused] |
|
|
|
Keep your responses: |
|
1. Empathetic and natural |
|
2. Context-aware |
|
3. Use GIFs that match the emotion |
|
4. Simple search terms for better matches |
|
|
|
The system will automatically ensure all GIFs meet quality requirements.""" |
|
|
|
messages = [{"role": "system", "content": system_message}] |
|
for chat in history: |
|
messages.append({"role": chat["role"], "content": chat["content"]}) |
|
messages.append({"role": "user", "content": message}) |
|
|
|
response = self.openai_client.chat.completions.create( |
|
model="gpt-4o-mini", |
|
messages=messages, |
|
temperature=0.9, |
|
max_tokens=3000 |
|
) |
|
|
|
ai_message = response.choices[0].message.content |
|
final_response = "" |
|
|
|
parts = ai_message.split("[GIF:") |
|
final_response += parts[0] |
|
|
|
for part in parts[1:]: |
|
gif_desc_end = part.find("]") |
|
if gif_desc_end != -1: |
|
gif_desc = part[:gif_desc_end].strip() |
|
print(f"\nProcessing GIF request: {gif_desc}") |
|
gif_url = None |
|
|
|
|
|
retry_count = 0 |
|
while not gif_url and retry_count < 3: |
|
gif_url = self.get_gif(gif_desc) |
|
if not gif_url: |
|
print(f"Retrying GIF search (attempt {retry_count + 1})...") |
|
retry_count += 1 |
|
|
|
if gif_url: |
|
final_response += f"\n\n" |
|
print(f"Successfully added GIF: {gif_url}") |
|
|
|
final_response += part[gif_desc_end + 1:] |
|
|
|
history.append(self.format_message("user", message)) |
|
history.append(self.format_message("assistant", final_response)) |
|
return "", history, "" |
|
|
|
except Exception as error: |
|
error_message = f"Oops! Something went wrong: {str(error)}" |
|
print(f"Chat error: {error}") |
|
return message, history, error_message |
|
|
|
def create_interface(): |
|
"""Create the Gradio interface""" |
|
bot = GifChatBot() |
|
|
|
with gr.Blocks(theme=gr.themes.Soft()) as interface: |
|
gr.Markdown(""" |
|
# π Friendly Chat Bot with GIFs |
|
Chat with an empathetic AI friend who expresses themselves through GIFs! |
|
Enter your API keys below to start. |
|
""") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
openai_key = gr.Textbox( |
|
label="OpenAI API Key", |
|
placeholder="sk-...", |
|
type="password", |
|
scale=2 |
|
) |
|
with gr.Column(scale=1): |
|
giphy_key = gr.Textbox( |
|
label="GIPHY API Key", |
|
placeholder="Enter your GIPHY API key", |
|
type="password", |
|
scale=2 |
|
) |
|
|
|
setup_button = gr.Button("Set up Keys", variant="primary") |
|
setup_status = gr.Textbox(label="Setup Status") |
|
|
|
chatbot = gr.Chatbot( |
|
label="Chat", |
|
bubble_full_width=False, |
|
height=450, |
|
type="messages" |
|
) |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=4): |
|
message_box = gr.Textbox( |
|
label="Type your message", |
|
placeholder="Say something...", |
|
show_label=False, |
|
container=False |
|
) |
|
with gr.Column(scale=1): |
|
clear_button = gr.Button("Clear Chat", variant="secondary") |
|
|
|
error_box = gr.Textbox(label="Error Messages", visible=True) |
|
|
|
|
|
setup_button.click( |
|
bot.setup_keys, |
|
inputs=[openai_key, giphy_key], |
|
outputs=setup_status |
|
) |
|
|
|
message_box.submit( |
|
bot.chat, |
|
inputs=[message_box, chatbot], |
|
outputs=[message_box, chatbot, error_box] |
|
) |
|
|
|
clear_button.click( |
|
bot.reset_chat, |
|
outputs=[chatbot, error_box] |
|
) |
|
|
|
gr.Markdown(""" |
|
### Tips: |
|
- π€ Share how you're feeling - the AI responds empathetically |
|
- π The conversation is context-aware |
|
- π― GIFs are chosen to match the emotion |
|
- π Use 'Clear Chat' to start fresh |
|
|
|
Note: All GIFs are strictly validated to be above 500KB for quality assurance. |
|
""") |
|
|
|
return interface |
|
|
|
if __name__ == "__main__": |
|
demo = create_interface() |
|
demo.launch() |