# app.py — ColPali + MCP (search-only) + GPT-5 follow-up responses
# Images are injected by the app in new calls; no base64 is passed through MCP.

import os
import base64
import tempfile
from io import BytesIO
from urllib.request import urlretrieve
from typing import List, Tuple, Dict, Any, Optional

import gradio as gr
from gradio_pdf import PDF
import torch
from pdf2image import convert_from_path
from PIL import Image
from torch.utils.data import DataLoader
from tqdm import tqdm
from colpali_engine.models import ColQwen2, ColQwen2Processor

# Streaming Responses API
from openai import OpenAI

# =============================
# Globals & Config
# =============================
api_key_env = os.getenv("OPENAI_API_KEY", "").strip()

ds: List[torch.Tensor] = []       # page embeddings
images: List[Image.Image] = []    # PIL images in page order
current_pdf_path: Optional[str] = None

device_map = (
    "cuda:0"
    if torch.cuda.is_available()
    else ("mps" if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available() else "cpu")
)

# =============================
# Load Model & Processor
# =============================
model = ColQwen2.from_pretrained(
    "vidore/colqwen2-v1.0",
    torch_dtype=torch.bfloat16,
    device_map=device_map,
    attn_implementation="flash_attention_2",
).eval()
processor = ColQwen2Processor.from_pretrained("vidore/colqwen2-v1.0")


# =============================
# Utilities
# =============================
def _ensure_model_device() -> str:
    dev = (
        "cuda:0"
        if torch.cuda.is_available()
        else ("mps" if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available() else "cpu")
    )
    if str(model.device) != dev:
        model.to(dev)
    return dev


def encode_image_to_base64(image: Image.Image) -> str:
    """Encodes a PIL image to base64 (JPEG)."""
    buffered = BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


# =============================
# Indexing Helpers
# =============================
def convert_files(pdf_path: str) -> List[Image.Image]:
    """Convert a single PDF path into a list of PIL Images (pages)."""
    imgs = convert_from_path(pdf_path, thread_count=4)
    if len(imgs) >= 800:
        raise gr.Error("The number of images in the dataset should be less than 800.")
    return imgs


def index_gpu(imgs: List[Image.Image]) -> str:
    """Embed a list of images (pages) with ColQwen2 (ColPali) and store in globals."""
    global ds, images
    device = _ensure_model_device()

    # reset previous dataset
    ds = []
    images = imgs

    dataloader = DataLoader(
        images,
        batch_size=4,
        shuffle=False,
        collate_fn=lambda x: processor.process_images(x).to(model.device),
    )
    for batch_doc in tqdm(dataloader, desc="Indexing pages"):
        with torch.no_grad():
            batch_doc = {k: v.to(device) for k, v in batch_doc.items()}
            embeddings_doc = model(**batch_doc)
        ds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))
    return f"Indexed {len(images)} pages successfully."


def index_from_path(pdf_path: str) -> str:
    imgs = convert_files(pdf_path)
    return index_gpu(imgs)


def index_from_url(url: str) -> Tuple[str, str]:
    """
    Download a PDF from URL and index it.
    Returns: (status_message, saved_pdf_path)
    """
    tmp_dir = tempfile.mkdtemp(prefix="colpali_")
    local_path = os.path.join(tmp_dir, "document.pdf")
    urlretrieve(url, local_path)
    status = index_from_path(local_path)
    return status, local_path


# =============================
# Local Search (ColPali)
# =============================
def search(query: str, k: int = 5) -> List[int]:
    """
    Search within an indexed PDF and return ONLY the indices of the most relevant pages (0-based).

    Returns:
        List[int]: Sorted unique 0-based indices of pages to inspect (includes neighbor expansion).
    """
    global ds, images
    if not images or not ds:
        return []

    k = max(1, min(int(k), len(images)))
    device = _ensure_model_device()

    with torch.no_grad():
        batch_query = processor.process_queries([query]).to(model.device)
        embeddings_query = model(**batch_query)
    q_vecs = list(torch.unbind(embeddings_query.to("cpu")))

    scores = processor.score(q_vecs, ds, device=device)
    top_k_indices = scores[0].topk(k).indices.tolist()
    print("[search]", query, top_k_indices)

    # Neighbor expansion for context
    base = set(top_k_indices)
    expanded = set(base)
    for i in base:
        expanded.add(i - 1)
        expanded.add(i + 1)
    expanded = {i for i in expanded if 0 <= i < len(images)}
    return sorted(expanded)


def _build_image_parts_from_indices(indices: List[int]) -> List[Dict[str, Any]]:
    """Turn page indices into OpenAI vision content parts."""
    parts: List[Dict[str, Any]] = []
    seen = sorted({i for i in indices if 0 <= i < len(images)})
    for idx in seen:
        b64 = encode_image_to_base64(images[idx])
        parts.append({
            "type": "input_image",
            "image_url": f"data:image/jpeg;base64,{b64}",
        })
    return parts


# =============================
# Agent System Prompt
# =============================
SYSTEM = (
    """
You are a PDF research agent with a single tool: mcp_test_search(query: string, k: int).

Act iteratively:
1) Split the user question into 1–4 focused sub-queries. Sub-queries should be phrased as natural-language questions in English, not just keywords.
2) For each sub-query, call mcp_test_search (k=5 by default; increase up to 10 if you need to go deep).
3) You will receive the output of mcp_test_search as a list of indices corresponding to page numbers. Print them out and stop generating. You will be fed the corresponding pages as images in a follow-up message.
4) Stop early when confident; otherwise refine and repeat, up to 4 iterations and 20 searches in total. If info is missing, keep searching with new keywords and queries.

Workflow:
• Use ONLY the provided images for grounding and cite as (p.X).
• If an answer is not present, say “Not found in the provided pages.”

Deliverable:
• Return a clear, standalone Markdown answer in the user's language. Include concise tables for lists of dates/items.
"""
).strip()

# =============================
# MCP config (search-only)
# =============================
DEFAULT_MCP_SERVER_URL = "https://manu-mcp-test.hf.space/gradio_api/mcp/"
DEFAULT_MCP_SERVER_LABEL = "colpali_rag"
DEFAULT_ALLOWED_TOOLS = "mcp_test_search"  # search-only; no get_pages


# =============================
# Streaming Agent (multi-round with previous_response_id)
# =============================
def stream_agent(question: str, api_key: str, model_name: str, server_url: str,
                 server_label: str, require_approval: str, allowed_tools: str):
    """
    Multi-round streaming:
      • Seed: optional local ColPali search on the user question to attach initial pages.
      • Each round: open a GPT-5 stream with *attached images* (if any).
      • If the model calls mcp_test_search and returns indices, we end the stream and start a
        NEW API call with previous_response_id + the requested pages attached.
    """
    if not api_key:
        yield "⚠️ **Please provide your OpenAI API key.**", "", ""
        return
    if not images or not ds:
        yield "⚠️ **Index a PDF first in tab 1.**", "", ""
        return

    client = OpenAI(api_key=api_key)

    # Optional seeding: attach some likely pages on round 1
    try:
        seed_indices = []  # search(question, k=5) or []
    except Exception as e:
        yield f"❌ Search failed: {e}", "", ""
        return

    log_lines = ["Log", f"[seed] indices={seed_indices}"]
    prev_response_id: Optional[str] = None

    # MCP tool routing (search-only)
    tools = [{
        "type": "mcp",
        "server_label": server_label or DEFAULT_MCP_SERVER_LABEL,
        "server_url": server_url or DEFAULT_MCP_SERVER_URL,
        "allowed_tools": [t.strip() for t in (allowed_tools or DEFAULT_ALLOWED_TOOLS).split(",") if t.strip()],
        "require_approval": require_approval or "never",
    }]

    # Shared mutable state for each round
    round_state: Dict[str, Any] = {
        "last_search_indices": None,
        "final_text": "",
        "summary_text": "",
    }

    def run_round(round_idx: int, attached_indices: List[int]):
        """
        Stream one round. If tool results (indices) arrive, store them in
        round_state["last_search_indices"].
        """
        nonlocal prev_response_id
        round_state["last_search_indices"] = None
        round_state["final_text"] = ""
        round_state["summary_text"] = ""

        # Build the user content for this round
        parts: List[Dict[str, Any]] = []
        if round_idx == 1:
            parts.append({"type": "input_text", "text": question})
        else:
            parts.append({"type": "input_text", "text": "Continue reasoning with the newly attached pages."})

        parts += _build_image_parts_from_indices(attached_indices)
        if attached_indices:
            pages_str = ", ".join(str(i + 1) for i in sorted(set(attached_indices)))
            parts.append({"type": "input_text",
                          "text": f"(Attached pages: {pages_str}). Use ONLY these images; cite as (p.X)."})

        # First call includes system; follow-ups use previous_response_id
        if prev_response_id:
            req_input = [{"role": "user", "content": parts}]
        else:
            req_input = [
                {"role": "system", "content": SYSTEM},
                {"role": "user", "content": parts},
            ]

        req_kwargs = dict(
            model=model_name,
            input=req_input,
            reasoning={"effort": "medium", "summary": "auto"},
            tools=tools,
            store=True,  # persist conversation state on server
        )
        if prev_response_id:
            req_kwargs["previous_response_id"] = prev_response_id

        # Helper: parse a JSON array of ints from tool result text
        def _maybe_parse_indices(chunk: str) -> List[int]:
            import json, re
            arrs = re.findall(r'\[[^\]]*\]', chunk)
            for s in reversed(arrs):
                try:
                    val = json.loads(s)
                    if isinstance(val, list) and all(isinstance(x, int) for x in val):
                        return sorted({x for x in val if 0 <= x < len(images)})
                except Exception:
                    pass
            return []

        tool_result_buffer = ""  # accumulate tool result deltas

        try:
            with client.responses.stream(**req_kwargs) as stream:
                for event in stream:
                    etype = getattr(event, "type", "")

                    if etype == "response.output_text.delta":
                        round_state["final_text"] += event.delta
                        yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])

                    elif etype == "response.reasoning_summary_text.delta":
                        round_state["summary_text"] += event.delta
                        yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])

                    # Log tool call argument deltas (optional)
                    elif etype in ("response.function_call_arguments.delta", "response.tool_call_arguments.delta"):
                        delta = getattr(event, "delta", None)
                        if delta:
                            log_lines.append(str(delta))

                    # Capture tool RESULT text and try to parse indices
                    elif etype.startswith("response.tool_result"):
                        print("A tool output was detected")
                        delta_text = getattr(event, "delta", "") or getattr(event, "output_text", "")
                        if delta_text:
                            tool_result_buffer += str(delta_text)
                            parsed_now = _maybe_parse_indices(tool_result_buffer)
                            if parsed_now:
                                round_state["last_search_indices"] = parsed_now
                                log_lines.append(f"[tool-result] indices={parsed_now}")
                        yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])

                # Finalize this response; remember ID for follow-ups
                _final = stream.get_final_response()
                try:
                    prev_response_id = getattr(_final, "id", None)
                except Exception:
                    prev_response_id = None

            # Emit one last update after stream ends
            yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])

        except Exception as e:
            log_lines.append(f"[round {round_idx}] stream error: {e}")
            yield f"❌ {e}", round_state["summary_text"] or "", "\n".join(log_lines[-400:])
            return

    # Controller: iterate rounds; if the model searched, attach those pages next
    max_rounds = 3
    round_idx = 1
    pending_indices = list(seed_indices)

    while round_idx <= max_rounds:
        print("Round ", round_idx, ", Indices: ", pending_indices)
        for final_md, summary_md, log_md in run_round(round_idx, pending_indices):
            yield final_md, summary_md, log_md

        # If the model returned indices via the tool, use them in a fresh call
        next_indices = round_state.get("last_search_indices") or []
        if next_indices:
            pending_indices = next_indices
            round_idx += 1
            continue

        # No further tool-driven retrieval → done
        break

    return


# =============================
# Gradio UI
# =============================
CUSTOM_CSS = """
:root {
  --bg: #0e1117;
  --panel: #111827;
  --accent: #7c3aed;
  --accent-2: #06b6d4;
  --text: #e5e7eb;
  --muted: #9ca3af;
  --border: #1f2937;
}
.gradio-container {max-width: 1180px !important; margin: 0 auto !important;}
body {background:
  radial-gradient(1200px 600px at 20% -10%, rgba(124,58,237,.25), transparent 60%),
  radial-gradient(1000px 500px at 120% 10%, rgba(6,182,212,.2), transparent 60%),
  var(--bg) !important;}
.app-header {
  display:flex; gap:16px; align-items:center;
  padding:20px 18px; margin:8px 0 12px;
  border:1px solid var(--border); border-radius:20px;
  background: linear-gradient(180deg, rgba(255,255,255,.02), rgba(255,255,255,.01));
  box-shadow: 0 10px 30px rgba(0,0,0,.25), inset 0 1px 0 rgba(255,255,255,.05);
}
.app-header .icon {
  width:48px; height:48px; display:grid; place-items:center;
  border-radius:14px; background: linear-gradient(135deg, var(--accent), var(--accent-2));
  color:white; font-size:26px;
}
.app-header h1 {font-size:22px; margin:0; color:var(--text); letter-spacing:.2px;}
.app-header p {margin:2px 0 0; color:var(--muted); font-size:14px;}
.card {
  border:1px solid var(--border); border-radius:18px; padding:14px 16px;
  background: linear-gradient(180deg, rgba(255,255,255,.02), rgba(255,255,255,.01));
  box-shadow: 0 12px 28px rgba(0,0,0,.18), inset 0 1px 0 rgba(255,255,255,.04);
}
.gr-button-primary {border-radius:12px !important; font-weight:600;}
.gradio-container .tabs {border-radius:16px; overflow:hidden; border:1px solid var(--border);}
.markdown-wrap {min-height: 260px;}
.summary-wrap {min-height: 180px;}
.gr-markdown, .gr-prose { color: var(--text) !important; }
.gr-markdown h1, .gr-markdown h2, .gr-markdown h3 {color: #f3f4f6;}
.gr-markdown a {color: var(--accent-2); text-decoration: none;}
.gr-markdown a:hover {text-decoration: underline;}
.gr-markdown table {width: 100%; border-collapse: collapse; margin: 10px 0 16px;}
.gr-markdown th, .gr-markdown td {border: 1px solid var(--border); padding: 8px 10px;}
.gr-markdown th {background: rgba(255,255,255,.03);}
.gr-markdown pre, .gr-markdown code {
  background: #0b1220; color: #eaeaf0; border-radius: 12px; border: 1px solid #172036;
}
.gr-markdown pre {padding: 12px 14px; overflow:auto;}
.gr-markdown blockquote {
  border-left: 4px solid var(--accent); padding: 6px 12px; margin: 8px 0;
  color: #d1d5db; background: rgba(124,58,237,.06); border-radius: 8px;
}
.log-box {
  font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;
  white-space: pre-wrap; color: #d1d5db; background:#0b1220;
  border:1px solid #172036; border-radius:14px; padding:12px;
  max-height:280px; overflow:auto;
}
"""


def build_ui():
    theme = gr.themes.Soft()

    with gr.Blocks(title="ColPali PDF RAG + Follow-up Responses", theme=theme, css=CUSTOM_CSS) as demo:
        gr.HTML(
            """
            <div class="app-header">
              <div class="icon">📚</div>
              <div>
                <h1>ColPali PDF Search + Streaming Agent (Follow-up Responses)</h1>
                <p>Index PDFs with ColQwen2. The agent attaches images in follow-up GPT-5 calls; MCP is search-only.</p>
              </div>
            </div>
""" ) # ---- Tab 1: Index & Preview with gr.Tab("1) Index & Preview"): with gr.Row(): with gr.Column(scale=1): pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"]) index_btn = gr.Button("📥 Index Uploaded PDF", variant="secondary") url_box = gr.Textbox( label="Or index from URL", placeholder="https://example.com/file.pdf", value="", ) index_url_btn = gr.Button("🌐 Load From URL", variant="secondary") status_box = gr.Textbox(label="Status", interactive=False) with gr.Column(scale=2): pdf_view = PDF(label="PDF Preview") # wiring def handle_upload(file): global current_pdf_path if file is None: return "Please upload a PDF.", None path = getattr(file, "name", file) status = index_from_path(path) current_pdf_path = path return status, path def handle_url(url: str): global current_pdf_path if not url or not url.lower().endswith(".pdf"): return "Please provide a direct PDF URL ending in .pdf", None status, path = index_from_url(url) current_pdf_path = path return status, path index_btn.click(handle_upload, inputs=[pdf_input], outputs=[status_box, pdf_view]) index_url_btn.click(handle_url, inputs=[url_box], outputs=[status_box, pdf_view]) # ---- Tab 2: Ask (Direct — returns indices) with gr.Tab("2) Ask (Direct — returns indices)"): with gr.Row(): with gr.Column(scale=1): query_box = gr.Textbox(placeholder="Enter your question…", label="Query", lines=4) k_slider = gr.Slider(minimum=1, maximum=10, step=1, label="Number of results (k)", value=5) search_button = gr.Button("🔍 Search", variant="primary") with gr.Column(scale=2): output_text = gr.Textbox(label="Indices (0-based)", lines=12, placeholder="[0, 1, 2, ...]") search_button.click(search, inputs=[query_box, k_slider], outputs=[output_text]) # ---- Tab 3: Agent (Streaming) with gr.Tab("3) Agent (Streaming)"): with gr.Row(equal_height=True): with gr.Column(scale=1): with gr.Group(): question = gr.Textbox( label="Your question", placeholder="Enter your question…", lines=8, elem_classes=["card"], ) run_btn = gr.Button("Run", variant="primary") with gr.Accordion("Connection & Model", open=False, elem_classes=["card"]): with gr.Row(): api_key_box = gr.Textbox( label="OpenAI API Key", placeholder="sk-...", type="password", value=api_key_env, ) model_box = gr.Dropdown( label="Model", choices=["gpt-5", "gpt-4.1", "gpt-4o"], value="gpt-5", ) with gr.Row(): server_url_box = gr.Textbox( label="MCP Server URL (search-only)", value=DEFAULT_MCP_SERVER_URL, ) server_label_box = gr.Textbox( label="MCP Server Label", value=DEFAULT_MCP_SERVER_LABEL, ) with gr.Row(): allowed_tools_box = gr.Textbox( label="Allowed Tools (comma-separated)", value=DEFAULT_ALLOWED_TOOLS, ) require_approval_box = gr.Dropdown( label="Require Approval", choices=["never", "auto", "always"], value="never", ) with gr.Column(scale=3): with gr.Tab("Answer (Markdown)"): final_md = gr.Markdown(value="", elem_classes=["card", "markdown-wrap"]) with gr.Tab("Live Summary (Markdown)"): summary_md = gr.Markdown(value="", elem_classes=["card", "summary-wrap"]) with gr.Tab("Event Log"): log_md = gr.Markdown(value="", elem_classes=["card", "log-box"]) run_btn.click( stream_agent, inputs=[ question, api_key_box, model_box, server_url_box, server_label_box, require_approval_box, allowed_tools_box, ], outputs=[final_md, summary_md, log_md], ) return demo if __name__ == "__main__": demo = build_ui() # mcp_server=True exposes this app's MCP endpoint at /gradio_api/mcp/ # We keep the MCP server available, but the agent never uses MCP to pass images. 
    demo.queue(max_size=5).launch(debug=True, mcp_server=True)
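
# -----------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the app): exercising the
# index-then-search flow from a Python shell without the Gradio UI. It uses only
# functions defined above; the file name "sample.pdf" is a placeholder you would
# replace with a real local PDF, and it assumes the ColQwen2 weights have loaded.
#
#   status = index_from_path("sample.pdf")          # embed every page with ColQwen2
#   print(status)                                    # e.g. "Indexed N pages successfully."
#   hits = search("What are the key findings?", k=5)
#   print(hits)                                      # 0-based page indices, neighbors included
#   for i in hits:
#       images[i].save(f"page_{i + 1}.jpg")          # inspect the retrieved pages
#
# The streaming agent follows the same pattern: it only ever receives these
# indices from the MCP search tool, and the app re-attaches the corresponding
# page images in the next Responses API call.
# -----------------------------------------------------------------------------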