Spaces:
Running
on
Zero
Running
on
Zero
import subprocess | |
# Install flash-attn at startup; the model below is loaded with
# attn_implementation="flash_attention_2".
# The command is fixed, so it is passed as an argument list instead of going
# through a shell. check=False keeps the original best-effort behaviour: a
# failed install does not abort start-up here.
subprocess.run(
    ["pip", "install", "flash-attn", "--no-build-isolation"],
    check=False,
)
import base64 | |
from collections import Counter | |
from io import BytesIO | |
import re | |
from PIL import Image, ImageDraw | |
import gradio as gr | |
import spaces | |
import torch | |
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor | |
from qwen_vl_utils import process_vision_info, smart_resize | |
# Repository that provides the model weights, and the device they are placed on.
repo_id = "hal-utokyo/MangaLMM"
device = "cuda"

# NOTE(review): the processor is loaded from a different repository than the
# weights (repo_id) - confirm this is intentional.
processor = AutoProcessor.from_pretrained("alfredplpl/MangaLMM")

# Preload the model once at import time so requests reuse the same instance.
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    repo_id,
    torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
    device_map=device,
)
def pil2base64(image: Image.Image) -> str:
    """Encode *image* as PNG and return the PNG bytes as a base64 string."""
    with BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()
    return base64.b64encode(png_bytes).decode()
def bbox2d_to_quad(bbox_2d):
    """Expand an ``(xmin, ymin, xmax, ymax)`` box into a flat 8-value quad.

    The corners are emitted in the order (xmin, ymin), (xmax, ymin),
    (xmax, ymax), (xmin, ymax). Unpacking raises ``ValueError`` when
    *bbox_2d* does not hold exactly four values.
    """
    left, top, right, bottom = bbox_2d
    corners = ((left, top), (right, top), (right, bottom), (left, bottom))
    return [value for corner in corners for value in corner]
def normalize_repeated_symbols(text):
    """Collapse runs of tilde/dash-like symbols and map them to ASCII.

    A run made of any mix of ``~``, fullwidth tilde (U+FF5E), ``〜``, ``-``
    and ``ー`` is reduced to its first character. Afterwards every
    tilde-like character becomes ``~`` and every dash-like character
    becomes ``-``.

    The fullwidth tilde is written as an explicit ``\\uFF5E`` escape: the
    character classes previously listed ``~`` twice, so the fullwidth
    variant was never normalised.
    """
    # Keep only the first symbol of each run, e.g. "〜〜〜" -> "〜".
    text = re.sub(r"[~\uFF5E〜\-ー]+", lambda m: m.group(0)[0], text)
    # Unify the tilde variants.
    text = re.sub(r"[~\uFF5E〜]", "~", text)
    # Unify the dash variants (a leading "-" in a class is a literal hyphen).
    text = re.sub(r"[-ー]", "-", text)
    return text
def normalize_punctuation(text):
    """Normalise punctuation variants to one canonical form.

    * Fullwidth ``！`` (U+FF01) and ``？`` (U+FF1F) become ASCII ``!`` / ``?``.
    * ``…`` becomes ``...``.
    * ``・``, the halfwidth middle dot (U+FF65) and ``.`` all become ``・``;
      because this runs last, ``…`` ends up as ``・・・``.

    The fullwidth/halfwidth characters are written as explicit escapes.
    Previously the map keys were the ASCII characters themselves (an
    identity mapping) and the middle-dot class listed ``・`` twice, so the
    width variants were never converted.
    """
    conversion_map = {
        "\uFF01": "!",
        "\uFF1F": "?",
        "…": "...",
    }
    pattern = "|".join(map(re.escape, conversion_map))
    text = re.sub(pattern, lambda m: conversion_map[m.group()], text)
    # Inside a character class "." is a literal full stop.
    text = re.sub(r"[・\uFF65.]", "・", text)
    return text
