jzou19950715 commited on
Commit
8e8b790
·
verified ·
1 Parent(s): 4ad2745

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +127 -145
app.py CHANGED
@@ -12,12 +12,109 @@ class GifChatBot:
12
  self.chat_history = []
13
  self.is_initialized = False
14
  self.session = requests.Session()
15
- # Set a custom User-Agent to mimic a browser
16
- self.session.headers.update({
17
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
18
- "AppleWebKit/537.36 (KHTML, like Gecko) "
19
- "Chrome/91.0.4472.124 Safari/537.36"
20
- })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
  def setup_keys(self, openai_key: str, giphy_key: str) -> str:
23
  """Initialize API clients with user's keys"""
@@ -36,8 +133,7 @@ class GifChatBot:
36
  """Test if GIPHY key is valid"""
37
  response = self.session.get(
38
  "https://api.giphy.com/v1/gifs/trending",
39
- params={"api_key": self.giphy_key, "limit": 1},
40
- timeout=5
41
  )
42
  if response.status_code != 200:
43
  raise Exception("Invalid GIPHY API key")
@@ -53,127 +149,6 @@ class GifChatBot:
53
  except Exception:
54
  raise Exception("Invalid OpenAI API key")
55
 
56
- def verify_gif_url(self, gif_url: str) -> bool:
57
- """
58
- Verify if a GIF URL is accessible and not an error page.
59
- It first uses a HEAD request (with redirects allowed) and, if necessary, falls back to a GET.
60
- URLs returning HTML or very small content (e.g. < 10KB) are rejected.
61
- """
62
- try:
63
- # Always attempt verification to filter out error pages.
64
- response = self.session.head(gif_url, timeout=3, allow_redirects=True)
65
- if response.status_code != 200:
66
- response = self.session.get(gif_url, timeout=5, stream=True, allow_redirects=True)
67
- if response.status_code == 200:
68
- content_type = response.headers.get('content-type', '').lower()
69
- # If the content type is HTML, this is likely an error page.
70
- if 'text/html' in content_type:
71
- return False
72
- # Optionally check content length (reject very small files)
73
- content_length = response.headers.get('content-length')
74
- if content_length:
75
- try:
76
- if int(content_length) < 10000:
77
- return False
78
- except Exception:
79
- pass
80
- if 'gif' in content_type or 'image' in content_type:
81
- return True
82
- return False
83
- except Exception:
84
- return False
85
-
86
- def get_gif(self, search_query: str) -> Optional[str]:
87
- """Search GIPHY with validation for working GIFs"""
88
- try:
89
- params = {
90
- 'api_key': self.giphy_key,
91
- 'q': search_query,
92
- 'limit': 25,
93
- 'rating': 'pg-13',
94
- 'bundle': 'messaging_non_clips'
95
- }
96
-
97
- response = self.session.get(
98
- "https://api.giphy.com/v1/gifs/search",
99
- params=params,
100
- timeout=5
101
- )
102
-
103
- if response.status_code == 200:
104
- data = response.json()
105
- if data.get("data"):
106
- gifs = list(data["data"])
107
- random.shuffle(gifs)
108
-
109
- for gif in gifs:
110
- if not self._is_good_quality_gif(gif):
111
- continue
112
-
113
- gif_url = gif["images"]["original"]["url"]
114
- if self.verify_gif_url(gif_url):
115
- return gif_url
116
-
117
- # Fallback: try a trending GIF if none in search pass verification.
118
- return self._get_trending_gif()
119
- return None
120
-
121
- except Exception as error:
122
- return None
123
-
124
- def _get_trending_gif(self) -> Optional[str]:
125
- """Get a verified trending GIF as fallback"""
126
- try:
127
- response = self.session.get(
128
- "https://api.giphy.com/v1/gifs/trending",
129
- params={
130
- 'api_key': self.giphy_key,
131
- 'limit': 25,
132
- 'rating': 'pg-13',
133
- 'bundle': 'messaging_non_clips'
134
- },
135
- timeout=5
136
- )
137
-
138
- if response.status_code == 200:
139
- data = response.json()
140
- if data.get("data"):
141
- gifs = list(data["data"])
142
- random.shuffle(gifs)
143
-
144
- for gif in gifs:
145
- gif_url = gif["images"]["original"]["url"]
146
- if self.verify_gif_url(gif_url):
147
- return gif_url
148
-
149
- except Exception as error:
150
- pass
151
- return None
152
-
153
- def _is_good_quality_gif(self, gif: Dict) -> bool:
154
- """Check if a GIF meets quality criteria: number of frames, maximum size, and minimum size (500 KB)"""
155
- try:
156
- original = gif.get("images", {}).get("original", {})
157
- # Reject if too many frames (more than 50)
158
- if int(original.get("frames", 50)) > 50:
159
- return False
160
-
161
- # Reject if file size is above 2MB
162
- size = int(original.get("size", 0))
163
- if size > 2000000:
164
- return False
165
-
166
- # New check: Accept only GIFs above 500 KB
167
- if size < 500000:
168
- return False
169
-
170
- if not original.get("url"):
171
- return False
172
-
173
- return True
174
- except:
175
- return False
176
-
177
  def reset_chat(self) -> Tuple[List[Dict[str, str]], str]:
