"""Unified reasoning engine that combines multiple reasoning strategies.""" | |
import logging | |
from typing import Dict, Any, List, Optional, Set, Union, Type | |
import json | |
from dataclasses import dataclass, field | |
from enum import Enum | |
from datetime import datetime | |
import asyncio | |
from collections import defaultdict | |
from .base import ReasoningStrategy | |
from .chain_of_thought import ChainOfThoughtStrategy | |
from .tree_of_thoughts import TreeOfThoughtsStrategy | |
from .meta_learning import MetaLearningStrategy | |
from .recursive import RecursiveReasoning | |
from .analogical import AnalogicalReasoning | |
from .local_llm import LocalLLMStrategy | |
from .agentic import ( | |
TaskDecompositionStrategy, | |
ResourceManagementStrategy, | |
ContextualPlanningStrategy, | |
AdaptiveExecutionStrategy, | |
FeedbackIntegrationStrategy | |
) | |

class StrategyType(str, Enum):
    """Types of reasoning strategies."""
    CHAIN_OF_THOUGHT = "chain_of_thought"
    TREE_OF_THOUGHTS = "tree_of_thoughts"
    META_LEARNING = "meta_learning"
    RECURSIVE = "recursive"
    ANALOGICAL = "analogical"
    TASK_DECOMPOSITION = "task_decomposition"
    RESOURCE_MANAGEMENT = "resource_management"
    CONTEXTUAL_PLANNING = "contextual_planning"
    ADAPTIVE_EXECUTION = "adaptive_execution"
    FEEDBACK_INTEGRATION = "feedback_integration"
    LOCAL_LLM = "local_llm"

@dataclass
class StrategyResult:
    """Result from a reasoning strategy."""
    strategy_type: StrategyType
    success: bool
    answer: Optional[str]
    confidence: float
    reasoning_trace: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    performance_metrics: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class UnifiedResult:
    """Combined result from multiple strategies."""
    success: bool
    answer: str
    confidence: float
    strategy_results: Dict[StrategyType, StrategyResult]
    synthesis_method: str
    meta_insights: List[str]
    performance_metrics: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)

class UnifiedReasoningEngine:
    """
    Advanced unified reasoning engine that:
    1. Combines multiple reasoning strategies
    2. Dynamically selects and weights strategies
    3. Synthesizes results from different approaches
    4. Learns from experience
    5. Adapts to different types of tasks
    """

    def __init__(self,
                 min_confidence: float = 0.7,
                 strategy_weights: Optional[Dict[StrategyType, float]] = None,
                 parallel_threshold: int = 3,
                 learning_rate: float = 0.1):
        self.min_confidence = min_confidence
        self.parallel_threshold = parallel_threshold
        self.learning_rate = learning_rate

        # Initialize strategies
        self.strategies: Dict[StrategyType, ReasoningStrategy] = {
            StrategyType.CHAIN_OF_THOUGHT: ChainOfThoughtStrategy(),
            StrategyType.TREE_OF_THOUGHTS: TreeOfThoughtsStrategy(),
            StrategyType.META_LEARNING: MetaLearningStrategy(),
            StrategyType.RECURSIVE: RecursiveReasoning(),
            StrategyType.ANALOGICAL: AnalogicalReasoning(),
            StrategyType.TASK_DECOMPOSITION: TaskDecompositionStrategy(),
            StrategyType.RESOURCE_MANAGEMENT: ResourceManagementStrategy(),
            StrategyType.CONTEXTUAL_PLANNING: ContextualPlanningStrategy(),
            StrategyType.ADAPTIVE_EXECUTION: AdaptiveExecutionStrategy(),
            StrategyType.FEEDBACK_INTEGRATION: FeedbackIntegrationStrategy(),
            StrategyType.LOCAL_LLM: LocalLLMStrategy(),  # Local LLM strategy
        }

        # Default weights, with a higher weight for the local LLM strategy
        self.strategy_weights = strategy_weights or {
            **{strategy_type: 1.0 for strategy_type in StrategyType},
            StrategyType.LOCAL_LLM: 2.0,
        }

        # Performance tracking
        self.strategy_performance: Dict[StrategyType, List[float]] = defaultdict(list)
        self.task_type_performance: Dict[str, Dict[StrategyType, float]] = defaultdict(
            lambda: defaultdict(float))
        self.synthesis_performance: Dict[str, List[float]] = defaultdict(list)

    async def reason(self, query: str, context: Dict[str, Any]) -> UnifiedResult:
        """Main reasoning method combining multiple strategies."""
        try:
            # Analyze task
            task_analysis = await self._analyze_task(query, context)

            # Select strategies
            selected_strategies = await self._select_strategies(task_analysis, context)

            # Execute strategies
            strategy_results = await self._execute_strategies(
                selected_strategies, query, context)

            # Synthesize results
            unified_result = await self._synthesize_results(
                strategy_results, task_analysis, context)

            # Learn from experience
            self._update_performance(unified_result)

            return unified_result

        except Exception as e:
            logging.error(f"Error in unified reasoning: {e}")
            return UnifiedResult(
                success=False,
                answer=f"Error: {e}",
                confidence=0.0,
                strategy_results={},
                synthesis_method="failed",
                meta_insights=[f"Error occurred: {e}"],
                performance_metrics={}
            )

    async def _analyze_task(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze the task to determine optimal strategy selection."""
        # default=str keeps non-serializable context values (e.g. the API client
        # stored under "groq_api") from raising a TypeError during serialization
        prompt = f"""
        Analyze reasoning task:
        Query: {query}
        Context: {json.dumps(context, default=str)}

        Determine:
        1. Task type and complexity
        2. Required reasoning capabilities
        3. Resource requirements
        4. Success criteria
        5. Risk factors

        Format as:
        [Analysis]
        Type: ...
        Complexity: ...
        Capabilities: ...
        Resources: ...
        Criteria: ...
        Risks: ...
        """

        response = await context["groq_api"].predict(prompt)
        return self._parse_task_analysis(response["answer"])

    async def _select_strategies(self,
                                 task_analysis: Dict[str, Any],
                                 context: Dict[str, Any]) -> List[StrategyType]:
        """Select appropriate strategies based on task analysis."""
        # Calculate strategy scores
        scores: Dict[StrategyType, float] = {}
        for strategy_type in StrategyType:
            base_score = self.strategy_weights[strategy_type]

            # Task type performance
            task_type = task_analysis["type"]
            type_score = self.task_type_performance[task_type][strategy_type]

            # Recent performance: mean of up to the last five runs,
            # defaulting to a neutral 0.5 when there is no history yet
            recent = self.strategy_performance[strategy_type][-5:]
            recent_performance = sum(recent) / len(recent) if recent else 0.5

            # Resource match
            resource_match = self._calculate_resource_match(
                strategy_type, task_analysis["resources"])

            # Capability match
            capability_match = self._calculate_capability_match(
                strategy_type, task_analysis["capabilities"])

            # Combined score
            scores[strategy_type] = (
                0.3 * base_score +
                0.2 * type_score +
                0.2 * recent_performance +
                0.15 * resource_match +
                0.15 * capability_match
            )

        # Select the top-scoring strategies, up to the parallel threshold
        selected = sorted(
            StrategyType,
            key=lambda x: scores[x],
            reverse=True
        )[:self.parallel_threshold]

        return selected
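
    # Scoring example (illustrative): on a fresh engine, a default-weight
    # strategy scores 0.3 * 1.0 + 0.2 * 0.0 + 0.2 * 0.5 + 0.15 * 0.8 + 0.15 * 0.8
    # = 0.64, since task-type history is empty, recent performance defaults to
    # 0.5, and both match heuristics currently return the 0.8 placeholder.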

    async def _execute_strategies(self,
                                  strategies: List[StrategyType],
                                  query: str,
                                  context: Dict[str, Any]) -> Dict[StrategyType, StrategyResult]:
        """Execute selected strategies in parallel."""
        async def execute_strategy(strategy_type: StrategyType) -> StrategyResult:
            strategy = self.strategies[strategy_type]
            start_time = datetime.now()
            try:
                result = await strategy.reason(query, context)
                return StrategyResult(
                    strategy_type=strategy_type,
                    success=result.get("success", False),
                    answer=result.get("answer"),
                    confidence=result.get("confidence", 0.0),
                    reasoning_trace=result.get("reasoning_trace", []),
                    metadata=result.get("metadata", {}),
                    performance_metrics={
                        "execution_time": (datetime.now() - start_time).total_seconds(),
                        **result.get("performance_metrics", {})
                    }
                )
            except Exception as e:
                logging.error(f"Error in strategy {strategy_type}: {e}")
                return StrategyResult(
                    strategy_type=strategy_type,
                    success=False,
                    answer=None,
                    confidence=0.0,
                    reasoning_trace=[{"error": str(e)}],
                    metadata={},
                    performance_metrics={
                        "execution_time": (datetime.now() - start_time).total_seconds()
                    }
                )

        # Execute strategies concurrently
        tasks = [execute_strategy(strategy) for strategy in strategies]
        results = await asyncio.gather(*tasks)
        return {result.strategy_type: result for result in results}

    async def _synthesize_results(self,
                                  strategy_results: Dict[StrategyType, StrategyResult],
                                  task_analysis: Dict[str, Any],
                                  context: Dict[str, Any]) -> UnifiedResult:
        """Synthesize results from multiple strategies."""
        # Key on the enum value for stable, version-independent serialization
        serialized_results = {
            k.value: self._strategy_result_to_dict(v)
            for k, v in strategy_results.items()
        }
        # default=str keeps non-serializable context values from raising
        prompt = f"""
        Synthesize reasoning results:
        Results: {json.dumps(serialized_results)}
        Task Analysis: {json.dumps(task_analysis)}
        Context: {json.dumps(context, default=str)}

        Provide:
        1. Optimal synthesis method
        2. Combined answer
        3. Confidence assessment
        4. Meta-insights
        5. Performance analysis

        Format as:
        [Synthesis]
        Method: ...
        Answer: ...
        Confidence: ...
        Insights: ...
        Performance: ...
        """

        response = await context["groq_api"].predict(prompt)
        synthesis = self._parse_synthesis(response["answer"])

        return UnifiedResult(
            success=synthesis["confidence"] >= self.min_confidence,
            answer=synthesis["answer"],
            confidence=synthesis["confidence"],
            strategy_results=strategy_results,
            synthesis_method=synthesis["method"],
            meta_insights=synthesis["insights"],
            performance_metrics=synthesis["performance"]
        )

    def _update_performance(self, result: UnifiedResult):
        """Update performance metrics and strategy weights."""
        # Update strategy performance
        for strategy_type, strategy_result in result.strategy_results.items():
            self.strategy_performance[strategy_type].append(strategy_result.confidence)

            # Update weights using an exponential moving average of confidence
            current_weight = self.strategy_weights[strategy_type]
            performance = strategy_result.confidence
            self.strategy_weights[strategy_type] = (
                (1 - self.learning_rate) * current_weight +
                self.learning_rate * performance
            )

        # Update synthesis performance
        self.synthesis_performance[result.synthesis_method].append(result.confidence)
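
    # EMA example (illustrative): with learning_rate = 0.1, a weight of 1.0
    # combined with an observed confidence of 0.5 updates to
    # 0.9 * 1.0 + 0.1 * 0.5 = 0.95, so weights drift slowly toward the
    # confidence each strategy actually achieves.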

    def _calculate_resource_match(self,
                                  strategy_type: StrategyType,
                                  required_resources: Dict[str, Any]) -> float:
        """Calculate how well a strategy matches required resources."""
        # Implementation-specific resource matching logic
        return 0.8  # Placeholder

    def _calculate_capability_match(self,
                                    strategy_type: StrategyType,
                                    required_capabilities: List[str]) -> float:
        """Calculate how well a strategy matches required capabilities."""
        # Implementation-specific capability matching logic
        return 0.8  # Placeholder

    def _parse_task_analysis(self, response: str) -> Dict[str, Any]:
        """Parse task analysis from response."""
        analysis = {
            "type": "",
            "complexity": 0.0,
            "capabilities": [],
            "resources": {},
            "criteria": [],
            "risks": []
        }

        for line in response.split('\n'):
            line = line.strip()
            if line.startswith('Type:'):
                analysis["type"] = line[5:].strip()
            elif line.startswith('Complexity:'):
                try:
                    analysis["complexity"] = float(line[11:].strip())
                except ValueError:
                    pass
            elif line.startswith('Capabilities:'):
                analysis["capabilities"] = [c.strip() for c in line[13:].split(',')]
            elif line.startswith('Resources:'):
                try:
                    analysis["resources"] = json.loads(line[10:].strip())
                except json.JSONDecodeError:
                    analysis["resources"] = {"raw": line[10:].strip()}
            elif line.startswith('Criteria:'):
                analysis["criteria"] = [c.strip() for c in line[9:].split(',')]
            elif line.startswith('Risks:'):
                # 'Risks:' is six characters, so slice from index 6
                analysis["risks"] = [r.strip() for r in line[6:].split(',')]

        return analysis

    def _parse_synthesis(self, response: str) -> Dict[str, Any]:
        """Parse synthesis result from response."""
        synthesis = {
            "method": "",
            "answer": "",
            "confidence": 0.0,
            "insights": [],
            "performance": {}
        }

        for line in response.split('\n'):
            line = line.strip()
            if line.startswith('Method:'):
                synthesis["method"] = line[7:].strip()
            elif line.startswith('Answer:'):
                synthesis["answer"] = line[7:].strip()
            elif line.startswith('Confidence:'):
                try:
                    synthesis["confidence"] = float(line[11:].strip())
                except ValueError:
                    pass
            elif line.startswith('Insights:'):
                synthesis["insights"] = [i.strip() for i in line[9:].split(',')]
            elif line.startswith('Performance:'):
                try:
                    synthesis["performance"] = json.loads(line[12:].strip())
                except json.JSONDecodeError:
                    synthesis["performance"] = {"raw": line[12:].strip()}

        return synthesis

    def _strategy_result_to_dict(self, result: StrategyResult) -> Dict[str, Any]:
        """Convert a strategy result to a dictionary for serialization."""
        return {
            "strategy_type": result.strategy_type.value,
            "success": result.success,
            "answer": result.answer,
            "confidence": result.confidence,
            "reasoning_trace": result.reasoning_trace,
            "metadata": result.metadata,
            "performance_metrics": result.performance_metrics,
            "timestamp": result.timestamp.isoformat()
        }

    def get_performance_metrics(self) -> Dict[str, Any]:
        """Get comprehensive performance metrics."""
        return {
            "strategy_weights": dict(self.strategy_weights),
            "average_performance": {
                strategy_type.value: sum(scores) / len(scores) if scores else 0.0
                for strategy_type, scores in self.strategy_performance.items()
            },
            "synthesis_success": {
                method: sum(scores) / len(scores) if scores else 0.0
                for method, scores in self.synthesis_performance.items()
            },
            "task_type_performance": {
                task_type: dict(strategy_scores)
                for task_type, strategy_scores in self.task_type_performance.items()
            }
        }

    def clear_performance_history(self):
        """Clear performance history and restore the default weights."""
        self.strategy_performance.clear()
        self.task_type_performance.clear()
        self.synthesis_performance.clear()
        # Restore the defaults from __init__, keeping the higher LOCAL_LLM weight
        self.strategy_weights = {
            **{strategy_type: 1.0 for strategy_type in StrategyType},
            StrategyType.LOCAL_LLM: 2.0,
        }
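
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the engine): the engine expects
# `context["groq_api"]` to expose an async `predict(prompt) -> {"answer": str}`
# method. `MockGroqAPI` below is a hypothetical stand-in whose canned reply is
# parseable by both `_parse_task_analysis` and `_parse_synthesis`, so the full
# reason() pipeline can be exercised without a real backend; failures inside
# individual strategies are caught and surfaced as failed StrategyResults.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class MockGroqAPI:
        async def predict(self, prompt: str) -> Dict[str, str]:
            # One canned answer covering both expected response formats
            return {"answer": (
                "Type: arithmetic\n"
                "Complexity: 0.2\n"
                "Capabilities: calculation\n"
                "Method: highest_confidence\n"
                "Answer: 42\n"
                "Confidence: 0.9\n"
                "Insights: simple task\n"
            )}

    async def _demo():
        engine = UnifiedReasoningEngine(min_confidence=0.7, parallel_threshold=3)
        result = await engine.reason("What is 6 * 7?", {"groq_api": MockGroqAPI()})
        print(result.answer, result.confidence, result.synthesis_method)
        print(engine.get_performance_metrics()["strategy_weights"])

    asyncio.run(_demo())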