def restore_chouon(text):
    """Turn an ASCII hyphen between two Japanese characters back into ``ー``.

    Only a ``-`` whose immediate neighbours on both sides are in the ranges
    below is replaced; hyphens next to other characters, or at either end
    of the string, are left alone.
    """
    unicode_ranges = (
        r"\u3040-\u309F",  # Hiragana
        r"\u30A0-\u30FF",  # Katakana
        r"\u3400-\u4DBF",  # Kanji, Extension A
        r"\u4E00-\u9FFF",  # Kanji
    )
    jp_char = "[" + "".join(unicode_ranges) + "]"
    hyphen_between_jp = re.compile("(?<=" + jp_char + ")-(?=" + jp_char + ")")
    return hyphen_between_jp.sub("ー", text)
def process_text(text: str) -> str:
    """Normalise one recognised text string.

    All whitespace is removed first; the result is then passed through
    ``normalize_repeated_symbols``, ``normalize_punctuation`` and
    ``restore_chouon``, in that order.
    """
    cleaned = re.sub(r"[\s\u3000]+", "", text)
    pipeline = (normalize_repeated_symbols, normalize_punctuation, restore_chouon)
    for step in pipeline:
        cleaned = step(cleaned)
    return cleaned
def parse_ocr_text(text: str) -> list[list]:
    """Extract ``[text, quad]`` pairs from the model's raw OCR output.

    Args:
        text: Raw generated text. OCR entries are looked for as ``{...}``
            objects that start on a new, tab-indented line and contain a
            ``"bbox_2d"`` list and a ``"text_content"`` string.

    Returns:
        A list of ``[content, quad]`` items, where ``content`` is the text
        after ``process_text`` and ``quad`` is the 8-value list produced by
        ``bbox2d_to_quad``. Entries with a missing field or a malformed
        bounding box are skipped. Returns ``[]`` for blank input or when no
        entry is found.
    """
    if not text.strip():
        return []

    # Unescape quotes so the field regexes below can match them.
    text = text.replace('\\"', '"')

    ocrs = []
    # Each candidate entry is a "\n\t{ ... }" span (shortest match up to "}").
    for block in re.findall(r"\n\t\{.*?\}", text, re.DOTALL):
        bbox_match = re.search(r'"bbox_2d"\s*:\s*\[([^\]]+)\]', block, flags=re.DOTALL)
        text_match = re.search(r'"text_content"\s*:\s*"([^"]*)"', block, flags=re.DOTALL)
        if not (bbox_match and text_match):
            continue
        try:
            # int() rejects non-integer coordinates and the unpacking in
            # bbox2d_to_quad rejects boxes without exactly four values; both
            # raise ValueError. Catching only that keeps the best-effort
            # skipping without hiding unrelated errors or interrupts.
            bbox_list = [int(x.strip()) for x in bbox_match.group(1).split(",")]
            quad = bbox2d_to_quad(bbox_list)
        except ValueError:
            continue
        ocrs.append([process_text(text_match.group(1)), quad])

    # The model sometimes repeats the same text many times. Any text that
    # occurs 10 or more times is dropped entirely (every occurrence).
    counter = Counter(content for content, _ in ocrs)
    return [ocr for ocr in ocrs if counter[ocr[0]] < 10]