178
  """Reset the chat history"""
179
  self.chat_history = []
@@ -192,26 +167,30 @@ class GifChatBot:
192
  return message, history, ""
193
 
194
  try:
195
- system_message = (
196
- "You are a supportive, empathetic friend who uses GIFs naturally in conversation. \n"
197
- "When using GIFs, keep search terms simple and contextual:\n\n"
198
- "Examples:\n"
199
- "- User feeling hungry -> [GIF: hungry]\n"
200
- "- User feeling sad -> [GIF: comforting hug]\n"
201
- "- User celebrating -> [GIF: celebration]\n"
202
- "- User confused -> [GIF: confused]\n\n"
203
- "Keep your responses:\n"
204
- "1. Empathetic and natural\n"
205
- "2. Context-aware\n"
206
- "3. Use GIFs that match the emotion\n\n"
207
- "Use 0-1 GIFs per message unless the moment really calls for more."
208
- )
 
 
209
 
 
210
  messages = [{"role": "system", "content": system_message}]
211
  for chat in history:
212
  messages.append({"role": chat["role"], "content": chat["content"]})
213
  messages.append({"role": "user", "content": message})
214
 
 
215
  response = self.openai_client.chat.completions.create(
216
  model="gpt-4o-mini",
217
  messages=messages,
@@ -219,6 +198,7 @@ class GifChatBot:
219
  max_tokens=150
220
  )
221
 
 
222
  ai_message = response.choices[0].message.content
223
  final_response = ""
224
 
@@ -229,9 +209,11 @@ class GifChatBot:
229
  gif_desc_end = part.find("]")
230
  if gif_desc_end != -1:
231
  gif_desc = part[:gif_desc_end].strip()
 
232
  gif_url = self.get_gif(gif_desc)
233
  if gif_url:
234
  final_response += f"\n![GIF]({gif_url})\n"
 
235
  final_response += part[gif_desc_end + 1:]
236
 
237
  history.append(self.format_message("user", message))
@@ -276,7 +258,7 @@ def create_interface():
276
  label="Chat",
277
  bubble_full_width=False,
278
  height=450,
279
- type="messages" # Use new message format
280
  )
281
 
282
  with gr.Row():
@@ -322,4 +304,4 @@ def create_interface():
322
 
323
  if __name__ == "__main__":
324
  demo = create_interface()
325
- demo.launch()
 
12
  self.chat_history = []
13
  self.is_initialized = False
14
  self.session = requests.Session()
