# agentic-system/reasoning/recursive.py
# Cascade Bot
# fix: standardize strategy class names
# 05c0fea
"""Recursive reasoning strategy implementation."""
import logging
from typing import Dict, Any, List, Optional, Set, Tuple, Callable
import json
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import asyncio
from collections import defaultdict
from .base import ReasoningStrategy, StrategyResult
class SubproblemType(Enum):
    """Kinds of subproblem that can appear in a recursive decomposition."""

    # Leaf problem: the solver answers it directly instead of decomposing it.
    ATOMIC = "atomic"

    # Problem that is broken into child subproblems before being solved.
    COMPOSITE = "composite"

    # The remaining kinds are declared here, but the solver in this file
    # treats every non-ATOMIC kind the same way as COMPOSITE.
    PARALLEL = "parallel"
    SEQUENTIAL = "sequential"
    CONDITIONAL = "conditional"
    ITERATIVE = "iterative"
class SolutionStatus(Enum):
    """Lifecycle states a subproblem's solution can be in."""

    # Initial state given to every newly created subproblem.
    PENDING = "pending"

    # Declared for callers; not assigned by the code visible in this file.
    IN_PROGRESS = "in_progress"

    # Terminal states set by the recursive solver.
    SOLVED = "solved"
    FAILED = "failed"

    # Declared for callers; not assigned by the code visible in this file.
    BLOCKED = "blocked"
    OPTIMIZING = "optimizing"
@dataclass
class Subproblem:
    """One node of the problem tree built during recursive reasoning."""

    # Key under which the node is stored in the strategy's subproblem table.
    id: str
    # Kind of problem; decides whether the solver decomposes it.
    type: SubproblemType
    # Text of the (sub)question this node stands for.
    query: str
    # Caller-supplied context, handed on unchanged to child subproblems.
    context: Dict[str, Any]
    # Id of the parent node; None for the root problem.
    parent_id: Optional[str]
    # Ids of the child subproblems produced by decomposition.
    children: List[str]
    # Current lifecycle state of this node.
    status: SolutionStatus
    # Solution dict once the node has been solved, otherwise None.
    solution: Optional[Dict[str, Any]]
    # Confidence value attached to this node.
    confidence: float
    # Ids of other subproblems this one depends on.
    # NOTE(review): always created empty in this file and never read.
    dependencies: List[str]
    # Free-form extras; the solver stores the tree depth under 'depth'.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Creation time as an ISO-8601 string taken from datetime.now().
    timestamp: str = field(
        default_factory=lambda: datetime.now().isoformat()
    )
@dataclass
class RecursiveStep:
    """A single recorded action taken while solving a subproblem."""

    # Identifier of the step, e.g. "init_root" or "solve_root_sub0".
    id: str
    # Id of the subproblem the action was applied to.
    subproblem_id: str
    # Name of the action, such as "initialize", "decompose" or "synthesize".
    action: str
    # Action-specific payload describing what the step produced.
    result: Dict[str, Any]
    # Creation time as an ISO-8601 string taken from datetime.now().
    timestamp: str = field(
        default_factory=lambda: datetime.now().isoformat()
    )
