AC2513 commited on
Commit
89f9a59
·
1 Parent(s): 7eafc96

adjusted tests and model loading

Browse files
Files changed (2) hide show
  1. app.py +34 -15
  2. tests/test_main.py +57 -785
app.py CHANGED
@@ -28,21 +28,37 @@ model_3n_id = os.getenv("MODEL_3N_ID", "google/gemma-3n-E4B-it")
28
  MAX_VIDEO_SIZE = 100 * 1024 * 1024 # 100 MB
29
  MAX_IMAGE_SIZE = 10 * 1024 * 1024 # 10 MB
30
 
31
- input_processor = Gemma3Processor.from_pretrained(model_12_id)
32
-
33
- model_12 = Gemma3ForConditionalGeneration.from_pretrained(
34
- model_12_id,
35
- torch_dtype=torch.bfloat16,
36
- device_map="auto",
37
- attn_implementation="eager",
38
- )
39
-
40
- model_3n = Gemma3nForConditionalGeneration.from_pretrained(
41
- model_3n_id,
42
- torch_dtype=torch.bfloat16,
43
- device_map="auto",
44
- attn_implementation="eager",
45
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
 
47
 
48
  def check_file_size(file_path: str) -> bool:
@@ -213,6 +229,9 @@ def run(
213
  repetition_penalty: float,
214
  ) -> Iterator[str]:
215
 
 
 
 
216
  # Define preset system prompts
217
  preset_prompts = {
218
  "General Assistant": "You are a helpful AI assistant capable of analyzing images, videos, and PDF documents. Provide clear, accurate, and helpful responses to user queries.",
 
28
  MAX_VIDEO_SIZE = 100 * 1024 * 1024 # 100 MB
29
  MAX_IMAGE_SIZE = 10 * 1024 * 1024 # 10 MB
30
 
31
+ # Global variables to hold models (loaded lazily)
32
+ input_processor = None
33
+ model_12 = None
34
+ model_3n = None
35
+
36
+ def load_models():
37
+ """Load models lazily when needed."""
38
+ global input_processor, model_12, model_3n
39
+
40
+ # Skip model loading during testing
41
+ if os.getenv("SKIP_MODEL_LOADING") == "true" or "pytest" in os.getenv("_", ""):
42
+ return
43
+
44
+ if input_processor is None:
45
+ input_processor = Gemma3Processor.from_pretrained(model_12_id)
46
+
47
+ if model_12 is None:
48
+ model_12 = Gemma3ForConditionalGeneration.from_pretrained(
49
+ model_12_id,
50
+ torch_dtype=torch.bfloat16,
51
+ device_map="auto",
52
+ attn_implementation="eager",
53
+ )
54
+
55
+ if model_3n is None:
56
+ model_3n = Gemma3nForConditionalGeneration.from_pretrained(
57
+ model_3n_id,
58
+ torch_dtype=torch.bfloat16,
59
+ device_map="auto",
60
+ attn_implementation="eager",
61
+ )
62
 
63
 
64
  def check_file_size(file_path: str) -> bool:
 
229
  repetition_penalty: float,
230
  ) -> Iterator[str]:
231
 
232
+ # Load models only when needed (not during testing)
233
+ load_models()
234
+
235
  # Define preset system prompts
236
  preset_prompts = {
237
  "General Assistant": "You are a helpful AI assistant capable of analyzing images, videos, and PDF documents. Provide clear, accurate, and helpful responses to user queries.",
tests/test_main.py CHANGED
@@ -11,236 +11,52 @@ from app import get_frames, process_video, process_user_input, process_history,
11
  ROOT_DIR = Path(__file__).parent.parent
12
 
13
 
14
- def test_correct_frame_return():
15
- """Test that get_frames returns a list of (Image, float) tuples."""
16
- # Path to a test video file
17
  video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
18
-
19
- # Ensure the test video exists
20
  assert os.path.exists(video_path)
21
 
22
- # Test with a small number of frames
23
- max_images = 3
24
- frames = get_frames(video_path, max_images)
25
-
26
  assert isinstance(frames, list)
27
  assert all(isinstance(item, tuple) and len(item) == 2 for item in frames)
28
- assert all(
29
- isinstance(img, Image.Image) and isinstance(ts, float) for img, ts in frames
30
- )
31
-
32
 
33
- def test_process_video_structure():
34
- """Test that process_video returns the expected list structure."""
35
 
 
 
36
  video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
37
- max_images = 2
38
-
39
- result = process_video(video_path, max_images)
40
-
41
- # Should have 2 items (text + image) per frame
42
- assert len(result) == max_images * 2
43
-
44
- # Check structure of items
45
- for i in range(0, len(result), 2):
46
- # Text item
47
- assert result[i]["type"] == "text"
48
- assert result[i]["text"].startswith("Frame ")
49
-
50
- # Image item
51
- assert result[i + 1]["type"] == "image"
52
- assert "url" in result[i + 1]
53
- assert os.path.exists(result[i + 1]["url"])
54
-
55
- # Verify the image file is valid
56
- try:
57
- img = Image.open(result[i + 1]["url"])
58
- img.verify() # Make sure it's a valid image
59
- except Exception as e:
60
- pytest.fail(f"Invalid image file: {e}")
61
-
62
 
63
- def test_process_video_timestamps():
64
- """Test that timestamps in the result are properly formatted."""
65
-
66
- video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
67
- max_images = 3
68
-
69
- result = process_video(video_path, max_images)
70
-
71
- # Extract timestamps from text items
72
- timestamps = []
73
- for i in range(0, len(result), 2):
74
- if result[i]["type"] == "text":
75
- # Extract timestamp from "Frame X.XX:" format
76
- timestamp_text = result[i]["text"].split()[1].rstrip(":")
77
- timestamps.append(float(timestamp_text))
78
-
79
- # Check timestamps are ascending
80
- assert len(timestamps) == max_images
81
- assert all(timestamps[i] <= timestamps[i + 1] for i in range(len(timestamps) - 1))
82
-
83
-
84
- def test_process_video_temp_files():
85
- """Test that temporary files are created and cleaned up properly."""
86
-
87
- video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
88
- max_images = 1
89
-
90
- result = process_video(video_path, max_images)
91
-
92
- # Verify temp file exists
93
- image_path = result[1]["url"]
94
- assert os.path.exists(image_path)
95
- assert image_path.endswith(".png")
96
 
97
 
98
  def test_process_video_invalid_path():
99
  """Test that process_video handles invalid paths appropriately."""
100
-
101
  with pytest.raises(ValueError):
102
  process_video("nonexistent_video.mp4", 3)
103
 
104
- def test_process_user_input_text_only():
105
- """Test processing user input with text only (no files)."""
106
- message = {
107
- "text": "This is a test message",
108
- "files": []
109
- }
110
-
111
- # Add the max_images parameter
112
  result = process_user_input(message, 5)
113
-
114
- # Should return a single text item
115
  assert len(result) == 1
116
  assert result[0]["type"] == "text"
117
- assert result[0]["text"] == "This is a test message"
118
-
119
-
120
- def test_process_user_input_with_video():
121
- """Test processing user input with a video file."""
122
- video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
123
- assert os.path.exists(video_path), f"Test video not found at {video_path}"
124
-
125
- message = {
126
- "text": "Video analysis",
127
- "files": [video_path]
128
- }
129
-
130
- result = process_user_input(message, 4)
131
-
132
- # Should have at least 3 items (text + at least one frame with text and image)
133
- assert len(result) >= 3
134
-
135
- # First item should be the message text
136
- assert result[0]["type"] == "text"
137
- assert result[0]["text"] == "Video analysis"
138
 
139
- # Following items should be frame text and images
140
- assert result[1]["type"] == "text"
141
- assert result[1]["text"].startswith("Frame ")
142
-
143
- assert result[2]["type"] == "image"
144
- assert "url" in result[2]
145
- assert os.path.exists(result[2]["url"])
146
-
147
-
148
- def test_process_user_input_with_images():
149
- """Test processing user input with image files."""
150
- # Create temporary image files for testing
151
- with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as img1, \
152
- tempfile.NamedTemporaryFile(suffix=".png", delete=False) as img2:
153
-
154
- image_paths = [img1.name, img2.name]
155
-
156
- message = {
157
- "text": "Image analysis",
158
- "files": image_paths
159
- }
160
-
161
- result = process_user_input(message, 5)
162
-
163
- # Should have 3 items (text + 2 images)
164
- assert len(result) == 3
165
-
166
- # First item should be the message text
167
- assert result[0]["type"] == "text"
168
- assert result[0]["text"] == "Image analysis"
169
-
170
- # Following items should be images
171
- assert result[1]["type"] == "image"
172
- assert result[1]["url"] == image_paths[0]
173
-
174
- assert result[2]["type"] == "image"
175
- assert result[2]["url"] == image_paths[1]
176
-
177
- # Clean up temp files
178
- for path in image_paths:
179
- if os.path.exists(path):
180
- os.unlink(path)
181
-
182
-
183
- def test_process_user_input_empty_text():
184
- """Test processing user input with empty text but with files."""
185
  video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
186
-
187
- message = {
188
- "text": "", # Empty text
189
- "files": [video_path]
190
- }
191
-
192
- # Add max_images parameter
193
- result = process_user_input(message, 3)
194
-
195
- # First item should be empty text
196
- assert result[0]["type"] == "text"
197
- assert result[0]["text"] == ""
198
-
199
- # Rest should be video frames
200
- assert len(result) > 1
201
-
202
-
203
- def test_process_user_input_handles_empty_files_list():
204
- """Test that an empty files list is handled correctly."""
205
- message = {
206
- "text": "No files",
207
- "files": []
208
- }
209
-
210
- # Add max_images parameter
211
- result = process_user_input(message, 3)
212
- assert len(result) == 1
213
- assert result[0]["type"] == "text"
214
- assert result[0]["text"] == "No files"
215
 
216
 
217
- def test_process_user_input_max_images_effect():
218
- """Test that max_images parameter correctly limits the number of frames."""
219
- video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
220
-
221
- message = {
222
- "text": "Video with few frames",
223
- "files": [video_path]
224
- }
225
-
226
- result_few = process_user_input(message, 2)
227
- result_many = process_user_input(message, 5)
228
-
229
- # Count actual frames (each frame has a text and image entry)
230
- frames_few = (len(result_few) - 1) // 2 # -1 for initial text message
231
- frames_many = (len(result_many) - 1) // 2
232
-
233
- # Should respect max_images parameter
234
- assert frames_few <= 2
235
- assert frames_many <= 5
236
- assert frames_few < frames_many
237
-
238
- def test_process_history_basic_functionality():
239
- """Test basic conversation processing and content buffering."""
240
- # Empty history
241
- assert process_history([]) == []
242
-
243
- # Simple conversation
244
  history = [
245
  {"role": "user", "content": "Hello"},
246
  {"role": "assistant", "content": "Hi there!"},
@@ -251,665 +67,121 @@ def test_process_history_basic_functionality():
251
  assert len(result) == 3
252
  assert result[0] == {"role": "user", "content": [{"type": "text", "text": "Hello"}]}
253
  assert result[1] == {"role": "assistant", "content": [{"type": "text", "text": "Hi there!"}]}
254
- assert result[2] == {"role": "user", "content": [{"type": "text", "text": "How are you?"}]}
255
 
256
 
257
- def test_process_history_file_handling():
258
- """Test processing of different file types and content buffering."""
259
- # Create temp image file
260
- with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as img:
261
- image_path = img.name
262
-
263
- video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
264
 
265
- try:
266
- history = [
267
- {"role": "user", "content": (image_path,)},
268
- {"role": "user", "content": "What's this image?"},
269
- {"role": "user", "content": (video_path,)},
270
- {"role": "assistant", "content": "I see an image and video."},
271
- {"role": "user", "content": "First"},
272
- {"role": "user", "content": "Second"},
273
- {"role": "user", "content": "Third"} # Multiple user messages at end
274
- ]
275
-
276
- result = process_history(history)
277
- assert len(result) == 3
278
-
279
- # First user turn: image + text + video
280
- assert result[0]["role"] == "user"
281
- assert len(result[0]["content"]) == 3
282
- assert result[0]["content"][0] == {"type": "image", "url": image_path}
283
- assert result[0]["content"][1] == {"type": "text", "text": "What's this image?"}
284
- assert result[0]["content"][2] == {"type": "text", "text": "[Video uploaded previously]"}
285
-
286
- # Assistant response
287
- assert result[1]["role"] == "assistant"
288
- assert result[1]["content"] == [{"type": "text", "text": "I see an image and video."}]
289
-
290
- # Final user turn: multiple buffered messages
291
- assert result[2]["role"] == "user"
292
- assert len(result[2]["content"]) == 3
293
- assert result[2]["content"][0] == {"type": "text", "text": "First"}
294
- assert result[2]["content"][1] == {"type": "text", "text": "Second"}
295
- assert result[2]["content"][2] == {"type": "text", "text": "Third"}
296
-
297
- finally:
298
- if os.path.exists(image_path):
299
- os.unlink(image_path)
300
-
301
-
302
- def test_extract_pdf_text_nonexistent_file():
303
- """Test that extract_pdf_text handles non-existent files appropriately."""
304
  with pytest.raises(ValueError, match="File not found"):
305
  extract_pdf_text("nonexistent_file.pdf")
306
-
307
-
308
- def test_extract_pdf_text_with_mock_pdf():
309
- """Test PDF text extraction with a simple PDF file."""
310
- import fitz # PyMuPDF
311
-
312
- # Create a temporary PDF with some text content
313
- with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
314
- pdf_path = temp_pdf.name
315
-
316
- try:
317
- # Create a simple PDF with text
318
- doc = fitz.open() # Create new PDF
319
- page = doc.new_page()
320
-
321
- # Add some text to the page
322
- text_content = "This is a test PDF document.\nIt contains multiple lines of text.\nPage 1 content here."
323
- page.insert_text((50, 100), text_content, fontsize=12)
324
-
325
- # Save the PDF
326
- doc.save(pdf_path)
327
- doc.close()
328
-
329
- # Test the extract_pdf_text function
330
- result = extract_pdf_text(pdf_path)
331
-
332
- # Verify the extracted text contains our content
333
- assert isinstance(result, str)
334
- assert "This is a test PDF document" in result
335
- assert "Page 1:" in result # Should include page number
336
- assert "multiple lines of text" in result
337
-
338
- finally:
339
- # Clean up the temporary PDF file
340
- if os.path.exists(pdf_path):
341
- os.unlink(pdf_path)
342
-
343
-
344
- def test_extract_pdf_text_empty_pdf():
345
- """Test PDF text extraction with an empty PDF (no text content)."""
346
- import fitz
347
-
348
- # Create a temporary empty PDF
349
- with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
350
- pdf_path = temp_pdf.name
351
-
352
- try:
353
- # Create an empty PDF
354
- doc = fitz.open() # Create new PDF
355
- page = doc.new_page() # Add empty page
356
- doc.save(pdf_path)
357
- doc.close()
358
-
359
- # Test the extract_pdf_text function
360
- result = extract_pdf_text(pdf_path)
361
-
362
- # Should return message about no content
363
- assert result == "No text content found in the PDF."
364
-
365
- finally:
366
- # Clean up
367
- if os.path.exists(pdf_path):
368
- os.unlink(pdf_path)
369
-
370
-
371
- def test_extract_pdf_text_multipage():
372
- """Test PDF text extraction with multiple pages."""
373
- import fitz
374
 
375
- # Create a temporary multi-page PDF
376
  with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
377
  pdf_path = temp_pdf.name
378
 
379
  try:
380
- # Create a PDF with multiple pages
381
  doc = fitz.open()
382
-
383
- # Page 1
384
- page1 = doc.new_page()
385
- page1.insert_text((50, 100), "Content from page one.", fontsize=12)
386
-
387
- # Page 2
388
- page2 = doc.new_page()
389
- page2.insert_text((50, 100), "Content from page two.", fontsize=12)
390
-
391
- # Page 3 (empty)
392
- page3 = doc.new_page()
393
-
394
- # Page 4
395
- page4 = doc.new_page()
396
- page4.insert_text((50, 100), "Content from page four.", fontsize=12)
397
-
398
  doc.save(pdf_path)
399
  doc.close()
400
 
401
- # Test the extract_pdf_text function
402
  result = extract_pdf_text(pdf_path)
403
-
404
- # Verify all pages with content are included
405
  assert "Page 1:" in result
406
- assert "Content from page one" in result
407
- assert "Page 2:" in result
408
- assert "Content from page two" in result
409
- assert "Page 4:" in result
410
- assert "Content from page four" in result
411
-
412
- # Page 3 should be excluded (empty)
413
- assert "Page 3:" not in result
414
-
415
- # Check that pages are separated properly
416
- assert "\n\n" in result # Pages should be separated by double newlines
417
 
418
  finally:
419
- # Clean up
420
  if os.path.exists(pdf_path):
421
  os.unlink(pdf_path)
422
 
423
 
424
  def test_process_user_input_with_pdf():
425
- """Test processing user input with a PDF file."""
426
  import fitz
427
 
428
- # Create a temporary PDF for testing
429
  with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
430
  pdf_path = temp_pdf.name
431
 
432
  try:
433
- # Create a simple PDF
434
  doc = fitz.open()
435
  page = doc.new_page()
436
- page.insert_text((50, 100), "Test PDF content for user input processing.", fontsize=12)
437
  doc.save(pdf_path)
438
  doc.close()
439
 
440
- # Test processing user input with PDF
441
- message = {
442
- "text": "Analyze this PDF",
443
- "files": [pdf_path]
444
- }
445
-
446
  result = process_user_input(message, 3)
447
 
448
- # Should have 2 items (original text + PDF content)
449
  assert len(result) == 2
450
-
451
- # First item should be the message text
452
- assert result[0]["type"] == "text"
453
- assert result[0]["text"] == "Analyze this PDF"
454
-
455
- # Second item should be PDF content
456
- assert result[1]["type"] == "text"
457
  assert "PDF Content:" in result[1]["text"]
458
- assert "Test PDF content for user input processing" in result[1]["text"]
459
- assert "Page 1:" in result[1]["text"]
460
 
461
  finally:
462
- # Clean up
463
  if os.path.exists(pdf_path):
464
  os.unlink(pdf_path)
465
 
466
 
467
  def test_process_user_input_pdf_error_handling():
468
- """Test that PDF processing errors are handled gracefully."""
469
- # Create a file that looks like a PDF but isn't valid
470
  with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
471
- temp_file.write(b"This is not a valid PDF file content")
472
  invalid_pdf_path = temp_file.name
473
 
474
  try:
475
- message = {
476
- "text": "Process invalid PDF",
477
- "files": [invalid_pdf_path]
478
- }
479
-
480
  result = process_user_input(message, 3)
481
 
482
- # Should have 2 items (original text + error message)
483
  assert len(result) == 2
484
-
485
- # First item should be the message text
486
- assert result[0]["type"] == "text"
487
  assert result[0]["text"] == "Process invalid PDF"
488
-
489
- # Second item should be error message
490
- assert result[1]["type"] == "text"
491
  assert "Failed to extract text from PDF:" in result[1]["text"]
492
 
493
  finally:
494
- # Clean up
495
  if os.path.exists(invalid_pdf_path):
496
  os.unlink(invalid_pdf_path)
497
 
498
 
499
- def test_process_history_with_pdf():
500
- """Test that PDF files in history are handled correctly."""
501
- import fitz
502
-
503
- # Create a temporary PDF for testing
504
- with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
505
- pdf_path = temp_pdf.name
506
-
507
- try:
508
- # Create a simple PDF
509
- doc = fitz.open()
510
- page = doc.new_page()
511
- page.insert_text((50, 100), "Historical PDF content.", fontsize=12)
512
- doc.save(pdf_path)
513
- doc.close()
514
-
515
- # Test history with PDF file
516
- history = [
517
- {"role": "user", "content": (pdf_path,)},
518
- {"role": "user", "content": "What does this PDF contain?"},
519
- {"role": "assistant", "content": "The PDF contains some text."},
520
- {"role": "user", "content": "Thanks!"}
521
- ]
522
-
523
- result = process_history(history)
524
-
525
- # Should have 3 messages (user turn, assistant turn, final user turn)
526
- assert len(result) == 3
527
-
528
- # First user turn should have PDF placeholder and text
529
- assert result[0]["role"] == "user"
530
- assert len(result[0]["content"]) == 2
531
- assert result[0]["content"][0] == {"type": "text", "text": "[PDF uploaded previously]"}
532
- assert result[0]["content"][1] == {"type": "text", "text": "What does this PDF contain?"}
533
-
534
- # Assistant response
535
- assert result[1]["role"] == "assistant"
536
- assert result[1]["content"] == [{"type": "text", "text": "The PDF contains some text."}]
537
-
538
- # Final user message
539
- assert result[2]["role"] == "user"
540
- assert result[2]["content"] == [{"type": "text", "text": "Thanks!"}]
541
-
542
- finally:
543
- # Clean up
544
- if os.path.exists(pdf_path):
545
- os.unlink(pdf_path)
546
-
547
-
548
- def test_update_custom_prompt_general_assistant():
549
- """Test that selecting 'General Assistant' returns the correct prompt."""
550
- result = update_custom_prompt("General Assistant")
551
- expected = "You are a helpful AI assistant capable of analyzing images, videos, and PDF documents. Provide clear, accurate, and helpful responses to user queries."
552
- assert result == expected
553
-
554
-
555
- def test_update_custom_prompt_document_analyzer():
556
- """Test that selecting 'Document Analyzer' returns the correct prompt."""
557
- result = update_custom_prompt("Document Analyzer")
558
- expected = "You are a specialized document analysis assistant. Focus on extracting key information, summarizing content, and answering specific questions about uploaded documents. For PDFs, provide structured analysis including main topics, key points, and relevant details. For images containing text, perform OCR-like analysis."
559
- assert result == expected
560
-
561
-
562
- def test_update_custom_prompt_visual_content_expert():
563
- """Test that selecting 'Visual Content Expert' returns the correct prompt."""
564
- result = update_custom_prompt("Visual Content Expert")
565
- expected = "You are an expert in visual content analysis. When analyzing images, provide detailed descriptions of visual elements, composition, colors, objects, people, and scenes. For videos, describe the sequence of events, movements, and changes between frames. Identify artistic techniques, styles, and visual storytelling elements."
566
- assert result == expected
567
-
568
-
569
- def test_update_custom_prompt_educational_tutor():
570
- """Test that selecting 'Educational Tutor' returns the correct prompt."""
571
- result = update_custom_prompt("Educational Tutor")
572
- expected = "You are a patient and encouraging educational tutor. Break down complex concepts into simple, understandable explanations. When analyzing educational materials (images, videos, or documents), focus on learning objectives, key concepts, and provide additional context or examples to enhance understanding."
573
- assert result == expected
574
-
575
-
576
- def test_update_custom_prompt_technical_reviewer():
577
- """Test that selecting 'Technical Reviewer' returns the correct prompt."""
578
- result = update_custom_prompt("Technical Reviewer")
579
- expected = "You are a technical expert specializing in analyzing technical documents, diagrams, code screenshots, and instructional videos. Provide detailed technical insights, identify potential issues, suggest improvements, and explain technical concepts with precision and accuracy."
580
- assert result == expected
581
-
582
-
583
- def test_update_custom_prompt_creative_storyteller():
584
- """Test that selecting 'Creative Storyteller' returns the correct prompt."""
585
- result = update_custom_prompt("Creative Storyteller")
586
- expected = "You are a creative storyteller who brings visual content to life through engaging narratives. When analyzing images or videos, create compelling stories, describe scenes with rich detail, and help users explore the creative and emotional aspects of visual content."
587
- assert result == expected
588
-
589
-
590
- def test_update_custom_prompt_custom():
591
- """Test that selecting 'Custom' returns an empty string."""
592
- result = update_custom_prompt("Custom")
593
- assert result == ""
594
-
595
-
596
- def test_update_custom_prompt_invalid_choice():
597
- """Test that an invalid choice returns an empty string."""
598
- result = update_custom_prompt("NonExistentChoice")
599
- assert result == ""
600
-
601
-
602
- def test_update_custom_prompt_case_sensitivity():
603
- """Test that the function is case-sensitive."""
604
- # Should not match lowercase
605
- result = update_custom_prompt("general assistant")
606
- assert result == ""
607
-
608
- # Should match exact case
609
- result = update_custom_prompt("General Assistant")
610
- assert result != ""
611
-
612
-
613
- def test_system_prompt_selection_logic():
614
- """Test the system prompt selection logic from the run function."""
615
- # Mock the preset prompts dictionary (same as in the run function)
616
- preset_prompts = {
617
- "General Assistant": "You are a helpful AI assistant capable of analyzing images, videos, and PDF documents. Provide clear, accurate, and helpful responses to user queries.",
618
- "Document Analyzer": "You are a specialized document analysis assistant. Focus on extracting key information, summarizing content, and answering specific questions about uploaded documents. For PDFs, provide structured analysis including main topics, key points, and relevant details. For images containing text, perform OCR-like analysis.",
619
- "Visual Content Expert": "You are an expert in visual content analysis. When analyzing images, provide detailed descriptions of visual elements, composition, colors, objects, people, and scenes. For videos, describe the sequence of events, movements, and changes between frames. Identify artistic techniques, styles, and visual storytelling elements.",
620
- "Educational Tutor": "You are a patient and encouraging educational tutor. Break down complex concepts into simple, understandable explanations. When analyzing educational materials (images, videos, or documents), focus on learning objectives, key concepts, and provide additional context or examples to enhance understanding.",
621
- "Technical Reviewer": "You are a technical expert specializing in analyzing technical documents, diagrams, code screenshots, and instructional videos. Provide detailed technical insights, identify potential issues, suggest improvements, and explain technical concepts with precision and accuracy.",
622
- "Creative Storyteller": "You are a creative storyteller who brings visual content to life through engaging narratives. When analyzing images or videos, create compelling stories, describe scenes with rich detail, and help users explore the creative and emotional aspects of visual content.",
623
- }
624
-
625
- # Test preset selection
626
- for preset_name, expected_prompt in preset_prompts.items():
627
- custom_prompt = "This is a custom prompt"
628
-
629
- # When preset is selected (not "Custom"), should use preset
630
- if preset_name != "Custom":
631
- selected_prompt = preset_prompts.get(preset_name, custom_prompt)
632
- assert selected_prompt == expected_prompt
633
- assert selected_prompt != custom_prompt
634
-
635
- # Test custom selection
636
- custom_prompt = "This is my custom system prompt"
637
- system_prompt_preset = "Custom"
638
-
639
- if system_prompt_preset == "Custom":
640
- selected_prompt = custom_prompt
641
- else:
642
- selected_prompt = preset_prompts.get(system_prompt_preset, custom_prompt)
643
-
644
- assert selected_prompt == custom_prompt
645
-
646
-
647
- def test_system_prompt_preset_choices():
648
- """Test that all expected preset choices are available."""
649
- expected_choices = [
650
- "General Assistant",
651
- "Document Analyzer",
652
- "Visual Content Expert",
653
- "Educational Tutor",
654
- "Technical Reviewer",
655
- "Creative Storyteller",
656
- "Custom"
657
- ]
658
-
659
- # Verify all choices exist in update_custom_prompt function
660
- for choice in expected_choices[:-1]: # Exclude "Custom" as it returns empty string
661
- result = update_custom_prompt(choice)
662
- assert isinstance(result, str)
663
- assert len(result) > 0 # Should return a non-empty prompt
664
-
665
- # Test Custom specifically
666
- custom_result = update_custom_prompt("Custom")
667
- assert custom_result == ""
668
-
669
-
670
- def test_system_prompt_content_quality():
671
- """Test that system prompts contain expected keywords for their specialization."""
672
-
673
- # General Assistant should mention general capabilities
674
  general = update_custom_prompt("General Assistant")
675
- assert "images" in general.lower()
676
- assert "videos" in general.lower()
677
- assert "pdf" in general.lower()
678
 
679
- # Document Analyzer should focus on document analysis
680
  document = update_custom_prompt("Document Analyzer")
681
- assert "document" in document.lower()
682
- assert "analysis" in document.lower()
683
- assert "pdf" in document.lower()
684
-
685
- # Visual Content Expert should focus on visual analysis
686
- visual = update_custom_prompt("Visual Content Expert")
687
- assert "visual" in visual.lower()
688
- assert "images" in visual.lower()
689
- assert "videos" in visual.lower()
690
-
691
- # Educational Tutor should mention teaching/learning
692
- educational = update_custom_prompt("Educational Tutor")
693
- assert any(word in educational.lower() for word in ["educational", "tutor", "learning", "teaching", "concepts"])
694
-
695
- # Technical Reviewer should mention technical aspects
696
- technical = update_custom_prompt("Technical Reviewer")
697
- assert "technical" in technical.lower()
698
 
699
- # Creative Storyteller should mention creativity/stories
700
- creative = update_custom_prompt("Creative Storyteller")
701
- assert any(word in creative.lower() for word in ["creative", "story", "narrative", "storyteller"])
702
 
703
 
704
- def test_check_file_size_nonexistent_file():
705
- """Test that check_file_size raises ValueError for non-existent files."""
 
706
  with pytest.raises(ValueError, match="File not found"):
707
  check_file_size("nonexistent_file.txt")
708
-
709
-
710
- def test_check_file_size_valid_image():
711
- """Test that check_file_size returns True for valid image files."""
712
- # Create a small temporary image file
713
  with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
714
- # Write minimal JPEG header to make it a valid file
715
- temp_file.write(b"small image content")
716
  temp_path = temp_file.name
717
 
718
  try:
719
- # File should be well under the image size limit
720
- result = check_file_size(temp_path)
721
- assert result is True
722
-
723
  finally:
724
  if os.path.exists(temp_path):
725
  os.unlink(temp_path)
726
-
727
-
728
- def test_check_file_size_valid_video():
729
- """Test that check_file_size returns True for valid video files."""
730
- # Create a small temporary video file
731
- with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
732
- # Write some minimal content
733
- temp_file.write(b"small video content")
734
- temp_path = temp_file.name
735
 
736
- try:
737
- # File should be well under the video size limit
738
- result = check_file_size(temp_path)
739
- assert result is True
740
-
741
- finally:
742
- if os.path.exists(temp_path):
743
- os.unlink(temp_path)
744
-
745
-
746
- def test_check_file_size_large_image():
747
- """Test that check_file_size raises ValueError for oversized image files."""
748
- # Create a temporary file that exceeds the image size limit
749
  with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file:
750
- # Write content larger than MAX_IMAGE_SIZE (10MB)
751
- large_content = b"x" * (MAX_IMAGE_SIZE + 1024) # 10MB + 1KB
752
- temp_file.write(large_content)
753
  temp_path = temp_file.name
754
 
755
  try:
756
  with pytest.raises(ValueError, match="Image file too large"):
757
  check_file_size(temp_path)
758
-
759
- finally:
760
- if os.path.exists(temp_path):
761
- os.unlink(temp_path)
762
-
763
-
764
- def test_check_file_size_large_video():
765
- """Test that check_file_size raises ValueError for oversized video files."""
766
- # Create a temporary file that exceeds the video size limit
767
- with tempfile.NamedTemporaryFile(suffix=".mov", delete=False) as temp_file:
768
- # Write content larger than MAX_VIDEO_SIZE (100MB)
769
- # Write in chunks to avoid memory issues
770
- chunk_size = 1024 * 1024 # 1MB chunks
771
- chunks_needed = (MAX_VIDEO_SIZE // chunk_size) + 2 # Exceed limit by 2MB
772
-
773
- for _ in range(chunks_needed):
774
- temp_file.write(b"x" * chunk_size)
775
- temp_path = temp_file.name
776
-
777
- try:
778
- with pytest.raises(ValueError, match="Video file too large"):
779
- check_file_size(temp_path)
780
-
781
- finally:
782
- if os.path.exists(temp_path):
783
- os.unlink(temp_path)
784
-
785
-
786
- def test_check_file_size_edge_cases():
787
- """Test check_file_size with files at the exact size limits."""
788
- # Test image file at exact limit
789
- with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
790
- # Write exactly MAX_IMAGE_SIZE bytes
791
- temp_file.write(b"x" * MAX_IMAGE_SIZE)
792
- image_path = temp_file.name
793
-
794
- try:
795
- # Should pass at exact limit
796
- result = check_file_size(image_path)
797
- assert result is True
798
-
799
- finally:
800
- if os.path.exists(image_path):
801
- os.unlink(image_path)
802
-
803
- # Test video file at exact limit
804
- with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
805
- # Write exactly MAX_VIDEO_SIZE bytes in chunks to avoid memory issues
806
- chunk_size = 1024 * 1024 # 1MB chunks
807
- chunks_needed = MAX_VIDEO_SIZE // chunk_size
808
-
809
- for _ in range(chunks_needed):
810
- temp_file.write(b"x" * chunk_size)
811
- video_path = temp_file.name
812
-
813
- try:
814
- # Should pass at exact limit
815
- result = check_file_size(video_path)
816
- assert result is True
817
-
818
- finally:
819
- if os.path.exists(video_path):
820
- os.unlink(video_path)
821
-
822
-
823
- def test_check_file_size_different_extensions():
824
- """Test that check_file_size correctly categorizes different file extensions."""
825
- # Test various video extensions
826
- video_extensions = [".mp4", ".mov", ".MP4", ".MOV"] # Test case sensitivity
827
- for ext in video_extensions:
828
- with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
829
- temp_file.write(b"small content")
830
- temp_path = temp_file.name
831
-
832
- try:
833
- # Should be treated as video file (checked against video limit)
834
- result = check_file_size(temp_path)
835
- assert result is True
836
-
837
- finally:
838
- if os.path.exists(temp_path):
839
- os.unlink(temp_path)
840
-
841
- # Test various image extensions
842
- image_extensions = [".jpg", ".png", ".jpeg", ".gif", ".bmp", ".JPG", ".PNG"]
843
- for ext in image_extensions:
844
- with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
845
- temp_file.write(b"small content")
846
- temp_path = temp_file.name
847
-
848
- try:
849
- # Should be treated as image file (checked against image limit)
850
- result = check_file_size(temp_path)
851
- assert result is True
852
-
853
- finally:
854
- if os.path.exists(temp_path):
855
- os.unlink(temp_path)
856
-
857
-
858
- def test_check_file_size_empty_file():
859
- """Test that check_file_size handles empty files correctly."""
860
- # Create empty image file
861
- with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
862
- temp_path = temp_file.name # File is created but empty
863
-
864
- try:
865
- # Empty file should pass size check
866
- result = check_file_size(temp_path)
867
- assert result is True
868
-
869
- finally:
870
- if os.path.exists(temp_path):
871
- os.unlink(temp_path)
872
-
873
-
874
- def test_check_file_size_error_messages():
875
- """Test that check_file_size provides informative error messages."""
876
- # Test oversized image error message
877
- with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file:
878
- oversized_content = b"x" * (MAX_IMAGE_SIZE + 1024)
879
- temp_file.write(oversized_content)
880
- temp_path = temp_file.name
881
-
882
- try:
883
- with pytest.raises(ValueError) as exc_info:
884
- check_file_size(temp_path)
885
-
886
- error_message = str(exc_info.value)
887
- assert "Image file too large" in error_message
888
- assert "Maximum allowed:" in error_message
889
- assert "10MB" in error_message # Should show the limit
890
-
891
  finally:
892
  if os.path.exists(temp_path):
893
- os.unlink(temp_path)
894
-
895
-
896
- def test_check_file_size_with_actual_test_video():
897
- """Test check_file_size with the actual test video file."""
898
- video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
899
-
900
- if os.path.exists(video_path):
901
- # Should pass since test video should be under the limit
902
- result = check_file_size(video_path)
903
- assert result is True
904
- else:
905
- pytest.skip("Test video file not found")
906
-
907
-
908
- def test_check_file_size_constants():
909
- """Test that the size constants are set to expected values."""
910
- # Verify the constants are set correctly
911
- assert MAX_VIDEO_SIZE == 100 * 1024 * 1024 # 100 MB
912
- assert MAX_IMAGE_SIZE == 10 * 1024 * 1024 # 10 MB
913
-
914
- # Ensure video limit is larger than image limit
915
- assert MAX_VIDEO_SIZE > MAX_IMAGE_SIZE
 
11
  ROOT_DIR = Path(__file__).parent.parent
12
 
13
 
14
+ def test_get_frames():
15
+ """Test that get_frames returns correct structure and handles video processing."""
 
16
  video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
 
 
17
  assert os.path.exists(video_path)
18
 
19
+ frames = get_frames(video_path, 3)
 
 
 
20
  assert isinstance(frames, list)
21
  assert all(isinstance(item, tuple) and len(item) == 2 for item in frames)
22
+ assert all(isinstance(img, Image.Image) and isinstance(ts, float) for img, ts in frames)
 
 
 
23
 
 
 
24
 
25
+ def test_process_video():
26
+ """Test video processing returns expected structure."""
27
  video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
28
+ result = process_video(video_path, 2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
+ assert len(result) == 4 # 2 frames * 2 items per frame
31
+ assert result[0]["type"] == "text" and result[0]["text"].startswith("Frame ")
32
+ assert result[1]["type"] == "image" and os.path.exists(result[1]["url"])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
 
35
  def test_process_video_invalid_path():
36
  """Test that process_video handles invalid paths appropriately."""
 
37
  with pytest.raises(ValueError):
38
  process_video("nonexistent_video.mp4", 3)
39
 
40
+ def test_process_user_input():
41
+ """Test processing user input with different file types."""
42
+ # Text only
43
+ message = {"text": "Test message", "files": []}
 
 
 
 
44
  result = process_user_input(message, 5)
 
 
45
  assert len(result) == 1
46
  assert result[0]["type"] == "text"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
+ # With video
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
50
+ if os.path.exists(video_path):
51
+ message = {"text": "Video analysis", "files": [video_path]}
52
+ result = process_user_input(message, 2)
53
+ assert len(result) >= 3 # text + frames
54
+ assert result[0]["text"] == "Video analysis"
55
+ assert result[1]["text"].startswith("Frame ")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
 
57
 
58
+ def test_process_history():
59
+ """Test basic conversation processing."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
  history = [
61
  {"role": "user", "content": "Hello"},
62
  {"role": "assistant", "content": "Hi there!"},
 
67
  assert len(result) == 3
68
  assert result[0] == {"role": "user", "content": [{"type": "text", "text": "Hello"}]}
69
  assert result[1] == {"role": "assistant", "content": [{"type": "text", "text": "Hi there!"}]}
 
70
 
71
 
72
+ def test_extract_pdf_text():
73
+ """Test PDF text extraction."""
74
+ import fitz
 
 
 
 
75
 
76
+ # Test non-existent file
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
  with pytest.raises(ValueError, match="File not found"):
78
  extract_pdf_text("nonexistent_file.pdf")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
 
80
+ # Test with valid PDF
81
  with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
82
  pdf_path = temp_pdf.name
83
 
84
  try:
 
85
  doc = fitz.open()
86
+ page = doc.new_page()
87
+ page.insert_text((50, 100), "Test PDF content.", fontsize=12)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
  doc.save(pdf_path)
89
  doc.close()
90
 
 
91
  result = extract_pdf_text(pdf_path)
92
+ assert "Test PDF content" in result
 
93
  assert "Page 1:" in result
 
 
 
 
 
 
 
 
 
 
 
94
 
95
  finally:
 
96
  if os.path.exists(pdf_path):
97
  os.unlink(pdf_path)
98
 
99
 
100
  def test_process_user_input_with_pdf():
101
+ """Test processing user input with PDF."""
102
  import fitz
103
 
 
104
  with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
105
  pdf_path = temp_pdf.name
106
 
107
  try:
 
108
  doc = fitz.open()
109
  page = doc.new_page()
110
+ page.insert_text((50, 100), "Test PDF content.", fontsize=12)
111
  doc.save(pdf_path)
112
  doc.close()
113
 
114
+ message = {"text": "Analyze PDF", "files": [pdf_path]}
 
 
 
 
 
115
  result = process_user_input(message, 3)
116
 
 
117
  assert len(result) == 2
118
+ assert result[0]["text"] == "Analyze PDF"
 
 
 
 
 
 
119
  assert "PDF Content:" in result[1]["text"]
120
+ assert "Test PDF content" in result[1]["text"]
 
121
 
122
  finally:
 
123
  if os.path.exists(pdf_path):
124
  os.unlink(pdf_path)
125
 
126
 
127
  def test_process_user_input_pdf_error_handling():
128
+ """Test PDF error handling."""
 
129
  with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
130
+ temp_file.write(b"Invalid PDF content")
131
  invalid_pdf_path = temp_file.name
132
 
133
  try:
134
+ message = {"text": "Process invalid PDF", "files": [invalid_pdf_path]}
 
 
 
 
135
  result = process_user_input(message, 3)
136
 
 
137
  assert len(result) == 2
 
 
 
138
  assert result[0]["text"] == "Process invalid PDF"
 
 
 
139
  assert "Failed to extract text from PDF:" in result[1]["text"]
140
 
141
  finally:
 
142
  if os.path.exists(invalid_pdf_path):
143
  os.unlink(invalid_pdf_path)
144
 
145
 
146
+ def test_update_custom_prompt():
147
+ """Test system prompt selection."""
148
+ # Test key prompts
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
149
  general = update_custom_prompt("General Assistant")
150
+ assert "images" in general.lower() and "videos" in general.lower()
 
 
151
 
 
152
  document = update_custom_prompt("Document Analyzer")
153
+ assert "document" in document.lower() and "analysis" in document.lower()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
 
155
+ # Test custom returns empty
156
+ assert update_custom_prompt("Custom") == ""
157
+ assert update_custom_prompt("Invalid") == ""
158
 
159
 
160
+ def test_check_file_size():
161
+ """Test file size validation."""
162
+ # Test non-existent file
163
  with pytest.raises(ValueError, match="File not found"):
164
  check_file_size("nonexistent_file.txt")
165
+
166
+ # Test valid small files
 
 
 
167
  with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
168
+ temp_file.write(b"small content")
 
169
  temp_path = temp_file.name
170
 
171
  try:
172
+ assert check_file_size(temp_path) is True
 
 
 
173
  finally:
174
  if os.path.exists(temp_path):
175
  os.unlink(temp_path)
 
 
 
 
 
 
 
 
 
176
 
177
+ # Test oversized image
 
 
 
 
 
 
 
 
 
 
 
 
178
  with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file:
179
+ temp_file.write(b"x" * (MAX_IMAGE_SIZE + 1024))
 
 
180
  temp_path = temp_file.name
181
 
182
  try:
183
  with pytest.raises(ValueError, match="Image file too large"):
184
  check_file_size(temp_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  finally:
186
  if os.path.exists(temp_path):
187
+ os.unlink(temp_path)