Spaces:
Runtime error
Runtime error
"""
Advanced Agentic System Interface
-------------------------------
Provides a chat interface to interact with the autonomous agent teams:
- Team A: Coders (App/Software Developers)
- Team B: Business (Entrepreneurs)
- Team C: Research (Deep Online Research)
- Team D: Crypto & Sports Trading
"""
import os | |
import socket | |
import gradio as gr | |
from fastapi import FastAPI, HTTPException | |
from fastapi.middleware.cors import CORSMiddleware | |
import uvicorn | |
from typing import Dict, Any, List, Tuple, Optional | |
import logging | |
from pathlib import Path | |
import asyncio | |
from datetime import datetime | |
import json | |
import requests | |
from requests.adapters import HTTPAdapter, Retry | |
from dataclasses import dataclass | |
from agentic_system import AgenticSystem | |
from orchestrator import AgentOrchestrator | |
from team_management import TeamManager, TeamType | |
from reasoning import ( | |
UnifiedReasoningEngine, | |
StrategyType, | |
UnifiedResult | |
) | |
from api.openai_compatible import OpenAICompatibleAPI | |
from api.venture_api import VentureAPI | |
from api.groq_api import GroqAPI | |
# Configure logging: the root logger is set to INFO, and this module logs
# through its own logger named after the module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def setup_requests_session():
    """Return a ``requests.Session`` with a retrying adapter on both schemes.

    Each adapter retries up to 5 times with a 0.1 backoff factor, and treats
    HTTP 500, 502, 503 and 504 responses as retryable.
    """
    retry_policy = Retry(
        total=5,
        backoff_factor=0.1,
        status_forcelist=[500, 502, 503, 504]
    )
    http_session = requests.Session()
    # One adapter per URL scheme, all sharing the same retry policy.
    for scheme in ('http://', 'https://'):
        http_session.mount(scheme, HTTPAdapter(max_retries=retry_policy))
    return http_session
def check_network():
    """Check network connectivity.

    First tries to resolve ``huggingface.co`` via DNS. If that fails, falls
    back to an HTTPS GET against the same host using a retrying session.

    Returns:
        True if DNS resolution succeeds, or if the fallback request returns
        HTTP 200; False otherwise.
    """
    try:
        # Try DNS resolution first
        socket.gethostbyname('huggingface.co')
        return True
    except OSError:
        # socket.gaierror is an OSError subclass; catching the base class also
        # covers other resolver failures so the probe never crashes the caller.
        logger.warning("DNS resolution failed")

    try:
        # Try HTTP request as backup. The context manager closes the session
        # (and its pooled connections) once the probe is done.
        with setup_requests_session() as session:
            response = session.get('https://huggingface.co', timeout=5)
            return response.status_code == 200
    except (requests.RequestException, OSError) as e:
        logger.warning(f"Network connectivity check failed: {e}")
        return False
