"""Gradio playground for the Phi-4-multimodal model served by an Azure ML endpoint.

The app keeps two parallel views of a conversation:

* ``history`` - the messages shown in the Gradio chatbot (text and file paths);
* ``conversation_state`` - the messages sent to the endpoint, where images and
  audio are embedded as base64 ``data:`` URLs.
"""

import base64
import json
import logging
import mimetypes
import os
import ssl
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from io import BytesIO

import gradio as gr
import requests
import soundfile as sf
from PIL import Image

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Azure ML endpoint configuration
url = os.getenv("AZURE_ENDPOINT")
api_key = os.getenv("AZURE_API_KEY")

# Default parameter values
default_max_tokens = 4096
default_temperature = 0.0
default_top_p = 1.0
default_presence_penalty = 0.0
default_frequency_penalty = 0.0

# Initialize MIME types
mimetypes.init()

# File extension -> MIME type tables shared by the URL fetchers and the file encoder.
_IMAGE_MIME_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".bmp": "image/bmp",
    ".tiff": "image/tiff",
    ".webp": "image/webp",
}
_AUDIO_MIME_TYPES = {
    ".wav": "audio/wav",
    ".mp3": "audio/mpeg",
    ".flac": "audio/flac",
    ".m4a": "audio/aac",
    ".aac": "audio/aac",
    ".ogg": "audio/ogg",
}

# Timeout (seconds) for downloading example media.
_DOWNLOAD_TIMEOUT_SECONDS = 30

# Placeholder that replaces base64 payloads in logs and in the debug panel.
_REDACTED_MARKER = "[BASE64_DATA_REMOVED]"

_NO_MIXED_MODALITY_MESSAGE = (
    "This demo does not support text + audio + image inputs in the same conversation. "
    "Please click Clear conversation button."
)


def _allow_self_signed_https(allowed):
    """Disable HTTPS certificate verification process-wide when *allowed* is true.

    NOTE(review): this replaces ``ssl._create_default_https_context`` for the whole
    process, so every later ``urllib`` HTTPS request skips certificate checks.
    Kept because the endpoint may use a self-signed certificate; set the
    ``PYTHONHTTPSVERIFY`` environment variable to leave verification untouched.
    """
    if (
        allowed
        and not os.environ.get('PYTHONHTTPSVERIFY', '')
        and getattr(ssl, '_create_unverified_context', None)
    ):
        ssl._create_default_https_context = ssl._create_unverified_context


def call_aml_endpoint(payload, url, api_key, params=None):
    """Call Azure ML endpoint with the given payload.

    Args:
        payload (dict): Request body; must contain an ``"input_data"`` dict. If it
            has no ``"parameters"`` entry, one is added in place.
        url (str): Endpoint URL.
        api_key (str): Bearer token for the endpoint.
        params (dict | None): Generation settings (``max_tokens``, ``temperature``,
            ``top_p``, ``presence_penalty``, ``frequency_penalty``). Missing keys
            fall back to the module defaults.

    Returns:
        The decoded JSON response, or ``{"error": message}`` when the request
        fails or the response is not valid JSON.

    Raises:
        Exception: If ``api_key`` or ``url`` is empty.
    """
    if not api_key:
        raise Exception("A key should be provided to invoke the endpoint")
    if not url:
        raise Exception("An endpoint URL should be provided to invoke the endpoint")

    # Allow self-signed HTTPS certificates
    _allow_self_signed_https(True)

    # Set parameters from the UI inputs or use defaults
    settings = {
        "max_tokens": default_max_tokens,
        "temperature": default_temperature,
        "top_p": default_top_p,
        "presence_penalty": default_presence_penalty,
        "frequency_penalty": default_frequency_penalty,
    }
    if params:
        settings.update(params)

    parameters = {
        "max_tokens": int(settings["max_tokens"]),
        "temperature": float(settings["temperature"]),
        "top_p": float(settings["top_p"]),
        "presence_penalty": float(settings["presence_penalty"]),
        "frequency_penalty": float(settings["frequency_penalty"]),
        "stream": True,
    }

    # Parameters already present in the payload win over the computed ones.
    effective_parameters = payload["input_data"].setdefault("parameters", parameters)

    # Encode the request body
    body = json.dumps(payload).encode("utf-8")

    # Set up headers
    headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + api_key)}

    # Create and send the request
    req = urllib.request.Request(url, body, headers)

    try:
        logger.info(f"Sending request to {url}")
        logger.info(f"Using parameters: {effective_parameters}")
        with urllib.request.urlopen(req) as response:
            result = response.read().decode('utf-8')
    except urllib.error.HTTPError as error:
        logger.error(f"Request failed with status code: {error.code}")
        logger.error(f"Headers: {error.info()}")
        error_message = error.read().decode("utf8", 'ignore')
        logger.error(f"Error message: {error_message}")
        return {"error": error_message}
    except urllib.error.URLError as error:
        # Network-level failure (DNS, refused connection, TLS handshake, ...).
        error_message = f"Could not reach the endpoint: {error.reason}"
        logger.error(error_message)
        return {"error": error_message}

    logger.info("Received response successfully")
    try:
        return json.loads(result)
    except json.JSONDecodeError as error:
        error_message = f"Endpoint returned a non-JSON response: {error}"
        logger.error(error_message)
        return {"error": error_message}


