""" Advanced Agentic System Interface ------------------------------- Provides a chat interface to interact with the autonomous agent teams: - Team A: Coders (App/Software Developers) - Team B: Business (Entrepreneurs) - Team C: Research (Deep Online Research) - Team D: Crypto & Sports Trading """ import os import socket import gradio as gr from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware import uvicorn from typing import Dict, Any, List, Tuple, Optional import logging from pathlib import Path import asyncio from datetime import datetime import json import requests from requests.adapters import HTTPAdapter, Retry from dataclasses import dataclass from agentic_system import AgenticSystem from orchestrator import AgentOrchestrator from team_management import TeamManager, TeamType from reasoning import ( UnifiedReasoningEngine, StrategyType, UnifiedResult ) from api.openai_compatible import OpenAICompatibleAPI from api.venture_api import VentureAPI from api.groq_api import GroqAPI # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def setup_requests_session(): """Set up requests session with retries.""" session = requests.Session() retries = Retry( total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504] ) session.mount('http://', HTTPAdapter(max_retries=retries)) session.mount('https://', HTTPAdapter(max_retries=retries)) return session def check_network(): """Check network connectivity.""" try: # Try DNS resolution first socket.gethostbyname('huggingface.co') return True except socket.gaierror: logger.warning("DNS resolution failed") try: # Try HTTP request as backup session = setup_requests_session() response = session.get('https://huggingface.co', timeout=5) return response.status_code == 200 except (requests.RequestException, socket.gaierror) as e: logger.warning(f"Network connectivity check failed: {e}") return False class ChatInterface: """Chat interface for interacting with the agentic system.""" def __init__(self): """Initialize the chat interface.""" # Check network connectivity if not check_network(): raise ConnectionError("No network connectivity. Please check your connection.") # Initialize core components with consistent configuration config = { "min_confidence": 0.7, "parallel_threshold": 3, "learning_rate": 0.1, "strategy_weights": { "LOCAL_LLM": 0.8, "CHAIN_OF_THOUGHT": 0.6, "TREE_OF_THOUGHTS": 0.5, "META_LEARNING": 0.4 } } # Initialize chat state self.chat_history = [] self.active_objectives = {} # Initialize components self.orchestrator = AgentOrchestrator(config) self.team_manager = TeamManager(self.orchestrator) self.reasoning_engine = UnifiedReasoningEngine() self.groq_api = GroqAPI() # Set up the agentic system self.agentic_system = AgenticSystem(config) # Initialize FastAPI app self.app = FastAPI() self.setup_cors() self.setup_routes() # Create Gradio interface self.interface = self.create_interface() # Launch background tasks self.background_tasks = [] self.launch_background_tasks() async def initialize(self): """Initialize async components.""" await self.team_manager.initialize_team_agents() def launch_background_tasks(self): """Launch background tasks.""" loop = asyncio.get_event_loop() self.background_tasks.append( loop.create_task(self.initialize()) ) def setup_cors(self): """Set up CORS middleware.""" self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def setup_routes(self): """Set up API routes.""" # Include OpenAI-compatible routes openai_api = OpenAICompatibleAPI(self.reasoning_engine) self.app.include_router(openai_api.router, tags=["OpenAI Compatible"]) # Original API routes @self.app.get("/api/health") async def health_check(): """Health check endpoint.""" return { "status": "healthy", "version": "1.0.0", "endpoints": { "openai_compatible": "/v1/chat/completions", "venture": "/api/venture", "ui": "/" } } @self.app.post("/api/reason") async def reason(query: str, context: Optional[Dict[str, Any]] = None): """Reasoning endpoint.""" try: result = await self.reasoning_engine.reason(query, context or {}) return result except Exception as e: logger.error(f"Reasoning error: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.post("/api/venture/analyze") async def analyze_venture( venture_type: str, description: str, metrics: Optional[Dict[str, Any]] = None ): """Venture analysis endpoint.""" try: result = await VentureAPI(self.reasoning_engine).analyze_venture( venture_type=venture_type, description=description, metrics=metrics or {} ) return result except Exception as e: logger.error(f"Analysis error: {e}") raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/api/venture/types") async def get_venture_types(): """Get available venture types.""" return VentureAPI(self.reasoning_engine).get_venture_types() def create_interface(self): """Create the Gradio interface.""" with gr.Blocks( theme=gr.themes.Soft(), analytics_enabled=False, title="Advanced Agentic System" ) as interface: # Verify Gradio version gr.Markdown(f""" # Advanced Agentic System Chat Interface v{gr.__version__} Chat with our autonomous agent teams: - Team A: Coders (App/Software Developers) - Team B: Business (Entrepreneurs) - Team C: Research (Deep Online Research) - Team D: Crypto & Sports Trading You can: 1. Ask questions 2. Create new objectives 3. Check status of teams and objectives 4. Get insights and recommendations """) chatbot = gr.Chatbot( label="Chat History", height=400, bubble_full_width=False, show_copy_button=True, render_markdown=True ) with gr.Row(): msg = gr.Textbox( label="Message", placeholder="Chat with the Agentic System...", lines=2, scale=9, autofocus=True, container=True ) submit = gr.Button( "Send", scale=1, variant="primary" ) with gr.Row(): clear = gr.ClearButton( [msg, chatbot], value="Clear Chat", variant="secondary", scale=1 ) retry = gr.Button( "Retry Last", variant="secondary", scale=1 ) async def respond(message, history): try: # Convert history to the format expected by process_message history_list = [[x, y] for x, y in history] if history else [] response = await self.process_message(message, history_list) # Update history if history is None: history = [] history.append((message, response)) return "", history except Exception as e: logger.error(f"Error in chat response: {str(e)}") error_msg = "I apologize, but I encountered an error. Please try again." if history is None: history = [] history.append((message, error_msg)) return "", history async def retry_last(history): if not history: return history last_user_msg = history[-1][0] history = history[:-1] # Remove last exchange return await respond(last_user_msg, history) # Submit handlers msg.submit( respond, [msg, chatbot], [msg, chatbot] ).then( lambda: (gr.update(value="", interactive=True), gr.update(interactive=True)), None, [msg, submit] ) submit.click( respond, [msg, chatbot], [msg, chatbot] ).then( lambda: (gr.update(value="", interactive=True), gr.update(interactive=True)), None, [msg, submit] ) retry.click( retry_last, [chatbot], [chatbot] ) # Event handlers for better UX msg.change( lambda x: (gr.update(interactive=bool(x.strip())), gr.update(interactive=bool(x.strip()))), [msg], [msg, submit] ) # Add example inputs gr.Examples( examples=[ "What can Team A (Coders) help me with?", "Create a new objective: Analyze market trends", "What's the status of all teams?", "Give me insights about recent developments" ], inputs=msg, label="Example Queries" ) return interface async def process_message( self, message: str, history: List[List[str]] = None ) -> str: """Process a user message.""" try: # Initialize history if None if history is None: history = [] # Update chat history self.chat_history = history # Analyze message intent intent = await self._analyze_intent(message) # Process based on intent if intent.get('type') == 'objective': response = await self._handle_objective(message, intent) elif intent.get('type') == 'status': response = await self._get_status() elif intent.get('type') == 'chat': response = await self._handle_chat(message) else: response = await self._handle_chat(message) # Default to chat handler return response except Exception as e: logger.error(f"Error processing message: {str(e)}") return "I encountered an error processing your message. Please try again." async def _analyze_intent(self, message: str) -> Dict[str, Any]: """Analyze user message intent with error handling.""" try: # Use reasoning engine to analyze intent result = await self.reasoning_engine.reason( query=message, context={ "chat_history": self.chat_history, "active_objectives": self.active_objectives } ) # Handle UnifiedResult object if isinstance(result, UnifiedResult): return { "type": "chat", "confidence": getattr(result, 'confidence', 0.5), "metadata": getattr(result, 'metadata', {}) } elif isinstance(result, dict): return result else: return {"type": "chat", "confidence": 0.5} except Exception as e: logger.error(f"Error analyzing intent: {str(e)}") return {"type": "chat", "error": str(e)} async def _handle_objective(self, message: str, intent: Dict[str, Any]) -> str: """Handle objective creation and management.""" try: # Extract objective details objective = intent.get('objective', {}) # Create objective objective_id = await self.team_manager.create_cross_team_objective( objective=objective.get('description', message), required_teams=objective.get('teams', []), priority=objective.get('priority', 'MEDIUM') ) # Monitor progress status = await self.team_manager.monitor_objective_progress(objective_id) return f"Created objective {objective_id}. Current status: {status}" except Exception as e: logger.error(f"Error handling objective: {str(e)}") return "Failed to create objective. Please try again." async def _handle_chat(self, message: str) -> str: """Handle general chat interactions with error recovery.""" try: # First try using the reasoning engine try: result = await self.reasoning_engine.reason( query=message, context={ "chat_history": self.chat_history, "active_objectives": self.active_objectives, "groq_api": self.groq_api } ) # Handle UnifiedResult object if isinstance(result, UnifiedResult): if not result.success: # If reasoning engine fails, fallback to Groq API groq_result = await self.groq_api.predict(message) if groq_result["success"]: return groq_result["answer"] else: return "I encountered an error. Please try rephrasing your question." return result.answer if hasattr(result, 'answer') else str(result) elif isinstance(result, dict): return result.get('response', str(result)) else: return str(result) except Exception as reasoning_error: logger.error(f"Reasoning engine error: {str(reasoning_error)}") # Fallback to Groq API groq_result = await self.groq_api.predict(message) if groq_result["success"]: return groq_result["answer"] else: raise Exception(f"Both reasoning engine and Groq API failed: {groq_result.get('error')}") except Exception as e: logger.error(f"Error in chat response: {str(e)}") return "I encountered an error generating a response. Please try again." async def _get_status(self) -> str: """Get system status information.""" try: # Get team status team_status = await self.team_manager.get_team_status() # Get objective status objective_status = await self.team_manager.get_objective_status() # Format status information status = "Current System Status:\n\n" # Add team information status += "Teams:\n" for team, info in team_status.items(): status += f"- {team}: {info['status']}\n" status += f" Active Projects: {info['active_projects']}\n" status += f" Success Rate: {info['success_rate']}%\n\n" # Add objective information status += "\nActive Objectives:\n" for obj, info in objective_status.items(): status += f"- {obj}: {info['status']}\n" status += f" Progress: {info['progress']}%\n" status += f" Teams: {', '.join(info['teams'])}\n\n" return status except Exception as e: logger.error(f"Error formatting status: {str(e)}") return "Error formatting status information." def create_chat_interface() -> gr.Blocks: """Create Gradio chat interface.""" chat = ChatInterface() return chat.interface # Initialize FastAPI app = FastAPI( title="Advanced Agentic System API", description="API for interacting with the autonomous agent teams", version="1.0.0" ) # Create Gradio interface interface = create_chat_interface() # Mount Gradio app to FastAPI app = gr.mount_gradio_app(app, interface, path="/") if __name__ == "__main__": # Run with uvicorn when called directly uvicorn.run( "app:app", host="0.0.0.0", port=7860, reload=True, workers=4 )