#!/usr/bin/env python import os import re import tempfile from collections.abc import Iterator from threading import Thread import cv2 import gradio as gr import spaces import torch from loguru import logger from PIL import Image from transformers import AutoProcessor, TextIteratorStreamer # ───────────────────────────────────────────────────────────────────── # Model & processor # ───────────────────────────────────────────────────────────────────── MODEL_ID = os.getenv("MODEL_ID", "rmdhirr/Kenanga-11B-IT") processor = AutoProcessor.from_pretrained(MODEL_ID, padding_side="left") # Try Gemma-3 vision first; if it fails, fall back to Llama 3.2 Vision (Mllama) model = None _last_load_error = None try: from transformers import Gemma3ForConditionalGeneration model = Gemma3ForConditionalGeneration.from_pretrained( MODEL_ID, device_map="auto", torch_dtype=torch.bfloat16, attn_implementation="eager" ) except Exception as e: _last_load_error = e try: from transformers import MllamaForConditionalGeneration model = MllamaForConditionalGeneration.from_pretrained( MODEL_ID, device_map="auto", torch_dtype=torch.bfloat16, attn_implementation="eager" ) except Exception as e2: raise RuntimeError( f"Failed to load model as Gemma3 and Mllama.\nGemma3 error: {type(_last_load_error).__name__}: {_last_load_error}\n" f"Mllama error: {type(e2).__name__}: {e2}" ) MAX_NUM_IMAGES = int(os.getenv("MAX_NUM_IMAGES", "5")) # ───────────────────────────────────────────────────────────────────── # Identity controls (System Prompt + Stream Sanitizer + Optional Logit Ban) # ───────────────────────────────────────────────────────────────────── IDENTITY_PROMPT = ( "You are Kenanga, an Indonesian multimodal LVLM adapted for Sundanese and Javanese.\n" "Identity rules:\n" "• When referring to yourself, always say “Kenanga”.\n" "• Never claim to be Gemma/Llama or any base model. If asked about your base, reply briefly: " "“I’m Kenanga (locally adapted); please refer to me as Kenanga.”\n" "• Stay helpful, concise, and safe." ) BAN_BASE_NAMES = os.getenv("BAN_BASE_NAMES", "0") == "1" def _make_bad_words_ids(words): toks = processor.tokenizer ids = [] for w in words: for variant in {w, w.lower(), w.upper(), w.title(), " " + w, " " + w.lower()}: enc = toks(variant, add_special_tokens=False).input_ids if enc: ids.append(enc) # dedupe uniq, seen = [], set() for seq in ids: t = tuple(seq) if t and t not in seen: uniq.append(seq) seen.add(t) return uniq BAD_WORDS_IDS = _make_bad_words_ids([ "Gemma", "Gemma-3", "Gemma 3", "Gemma3", # Uncomment to ban base model family self-calls entirely: # "Llama", "LLaMA", "Llama 3", "Llama 3.2", "Llama3", "Llama3.2", ]) # Only rewrite self-identity claims; allow legitimate mentions in analysis/comparison text SELF_REF_PAT = re.compile( r"\b(?:(?:I\s*am|I'm|This\s+is|You'?re\s+chatting\s+with)\s+)(Gemma(?:[-\s]?3)?|LLa?ma(?:\s*3(?:\.2)?)?)\b", flags=re.IGNORECASE, ) AS_MODEL_PAT = re.compile( r"\bAs\s+(?:an?\s+)?(Gemma(?:[-\s]?3)?|LLa?ma(?:\s*3(?:\.2)?)?)\b", flags=re.IGNORECASE, ) THIS_MODEL_IS_PAT = re.compile( r"\b(This\s+model\s+is)\s+(Gemma(?:[-\s]?3)?|LLa?ma(?:\s*3(?:\.2)?)?)\b", flags=re.IGNORECASE, ) def sanitize_identity(text: str) -> str: text = SELF_REF_PAT.sub("I am Kenanga", text) text = AS_MODEL_PAT.sub("As Kenanga", text) text = THIS_MODEL_IS_PAT.sub(r"\1 Kenanga", text) return text # ───────────────────────────────────────────────────────────────────── # Media utilities # ───────────────────────────────────────────────────────────────────── def count_files_in_new_message(paths: list[str]) -> tuple[int, int]: image_count = 0 video_count = 0 for path in paths: if path.endswith(".mp4"): video_count += 1 else: image_count += 1 return image_count, video_count def count_files_in_history(history: list[dict]) -> tuple[int, int]: image_count = 0 video_count = 0 for item in history: if item["role"] != "user" or isinstance(item["content"], str): continue if item["content"][0].endswith(".mp4"): video_count += 1 else: image_count += 1 return image_count, video_count def validate_media_constraints(message: dict, history: list[dict]) -> bool: new_image_count, new_video_count = count_files_in_new_message(message["files"]) history_image_count, history_video_count = count_files_in_history(history) image_count = history_image_count + new_image_count video_count = history_video_count + new_video_count if video_count > 1: gr.Warning("Only one video is supported.") return False if video_count == 1: if image_count > 0: gr.Warning("Mixing images and videos is not allowed.") return False if "" in message["text"]: gr.Warning("Using tags with video files is not supported.") return False if video_count == 0 and image_count > MAX_NUM_IMAGES: gr.Warning(f"You can upload up to {MAX_NUM_IMAGES} images.") return False if "" in message["text"] and message["text"].count("") != new_image_count: gr.Warning("The number of tags in the text does not match the number of images.") return False return True def downsample_video(video_path: str) -> list[tuple[Image.Image, float]]: vidcap = cv2.VideoCapture(video_path) fps = vidcap.get(cv2.CAP_PROP_FPS) total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) frame_interval = max(total_frames // MAX_NUM_IMAGES, 1) frames: list[tuple[Image.Image, float]] = [] for i in range(0, min(total_frames, MAX_NUM_IMAGES * frame_interval), frame_interval): if len(frames) >= MAX_NUM_IMAGES: break vidcap.set(cv2.CAP_PROP_POS_FRAMES, i) success, image = vidcap.read() if success: image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(image) timestamp = round(i / fps, 2) frames.append((pil_image, timestamp)) vidcap.release() return frames def process_video(video_path: str) -> list[dict]: content = [] frames = downsample_video(video_path) for pil_image, timestamp in frames: with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file: pil_image.save(temp_file.name) content.append({"type": "text", "text": f"Frame {timestamp}:"}) content.append({"type": "image", "url": temp_file.name}) logger.debug(f"{content=}") return content def process_interleaved_images(message: dict) -> list[dict]: logger.debug(f"{message['files']=}") parts = re.split(r"()", message["text"]) logger.debug(f"{parts=}") content = [] image_index = 0 for part in parts: logger.debug(f"{part=}") if part == "": content.append({"type": "image", "url": message["files"][image_index]}) logger.debug(f"file: {message['files'][image_index]}") image_index += 1 elif part.strip(): content.append({"type": "text", "text": part.strip()}) elif isinstance(part, str) and part != "": content.append({"type": "text", "text": part}) logger.debug(f"{content=}") return content def process_new_user_message(message: dict) -> list[dict]: if not message["files"]: return [{"type": "text", "text": message["text"]}] if message["files"][0].endswith(".mp4"): return [{"type": "text", "text": message["text"]}, *process_video(message["files"][0])] if "" in message["text"]: return process_interleaved_images(message) return [ {"type": "text", "text": message["text"]}, *[{"type": "image", "url": path} for path in message["files"]], ] def process_history(history: list[dict]) -> list[dict]: messages = [] current_user_content: list[dict] = [] for item in history: if item["role"] == "assistant": if current_user_content: messages.append({"role": "user", "content": current_user_content}) current_user_content = [] messages.append({"role": "assistant", "content": [{"type": "text", "text": item["content"]}]}) else: content = item["content"] if isinstance(content, str): current_user_content.append({"type": "text", "text": content}) else: current_user_content.append({"type": "image", "url": content[0]}) return messages # ───────────────────────────────────────────────────────────────────── # Generation # ───────────────────────────────────────────────────────────────────── @spaces.GPU(duration=120) def run(message: dict, history: list[dict], system_prompt: str = "", max_new_tokens: int = 512) -> Iterator[str]: if not validate_media_constraints(message, history): yield "" return effective_sys = IDENTITY_PROMPT if not system_prompt else (IDENTITY_PROMPT + "\n\n" + system_prompt) messages = [] messages.append({"role": "system", "content": [{"type": "text", "text": effective_sys}]}) messages.extend(process_history(history)) messages.append({"role": "user", "content": process_new_user_message(message)}) inputs = processor.apply_chat_template( messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt", ).to(device=model.device, dtype=torch.bfloat16) streamer = TextIteratorStreamer( processor.tokenizer, timeout=30.0, skip_prompt=True, skip_special_tokens=True ) generate_kwargs = dict( inputs, streamer=streamer, max_new_tokens=max_new_tokens, disable_compile=True, ) if BAN_BASE_NAMES and BAD_WORDS_IDS: generate_kwargs["bad_words_ids"] = BAD_WORDS_IDS t = Thread(target=model.generate, kwargs=generate_kwargs) t.start() output = "" for delta in streamer: output += delta yield sanitize_identity(output) # ───────────────────────────────────────────────────────────────────── # Demo UI # ───────────────────────────────────────────────────────────────────── examples = [ [ { "text": "Abdi kudu di Jepang salila 10 poé, ka Tokyo, Kyoto, jeung Osaka. Pikirkeun sabaraha objek wisata di unggal kota teras bagi sabaraha poé keur tiap kota. Jieun rekomendasi transportasi umum.", "files": [], } ], [ { "text": "Tulisna kode matplotlib kanggo ngasilake diagram batang sing padha.", "files": ["assets/additional-examples/barchart.png"], } ], [ { "text": "Naon anu anéh tina video ieu?", "files": ["assets/additional-examples/tmp.mp4"], } ], [ { "text": "Aku wis duwe suplemen iki lan pengin tuku sing iki . Ana peringatan apa sing kudu dakkerteni?", "files": ["assets/additional-examples/pill1.png", "assets/additional-examples/pill2.png"], } ], [ { "text": "Tulis sajak anu diilhamkeun ku unsur visual tina gambar-gambar.", "files": ["assets/sample-images/06-1.png", "assets/sample-images/06-2.png"], } ], [ { "text": "Gawéna gending cendhak sing ka-inspirasi saka unsur visual ing gambar-gambar.", "files": [ "assets/sample-images/07-1.png", "assets/sample-images/07-2.png", "assets/sample-images/07-3.png", "assets/sample-images/07-4.png", ], } ], [ { "text": "Tulis carita pondok ngeunaan naon anu tiasa kajadian di ieu imah.", "files": ["assets/sample-images/08.png"], } ], [ { "text": "Gawe crita cekak adhedhasar urutan gambar.", "files": [ "assets/sample-images/09-1.png", "assets/sample-images/09-2.png", "assets/sample-images/09-3.png", "assets/sample-images/09-4.png", "assets/sample-images/09-5.png", ], } ], [ { "text": "Gambarkeun mahluk-mahluk anu bakal hirup di dunya ieu.", "files": ["assets/sample-images/10.png"], } ], [ { "text": "Waca teks sing ana ing gambar.", "files": ["assets/additional-examples/1.png"], } ], [ { "text": "Ieu tikét tanggal sabaraha jeung sabaraha hargana?", "files": ["assets/additional-examples/2.png"], } ], [ { "text": "Wacanen teks ing gambar lan tulisen ing format markdown.", "files": ["assets/additional-examples/3.png"], } ], [ { "text": "Itung nilai integral ieu.", "files": ["assets/additional-examples/4.png"], } ], [ { "text": "Naon warna bulu ucing ieu teh?", "files": ["assets/sample-images/01.png"], } ], [ { "text": "Tanda éta nyebut naon?", "files": ["assets/sample-images/02.png"], } ], [ { "text": "Bandhingna lan bedakake loro gambar kasebut.", "files": ["assets/sample-images/03.png"], } ], [ { "text": "Daptarkeun sakabéh obyék dina gambar sarta warnana.", "files": ["assets/sample-images/04.png"], } ], [ { "text": "Jlentrehna suasana adegan kasebut ku basa Jawa.", "files": ["assets/sample-images/05.png"], } ], ] DESCRIPTION = """\
This is a demo of Kenanga 11B IT, a multimodal Large Vision-Language Model (LVLM) adapted for Sundanese and Javanese support.
You can upload images, as well as interleaved images and videos. Video input is limited to single-turn conversations and must be in MP4 format.
""" demo = gr.ChatInterface( fn=run, type="messages", chatbot=gr.Chatbot(type="messages", scale=1, allow_tags=["image"]), textbox=gr.MultimodalTextbox(file_types=["image", ".mp4"], file_count="multiple", autofocus=True), multimodal=True, additional_inputs=[ gr.Textbox(label="System Prompt", value=IDENTITY_PROMPT), gr.Slider(label="Max New Tokens", minimum=100, maximum=2000, step=10, value=700), ], stop_btn=False, title="🌺 Kenanga 11B IT", description=DESCRIPTION, examples=examples, run_examples_on_click=False, cache_examples=False, css_paths="style.css", delete_cache=(1800, 1800), ) if __name__ == "__main__": demo.launch()