from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch
import numpy as np
import av
import spaces
import gradio as gr
import os

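# Quantize the weights to 4-bit so the 7B checkpoint fits in a few GB of GPU
# memory; compute runs in fp16 (exact footprint varies with the bitsandbytes version).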
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf'

processor = LlavaNextVideoProcessor.from_pretrained(model_name)
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map='auto'
)

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.
    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.
    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])
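
# Note: `i in indices` scans the index array once per decoded frame; that is
# negligible for 8 sampled frames, but a set lookup would scale better for
# dense sampling.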

@spaces.GPU  # request a GPU slot for this call when running on Hugging Face ZeroGPU Spaces
def process_video(video_file, question):
    # Gradio's File component may return a tempfile-like object or a plain path string
    video_path = video_file.name if hasattr(video_file, "name") else str(video_file)

    # Open the video and sample 8 evenly spaced frames
    with av.open(video_path) as container:
        total_frames = container.streams.video[0].frames
        indices = np.arange(0, total_frames, total_frames / 8).astype(int)
        video_clip = read_video_pyav(container, indices)

    # Prepare conversation
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"{question}"},
                {"type": "video"},
            ],
        },
    ]
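    # apply_chat_template expands {"type": "video"} into the model's special
    # video token so the processor can align the sampled frames with the text.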
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # Prepare inputs for the model (avoid shadowing the built-in `input`)
    inputs = processor(text=[prompt], videos=[video_clip], padding=True, return_tensors="pt").to(model.device)

    # Generate output; greedy decoding, so sampling knobs like top_p would be ignored
    generate_kwargs = {"max_new_tokens": 500, "do_sample": False}
    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]
    
    return generated_text.split("ASSISTANT: ", 1)[-1].strip()

def process_videos(video_files, question):
    """Processes multiple videos and answers a single question for each."""
    answers = []
    for video_file in video_files:
        video_path = video_file.name if hasattr(video_file, "name") else str(video_file)
        video_name = os.path.basename(video_path)
        answer = process_video(video_file, question)
        answers.append(f"**Video: {video_name}**\n{answer}\n")
    return "\n---\n".join(answers)
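
# Quick smoke test without the UI (hypothetical local paths):
#   print(process_videos(["clip1.mp4", "clip2.mp4"], "Is the subject in the video standing or sitting?"))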

# Define Gradio interface for multiple videos
def gradio_interface(videos, indoors_outdoors, standing_sitting, hands_free, interacting_screen):
    # Build the combined question from the selected checkboxes
    # (each clause needs a leading space so it doesn't fuse with "video")
    question = "Is the subject in the video"
    if indoors_outdoors:
        question += " present indoors or outdoors?"
    if standing_sitting:
        question += " standing or sitting?"
    if hands_free:
        question += " hands free or not?"
    if interacting_screen:
        question += " interacting with any screen in the background?"

    answers = process_videos(videos, question)
    return answers

iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.File(label="Upload Videos", file_count="multiple"),
        gr.Checkbox(label="Indoors or Outdoors", value=False),
        gr.Checkbox(label="Standing or Sitting", value=False),
        gr.Checkbox(label="Hands Free or Not", value=False),
        gr.Checkbox(label="Interacting with Screen", value=False),
    ],
    outputs=gr.Markdown(label="Generated Answers"),  # renders the **Video: ...** headers from process_videos
    title="Video Question Answering",
    description="Upload multiple videos and select questions to get answers."
)
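
# Note: on Hugging Face Spaces the app is served automatically; for local
# testing, iface.launch(share=True) would also expose a temporary public URL.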

if __name__ == "__main__":
    iface.launch(debug=True)