import os

import gradio as gr

# Azure AI Agents SDK
from azure.core.credentials import AzureKeyCredential
from azure.ai.agents import AgentsClient
from azure.ai.agents.models import (
    FilePurpose,
    CodeInterpreterTool,
    ListSortOrder,
    MessageRole,
)
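# Dependencies (assumed, not pinned by the original):
#   pip install gradio azure-ai-agents azure-core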


def init_agent(endpoint: str, api_key: str, model_deployment: str, data_file) -> dict:
    """
    Initialize an Azure AI Agent with optional data file for the Code Interpreter.
    Returns a session dict with client, agent, thread, and bookkeeping.
    """
    if not endpoint or not api_key or not model_deployment:
        raise ValueError("Please provide endpoint, key, and model deployment name.")

    # Create client (API key auth)
    client = AgentsClient(
        endpoint=endpoint.strip(),
        credential=AzureKeyCredential(api_key.strip()),
    )

    # Path to the uploaded data file, if any. gr.File(type="filepath") passes the
    # path of a temporary copy that keeps the original file name and extension.
    temp_path = data_file if data_file else None

    # Keep the client open for subsequent calls (no context manager here:
    # a `with client:` block would close the connection before send_message runs).
    code_interpreter = None
    if temp_path:
        # Upload file for agent use
        up = client.files.upload_and_poll(file_path=temp_path, purpose=FilePurpose.AGENTS)
        # Create the tool bound to this file
        code_interpreter = CodeInterpreterTool(file_ids=[up.id])

    # Define the agent (attach tools if we created one)
    agent = client.create_agent(
        model=model_deployment,
        name="data-agent",
        instructions=(
            "You are an AI agent that analyzes the uploaded data when present. "
            "Use Python via the Code Interpreter to compute statistical metrics or produce "
            "text-based charts when asked. If no file is provided, proceed with normal reasoning."
        ),
        tools=(code_interpreter.definitions if code_interpreter else None),
        tool_resources=(code_interpreter.resources if code_interpreter else None),
    )

    # Create a thread for the conversation
    thread = client.threads.create()

    session = {
        "endpoint": endpoint.strip(),
        "api_key": api_key.strip(),
        "model": model_deployment.strip(),
        "client": client,
        "agent_id": agent.id,
        "thread_id": thread.id,
        "has_file": data_file is not None,
        "temp_path": temp_path,  # to clean up later if we want
    }
    return session


def send_message(user_msg: str, session: dict):
    """
    Send a user message to the existing thread and return the agent's latest reply
    as well as a printable conversation history.
    """
    if not session or "client" not in session:
        raise ValueError("Agent is not initialized. Click 'Connect & Prepare' first.")

    client: AgentsClient = session["client"]
    agent_id = session["agent_id"]
    thread_id = session["thread_id"]

    # Create the user message on the thread
    client.messages.create(
        thread_id=thread_id,
        role="user",
        content=user_msg,
    )

    # Run the agent on the thread and wait for completion
    run = client.runs.create_and_process(thread_id=thread_id, agent_id=agent_id)
    if getattr(run, "status", None) == "failed":
        last_error = getattr(run, "last_error", "Unknown error")
        return f"Run failed: {last_error}", ""

    # Get the last agent message text
    last_msg = client.messages.get_last_message_text_by_role(
        thread_id=thread_id,
        role=MessageRole.AGENT,
    )
    agent_reply = last_msg.text.value if last_msg else "(No reply text found.)"

    # Build a readable conversation history
    history_lines = []
    messages = client.messages.list(thread_id=thread_id, order=ListSortOrder.ASCENDING)
    for m in messages:
        if m.text_messages:
            last_text = m.text_messages[-1].text.value
            history_lines.append(f"{m.role}: {last_text}")
    history_str = "\n\n".join(history_lines)

    return agent_reply, history_str


def teardown(session: dict):
    """
    Delete the agent (and optionally the temp file) to avoid unnecessary Azure costs.
    Note: threads are retained by the service; deleting the agent is the main cleanup step.
    """
    if not session:
        return "Nothing to clean up."

    msg = []
    try:
        client: AgentsClient = session.get("client")
        if client:
            with client:
                agent_id = session.get("agent_id")
                if agent_id:
                    client.delete_agent(agent_id)
                    msg.append("Deleted agent.")
    except Exception as e:
        msg.append(f"Cleanup warning: {e}")

    # Remove temp file if created
    try:
        temp_path = session.get("temp_path")
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
            msg.append("Removed temp file.")
    except Exception as e:
        msg.append(f"Temp cleanup warning: {e}")

    return " ".join(msg) if msg else "Cleanup complete."


# ----------------- Gradio UI -----------------
with gr.Blocks(title="Azure AI Agent (Endpoint+Key) — Gradio") as demo:
    gr.Markdown(
        "## Azure AI Agent (Code Interpreter Ready)\n"
        "Enter your **Project Endpoint** and **Key**, select your **Model Deployment** (e.g., `gpt-4o`), "
        "optionally upload a data file (CSV/TXT), then chat.\n"
        "Click **Connect & Prepare** once, then send prompts in the chat."
    )

    with gr.Row():
        endpoint = gr.Textbox(label="Project Endpoint", placeholder="https://<your-project-endpoint>")
        api_key = gr.Textbox(label="Project Key", placeholder="paste your key", type="password")

    with gr.Row():
        model = gr.Textbox(label="Model Deployment Name", value="gpt-4o")
        data_file = gr.File(label="Optional data file for Code Interpreter (txt/csv)", file_types=[".txt", ".csv"], type="filepath")

    session_state = gr.State(value=None)

    connect_btn = gr.Button("🔌 Connect & Prepare Agent", variant="primary")
    connect_status = gr.Markdown("")

    with gr.Row():
        chatbot = gr.Chatbot(height=420, label="Conversation")
        user_input = gr.Textbox(label="Your message", placeholder="Ask a question or request a chart…")

    with gr.Row():
        send_btn = gr.Button("Send ▶")
        cleanup_btn = gr.Button("Delete Agent & Cleanup 🧹")

    history = gr.Textbox(label="Conversation Log (chronological)", lines=12)

    # Callbacks
    def on_connect(ep, key, mdl, f):
        try:
            sess = init_agent(ep, key, mdl, f)
            return sess, "✅ Connected. Agent and thread are ready."
        except Exception as e:
            return None, f"❌ Connection error: {e}"

    connect_btn.click(
        fn=on_connect,
        inputs=[endpoint, api_key, model, data_file],
        outputs=[session_state, connect_status],
    )

    def on_send(msg, session, chat_hist):
        if not msg:
            return gr.update(), gr.update(value="Please enter a message.")
        try:
            reply, log = send_message(msg, session)
            chat_hist = (chat_hist or []) + [[msg, reply]]
            return chat_hist, gr.update(value=log)
        except Exception as e:
            return chat_hist, gr.update(value=f"❌ Error: {e}")

    send_btn.click(
        fn=on_send,
        inputs=[user_input, session_state, chatbot],
        outputs=[chatbot, history],
    )

    def on_cleanup(session):
        try:
            msg = teardown(session)
            return None, f"🧹 {msg}"
        except Exception as e:
            return session, f"⚠️ Cleanup error: {e}"

    cleanup_btn.click(
        fn=on_cleanup,
        inputs=[session_state],
        outputs=[session_state, connect_status],
    )


if __name__ == "__main__":
    demo.launch()
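# To run locally (assuming this script is saved as app.py):
#   python app.py
# Gradio prints a local URL to open in the browser; on Hugging Face Spaces the
# app is launched automatically.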