"""Tests for the video, PDF and chat-history helpers imported from ``app``."""

import os
import tempfile
from pathlib import Path

import cv2  # noqa: F401 - not referenced by the tests in this module
import pytest
from PIL import Image

from app import (
    extract_pdf_text,
    get_frames,
    process_history,
    process_user_input,
    process_video,
)

# Get the project root directory
ROOT_DIR = Path(__file__).parent.parent

# Sample video used by every video-related test below.
TEST_VIDEO_PATH = os.path.join(ROOT_DIR, "assets", "test_video.mp4")


def _make_temp_file(suffix, content=b""):
    """Create a closed temporary file and return its path.

    The file is created with ``delete=False`` so it survives after the handle
    is closed; the caller must remove it (see ``_remove_file``).

    Args:
        suffix: File name suffix, e.g. ``".pdf"``.
        content: Optional bytes written to the file before it is closed.

    Returns:
        The path of the created file as a string.
    """
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as handle:
        handle.write(content)
        return handle.name


def _remove_file(path):
    """Delete *path* if it still exists."""
    if os.path.exists(path):
        os.unlink(path)


def _write_pdf(pdf_path, page_texts):
    """Write a PDF to *pdf_path* with one page per entry of *page_texts*.

    Args:
        pdf_path: Destination path for the PDF.
        page_texts: Iterable of strings; each one is inserted on its own page
            at position (50, 100) with font size 12. A falsy entry (``None``
            or ``""``) produces a page without any text.
    """
    # Imported inside the function so only the tests that build PDFs need
    # PyMuPDF to be importable.
    import fitz  # PyMuPDF

    doc = fitz.open()  # Create new PDF
    try:
        for text in page_texts:
            page = doc.new_page()
            if text:
                page.insert_text((50, 100), text, fontsize=12)
        doc.save(pdf_path)
    finally:
        # Close the document even when inserting text or saving fails.
        doc.close()


def test_correct_frame_return():
    """Test that get_frames returns a list of (Image, float) tuples."""
    # Ensure the test video exists
    assert os.path.exists(TEST_VIDEO_PATH)

    # Test with a small number of frames
    max_images = 3
    frames = get_frames(TEST_VIDEO_PATH, max_images)

    assert isinstance(frames, list)
    assert all(isinstance(item, tuple) and len(item) == 2 for item in frames)
    assert all(
        isinstance(img, Image.Image) and isinstance(ts, float)
        for img, ts in frames
    )


def test_process_video_structure():
    """Test that process_video returns the expected list structure."""
    max_images = 2
    result = process_video(TEST_VIDEO_PATH, max_images)

    # Should have 2 items (text + image) per frame
    assert len(result) == max_images * 2

    # Items come in (text, image) pairs, so step through them two at a time.
    for i in range(0, len(result), 2):
        text_item, image_item = result[i], result[i + 1]

        # Text item
        assert text_item["type"] == "text"
        assert text_item["text"].startswith("Frame ")

        # Image item
        assert image_item["type"] == "image"
        assert "url" in image_item
        assert os.path.exists(image_item["url"])

        # Verify the image file is valid; the context manager makes sure the
        # underlying file handle is released for every frame.
        try:
            with Image.open(image_item["url"]) as img:
                img.verify()
        except Exception as e:
            pytest.fail(f"Invalid image file: {e}")


def test_process_video_timestamps():
    """Test that timestamps in the result are properly formatted."""
    max_images = 3
    result = process_video(TEST_VIDEO_PATH, max_images)

    # Extract timestamps from text items
    timestamps = []
    for i in range(0, len(result), 2):
        if result[i]["type"] == "text":
            # Extract timestamp from "Frame X.XX:" format
            timestamp_text = result[i]["text"].split()[1].rstrip(":")
            timestamps.append(float(timestamp_text))

    # Check timestamps are ascending
    assert len(timestamps) == max_images
    assert all(
        earlier <= later
        for earlier, later in zip(timestamps, timestamps[1:])
    )


