# Spaces:
# Runtime error
# Runtime error
""" | |
Advanced Agentic System Interface
---------------------------------
Provides an interface to interact with the autonomous agent system
using a local LLM for improved performance.
""" | |
import gradio as gr | |
import asyncio | |
from typing import Dict, Any, List | |
import json | |
from datetime import datetime | |
import logging | |
import os | |
import socket | |
import requests | |
from requests.adapters import HTTPAdapter, Retry | |
from agentic_system import AgenticSystem | |
from team_management import TeamManager | |
from orchestrator import AgentOrchestrator | |
from reasoning import UnifiedReasoningEngine | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _positive_int_env(name, default):
    """Return environment variable *name* as a positive int, else *default*.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset, is not a valid
            integer, or is zero/negative.

    Returns:
        The parsed positive integer, or *default*.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        # A malformed setting should not prevent the module from importing.
        logger.warning("Ignoring non-integer %s=%r; using %d", name, raw, default)
        return default
    if value <= 0:
        # TIMEOUT is used as a request timeout, where a zero or negative
        # value is not meaningful, so fall back to the default instead.
        logger.warning("Ignoring non-positive %s=%r; using %d", name, raw, default)
        return default
    return value


# Configure network settings
# TIMEOUT is in seconds and can be overridden via the REQUESTS_TIMEOUT env var.
TIMEOUT = _positive_int_env('REQUESTS_TIMEOUT', 30)
RETRIES = 3
def check_network():
    """Check network connectivity by probing https://huggingface.co.

    Issues a GET request using the module-level ``TIMEOUT`` and retries
    failed attempts up to ``RETRIES`` times before giving up.

    Returns:
        bool: True only when the response status code is 200. False for any
        other status, or when the request raises
        ``requests.RequestException`` (which is logged as a warning).
    """
    # Apply the configured RETRIES through a transport adapter; a short
    # backoff keeps the worst-case delay small when the host is unreachable.
    retry_policy = Retry(total=RETRIES, backoff_factor=0.5)
    try:
        with requests.Session() as session:
            session.mount('https://', HTTPAdapter(max_retries=retry_policy))
            # stream=True: only the status code is needed, so the page body
            # is not downloaded; the context manager releases the connection.
            with session.get(
                'https://huggingface.co', timeout=TIMEOUT, stream=True
            ) as response:
                return response.status_code == 200
    except requests.RequestException as e:
        logger.warning("Network connectivity issue: %s", e)
        return False
class AgentInterface:
    """Interface for the agentic system.

    Builds the orchestrator, reasoning engine, team manager and agentic
    system, and exposes a query entry point plus a health report.
    """

    def __init__(self) -> None:
        """Initialize the interface components.

        A failed connectivity check is only logged; construction continues.
        """
        # Check network connectivity
        if not check_network():
            logger.warning("Network connectivity issues detected")

        self.orchestrator = AgentOrchestrator()
        self.reasoning_engine = UnifiedReasoningEngine(
            min_confidence=0.7,
            parallel_threshold=3,
            learning_rate=0.1
        )
        self.team_manager = TeamManager(self.orchestrator)
        self.system = AgenticSystem()

    async def process_query(self, message: str) -> str:
        """Process user query through the reasoning system.

        Args:
            message: The user's question.

        Returns:
            A formatted answer with confidence (and insights when present),
            an apology when the engine reports no success, a short prompt
            when *message* is empty or whitespace-only, or an error message
            if processing raises.
        """
        # Nothing to reason about: answer directly instead of invoking the
        # reasoning engine with an empty query.
        if not message or not message.strip():
            return "Please enter a question."

        try:
            # Log incoming query
            logger.info("Processing query: %s", message)

            # Get reasoning result
            result = await self.reasoning_engine.reason(
                query=message,
                context={"timestamp": datetime.now().isoformat()}
            )

            # Format response
            if result.success:
                response = f"Answer: {result.answer}\nConfidence: {result.confidence:.2f}"
                if result.meta_insights:
                    response += "\nInsights:\n" + "\n".join(
                        f"- {insight}" for insight in result.meta_insights
                    )
            else:
                response = "I apologize, but I couldn't process your query effectively. Please try rephrasing or providing more context."

            return response
        except Exception as e:
            # UI boundary: report the failure to the user rather than raising.
            # logger.exception keeps the traceback in the logs.
            logger.exception("Error processing query: %s", e)
            return f"An error occurred: {str(e)}"

    def health_check(self) -> Dict[str, Any]:
        """Check system health.

        Returns:
            A dict with a "status" string, the result of a fresh network
            probe, a per-component presence flag, and an ISO timestamp.
        """
        # NOTE(review): "status" is a fixed string and does not reflect the
        # network probe or the component flags below.
        return {
            "status": "healthy",
            "network": check_network(),
            "components": {
                "orchestrator": self.orchestrator is not None,
                "reasoning_engine": self.reasoning_engine is not None,
                "team_manager": self.team_manager is not None,
                "system": self.system is not None
            },
            "timestamp": datetime.now().isoformat()
        }
# Initialize interface
# NOTE: constructing AgentInterface runs a network probe, so importing this
# module performs I/O.
interface = AgentInterface()

# Create Gradio interface
with gr.Blocks(title="Advanced Reasoning System", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 Advanced Reasoning System
    Welcome to the Advanced Reasoning System! This system combines multiple reasoning strategies:
    - Chain of Thought
    - Tree of Thoughts
    - Meta Learning
    - Local LLM
    - And more!
    Ask any question and the system will use its advanced reasoning capabilities to help you.
    """)

    with gr.Row():
        # Left column: question entry and action buttons.
        with gr.Column(scale=4):
            query_input = gr.Textbox(
                label="Your Question",
                placeholder="Enter your question here...",
                lines=3
            )
            with gr.Row():
                submit_btn = gr.Button("Submit", variant="primary")
                clear_btn = gr.Button("Clear")

        # Right column: read-only response area.
        with gr.Column(scale=6):
            output = gr.Textbox(
                label="Response",
                lines=10,
                interactive=False
            )

    # Add examples
    gr.Examples(
        examples=[
            "How would you approach designing a scalable microservices architecture?",
            "What are the key considerations for implementing a secure authentication system?",
            "Can you help me understand the differences between various machine learning algorithms?",
        ],
        inputs=query_input,
        label="Example Questions"
    )

    # Hidden sink for the health report. Without an output component the
    # dict returned by health_check is discarded and the "health" endpoint
    # returns nothing to its callers.
    health_output = gr.JSON(label="System Health", visible=False)

    # Event handlers
    submit_btn.click(
        fn=interface.process_query,
        inputs=query_input,
        outputs=output,
        api_name="process_query"
    )
    clear_btn.click(
        # Reset both the question box and the response box.
        fn=lambda: ("", ""),
        inputs=None,
        outputs=[query_input, output],
        api_name="clear"
    )

    # Add health check endpoint (also runs on every page load)
    demo.load(
        fn=interface.health_check,
        inputs=None,
        outputs=health_output,
        api_name="health"
    )

# Launch the interface
if __name__ == "__main__":
    demo.launch()