def inference_fn(
    image: Image.Image | None,
    text: str | None,
    # progress=gr.Progress(track_tqdm=True),
) -> tuple[str, str, Image.Image | None]:
    """Run the model on one image and return the values for the three outputs.

    Args:
        image: Uploaded image, or ``None`` when nothing was uploaded.
        text: Question about the image. ``None`` or a blank string selects
            the built-in OCR prompt instead.

    Returns:
        ``(vqa_text, ocr_text, result_image)``:

        * no image: the same warning message twice and ``None``;
        * no OCR entries parsed from the output: the raw model output, a
          notice that OCR was not performed, and the unannotated image;
        * otherwise: a notice that no question was entered, one line per
          OCR entry (``text (xmin, ymin, xmax, ymax)``), and the image with
          each box outlined in red.
    """
    # NOTE(review): `spaces` is imported by this file but no `@spaces.GPU`
    # decorator is visible on this function - confirm that is intended.
    if image is None:
        no_image_msg = "Please upload an image!"
        gr.Warning(no_image_msg, duration=10)
        return no_image_msg, no_image_msg, None

    # Shrink oversized inputs using the processor's own resize limits.
    if image.width * image.height > 2116800:
        gr.Warning("The image size is too large! We resize it to smaller size.", duration=10)
        image_processor = processor.image_processor
        new_height, new_width = smart_resize(
            height=image.height,
            width=image.width,
            factor=image_processor.patch_size * image_processor.merge_size,
            min_pixels=image_processor.min_pixels,
            max_pixels=image_processor.max_pixels,
        )
        image = image.resize((new_width, new_height), resample=Image.Resampling.BICUBIC)

    # A missing or blank question falls back to the OCR prompt.
    question = text
    if not question or not question.strip():
        question = "Please perform OCR on this image and output the recognized Japanese text along with its position (grounding)."

    # The image is passed to the chat template as a base64 data URI.
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": f"data:image;base64,{pil2base64(image)}"},
                {"type": "text", "text": question},
            ],
        },
    ]
    prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs = process_vision_info(messages)
    model_inputs = processor(
        text=[prompt],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    ).to(model.device)

    output_ids = model.generate(**model_inputs, max_new_tokens=4096)
    # Drop the prompt tokens so only the newly generated part is decoded.
    new_token_ids = [
        full_ids[len(prompt_ids):]
        for prompt_ids, full_ids in zip(model_inputs.input_ids, output_ids)
    ]
    decoded = processor.batch_decode(
        new_token_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    raw_output = decoded[0]

    # Boxes are drawn on a copy of the image returned by process_vision_info.
    result_image = image_inputs[0].copy()
    ocrs = parse_ocr_text(raw_output)
    if not ocrs:
        return raw_output, "OCR feature was not performed.", result_image

    draw = ImageDraw.Draw(result_image)
    ocr_lines = []
    for content, quad in ocrs:
        # quad[0:2] and quad[4:6] are the (xmin, ymin) and (xmax, ymax) corners.
        ocr_lines.append(f'{content} ({quad[0]}, {quad[1]}, {quad[4]}, {quad[5]})')
        corners = [quad[k:k + 2] for k in range(0, 8, 2)]
        # Connect every corner to the next one, wrapping back to the first.
        for start_point, end_point in zip(corners, corners[1:] + corners[:1]):
            draw.line(start_point + end_point, fill="red", width=4)
        draw.polygon(quad, outline="red", width=4)
        # draw.text((quad[0], quad[1]), content, fill="red")

    return "No question was entered.", "\n".join(ocr_lines), result_image
# Build the demo page: a description, an input column (submit button,
# question box, image upload) and an output column (VQA text, OCR text,
# annotated image).
with gr.Blocks() as demo:
    gr.Markdown("""# MangaLMM Unofficial Demo

We propose MangaVQA and MangaLMM, which are a benchmark and a specialized LMM for multimodal manga understanding.
This demo uses our [MangaLMM model](https://huggingface.co/hal-utokyo/MangaLMM) to perform OCR on an image of manga panels and answer a question about the image.
Please ensure that the image contains fewer than 2116800 pixels. (e.g. 1600x1200, 1920x1080, etc.) If more, we resize it to smaller size.
*Note: This model is for research purposes only and may return incorrect results. Please use it at your own risk.*
""")
    with gr.Row():
        with gr.Column():
            input_button = gr.Button(value="Submit")
            input_text = gr.Textbox(
                label="Input Text", lines=5, max_lines=5,
                placeholder="Please enter a question about your image.\nEmpty text will perform OCR.",
            )
            input_image = gr.Image(label="Input Image", image_mode="RGB", type="pil")
        with gr.Column():
            vqa_text = gr.Textbox(label="VQA Result", lines=2, max_lines=2)
            ocr_text = gr.Textbox(label="OCR Result", lines=3, max_lines=3)
            ocr_image = gr.Image(label="OCR Result", type="pil", show_label=False)
    # Clicking Submit calls inference_fn with the image and the question and
    # writes its three return values to the three output components.
    input_button.click(
        fn=inference_fn,
        inputs=[input_image, input_text],
        outputs=[vqa_text, ocr_text, ocr_image],
    )
    # The examples list is empty here; the component is wired to the same
    # function and inputs/outputs as the Submit button.
    ocr_examples = gr.Examples(
        examples=[],
        fn=inference_fn,
        inputs=[input_image, input_text],
        outputs=[vqa_text, ocr_text, ocr_image],
        cache_examples=False,
    )

# Enable the request queue and start the app when this file is executed.
demo.queue().launch()