"""
Advanced Agentic System Interface
---------------------------------
Provides a chat interface to interact with the autonomous agent teams:
- Team A: Coders (App/Software Developers)
- Team B: Business (Entrepreneurs)
- Team C: Research (Deep Online Research)
- Team D: Crypto & Sports Trading
"""

import asyncio
import inspect
import json
import logging
from datetime import datetime
from typing import Any, Dict, List

import gradio as gr

from agentic_system import AgenticSystem
from team_management import TeamManager, TeamType, TeamObjective
from orchestrator import AgentOrchestrator
from reasoning import ReasoningEngine

logger = logging.getLogger(__name__)

# Markdown shown under the interface title. Kept flush-left so the list items
# are not rendered as an indented code block.
_INTERFACE_DESCRIPTION = """
Chat with our autonomous agent teams:
- Team A: Coders (App/Software Developers)
- Team B: Business (Entrepreneurs)
- Team C: Research (Deep Online Research)
- Team D: Crypto & Sports Trading

You can:
1. Ask questions
2. Create new objectives
3. Check status of teams and objectives
4. Get insights and recommendations
"""


class ChatInterface:
    """Route chat messages to the agent teams and format their answers.

    Holds the orchestrator, the agentic system and the team manager, plus two
    pieces of in-memory state: ``chat_history`` (a list of role/content/
    timestamp dicts) and ``active_objectives`` (objective id -> metadata).
    """

    def __init__(self):
        # Initialize core components
        self.orchestrator = AgentOrchestrator()
        self.agentic_system = AgenticSystem()
        self.team_manager = TeamManager(self.orchestrator)
        self.chat_history: List[Dict[str, Any]] = []
        self.active_objectives: Dict[Any, Dict[str, Any]] = {}

        # Initialize teams.
        # NOTE: asyncio.run() raises RuntimeError when called from a thread
        # that already has a running event loop, so this class has to be
        # constructed from synchronous code.
        asyncio.run(self.team_manager.initialize_team_agents())

    async def process_message(
        self,
        message: str,
        history: List[List[str]]
    ) -> str:
        """Process an incoming chat message and return the reply text.

        The message is classified with ``_analyze_intent`` and dispatched to
        the matching handler; anything that is not a query, objective or
        status request is handled as general chat. On success both the user
        message and the reply are appended to ``self.chat_history``.

        Args:
            message: The text typed by the user.
            history: Chat history supplied by the UI. It is not read here;
                the instance keeps its own ``chat_history``.

        Returns:
            The reply text, or ``"Error processing message: ..."`` when any
            step raises.
        """
        try:
            # Analyze message intent
            intent = await self._analyze_intent(message)

            if intent["type"] == "query":
                response = await self._handle_query(message)
            elif intent["type"] == "objective":
                response = await self._handle_objective(message)
            elif intent["type"] == "status":
                response = await self._handle_status_request(message)
            else:
                response = await self._handle_general_chat(message)

            # Update chat history
            self.chat_history.append({
                "role": "user",
                "content": message,
                "timestamp": datetime.now()
            })
            self.chat_history.append({
                "role": "assistant",
                "content": response,
                "timestamp": datetime.now()
            })

            return response

        except Exception as e:
            # UI boundary: keep the chat alive, but record the traceback so
            # the failure is not lost.
            logger.exception("Failed to process chat message")
            return f"Error processing message: {str(e)}"

    async def _analyze_intent(self, message: str) -> Dict[str, Any]:
        """Classify a user message with the orchestrator's reasoning engine.

        Returns:
            A dict with the keys ``type`` (default ``"general"``),
            ``confidence`` (default ``0.5``), ``entities`` (default ``[]``)
            and ``action_required`` (default ``False``), filled from the
            reasoning engine's analysis.
        """
        # Use reasoning engine to analyze intent
        analysis = await self.orchestrator.reasoning_engine.reason(
            query=message,
            context={
                "chat_history": self.chat_history,
                "active_objectives": self.active_objectives
            }
        )

        return {
            "type": analysis.get("intent_type", "general"),
            "confidence": analysis.get("confidence", 0.5),
            "entities": analysis.get("entities", []),
            "action_required": analysis.get("action_required", False)
        }

    async def _handle_query(self, message: str) -> str:
        """Answer an information query using the recommended teams."""
        # Get relevant teams for the query
        recommended_teams = await self.team_manager.get_team_recommendations(message)

        # Ask the teams one after another, in recommendation order.
        responses = [
            await self._get_team_response(team_type, message)
            for team_type in recommended_teams
        ]

        # Combine and format responses
        return self._format_team_responses(responses)

    async def _handle_objective(self, message: str) -> str:
        """Create a cross-team objective from the message.

        The reasoning engine is asked which teams are required. Team names
        that do not match a ``TeamType`` member are skipped (and logged)
        instead of aborting the whole request.

        Returns:
            A confirmation message describing the new objective.
        """
        # Analyze objective requirements
        analysis = await self.orchestrator.reasoning_engine.reason(
            query=f"Analyze objective requirements: {message}",
            context={"teams": self.team_manager.teams}
        )

        # Determine required teams, tolerating a missing/None list and
        # unrecognised names coming back from the analysis.
        required_teams = []
        for team in analysis.get("required_teams") or []:
            try:
                required_teams.append(TeamType[str(team).upper()])
            except KeyError:
                logger.warning(
                    "Ignoring unknown team %r in objective analysis", team
                )

        # Create cross-team objective
        objective_id = await self.team_manager.create_cross_team_objective(
            objective=message,
            required_teams=required_teams
        )

        self.active_objectives[objective_id] = {
            "description": message,
            "teams": required_teams,
            "status": "initiated",
            "created_at": datetime.now()
        }

        return self._format_objective_creation(objective_id)

    async def _handle_status_request(self, message: str) -> str:
        """Collect system, team and objective status and format it."""
        # Get system status
        system_status = await self.agentic_system.get_system_status()

        # Get team status
        # NOTE(review): monitor_objective_progress() is called with a team id
        # here and with an objective id below -- confirm TeamManager accepts
        # both kinds of id.
        team_status = {}
        for team_id, team in self.team_manager.teams.items():
            team_status[team.name] = await self.team_manager.monitor_objective_progress(team_id)

        # Get objective status
        objective_status = {}
        for obj_id in self.active_objectives:
            objective_status[obj_id] = await self.team_manager.monitor_objective_progress(obj_id)

        return self._format_status_response(system_status, team_status, objective_status)

    async def _handle_general_chat(self, message: str) -> str:
        """Generate a free-form reply with the reasoning engine."""
        # Use reasoning engine for response generation
        response = await self.orchestrator.reasoning_engine.reason(
            query=message,
            context={
                "chat_history": self.chat_history,
                "system_state": await self.agentic_system.get_system_status()
            }
        )

        return response.get("response", "I'm not sure how to respond to that.")

    async def _get_team_response(self, team_type: TeamType, query: str) -> Dict[str, Any]:
        """Ask every agent of one team and return the team's combined answer.

        Returns:
            A dict with ``team`` (the team type's value), ``response`` (text)
            and ``confidence`` (mean of the agents' confidences, ``0.0`` when
            the team is missing or has no agents).
        """
        team_id = next(
            (tid for tid, team in self.team_manager.teams.items()
             if team.type == team_type),
            None
        )

        # Compare against None so a falsy-but-valid id (0, "") still counts.
        if team_id is None:
            return {
                "team": team_type.value,
                "response": "Team not available",
                "confidence": 0.0
            }

        # Get team agents
        team_agents = self.team_manager.agents[team_id]

        # Aggregate responses from team agents, one agent at a time.
        responses = [
            await agent.process_query(query)
            for agent in team_agents.values()
        ]

        # A team without agents yields no responses; avoid dividing by zero.
        if responses:
            confidence = sum(r.get("confidence", 0) for r in responses) / len(responses)
        else:
            confidence = 0.0

        return {
            "team": team_type.value,
            "response": self._combine_agent_responses(responses),
            "confidence": confidence
        }

    def _combine_agent_responses(self, responses: List[Dict[str, Any]]) -> str:
        """Return the text of the most confident successful agent response.

        Responses without a truthy ``success`` flag or without a ``response``
        text are ignored. Ties are won by the earliest response.
        """
        valid_responses = [
            r for r in responses
            if r.get("success", False) and r.get("response")
        ]

        if not valid_responses:
            return "No valid response available"

        # max() returns the first maximal element, matching a stable
        # descending sort followed by taking the head.
        best_response = max(valid_responses, key=lambda r: r.get("confidence", 0))
        return best_response.get("response", "No response available")

    def _format_team_responses(self, responses: List[Dict[str, Any]]) -> str:
        """Format team responses into a readable message.

        Only responses whose confidence is strictly above 0.3 are included.
        """
        formatted = [
            f"Team {response['team'].title()}:\n"
            f"{response['response']}\n"
            for response in responses
            if response.get("confidence", 0) > 0.3  # Confidence threshold
        ]

        if not formatted:
            return "No team was able to provide a confident response."

        return "\n".join(formatted)

    def _format_objective_creation(self, objective_id: str) -> str:
        """Format the confirmation for a newly created objective.

        Reads the objective's metadata from ``self.active_objectives``.
        """
        objective = self.active_objectives[objective_id]
        return (
            f"Objective created successfully!\n\n"
            f"Objective ID: {objective_id}\n"
            f"Description: {objective['description']}\n"
            f"Assigned Teams: {', '.join(t.value for t in objective['teams'])}\n"
            f"Status: {objective['status']}\n"
            f"Created: {objective['created_at'].strftime('%Y-%m-%d %H:%M:%S')}"
        )

    def _format_status_response(
        self,
        system_status: Dict[str, Any],
        team_status: Dict[str, Any],
        objective_status: Dict[str, Any]
    ) -> str:
        """Format system, team and objective status as plain text.

        Args:
            system_status: Must provide ``state``, ``agent_count`` and
                ``active_tasks``.
            team_status: Team name -> dict with ``active_agents``,
                ``completion_rate`` and ``collaboration_score``.
            objective_status: Objective id -> mapping whose values each carry
                a ``completion_rate``; their mean is shown as the progress
                (0 when the mapping is empty).
        """
        # Format system status
        status = [
            "System Status:",
            f"- State: {system_status['state']}",
            f"- Active Agents: {system_status['agent_count']}",
            f"- Active Tasks: {system_status['active_tasks']}",
            "\nTeam Status:"
        ]

        # Add team status
        for team_name, team_info in team_status.items():
            status.extend([
                f"\n{team_name}:",
                f"- Active Agents: {team_info['active_agents']}",
                f"- Completion Rate: {team_info['completion_rate']:.2%}",
                f"- Collaboration Score: {team_info['collaboration_score']:.2f}"
            ])

        # Add objective status
        if objective_status:
            status.append("\nActive Objectives:")
            for obj_id, obj_info in objective_status.items():
                obj = self.active_objectives[obj_id]

                # Mean completion rate; an empty progress mapping counts as
                # 0 instead of raising ZeroDivisionError.
                rates = [t['completion_rate'] for t in obj_info.values()]
                progress = sum(rates) / len(rates) if rates else 0.0

                status.extend([
                    f"\n{obj['description']}:",
                    f"- Status: {obj['status']}",
                    f"- Teams: {', '.join(t.value for t in obj['teams'])}",
                    f"- Progress: {progress:.2%}"
                ])

        return "\n".join(status)


