Spaces:
Running
on
Zero
Running
on
Zero
import subprocess | |
subprocess.run('pip install flash-attn==2.7.0.post2 --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True) | |
import spaces | |
import argparse | |
import os | |
import re | |
from typing import List, Optional, Tuple | |
import gradio as gr | |
import PIL.Image | |
import torch | |
import numpy as np | |
from moviepy.editor import VideoFileClip | |
from transformers import AutoModelForCausalLM | |
# --- Global Model Variable --- | |
# model = None | |
# This should point to the directory containing your SVG file. | |
CUR_DIR = os.path.dirname(os.path.abspath(__file__)) | |
# --- Helper Functions --- | |
def load_video_frames(video_path: Optional[str], n_frames: int = 8) -> Optional[List[PIL.Image.Image]]: | |
"""Extracts a specified number of frames from a video file.""" | |
if not video_path: | |
return None | |
try: | |
with VideoFileClip(video_path) as clip: | |
total_frames = int(clip.fps * clip.duration) | |
if total_frames <= 0: return None | |
num_to_extract = min(n_frames, total_frames) | |
indices = np.linspace(0, total_frames - 1, num_to_extract, dtype=int) | |
frames = [PIL.Image.fromarray(clip.get_frame(index / clip.fps)) for index in indices] | |
return frames | |
except Exception as e: | |
print(f"Error processing video {video_path}: {e}") | |
return None | |
def parse_model_output(response_text: str, enable_thinking: bool) -> str: | |
"""Formats the model output, separating 'thinking' and 'response' parts if enabled.""" | |
if enable_thinking: | |
think_match = re.search(r"<think>(.*?)</think>", response_text, re.DOTALL) | |
if think_match: | |
thinking_content = think_match.group(1).strip() | |
response_content = re.sub(r"<think>.*?</think>", "", response_text, flags=re.DOTALL).strip() | |
return f"**Thinking:**\n```\n{thinking_content}\n```\n\n**Response:**\n{response_content}" | |
else: | |
return response_text | |
else: | |
return response_text | |
# --- Core Inference Logic --- | |
def run_inference( | |
image_input: Optional[PIL.Image.Image], | |
video_input: Optional[str], | |
prompt: str, | |
do_sample: bool, | |
max_new_tokens: int, | |
enable_thinking: bool, | |
) -> List[List[str]]: | |
"""Runs a single turn of inference and formats the output for a gr.Chatbot.""" | |
if (not image_input and not video_input and not prompt) or not prompt: | |
gr.Warning("A text prompt is required for generation.") | |
return [] | |
content = [] | |
if image_input: | |
content.append({"type": "image", "image": image_input}) | |
if video_input: | |
frames = load_video_frames(video_input) | |
if frames: content.append({"type": "video", "video": frames}) | |
else: | |
gr.Warning("Failed to process the video file.") | |
return [[prompt, "Error: Could not process the video file."]] | |
content.append({"type": "text", "text": prompt}) | |
messages = [{"role": "user", "content": content}] | |
try: | |
if video_input: | |
input_ids, pixel_values, grid_thws = model.preprocess_inputs(messages=messages, add_generation_prompt=True, enable_thinking=enable_thinking, max_pixels=896*896) | |
else: | |
input_ids, pixel_values, grid_thws = model.preprocess_inputs(messages=messages, add_generation_prompt=True, enable_thinking=enable_thinking) | |
except Exception as e: | |
return [[prompt, f"Error during input preprocessing: {e}"]] | |
input_ids = input_ids.to(model.device) | |
if pixel_values is not None: | |
pixel_values = pixel_values.to(model.device, dtype=torch.bfloat16) | |
if grid_thws is not None: | |
grid_thws = grid_thws.to(model.device) | |
gen_kwargs = { | |
"max_new_tokens": max_new_tokens, "do_sample": do_sample, | |
"eos_token_id": model.text_tokenizer.eos_token_id, "pad_token_id": model.text_tokenizer.pad_token_id | |
} | |
with torch.inference_mode(): | |
try: | |
outputs = model.generate(inputs=input_ids, pixel_values=pixel_values, grid_thws=grid_thws, **gen_kwargs) | |
except Exception as e: | |
return [[prompt, f"Error during model generation: {e}"]] | |
response_text = model.text_tokenizer.decode(outputs[0], skip_special_tokens=True) | |
formatted_response = parse_model_output(response_text, enable_thinking) | |
return [[prompt, formatted_response]] | |
# --- UI Helper Functions --- | |
def toggle_media_input(choice: str) -> Tuple: | |
"""Switches visibility between Image/Video inputs and their corresponding examples.""" | |
if choice == "Image": | |
return gr.update(visible=True, value=None), gr.update(visible=False, value=None), gr.update(visible=True), gr.update(visible=False) | |
else: # Video | |
return gr.update(visible=False, value=None), gr.update(visible=True, value=None), gr.update(visible=False), gr.update(visible=True) | |
# --- Build Gradio Application --- | |
# @spaces.GPU | |
def build_demo(model_path: str): | |
"""Builds the Gradio user interface for the model.""" | |
global model | |
device = f"cuda" | |
print(f"Loading model {model_path} onto device {device}...") | |
model = AutoModelForCausalLM.from_pretrained( | |
model_path, torch_dtype=torch.bfloat16, trust_remote_code=True | |
).to(device).eval() | |
print("Model loaded successfully.") | |
model_name_display = model_path.split('/')[-1] | |
# --- Logo & Header --- | |
logo_html = "" | |
logo_svg_path = os.path.join(CUR_DIR, "resource", "logo.svg") | |
if os.path.exists(logo_svg_path): | |
with open(logo_svg_path, "r", encoding="utf-8") as svg_file: | |
svg_content = svg_file.read() | |
font_size = "2.5em" | |
svg_content_styled = re.sub(r'(<svg[^>]*)(>)', rf'\1 height="{font_size}" style="vertical-align: middle; display: inline-block;"\2', svg_content) | |
logo_html = f'<span style="display: inline-block; vertical-align: middle;">{svg_content_styled}</span>' | |
else: | |
# Fallback if SVG is not found | |
logo_html = '<span style="font-weight: bold; font-size: 2.5em; display: inline-block; vertical-align: middle;">Ovis</span>' | |
print(f"Warning: Logo file not found at {logo_svg_path}. Using text fallback.") | |
html_header = f""" | |
<p align="center" style="font-size: 2.5em; line-height: 1;"> | |
{logo_html} | |
<span style="display: inline-block; vertical-align: middle;">{model_name_display}</span> | |
</p> | |
<center><font size=3><b>Ovis</b> has been open-sourced on <a href='https://huggingface.co/{model_path}'>😊 Huggingface</a> and <a href='https://github.com/AIDC-AI/Ovis'>🌟 GitHub</a>. If you find Ovis useful, a like❤️ or a star🌟 would be appreciated.</font></center> | |
""" | |
with gr.Blocks(theme=gr.themes.Ocean()) as demo: | |
gr.HTML(html_header) | |
gr.Markdown(f"This interface is served by a single model. Each submission starts a new, independent conversation.") | |
with gr.Row(): | |
# --- Left Column (Media Inputs, Settings, Prompt & Actions) --- | |
with gr.Column(scale=4): | |
input_type_radio = gr.Radio(choices=["Image"], value="Image", label="Select Input Type") | |
image_input = gr.Image(label="Image Input", type="pil", visible=True) | |
video_input = gr.Video(label="Video Input", visible=False) | |
with gr.Accordion("Generation Settings", open=True): | |
do_sample = gr.Checkbox(label="Enable Sampling (Do Sample)", value=False) | |
max_new_tokens = gr.Slider(minimum=32, maximum=4096, value=1024, step=32, label="Max New Tokens") | |
enable_thinking = gr.Checkbox(label="Enable Deep Thinking", value=True) | |
prompt_input = gr.Textbox(label="Prompt", placeholder="Enter your text here and press ENTER", lines=3) | |
with gr.Row(): | |
generate_btn = gr.Button("Send", variant="primary") | |
clear_btn = gr.Button("Clear", variant="secondary") | |
with gr.Column(visible=True) as image_examples_col: | |
gr.Examples( | |
examples=[ | |
[os.path.join(CUR_DIR, "examples", "ovis2_math0.jpg"), "Each face of the polyhedron shown is either a triangle or a square. Each square borders 4 triangles, and each triangle borders 3 squares. The polyhedron has 6 squares. How many triangles does it have?\n\nProvide a step-by-step solution to the problem, and conclude with 'the answer is' followed by the final solution."], | |
[os.path.join(CUR_DIR, "examples", "ovis2_math1.jpg"), "A large square touches another two squares, as shown in the picture. The numbers inside the smaller squares indicate their areas. What is the area of the largest square?\n\nProvide a step-by-step solution to the problem, and conclude with 'the answer is' followed by the final solution."], | |
[os.path.join(CUR_DIR, "examples", "ovis2_figure0.png"), "Explain this model."], | |
[os.path.join(CUR_DIR, "examples", "ovis2_figure1.png"), "Organize the notes about GRPO in the figure."], | |
[os.path.join(CUR_DIR, "examples", "ovis2_multi0.jpg"), "Posso avere un frappuccino e un caffè americano di taglia M? Quanto costa in totale?"], | |
], | |
inputs=[image_input, prompt_input] | |
) | |
# with gr.Column(visible=False) as video_examples_col: | |
# gr.Examples(examples=[[os.path.join(CUR_DIR, "examples", "video_demo_1.mp4"), "Describe the video."]], | |
# inputs=[video_input, prompt_input]) | |
# --- Right Column (Chat Display) --- | |
with gr.Column(scale=6): | |
chatbot = gr.Chatbot(label="Ovis", height=750, show_copy_button=True, layout="panel") | |
# --- Event Handlers --- | |
input_type_radio.change( | |
fn=toggle_media_input, | |
inputs=input_type_radio, | |
outputs=[image_input, video_input, image_examples_col] | |
) | |
run_inputs = [image_input, video_input, prompt_input, do_sample, max_new_tokens, enable_thinking] | |
generate_btn.click(fn=run_inference, inputs=run_inputs, outputs=chatbot) | |
prompt_input.submit(fn=run_inference, inputs=run_inputs, outputs=chatbot) | |
clear_btn.click( | |
fn=lambda: ([], None, None, "", "Image", False, 1024, True), | |
outputs=[chatbot, image_input, video_input, prompt_input, input_type_radio, do_sample, max_new_tokens, enable_thinking] | |
).then( | |
fn=toggle_media_input, | |
inputs=input_type_radio, | |
outputs=[image_input, video_input, image_examples_col] | |
) | |
return demo | |
# --- Main Execution Block --- | |
# def parse_args(): | |
# parser = argparse.ArgumentParser(description="Gradio interface for a single Multimodal Large Language Model.") | |
# parser.add_argument("--model-path", type=str, default='AIDC-AI/Ovis2.5-9B', help="Path to the model checkpoint on Hugging Face Hub or local directory.") | |
# parser.add_argument("--gpu", type=int, default=0, help="GPU index to run the model on.") | |
# parser.add_argument("--port", type=int, default=7860, help="Port to run the Gradio server on.") | |
# parser.add_argument("--server-name", type=str, default="0.0.0.0", help="Server name for the Gradio app.") | |
# return parser.parse_args() | |
# if __name__ == "__main__": | |
# if not os.path.exists("examples"): os.makedirs("examples") | |
# if not os.path.exists("resource"): os.makedirs("resource") | |
# print("Note: For the logo to display correctly, place 'logo.svg' inside the 'resource' directory.") | |
# example_files = [ | |
# "ovis2_math0.jpg", | |
# "ovis2_math1.jpg", | |
# "ovis2_figure0.png", | |
# "ovis2_figure1.png", | |
# "ovis2_multi0.jpg", | |
# "video_demo_1.mp4", | |
# ] | |
# for fname in example_files: | |
# fpath = os.path.join("examples", fname) | |
# if not os.path.exists(fpath): | |
# if fname.endswith(".mp4"): | |
# os.system(f'ffmpeg -y -f lavfi -i "smptebars=size=128x72:rate=10" -t 3 -pix_fmt yuv420p "{fpath}" >/dev/null 2>&1') | |
# else: | |
# PIL.Image.new('RGB', (224, 224), color = 'grey').save(fpath) | |
model_path = 'AIDC-AI/Ovis2.5-9B' | |
demo = build_demo(model_path=model_path) | |
# print(f"Launching Gradio app on http://{args.server_name}:{args.port}") | |
# demo.queue().launch(server_name=args.server_name, server_port=args.port, share=False, ssl_verify=False) | |
demo.launch() |