""" Advanced Agentic System Interface ------------------------------- Provides a chat interface to interact with the autonomous agent teams: - Team A: Coders (App/Software Developers) - Team B: Business (Entrepreneurs) - Team C: Research (Deep Online Research) - Team D: Crypto & Sports Trading """ import os import socket import gradio as gr from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware import uvicorn from typing import Dict, Any, List, Tuple, Optional import logging from pathlib import Path import asyncio from datetime import datetime import json import requests from requests.adapters import HTTPAdapter, Retry from dataclasses import dataclass from agentic_system import AgenticSystem from orchestrator import AgentOrchestrator from team_management import TeamManager, TeamType from reasoning import ( UnifiedReasoningEngine, StrategyType, UnifiedResult ) from api.openai_compatible import OpenAICompatibleAPI from api.venture_api import VentureAPI from api.groq_api import GroqAPI # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def setup_requests_session(): """Set up requests session with retries.""" session = requests.Session() retries = Retry( total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504] ) session.mount('http://', HTTPAdapter(max_retries=retries)) session.mount('https://', HTTPAdapter(max_retries=retries)) return session def check_network(): """Check network connectivity.""" try: # Try DNS resolution first socket.gethostbyname('huggingface.co') return True except socket.gaierror: logger.warning("DNS resolution failed") try: # Try HTTP request as backup session = setup_requests_session() response = session.get('https://huggingface.co', timeout=5) return response.status_code == 200 except (requests.RequestException, socket.gaierror) as e: logger.warning(f"Network connectivity check failed: {e}") return False class ChatInterface: """Chat interface for interacting with the agentic system.""" def __init__(self): """Initialize the chat interface.""" # Check network connectivity if not check_network(): raise ConnectionError("No network connectivity. Please check your connection.") # Initialize core components with consistent configuration config = { "min_confidence": 0.7, "parallel_threshold": 3, "learning_rate": 0.1, "strategy_weights": { "LOCAL_LLM": 0.8, "CHAIN_OF_THOUGHT": 0.6, "TREE_OF_THOUGHTS": 0.5, "META_LEARNING": 0.4 } } # Initialize chat state self.chat_history = [] self.active_objectives = {} # Initialize components self.orchestrator = AgentOrchestrator(config) self.team_manager = TeamManager(self.orchestrator) self.reasoning_engine = UnifiedReasoningEngine() self.groq_api = GroqAPI() # Set up the agentic system self.agentic_system = AgenticSystem(config) # Initialize FastAPI app self.app = FastAPI() self.setup_cors() self.setup_routes() # Create Gradio interface self.interface = self.create_interface() # Launch background tasks self.background_tasks = [] self.launch_background_tasks() async def initialize(self): """Initialize async components.""" await self.team_manager.initialize_team_agents() def launch_background_tasks(self): """Launch background tasks.""" loop = asyncio.get_event_loop() self.background_tasks.append( loop.create_task(self.initialize()) ) def setup_cors(self): """Set up CORS middleware.""" self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def setup_routes(self): """Set up API routes.""" # Include OpenAI-compatible routes openai_api = OpenAICompatibleAPI(self.reasoning_engine) self.app.include_router(openai_api.router, tags=["OpenAI Compatible"]) # Original API routes @self.app.get("/api/health") async def health_check(): """Health check endpoint.""" return { "status": "healthy", "version": "1.0.0", "endpoints": { "openai_compatible": "/v1/chat/completions", "venture": "/api/venture", "ui": "/" } } @self.app.post("/api/reason") async def reason(query: str, context: Optional[Dict[str, Any]] = None): """Reasoning endpoint.""" try: result = await self.reasoning_engine.reason(query, context or {}) return result except Exception as e: logger.error(f"Reasoning error: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.post("/api/venture/analyze") async def analyze_venture( venture_type: str, description: str, metrics: Optional[Dict[str, Any]] = None ): """Venture analysis endpoint.""" try: result = await VentureAPI(self.reasoning_engine).analyze_venture( venture_type=venture_type, description=description, metrics=metrics or {} ) return result except Exception as e: logger.error(f"Analysis error: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/api/venture/types") async def get_venture_types(): """Get available venture types.""" return VentureAPI(self.reasoning_engine).get_venture_types() def create_interface(self) -> gr.Blocks: """Create the Gradio interface.""" with gr.Blocks( title="Advanced Agentic System" ) as interface: gr.Markdown(""" # 🤖 Advanced Agentic System Chat Interface Welcome to our AI-powered autonomous agent teams! Each team specializes in different domains: - 💻 **Team A: Coders** - Expert software developers and architects - 💼 **Team B: Business** - Strategic entrepreneurs and analysts - 🔍 **Team C: Research** - Deep online research specialists - 📈 **Team D: Trading** - Crypto & sports trading experts You can: 1. Ask questions about any domain 2. Create new objectives for teams 3. Check status of ongoing work 4. Get insights and recommendations --- """) chatbot = gr.Chatbot( show_label=False, height=500, type="messages" # Use OpenAI-style message format ) with gr.Row(): msg = gr.Textbox( show_label=False, placeholder="Chat with the Agentic System...", container=False ) submit = gr.Button("Send 🚀") with gr.Row(): clear = gr.ClearButton([msg, chatbot], value="Clear") retry = gr.Button("Retry") async def respond(message, history): """Handle chat responses with proper formatting.""" try: # Convert history to the format expected by process_message history_list = [[msg["content"] for msg in exchange] for exchange in history] if history else [] response = await self.process_message(message, history_list) # Format response for markdown rendering formatted_response = response.replace('```', '\n```\n') # Update history with the new message format return "", history + [ {"role": "user", "content": message}, {"role": "assistant", "content": formatted_response} ] except Exception as e: logger.error(f"Error in chat response: {str(e)}") error_msg = "I apologize, but I encountered an error. Please try again." return "", history + [ {"role": "user", "content": message}, {"role": "assistant", "content": error_msg} ] async def retry_last(history): """Retry the last message with proper formatting.""" if not history: return history last_user_msg = history[-2]["content"] # Get the last user message history = history[:-2] # Remove last exchange return await respond(last_user_msg, history) # Submit handlers with loading states submit_event = msg.submit( fn=respond, inputs=[msg, chatbot], outputs=[msg, chatbot], api_name=False ).then( lambda: (gr.update(value="", interactive=True), gr.update(interactive=True)), None, [msg, submit] ) # Click handlers with loading states click_event = submit.click( fn=respond, inputs=[msg, chatbot], outputs=[msg, chatbot], api_name=False ).then( lambda: (gr.update(value="", interactive=True), gr.update(interactive=True)), None, [msg, submit] ) # Retry handler retry.click( fn=retry_last, inputs=[chatbot], outputs=[chatbot], api_name=False ) # Auto-focus and dynamic submit button state msg.change( lambda x: ( gr.update(interactive=bool(x.strip())), gr.update(interactive=bool(x.strip()), variant="primary" if x.strip() else "secondary") ), [msg], [msg, submit] ) # Example queries with emojis gr.Examples( examples=[ "💻 Can Team A help me build a web application?", "💼 Create a new objective: Analyze market trends for AI startups", "🔍 Research the latest developments in quantum computing", "📈 What's the current status of all teams?" ], inputs=msg, label="Example Queries", examples_per_page=4 ) return interface async def process_message( self, message: str, history: List[List[str]] = None ) -> str: """Process a user message.""" try: # Initialize history if None if history is None: history = [] # Update chat history self.chat_history = history # Analyze message intent intent = await self._analyze_intent(message) # Process based on intent if intent.get('type') == 'objective': response = await self._handle_objective(message, intent) elif intent.get('type') == 'status': response = await self._get_status() elif intent.get('type') == 'chat': response = await self._handle_chat(message) else: response = await self._handle_chat(message) # Default to chat handler return response except Exception as e: logger.error(f"Error processing message: {str(e)}") return "I encountered an error processing your message. Please try again." async def _analyze_intent(self, message: str) -> Dict[str, Any]: """Analyze user message intent with error handling.""" try: # Use reasoning engine to analyze intent result = await self.reasoning_engine.reason( query=message, context={ "chat_history": self.chat_history, "active_objectives": self.active_objectives } ) # Handle UnifiedResult object if isinstance(result, UnifiedResult): return { "type": "chat", "confidence": getattr(result, 'confidence', 0.5), "metadata": getattr(result, 'metadata', {}) } elif isinstance(result, dict): return result else: return {"type": "chat", "confidence": 0.5} except Exception as e: logger.error(f"Error analyzing intent: {str(e)}") return {"type": "chat", "error": str(e)} async def _handle_objective(self, message: str, intent: Dict[str, Any]) -> str: """Handle objective creation and management.""" try: # Extract objective details objective = intent.get('objective', {}) # Create objective objective_id = await self.team_manager.create_cross_team_objective( objective=objective.get('description', message), required_teams=objective.get('teams', []), priority=objective.get('priority', 'MEDIUM') ) # Monitor progress status = await self.team_manager.monitor_objective_progress(objective_id) return f"Created objective {objective_id}. Current status: {status}" except Exception as e: logger.error(f"Error handling objective: {str(e)}") return "Failed to create objective. Please try again." async def _handle_chat(self, message: str) -> str: """Handle general chat interactions with error recovery.""" try: # First try using the reasoning engine try: result = await self.reasoning_engine.reason( query=message, context={ "chat_history": self.chat_history, "active_objectives": self.active_objectives, "groq_api": self.groq_api } ) # Handle UnifiedResult object if isinstance(result, UnifiedResult): if not result.success: # If reasoning engine fails, fallback to Groq API groq_result = await self.groq_api.predict(message) if groq_result["success"]: return groq_result["answer"] else: return "I encountered an error. Please try rephrasing your question." return result.answer if hasattr(result, 'answer') else str(result) elif isinstance(result, dict): return result.get('response', str(result)) else: return str(result) except Exception as reasoning_error: logger.error(f"Reasoning engine error: {str(reasoning_error)}") # Fallback to Groq API groq_result = await self.groq_api.predict(message) if groq_result["success"]: return groq_result["answer"] else: raise Exception(f"Both reasoning engine and Groq API failed: {groq_result.get('error')}") except Exception as e: logger.error(f"Error in chat response: {str(e)}") return "I encountered an error generating a response. Please try again." async def _get_status(self) -> str: """Get system status information.""" try: # Get team status team_status = await self.team_manager.get_team_status() # Get objective status objective_status = await self.team_manager.get_objective_status() # Format status information status = "Current System Status:\n\n" # Add team information status += "Teams:\n" for team, info in team_status.items(): status += f"- {team}: {info['status']}\n" status += f" Active Projects: {info['active_projects']}\n" status += f" Success Rate: {info['success_rate']}%\n\n" # Add objective information status += "\nActive Objectives:\n" for obj, info in objective_status.items(): status += f"- {obj}: {info['status']}\n" status += f" Progress: {info['progress']}%\n" status += f" Teams: {', '.join(info['teams'])}\n\n" return status except Exception as e: logger.error(f"Error formatting status: {str(e)}") return "Error formatting status information." def create_chat_interface() -> gr.Blocks: """Create Gradio chat interface.""" chat = ChatInterface() return chat.interface # Initialize FastAPI app = FastAPI( title="Advanced Agentic System API", description="API for interacting with the autonomous agent teams", version="1.0.0" ) # Create Gradio interface interface = create_chat_interface() # Mount Gradio app to FastAPI app = gr.mount_gradio_app(app, interface, path="/") if __name__ == "__main__": # Run with uvicorn when called directly uvicorn.run( "app:app", host="0.0.0.0", port=7860, reload=True, workers=4 )