class VentureUI:
    """Build the Gradio interface around a chat callable.

    ``app`` is called as ``app(message, history)`` and must return the reply
    text, either directly or as an awaitable.
    """

    def __init__(self, app):
        self.app = app

    async def _respond(self, message, history):
        """Adapt ``self.app`` to Gradio's state protocol.

        Gradio's Interface needs a state output to pair with the state input,
        so return both the reply and the updated per-session history.
        """
        response = self.app(message, history)
        if inspect.isawaitable(response):
            response = await response

        # Build a new list rather than mutating the state object in place.
        updated_history = [*(history or []), [message, response]]
        return response, updated_history

    def create_interface(self):
        """Create the ``gr.Interface`` with a message box and a reply box."""
        return gr.Interface(
            fn=self._respond,
            inputs=[
                gr.Textbox(
                    label="Message",
                    placeholder="Chat with the Agentic System...",
                    lines=2
                ),
                gr.State([])  # For chat history
            ],
            outputs=[
                gr.Textbox(
                    label="Response",
                    lines=10
                ),
                gr.State()  # Paired with the state input above
            ],
            title="Advanced Agentic System Chat Interface",
            description=_INTERFACE_DESCRIPTION,
            theme="default",
            allow_flagging="never"
        )


def create_chat_interface() -> gr.Interface:
    """Create Gradio chat interface."""
    chat = ChatInterface()
    ui = VentureUI(chat.process_message)
    return ui.create_interface()


# Create and launch the interface
interface = create_chat_interface()

if __name__ == "__main__":
    # NOTE(review): binding to 0.0.0.0 with share=True exposes the app beyond
    # localhost and through a public Gradio link -- confirm this is intended.
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True
    )