def test_process_video_temp_files():
    """Test that process_video writes the frame image to a PNG file on disk.

    Only the existence and the ``.png`` suffix of the file are checked; this
    test does not verify any cleanup of the file.
    """
    max_images = 1
    result = process_video(TEST_VIDEO_PATH, max_images)

    # Verify temp file exists
    image_path = result[1]["url"]
    assert os.path.exists(image_path)
    assert image_path.endswith(".png")


def test_process_video_invalid_path():
    """Test that process_video raises ValueError for a non-existent path."""
    with pytest.raises(ValueError):
        process_video("nonexistent_video.mp4", 3)


def test_process_user_input_text_only():
    """Test processing user input with text only (no files)."""
    message = {
        "text": "This is a test message",
        "files": [],
    }

    result = process_user_input(message, 5)

    # Should return a single text item
    assert len(result) == 1
    assert result[0]["type"] == "text"
    assert result[0]["text"] == "This is a test message"


def test_process_user_input_with_video():
    """Test processing user input with a video file."""
    assert os.path.exists(TEST_VIDEO_PATH), f"Test video not found at {TEST_VIDEO_PATH}"

    message = {
        "text": "Video analysis",
        "files": [TEST_VIDEO_PATH],
    }

    result = process_user_input(message, 4)

    # Should have at least 3 items (text + at least one frame with text and image)
    assert len(result) >= 3

    # First item should be the message text
    assert result[0]["type"] == "text"
    assert result[0]["text"] == "Video analysis"

    # Following items should be frame text and images
    assert result[1]["type"] == "text"
    assert result[1]["text"].startswith("Frame ")
    assert result[2]["type"] == "image"
    assert "url" in result[2]
    assert os.path.exists(result[2]["url"])


def test_process_user_input_with_images():
    """Test processing user input with image files."""
    # Create (empty) temporary image files for testing. They are closed
    # before use and removed in ``finally`` so a failing assertion cannot
    # leave them behind.
    image_paths = [_make_temp_file(".jpg"), _make_temp_file(".png")]
    try:
        message = {
            "text": "Image analysis",
            "files": image_paths,
        }

        result = process_user_input(message, 5)

        # Should have 3 items (text + 2 images)
        assert len(result) == 3

        # First item should be the message text
        assert result[0]["type"] == "text"
        assert result[0]["text"] == "Image analysis"

        # Following items should be images
        assert result[1]["type"] == "image"
        assert result[1]["url"] == image_paths[0]
        assert result[2]["type"] == "image"
        assert result[2]["url"] == image_paths[1]
    finally:
        # Clean up temp files
        for path in image_paths:
            _remove_file(path)


def test_process_user_input_empty_text():
    """Test processing user input with empty text but with files."""
    message = {
        "text": "",  # Empty text
        "files": [TEST_VIDEO_PATH],
    }

    result = process_user_input(message, 3)

    # First item should be empty text
    assert result[0]["type"] == "text"
    assert result[0]["text"] == ""

    # Rest should be video frames
    assert len(result) > 1


def test_process_user_input_handles_empty_files_list():
    """Test that an empty files list is handled correctly."""
    message = {
        "text": "No files",
        "files": [],
    }

    result = process_user_input(message, 3)

    assert len(result) == 1
    assert result[0]["type"] == "text"
    assert result[0]["text"] == "No files"


def test_process_user_input_max_images_effect():
    """Test that max_images parameter correctly limits the number of frames.

    NOTE: the final assertion requires the sample video to yield more frames
    with ``max_images=5`` than with ``max_images=2``.
    """
    message = {
        "text": "Video with few frames",
        "files": [TEST_VIDEO_PATH],
    }

    result_few = process_user_input(message, 2)
    result_many = process_user_input(message, 5)

    # Count actual frames (each frame has a text and image entry)
    frames_few = (len(result_few) - 1) // 2  # -1 for initial text message
    frames_many = (len(result_many) - 1) // 2

    # Should respect max_images parameter
    assert frames_few <= 2
    assert frames_many <= 5
    assert frames_few < frames_many


