Spaces:
Runtime error
Runtime error
"""API endpoints for venture strategies and analysis.""" | |
from fastapi import APIRouter, HTTPException, Depends | |
from typing import List, Dict, Any, Optional | |
from pydantic import BaseModel, Field | |
from datetime import datetime | |
from reasoning.venture_strategies import ( | |
AIStartupStrategy, SaaSVentureStrategy, AutomationVentureStrategy, | |
DataVentureStrategy, APIVentureStrategy, MarketplaceVentureStrategy, | |
AIInfrastructureStrategy, AIConsultingStrategy, AIProductStrategy, | |
FinTechStrategy, HealthTechStrategy, EdTechStrategy, | |
BlockchainStrategy, AIMarketplaceStrategy | |
) | |
from reasoning.market_analysis import MarketAnalyzer | |
from reasoning.portfolio_optimization import PortfolioOptimizer | |
from reasoning.monetization import MonetizationOptimizer | |
router = APIRouter(prefix="/api/ventures", tags=["ventures"]) | |
# Models | |
class VentureRequest(BaseModel): | |
"""Venture analysis request.""" | |
venture_type: str | |
query: str | |
context: Dict[str, Any] = Field(default_factory=dict) | |
class MarketRequest(BaseModel): | |
"""Market analysis request.""" | |
segment: str | |
context: Dict[str, Any] = Field(default_factory=dict) | |
class PortfolioRequest(BaseModel): | |
"""Portfolio optimization request.""" | |
ventures: List[str] | |
context: Dict[str, Any] = Field(default_factory=dict) | |
class MonetizationRequest(BaseModel): | |
"""Monetization optimization request.""" | |
venture_type: str | |
context: Dict[str, Any] = Field(default_factory=dict) | |
# Strategy mapping | |
VENTURE_STRATEGIES = { | |
"ai_startup": AIStartupStrategy(), | |
"saas": SaaSVentureStrategy(), | |
"automation": AutomationVentureStrategy(), | |
"data": DataVentureStrategy(), | |
"api": APIVentureStrategy(), | |
"marketplace": MarketplaceVentureStrategy(), | |
"ai_infrastructure": AIInfrastructureStrategy(), | |
"ai_consulting": AIConsultingStrategy(), | |
"ai_product": AIProductStrategy(), | |
"fintech": FinTechStrategy(), | |
"healthtech": HealthTechStrategy(), | |
"edtech": EdTechStrategy(), | |
"blockchain": BlockchainStrategy(), | |
"ai_marketplace": AIMarketplaceStrategy() | |
} | |
# Endpoints | |
async def analyze_venture(request: VentureRequest): | |
"""Analyze venture opportunity.""" | |
try: | |
strategy = VENTURE_STRATEGIES.get(request.venture_type) | |
if not strategy: | |
raise HTTPException( | |
status_code=400, | |
detail=f"Invalid venture type: {request.venture_type}" | |
) | |
result = await strategy.reason(request.query, request.context) | |
return { | |
"success": True, | |
"result": result, | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def analyze_market(request: MarketRequest): | |
"""Analyze market opportunity.""" | |
try: | |
analyzer = MarketAnalyzer() | |
result = await analyzer.analyze_market(request.segment, request.context) | |
return { | |
"success": True, | |
"result": result, | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def optimize_portfolio(request: PortfolioRequest): | |
"""Optimize venture portfolio.""" | |
try: | |
optimizer = PortfolioOptimizer() | |
result = await optimizer.optimize_portfolio(request.ventures, request.context) | |
return { | |
"success": True, | |
"result": result, | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def optimize_monetization(request: MonetizationRequest): | |
"""Optimize venture monetization.""" | |
try: | |
optimizer = MonetizationOptimizer() | |
result = await optimizer.optimize_monetization( | |
request.venture_type, request.context) | |
return { | |
"success": True, | |
"result": result, | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def list_strategies(): | |
"""List available venture strategies.""" | |
return { | |
"success": True, | |
"strategies": list(VENTURE_STRATEGIES.keys()), | |
"timestamp": datetime.now().isoformat() | |
} | |
async def get_venture_metrics(venture_type: str): | |
"""Get venture performance metrics.""" | |
try: | |
strategy = VENTURE_STRATEGIES.get(venture_type) | |
if not strategy: | |
raise HTTPException( | |
status_code=400, | |
detail=f"Invalid venture type: {venture_type}" | |
) | |
metrics = strategy.get_venture_metrics() | |
return { | |
"success": True, | |
"metrics": metrics, | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def get_market_insights(): | |
"""Get comprehensive market insights.""" | |
try: | |
analyzer = MarketAnalyzer() | |
insights = analyzer.get_market_insights() | |
return { | |
"success": True, | |
"insights": insights, | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def get_portfolio_insights(): | |
"""Get comprehensive portfolio insights.""" | |
try: | |
optimizer = PortfolioOptimizer() | |
insights = optimizer.get_portfolio_insights() | |
return { | |
"success": True, | |
"insights": insights, | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def get_monetization_metrics(): | |
"""Get comprehensive monetization metrics.""" | |
try: | |
optimizer = MonetizationOptimizer() | |
metrics = optimizer.get_monetization_metrics() | |
return { | |
"success": True, | |
"metrics": metrics, | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |