Spaces:
Running
Running
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| import os | |
| import json | |
| import base64 | |
| from PIL import Image | |
| import io | |
| import requests | |
| from smolagents.mcp_client import MCPClient | |
# Default Hugging Face token, read once at import time. It is used for
# inference whenever the user does not supply their own key in the UI.
ACCESS_TOKEN = os.getenv("HF_TOKEN")
if ACCESS_TOKEN:
    print("Access token loaded.")
else:
    # Report the missing variable instead of claiming a token was loaded.
    print("Warning: HF_TOKEN is not set; no default access token is available.")
# Function to encode image to base64
def encode_image(image_path):
    """Return an image as a base64-encoded JPEG string.

    Args:
        image_path: Either a value accepted by ``Image.open`` (typically a
            file path) or an already-loaded ``PIL.Image.Image``.

    Returns:
        The base64 text of the JPEG-encoded image, or ``None`` when no image
        was supplied or it could not be opened/encoded. Errors are logged,
        never raised.
    """
    if not image_path:
        print("No image path provided")
        return None

    opened_here = None  # Only images this function opened are closed by it.
    try:
        print(f"Encoding image from path: {image_path}")

        if isinstance(image_path, Image.Image):
            image = image_path
        else:
            image = opened_here = Image.open(image_path)

        # JPEG has no alpha channel or palette, so modes such as RGBA, LA and
        # P must be flattened to RGB first; otherwise save() fails. Greyscale
        # ("L") is left alone because JPEG stores it natively.
        if image.mode not in ("RGB", "L"):
            image = image.convert("RGB")

        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
        print("Image encoded successfully")
        return img_str
    except Exception as e:
        # Best effort: callers treat None as "skip this image".
        print(f"Error encoding image: {e}")
        return None
    finally:
        if opened_here is not None:
            opened_here.close()
# Dictionary to store active MCP connections.
# Maps a server name to {"client": <MCP client>, "tools": <tool list>, "url": <str>}.
mcp_connections = {}


def _disconnect_quietly(client):
    """Best-effort disconnect of an MCP client; failures are logged, not raised."""
    try:
        client.disconnect()
    except Exception as e:
        print(f"Error disconnecting MCP client: {e}")


def _next_default_server_name():
    """Return the first ``Server_<n>`` name that is not already registered."""
    index = len(mcp_connections)
    while f"Server_{index}" in mcp_connections:
        index += 1
    return f"Server_{index}"


def connect_to_mcp_server(server_url, server_name=None):
    """Connect to an MCP server and register it in ``mcp_connections``.

    Args:
        server_url: URL of the MCP server. Surrounding whitespace is ignored.
        server_name: Optional name to register the connection under. When
            empty, a ``Server_<n>`` name is generated.

    Returns:
        A ``(name, status_message)`` tuple. ``name`` is ``None`` when no URL
        was given or the connection attempt failed.
    """
    server_url = (server_url or "").strip()
    if not server_url:
        return None, "No server URL provided"

    client = None
    try:
        # Create an MCP client and connect to the server
        client = MCPClient({"url": server_url})
        # Get available tools
        tools = client.get_tools()
    except Exception as e:
        # Do not leave a half-open connection behind when tool discovery fails.
        if client is not None:
            _disconnect_quietly(client)
        print(f"Error connecting to MCP server: {e}")
        return None, f"Error connecting to MCP server: {str(e)}"

    name = (server_name or "").strip() or _next_default_server_name()

    # Reconnecting under an existing name replaces that entry, so release the
    # old client first instead of silently dropping it.
    previous = mcp_connections.get(name)
    if previous is not None:
        _disconnect_quietly(previous["client"])

    # Store the connection for later use
    mcp_connections[name] = {"client": client, "tools": tools, "url": server_url}
    return name, f"Successfully connected to {name} with {len(tools)} available tools"