def test_process_history_basic_functionality():
    """Test basic conversation processing and content buffering."""
    # Empty history
    assert process_history([]) == []

    # Simple conversation
    history = [
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"},
        {"role": "user", "content": "How are you?"},
    ]

    result = process_history(history)

    assert len(result) == 3
    assert result[0] == {"role": "user", "content": [{"type": "text", "text": "Hello"}]}
    assert result[1] == {"role": "assistant", "content": [{"type": "text", "text": "Hi there!"}]}
    assert result[2] == {"role": "user", "content": [{"type": "text", "text": "How are you?"}]}


def test_process_history_file_handling():
    """Test processing of different file types and content buffering."""
    # Create temp image file
    image_path = _make_temp_file(".jpg")

    try:
        history = [
            {"role": "user", "content": (image_path,)},
            {"role": "user", "content": "What's this image?"},
            {"role": "user", "content": (TEST_VIDEO_PATH,)},
            {"role": "assistant", "content": "I see an image and video."},
            {"role": "user", "content": "First"},
            {"role": "user", "content": "Second"},
            {"role": "user", "content": "Third"},  # Multiple user messages at end
        ]

        result = process_history(history)

        assert len(result) == 3

        # First user turn: image + text + video
        first_turn = result[0]
        assert first_turn["role"] == "user"
        assert len(first_turn["content"]) == 3
        assert first_turn["content"][0] == {"type": "image", "url": image_path}
        assert first_turn["content"][1] == {"type": "text", "text": "What's this image?"}
        assert first_turn["content"][2] == {"type": "text", "text": "[Video uploaded previously]"}

        # Assistant response
        assert result[1]["role"] == "assistant"
        assert result[1]["content"] == [{"type": "text", "text": "I see an image and video."}]

        # Final user turn: multiple buffered messages
        last_turn = result[2]
        assert last_turn["role"] == "user"
        assert len(last_turn["content"]) == 3
        assert last_turn["content"][0] == {"type": "text", "text": "First"}
        assert last_turn["content"][1] == {"type": "text", "text": "Second"}
        assert last_turn["content"][2] == {"type": "text", "text": "Third"}
    finally:
        _remove_file(image_path)


def test_extract_pdf_text_nonexistent_file():
    """Test that extract_pdf_text raises ValueError for a non-existent file."""
    with pytest.raises(ValueError, match="File not found"):
        extract_pdf_text("nonexistent_file.pdf")


def test_extract_pdf_text_with_mock_pdf():
    """Test PDF text extraction with a simple PDF file."""
    # Create a temporary PDF with some text content
    pdf_path = _make_temp_file(".pdf")

    try:
        text_content = "This is a test PDF document.\nIt contains multiple lines of text.\nPage 1 content here."
        _write_pdf(pdf_path, [text_content])

        # Test the extract_pdf_text function
        result = extract_pdf_text(pdf_path)

        # Verify the extracted text contains our content
        assert isinstance(result, str)
        assert "This is a test PDF document" in result
        assert "Page 1:" in result  # Should include page number
        assert "multiple lines of text" in result
    finally:
        # Clean up the temporary PDF file
        _remove_file(pdf_path)


def test_extract_pdf_text_empty_pdf():
    """Test PDF text extraction with an empty PDF (no text content)."""
    pdf_path = _make_temp_file(".pdf")

    try:
        # A single page without any text
        _write_pdf(pdf_path, [None])

        # Test the extract_pdf_text function
        result = extract_pdf_text(pdf_path)

        # Should return message about no content
        assert result == "No text content found in the PDF."
    finally:
        # Clean up
        _remove_file(pdf_path)