def _url_extension(url):
    """Return the lower-cased file extension of the URL's path, ignoring query and fragment."""
    return os.path.splitext(urllib.parse.urlparse(url).path)[1].lower()


def _mime_type_for_download(url, response, known_types, family, fallback):
    """Pick a MIME type from the URL extension, then the Content-Type header, then *fallback*."""
    mime_type = known_types.get(_url_extension(url))
    if mime_type:
        return mime_type

    # Drop parameters such as "; charset=..." before checking the media family.
    content_type = response.headers.get('Content-Type', '').split(';', 1)[0].strip().lower()
    if content_type.startswith(f"{family}/"):
        return content_type
    return fallback


def improved_fetch_audio_from_url(url):
    """Fetch audio data from URL and convert to base64.

    WAV data is additionally decoded with soundfile as a sanity check; a failed
    check is only logged because the file may still be usable by the model.

    Args:
        url (str): URL of the audio file

    Returns:
        tuple: (mime_type, base64_encoded_data) if successful, (None, None) otherwise
    """
    try:
        logger.info(f"Fetching audio from URL: {url}")

        with requests.Session() as session:
            response = session.get(url, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
        response.raise_for_status()
        audio_content = response.content

        mime_type = _mime_type_for_download(
            url, response, _AUDIO_MIME_TYPES, "audio", "audio/wav"  # Default to WAV
        )
        logger.info(f"Detected MIME type: {mime_type}")

        if mime_type == "audio/wav":
            try:
                # soundfile accepts file-like objects, so no temporary file is needed.
                data, samplerate = sf.read(BytesIO(audio_content))
                logger.info(f"Successfully read audio file: {len(data)} samples, {samplerate}Hz")
            except Exception as e:
                # Continue anyway, the file might still be valid
                logger.warning(f"Could not verify audio with soundfile: {e}")

        base64_audio = base64.b64encode(audio_content).decode('utf-8')
        logger.info(f"Successfully encoded audio to base64, length: {len(base64_audio)}")
        return mime_type, base64_audio
    except Exception as e:
        logger.error(f"Error fetching audio from URL: {e}", exc_info=True)
        return None, None


def fetch_image_from_url(url):
    """Fetch image data from URL and convert to base64.

    Args:
        url (str): URL of the image file

    Returns:
        tuple: (mime_type, base64_encoded_data) if successful, (None, None) otherwise
    """
    try:
        logger.info(f"Fetching image from URL: {url}")
        response = requests.get(url, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
        response.raise_for_status()

        mime_type = _mime_type_for_download(
            url, response, _IMAGE_MIME_TYPES, "image", "image/jpeg"  # Default to JPEG
        )

        base64_image = base64.b64encode(response.content).decode('utf-8')
        logger.info(f"Successfully fetched and encoded image, mime type: {mime_type}")
        return mime_type, base64_image
    except Exception as e:
        logger.error(f"Error fetching image from URL: {e}", exc_info=True)
        return None, None


def encode_base64_from_file(file_path):
    """Encode file content to base64 string and determine MIME type.

    Args:
        file_path (str): Path of the file to read.

    Returns:
        tuple: (base64_encoded_data, mime_type). The MIME type is
        ``application/octet-stream`` for unrecognised extensions.
    """
    file_extension = os.path.splitext(file_path)[1].lower()
    mime_type = (
        _IMAGE_MIME_TYPES.get(file_extension)
        or _AUDIO_MIME_TYPES.get(file_extension)
        or "application/octet-stream"
    )

    with open(file_path, "rb") as file:
        encoded_string = base64.b64encode(file.read()).decode('utf-8')

    return encoded_string, mime_type


def _count_user_media(conversation_state):
    """Count image and audio items in the user messages of *conversation_state*.

    Returns:
        dict: ``{"image_url": n_images, "audio_url": n_audio}``
    """
    counts = {"image_url": 0, "audio_url": 0}
    for msg in conversation_state:
        if msg.get("role") != "user":
            continue
        for content_item in msg.get("content") or []:
            if isinstance(content_item, dict) and content_item.get("type") in counts:
                counts[content_item["type"]] += 1
    return counts


def process_message(history, message, conversation_state):
    """Process user message and update both history and internal state.

    The conversation is reset when the message brings an image (or audio clip)
    into a conversation that already contains one.

    Args:
        history (list): Chatbot messages shown in the UI.
        message (dict): MultimodalTextbox value with ``"text"`` and ``"files"``.
        conversation_state (list): Messages sent to the endpoint.

    Returns:
        tuple: (history, disabled MultimodalTextbox, conversation_state)
    """
    history = [] if history is None else history
    conversation_state = [] if conversation_state is None else conversation_state

    text_content = message.get("text") or ""

    # Content array for the internal state
    content_items = []
    if text_content:
        content_items.append({"type": "text", "text": text_content})

    existing = _count_user_media(conversation_state)
    existing_images = existing["image_url"]
    existing_audio = existing["audio_url"]

    image_files = []
    audio_files = []
    should_clear_history = False

    # Convert every uploaded file to a base64 data URL right away.
    for file_path in message.get("files") or []:
        base64_content, mime_type = encode_base64_from_file(file_path)
        data_url = f"data:{mime_type};base64,{base64_content}"

        if mime_type.startswith("image/"):
            content_items.append({"type": "image_url", "image_url": {"url": data_url}})
            image_files.append(file_path)
            if existing_images > 0:
                should_clear_history = True
                logger.info("Detected second image upload - clearing history")
        elif mime_type.startswith("audio/"):
            content_items.append({"type": "audio_url", "audio_url": {"url": data_url}})
            audio_files.append(file_path)
            if existing_audio > 0:
                should_clear_history = True
                logger.info("Detected second audio upload - clearing history")
        else:
            logger.warning(f"Ignoring unsupported file type: {os.path.basename(file_path)}")

    # Only proceed if we have content
    if content_items:
        if should_clear_history:
            history = []
            conversation_state = []
            # The earlier media is gone, so it must not be counted below.
            existing_images = 0
            existing_audio = 0
            logger.info("History cleared due to second image/audio upload")

        # Add to Gradio chatbot history (for display); skip the text bubble for
        # file-only messages so no empty message is rendered.
        if text_content:
            history.append({"role": "user", "content": text_content})
        for file_path in image_files + audio_files:
            history.append({"role": "user", "content": {"path": file_path}})

        logger.info(
            f"Updated history with user message. Current conversation has "
            f"{existing_images + len(image_files)} images and "
            f"{existing_audio + len(audio_files)} audio files"
        )

        # Add to internal conversation state (with base64 data)
        conversation_state.append({"role": "user", "content": content_items})

    return history, gr.MultimodalTextbox(value=None, interactive=False), conversation_state


def process_text_example(example_text, history, conversation_state):
    """Process a text example directly.

    Args:
        example_text (str): Prompt to send.
        history (list | None): Chatbot messages shown in the UI.
        conversation_state (list | None): Messages sent to the endpoint.

    Returns:
        tuple: (history, conversation_state) including the assistant reply or an
        error message.
    """
    history = [] if history is None else history
    conversation_state = [] if conversation_state is None else conversation_state
    user_message_shown = False

    try:
        history.append({"role": "user", "content": example_text})
        user_message_shown = True
        conversation_state.append({
            "role": "user",
            "content": [{"type": "text", "text": example_text}],
        })
        return bot_response(history, conversation_state)
    except Exception as e:
        logger.error(f"Error processing text example: {e}", exc_info=True)
        if not user_message_shown:
            history.append({"role": "user", "content": example_text})
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return history, conversation_state


def _process_media_example(media_kind, fetch_media, example_text, media_url,
                           history, conversation_state):
    """Shared implementation of the audio and image example handlers.

    Args:
        media_kind (str): ``"audio"`` or ``"image"``; selects log wording and the
            ``"<kind>_url"`` content type.
        fetch_media (callable): Function mapping a URL to (mime_type, base64_data).
        example_text (str): Prompt that accompanies the media.
        media_url (str): URL of the media file.
        history (list | None): Chatbot messages shown in the UI.
        conversation_state (list | None): Messages sent to the endpoint.

    Returns:
        tuple: (history, conversation_state)
    """
    history = [] if history is None else history
    conversation_state = [] if conversation_state is None else conversation_state
    content_type = f"{media_kind}_url"
    failure_note = f"{example_text} ({media_kind.capitalize()} URL: {media_url})"
    user_message_shown = False

    try:
        logger.info(f"Processing {media_kind} example with text: {example_text}, URL: {media_url}")

        # Start over if the conversation already contains this kind of media.
        if _count_user_media(conversation_state)[content_type] > 0:
            history = []
            conversation_state = []
            logger.info(f"History cleared due to example with second {media_kind}")

        mime_type, base64_data = fetch_media(media_url)
        if not mime_type or not base64_data:
            error_msg = f"Failed to load {media_kind} from {media_url}"
            logger.error(error_msg)
            history.append({"role": "user", "content": failure_note})
            history.append({"role": "assistant", "content": f"Error: {error_msg}"})
            return history, conversation_state

        logger.info(
            f"Successfully loaded {media_kind}, mime type: {mime_type}, "
            f"base64 length: {len(base64_data)}"
        )

        # Add text message to history for display
        history.append({"role": "user", "content": example_text})
        user_message_shown = True

        # Add to conversation state
        conversation_state.append({
            "role": "user",
            "content": [
                {"type": "text", "text": example_text},
                {"type": content_type, content_type: {"url": f"data:{mime_type};base64,{base64_data}"}},
            ],
        })

        return bot_response(history, conversation_state)
    except Exception as e:
        logger.error(f"Error processing {media_kind} example: {e}", exc_info=True)
        if not user_message_shown:
            history.append({"role": "user", "content": failure_note})
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return history, conversation_state


def process_audio_example_direct(example_text, example_audio_url, history, conversation_state):
    """Process an audio example directly from a URL.

    Returns:
        tuple: (history, conversation_state) including the assistant reply or an
        error message.
    """
    return _process_media_example(
        "audio", improved_fetch_audio_from_url,
        example_text, example_audio_url, history, conversation_state,
    )


def process_image_example_direct(example_text, example_image_url, history, conversation_state):
    """Process an image example directly from a URL.

    Returns:
        tuple: (history, conversation_state) including the assistant reply or an
        error message.
    """
    return _process_media_example(
        "image", fetch_image_from_url,
        example_text, example_image_url, history, conversation_state,
    )


def _redact_data_url(data_url):
    """Replace everything after the first comma of *data_url* with a placeholder."""
    head, separator, _ = data_url.partition(",")
    return f"{head},{_REDACTED_MARKER}" if separator else data_url


def _sanitized_payload(conversation_state):
    """Build the endpoint payload with base64 media replaced by a placeholder.

    The original messages are not modified; only the containers that need to
    change are copied, so the base64 data itself is never duplicated.
    """
    messages = []
    for message in conversation_state:
        if not isinstance(message, dict) or not isinstance(message.get("content"), list):
            messages.append(message)
            continue

        clean_content = []
        for content_item in message["content"]:
            if isinstance(content_item, dict):
                content_item = dict(content_item)
                for key in ("image_url", "audio_url"):
                    media = content_item.get(key)
                    if isinstance(media, dict) and isinstance(media.get("url"), str):
                        content_item[key] = {**media, "url": _redact_data_url(media["url"])}
            clean_content.append(content_item)
        messages.append({**message, "content": clean_content})

    return {"input_data": {"input_string": messages}}


def _extract_result_text(response):
    """Turn the endpoint response into the text shown to the user (always a str)."""
    try:
        if not isinstance(response, dict):
            result = response
        elif "result" in response:
            result = response["result"]
        elif "output" in response:
            # Depending on your API's response format
            output = response["output"]
            result = output[0] if isinstance(output, list) and output else output
        elif "error" in response:
            result = f"Error: {response['error']}"
        else:
            # Just return the whole response as string if we can't parse it
            result = f"Received response: {json.dumps(response)}"
        # The chatbot and the conversation state both need plain text.
        return result if isinstance(result, str) else str(result)
    except Exception as e:
        return f"Error processing response: {str(e)}"


def bot_response(history, conversation_state):
    """Generate bot response based on conversation state.

    Nothing is sent when the conversation is empty or does not end with a user
    message (e.g. after an empty submission).

    Args:
        history (list): Chatbot messages shown in the UI.
        conversation_state (list): Messages sent to the endpoint.

    Returns:
        tuple: (history, conversation_state) with the assistant reply appended.
    """
    if not conversation_state:
        return history, conversation_state

    last_message = conversation_state[-1]
    if not isinstance(last_message, dict) or last_message.get("role") != "user":
        logger.info("No pending user message - skipping endpoint call")
        return history, conversation_state

    history = [] if history is None else history

    # Create the payload
    payload = {"input_data": {"input_string": conversation_state}}

    # Log the payload for debugging (without base64 data)
    logger.info(f"Sending payload: {json.dumps(_sanitized_payload(conversation_state), indent=2)}")

    # Call Azure ML endpoint; surface failures in the chat instead of raising.
    try:
        response = call_aml_endpoint(payload, url, api_key)
    except Exception as e:
        logger.error(f"Endpoint call failed: {e}", exc_info=True)
        response = {"error": str(e)}

    result = _extract_result_text(response)
    if result == "None":
        result = _NO_MIXED_MODALITY_MESSAGE

    # Add bot response to history
    history.append({"role": "assistant", "content": result})

    # Add to conversation state
    conversation_state.append({
        "role": "assistant",
        "content": [{"type": "text", "text": result}],
    })

    return history, conversation_state


def enable_input():
    """Re-enable the input box after bot responds."""
    return gr.MultimodalTextbox(interactive=True)


def update_debug(conversation_state):
    """Update debug output with the last payload that would be sent.

    Returns:
        dict: The payload with base64 data removed, or ``{}`` for an empty
        conversation.
    """
    if not conversation_state:
        return {}
    return _sanitized_payload(conversation_state)


# Styling for the compact audio previews in the examples column.
css = """
#small-audio audio {
    height: 20px !important;
    width: 100px !important;
}
#small-audio .wrap {
    max-width: 220px !important;
}
#small-audio .audio-container {
    min-height: 0px !important;
}
"""

_DESCRIPTION_MD = """
This demo allows you to interact with the [Phi-4-Multimodal AI model](https://aka.ms/phi-4-multimodal/techreport).
You can type messages, upload images, or record audio to communicate with the AI.
Other demos include [Phi-4-Mini playground](https://huggingface.co/spaces/microsoft/phi-4-mini), [Thoughts Organizer](https://huggingface.co/spaces/microsoft/ThoughtsOrganizer), [Stories Come Alive](https://huggingface.co/spaces/microsoft/StoriesComeAlive), [Phine Speech Translator](https://huggingface.co/spaces/microsoft/PhineSpeechTranslator)
"""

_INSTRUCTIONS_MD = """
- Type a question or statement
- Upload images or audio files
- You can combine text with media files
- Support 2 modalities at the same time
- The model can analyze images and transcribe audio
- For best results with images, use JPG or PNG files
- For audio, use WAV, MP3, or FLAC files
"""

_CAPABILITIES_MD = """
This chatbot can:
- Answer questions and provide explanations
- Describe and analyze images
- Transcribe, translate, summarize, and analyze audio content
- Process multiple inputs in the same message
- Maintain context throughout the conversation
"""

# Create Gradio demo
# NOTE(review): the layout nesting below was reconstructed from statement order;
# confirm the placement of the footer, instructions and debug panel.
with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
    title = gr.Markdown("# Phi-4-Multimodal Playground")
    description = gr.Markdown(_DESCRIPTION_MD)

    # Store the conversation state with base64 data
    conversation_state = gr.State([])

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                type="messages",
                avatar_images=(
                    None,
                    "https://upload.wikimedia.org/wikipedia/commons/d/d3/Phi-integrated-information-symbol.png",
                ),
                height=600,
            )
            # trash icon clear all
            chatbot.clear(lambda: [], None, conversation_state)

            with gr.Row():
                chat_input = gr.MultimodalTextbox(
                    interactive=True,
                    file_count="multiple",
                    placeholder="Enter a message or upload files (images, audio)...",
                    show_label=False,
                    sources=["microphone", "upload"],
                )

            with gr.Row():
                clear_btn = gr.ClearButton([chatbot, chat_input], value="Clear conversation")
                # Also clear the conversation state
                clear_btn.click(lambda: [], None, conversation_state)

            gr.HTML("\nPowered by Microsoft Phi-4-multimodal model on Azure AI.©2025\n")

        with gr.Column(scale=1):
            with gr.Tab("Audio & Text"):
                # Example 1
                gr.Audio(
                    "https://diamondfan.github.io/audio_files/english.weekend.plan.wav",
                    label="Preview",
                    elem_id="small-audio",
                )
                example1_btn = gr.Button("Transcribe this audio clip")
                gr.Markdown("-----")

                # Example 2
                gr.Audio(
                    "https://diamondfan.github.io/audio_files/japanese.seattle.trip.report.wav",
                    label="Preview",
                    elem_id="small-audio",
                )
                example2_btn = gr.Button("Translate audio transcription to English")

                # Define handlers for audio examples
                def run_audio_example1():
                    return process_audio_example_direct(
                        "Transcribe this audio clip",
                        "https://diamondfan.github.io/audio_files/english.weekend.plan.wav",
                        [],
                        [],
                    )

                def run_audio_example2():
                    return process_audio_example_direct(
                        "Translate audio transcription to English",
                        "https://diamondfan.github.io/audio_files/japanese.seattle.trip.report.wav",
                        [],
                        [],
                    )

                # Connect buttons to handlers
                example1_btn.click(
                    run_audio_example1,
                    inputs=[],
                    outputs=[chatbot, conversation_state],
                )
                example2_btn.click(
                    run_audio_example2,
                    inputs=[],
                    outputs=[chatbot, conversation_state],
                )

            with gr.Tab("Image & Text"):
                # Example 1
                gr.Image(
                    "https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Hanoi_Temple_of_Literature.jpg/640px-Hanoi_Temple_of_Literature.jpg",
                    label="Preview",
                )
                img_example1_btn = gr.Button("Write a limerick about this image")

                # Example 2
                gr.Image(
                    "https://pub-c2c1d9230f0b4abb9b0d2d95e06fd4ef.r2.dev/sites/566/2024/09/Screenshot-2024-09-16-115417.png",
                    label="Preview",
                )
                img_example2_btn = gr.Button("Convert the chart to a markdown table")

                # Define handlers for image examples
                def run_image_example1():
                    return process_image_example_direct(
                        "Write a limerick about this image",
                        "https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Hanoi_Temple_of_Literature.jpg/640px-Hanoi_Temple_of_Literature.jpg",
                        [],
                        [],
                    )

                def run_image_example2():
                    return process_image_example_direct(
                        "Convert the chart to a markdown table",
                        "https://pub-c2c1d9230f0b4abb9b0d2d95e06fd4ef.r2.dev/sites/566/2024/09/Screenshot-2024-09-16-115417.png",
                        [],
                        [],
                    )

                # Connect buttons to handlers
                img_example1_btn.click(
                    run_image_example1,
                    inputs=[],
                    outputs=[chatbot, conversation_state],
                )
                img_example2_btn.click(
                    run_image_example2,
                    inputs=[],
                    outputs=[chatbot, conversation_state],
                )

            with gr.Tab("Text Only"):
                # Create a list of example texts
                text_example_list = [
                    "I'd like to buy a new car. Start by asking me about my budget and which features I care most about, then provide a recommendation.",
                    "Coffee shops have been slimming down their menus lately. Is less choice making our coffee runs better or do we miss the variety?",
                    "Explain the Transformer model to a medieval knight",
                ]

                # Create buttons for each example
                for example_text in text_example_list:
                    with gr.Row():
                        text_example_btn = gr.Button(f"{example_text}")
                        # The default argument binds this iteration's text to the handler.
                        text_example_btn.click(
                            fn=lambda text=example_text: process_text_example(text, [], []),
                            inputs=[],
                            outputs=[chatbot, conversation_state],
                        )

            gr.Markdown("### Instructions")
            gr.Markdown(_INSTRUCTIONS_MD)

            gr.Markdown("### Capabilities")
            gr.Markdown(_CAPABILITIES_MD)

    with gr.Accordion("Debug Info", open=False):
        debug_output = gr.JSON(
            label="Last API Request",
            value={},
        )

    # Set up event handlers
    msg_submit = chat_input.submit(
        process_message,
        [chatbot, chat_input, conversation_state],
        [chatbot, chat_input, conversation_state],
        queue=False,
    )

    msg_response = msg_submit.then(
        bot_response,
        [chatbot, conversation_state],
        [chatbot, conversation_state],
        api_name="bot_response",
    )

    msg_response.then(enable_input, None, chat_input)

    # Update debug info
    msg_response.then(update_debug, conversation_state, debug_output)

demo.launch(share=True, debug=True)