def list_mcp_tools(server_name):
    """Describe the tools of a connected MCP server.

    Returns one ``- <name>: <description>`` line per tool joined by newlines,
    ``"Server not connected"`` when the name is not registered in
    ``mcp_connections``, or ``"No tools available for this server"`` when the
    server has an empty tool list.
    """
    if server_name not in mcp_connections:
        return "Server not connected"

    lines = [
        f"- {tool.name}: {tool.description}"
        for tool in mcp_connections[server_name]["tools"]
    ]
    return "\n".join(lines) if lines else "No tools available for this server"
def call_mcp_tool(server_name, tool_name, **kwargs):
    """Call a specific tool from a connected MCP server.

    Args:
        server_name: Key of the connection in ``mcp_connections``.
        tool_name: ``name`` attribute of the tool to invoke.
        **kwargs: Arguments forwarded to the tool.

    Returns:
        Whatever the tool returns. If the server or tool is unknown, or the
        call raises, a human-readable error string is returned instead.
    """
    if server_name not in mcp_connections:
        return f"Server '{server_name}' not connected"

    connection = mcp_connections[server_name]

    # Find the requested tool
    tool = next((t for t in connection["tools"] if t.name == tool_name), None)
    if tool is None:
        return f"Tool '{tool_name}' not found on server '{server_name}'"

    try:
        # The tool objects returned by get_tools() are themselves callable, so
        # invoke the tool directly. NOTE(review): smolagents' MCPClient does
        # not appear to expose call_tool(); the client path is kept only as a
        # fallback for tool objects that are not callable.
        if callable(tool):
            return tool(**kwargs)
        return connection["client"].call_tool(tool_name, kwargs)
    except Exception as e:
        print(f"Error calling MCP tool: {e}")
        return f"Error calling MCP tool: {str(e)}"
def _parse_tool_call(analysis):
    """Turn the analysis model's reply into ``(server_name, tool_info)``.

    Returns ``(None, None)`` when the reply is not text, contains
    ``NO_TOOL_NEEDED``, or holds no parseable JSON object.
    """
    if not isinstance(analysis, str) or "NO_TOOL_NEEDED" in analysis:
        return None, None

    # The model may wrap the JSON in prose; take the outermost {...} span.
    json_start = analysis.find("{")
    json_end = analysis.rfind("}") + 1
    if json_start < 0 or json_end <= json_start:
        return None, None

    json_str = analysis[json_start:json_end]
    try:
        tool_call = json.loads(json_str)
    except json.JSONDecodeError:
        print(f"Failed to parse tool call JSON: {json_str}")
        return None, None

    # The caller expands the parameters with **, so they must be a mapping;
    # a model answering "parameters": null (or a list) is treated as "none".
    parameters = tool_call.get("parameters")
    if not isinstance(parameters, dict):
        parameters = {}

    return tool_call.get("server_name"), {
        "tool_name": tool_call.get("tool_name"),
        "parameters": parameters,
    }


def analyze_message_for_tool_call(message, active_mcp_servers, client, model_to_use, system_message):
    """Analyze a message to determine if an MCP tool should be called.

    Args:
        message: The user's text.
        active_mcp_servers: Names of servers whose tools may be used; names
            missing from ``mcp_connections`` are ignored.
        client: Object with a ``chat_completion(model=..., messages=...,
            temperature=..., max_tokens=...)`` method.
        model_to_use: Model identifier passed to ``chat_completion``.
        system_message: Accepted for interface compatibility; not used here.

    Returns:
        ``(server_name, {"tool_name": ..., "parameters": {...}})`` when the
        model proposes a tool call, otherwise ``(None, None)``. Errors from
        the model call are logged and also yield ``(None, None)``.
    """
    # Skip analysis if message is empty
    if not message or not message.strip():
        return None, None

    # One "<server>.<tool>: <description>" line per available tool
    tools_desc = [
        f"{server_name}.{tool.name}: {tool.description}"
        for server_name in (active_mcp_servers or [])
        if server_name in mcp_connections
        for tool in mcp_connections[server_name]["tools"]
    ]
    if not tools_desc:
        return None, None

    tools_string = "\n".join(tools_desc)

    # Create a structured query for the LLM to analyze if a tool call is needed
    analysis_system_prompt = f"""You are an assistant that helps determine if a user message requires using an external tool.
Available tools:
{tools_string}
Your job is to:
1. Analyze the user's message
2. Determine if they're asking to use one of the tools
3. If yes, respond with a JSON object with the server_name, tool_name, and parameters
4. If no, respond with "NO_TOOL_NEEDED"
Example 1:
User: "Please turn this text into speech: Hello world"
Response: {{"server_name": "kokoroTTS", "tool_name": "text_to_audio", "parameters": {{"text": "Hello world", "speed": 1.0}}}}
Example 2:
User: "What is the capital of France?"
Response: NO_TOOL_NEEDED"""

    try:
        # Call the LLM to analyze the message
        response = client.chat_completion(
            model=model_to_use,
            messages=[
                {"role": "system", "content": analysis_system_prompt},
                {"role": "user", "content": message},
            ],
            temperature=0.2,  # Low temperature for more deterministic responses
            max_tokens=300,
        )
        analysis = response.choices[0].message.content
        print(f"Tool analysis: {analysis}")
        return _parse_tool_call(analysis)
    except Exception as e:
        print(f"Error analyzing message for tool calls: {str(e)}")
        return None, None
def _format_tool_result(result):
    """Render an MCP tool result as text: dicts as indented JSON, anything else via str()."""
    if isinstance(result, dict):
        return json.dumps(result, indent=2)
    return str(result)


def _run_mcp_command(message):
    """Execute an explicit ``/mcp <server> <tool> [json-args]`` command and return the reply text."""
    command_parts = message.split(" ", 3)
    if len(command_parts) < 3:
        return "Invalid MCP command. Format: /mcp <server_name> <tool_name> [arguments]"

    _, server_name, tool_name = command_parts[:3]
    args_json = command_parts[3] if len(command_parts) > 3 else "{}"
    try:
        args_dict = json.loads(args_json)
        result = call_mcp_tool(server_name, tool_name, **args_dict)
        return _format_tool_result(result)
    except json.JSONDecodeError:
        return f"Invalid JSON arguments: {args_json}"
    except Exception as e:
        return f"Error executing MCP command: {str(e)}"


def _encode_image_parts(images, error_label):
    """Return an ``image_url`` content part for every image that encodes successfully."""
    parts = []
    for img in images:
        if not img:
            continue
        try:
            encoded = encode_image(img)
        except Exception as e:
            print(f"{error_label}: {e}")
            continue
        if encoded:
            parts.append({
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{encoded}"},
            })
    return parts


def _history_to_messages(history):
    """Convert ``(user, assistant)`` history pairs into chat-completion messages."""
    messages = []
    for val in history or []:
        user_part = val[0]
        assistant_part = val[1]
        if user_part:
            # Handle both text-only and multimodal messages in history
            if isinstance(user_part, tuple) and len(user_part) == 2:
                # A (text, images) pair: text first, then each encodable image
                history_content = []
                if user_part[0]:
                    history_content.append({"type": "text", "text": user_part[0]})
                history_content.extend(
                    _encode_image_parts(user_part[1], "Error encoding history image")
                )
                messages.append({"role": "user", "content": history_content})
            else:
                # Regular text message
                messages.append({"role": "user", "content": user_part})
            print(f"Added user message to context (type: {type(user_part)})")
        if assistant_part:
            messages.append({"role": "assistant", "content": assistant_part})
            print(f"Added assistant message to context: {assistant_part}")
    return messages


def _mcp_tools_prompt(active_mcp_servers, mcp_interaction_mode):
    """Return the system-prompt suffix describing the available MCP tools ('' if none)."""
    tool_lines = []
    for server_name in active_mcp_servers:
        if server_name in mcp_connections:
            server_tools = list_mcp_tools(server_name).split("\n")
            tool_lines.extend(f"{server_name}: {tool}" for tool in server_tools)
    if not tool_lines:
        return ""

    mcp_tools_description = "\n".join(tool_lines)
    if mcp_interaction_mode == "Command Mode":
        return f"\n\nYou have access to the following MCP tools:\n{mcp_tools_description}\n\nTo use these tools, the user can type a command in the format: /mcp <server_name> <tool_name> <arguments_json>"
    return f"\n\nYou have access to the following MCP tools:\n{mcp_tools_description}\n\nThe user can use these tools by describing what they want in natural language, and the system will automatically detect when to use a tool based on their request."


def respond(
    message,
    image_files,
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    frequency_penalty,
    seed,
    provider,
    custom_api_key,
    custom_model,
    model_search_term,
    selected_model,
    mcp_enabled=False,
    active_mcp_servers=None,
    mcp_interaction_mode="Natural Language"
):
    """Stream a chat reply, optionally routing the request through an MCP tool.

    This is a generator: every yielded value is the full reply text so far.

    Args:
        message: Latest user text (may be empty when only images are sent).
        image_files: Images for the latest message, or ``None``.
        history: Earlier ``(user, assistant)`` pairs. A user entry may itself
            be a ``(text, images)`` tuple.
        system_message: Base system prompt.
        max_tokens, temperature, top_p, frequency_penalty: Sampling settings
            forwarded to ``chat_completion``.
        seed: Sampling seed; ``-1`` means "no fixed seed".
        provider: Inference provider name for ``InferenceClient``.
        custom_api_key: User-supplied token; ``ACCESS_TOKEN`` is used when blank.
        custom_model: Model id overriding ``selected_model`` when non-blank.
        model_search_term: Only logged.
        selected_model: Model chosen in the UI.
        mcp_enabled: Whether MCP handling is active.
        active_mcp_servers: Names of the MCP servers selected for this chat.
        mcp_interaction_mode: ``"Natural Language"`` or ``"Command Mode"``.

    Yields:
        The accumulated reply text. For MCP commands and tool calls a single
        value is yielded. Inference errors are appended to the text and
        yielded rather than raised.
    """
    api_key = (custom_api_key or "").strip()
    custom_model = (custom_model or "").strip()

    print(f"Received message: {message}")
    print(f"Received {len(image_files) if image_files else 0} images")
    print(f"History: {history}")
    print(f"System message: {system_message}")
    print(f"Max tokens: {max_tokens}, Temperature: {temperature}, Top-P: {top_p}")
    print(f"Frequency Penalty: {frequency_penalty}, Seed: {seed}")
    print(f"Selected provider: {provider}")
    print(f"Custom API Key provided: {bool(api_key)}")
    print(f"Selected model (custom_model): {custom_model}")
    print(f"Model search term: {model_search_term}")
    print(f"Selected model from radio: {selected_model}")
    print(f"MCP enabled: {mcp_enabled}")
    print(f"Active MCP servers: {active_mcp_servers}")
    print(f"MCP interaction mode: {mcp_interaction_mode}")

    # Explicit /mcp commands are always handled, whatever the interaction
    # mode, and need no inference client. Because this function is a
    # generator the reply must be *yielded*: a plain ``return value`` would be
    # swallowed and the caller would receive nothing.
    if mcp_enabled and message and message.startswith("/mcp"):
        yield _run_mcp_command(message)
        return

    # Determine which token to use
    if api_key:
        token_to_use = api_key
        print("USING CUSTOM API KEY: BYOK token provided by user is being used for authentication")
    else:
        token_to_use = ACCESS_TOKEN
        print("USING DEFAULT API KEY: Environment variable HF_TOKEN is being used for authentication")

    # Initialize the Inference Client with the provider and appropriate token
    client = InferenceClient(token=token_to_use, provider=provider)
    print(f"Hugging Face Inference Client initialized with {provider} provider.")

    # Convert seed to None if -1 (meaning random)
    if seed == -1:
        seed = None

    # Determine which model to use, prioritizing custom_model if provided
    model_to_use = custom_model if custom_model else selected_model
    print(f"Model selected for inference: {model_to_use}")

    # Natural-language mode: let the model decide whether a tool call is wanted
    if mcp_enabled and message and mcp_interaction_mode == "Natural Language" and active_mcp_servers:
        server_name, tool_info = analyze_message_for_tool_call(
            message,
            active_mcp_servers,
            client,
            model_to_use,
            system_message
        )
        if server_name and tool_info:
            tool_reply = None
            try:
                # Call the detected tool
                print(f"Calling tool via natural language: {server_name}.{tool_info['tool_name']} with parameters: {tool_info['parameters']}")
                result = call_mcp_tool(server_name, tool_info['tool_name'], **tool_info['parameters'])
                # Format the response to include what was done
                result_str = _format_tool_result(result)
                tool_reply = f"I used the {tool_info['tool_name']} tool from {server_name} with your request.\n\nResult:\n{result_str}"
            except Exception as e:
                # Continue with normal response if tool call fails
                print(f"Error executing MCP tool via natural language: {str(e)}")
            if tool_reply is not None:
                yield tool_reply
                return

    # Create multimodal content if images are present
    if image_files:
        user_content = []
        # Add text part if there is any
        if message and message.strip():
            user_content.append({"type": "text", "text": message})
        # Add image parts
        user_content.extend(_encode_image_parts(image_files, "Error encoding image"))
    else:
        # Text-only message
        user_content = message

    # Add information about available MCP tools to the system message if MCP is enabled
    augmented_system_message = system_message
    if mcp_enabled and active_mcp_servers:
        augmented_system_message += _mcp_tools_prompt(active_mcp_servers, mcp_interaction_mode)

    # Prepare messages in the format expected by the API
    messages = [{"role": "system", "content": augmented_system_message}]
    print("Initial messages array constructed.")

    # Add conversation history to the context
    messages.extend(_history_to_messages(history))

    # Append the latest user message
    messages.append({"role": "user", "content": user_content})
    print(f"Latest user message appended (content type: {type(user_content)})")

    # Start with an empty string to build the response as tokens stream in
    response = ""
    print(f"Sending request to {provider} provider.")

    # Prepare parameters for the chat completion request
    parameters = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "frequency_penalty": frequency_penalty,
    }
    if seed is not None:
        parameters["seed"] = seed

    # Use the InferenceClient for making the request
    try:
        # Create a generator for the streaming response
        stream = client.chat_completion(
            model=model_to_use,
            messages=messages,
            stream=True,
            **parameters
        )
        print("Received tokens: ", end="", flush=True)

        # Process the streaming response
        for chunk in stream:
            choices = getattr(chunk, "choices", None)
            if not choices:
                continue
            # Extract the content from the response
            delta = getattr(choices[0], "delta", None)
            token_text = getattr(delta, "content", None)
            if token_text:
                print(token_text, end="", flush=True)
                response += token_text
                yield response
        print()
    except Exception as e:
        # Surface the failure in the chat instead of raising into the UI
        print(f"Error during inference: {e}")
        response += f"\nError: {str(e)}"
        yield response

    print("Completed response generation.")