def test_extract_pdf_text_multipage():
    """Test PDF text extraction with multiple pages."""
    pdf_path = _make_temp_file(".pdf")

    try:
        # Four pages; page 3 is left empty on purpose
        _write_pdf(
            pdf_path,
            [
                "Content from page one.",
                "Content from page two.",
                None,
                "Content from page four.",
            ],
        )

        # Test the extract_pdf_text function
        result = extract_pdf_text(pdf_path)

        # Verify all pages with content are included
        assert "Page 1:" in result
        assert "Content from page one" in result
        assert "Page 2:" in result
        assert "Content from page two" in result
        assert "Page 4:" in result
        assert "Content from page four" in result

        # Page 3 should be excluded (empty)
        assert "Page 3:" not in result

        # Pages should be separated by double newlines
        assert "\n\n" in result
    finally:
        # Clean up
        _remove_file(pdf_path)


def test_process_user_input_with_pdf():
    """Test processing user input with a PDF file."""
    pdf_path = _make_temp_file(".pdf")

    try:
        _write_pdf(pdf_path, ["Test PDF content for user input processing."])

        # Test processing user input with PDF
        message = {
            "text": "Analyze this PDF",
            "files": [pdf_path],
        }

        result = process_user_input(message, 3)

        # Should have 2 items (original text + PDF content)
        assert len(result) == 2

        # First item should be the message text
        assert result[0]["type"] == "text"
        assert result[0]["text"] == "Analyze this PDF"

        # Second item should be PDF content
        assert result[1]["type"] == "text"
        assert "PDF Content:" in result[1]["text"]
        assert "Test PDF content for user input processing" in result[1]["text"]
        assert "Page 1:" in result[1]["text"]
    finally:
        # Clean up
        _remove_file(pdf_path)


def test_process_user_input_pdf_error_handling():
    """Test that PDF processing errors are handled gracefully."""
    # Create a file that looks like a PDF but isn't valid
    invalid_pdf_path = _make_temp_file(
        ".pdf", b"This is not a valid PDF file content"
    )

    try:
        message = {
            "text": "Process invalid PDF",
            "files": [invalid_pdf_path],
        }

        result = process_user_input(message, 3)

        # Should have 2 items (original text + error message)
        assert len(result) == 2

        # First item should be the message text
        assert result[0]["type"] == "text"
        assert result[0]["text"] == "Process invalid PDF"

        # Second item should be error message
        assert result[1]["type"] == "text"
        assert "Error processing PDF:" in result[1]["text"]
    finally:
        # Clean up
        _remove_file(invalid_pdf_path)


def test_process_history_with_pdf():
    """Test that PDF files in history are handled correctly."""
    pdf_path = _make_temp_file(".pdf")

    try:
        _write_pdf(pdf_path, ["Historical PDF content."])

        # Test history with PDF file
        history = [
            {"role": "user", "content": (pdf_path,)},
            {"role": "user", "content": "What does this PDF contain?"},
            {"role": "assistant", "content": "The PDF contains some text."},
            {"role": "user", "content": "Thanks!"},
        ]

        result = process_history(history)

        # Should have 3 messages (user turn, assistant turn, final user turn)
        assert len(result) == 3

        # First user turn should have PDF placeholder and text
        assert result[0]["role"] == "user"
        assert len(result[0]["content"]) == 2
        assert result[0]["content"][0] == {"type": "text", "text": "[PDF uploaded previously]"}
        assert result[0]["content"][1] == {"type": "text", "text": "What does this PDF contain?"}

        # Assistant response
        assert result[1]["role"] == "assistant"
        assert result[1]["content"] == [{"type": "text", "text": "The PDF contains some text."}]

        # Final user message
        assert result[2]["role"] == "user"
        assert result[2]["content"] == [{"type": "text", "text": "Thanks!"}]
    finally:
        # Clean up
        _remove_file(pdf_path)