agentic-system / space /market_analysis.py
Cascade Bot
Added Groq streaming support and optimizations - clean version
1d75522
"""Advanced market analysis tools for venture strategies."""
import logging
from typing import Dict, Any, List, Optional, Set, Union, Type, Tuple
import json
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import numpy as np
from collections import defaultdict
from .base import ReasoningStrategy
@dataclass
class MarketSegment:
"""Market segment analysis."""
size: float
growth_rate: float
cagr: float
competition: List[Dict[str, Any]]
barriers: List[str]
opportunities: List[str]
risks: List[str]
@dataclass
class CompetitorAnalysis:
"""Competitor analysis."""
name: str
market_share: float
strengths: List[str]
weaknesses: List[str]
strategy: str
revenue: Optional[float]
valuation: Optional[float]
@dataclass
class MarketTrend:
"""Market trend analysis."""
name: str
impact: float
timeline: str
adoption_rate: float
market_potential: float
risk_level: float
class MarketAnalyzer:
"""
Advanced market analysis toolkit that:
1. Analyzes market segments
2. Tracks competitors
3. Identifies trends
4. Predicts opportunities
5. Assesses risks
"""
def __init__(self):
self.segments: Dict[str, MarketSegment] = {}
self.competitors: Dict[str, CompetitorAnalysis] = {}
self.trends: List[MarketTrend] = []
async def analyze_market(self,
segment: str,
context: Dict[str, Any]) -> Dict[str, Any]:
"""Perform comprehensive market analysis."""
try:
# Segment analysis
segment_analysis = await self._analyze_segment(segment, context)
# Competitor analysis
competitor_analysis = await self._analyze_competitors(segment, context)
# Trend analysis
trend_analysis = await self._analyze_trends(segment, context)
# Opportunity analysis
opportunity_analysis = await self._analyze_opportunities(
segment_analysis, competitor_analysis, trend_analysis, context)
# Risk analysis
risk_analysis = await self._analyze_risks(
segment_analysis, competitor_analysis, trend_analysis, context)
return {
"success": True,
"segment_analysis": segment_analysis,
"competitor_analysis": competitor_analysis,
"trend_analysis": trend_analysis,
"opportunity_analysis": opportunity_analysis,
"risk_analysis": risk_analysis,
"metrics": {
"market_score": self._calculate_market_score(segment_analysis),
"opportunity_score": self._calculate_opportunity_score(opportunity_analysis),
"risk_score": self._calculate_risk_score(risk_analysis)
}
}
except Exception as e:
logging.error(f"Error in market analysis: {str(e)}")
return {"success": False, "error": str(e)}
async def _analyze_segment(self,
segment: str,
context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze market segment."""
prompt = f"""
Analyze market segment:
Segment: {segment}
Context: {json.dumps(context)}
Analyze:
1. Market size and growth
2. Customer segments
3. Value chain
4. Entry barriers
5. Competitive dynamics
Format as:
[Analysis]
Size: ...
Growth: ...
Segments: ...
Value_Chain: ...
Barriers: ...
"""
response = await context["groq_api"].predict(prompt)
return self._parse_segment_analysis(response["answer"])
async def _analyze_competitors(self,
segment: str,
context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze competitors in segment."""
prompt = f"""
Analyze competitors:
Segment: {segment}
Context: {json.dumps(context)}
For each competitor analyze:
1. Market share
2. Business model
3. Strengths/weaknesses
4. Strategy
5. Performance metrics
Format as:
[Competitor1]
Share: ...
Model: ...
Strengths: ...
Weaknesses: ...
Strategy: ...
Metrics: ...
"""
response = await context["groq_api"].predict(prompt)
return self._parse_competitor_analysis(response["answer"])
async def _analyze_trends(self,
segment: str,
context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze market trends."""
prompt = f"""
Analyze market trends:
Segment: {segment}
Context: {json.dumps(context)}
Analyze trends in:
1. Technology
2. Customer behavior
3. Business models
4. Regulation
5. Market dynamics
Format as:
[Trend1]
Type: ...
Impact: ...
Timeline: ...
Adoption: ...
Potential: ...
"""
response = await context["groq_api"].predict(prompt)
return self._parse_trend_analysis(response["answer"])
async def _analyze_opportunities(self,
segment_analysis: Dict[str, Any],
competitor_analysis: Dict[str, Any],
trend_analysis: Dict[str, Any],
context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze market opportunities."""
prompt = f"""
Analyze market opportunities:
Segment: {json.dumps(segment_analysis)}
Competitors: {json.dumps(competitor_analysis)}
Trends: {json.dumps(trend_analysis)}
Context: {json.dumps(context)}
Identify opportunities in:
1. Unmet needs
2. Market gaps
3. Innovation potential
4. Scaling potential
5. Value creation
Format as:
[Opportunity1]
Type: ...
Description: ...
Potential: ...
Requirements: ...
Timeline: ...
"""
response = await context["groq_api"].predict(prompt)
return self._parse_opportunity_analysis(response["answer"])
async def _analyze_risks(self,
segment_analysis: Dict[str, Any],
competitor_analysis: Dict[str, Any],
trend_analysis: Dict[str, Any],
context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze market risks."""
prompt = f"""
Analyze market risks:
Segment: {json.dumps(segment_analysis)}
Competitors: {json.dumps(competitor_analysis)}
Trends: {json.dumps(trend_analysis)}
Context: {json.dumps(context)}
Analyze risks in:
1. Market dynamics
2. Competition
3. Technology
4. Regulation
5. Execution
Format as:
[Risk1]
Type: ...
Description: ...
Impact: ...
Probability: ...
Mitigation: ...
"""
response = await context["groq_api"].predict(prompt)
return self._parse_risk_analysis(response["answer"])
def _calculate_market_score(self, analysis: Dict[str, Any]) -> float:
"""Calculate market attractiveness score."""
weights = {
"size": 0.3,
"growth": 0.3,
"competition": 0.2,
"barriers": 0.1,
"dynamics": 0.1
}
scores = {
"size": min(analysis.get("size", 0) / 1e9, 1.0), # Normalize to 1B
"growth": min(analysis.get("growth", 0) / 30, 1.0), # Normalize to 30%
"competition": 1.0 - min(len(analysis.get("competitors", [])) / 10, 1.0),
"barriers": 1.0 - min(len(analysis.get("barriers", [])) / 5, 1.0),
"dynamics": analysis.get("dynamics_score", 0.5)
}
return sum(weights[k] * scores[k] for k in weights)
def _calculate_opportunity_score(self, analysis: Dict[str, Any]) -> float:
"""Calculate opportunity attractiveness score."""
weights = {
"market_potential": 0.3,
"innovation_potential": 0.2,
"execution_feasibility": 0.2,
"competitive_advantage": 0.2,
"timing": 0.1
}
scores = {
"market_potential": analysis.get("market_potential", 0.5),
"innovation_potential": analysis.get("innovation_potential", 0.5),
"execution_feasibility": analysis.get("execution_feasibility", 0.5),
"competitive_advantage": analysis.get("competitive_advantage", 0.5),
"timing": analysis.get("timing_score", 0.5)
}
return sum(weights[k] * scores[k] for k in weights)
def _calculate_risk_score(self, analysis: Dict[str, Any]) -> float:
"""Calculate risk level score."""
weights = {
"market_risk": 0.2,
"competition_risk": 0.2,
"technology_risk": 0.2,
"regulatory_risk": 0.2,
"execution_risk": 0.2
}
scores = {
"market_risk": analysis.get("market_risk", 0.5),
"competition_risk": analysis.get("competition_risk", 0.5),
"technology_risk": analysis.get("technology_risk", 0.5),
"regulatory_risk": analysis.get("regulatory_risk", 0.5),
"execution_risk": analysis.get("execution_risk", 0.5)
}
return sum(weights[k] * scores[k] for k in weights)
def get_market_insights(self) -> Dict[str, Any]:
"""Get comprehensive market insights."""
return {
"segment_insights": {
segment: {
"size": s.size,
"growth_rate": s.growth_rate,
"cagr": s.cagr,
"opportunity_score": self._calculate_market_score({
"size": s.size,
"growth": s.growth_rate,
"competitors": s.competition,
"barriers": s.barriers
})
}
for segment, s in self.segments.items()
},
"competitor_insights": {
competitor: {
"market_share": c.market_share,
"strength_score": len(c.strengths) / (len(c.strengths) + len(c.weaknesses)),
"revenue": c.revenue,
"valuation": c.valuation
}
for competitor, c in self.competitors.items()
},
"trend_insights": [
{
"name": t.name,
"impact": t.impact,
"potential": t.market_potential,
"risk": t.risk_level
}
for t in self.trends
]
}
class MarketAnalysisStrategy(ReasoningStrategy):
"""
Advanced market analysis strategy that combines multiple analytical tools
to provide comprehensive market insights.
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""Initialize market analysis strategy."""
super().__init__()
self.config = config or {}
self.analyzer = MarketAnalyzer()
async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Perform market analysis based on query and context.
Args:
query: The market analysis query
context: Additional context and parameters
Returns:
Dict containing market analysis results and confidence scores
"""
try:
# Extract market segment from query/context
segment = self._extract_segment(query, context)
# Perform market analysis
analysis = await self._analyze_market(segment, context)
# Get insights
insights = self.analyzer.get_market_insights()
# Calculate confidence based on data quality and completeness
confidence = self._calculate_confidence(analysis, insights)
return {
'answer': self._format_insights(insights),
'confidence': confidence,
'analysis': analysis,
'insights': insights,
'segment': segment
}
except Exception as e:
logging.error(f"Market analysis failed: {str(e)}")
return {
'error': f"Market analysis failed: {str(e)}",
'confidence': 0.0
}
def _extract_segment(self, query: str, context: Dict[str, Any]) -> str:
"""Extract market segment from query and context."""
# Use context if available
if 'segment' in context:
return context['segment']
# Default to general market
return 'general'
async def _analyze_market(self, segment: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Perform comprehensive market analysis."""
return await self.analyzer.analyze_market(segment, context)
def _calculate_confidence(self, analysis: Dict[str, Any], insights: Dict[str, Any]) -> float:
"""Calculate confidence score based on analysis quality."""
# Base confidence
confidence = 0.5
# Adjust based on data completeness
if analysis.get('segment_analysis'):
confidence += 0.1
if analysis.get('competitor_analysis'):
confidence += 0.1
if analysis.get('trend_analysis'):
confidence += 0.1
# Adjust based on insight quality
if insights.get('opportunities'):
confidence += 0.1
if insights.get('risks'):
confidence += 0.1
return min(confidence, 1.0)
def _format_insights(self, insights: Dict[str, Any]) -> str:
"""Format market insights into readable text."""
sections = []
if 'market_overview' in insights:
sections.append(f"Market Overview: {insights['market_overview']}")
if 'opportunities' in insights:
opps = insights['opportunities']
sections.append("Key Opportunities:\n- " + "\n- ".join(opps))
if 'risks' in insights:
risks = insights['risks']
sections.append("Key Risks:\n- " + "\n- ".join(risks))
if 'recommendations' in insights:
recs = insights['recommendations']
sections.append("Recommendations:\n- " + "\n- ".join(recs))
return "\n\n".join(sections)