advanced-reasoning / app.py.backup
nananie143's picture
Upload folder using huggingface_hub
dcb2a99 verified
"""
Advanced Agentic System Interface
-------------------------------
Provides a chat interface to interact with the autonomous agent teams:
- Team A: Coders (App/Software Developers)
- Team B: Business (Entrepreneurs)
- Team C: Research (Deep Online Research)
- Team D: Crypto & Sports Trading
"""
import gradio as gr
import asyncio
from typing import Dict, Any, List
import json
from datetime import datetime
from agentic_system import AgenticSystem
from team_management import TeamManager, TeamType, TeamObjective
from orchestrator import AgentOrchestrator
from reasoning import ReasoningEngine
class ChatInterface:
def __init__(self):
# Initialize core components
self.orchestrator = AgentOrchestrator()
self.agentic_system = AgenticSystem()
self.team_manager = TeamManager(self.orchestrator)
self.chat_history = []
self.active_objectives = {}
# Initialize teams
asyncio.run(self.team_manager.initialize_team_agents())
async def process_message(
self,
message: str,
history: List[List[str]]
) -> str:
"""Process incoming chat message."""
try:
# Analyze message intent
intent = await self._analyze_intent(message)
if intent["type"] == "query":
response = await self._handle_query(message)
elif intent["type"] == "objective":
response = await self._handle_objective(message)
elif intent["type"] == "status":
response = await self._handle_status_request(message)
else:
response = await self._handle_general_chat(message)
# Update chat history
self.chat_history.append({
"role": "user",
"content": message,
"timestamp": datetime.now()
})
self.chat_history.append({
"role": "assistant",
"content": response,
"timestamp": datetime.now()
})
return response
except Exception as e:
return f"Error processing message: {str(e)}"
async def _analyze_intent(self, message: str) -> Dict[str, Any]:
"""Analyze user message intent."""
# Use reasoning engine to analyze intent
analysis = await self.orchestrator.reasoning_engine.reason(
query=message,
context={
"chat_history": self.chat_history,
"active_objectives": self.active_objectives
}
)
return {
"type": analysis.get("intent_type", "general"),
"confidence": analysis.get("confidence", 0.5),
"entities": analysis.get("entities", []),
"action_required": analysis.get("action_required", False)
}
async def _handle_query(self, message: str) -> str:
"""Handle information queries."""
# Get relevant teams for the query
recommended_teams = await self.team_manager.get_team_recommendations(message)
# Get responses from relevant teams
responses = []
for team_type in recommended_teams:
team_response = await self._get_team_response(team_type, message)
responses.append(team_response)
# Combine and format responses
combined_response = self._format_team_responses(responses)
return combined_response
async def _handle_objective(self, message: str) -> str:
"""Handle new objective creation."""
# Analyze objective requirements
analysis = await self.orchestrator.reasoning_engine.reason(
query=f"Analyze objective requirements: {message}",
context={"teams": self.team_manager.teams}
)
# Determine required teams
required_teams = [
TeamType[team.upper()]
for team in analysis.get("required_teams", [])
]
# Create cross-team objective
objective_id = await self.team_manager.create_cross_team_objective(
objective=message,
required_teams=required_teams
)
self.active_objectives[objective_id] = {
"description": message,
"teams": required_teams,
"status": "initiated",
"created_at": datetime.now()
}
return self._format_objective_creation(objective_id)
async def _handle_status_request(self, message: str) -> str:
"""Handle status check requests."""
# Get system status
system_status = await self.agentic_system.get_system_status()
# Get team status
team_status = {}
for team_id, team in self.team_manager.teams.items():
team_status[team.name] = await self.team_manager.monitor_objective_progress(team_id)
# Get objective status
objective_status = {}
for obj_id, obj in self.active_objectives.items():
objective_status[obj_id] = await self.team_manager.monitor_objective_progress(obj_id)
return self._format_status_response(system_status, team_status, objective_status)
async def _handle_general_chat(self, message: str) -> str:
"""Handle general chat interactions."""
# Use reasoning engine for response generation
response = await self.orchestrator.reasoning_engine.reason(
query=message,
context={
"chat_history": self.chat_history,
"system_state": await self.agentic_system.get_system_status()
}
)
return response.get("response", "I'm not sure how to respond to that.")
async def _get_team_response(self, team_type: TeamType, query: str) -> Dict[str, Any]:
"""Get response from a specific team."""
team_id = next(
(tid for tid, team in self.team_manager.teams.items()
if team.type == team_type),
None
)
if not team_id:
return {
"team": team_type.value,
"response": "Team not available",
"confidence": 0.0
}
# Get team agents
team_agents = self.team_manager.agents[team_id]
# Aggregate responses from team agents
responses = []
for agent in team_agents.values():
agent_response = await agent.process_query(query)
responses.append(agent_response)
# Combine responses
combined_response = self._combine_agent_responses(responses)
return {
"team": team_type.value,
"response": combined_response,
"confidence": sum(r.get("confidence", 0) for r in responses) / len(responses)
}
def _combine_agent_responses(self, responses: List[Dict[str, Any]]) -> str:
"""Combine multiple agent responses into a coherent response."""
# Sort by confidence
valid_responses = [
r for r in responses
if r.get("success", False) and r.get("response")
]
if not valid_responses:
return "No valid response available"
sorted_responses = sorted(
valid_responses,
key=lambda x: x.get("confidence", 0),
reverse=True
)
# Take the highest confidence response
best_response = sorted_responses[0]
return best_response.get("response", "No response available")
def _format_team_responses(self, responses: List[Dict[str, Any]]) -> str:
"""Format team responses into a readable message."""
formatted = []
for response in responses:
if response.get("confidence", 0) > 0.3: # Confidence threshold
formatted.append(
f"Team {response['team'].title()}:\n"
f"{response['response']}\n"
)
if not formatted:
return "No team was able to provide a confident response."
return "\n".join(formatted)
def _format_objective_creation(self, objective_id: str) -> str:
"""Format objective creation response."""
objective = self.active_objectives[objective_id]
return (
f"Objective created successfully!\n\n"
f"Objective ID: {objective_id}\n"
f"Description: {objective['description']}\n"
f"Assigned Teams: {', '.join(t.value for t in objective['teams'])}\n"
f"Status: {objective['status']}\n"
f"Created: {objective['created_at'].strftime('%Y-%m-%d %H:%M:%S')}"
)
def _format_status_response(
self,
system_status: Dict[str, Any],
team_status: Dict[str, Any],
objective_status: Dict[str, Any]
) -> str:
"""Format status response."""
# Format system status
status = [
"System Status:",
f"- State: {system_status['state']}",
f"- Active Agents: {system_status['agent_count']}",
f"- Active Tasks: {system_status['active_tasks']}",
"\nTeam Status:"
]
# Add team status
for team_name, team_info in team_status.items():
status.extend([
f"\n{team_name}:",
f"- Active Agents: {team_info['active_agents']}",
f"- Completion Rate: {team_info['completion_rate']:.2%}",
f"- Collaboration Score: {team_info['collaboration_score']:.2f}"
])
# Add objective status
if objective_status:
status.append("\nActive Objectives:")
for obj_id, obj_info in objective_status.items():
obj = self.active_objectives[obj_id]
status.extend([
f"\n{obj['description']}:",
f"- Status: {obj['status']}",
f"- Teams: {', '.join(t.value for t in obj['teams'])}",
f"- Progress: {sum(t['completion_rate'] for t in obj_info.values())/len(obj_info):.2%}"
])
return "\n".join(status)
class VentureUI:
def __init__(self, app):
self.app = app
def create_interface(self):
return gr.Interface(
fn=self.app,
inputs=[
gr.Textbox(
label="Message",
placeholder="Chat with the Agentic System...",
lines=2
),
gr.State([]) # For chat history
],
outputs=gr.Textbox(
label="Response",
lines=10
),
title="Advanced Agentic System Chat Interface",
description="""
Chat with our autonomous agent teams:
- Team A: Coders (App/Software Developers)
- Team B: Business (Entrepreneurs)
- Team C: Research (Deep Online Research)
- Team D: Crypto & Sports Trading
You can:
1. Ask questions
2. Create new objectives
3. Check status of teams and objectives
4. Get insights and recommendations
""",
theme="default",
allow_flagging="never"
)
def create_chat_interface() -> gr.Interface:
"""Create Gradio chat interface."""
chat = ChatInterface()
ui = VentureUI(chat.process_message)
return ui.create_interface()
# Create and launch the interface
interface = create_chat_interface()
if __name__ == "__main__":
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=True
)