# GRADIO UI
# Builds the whole interface: chat area, generation settings, MCP settings,
# the callbacks that drive the chat, and the event wiring between them.
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as demo:
    # Create the chatbot component
    chatbot = gr.Chatbot(
        height=600,
        show_copy_button=True,
        placeholder="Select a model and begin chatting. Now supports multiple inference providers, multimodal inputs, and MCP tools",
        layout="panel"
    )
    print("Chatbot interface created.")

    # Multimodal textbox for messages (combines text and file uploads)
    msg = gr.MultimodalTextbox(
        placeholder="Type a message or upload images...",
        show_label=False,
        container=False,
        scale=12,
        file_types=["image"],
        file_count="multiple",
        sources=["upload"]
    )

    # Create accordion for settings
    with gr.Accordion("Settings", open=False):
        # System message
        system_message_box = gr.Textbox(
            value="You are a helpful AI assistant that can understand images and text.",
            placeholder="You are a helpful assistant.",
            label="System Prompt"
        )

        # Generation parameters
        with gr.Row():
            with gr.Column():
                max_tokens_slider = gr.Slider(
                    minimum=1,
                    maximum=4096,
                    value=512,
                    step=1,
                    label="Max tokens"
                )
                temperature_slider = gr.Slider(
                    minimum=0.1,
                    maximum=4.0,
                    value=0.7,
                    step=0.1,
                    label="Temperature"
                )
                top_p_slider = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.95,
                    step=0.05,
                    label="Top-P"
                )
            with gr.Column():
                frequency_penalty_slider = gr.Slider(
                    minimum=-2.0,
                    maximum=2.0,
                    value=0.0,
                    step=0.1,
                    label="Frequency Penalty"
                )
                seed_slider = gr.Slider(
                    minimum=-1,
                    maximum=65535,
                    value=-1,
                    step=1,
                    label="Seed (-1 for random)"
                )

        # Provider selection
        providers_list = [
            "hf-inference",  # Default Hugging Face Inference
            "cerebras",  # Cerebras provider
            "together",  # Together AI
            "sambanova",  # SambaNova
            "novita",  # Novita AI
            "cohere",  # Cohere
            "fireworks-ai",  # Fireworks AI
            "hyperbolic",  # Hyperbolic
            "nebius",  # Nebius
        ]
        provider_radio = gr.Radio(
            choices=providers_list,
            value="hf-inference",
            label="Inference Provider",
        )

        # New BYOK textbox
        byok_textbox = gr.Textbox(
            value="",
            label="BYOK (Bring Your Own Key)",
            info="Enter a custom Hugging Face API key here. When empty, only 'hf-inference' provider can be used.",
            placeholder="Enter your Hugging Face API token",
            type="password"  # Hide the API key for security
        )

        # Custom model box
        custom_model_box = gr.Textbox(
            value="",
            label="Custom Model",
            info="(Optional) Provide a custom Hugging Face model path. Overrides any selected featured model.",
            placeholder="meta-llama/Llama-3.3-70B-Instruct"
        )

        # Model search
        model_search_box = gr.Textbox(
            label="Filter Models",
            placeholder="Search for a featured model...",
            lines=1
        )

        # Featured models list
        # Updated to include multimodal models
        models_list = [
            "meta-llama/Llama-3.2-11B-Vision-Instruct",
            "meta-llama/Llama-3.3-70B-Instruct",
            "meta-llama/Llama-3.1-70B-Instruct",
            "meta-llama/Llama-3.0-70B-Instruct",
            "meta-llama/Llama-3.2-3B-Instruct",
            "meta-llama/Llama-3.2-1B-Instruct",
            "meta-llama/Llama-3.1-8B-Instruct",
            "NousResearch/Hermes-3-Llama-3.1-8B",
            "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
            "mistralai/Mistral-Nemo-Instruct-2407",
            "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "mistralai/Mistral-7B-Instruct-v0.2",
            "Qwen/Qwen3-235B-A22B",
            "Qwen/Qwen3-32B",
            "Qwen/Qwen2.5-72B-Instruct",
            "Qwen/Qwen2.5-3B-Instruct",
            "Qwen/Qwen2.5-0.5B-Instruct",
            "Qwen/QwQ-32B",
            "Qwen/Qwen2.5-Coder-32B-Instruct",
            "microsoft/Phi-3.5-mini-instruct",
            "microsoft/Phi-3-mini-128k-instruct",
            "microsoft/Phi-3-mini-4k-instruct",
        ]
        featured_model_radio = gr.Radio(
            label="Select a model below",
            choices=models_list,
            value="meta-llama/Llama-3.2-11B-Vision-Instruct",  # Default to a multimodal model
            interactive=True
        )
        gr.Markdown("[View all Text-to-Text models](https://huggingface.co/models?inference_provider=all&pipeline_tag=text-generation&sort=trending) | [View all multimodal models](https://huggingface.co/models?inference_provider=all&pipeline_tag=image-text-to-text&sort=trending)")

    # Create accordion for MCP settings
    with gr.Accordion("MCP Settings", open=False):
        mcp_enabled_checkbox = gr.Checkbox(
            label="Enable MCP Support",
            value=False,
            info="Enable Model Context Protocol support to connect to external tools and services"
        )
        with gr.Row():
            mcp_server_url = gr.Textbox(
                label="MCP Server URL",
                placeholder="https://example-mcp-server.hf.space/gradio_api/mcp/sse",
                info="URL of the MCP server to connect to"
            )
            mcp_server_name = gr.Textbox(
                label="Server Name",
                placeholder="Optional name for this server",
                info="A friendly name to identify this server"
            )
        mcp_connect_button = gr.Button("Connect to MCP Server")
        mcp_status = gr.Textbox(
            label="MCP Connection Status",
            placeholder="No MCP servers connected",
            interactive=False
        )
        active_mcp_servers = gr.Dropdown(
            label="Active MCP Servers",
            choices=[],
            multiselect=True,
            info="Select which MCP servers to use in chat"
        )
        mcp_mode = gr.Radio(
            label="MCP Interaction Mode",
            choices=["Natural Language", "Command Mode"],
            value="Natural Language",
            info="Choose how to interact with MCP tools"
        )
        gr.Markdown("""
### MCP Interaction Modes
**Natural Language Mode**: Simply describe what you want in plain English. Examples:
```
Please convert the text "Hello world" to speech
Can you read this text aloud: "Welcome to MCP integration"
```
**Command Mode**: Use structured commands (for advanced users)
```
/mcp <server_name> <tool_name> {"param1": "value1", "param2": "value2"}
```
Example:
```
/mcp kokoroTTS text_to_audio {"text": "Hello world", "speed": 1.0}
```
""")

    # Chat history state
    chat_history = gr.State([])

    # Uploaded images are stored in the chat history as markdown image
    # markers so the chatbot renders them; bot() recognises them by prefix.
    IMAGE_MARKER_PREFIX = "![Image]("

    def image_path_from_marker(text):
        """Return the path inside an ``![Image](path)`` marker, or None if ``text`` is not one."""
        if (
            isinstance(text, str)
            and text.startswith(IMAGE_MARKER_PREFIX)
            and text.endswith(")")
        ):
            # Slice instead of str.replace so a ")" inside the path survives.
            return text[len(IMAGE_MARKER_PREFIX):-1]
        return None

    # Function to filter models
    def filter_models(search_term):
        """Restrict the featured-model radio to entries containing ``search_term`` (case-insensitive)."""
        print(f"Filtering models with search term: {search_term}")
        needle = (search_term or "").lower()
        filtered = [m for m in models_list if needle in m.lower()]
        print(f"Filtered models: {filtered}")
        return gr.update(choices=filtered)

    # Function to set custom model from radio
    def set_custom_model_from_radio(selected):
        """Copy the selected featured model into the custom-model box."""
        print(f"Featured model selected: {selected}")
        return selected

    # Function to connect to MCP server
    def connect_mcp_server(url, name):
        """Connect to an MCP server; return the status text and the refreshed server choices."""
        _, status = connect_to_mcp_server(url, name)
        # Update the active servers dropdown
        servers = list(mcp_connections.keys())
        # Return the status message and updated server list
        return status, gr.update(choices=servers)

    # Function for the chat interface
    def user(user_message, history):
        """Append the submitted text and images to the chat history.

        ``user_message`` is the multimodal textbox value: a dict with "text"
        and "files". The text (if any) becomes one history entry and every
        uploaded file becomes its own image-marker entry. Returns the history.
        """
        # Debug logging for troubleshooting
        print(f"User message received: {user_message}")
        history = history if history is not None else []

        # Skip if message is empty (no text and no files)
        if not user_message:
            print("Empty message, skipping")
            return history

        text_content = (user_message.get("text") or "").strip()
        files = user_message.get("files") or []
        print(f"Text content: {text_content}")
        print(f"Files: {files}")

        # If both text and files are empty, skip
        if not text_content and not files:
            print("No content to display")
            return history

        # The text goes first so an image entry can look back to it for context
        if text_content:
            print(f"Adding text message: {text_content}")
            history.append([text_content, None])

        # Then add each image file as a separate message with no text
        for file_path in files:
            if file_path and isinstance(file_path, str):
                print(f"Adding image: {file_path}")
                history.append([f"{IMAGE_MARKER_PREFIX}{file_path})", None])
        return history

    # Define bot response function
    def bot(history, system_msg, max_tokens, temperature, top_p, freq_penalty, seed,
            provider, api_key, custom_model, search_term, selected_model,
            mcp_enabled, selected_servers, mcp_mode="Natural Language"):
        """Stream the assistant's reply for the latest history entry.

        The parameters mirror, in order, the inputs wired to this callback.
        ``mcp_mode`` is the 15th input; it is accepted here and forwarded to
        respond() so the selected interaction mode takes effect. Yields the
        updated history after every streamed update.
        """
        # Check if history is valid
        if not history:
            print("No history to process")
            yield history
            return

        # Get the most recent message and detect if it's an image
        user_message = history[-1][0]
        print(f"Processing user message: {user_message}")

        image_path = image_path_from_marker(user_message)
        if image_path is not None:
            print(f"Image detected: {image_path}")
            # No text of its own: use the previous entry as context when that
            # entry is plain text rather than another image.
            message_text = ""
            if len(history) > 1:
                prev_message = history[-2][0]
                if isinstance(prev_message, str) and image_path_from_marker(prev_message) is None:
                    message_text = prev_message
                    print(f"Using text context from previous message: {message_text}")
            images = [image_path]
        else:
            message_text = user_message
            images = None

        # Process message through respond function
        history[-1][1] = ""
        for response in respond(
            message_text,      # Text message, or text context for an image
            images,            # Current image, if any
            history[:-1],      # Previous history
            system_msg,
            max_tokens,
            temperature,
            top_p,
            freq_penalty,
            seed,
            provider,
            api_key,
            custom_model,
            search_term,
            selected_model,
            mcp_enabled,
            selected_servers,
            mcp_mode
        ):
            history[-1][1] = response
            yield history

    # Update function for provider validation based on BYOK
    def validate_provider(api_key, provider):
        """Force the provider back to 'hf-inference' when no custom API key is entered."""
        if not (api_key or "").strip() and provider != "hf-inference":
            return gr.update(value="hf-inference")
        return gr.update(value=provider)

    # Event handlers
    msg.submit(
        user,
        [msg, chatbot],
        [chatbot],
        queue=False
    ).then(
        bot,
        [chatbot, system_message_box, max_tokens_slider, temperature_slider, top_p_slider,
         frequency_penalty_slider, seed_slider, provider_radio, byok_textbox, custom_model_box,
         model_search_box, featured_model_radio, mcp_enabled_checkbox, active_mcp_servers, mcp_mode],
        [chatbot]
    ).then(
        lambda: {"text": "", "files": []},  # Clear inputs after submission
        None,
        [msg]
    )

    # Connect MCP connect button
    mcp_connect_button.click(
        connect_mcp_server,
        [mcp_server_url, mcp_server_name],
        [mcp_status, active_mcp_servers]
    )

    # Connect the model filter to update the radio choices
    model_search_box.change(
        fn=filter_models,
        inputs=model_search_box,
        outputs=featured_model_radio
    )
    print("Model search box change event linked.")

    # Connect the featured model radio to update the custom model box
    featured_model_radio.change(
        fn=set_custom_model_from_radio,
        inputs=featured_model_radio,
        outputs=custom_model_box
    )
    print("Featured model radio button change event linked.")

    # Connect the BYOK textbox to validate provider selection
    byok_textbox.change(
        fn=validate_provider,
        inputs=[byok_textbox, provider_radio],
        outputs=provider_radio
    )
    print("BYOK textbox change event linked.")

    # Also validate provider when the radio changes to ensure consistency
    provider_radio.change(
        fn=validate_provider,
        inputs=[byok_textbox, provider_radio],
        outputs=provider_radio
    )
    print("Provider radio button change event linked.")

print("Gradio interface initialized.")
# Script entry point: start the Gradio app only when run directly.
if __name__ == "__main__":
    print("Launching the demo application.")
    # Not launching as MCP server as we're the client
    launch_options = {"show_api": True, "mcp_server": False}
    demo.launch(**launch_options)