# nguyenbh
# update params
# 6d105e8
import gradio as gr
import json
import requests
import urllib.request
import os
import ssl
import base64
from PIL import Image
import soundfile as sf
import mimetypes
import logging
from io import BytesIO
import tempfile
# Logging: timestamped INFO-level records for every request/response step.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Azure ML endpoint location and credential, both read from the environment
# (each is None when its variable is unset).
url = os.environ.get("AZURE_ENDPOINT")
api_key = os.environ.get("AZURE_API_KEY")

# Generation parameters used when a caller supplies none.
default_max_tokens = 4096
default_temperature = 0.0
default_top_p = 1.0
default_presence_penalty = 0.0
default_frequency_penalty = 0.0

# Load the MIME type tables once at import time.
mimetypes.init()
def call_aml_endpoint(payload, url, api_key, params=None):
    """Call the Azure ML endpoint with the given payload.

    Args:
        payload: Request body as a dict. It must contain an ``"input_data"``
            dict; when that dict has no ``"parameters"`` entry, the generation
            parameters are added to it in place.
        url: Endpoint URL.
        api_key: Bearer token sent in the ``Authorization`` header.
        params: Optional dict with ``max_tokens``, ``temperature``, ``top_p``,
            ``presence_penalty`` and ``frequency_penalty``. When ``None`` the
            module-level defaults are used.

    Returns:
        The decoded JSON response, or ``{"error": <message>}`` when the
        request fails (HTTP error, unreachable endpoint) or the reply cannot
        be decoded as JSON.

    Raises:
        Exception: If ``api_key`` is empty.
    """
    # Fail fast, before the payload or the global SSL settings are touched.
    if not api_key:
        raise Exception("A key should be provided to invoke the endpoint")

    # SECURITY NOTE(review): this replaces the process-wide default HTTPS
    # context with an unverified one so self-signed endpoint certificates are
    # accepted. It affects every later urllib HTTPS request in this process;
    # confirm this is still required for the deployed endpoint.
    if not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None):
        ssl._create_default_https_context = ssl._create_unverified_context

    # Set parameters from the UI inputs or use defaults
    if params is None:
        params = {
            "max_tokens": default_max_tokens,
            "temperature": default_temperature,
            "top_p": default_top_p,
            "presence_penalty": default_presence_penalty,
            "frequency_penalty": default_frequency_penalty,
        }
    parameters = {
        "max_tokens": int(params["max_tokens"]),
        "temperature": float(params["temperature"]),
        "top_p": float(params["top_p"]),
        "presence_penalty": float(params["presence_penalty"]),
        "frequency_penalty": float(params["frequency_penalty"]),
        "stream": True,
    }
    # Parameters already present in the payload take precedence.
    if "parameters" not in payload["input_data"]:
        payload["input_data"]["parameters"] = parameters

    body = json.dumps(payload).encode('utf-8')
    headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + api_key)}
    req = urllib.request.Request(url, body, headers)

    try:
        logger.info(f"Sending request to {url}")
        # Log what is actually being sent, which may be caller-supplied.
        logger.info(f"Using parameters: {payload['input_data']['parameters']}")
        # The context manager closes the connection even if reading fails.
        with urllib.request.urlopen(req) as response:
            result = response.read().decode('utf-8')
        logger.info("Received response successfully")
        return json.loads(result)
    except urllib.error.HTTPError as error:
        # Must be handled before URLError: HTTPError is a subclass of it.
        logger.error(f"Request failed with status code: {error.code}")
        logger.error(f"Headers: {error.info()}")
        error_message = error.read().decode("utf8", 'ignore')
        logger.error(f"Error message: {error_message}")
        return {"error": error_message}
    except urllib.error.URLError as error:
        # DNS failure, refused connection, TLS failure, ...
        error_message = f"Could not reach the endpoint: {error.reason}"
        logger.error(error_message)
        return {"error": error_message}
    except (json.JSONDecodeError, UnicodeDecodeError) as error:
        error_message = f"Endpoint returned a response that is not valid JSON: {error}"
        logger.error(error_message)
        return {"error": error_message}
