# agentic-system / main.py
# Cascade Bot
# Added Groq streaming support and optimizations - clean version
# 1d75522
"""Main entry point for the Advanced Agentic System."""
import asyncio
import logging
from typing import Dict, Any, Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import gradio as gr
import uvicorn
from agentic_system import AgenticSystem
from reasoning.unified_engine import UnifiedReasoningEngine
from api.openai_compatible import OpenAICompatibleAPI
from api.venture_api import VentureAPI
from ui.venture_ui import VentureUI
from config import Config
# Logging setup: configure the root logger once at import time, then create
# the logger used by this module.
_LOG_LEVEL = logging.INFO
logging.basicConfig(level=_LOG_LEVEL)
logger = logging.getLogger(__name__)
class AgenticSystemApp:
    """Main application class integrating all components.

    Construction builds the reasoning engine, the agentic system, the venture
    and OpenAI-compatible API wrappers, a FastAPI application (CORS middleware
    plus JSON routes), and finally mounts the Gradio interface created by
    ``VentureUI`` at ``/``.
    """

    # Single source of truth for the version reported in the FastAPI metadata
    # and in the health-check payload.
    VERSION = "1.0.0"

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create all components and assemble the ASGI application.

        Args:
            config: Optional settings mapping passed to ``Config``. The keys
                read here are ``min_confidence``, ``parallel_threshold``,
                ``learning_rate`` and ``agentic_system``; each falls back to
                the default given at its lookup below.
        """
        self.config = Config(config)

        # Initialize core components
        self.reasoning_engine = UnifiedReasoningEngine(
            min_confidence=self.config.get('min_confidence', 0.7),
            parallel_threshold=self.config.get('parallel_threshold', 3),
            learning_rate=self.config.get('learning_rate', 0.1),
        )
        self.agentic_system = AgenticSystem(
            config=self.config.get('agentic_system', {})
        )

        # Initialize APIs (both share the same reasoning engine instance)
        self.venture_api = VentureAPI(self.reasoning_engine)
        self.openai_api = OpenAICompatibleAPI(self.reasoning_engine)

        # Initialize FastAPI
        self.app = FastAPI(
            title="Advanced Agentic System",
            description="Venture Strategy Optimizer with OpenAI-compatible API",
            version=self.VERSION,
        )

        # Setup middleware
        self._setup_middleware()

        # Setup routes. These are registered before the Gradio app is mounted
        # at "/" below.
        # NOTE(review): presumably this order keeps the root mount from
        # shadowing the /api/* routes - confirm before reordering.
        self._setup_routes()

        # Initialize UI
        self.ui = VentureUI(self.venture_api)
        self.interface = self.ui.create_interface()

        # Mount Gradio app
        self.app = gr.mount_gradio_app(self.app, self.interface, path="/")

    def _setup_middleware(self):
        """Setup FastAPI middleware (CORS only)."""
        # NOTE(review): wildcard origins together with allow_credentials=True
        # is very permissive; the FastAPI CORS documentation advises listing
        # origins explicitly when credentials are allowed. Left unchanged so
        # existing clients keep working - confirm the intended policy.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def _setup_routes(self):
        """Setup API routes.

        Includes the OpenAI-compatible router and registers the health,
        system-status, reasoning and venture-analysis endpoints. The endpoint
        functions are documented with comments rather than docstrings because
        FastAPI publishes endpoint docstrings in the OpenAPI schema.
        """
        # Include OpenAI-compatible routes
        self.app.include_router(
            self.openai_api.router,
            tags=["OpenAI Compatible"]
        )

        # Health check: static payload, does not probe the components.
        @self.app.get("/api/health")
        async def health_check():
            return {
                "status": "healthy",
                "version": self.VERSION,
                "components": {
                    "reasoning_engine": "active",
                    "agentic_system": "active",
                    "openai_api": "active",
                    "venture_api": "active"
                }
            }

        # System status: delegates to the agentic system.
        @self.app.get("/api/system/status")
        async def system_status():
            return await self.agentic_system.get_system_status()

        # Reasoning endpoint: a missing context is passed on as an empty dict.
        @self.app.post("/api/reason")
        async def reason(query: str, context: Optional[Dict[str, Any]] = None):
            return await self.reasoning_engine.reason(query, context or {})

        # Venture analysis: missing metrics are passed on as an empty dict.
        @self.app.post("/api/venture/analyze")
        async def analyze_venture(
            venture_type: str,
            description: str,
            metrics: Optional[Dict[str, Any]] = None
        ):
            return await self.venture_api.analyze_venture(
                venture_type=venture_type,
                description=description,
                metrics=metrics or {}
            )

    def run(self, host: str = "0.0.0.0", port: int = 7860):
        """Run the application.

        Serves ``self.app`` with uvicorn in the current process.

        Args:
            host: Interface address to bind to.
            port: TCP port to listen on.
        """
        # uvicorn supports multiple worker processes only when the application
        # is given as an import string. Passing an app instance together with
        # a worker count above one makes uvicorn log an error and exit instead
        # of serving, so no worker count is passed here.
        uvicorn.run(
            self.app,
            host=host,
            port=port,
        )
def main():
    """Build the application with no explicit configuration and serve it."""
    AgenticSystemApp().run()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()