import subprocess
# Install flash-attn at import time, before the remaining imports below are executed.
# FLASH_ATTENTION_SKIP_CUDA_BUILD is presumably read by flash-attn's build to skip
# compiling the CUDA kernels locally -- TODO confirm against the flash-attn docs.
# NOTE(review): `env=` replaces the child's entire environment (PATH and friends are
# not inherited), and the return code is not checked, so a failed install goes unnoticed here.
subprocess.run('pip install flash-attn==2.7.0.post2 --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True)
import spaces
import argparse
import os
import re
from typing import List, Optional, Tuple
import gradio as gr
import PIL.Image
import torch
import numpy as np
from moviepy.editor import VideoFileClip
from transformers import AutoModelForCausalLM
# --- Global Model Variable ---
# `model` is not bound at import time: build_demo() assigns it via `global model`,
# and run_inference() reads that global.
# Absolute directory containing this script; build_demo() looks for the logo at
# CUR_DIR/resource/logo.svg.
CUR_DIR = os.path.split(os.path.abspath(__file__))[0]
# --- Helper Functions ---
def load_video_frames(video_path: Optional[str], n_frames: int = 8) -> Optional[List[PIL.Image.Image]]:
    """Sample up to ``n_frames`` evenly spaced frames from a video as PIL images.

    Args:
        video_path: Path of the video file; a falsy value (``None`` or ``""``)
            short-circuits to ``None``.
        n_frames: Upper bound on the number of frames to sample.

    Returns:
        The sampled frames in chronological order, or ``None`` when no path was
        given, the clip has no frames, or any error occurred while reading it.
    """
    if not video_path:
        return None
    try:
        with VideoFileClip(video_path) as clip:
            fps = clip.fps
            frame_count = int(fps * clip.duration)
            if frame_count <= 0:
                return None
            wanted = min(n_frames, frame_count)
            sampled = []
            # Evenly spaced frame indices from the first to the last frame.
            for frame_idx in np.linspace(0, frame_count - 1, wanted, dtype=int):
                # get_frame() is addressed by time, so convert index -> seconds.
                sampled.append(PIL.Image.fromarray(clip.get_frame(frame_idx / fps)))
            return sampled
    except Exception as e:
        # Best effort: report the problem and let the caller handle ``None``.
        print(f"Error processing video {video_path}: {e}")
        return None
# Matches one "<think> ... </think>" reasoning block; DOTALL lets it span newlines.
_THINK_BLOCK_RE = re.compile(r"<think>(.*?)</think>", re.DOTALL)


def parse_model_output(response_text: str, enable_thinking: bool) -> str:
    """Format the model output, separating 'thinking' and 'response' parts if enabled.

    Args:
        response_text: Decoded text produced by the model.
        enable_thinking: When true, look for a ``<think>...</think>`` block and
            render it separately from the final answer.

    Returns:
        ``response_text`` unchanged when thinking is disabled or no complete
        ``<think>...</think>`` block is present; otherwise a Markdown string with
        the first thinking block in a fenced code section followed by the
        response with every thinking block removed.
    """
    if not enable_thinking:
        return response_text

    think_match = _THINK_BLOCK_RE.search(response_text)
    if think_match is None:
        return response_text

    thinking_content = think_match.group(1).strip()
    # The pattern must include the tag delimiters: a bare ".*?" matches at every
    # position and would delete the entire response.
    response_content = _THINK_BLOCK_RE.sub("", response_text).strip()
    return f"**Thinking:**\n```\n{thinking_content}\n```\n\n**Response:**\n{response_content}"
# --- Core Inference Logic ---
@spaces.GPU
def run_inference(
    image_input: Optional[PIL.Image.Image],
    video_input: Optional[str],
    prompt: str,
    do_sample: bool,
    max_new_tokens: int,
    enable_thinking: bool,
) -> List[List[str]]:
    """Run a single turn of inference and format the output for a gr.Chatbot.

    Reads the module-level ``model`` that ``build_demo()`` assigns.

    Args:
        image_input: Optional image to attach to the user message.
        video_input: Optional path to a video; frames are sampled with
            ``load_video_frames``.
        prompt: User text; required.
        do_sample: Forwarded to ``model.generate``.
        max_new_tokens: Forwarded to ``model.generate``.
        enable_thinking: Forwarded to ``model.preprocess_inputs`` and used to
            decide how the decoded text is formatted.

    Returns:
        ``[]`` when the prompt is empty, otherwise ``[[prompt, reply]]`` where
        ``reply`` is the formatted model output or an ``Error ...`` message.
    """
    # A prompt is mandatory regardless of which media inputs are present.
    if not prompt:
        gr.Warning("A text prompt is required for generation.")
        return []

    content = []
    # Explicit None check: do not rely on the truthiness of an image object.
    if image_input is not None:
        content.append({"type": "image", "image": image_input})
    if video_input:
        frames = load_video_frames(video_input)
        if not frames:
            gr.Warning("Failed to process the video file.")
            return [[prompt, "Error: Could not process the video file."]]
        content.append({"type": "video", "video": frames})
    content.append({"type": "text", "text": prompt})
    messages = [{"role": "user", "content": content}]

    preprocess_kwargs = {
        "messages": messages,
        "add_generation_prompt": True,
        "enable_thinking": enable_thinking,
    }
    if video_input:
        # Video requests additionally pass max_pixels; presumably this caps the
        # per-frame resolution -- confirm against the model's preprocess_inputs.
        preprocess_kwargs["max_pixels"] = 896 * 896
    try:
        input_ids, pixel_values, grid_thws = model.preprocess_inputs(**preprocess_kwargs)
    except Exception as e:
        return [[prompt, f"Error during input preprocessing: {e}"]]

    # Move every tensor to the model's device; the visual tensors are optional.
    input_ids = input_ids.to(model.device)
    if pixel_values is not None:
        pixel_values = pixel_values.to(model.device, dtype=torch.bfloat16)
    if grid_thws is not None:
        grid_thws = grid_thws.to(model.device)

    gen_kwargs = {
        "max_new_tokens": max_new_tokens,
        "do_sample": do_sample,
        "eos_token_id": model.text_tokenizer.eos_token_id,
        "pad_token_id": model.text_tokenizer.pad_token_id,
    }
    with torch.inference_mode():
        try:
            outputs = model.generate(
                inputs=input_ids, pixel_values=pixel_values, grid_thws=grid_thws, **gen_kwargs
            )
        except Exception as e:
            return [[prompt, f"Error during model generation: {e}"]]

    response_text = model.text_tokenizer.decode(outputs[0], skip_special_tokens=True)
    formatted_response = parse_model_output(response_text, enable_thinking)
    return [[prompt, formatted_response]]
# --- UI Helper Functions ---
def toggle_media_input(choice: str) -> Tuple:
    """Return visibility updates for the Image/Video inputs and their examples.

    ``"Image"`` shows the first and third components and hides the second and
    fourth; any other choice does the opposite. The first two updates also reset
    the component value to ``None``. The order is presumably (image input, video
    input, image examples, video examples) -- confirm against the event wiring.
    """
    show_image = choice == "Image"
    show_video = not show_image
    return (
        gr.update(visible=show_image, value=None),
        gr.update(visible=show_video, value=None),
        gr.update(visible=show_image),
        gr.update(visible=show_video),
    )
# --- Build Gradio Application ---
# @spaces.GPU
def build_demo(model_path: str):
"""Builds the Gradio user interface for the model."""
global model
device = f"cuda"
print(f"Loading model {model_path} onto device {device}...")
model = AutoModelForCausalLM.from_pretrained(
model_path, torch_dtype=torch.bfloat16, trust_remote_code=True
).to(device).eval()
print("Model loaded successfully.")
model_name_display = model_path.split('/')[-1]
# --- Logo & Header ---
logo_html = ""
logo_svg_path = os.path.join(CUR_DIR, "resource", "logo.svg")
if os.path.exists(logo_svg_path):
with open(logo_svg_path, "r", encoding="utf-8") as svg_file:
svg_content = svg_file.read()
font_size = "2.5em"
svg_content_styled = re.sub(r'(