AC2513 committed on
Commit
76e1435
·
1 Parent(s): eab0adb

added tests for processing video

Browse files
Files changed (2) hide show
  1. src/app.py +28 -7
  2. tests/test_video.py +82 -7
src/app.py CHANGED
@@ -1,8 +1,13 @@
1
  import torch
2
  from huggingface_hub import login
3
  from collections.abc import Iterator
4
- from transformers import Gemma3ForConditionalGeneration, TextIteratorStreamer, Gemma3Processor
 
 
 
 
5
  import spaces
 
6
  from threading import Thread
7
  import gradio as gr
8
  import os
@@ -26,21 +31,21 @@ model = Gemma3ForConditionalGeneration.from_pretrained(
26
  attn_implementation="eager",
27
  )
28
 
 
29
  def get_frames(video_path: str, max_images: int) -> list[tuple[Image.Image, float]]:
30
  frames: list[tuple[Image.Image, float]] = []
31
  capture = cv2.VideoCapture(video_path)
32
  if not capture.isOpened():
33
  raise ValueError(f"Could not open video file: {video_path}")
34
-
35
  fps = capture.get(cv2.CAP_PROP_FPS)
36
  total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
37
 
38
  frame_interval = max(total_frames // max_images, 1)
 
 
39
 
40
- for i in range(0, min(total_frames, max_images * frame_interval), frame_interval):
41
- if len(frames) >= max_images:
42
- break
43
-
44
  capture.set(cv2.CAP_PROP_POS_FRAMES, i)
45
  success, image = capture.read()
46
  if success:
@@ -49,5 +54,21 @@ def get_frames(video_path: str, max_images: int) -> list[tuple[Image.Image, floa
49
  timestamp = round(i / fps, 2)
50
  frames.append((pil_image, timestamp))
51
 
 
 
52
  capture.release()
53
- return frames
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import torch
2
  from huggingface_hub import login
3
  from collections.abc import Iterator
4
+ from transformers import (
5
+ Gemma3ForConditionalGeneration,
6
+ TextIteratorStreamer,
7
+ Gemma3Processor,
8
+ )
9
  import spaces
10
+ import tempfile
11
  from threading import Thread
12
  import gradio as gr
13
  import os
 
31
  attn_implementation="eager",
32
  )
33
 
34
+
35
  def get_frames(video_path: str, max_images: int) -> list[tuple[Image.Image, float]]:
36
  frames: list[tuple[Image.Image, float]] = []
37
  capture = cv2.VideoCapture(video_path)
38
  if not capture.isOpened():
39
  raise ValueError(f"Could not open video file: {video_path}")
40
+
41
  fps = capture.get(cv2.CAP_PROP_FPS)
42
  total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
43
 
44
  frame_interval = max(total_frames // max_images, 1)
45
+ max_position = min(total_frames, max_images * frame_interval)
46
+ i = 0
47
 
48
+ while i < max_position and len(frames) < max_images:
 
 
 
49
  capture.set(cv2.CAP_PROP_POS_FRAMES, i)
50
  success, image = capture.read()
51
  if success:
 
54
  timestamp = round(i / fps, 2)
55
  frames.append((pil_image, timestamp))
56
 
57
+ i += frame_interval
58
+
59
  capture.release()
60
+ return frames
61
+
62
def process_video(video_path: str, max_images: int) -> list[dict]:
    """Sample frames from a video and package them as chat content entries.

    Each sampled frame is written to its own PNG temporary file and
    contributes two consecutive entries to the result: a text entry
    ``{"type": "text", "text": "Frame <timestamp>:"}`` followed by an image
    entry ``{"type": "image", "url": <path to the PNG>}``.

    The temporary files are created with ``delete=False`` because their
    paths are handed back to the caller; the caller is responsible for
    removing them once they are no longer needed.

    Args:
        video_path: Path of the video file, forwarded to ``get_frames``.
        max_images: Upper bound on the number of frames, forwarded to
            ``get_frames``.

    Returns:
        A flat list with two dicts (text, then image) per sampled frame.
    """
    result_content: list[dict] = []
    # TODO: Change max_images to slider
    frames = get_frames(video_path, max_images)

    for image, timestamp in frames:
        # Reserve a unique path, then close the handle *before* saving:
        # re-opening a NamedTemporaryFile by name while it is still open is
        # platform-dependent and fails on Windows.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file:
            image_path = temp_file.name
        image.save(image_path)

        result_content.append({"type": "text", "text": f"Frame {timestamp}:"})
        result_content.append({"type": "image", "url": image_path})

    # Lazy %-formatting so the message is only built when DEBUG is enabled.
    logger.debug(
        "Processed %s frames from video %s with frames %s",
        len(frames),
        video_path,
        result_content,
    )
    return result_content
tests/test_video.py CHANGED
@@ -3,25 +3,100 @@ import os
3
  import cv2
4
  from PIL import Image
5
  from pathlib import Path
 
6
 
7
- from src.app import get_frames
8
 
9
  # Get the project root directory
10
  ROOT_DIR = Path(__file__).parent.parent
11
 
 
12
  def test_correct_frame_return():
13
  """Test that get_frames returns a list of (Image, float) tuples."""
14
  # Path to a test video file
15
  video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
16
-
17
  # Ensure the test video exists
18
- assert os.path.exists(video_path), f"Test video not found at {video_path}"
19
-
20
  # Test with a small number of frames
21
  max_images = 3
22
  frames = get_frames(video_path, max_images)
23
-
24
- # Check return type
25
  assert isinstance(frames, list)
26
  assert all(isinstance(item, tuple) and len(item) == 2 for item in frames)
27
- assert all(isinstance(img, Image.Image) and isinstance(ts, float) for img, ts in frames)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
  import cv2
4
  from PIL import Image
5
  from pathlib import Path
6
+ import tempfile
7
 
8
+ from src.app import get_frames, process_video
9
 
10
  # Get the project root directory
11
  ROOT_DIR = Path(__file__).parent.parent
12
 
13
+
14
  def test_correct_frame_return():
15
  """Test that get_frames returns a list of (Image, float) tuples."""
16
  # Path to a test video file
17
  video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
18
+
19
  # Ensure the test video exists
20
+ assert os.path.exists(video_path)
21
+
22
  # Test with a small number of frames
23
  max_images = 3
24
  frames = get_frames(video_path, max_images)
25
+
 
26
  assert isinstance(frames, list)
27
  assert all(isinstance(item, tuple) and len(item) == 2 for item in frames)
28
+ assert all(
29
+ isinstance(img, Image.Image) and isinstance(ts, float) for img, ts in frames
30
+ )
31
+
32
+
33
def test_process_video_structure():
    """Test that process_video returns alternating text and image entries.

    For every requested frame the result must hold a text entry whose text
    starts with "Frame " followed by an image entry whose "url" points at an
    existing, valid image file. The temporary image files produced by
    process_video are removed once the checks are done so the test leaves
    nothing behind.
    """
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    max_images = 2

    result = process_video(video_path, max_images)

    try:
        # Should have 2 items (text + image) per frame
        assert len(result) == max_images * 2

        # Check structure of items
        for i in range(0, len(result), 2):
            text_item = result[i]
            image_item = result[i + 1]

            # Text item
            assert text_item["type"] == "text"
            assert text_item["text"].startswith("Frame ")

            # Image item
            assert image_item["type"] == "image"
            assert "url" in image_item
            assert os.path.exists(image_item["url"])

            # Verify the image file is valid; the context manager closes the
            # underlying file handle, which the bare Image.open() call leaked.
            try:
                with Image.open(image_item["url"]) as img:
                    img.verify()
            except Exception as e:
                pytest.fail(f"Invalid image file: {e}")
    finally:
        # Remove whatever temp files process_video created, even on failure.
        for item in result:
            path = item.get("url")
            if path and os.path.exists(path):
                os.remove(path)
61
+
62
+
63
def test_process_video_timestamps():
    """Check that the frame labels carry parseable, non-decreasing timestamps."""
    sample_video = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    frame_limit = 3

    content = process_video(sample_video, frame_limit)

    # Every even position is expected to be a "Frame X.XX:" label; pull the
    # numeric part out of each one that is a text entry.
    parsed = [
        float(entry["text"].split()[1].rstrip(":"))
        for entry in content[::2]
        if entry["type"] == "text"
    ]

    # One timestamp per requested frame, in ascending order.
    assert len(parsed) == frame_limit
    assert all(earlier <= later for earlier, later in zip(parsed, parsed[1:]))
82
+
83
+
84
def test_process_video_temp_files():
    """Test that process_video writes a PNG temp file that can be cleaned up.

    process_video hands back the path of the temporary image it wrote; this
    test checks the file exists with a ".png" suffix, then removes it and
    confirms it is gone, so the cleanup named in the test's purpose is
    actually exercised and no file is left behind.
    """
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    max_images = 1

    result = process_video(video_path, max_images)

    # result[0] is the text label, result[1] the image entry for the frame.
    image_path = result[1]["url"]
    try:
        # Verify temp file exists
        assert os.path.exists(image_path)
        assert image_path.endswith(".png")
    finally:
        # Clean up even if an assertion above failed.
        if os.path.exists(image_path):
            os.remove(image_path)

    assert not os.path.exists(image_path)
96
+
97
+
98
def test_process_video_invalid_path():
    """Check that process_video raises ValueError for a path that cannot be opened."""
    missing_video = "nonexistent_video.mp4"
    frame_limit = 3

    with pytest.raises(ValueError):
        process_video(missing_video, frame_limit)