15
+
16
+ # Define size constraints
17
+ self.MINIMUM_FILE_SIZE = 500 * 1024 # 500KB in bytes
18
+ self.MAXIMUM_FILE_SIZE = 5 * 1024 * 1024 # 5MB in bytes
19
+ self.MAX_RETRIES = 10 # Maximum number of retries for finding a good GIF
20
+
21
+ def verify_gif_size(self, gif_url: str) -> bool:
22
+ """Verify if GIF meets size requirements"""
23
+ try:
24
+ # Do HEAD request to get content length
25
+ response = self.session.head(gif_url, timeout=3)
26
+ if response.status_code != 200:
27
+ return False
28
+
29
+ # Get content length
30
+ content_length = response.headers.get('content-length')
31
+ if not content_length:
32
+ return False
33
+
34
+ file_size = int(content_length)
35
+ return self.MINIMUM_FILE_SIZE <= file_size <= self.MAXIMUM_FILE_SIZE
36
+
37
+ except Exception as error:
38
+ print(f"Size verification error: {error}")
39
+ return False
40
+
41
+ def get_gif(self, search_query: str) -> Optional[str]:
42
+ """Get a GIF meeting size requirements with retries"""
43
+ for attempt in range(self.MAX_RETRIES):
44
+ try:
45
+ # Calculate offset based on attempt number to get different results
46
+ offset = attempt * 10
47
+
48
+ params = {
49
+ 'api_key': self.giphy_key,
50
+ 'q': search_query,
51
+ 'limit': 10,
52
+ 'offset': offset,
53
+ 'rating': 'pg-13'
54
+ }
55
+
56
+ response = self.session.get(
57
+ "https://api.giphy.com/v1/gifs/search",
58
+ params=params,
59
+ timeout=5
60
+ )
61
+
62
+ if response.status_code == 200:
63
+ data = response.json()
64
+ if data["data"]:
65
+ # Shuffle results for variety
66
+ gifs = list(data["data"])
67
+ random.shuffle(gifs)
68
+
69
+ # Try each GIF until we find one meeting size requirements
70
+ for gif in gifs:
71
+ gif_url = gif["images"]["original"]["url"]
72
+ if self.verify_gif_size(gif_url):
73
+ print(f"Found valid GIF on attempt {attempt + 1}")
74
+ return gif_url
75
+
76
+ print(f"No valid GIFs found in attempt {attempt + 1}, retrying...")
77
+ else:
78
+ print("No GIFs found in search")
79
+ break
80
+
81
+ except Exception as error:
82
+ print(f"Error in attempt {attempt + 1}: {error}")
83
+ continue
84
+
85
+ # If we get here, try trending as last resort
86
+ return self._get_trending_gif()
87
+
88
+ def _get_trending_gif(self) -> Optional[str]:
89
+ """Get a trending GIF meeting size requirements"""
90
+ try:
91
+ params = {
92
+ 'api_key': self.giphy_key,
93
+ 'limit': 25,
94
+ 'rating': 'pg-13'
95
+ }
96
+
97
+ response = self.session.get(
98
+ "https://api.giphy.com/v1/gifs/trending",
99
+ params=params,
100
+ timeout=5
101
+ )
102
+
103
+ if response.status_code == 200:
104
+ data = response.json()
105
+ if data["data"]:
106
+ gifs = list(data["data"])
107
+ random.shuffle(gifs)
108
+
109
+ for gif in gifs:
110
+ gif_url = gif["images"]["original"]["url"]
111
+ if self.verify_gif_size(gif_url):
112
+ print("Found valid trending GIF")
113
+ return gif_url
114
+
115
+ except Exception as error:
116
+ print(f"Trending GIF error: {error}")
117
+ return None
118
 
119
  def setup_keys(self, openai_key: str, giphy_key: str) -> str:
120
  """Initialize API clients with user's keys"""
 
133
  """Test if GIPHY key is valid"""
134
  response = self.session.get(
135
  "https://api.giphy.com/v1/gifs/trending",
136
+ params={"api_key": self.giphy_key, "limit": 1}
 
137
  )
138
  if response.status_code != 200:
139
  raise Exception("Invalid GIPHY API key")
 
149
  except Exception:
150
  raise Exception("Invalid OpenAI API key")
151
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
152
  def reset_chat(self) -> Tuple[List[Dict[str, str]], str]:
153
  """Reset the chat history"""
154
  self.chat_history = []
 
167
  return message, history, ""
168
 
169
  try:
170
+ # System message emphasizing natural GIF usage
171
+ system_message = """You are a supportive, empathetic friend who uses GIFs naturally in conversation.
172
+ When using GIFs, keep search terms simple and contextual:
173
+
174
+ Examples:
175
+ - User feeling hungry -> [GIF: hungry]
176
+ - User feeling sad -> [GIF: comforting hug]
177
+ - User celebrating -> [GIF: celebration]
178
+ - User confused -> [GIF: confused]
179
+
180
+ Keep your responses:
181
+ 1. Empathetic and natural
182
+ 2. Context-aware (reference previous messages)
183
+ 3. Use GIFs that match the emotion
184
+
185
+ Use 0-1 GIFs per message unless the moment really calls for more."""
186
 
187
+ # Prepare conversation history
188
  messages = [{"role": "system", "content": system_message}]
189
  for chat in history:
190
  messages.append({"role": chat["role"], "content": chat["content"]})
191
  messages.append({"role": "user", "content": message})
192
 
193
+ # Get AI response
194
  response = self.openai_client.chat.completions.create(
195
  model="gpt-4o-mini",
196
  messages=messages,
 
198
  max_tokens=150
199
  )
200
 
201
+ # Process response and insert GIFs
202
  ai_message = response.choices[0].message.content
203
  final_response = ""
204
 
 
209
  gif_desc_end = part.find("]")
210
  if gif_desc_end != -1:
211
  gif_desc = part[:gif_desc_end].strip()
212
+ print(f"Searching for GIF: {gif_desc}")
213
  gif_url = self.get_gif(gif_desc)
214
  if gif_url:
215
  final_response += f"\n![GIF]({gif_url})\n"
216
+ print(f"Added GIF: {gif_url}")
217
  final_response += part[gif_desc_end + 1:]
218
 
219
  history.append(self.format_message("user", message))
 
258
  label="Chat",
259
  bubble_full_width=False,
260
  height=450,
261
+ type="messages"
262
  )
263
 
264
  with gr.Row():
 
304
 
305
  if __name__ == "__main__":
306
  demo = create_interface()
307
+ demo.launch()