def improved_fetch_audio_from_url(url):
    """Download an audio file and return its MIME type and base64 content.

    The MIME type comes from the file extension in the URL path when it is a
    known audio extension; otherwise from the response ``Content-Type``
    header when that is an ``audio/*`` type; otherwise WAV is assumed.

    Args:
        url (str): URL of the audio file

    Returns:
        tuple: (mime_type, base64_encoded_data) if successful, (None, None) otherwise
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlparse

    extension_to_mime = {
        '.wav': "audio/wav",
        '.mp3': "audio/mpeg",
        '.flac': "audio/flac",
        '.m4a': "audio/aac",
        '.aac': "audio/aac",
        '.ogg': "audio/ogg",
    }

    try:
        logger.info(f"Fetching audio from URL: {url}")
        # The session is closed as soon as the body has been read.
        with requests.Session() as session:
            response = session.get(url, timeout=30)
            response.raise_for_status()
            audio_content = response.content
            content_type = response.headers.get('Content-Type', '')

        # Use only the path component so a query string or fragment
        # (e.g. "clip.wav?token=...") does not hide the extension.
        file_extension = os.path.splitext(urlparse(url).path)[1].lower()
        mime_type = extension_to_mime.get(file_extension)
        if mime_type is None:
            # Drop header parameters such as "; charset=binary" so they do
            # not end up inside the data URL built from this MIME type.
            header_type = content_type.split(';', 1)[0].strip().lower()
            mime_type = header_type if header_type.startswith('audio/') else "audio/wav"
        logger.info(f"Detected MIME type: {mime_type}")

        # Best-effort sanity check for WAV data, done in memory so no
        # temporary file can be left behind on an error path.
        if mime_type == "audio/wav":
            try:
                data, samplerate = sf.read(BytesIO(audio_content))
                logger.info(f"Successfully read audio file: {len(data)} samples, {samplerate}Hz")
            except Exception as e:
                # Continue anyway, the file might still be valid
                logger.warning(f"Could not verify audio with soundfile: {e}")

        base64_audio = base64.b64encode(audio_content).decode('utf-8')
        logger.info(f"Successfully encoded audio to base64, length: {len(base64_audio)}")
        return mime_type, base64_audio
    except Exception as e:
        # Boundary handler: callers rely on (None, None) instead of an exception.
        logger.error(f"Error fetching audio from URL: {e}", exc_info=True)
        return None, None
def fetch_image_from_url(url):
    """Download an image and return its MIME type and base64 content.

    The MIME type comes from the file extension in the URL path when it is a
    known image extension; otherwise from the response ``Content-Type``
    header when that is an ``image/*`` type; otherwise JPEG is assumed.

    Args:
        url (str): URL of the image file

    Returns:
        tuple: (mime_type, base64_encoded_data) if successful, (None, None) otherwise
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlparse

    extension_to_mime = {
        '.jpg': "image/jpeg",
        '.jpeg': "image/jpeg",
        '.png': "image/png",
        '.gif': "image/gif",
        '.bmp': "image/bmp",
        '.tiff': "image/tiff",
        '.webp': "image/webp",
    }

    try:
        logger.info(f"Fetching image from URL: {url}")
        # Same timeout as the audio fetcher, so a stalled server cannot hang
        # the UI callback indefinitely.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        # Use only the path component so a query string or fragment does not
        # hide the extension.
        file_extension = os.path.splitext(urlparse(url).path)[1].lower()
        mime_type = extension_to_mime.get(file_extension)
        if mime_type is None:
            # Fall back to the server-declared type, minus any parameters.
            header_type = response.headers.get('Content-Type', '').split(';', 1)[0].strip().lower()
            mime_type = header_type if header_type.startswith('image/') else "image/jpeg"

        base64_image = base64.b64encode(response.content).decode('utf-8')
        logger.info(f"Successfully fetched and encoded image, mime type: {mime_type}")
        return mime_type, base64_image
    except Exception as e:
        # Boundary handler: callers rely on (None, None) instead of an exception.
        logger.error(f"Error fetching image from URL: {e}", exc_info=True)
        return None, None
def encode_base64_from_file(file_path):
    """Return a file's content as base64 text together with its MIME type.

    The MIME type is looked up from the lower-cased file extension; any
    extension not in the table yields ``"application/octet-stream"``.

    Returns:
        tuple: (base64_encoded_string, mime_type)
    """
    known_types = {
        '.jpg': "image/jpeg",
        '.jpeg': "image/jpeg",
        '.png': "image/png",
        '.gif': "image/gif",
        '.bmp': "image/bmp",
        '.tiff': "image/tiff",
        '.webp': "image/webp",
        '.flac': "audio/flac",
        '.wav': "audio/wav",
        '.mp3': "audio/mpeg",
        '.m4a': "audio/aac",
        '.aac': "audio/aac",
        '.ogg': "audio/ogg",
    }
    suffix = os.path.splitext(file_path)[1].lower()
    guessed_type = known_types.get(suffix, "application/octet-stream")

    with open(file_path, "rb") as handle:
        raw_bytes = handle.read()
    return base64.b64encode(raw_bytes).decode('utf-8'), guessed_type
def process_message(history, message, conversation_state):
    """Record a submitted chat message in the display history and the API state.

    Uploaded files are converted to base64 data URLs immediately. If the
    conversation already contains an image (or audio clip) and the new message
    brings another one of the same kind, both the display history and the
    state are reset before the new message is added.

    Args:
        history: Chatbot display messages (``{"role", "content"}`` dicts).
        message: Value submitted from the multimodal textbox; its ``"text"``
            and ``"files"`` entries are read.
        conversation_state: Messages sent to the endpoint, with media stored
            inline as base64 data URLs.

    Returns:
        tuple: (history, an emptied non-interactive MultimodalTextbox,
        conversation_state)
    """
    history = [] if history is None else history
    conversation_state = [] if conversation_state is None else conversation_state

    # Tolerate a missing or None "text"/"files" entry.
    text_content = message.get("text") or ""
    file_paths = message.get("files") or []

    image_files = []
    audio_files = []
    content_items = []
    if text_content:
        content_items.append({"type": "text", "text": text_content})

    # Count media already present in earlier user turns.
    existing_images = 0
    existing_audio = 0
    for msg in conversation_state:
        if msg["role"] != "user" or "content" not in msg:
            continue
        for content_item in msg["content"]:
            if not isinstance(content_item, dict):
                continue
            if content_item.get("type") == "image_url":
                existing_images += 1
            elif content_item.get("type") == "audio_url":
                existing_audio += 1

    should_clear_history = False
    for file_path in file_paths:
        base64_content, mime_type = encode_base64_from_file(file_path)
        data_url = f"data:{mime_type};base64,{base64_content}"
        if mime_type.startswith("image/"):
            content_items.append({"type": "image_url", "image_url": {"url": data_url}})
            image_files.append(file_path)
            if existing_images > 0:
                should_clear_history = True
                logger.info("Detected second image upload - clearing history")
        elif mime_type.startswith("audio/"):
            content_items.append({"type": "audio_url", "audio_url": {"url": data_url}})
            audio_files.append(file_path)
            if existing_audio > 0:
                should_clear_history = True
                logger.info("Detected second audio upload - clearing history")
        else:
            # Make the drop visible instead of ignoring the file silently.
            logger.warning(f"Ignoring unsupported file type ({mime_type}): {file_path}")

    # Only proceed if we have content
    if content_items:
        if should_clear_history:
            history = []
            conversation_state = []
            # The earlier media is gone, so it must not be counted below.
            existing_images = 0
            existing_audio = 0
            logger.info("History cleared due to second image/audio upload")

        # Display history: skip the text bubble for file-only messages so no
        # empty user bubble is rendered.
        if text_content:
            history.append({"role": "user", "content": text_content})
        for file_path in image_files + audio_files:
            history.append({"role": "user", "content": {"path": file_path}})
        logger.info(f"Updated history with user message. Current conversation has {existing_images + len(image_files)} images and {existing_audio + len(audio_files)} audio files")

        # Internal conversation state (with base64 data)
        conversation_state.append({
            "role": "user",
            "content": content_items
        })

    return history, gr.MultimodalTextbox(value=None, interactive=False), conversation_state
def process_text_example(example_text, history, conversation_state):
    """Add a text-only example as a user turn and generate the reply.

    Args:
        example_text: Prompt text of the example.
        history: Chatbot display messages, or ``None`` for a new conversation.
        conversation_state: Messages sent to the endpoint, or ``None``.

    Returns:
        tuple: (history, conversation_state). If generating the reply raises,
        the error is shown as an assistant message in ``history``.
    """
    # Normalise up front so the error path never returns None.
    history = [] if history is None else history
    conversation_state = [] if conversation_state is None else conversation_state

    # Add text message to history for display and to the API state.
    history.append({"role": "user", "content": example_text})
    conversation_state.append({
        "role": "user",
        "content": [{"type": "text", "text": example_text}]
    })

    try:
        return bot_response(history, conversation_state)
    except Exception as e:
        logger.error(f"Error processing text example: {e}", exc_info=True)
        # The user message is already in history; adding it again here would
        # show it twice, so only the error is appended.
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return history, conversation_state
def process_audio_example_direct(example_text, example_audio_url, history, conversation_state):
    """Run an audio example: fetch the clip from a URL, add it as a user turn, and reply.

    If the conversation already contains an audio item, the display history
    and the state are reset first.

    Args:
        example_text: Prompt text of the example.
        example_audio_url: URL of the audio clip to download.
        history: Chatbot display messages, or ``None`` for a new conversation.
        conversation_state: Messages sent to the endpoint, or ``None``.

    Returns:
        tuple: (history, conversation_state). Download or generation failures
        are shown as an assistant error message in ``history``.
    """
    # Normalise up front so the error path never returns None.
    history = [] if history is None else history
    conversation_state = [] if conversation_state is None else conversation_state
    # Tracks whether the user turn is already displayed, so the error path
    # does not add it a second time.
    user_message_shown = False

    try:
        logger.info(f"Processing audio example with text: {example_text}, URL: {example_audio_url}")

        # Check if there's already an audio in the conversation
        has_audio = any(
            isinstance(content_item, dict) and content_item.get("type") == "audio_url"
            for msg in conversation_state
            if msg["role"] == "user" and "content" in msg
            for content_item in msg["content"]
        )
        if has_audio:
            history = []
            conversation_state = []
            logger.info("History cleared due to example with second audio")

        # Fetch audio and convert to base64
        mime_type, base64_audio = improved_fetch_audio_from_url(example_audio_url)
        if not mime_type or not base64_audio:
            error_msg = f"Failed to load audio from {example_audio_url}"
            logger.error(error_msg)
            history.append({"role": "user", "content": f"{example_text} (Audio URL: {example_audio_url})"})
            history.append({"role": "assistant", "content": f"Error: {error_msg}"})
            return history, conversation_state
        logger.info(f"Successfully loaded audio, mime type: {mime_type}, base64 length: {len(base64_audio)}")

        # Add text message to history for display
        history.append({"role": "user", "content": example_text})
        user_message_shown = True

        # Add to conversation state
        conversation_state.append({
            "role": "user",
            "content": [
                {"type": "text", "text": example_text},
                {"type": "audio_url", "audio_url": {"url": f"data:{mime_type};base64,{base64_audio}"}}
            ]
        })
        logger.info("Successfully prepared conversation state, now generating response")

        return bot_response(history, conversation_state)
    except Exception as e:
        logger.error(f"Error processing audio example: {e}", exc_info=True)
        if not user_message_shown:
            history.append({"role": "user", "content": f"{example_text} (Audio URL: {example_audio_url})"})
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return history, conversation_state
def process_image_example_direct(example_text, example_image_url, history, conversation_state):
    """Run an image example: fetch the image from a URL, add it as a user turn, and reply.

    If the conversation already contains an image item, the display history
    and the state are reset first.

    Args:
        example_text: Prompt text of the example.
        example_image_url: URL of the image to download.
        history: Chatbot display messages, or ``None`` for a new conversation.
        conversation_state: Messages sent to the endpoint, or ``None``.

    Returns:
        tuple: (history, conversation_state). Download or generation failures
        are shown as an assistant error message in ``history``.
    """
    # Normalise up front so the error path never returns None.
    history = [] if history is None else history
    conversation_state = [] if conversation_state is None else conversation_state
    # Tracks whether the user turn is already displayed, so the error path
    # does not add it a second time.
    user_message_shown = False

    try:
        # Check if there's already an image in the conversation
        has_image = any(
            isinstance(content_item, dict) and content_item.get("type") == "image_url"
            for msg in conversation_state
            if msg["role"] == "user" and "content" in msg
            for content_item in msg["content"]
        )
        if has_image:
            history = []
            conversation_state = []
            logger.info("History cleared due to example with second image")

        # Fetch image and convert to base64
        mime_type, base64_image = fetch_image_from_url(example_image_url)
        if not mime_type or not base64_image:
            error_msg = f"Failed to load image from {example_image_url}"
            logger.error(error_msg)
            history.append({"role": "user", "content": f"{example_text} (Image URL: {example_image_url})"})
            history.append({"role": "assistant", "content": f"Error: {error_msg}"})
            return history, conversation_state

        # Add text message to history for display
        history.append({"role": "user", "content": example_text})
        user_message_shown = True

        # Add to conversation state
        conversation_state.append({
            "role": "user",
            "content": [
                {"type": "text", "text": example_text},
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}
            ]
        })

        return bot_response(history, conversation_state)
    except Exception as e:
        logger.error(f"Error processing image example: {e}", exc_info=True)
        if not user_message_shown:
            history.append({"role": "user", "content": f"{example_text} (Image URL: {example_image_url})"})
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        return history, conversation_state
def bot_response(history, conversation_state):
    """Send the conversation to the endpoint and append the assistant reply.

    Args:
        history: Chatbot display messages; the reply is appended in place.
        conversation_state: Messages sent to the endpoint; the reply is
            appended in place as a text content item.

    Returns:
        tuple: (history, conversation_state). Both are returned unchanged when
        ``conversation_state`` is empty. Endpoint failures are reported as an
        ``"Error: ..."`` assistant message rather than raised.
    """
    if not conversation_state:
        return history, conversation_state

    payload = {
        "input_data": {
            "input_string": conversation_state
        }
    }

    # Log the payload without base64 data; update_debug builds exactly that
    # sanitized copy, so it is reused instead of duplicating the scrubbing.
    logger.info(f"Sending payload: {json.dumps(update_debug(conversation_state), indent=2)}")

    # Report a failed call in the chat like any other endpoint error, so the
    # UI callback does not crash.
    try:
        response = call_aml_endpoint(payload, url, api_key)
    except Exception as e:
        logger.error(f"Error calling endpoint: {e}", exc_info=True)
        response = {"error": str(e)}

    # Extract text response from the Azure ML endpoint response
    try:
        if isinstance(response, dict):
            if "result" in response:
                result = response["result"]
            elif "output" in response:
                output = response["output"]
                if isinstance(output, list) and len(output) > 0:
                    result = output[0]
                else:
                    result = str(output)
            elif "error" in response:
                result = f"Error: {response['error']}"
            else:
                # Return the whole response as a string if it can't be parsed
                result = f"Received response: {json.dumps(response)}"
        else:
            result = str(response)

        # Chat content must be text: render structured values as JSON and
        # anything else with str(). None is handled by the fallback below.
        if isinstance(result, (dict, list)):
            result = json.dumps(result, ensure_ascii=False)
        elif result is not None and not isinstance(result, str):
            result = str(result)
    except Exception as e:
        result = f"Error processing response: {str(e)}"

    # A missing result (None, or its string form) gets the explanatory message.
    if result is None or result == "None":
        result = "This demo does not support text + audio + image inputs in the same conversation. Please click Clear conversation button."

    history.append({"role": "assistant", "content": result})
    conversation_state.append({
        "role": "assistant",
        "content": [{"type": "text", "text": result}]
    })
    return history, conversation_state
def enable_input():
    """Return a MultimodalTextbox with ``interactive=True`` to unlock the chat input."""
    unlocked_box = gr.MultimodalTextbox(interactive=True)
    return unlocked_box
def update_debug(conversation_state):
    """Build the request payload for the debug panel with base64 media stripped.

    Returns an empty dict for an empty conversation. Otherwise returns a copy
    of the payload in which everything after the first comma of each
    ``image_url`` / ``audio_url`` URL is replaced by ``[BASE64_DATA_REMOVED]``.
    The input list is not modified.
    """
    if not conversation_state:
        return {}

    # A JSON round-trip gives an independent copy that is safe to edit.
    snapshot = json.loads(json.dumps({"input_data": {"input_string": conversation_state}}))

    for turn in snapshot["input_data"]["input_string"]:
        if "content" not in turn or not isinstance(turn["content"], list):
            continue
        for part in turn["content"]:
            for media_key in ("image_url", "audio_url"):
                if media_key not in part:
                    continue
                prefix, comma, _ = part[media_key]["url"].partition(",")
                # Only URLs that contain a comma (data URLs) are rewritten.
                if comma:
                    part[media_key]["url"] = prefix + ",[BASE64_DATA_REMOVED]"
    return snapshot
# Custom CSS passed to gr.Blocks(css=css) below. All rules target the
# "small-audio" element id, which the example audio previews set via elem_id,
# to shrink the audio player and its container.
css = """
#small-audio audio {
height: 20px !important;
width: 100px !important;
}
#small-audio .wrap {
max-width: 220px !important;
}
#small-audio .audio-container {
min-height: 0px !important;
}
"""
# Create Gradio demo
# Builds the whole UI (chat column, example tabs, help text, debug panel),
# wires the event handlers, and launches the app.
with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
title = gr.Markdown("# Phi-4-Multimodal Playground")
description = gr.Markdown("""
This demo allows you to interact with the [Phi-4-Multimodal AI model](https://aka.ms/phi-4-multimodal/techreport).
You can type messages, upload images, or record audio to communicate with the AI.
Other demos include [Phi-4-Mini playground](https://huggingface.co/spaces/microsoft/phi-4-mini), [Thoughts Organizer](https://huggingface.co/spaces/microsoft/ThoughtsOrganizer),
[Stories Come Alive](https://huggingface.co/spaces/microsoft/StoriesComeAlive), [Phine Speech Translator](https://huggingface.co/spaces/microsoft/PhineSpeechTranslator)
""")
# Store the conversation state with base64 data
# (this is the message list bot_response sends to the endpoint; the chatbot
# component itself only receives the display messages)
conversation_state = gr.State([])
with gr.Row():
# Left column: chat window, input box, clear button, footer.
with gr.Column(scale=2):
chatbot = gr.Chatbot(
type="messages",
avatar_images=(None, "https://upload.wikimedia.org/wikipedia/commons/d/d3/Phi-integrated-information-symbol.png",),
height=600
)
# trash icon clear all
# (the chatbot's clear event also resets the stored conversation state to [])
chatbot.clear(lambda: [], None, conversation_state)
with gr.Row():
chat_input = gr.MultimodalTextbox(
interactive=True,
file_count="multiple",
placeholder="Enter a message or upload files (images, audio)...",
show_label=False,
sources=["microphone", "upload"],
)
with gr.Row():
clear_btn = gr.ClearButton([chatbot, chat_input], value="Clear conversation")
clear_btn.click(lambda: [], None, conversation_state) # Also clear the conversation state
gr.HTML("<div style='text-align: right; margin-top: 5px;'><small>Powered by Microsoft <a href=\"https://aka.ms/phi-4-multimodal/azure\">Phi-4-multimodal</a> model on Azure AI.©2025</small></div>")
# Right column: example tabs, usage notes, and the debug panel.
with gr.Column(scale=1):
with gr.Tab("Audio & Text"):
# Example 1
# NOTE(review): both audio previews use elem_id="small-audio"; element ids
# are normally expected to be unique, so elem_classes may be a better fit
# (the CSS selectors would have to change with it) - confirm.
gr.Audio("https://diamondfan.github.io/audio_files/english.weekend.plan.wav",
label="Preview", elem_id="small-audio")
example1_btn = gr.Button("Transcribe this audio clip")
gr.Markdown("-----")
# Example 2
gr.Audio("https://diamondfan.github.io/audio_files/japanese.seattle.trip.report.wav",
label="Preview", elem_id="small-audio")
example2_btn = gr.Button("Translate audio transcription to English")
# Define handlers for audio examples
# Each handler passes an empty history and an empty state, so clicking an
# example replaces the current conversation instead of extending it.
def run_audio_example1():
return process_audio_example_direct(
"Transcribe this audio clip",
"https://diamondfan.github.io/audio_files/english.weekend.plan.wav",
[], []
)
def run_audio_example2():
return process_audio_example_direct(
"Translate audio transcription to English",
"https://diamondfan.github.io/audio_files/japanese.seattle.trip.report.wav",
[], []
)
# Connect buttons to handlers
example1_btn.click(
run_audio_example1,
inputs=[],
outputs=[chatbot, conversation_state]
)
example2_btn.click(
run_audio_example2,
inputs=[],
outputs=[chatbot, conversation_state]
)
with gr.Tab("Image & Text"):
# Example 1
gr.Image("https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Hanoi_Temple_of_Literature.jpg/640px-Hanoi_Temple_of_Literature.jpg", label="Preview")
img_example1_btn = gr.Button("Write a limerick about this image")
# Example 2
gr.Image("https://pub-c2c1d9230f0b4abb9b0d2d95e06fd4ef.r2.dev/sites/566/2024/09/Screenshot-2024-09-16-115417.png", label="Preview")
img_example2_btn = gr.Button("Convert the chart to a markdown table")
# Define handlers for image examples
# As with the audio examples, each run starts from an empty history and state.
def run_image_example1():
return process_image_example_direct(
"Write a limerick about this image",
"https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Hanoi_Temple_of_Literature.jpg/640px-Hanoi_Temple_of_Literature.jpg",
[], []
)
def run_image_example2():
return process_image_example_direct(
"Convert the chart to a markdown table",
"https://pub-c2c1d9230f0b4abb9b0d2d95e06fd4ef.r2.dev/sites/566/2024/09/Screenshot-2024-09-16-115417.png",
[], []
)
# Connect buttons to handlers
img_example1_btn.click(
run_image_example1,
inputs=[],
outputs=[chatbot, conversation_state]
)
img_example2_btn.click(
run_image_example2,
inputs=[],
outputs=[chatbot, conversation_state]
)
with gr.Tab("Text Only"):
# Create a list of example texts
text_example_list = [
"I'd like to buy a new car. Start by asking me about my budget and which features I care most about, then provide a recommendation.",
"Coffee shops have been slimming down their menus lately. Is less choice making our coffee runs better or do we miss the variety?",
"Explain the Transformer model to a medieval knight"
]
# Create buttons for each example
for i, example_text in enumerate(text_example_list):
with gr.Row():
# gr.Markdown(f"Example {i+1}: **{example_text}**")
text_example_btn = gr.Button(f"{example_text}")
# Connect button to handler with the specific example text
# (`text=example_text` binds the current loop value as a default argument,
# so every button keeps its own prompt rather than the last one in the list)
text_example_btn.click(
fn=lambda text=example_text: process_text_example(text, [], []),
inputs=[],
outputs=[chatbot, conversation_state]
)
gr.Markdown("### Instructions")
gr.Markdown("""
- Type a question or statement
- Upload images or audio files
- You can combine text with media files
- Support 2 modalities at the same time
- The model can analyze images and transcribe audio
- For best results with images, use JPG or PNG files
- For audio, use WAV, MP3, or FLAC files
""")
gr.Markdown("### Capabilities")
gr.Markdown("""
This chatbot can:
- Answer questions and provide explanations
- Describe and analyze images
- Transcribe, translate, summarize, and analyze audio content
- Process multiple inputs in the same message
- Maintain context throughout the conversation
""")
with gr.Accordion("Debug Info", open=False):
debug_output = gr.JSON(
label="Last API Request",
value={}
)
# Set up event handlers
# Submit pipeline: process_message records the user turn and returns a
# non-interactive input box; bot_response then calls the endpoint; afterwards
# the input box is re-enabled and the debug panel is refreshed.
msg_submit = chat_input.submit(
process_message, [chatbot, chat_input, conversation_state], [chatbot, chat_input, conversation_state], queue=False
)
msg_response = msg_submit.then(
bot_response, [chatbot, conversation_state], [chatbot, conversation_state], api_name="bot_response"
)
msg_response.then(enable_input, None, chat_input)
# Update debug info
# (only this submit chain refreshes the debug panel; the example buttons
# above do not list debug_output among their outputs)
msg_response.then(update_debug, conversation_state, debug_output)
# NOTE(review): share=True presumably requests a public share link and
# debug=True a blocking debug mode - confirm both are intended for the
# deployed app and not only for local development.
demo.launch(share=True, debug=True)