jzou19950715 commited on
Commit
0be3027
·
verified ·
1 Parent(s): b5fdd97

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +124 -93
app.py CHANGED
@@ -5,15 +5,23 @@ import json
5
  from typing import List, Dict, Optional, Tuple
6
  import random
7
  import time
 
8
 
9
  class GifChatBot:
10
  def __init__(self):
 
11
  self.openai_client = None
12
  self.giphy_key = None
13
  self.chat_history = []
14
  self.is_initialized = False
15
  self.session = requests.Session()
16
 
 
 
 
 
 
 
17
  # Configure session for better performance
18
  self.session.headers.update({
19
  'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
@@ -24,7 +32,6 @@ class GifChatBot:
24
  try:
25
  self.openai_client = OpenAI(api_key=openai_key)
26
  self.giphy_key = giphy_key
27
- # Test both keys
28
  self._test_giphy_key()
29
  self._test_openai_key()
30
  self.is_initialized = True
@@ -53,28 +60,32 @@ class GifChatBot:
53
  except Exception:
54
  raise Exception("Invalid OpenAI API key")
55
 
 
 
 
 
 
 
 
 
56
  def get_gif(self, search_query: str) -> Optional[str]:
57
- """Search for a GIF with strict size requirements"""
58
- total_attempts = 0
59
- max_total_attempts = 3 # Maximum pagination attempts
60
- minimum_size_bytes = 500000 # 500KB in bytes
61
 
62
  print(f"\nSearching for GIF: {search_query}")
63
 
64
- while total_attempts < max_total_attempts:
65
  try:
66
- # Calculate offset for pagination
67
- offset = total_attempts * 25
68
-
69
  params = {
70
  'api_key': self.giphy_key,
71
  'q': search_query,
72
- 'limit': 25,
73
  'offset': offset,
74
  'rating': 'pg-13'
75
  }
76
 
77
- print(f"Attempt {total_attempts + 1} with offset {offset}")
78
 
79
  response = self.session.get(
80
  "https://api.giphy.com/v1/gifs/search",
@@ -85,97 +96,113 @@ class GifChatBot:
85
  if response.status_code == 200:
86
  data = response.json()
87
  if not data.get("data"):
88
- print("No GIFs found in search")
 
 
 
 
 
 
89
  break
90
 
91
- print(f"Found {len(data['data'])} GIFs, filtering by size...")
92
-
93
- # Filter GIFs by size first
94
  valid_gifs = []
95
  for gif in data["data"]:
96
  try:
97
- # Get size from original image data
98
  size_str = gif["images"]["original"]["size"]
99
  if size_str and size_str.isdigit():
100
  size_bytes = int(size_str)
101
- if size_bytes >= minimum_size_bytes:
102
- valid_gifs.append(gif)
103
- print(f"Found valid GIF: {size_bytes / 1024:.2f}KB")
 
104
  except (KeyError, ValueError) as e:
105
- print(f"Size parsing error for GIF: {e}")
106
  continue
107
 
 
 
 
108
  if valid_gifs:
109
- # Randomly select from valid GIFs
110
- chosen_gif = random.choice(valid_gifs)
 
111
  url = chosen_gif["images"]["original"]["url"]
112
- size = int(chosen_gif["images"]["original"]["size"])
113
  print(f"Selected GIF URL: {url}")
114
- print(f"Selected GIF size: {size / 1024:.2f}KB")
115
- return url
 
 
 
116
 
117
- print(f"No GIFs >= 500KB found in attempt {total_attempts + 1}")
 
 
 
118
 
119
- total_attempts += 1
120
 
121
  except Exception as error:
122
- print(f"Error in attempt {total_attempts + 1}: {error}")
123
- total_attempts += 1
124
 
125
  # If we get here, try trending as last resort
126
  print("No suitable GIFs found in search, trying trending...")
127
  return self._get_trending_gif()
128
 
129
  def _get_trending_gif(self) -> Optional[str]:
130
- """Get a trending GIF with size verification"""
131
- minimum_size_bytes = 500000 # 500KB in bytes
132
 
133
- try:
134
- params = {
135
- 'api_key': self.giphy_key,
136
- 'limit': 25,
137
- 'rating': 'pg-13'
138
- }
139
-
140
- print("Fetching trending GIFs...")
141
-
142
- response = self.session.get(
143
- "https://api.giphy.com/v1/gifs/trending",
144
- params=params,
145
- timeout=5
146
- )
147
-
148
- if response.status_code == 200:
149
- data = response.json()
150
- if data.get("data"):
151
- print(f"Found {len(data['data'])} trending GIFs, filtering by size...")
152
-
153
- # Filter by size first
154
- valid_gifs = []
155
- for gif in data["data"]:
156
- try:
157
- size_str = gif["images"]["original"]["size"]
158
- if size_str and size_str.isdigit():
159
- size_bytes = int(size_str)
160
- if size_bytes >= minimum_size_bytes:
161
- valid_gifs.append(gif)
162
- print(f"Found valid trending GIF: {size_bytes / 1024:.2f}KB")
163
- except (KeyError, ValueError) as e:
164
- print(f"Size parsing error for trending GIF: {e}")
165
- continue
166
-
167
- if valid_gifs:
168
- chosen_gif = random.choice(valid_gifs)
169
- url = chosen_gif["images"]["original"]["url"]
170
- size = int(chosen_gif["images"]["original"]["size"])
171
- print(f"Selected trending GIF URL: {url}")
172
- print(f"Selected trending GIF size: {size / 1024:.2f}KB")
173
- return url
174
-
175
- print("No valid trending GIFs found")
176
 
177
- except Exception as error:
178
- print(f"Trending GIF error: {error}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
179
 
180
  print("Failed to find any suitable GIFs")
181
  return None
@@ -198,9 +225,10 @@ class GifChatBot:
198
  return message, history, ""
199
 
200
  try:
201
- # System message for the AI
202
- system_message = """You are a supportive, empathetic friend who uses GIFs naturally in conversation.
203
- When using GIFs, keep search terms simple and contextual:
 
204
 
205
  Examples:
206
  - User feeling hungry -> [GIF: hungry]
@@ -210,19 +238,17 @@ class GifChatBot:
210
 
211
  Keep your responses:
212
  1. Empathetic and natural
213
- 2. Context-aware (reference previous messages when relevant)
214
- 3. Use GIFs that match the emotional context
 
215
 
216
- Use 0-1 GIFs per message unless the moment really calls for more.
217
- Keep search terms simple and universal for better GIF matching."""
218
 
219
- # Prepare conversation history
220
  messages = [{"role": "system", "content": system_message}]
221
  for chat in history:
222
  messages.append({"role": chat["role"], "content": chat["content"]})
223
  messages.append({"role": "user", "content": message})
224
 
225
- # Get AI response
226
  response = self.openai_client.chat.completions.create(
227
  model="gpt-4o-mini",
228
  messages=messages,
@@ -230,11 +256,9 @@ class GifChatBot:
230
  max_tokens=150
231
  )
232
 
233
- # Process response and insert GIFs
234
  ai_message = response.choices[0].message.content
235
  final_response = ""
236
 
237
- # Split by GIF markers and process
238
  parts = ai_message.split("[GIF:")
239
  final_response += parts[0]
240
 
@@ -243,15 +267,22 @@ class GifChatBot:
243
  if gif_desc_end != -1:
244
  gif_desc = part[:gif_desc_end].strip()
245
  print(f"\nProcessing GIF request: {gif_desc}")
246
- gif_url = self.get_gif(gif_desc)
 
 
 
 
 
 
 
 
 
247
  if gif_url:
248
  final_response += f"\n![GIF]({gif_url})\n"
249
  print(f"Successfully added GIF: {gif_url}")
250
- else:
251
- print("Failed to find suitable GIF")
252
  final_response += part[gif_desc_end + 1:]
253
 
254
- # Update history with new messages
255
  history.append(self.format_message("user", message))
256
  history.append(self.format_message("assistant", final_response))
257
  return "", history, ""
@@ -295,7 +326,7 @@ def create_interface():
295
  label="Chat",
296
  bubble_full_width=False,
297
  height=450,
298
- type="messages" # Use new message format
299
  )
300
 
301
  with gr.Row():
@@ -336,7 +367,7 @@ def create_interface():
336
  - 🎯 GIFs are chosen to match the emotion
337
  - 🔄 Use 'Clear Chat' to start fresh
338
 
339
- Note: GIFs are carefully validated to ensure quality (minimum 500KB size).
340
  """)
341
 
342
  return interface
 
5
  from typing import List, Dict, Optional, Tuple
6
  import random
7
  import time
8
+ from datetime import datetime
9
 
10
  class GifChatBot:
11
  def __init__(self):
12
+ """Initialize the chatbot with necessary configurations"""
13
  self.openai_client = None
14
  self.giphy_key = None
15
  self.chat_history = []
16
  self.is_initialized = False
17
  self.session = requests.Session()
18
 
19
+ # Constants for GIF validation
20
+ self.MINIMUM_SIZE_KB = 500 # Strict 500KB minimum
21
+ self.MAXIMUM_SIZE_MB = 5 # 5MB maximum
22
+ self.MAX_SEARCH_ATTEMPTS = 5
23
+ self.RESULTS_PER_PAGE = 50
24
+
25
  # Configure session for better performance
26
  self.session.headers.update({
27
  'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
 
32
  try:
33
  self.openai_client = OpenAI(api_key=openai_key)
34
  self.giphy_key = giphy_key
 
35
  self._test_giphy_key()
36
  self._test_openai_key()
37
  self.is_initialized = True
 
60
  except Exception:
61
  raise Exception("Invalid OpenAI API key")
62
 
63
+ def _verify_gif_url(self, url: str) -> bool:
64
+ """Verify GIF URL is accessible"""
65
+ try:
66
+ response = self.session.head(url, timeout=3)
67
+ return response.status_code == 200
68
+ except:
69
+ return False
70
+
71
  def get_gif(self, search_query: str) -> Optional[str]:
72
+ """Persistently search for a GIF above 500KB"""
73
+ attempt = 0
74
+ offset = 0
 
75
 
76
  print(f"\nSearching for GIF: {search_query}")
77
 
78
+ while attempt < self.MAX_SEARCH_ATTEMPTS:
79
  try:
 
 
 
80
  params = {
81
  'api_key': self.giphy_key,
82
  'q': search_query,
83
+ 'limit': self.RESULTS_PER_PAGE,
84
  'offset': offset,
85
  'rating': 'pg-13'
86
  }
87
 
88
+ print(f"Attempt {attempt + 1} with offset {offset}")
89
 
90
  response = self.session.get(
91
  "https://api.giphy.com/v1/gifs/search",
 
96
  if response.status_code == 200:
97
  data = response.json()
98
  if not data.get("data"):
99
+ print("No GIFs found, modifying search...")
100
+ # Try appending "reaction" to search query
101
+ if "reaction" not in search_query.lower():
102
+ search_query += " reaction"
103
+ offset = 0
104
+ attempt += 1
105
+ continue
106
  break
107
 
108
+ # First sort by size to try largest GIFs first
 
 
109
  valid_gifs = []
110
  for gif in data["data"]:
111
  try:
 
112
  size_str = gif["images"]["original"]["size"]
113
  if size_str and size_str.isdigit():
114
  size_bytes = int(size_str)
115
+ size_kb = size_bytes / 1024
116
+ if size_kb >= self.MINIMUM_SIZE_KB:
117
+ valid_gifs.append((size_kb, gif))
118
+ print(f"Found valid GIF: {size_kb:.2f}KB")
119
  except (KeyError, ValueError) as e:
 
120
  continue
121
 
122
+ # Sort by size, largest first
123
+ valid_gifs.sort(reverse=True, key=lambda x: x[0])
124
+
125
  if valid_gifs:
126
+ # Pick one of the top 5 largest GIFs randomly
127
+ top_gifs = valid_gifs[:5]
128
+ size_kb, chosen_gif = random.choice(top_gifs)
129
  url = chosen_gif["images"]["original"]["url"]
 
130
  print(f"Selected GIF URL: {url}")
131
+ print(f"Selected GIF size: {size_kb:.2f}KB")
132
+ # Verify URL is accessible
133
+ if self._verify_gif_url(url):
134
+ return url
135
+ print("GIF URL verification failed, continuing search...")
136
 
137
+ # If we didn't find a valid GIF or URL verification failed
138
+ offset += self.RESULTS_PER_PAGE
139
+ attempt += 1
140
+ continue
141
 
142
+ attempt += 1
143
 
144
  except Exception as error:
145
+ print(f"Error in attempt {attempt + 1}: {error}")
146
+ attempt += 1
147
 
148
  # If we get here, try trending as last resort
149
  print("No suitable GIFs found in search, trying trending...")
150
  return self._get_trending_gif()
151
 
152
  def _get_trending_gif(self) -> Optional[str]:
153
+ """Get a trending GIF ensuring 500KB minimum size"""
154
+ attempt = 0
155
 
156
+ while attempt < 3: # Try up to 3 times with trending
157
+ try:
158
+ params = {
159
+ 'api_key': self.giphy_key,
160
+ 'limit': self.RESULTS_PER_PAGE,
161
+ 'rating': 'pg-13'
162
+ }
163
+
164
+ print(f"Fetching trending GIFs (attempt {attempt + 1})...")
165
+
166
+ response = self.session.get(
167
+ "https://api.giphy.com/v1/gifs/trending",
168
+ params=params,
169
+ timeout=5
170
+ )
171
+
172
+ if response.status_code == 200:
173
+ data = response.json()
174
+ if data.get("data"):
175
+ # Sort by size first
176
+ valid_gifs = []
177
+ for gif in data["data"]:
178
+ try:
179
+ size_str = gif["images"]["original"]["size"]
180
+ if size_str and size_str.isdigit():
181
+ size_bytes = int(size_str)
182
+ size_kb = size_bytes / 1024
183
+ if size_kb >= self.MINIMUM_SIZE_KB:
184
+ valid_gifs.append((size_kb, gif))
185
+ print(f"Found valid trending GIF: {size_kb:.2f}KB")
186
+ except (KeyError, ValueError):
187
+ continue
 
 
 
 
 
 
 
 
 
 
 
188
 
189
+ # Sort by size, largest first
190
+ valid_gifs.sort(reverse=True, key=lambda x: x[0])
191
+
192
+ if valid_gifs:
193
+ # Pick one of the top 5 largest GIFs randomly
194
+ top_gifs = valid_gifs[:5]
195
+ size_kb, chosen_gif = random.choice(top_gifs)
196
+ url = chosen_gif["images"]["original"]["url"]
197
+ if self._verify_gif_url(url):
198
+ print(f"Selected trending GIF: {size_kb:.2f}KB")
199
+ return url
200
+
201
+ attempt += 1
202
+
203
+ except Exception as error:
204
+ print(f"Trending GIF error: {error}")
205
+ attempt += 1
206
 
207
  print("Failed to find any suitable GIFs")
208
  return None
 
225
  return message, history, ""
226
 
227
  try:
228
+ # Enhanced system message emphasizing GIF requirements
229
+ system_message = """You are a supportive, empathetic friend who uses GIFs naturally in conversation.
230
+ IMPORTANT: All GIFs must be high quality (>500KB). If a suitable GIF isn't found, the system will
231
+ keep searching until it finds one. Keep search terms simple and universal:
232
 
233
  Examples:
234
  - User feeling hungry -> [GIF: hungry]
 
238
 
239
  Keep your responses:
240
  1. Empathetic and natural
241
+ 2. Context-aware
242
+ 3. Use GIFs that match the emotion
243
+ 4. Simple search terms for better matches
244
 
245
+ The system will automatically ensure all GIFs meet quality requirements."""
 
246
 
 
247
  messages = [{"role": "system", "content": system_message}]
248
  for chat in history:
249
  messages.append({"role": chat["role"], "content": chat["content"]})
250
  messages.append({"role": "user", "content": message})
251
 
 
252
  response = self.openai_client.chat.completions.create(
253
  model="gpt-4o-mini",
254
  messages=messages,
 
256
  max_tokens=150
257
  )
258
 
 
259
  ai_message = response.choices[0].message.content
260
  final_response = ""
261
 
 
262
  parts = ai_message.split("[GIF:")
263
  final_response += parts[0]
264
 
 
267
  if gif_desc_end != -1:
268
  gif_desc = part[:gif_desc_end].strip()
269
  print(f"\nProcessing GIF request: {gif_desc}")
270
+ gif_url = None
271
+
272
+ # Keep trying until we get a valid GIF
273
+ retry_count = 0
274
+ while not gif_url and retry_count < 3:
275
+ gif_url = self.get_gif(gif_desc)
276
+ if not gif_url:
277
+ print(f"Retrying GIF search (attempt {retry_count + 1})...")
278
+ retry_count += 1
279
+
280
  if gif_url:
281
  final_response += f"\n![GIF]({gif_url})\n"
282
  print(f"Successfully added GIF: {gif_url}")
283
+
 
284
  final_response += part[gif_desc_end + 1:]
285
 
 
286
  history.append(self.format_message("user", message))
287
  history.append(self.format_message("assistant", final_response))
288
  return "", history, ""
 
326
  label="Chat",
327
  bubble_full_width=False,
328
  height=450,
329
+ type="messages"
330
  )
331
 
332
  with gr.Row():
 
367
  - 🎯 GIFs are chosen to match the emotion
368
  - 🔄 Use 'Clear Chat' to start fresh
369
 
370
+ Note: All GIFs are strictly validated to be above 500KB for quality assurance.
371
  """)
372
 
373
  return interface