class ChatInterface:
    """Chat interface for interacting with the agentic system.

    Construction checks connectivity, builds the orchestrator, team manager,
    reasoning engine, Groq client and agentic system, creates a FastAPI app
    (CORS + API routes) and builds the Gradio Blocks UI. Team agents are
    initialized asynchronously: immediately if an event loop is already
    running, otherwise lazily on the first processed message.
    """

    def __init__(self):
        """Initialize the chat interface.

        Raises:
            ConnectionError: If ``check_network()`` reports no connectivity.
        """
        # Check network connectivity
        if not check_network():
            raise ConnectionError("No network connectivity. Please check your connection.")

        # Initialize core components with consistent configuration
        config = {
            "min_confidence": 0.7,
            "parallel_threshold": 3,
            "learning_rate": 0.1,
            "strategy_weights": {
                "LOCAL_LLM": 0.8,
                "CHAIN_OF_THOUGHT": 0.6,
                "TREE_OF_THOUGHTS": 0.5,
                "META_LEARNING": 0.4
            }
        }

        # Initialize chat state
        self.chat_history = []
        self.active_objectives = {}

        # State for one-time async initialization (see _ensure_initialized).
        self._initialized = False
        self._init_lock = None
        self.background_tasks = []

        # Initialize components
        self.orchestrator = AgentOrchestrator(config)
        self.team_manager = TeamManager(self.orchestrator)
        self.reasoning_engine = UnifiedReasoningEngine()
        self.groq_api = GroqAPI()

        # Set up the agentic system
        self.agentic_system = AgenticSystem(config)

        # Initialize FastAPI app
        self.app = FastAPI()
        self.setup_cors()
        self.setup_routes()

        # Create Gradio interface
        self.interface = self.create_interface()

        # Launch background tasks
        self.launch_background_tasks()

    async def initialize(self):
        """Initialize async components (the team agents) and mark them ready."""
        await self.team_manager.initialize_team_agents()
        self._initialized = True

    async def _ensure_initialized(self):
        """Run ``initialize()`` once; later calls return immediately.

        A failure is logged rather than raised so chat keeps working; the
        initialization is retried on the next call.
        """
        if getattr(self, "_initialized", False):
            return
        if getattr(self, "_init_lock", None) is None:
            # Created lazily so the lock is built while an event loop is running.
            self._init_lock = asyncio.Lock()
        async with self._init_lock:
            if getattr(self, "_initialized", False):
                return
            try:
                await self.initialize()
            except Exception:
                logger.exception("Team agent initialization failed")

    def launch_background_tasks(self):
        """Schedule async initialization if an event loop is already running.

        When no loop is running (for example, construction at import time),
        a task created on a fresh loop would never execute, so initialization
        is deferred to the first call of ``process_message`` instead.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.info("No running event loop; deferring team initialization to first message")
            return
        self.background_tasks.append(
            loop.create_task(self._ensure_initialized())
        )

    def setup_cors(self):
        """Set up CORS middleware."""
        # NOTE(review): wildcard origins together with allow_credentials=True is
        # very permissive -- confirm this is intended for the deployment.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def setup_routes(self):
        """Set up API routes on ``self.app``."""
        # Include OpenAI-compatible routes
        openai_api = OpenAICompatibleAPI(self.reasoning_engine)
        self.app.include_router(openai_api.router, tags=["OpenAI Compatible"])

        # Original API routes.
        # NOTE(review): these handlers were defined without being attached to
        # the app, so they were unreachable. The paths below are chosen to sit
        # under the "/api/venture" prefix advertised by the health check --
        # confirm them against any existing clients.
        @self.app.get("/api/health")
        async def health_check():
            """Health check endpoint."""
            return {
                "status": "healthy",
                "version": "1.0.0",
                "endpoints": {
                    "openai_compatible": "/v1/chat/completions",
                    "venture": "/api/venture",
                    "ui": "/"
                }
            }

        @self.app.post("/api/reason")
        async def reason(query: str, context: Optional[Dict[str, Any]] = None):
            """Reasoning endpoint; any failure is reported as HTTP 500."""
            try:
                result = await self.reasoning_engine.reason(query, context or {})
                return result
            except Exception as e:
                logger.error(f"Reasoning error: {e}")
                raise HTTPException(status_code=500, detail=str(e)) from e

        @self.app.post("/api/venture/analyze")
        async def analyze_venture(
            venture_type: str,
            description: str,
            metrics: Optional[Dict[str, Any]] = None
        ):
            """Venture analysis endpoint; any failure is reported as HTTP 500."""
            try:
                result = await VentureAPI(self.reasoning_engine).analyze_venture(
                    venture_type=venture_type,
                    description=description,
                    metrics=metrics or {}
                )
                return result
            except Exception as e:
                logger.error(f"Analysis error: {e}")
                raise HTTPException(status_code=500, detail=str(e)) from e

        @self.app.get("/api/venture/types")
        async def get_venture_types():
            """Get available venture types."""
            return VentureAPI(self.reasoning_engine).get_venture_types()

    @staticmethod
    def _message_field(message, key):
        """Read ``key`` from a chat message given as a dict or as an object."""
        if isinstance(message, dict):
            return message.get(key)
        return getattr(message, key, None)

    @staticmethod
    def _messages_to_pairs(history) -> List[List[str]]:
        """Convert role/content messages into ``[user, assistant]`` pairs.

        Args:
            history: Flat list of messages with ``role`` and ``content``
                fields, or None.

        Returns:
            A list of two-item lists. A user message without a reply yet is
            paired with ``""``; an assistant message with no preceding open
            user message is paired with an empty user slot. Messages with
            other roles are skipped.
        """
        pairs: List[List[str]] = []
        awaiting_reply = False
        for message in history or []:
            role = ChatInterface._message_field(message, "role")
            content = ChatInterface._message_field(message, "content")
            text = "" if content is None else str(content)
            if role == "user":
                pairs.append([text, ""])
                awaiting_reply = True
            elif role == "assistant":
                if awaiting_reply:
                    pairs[-1][1] = text
                    awaiting_reply = False
                else:
                    pairs.append(["", text])
        return pairs

    @staticmethod
    def _split_last_exchange(history) -> Tuple[Optional[str], Any]:
        """Locate the most recent user message for a retry.

        Returns:
            ``(content, earlier_history)`` where ``earlier_history`` is
            everything before that user message, or ``(None, history)`` when
            the history contains no user message.
        """
        if not history:
            return None, history
        for index in range(len(history) - 1, -1, -1):
            if ChatInterface._message_field(history[index], "role") == "user":
                content = ChatInterface._message_field(history[index], "content")
                return content, list(history[:index])
        return None, history

    def create_interface(self) -> "gr.Blocks":
        """Create the Gradio interface.

        Returns:
            The Blocks UI containing the chatbot, input box, action buttons
            and example queries, with all event handlers wired up.
        """
        with gr.Blocks(
            title="Advanced Agentic System"
        ) as interface:
            gr.Markdown("""
            # 🤖 Advanced Agentic System Chat Interface
            Welcome to our AI-powered autonomous agent teams! Each team specializes in different domains:
            - 💻 **Team A: Coders** - Expert software developers and architects
            - 💼 **Team B: Business** - Strategic entrepreneurs and analysts
            - 🔍 **Team C: Research** - Deep online research specialists
            - 📈 **Team D: Trading** - Crypto & sports trading experts
            You can:
            1. Ask questions about any domain
            2. Create new objectives for teams
            3. Check status of ongoing work
            4. Get insights and recommendations
            ---
            """)

            chatbot = gr.Chatbot(
                show_label=False,
                height=500,
                type="messages"  # Use OpenAI-style message format
            )

            with gr.Row():
                msg = gr.Textbox(
                    show_label=False,
                    placeholder="Chat with the Agentic System...",
                    container=False
                )
                submit = gr.Button("Send 🚀")

            with gr.Row():
                gr.ClearButton([msg, chatbot], value="Clear")
                retry = gr.Button("Retry")

            async def respond(message, history):
                """Answer ``message`` and return ``("", updated_history)``."""
                history = list(history) if history else []
                # Nothing to send: leave the conversation untouched.
                if not message or not message.strip():
                    return "", history
                try:
                    # The chatbot uses the flat messages format; process_message
                    # expects [user, assistant] pairs.
                    history_list = self._messages_to_pairs(history)
                    response = await self.process_message(message, history_list)
                    # Format response for markdown rendering
                    reply = response.replace('```', '\n```\n')
                except Exception as e:
                    logger.error(f"Error in chat response: {str(e)}")
                    reply = "I apologize, but I encountered an error. Please try again."
                return "", history + [
                    {"role": "user", "content": message},
                    {"role": "assistant", "content": reply}
                ]

            async def retry_last(history):
                """Re-run the last user message and return the updated history."""
                last_user_msg, earlier = self._split_last_exchange(history)
                if last_user_msg is None:
                    return history
                # respond() also returns the cleared textbox value; this
                # handler only drives the chatbot output.
                _, updated_history = await respond(last_user_msg, earlier)
                return updated_history

            # Submit handlers with loading states
            msg.submit(
                fn=respond,
                inputs=[msg, chatbot],
                outputs=[msg, chatbot],
                api_name=False
            ).then(
                lambda: (gr.update(value="", interactive=True), gr.update(interactive=True)),
                None,
                [msg, submit]
            )

            # Click handlers with loading states
            submit.click(
                fn=respond,
                inputs=[msg, chatbot],
                outputs=[msg, chatbot],
                api_name=False
            ).then(
                lambda: (gr.update(value="", interactive=True), gr.update(interactive=True)),
                None,
                [msg, submit]
            )

            # Retry handler
            retry.click(
                fn=retry_last,
                inputs=[chatbot],
                outputs=[chatbot],
                api_name=False
            )

            # Dynamic submit button state. Only the button is toggled: disabling
            # the textbox itself while it is empty would stop the user typing.
            msg.change(
                lambda x: gr.update(
                    interactive=bool((x or "").strip()),
                    variant="primary" if (x or "").strip() else "secondary"
                ),
                [msg],
                [submit]
            )

            # Example queries with emojis
            gr.Examples(
                examples=[
                    "💻 Can Team A help me build a web application?",
                    "💼 Create a new objective: Analyze market trends for AI startups",
                    "🔍 Research the latest developments in quantum computing",
                    "📈 What's the current status of all teams?"
                ],
                inputs=msg,
                label="Example Queries",
                examples_per_page=4
            )

        return interface

    async def process_message(
        self,
        message: str,
        history: Optional[List[List[str]]] = None
    ) -> str:
        """Process a user message.

        Args:
            message: The user's text.
            history: Prior conversation as ``[user, assistant]`` pairs; None
                is treated as an empty history.

        Returns:
            The response text, or a generic error message if anything raises.
        """
        try:
            # Make sure deferred team initialization has had a chance to run.
            await self._ensure_initialized()

            # Update chat history
            self.chat_history = history if history is not None else []

            # Analyze message intent and dispatch; anything that is not an
            # objective or status request goes to the chat handler.
            intent = await self._analyze_intent(message)
            intent_type = intent.get('type')
            if intent_type == 'objective':
                return await self._handle_objective(message, intent)
            if intent_type == 'status':
                return await self._get_status()
            return await self._handle_chat(message)
        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
            return "I encountered an error processing your message. Please try again."

    async def _analyze_intent(self, message: str) -> Dict[str, Any]:
        """Analyze user message intent with error handling.

        Returns:
            A dict with at least a ``type`` key. A ``UnifiedResult`` is mapped
            to a ``chat`` intent, a dict result is returned as-is, and any
            other result or any exception falls back to ``chat``.
        """
        try:
            # Use reasoning engine to analyze intent
            result = await self.reasoning_engine.reason(
                query=message,
                context={
                    "chat_history": self.chat_history,
                    "active_objectives": self.active_objectives
                }
            )
            # Handle UnifiedResult object
            if isinstance(result, UnifiedResult):
                return {
                    "type": "chat",
                    "confidence": getattr(result, 'confidence', 0.5),
                    "metadata": getattr(result, 'metadata', {})
                }
            if isinstance(result, dict):
                return result
            return {"type": "chat", "confidence": 0.5}
        except Exception as e:
            logger.error(f"Error analyzing intent: {str(e)}")
            return {"type": "chat", "error": str(e)}

    async def _handle_objective(self, message: str, intent: Dict[str, Any]) -> str:
        """Handle objective creation and management.

        Creates a cross-team objective from ``intent['objective']`` (falling
        back to the raw message, no teams and MEDIUM priority) and reports
        its current progress. Returns a failure message if anything raises.
        """
        try:
            # Extract objective details
            objective = intent.get('objective', {})
            # Create objective
            objective_id = await self.team_manager.create_cross_team_objective(
                objective=objective.get('description', message),
                required_teams=objective.get('teams', []),
                priority=objective.get('priority', 'MEDIUM')
            )
            # Monitor progress
            status = await self.team_manager.monitor_objective_progress(objective_id)
            return f"Created objective {objective_id}. Current status: {status}"
        except Exception as e:
            logger.error(f"Error handling objective: {str(e)}")
            return "Failed to create objective. Please try again."

    async def _handle_chat(self, message: str) -> str:
        """Handle general chat interactions with error recovery.

        Tries the reasoning engine first and falls back to the Groq API when
        the engine raises or reports an unsuccessful result. The fallback is
        attempted at most once per message.
        """
        try:
            # First try using the reasoning engine
            try:
                result = await self.reasoning_engine.reason(
                    query=message,
                    context={
                        "chat_history": self.chat_history,
                        "active_objectives": self.active_objectives,
                        "groq_api": self.groq_api
                    }
                )
            except Exception as reasoning_error:
                logger.error(f"Reasoning engine error: {str(reasoning_error)}")
                # Fallback to Groq API
                groq_result = await self.groq_api.predict(message)
                if groq_result["success"]:
                    return groq_result["answer"]
                raise RuntimeError(
                    f"Both reasoning engine and Groq API failed: {groq_result.get('error')}"
                ) from reasoning_error

            # Handle UnifiedResult object
            if isinstance(result, UnifiedResult):
                if not result.success:
                    # If reasoning engine fails, fallback to Groq API
                    groq_result = await self.groq_api.predict(message)
                    if groq_result["success"]:
                        return groq_result["answer"]
                    return "I encountered an error. Please try rephrasing your question."
                return result.answer if hasattr(result, 'answer') else str(result)
            if isinstance(result, dict):
                return result.get('response', str(result))
            return str(result)
        except Exception as e:
            logger.error(f"Error in chat response: {str(e)}")
            return "I encountered an error generating a response. Please try again."

    async def _get_status(self) -> str:
        """Get system status information.

        Returns:
            A text report listing each team (status, active projects, success
            rate) and each objective (status, progress, teams), or an error
            message if fetching or formatting fails.
        """
        try:
            # Get team status
            team_status = await self.team_manager.get_team_status()
            # Get objective status
            objective_status = await self.team_manager.get_objective_status()

            # Collect the fragments and join once at the end.
            parts = ["Current System Status:\n\n", "Teams:\n"]
            for team, info in team_status.items():
                parts.append(f"- {team}: {info['status']}\n")
                parts.append(f" Active Projects: {info['active_projects']}\n")
                parts.append(f" Success Rate: {info['success_rate']}%\n\n")

            parts.append("\nActive Objectives:\n")
            for obj, info in objective_status.items():
                parts.append(f"- {obj}: {info['status']}\n")
                parts.append(f" Progress: {info['progress']}%\n")
                parts.append(f" Teams: {', '.join(info['teams'])}\n\n")
            return "".join(parts)
        except Exception as e:
            logger.error(f"Error formatting status: {str(e)}")
            return "Error formatting status information."
def create_chat_interface() -> gr.Blocks:
    """Construct a ``ChatInterface`` and return only its Gradio Blocks UI."""
    return ChatInterface().interface
# Initialize FastAPI
app = FastAPI(
    title="Advanced Agentic System API",
    description="API for interacting with the autonomous agent teams",
    version="1.0.0"
)

# Build the chat system once and keep a handle on it: ChatInterface registers
# its API routes on its own FastAPI instance (``.app``), which would otherwise
# be discarded and never served.
chat_interface = ChatInterface()

# Gradio interface served at "/"
interface = chat_interface.interface

# Expose the ChatInterface's API routes on the served app. This must happen
# before the Gradio mount below, because the mount at "/" matches every path
# that no earlier route has claimed.
app.include_router(chat_interface.app.router)

# Mount Gradio app to FastAPI
app = gr.mount_gradio_app(app, interface, path="/")
if __name__ == "__main__":
    # Run with uvicorn when called directly.
    #
    # The app object is passed directly instead of the "app:app" import
    # string: the string form makes uvicorn import this module a second time,
    # repeating the import-time construction of the chat system.
    #
    # ``reload=True`` and ``workers=4`` are mutually exclusive in uvicorn (the
    # workers setting is ignored while reloading), so the effective behavior
    # was already a single worker. A single process also keeps the in-memory
    # chat state in one place.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860
    )