# LangGraph ReAct agent chat app (Hugging Face Space).
# Chats through a Gradio UI against an OpenAI-compatible LLM server,
# with optional Tavily web search as an agent tool.
import logging
import logging.handlers
import os
import random
import time
from typing import List

import gradio as gr
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch
from langgraph.prebuilt import create_react_agent
# Configuration - set to False to disable detailed logging | |
ENABLE_DETAILED_LOGGING = True | |
# Setup logging with rotation (7 days max) | |
if ENABLE_DETAILED_LOGGING: | |
# Create formatter | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
# Setup console handler | |
console_handler = logging.StreamHandler() | |
console_handler.setFormatter(formatter) | |
# Setup rotating file handler (7 days, daily rotation) | |
file_handler = logging.handlers.TimedRotatingFileHandler( | |
'agent.log', | |
when='midnight', | |
interval=1, | |
backupCount=7, # Keep 7 days of logs | |
encoding='utf-8' | |
) | |
file_handler.setFormatter(formatter) | |
# Configure root logger | |
logging.basicConfig( | |
level=logging.INFO, | |
handlers=[console_handler, file_handler] | |
) | |
else: | |
logging.basicConfig(level=logging.WARNING) | |
logger = logging.getLogger(__name__) | |
# Configuration from environment variables | |
llm_ip = os.environ.get('public_ip') | |
llm_port = os.environ.get('port') | |
llm_key = os.environ.get('api_key') | |
llm_model = os.environ.get('model') | |
# Tavily API configuration | |
tavily_key = os.environ.get('tavily_key', '') | |
# Tavily search tool integration | |
class ReactAgentChat: | |
def __init__(self, ip: str, port: str, api_key: str, model: str): | |
self.ip = ip | |
self.port = port | |
self.api_key = api_key | |
self.model = model | |
self.agent = None | |
self._setup_agent() | |
def _setup_agent(self): | |
"""Initialize the LangGraph ReAct agent""" | |
try: | |
if ENABLE_DETAILED_LOGGING: | |
logger.info(f"=== SETTING UP AGENT ===") | |
logger.info(f"LLM URL: http://{self.ip}:{self.port}/v1") | |
logger.info(f"Model: {self.model}") | |
# Create OpenAI-compatible model | |
llm = ChatOpenAI( | |
base_url=f"http://{self.ip}:{self.port}/v1", | |
api_key=self.api_key, | |
model=self.model, | |
temperature=0.7 | |
) | |
if ENABLE_DETAILED_LOGGING: | |
logger.info("LLM created successfully") | |
# Define tools - use Tavily search API with graceful error handling | |
if tavily_key: | |
if ENABLE_DETAILED_LOGGING: | |
logger.info("Setting up Tavily search tool") | |
try: | |
# Create custom wrapper for Tavily with error handling | |
def web_search(query: str) -> str: | |
"""Search the web for current information about any topic.""" | |
try: | |
tavily_tool = TavilySearch( | |
api_key=tavily_key, | |
max_results=5, | |
topic="general", | |
include_answer=True, | |
search_depth="advanced" | |
) | |
result = tavily_tool.invoke({"query": query}) | |
if ENABLE_DETAILED_LOGGING: | |
logger.info(f"Tavily search successful for query: {query}") | |
return result | |
except Exception as e: | |
error_str = str(e).lower() | |
if ENABLE_DETAILED_LOGGING: | |
logger.error(f"Tavily search failed for query '{query}': {e}") | |
# Check for rate limit or quota issues | |
if any(keyword in error_str for keyword in ['rate limit', 'quota', 'limit exceeded', 'usage limit', 'billing']): | |
if ENABLE_DETAILED_LOGGING: | |
logger.warning(f"Tavily rate limit/quota exceeded: {e}") | |
return "I can't search the web right now." | |
else: | |
if ENABLE_DETAILED_LOGGING: | |
logger.error(f"Tavily API error: {e}") | |
return "I can't search the web right now." | |
search_tool = web_search | |
if ENABLE_DETAILED_LOGGING: | |
logger.info("Tavily search tool wrapper created successfully") | |
except Exception as e: | |
if ENABLE_DETAILED_LOGGING: | |
logger.error(f"Failed to create Tavily tool wrapper: {e}") | |
# Fallback tool | |
def no_search(query: str) -> str: | |
"""Search tool unavailable.""" | |
return "I can't search the web right now." | |
search_tool = no_search | |
else: | |
if ENABLE_DETAILED_LOGGING: | |
logger.warning("No Tavily API key found, creating fallback tool") | |
def no_search(query: str) -> str: | |
"""Search tool unavailable.""" | |
if ENABLE_DETAILED_LOGGING: | |
logger.error("Search attempted but no Tavily API key configured") | |
return "I can't search the web right now." | |
search_tool = no_search | |
tools = [search_tool] | |
if ENABLE_DETAILED_LOGGING: | |
logger.info(f"Tools defined: {[tool.name for tool in tools]}") | |
# Bind tools to the model | |
model_with_tools = llm.bind_tools(tools) | |
if ENABLE_DETAILED_LOGGING: | |
logger.info("Tools bound to model") | |
# Create the ReAct agent | |
self.agent = create_react_agent(model_with_tools, tools) | |
if ENABLE_DETAILED_LOGGING: | |
logger.info("ReAct agent created successfully") | |
except Exception as e: | |
logger.error(f"=== AGENT SETUP ERROR ===") | |
logger.error(f"Failed to setup agent: {e}") | |
import traceback | |
logger.error(f"Traceback: {traceback.format_exc()}") | |
raise e | |
def update_config(self, ip: str, port: str, api_key: str, model: str): | |
"""Update LLM configuration""" | |
if (ip != self.ip or port != self.port or | |
api_key != self.api_key or model != self.model): | |
self.ip = ip | |
self.port = port | |
self.api_key = api_key | |
self.model = model | |
self._setup_agent() | |
def chat(self, message: str, history: List[List[str]]) -> str: | |
"""Generate chat response using ReAct agent""" | |
try: | |
if not self.agent: | |
return "Error: Agent not initialized" | |
if ENABLE_DETAILED_LOGGING: | |
logger.info(f"=== USER INPUT ===") | |
logger.info(f"Message: {message}") | |
logger.info(f"History length: {len(history)}") | |
# Convert history to messages for context handling | |
messages = [] | |
for user_msg, assistant_msg in history: | |
messages.append(HumanMessage(content=user_msg)) | |
if assistant_msg: # Only add if assistant responded | |
from langchain_core.messages import AIMessage | |
messages.append(AIMessage(content=assistant_msg)) | |
# Add current message | |
messages.append(HumanMessage(content=message)) | |
# Invoke the agent | |
if ENABLE_DETAILED_LOGGING: | |
logger.info(f"=== INVOKING AGENT ===") | |
logger.info(f"Total messages in history: {len(messages)}") | |
response = self.agent.invoke({"messages": messages}) | |
if ENABLE_DETAILED_LOGGING: | |
logger.info(f"=== AGENT RESPONSE ===") | |
logger.info(f"Full response: {response}") | |
logger.info(f"Number of messages: {len(response.get('messages', []))}") | |
# Log each message in the response | |
for i, msg in enumerate(response.get("messages", [])): | |
logger.info(f"Message {i}: Type={type(msg).__name__}, Content={getattr(msg, 'content', 'No content')}") | |
# Extract the final response | |
final_message = response["messages"][-1].content | |
if ENABLE_DETAILED_LOGGING: | |
logger.info(f"=== FINAL MESSAGE ===") | |
logger.info(f"Final message: {final_message}") | |
return final_message | |
except Exception as e: | |
error_msg = f"Agent error: {str(e)}" | |
logger.error(f"=== AGENT ERROR ===") | |
logger.error(f"Error: {e}") | |
logger.error(f"Error type: {type(e)}") | |
import traceback | |
logger.error(f"Traceback: {traceback.format_exc()}") | |
return error_msg | |
# Global agent instance | |
react_agent = ReactAgentChat(llm_ip, llm_port, llm_key, llm_model) | |
def generate_response(message: str, history: List[List[str]], system_prompt: str, | |
max_tokens: int, ip: str, port: str, api_key: str, model: str): | |
"""Generate response using ReAct agent""" | |
global react_agent | |
try: | |
# Update agent configuration if changed | |
react_agent.update_config(ip, port, api_key, model) | |
# Generate response | |
response = react_agent.chat(message, history) | |
# Stream the response word by word for better UX | |
words = response.split() | |
current_response = "" | |
for word in words: | |
current_response += word + " " | |
yield current_response.strip() | |
except Exception as e: | |
error_msg = f"Error: {str(e)}" | |
logger.error(error_msg) | |
yield error_msg | |
# Create Gradio ChatInterface | |
chatbot = gr.ChatInterface( | |
generate_response, | |
chatbot=gr.Chatbot( | |
avatar_images=[ | |
None, | |
"https://cdn-avatars.huggingface.co/v1/production/uploads/64e6d37e02dee9bcb9d9fa18/o_HhUnXb_PgyYlqJ6gfEO.png" | |
], | |
height="64vh", | |
type="messages" | |
), | |
additional_inputs=[ | |
gr.Textbox( | |
"You are a helpful AI assistant with web search capabilities.", | |
label="System Prompt", | |
lines=2 | |
), | |
gr.Slider(50, 2048, label="Max Tokens", value=512, | |
info="Maximum number of tokens in the response"), | |
gr.Textbox(llm_ip, label="LLM IP Address", | |
info="IP address of the OpenAI-compatible LLM server"), | |
gr.Textbox(llm_port, label="LLM Port", | |
info="Port of the LLM server"), | |
gr.Textbox(llm_key, label="API Key", type="password", | |
info="API key for the LLM server"), | |
gr.Textbox(llm_model, label="Model Name", | |
info="Name of the model to use"), | |
], | |
title="🤖 LangGraph ReAct Agent with DuckDuckGo Search", | |
description="Chat with a LangGraph ReAct agent that can search the web using DuckDuckGo. Ask about current events, research topics, or any questions that require up-to-date information!", | |
theme="finlaymacklon/smooth_slate", | |
submit_btn="Send", | |
retry_btn="🔄 Regenerate Response", | |
undo_btn="↩ Delete Previous", | |
clear_btn="🗑️ Clear Chat" | |
) | |
if __name__ == "__main__": | |
chatbot.queue().launch() |