class RecursiveStrategy(ReasoningStrategy):
    """Recursive reasoning strategy.

    Breaks a query into subproblems, solves each one (recursing up to
    ``max_depth`` levels), combines the partial results into a single answer
    and then runs a fixed number of optimization rounds on that answer.

    Decomposition (``_generate_subproblems``) and atomic solving
    (``_solve_atomic``) are placeholder implementations.

    Instance state falls into two groups:

    * per-run state (``subproblems``, ``steps``, ``solution_cache``,
      ``cycle_detection``), which is reset at the start of every
      :meth:`reason` call, and
    * ``performance_metrics``, whose counters accumulate across calls.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize recursive reasoning.

        Args:
            config: Optional settings. Recognised keys are
                ``min_confidence`` (default 0.7), ``max_depth`` (default 5)
                and ``optimization_rounds`` (default 2).
        """
        super().__init__()
        self.config = config or {}

        # Standard reasoning parameters.
        # NOTE(review): min_confidence is stored but not consulted anywhere
        # in this class.
        self.min_confidence = self.config.get('min_confidence', 0.7)
        self.max_depth = self.config.get('max_depth', 5)
        self.optimization_rounds = self.config.get('optimization_rounds', 2)

        # Problem tracking (per-run state, see _reset_run_state).
        self.subproblems: Dict[str, Subproblem] = {}
        self.steps: List[RecursiveStep] = []
        self.solution_cache: Dict[str, Dict[str, Any]] = {}
        self.cycle_detection: Set[str] = set()

        # Performance metrics (cumulative). The two distributions and the
        # success rate are keyed by depth / SubproblemType member.
        self.performance_metrics = {
            'depth_distribution': defaultdict(int),
            'type_distribution': defaultdict(int),
            'success_rate': defaultdict(float),
            'total_subproblems': 0,
            'solved_subproblems': 0,
            'failed_subproblems': 0,
            'optimization_rounds': 0,
            'cache_hits': 0,
            'cycles_detected': 0
        }

        # Solved count per SubproblemType, kept alongside the cumulative
        # type_distribution so success_rate divides like by like.
        self._solved_by_type: Dict[SubproblemType, int] = defaultdict(int)

    async def reason(
        self,
        query: str,
        context: Dict[str, Any]
    ) -> StrategyResult:
        """Apply recursive reasoning to analyze the query.

        Exceptions raised while reasoning are logged and turned into a
        failed result instead of propagating to the caller.

        Args:
            query: The query to reason about.
            context: Additional context and parameters.

        Returns:
            StrategyResult containing the reasoning output and metadata.
        """
        try:
            # Start from a clean slate so nothing from an earlier query
            # (cached solutions, steps, old tree nodes) leaks into this one.
            self._reset_run_state()

            # Initialize root problem
            root_problem = await self._initialize_problem(query, context)
            root_id = root_problem.id

            # Solve recursively
            solution = await self._solve_recursive(root_id, depth=0)

            # Optimize solution
            if solution and solution.get('success', False):
                solution = await self._optimize_solution(
                    solution, root_problem, context
                )

            # Update metrics
            self._update_metrics(root_id)

            # Build solution trace and derive the overall confidence from it
            solution_trace = self._get_solution_trace(root_id)
            confidence = self._calculate_confidence(solution_trace)

            return StrategyResult(
                strategy_type="recursive",
                success=bool(solution and solution.get('success', False)),
                answer=solution.get('answer') if solution else None,
                confidence=confidence,
                reasoning_trace=solution_trace,
                metadata={
                    'problem_tree': self._get_problem_tree(root_id),
                    'steps': [self._step_to_dict(step) for step in self.steps],
                    'solution_details': solution if solution else {}
                },
                performance_metrics=self.performance_metrics
            )
        except Exception as e:
            # Top-level boundary: log with traceback, then report the failure
            # through the normal result type.
            logging.exception("Recursive reasoning error: %s", e)
            return StrategyResult(
                strategy_type="recursive",
                success=False,
                answer=None,
                confidence=0.0,
                reasoning_trace=[{
                    'step': 'error',
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                }],
                metadata={'error': str(e)},
                performance_metrics=self.performance_metrics
            )

    def _reset_run_state(self) -> None:
        """Drop everything left over from a previous :meth:`reason` call.

        Problem ids are derived from the position in the tree ("root",
        "root_sub0", ...), so they repeat from one query to the next. A
        solution cached under such an id therefore belongs to the earlier
        query and must not be served for the new one.
        """
        self.subproblems.clear()
        self.steps.clear()
        self.solution_cache.clear()
        self.cycle_detection.clear()

    async def _initialize_problem(
        self,
        query: str,
        context: Dict[str, Any]
    ) -> Subproblem:
        """Create, register and return the root problem for ``query``."""
        problem = Subproblem(
            id="root",
            type=SubproblemType.COMPOSITE,
            query=query,
            context=context,
            parent_id=None,
            children=[],
            status=SolutionStatus.PENDING,
            solution=None,
            confidence=1.0,
            dependencies=[],
            metadata={'depth': 0}
        )
        self.subproblems[problem.id] = problem

        self._record_step(RecursiveStep(
            id=f"init_{problem.id}",
            subproblem_id=problem.id,
            action="initialize",
            result={'type': problem.type.value, 'query': query}
        ))
        return problem

    @staticmethod
    def _mark_solved(problem: Subproblem, solution: Dict[str, Any]) -> None:
        """Attach ``solution`` to ``problem`` and flag it as solved.

        The problem's confidence is taken from the solution when the
        solution carries a numeric one, so the problem tree reflects the
        solved value rather than the placeholder set at construction.
        """
        problem.solution = solution
        problem.status = SolutionStatus.SOLVED
        confidence = solution.get('confidence')
        if isinstance(confidence, (int, float)):
            problem.confidence = confidence

    async def _solve_recursive(
        self,
        problem_id: str,
        depth: int
    ) -> Optional[Dict[str, Any]]:
        """Recursively solve a problem and its subproblems.

        Args:
            problem_id: Id of a problem registered in ``self.subproblems``.
            depth: Current recursion depth; the root is solved at depth 0.

        Returns:
            The solution dict, or None when the depth limit is exceeded, a
            cycle is detected, or the problem could not be solved.
        """
        if depth > self.max_depth:
            return None

        problem = self.subproblems[problem_id]

        # A problem that is already being solved further up the call chain
        # would recurse forever; count it and bail out.
        if problem_id in self.cycle_detection:
            self.performance_metrics['cycles_detected'] += 1
            return None
        self.cycle_detection.add(problem_id)

        try:
            # Check cache
            if problem_id in self.solution_cache:
                self.performance_metrics['cache_hits'] += 1
                return self.solution_cache[problem_id]

            is_atomic = problem.type == SubproblemType.ATOMIC

            # Decompose non-atomic problems, but only once: decomposing a
            # problem that already has children would duplicate their ids.
            if not is_atomic and not problem.children:
                await self._decompose_problem(problem, problem.context)

            # Solve atomic problem
            if is_atomic:
                solution = await self._solve_atomic(problem)
                if not solution:
                    problem.status = SolutionStatus.FAILED
                    return None
                self._mark_solved(problem, solution)
                return solution

            # Solve subproblems; children without a solution are skipped.
            subsolutions = []
            for child_id in problem.children:
                child_solution = await self._solve_recursive(child_id, depth + 1)
                if child_solution:
                    subsolutions.append(child_solution)

            # Synthesize solutions
            if subsolutions:
                solution = await self._synthesize_solutions(
                    subsolutions, problem, problem.context
                )
                if solution:
                    self._mark_solved(problem, solution)
                    self.solution_cache[problem_id] = solution
                    return solution

            problem.status = SolutionStatus.FAILED
            return None
        finally:
            # discard() so cleanup can never raise and mask the real error.
            self.cycle_detection.discard(problem_id)

    async def _decompose_problem(
        self,
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> None:
        """Generate child subproblems for ``problem`` and register them."""
        subproblems = self._generate_subproblems(problem, context)
        for subproblem in subproblems:
            self.subproblems[subproblem.id] = subproblem
            problem.children.append(subproblem.id)

        self._record_step(RecursiveStep(
            id=f"decompose_{problem.id}",
            subproblem_id=problem.id,
            action="decompose",
            result={'num_subproblems': len(subproblems)}
        ))

    def _generate_subproblems(
        self,
        parent: Subproblem,
        context: Dict[str, Any]
    ) -> List[Subproblem]:
        """Generate subproblems for a composite problem.

        Placeholder implementation: the parent's query is split on '.' and
        each of the first three pieces that is not blank becomes one ATOMIC
        child. A real implementation would use a more sophisticated
        decomposition.
        """
        # A missing depth is treated as 0, matching _update_metrics.
        child_depth = parent.metadata.get('depth', 0) + 1

        subproblems = []
        for i, part in enumerate(parent.query.split('.')[:3]):
            text = part.strip()
            if not text:
                continue
            subproblems.append(Subproblem(
                id=f"{parent.id}_sub{i}",
                type=SubproblemType.ATOMIC,
                query=text,
                context=context,
                parent_id=parent.id,
                children=[],
                status=SolutionStatus.PENDING,
                solution=None,
                confidence=0.0,
                dependencies=[],
                metadata={'depth': child_depth}
            ))
        return subproblems

    async def _solve_atomic(
        self,
        problem: Subproblem
    ) -> Optional[Dict[str, Any]]:
        """Solve an atomic problem.

        Placeholder implementation: always succeeds with a canned answer
        and a fixed confidence of 0.8.
        """
        solution = {
            'success': True,
            'answer': f"Solution for {problem.query}",
            'confidence': 0.8
        }
        self._record_step(RecursiveStep(
            id=f"solve_{problem.id}",
            subproblem_id=problem.id,
            action="solve_atomic",
            result=solution
        ))
        return solution

    async def _synthesize_solutions(
        self,
        subsolutions: List[Dict[str, Any]],
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Synthesize solutions from subproblems.

        Non-empty answers are joined with a single space and the
        confidences are averaged; a subsolution without a confidence counts
        as 0.0.

        Returns:
            The combined solution dict, or None when ``subsolutions`` is
            empty.
        """
        if not subsolutions:
            return None

        # Combine answers
        combined_answer = " ".join(
            sol['answer'] for sol in subsolutions if sol.get('answer')
        )

        # Average confidence
        avg_confidence = sum(
            sol.get('confidence', 0.0) for sol in subsolutions
        ) / len(subsolutions)

        synthesis = {
            'success': True,
            'answer': combined_answer,
            'confidence': avg_confidence,
            'subsolutions': subsolutions
        }
        self._record_step(RecursiveStep(
            id=f"synthesize_{problem.id}",
            subproblem_id=problem.id,
            action="synthesize",
            result={'num_solutions': len(subsolutions)}
        ))
        return synthesis

    async def _optimize_solution(
        self,
        solution: Dict[str, Any],
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Optimize the final solution.

        Works on a shallow copy of ``solution``. In each of
        ``optimization_rounds`` rounds the confidence is multiplied by 1.1
        while it is below 0.9, and one step is recorded with the total
        improvement so far.
        """
        optimized = solution.copy()
        for _ in range(self.optimization_rounds):
            self.performance_metrics['optimization_rounds'] += 1

            # Example optimization: improve confidence
            if optimized['confidence'] < 0.9:
                optimized['confidence'] *= 1.1

            self._record_step(RecursiveStep(
                id=f"optimize_{problem.id}",
                subproblem_id=problem.id,
                action="optimize",
                result={
                    'confidence_improvement':
                        optimized['confidence'] - solution['confidence']
                }
            ))
        return optimized

    def _calculate_confidence(
        self,
        solution_trace: List[Dict[str, Any]]
    ) -> float:
        """Return the mean of the numeric 'confidence' values in the trace.

        Entries without a numeric confidence are ignored; an empty trace or
        a trace without any numeric confidence yields 0.0.
        """
        if not solution_trace:
            return 0.0
        confidences = [
            step.get('confidence', 0.0)
            for step in solution_trace
            if isinstance(step.get('confidence'), (int, float))
        ]
        return sum(confidences) / len(confidences) if confidences else 0.0

    def _update_metrics(self, root_id: str) -> None:
        """Fold the problem tree rooted at ``root_id`` into the metrics.

        Every node reachable from the root is counted once by depth, type
        and final status. Per-type success rates are then recomputed from
        the cumulative solved and total counts.
        """
        metrics = self.performance_metrics

        # Explicit stack instead of recursion; the counts do not depend on
        # visiting order.
        pending = [root_id]
        while pending:
            problem = self.subproblems[pending.pop()]
            depth = problem.metadata.get('depth', 0)

            metrics['depth_distribution'][depth] += 1
            metrics['type_distribution'][problem.type] += 1
            metrics['total_subproblems'] += 1

            if problem.status == SolutionStatus.SOLVED:
                metrics['solved_subproblems'] += 1
                self._solved_by_type[problem.type] += 1
            elif problem.status == SolutionStatus.FAILED:
                metrics['failed_subproblems'] += 1

            pending.extend(problem.children)

        # Calculate success rates. .get() avoids inserting zero entries
        # into the defaultdicts for types that never occurred.
        for problem_type in SubproblemType:
            type_count = metrics['type_distribution'].get(problem_type, 0)
            if type_count > 0:
                solved = self._solved_by_type.get(problem_type, 0)
                metrics['success_rate'][problem_type] = solved / type_count

    def _get_problem_tree(self, root_id: str) -> Dict[str, Any]:
        """Get the problem decomposition tree as nested dicts."""
        def build_tree(problem_id: str) -> Dict[str, Any]:
            problem = self.subproblems[problem_id]
            return {
                'id': problem.id,
                'type': problem.type.value,
                'status': problem.status.value,
                'confidence': problem.confidence,
                'children': [
                    build_tree(child_id) for child_id in problem.children
                ]
            }

        return build_tree(root_id)

    def _get_solution_trace(self, root_id: str) -> List[Dict[str, Any]]:
        """Get the solution trace for a problem.

        The tree is walked parent-first. Each node contributes one dict
        with its id, type, status, confidence and timestamp; when the node
        has a solution, the solution's keys are merged on top of those.
        """
        trace: List[Dict[str, Any]] = []

        def build_trace(problem_id: str) -> None:
            problem = self.subproblems[problem_id]
            step = {
                'id': problem.id,
                'type': problem.type.value,
                'status': problem.status.value,
                'confidence': problem.confidence,
                'timestamp': problem.timestamp
            }
            if problem.solution:
                step.update(problem.solution)
            trace.append(step)

            for child_id in problem.children:
                build_trace(child_id)

        build_trace(root_id)
        return trace

    def _record_step(self, step: RecursiveStep) -> None:
        """Record a reasoning step."""
        self.steps.append(step)

    def _step_to_dict(self, step: RecursiveStep) -> Dict[str, Any]:
        """Convert step to dictionary for serialization."""
        return {
            'id': step.id,
            'subproblem_id': step.subproblem_id,
            'action': step.action,
            'result': step.result,
            'timestamp': step.timestamp
        }

    def clear_cache(self) -> None:
        """Clear solution cache and reset the cache-hit counter."""
        self.solution_cache.clear()
        self.performance_metrics['cache_hits'] = 0