Spaces:
Runtime error
Runtime error
"""Recursive reasoning implementation with advanced decomposition and synthesis.""" | |
import logging | |
from typing import Dict, Any, List, Optional, Set, Tuple, Callable | |
import json | |
from dataclasses import dataclass, field | |
from enum import Enum | |
from datetime import datetime | |
import asyncio | |
from collections import defaultdict | |
from .base import ReasoningStrategy | |
class SubproblemType(Enum):
    """Kinds of subproblem the recursive solver can be asked to handle.

    The string values are the labels the language model is asked to emit in
    its ``Type:`` lines; the parsers convert those labels back into members
    with ``SubproblemType(label)``.
    """

    # Solved directly by ``_solve_atomic`` without further decomposition.
    ATOMIC = "atomic"
    # Default type given to the root problem when no valid label is parsed.
    COMPOSITE = "composite"
    # Children may be solved concurrently once there are enough of them.
    PARALLEL = "parallel"
    # The remaining kinds get no special handling in the visible solver:
    # they are decomposed and their children solved one after another.
    SEQUENTIAL = "sequential"
    CONDITIONAL = "conditional"
    ITERATIVE = "iterative"
class SolutionStatus(Enum):
    """Lifecycle states a subproblem's solution can be in."""

    # Initial state assigned by the response parsers.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    # Final states written by ``_solve_recursive`` from the solution's
    # "success" flag.
    SOLVED = "solved"
    FAILED = "failed"
    # NOTE(review): IN_PROGRESS, BLOCKED and OPTIMIZING are not assigned
    # anywhere in the code visible here - confirm whether they are used.
    BLOCKED = "blocked"
    OPTIMIZING = "optimizing"
@dataclass
class Subproblem:
    """Represents a subproblem in recursive reasoning.

    Declared as a dataclass: the rest of the module builds instances with
    keyword arguments and relies on ``field(default_factory=dict)``, neither
    of which works on a plain class with bare annotations.
    """

    # Unique key in the solver's subproblem registry ("root", "root_0", ...).
    id: str
    # Decides how the solver treats the problem (atomic vs. decomposed).
    type: SubproblemType
    # Natural-language question this subproblem has to answer.
    query: str
    # Caller-supplied context; also carries the "groq_api" client.
    context: Dict[str, Any]
    # Id of the parent subproblem, or None for the root.
    parent_id: Optional[str]
    # Ids of the child subproblems produced by decomposition.
    children: List[str]
    status: SolutionStatus
    # Parsed solution dictionary once solved, otherwise None.
    solution: Optional[Dict[str, Any]]
    confidence: float
    # Free-text dependency labels parsed from the model response.
    dependencies: List[str]
    # Extra parsed information such as the suggested strategy/approach.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RecursiveStep:
    """Represents a step in recursive reasoning.

    Declared as a dataclass so that the keyword construction used by the
    solver works and ``metadata`` gets a fresh dict per instance.
    """

    # Sequential identifier of the form "step_<n>".
    id: str
    # Id of the subproblem this step acted on.
    subproblem_id: str
    # Name of the action performed, e.g. "atomic_solve" or "synthesize".
    action: str
    # Moment the step was recorded.
    timestamp: datetime
    # Parsed outcome of the action, if any.
    result: Optional[Dict[str, Any]]
    # Numeric measurements for the step, e.g. {"confidence": 0.8}.
    metrics: Dict[str, float]
    # Optional extra details about the step.
    metadata: Dict[str, Any] = field(default_factory=dict)
class RecursiveReasoning(ReasoningStrategy): | |
""" | |
Advanced Recursive Reasoning implementation with: | |
- Dynamic problem decomposition | |
- Parallel subproblem solving | |
- Solution synthesis | |
- Cycle detection | |
- Optimization strategies | |
""" | |
def __init__(self, config: Optional[Dict[str, Any]] = None):
    """Set up configuration, bookkeeping containers and metric counters.

    Args:
        config: Optional settings dictionary. Recognised keys are
            ``min_confidence``, ``parallel_threshold``, ``learning_rate``,
            ``strategy_weights``, ``max_depth`` and ``optimization_rounds``;
            any missing key falls back to the default shown below.
    """
    super().__init__()
    settings = config or {}
    self.config = settings

    # Weights used when the caller does not supply "strategy_weights".
    default_weights = {
        "LOCAL_LLM": 0.8,
        "CHAIN_OF_THOUGHT": 0.6,
        "TREE_OF_THOUGHTS": 0.5,
        "META_LEARNING": 0.4
    }

    # Parameters shared with the other reasoning strategies.
    self.min_confidence = settings.get('min_confidence', 0.7)
    self.parallel_threshold = settings.get('parallel_threshold', 3)
    self.learning_rate = settings.get('learning_rate', 0.1)
    self.strategy_weights = settings.get('strategy_weights', default_weights)

    # Parameters specific to recursive reasoning.
    self.max_depth = settings.get('max_depth', 5)
    self.optimization_rounds = settings.get('optimization_rounds', 2)

    # Problem tracking: registry of subproblems, recorded steps, solutions
    # cached by query/context, and ids currently on the recursion stack.
    self.subproblems: Dict[str, Subproblem] = {}
    self.steps: List[RecursiveStep] = []
    self.solution_cache: Dict[str, Dict[str, Any]] = {}
    self.cycle_detection: Set[str] = set()

    # Performance metrics; defaultdicts so counters start at zero.
    self.depth_distribution: Dict[int, int] = defaultdict(int)
    self.type_distribution: Dict[SubproblemType, int] = defaultdict(int)
    self.success_rate: Dict[SubproblemType, float] = defaultdict(float)
async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Answer ``query`` by recursive decomposition, solving and synthesis.

    Args:
        query: The question to answer.
        context: Caller-supplied context; the helper methods read the
            language-model client from ``context["groq_api"]``.

    Returns:
        On success a dictionary with ``success=True`` plus ``answer``,
        ``confidence``, ``decomposition``, ``solution_trace``,
        ``performance_metrics`` and ``meta_insights``. If anything raises,
        ``{"success": False, "error": <message>}`` is returned instead;
        exceptions are never propagated to the caller.
    """
    try:
        # Subproblem ids are deterministic ("root", "root_0", ...), so
        # entries left over from an earlier query would be indistinguishable
        # from this query's and would leak into its tree and trace.
        self.subproblems.clear()
        self.steps.clear()

        # Initialize root problem
        root = await self._initialize_problem(query, context)
        self.subproblems[root.id] = root

        # Recursively solve, then polish the combined result
        solution = await self._solve_recursive(root.id, depth=0)
        optimized = await self._optimize_solution(solution, root, context)

        # Update metrics
        self._update_metrics(root.id)

        return {
            "success": True,
            "answer": optimized["answer"],
            "confidence": optimized["confidence"],
            "decomposition": self._get_problem_tree(root.id),
            "solution_trace": self._get_solution_trace(root.id),
            "performance_metrics": self._get_performance_metrics(),
            "meta_insights": optimized["meta_insights"]
        }
    except Exception as e:
        # Top-level boundary: report the failure to the caller as data, but
        # keep the traceback in the log so the cause can be diagnosed.
        logging.exception("Error in recursive reasoning: %s", e)
        return {"success": False, "error": str(e)}
async def _initialize_problem(self, query: str, context: Dict[str, Any]) -> Subproblem:
    """Ask the model to classify the query and build the root subproblem.

    Args:
        query: The question to solve.
        context: Caller-supplied context; must contain the model client
            under ``"groq_api"`` (an object with an awaitable ``predict``).

    Returns:
        The root ``Subproblem`` parsed from the model's answer.

    Raises:
        KeyError: If ``context`` has no ``"groq_api"`` entry.
    """
    # The context carries the live API client, which json cannot encode and
    # which has no place in the prompt text. Drop it before serialising and
    # stringify any other non-JSON value instead of failing.
    prompt_context = {key: value for key, value in context.items() if key != "groq_api"}
    prompt = (
        "\n"
        "Initialize recursive reasoning problem:\n"
        f"Query: {query}\n"
        f"Context: {json.dumps(prompt_context, default=str)}\n"
        "Analyze for:\n"
        "1. Problem type classification\n"
        "2. Initial decomposition strategy\n"
        "3. Key dependencies\n"
        "4. Solution approach\n"
        "Format as:\n"
        "[Problem]\n"
        "Type: ...\n"
        "Strategy: ...\n"
        "Dependencies: ...\n"
        "Approach: ...\n"
    )
    response = await context["groq_api"].predict(prompt)
    # The full context (client included) is kept on the subproblem because
    # later solving steps read the client from it.
    return self._parse_problem_init(response["answer"], query, context)
async def _decompose_problem(self, problem: Subproblem, context: Dict[str, Any]) -> List[Subproblem]:
    """Ask the model to split ``problem`` into subproblems.

    Args:
        problem: The subproblem to decompose.
        context: Context dictionary; must contain the model client under
            ``"groq_api"``.

    Returns:
        The child subproblems parsed from the model's answer (possibly
        empty), each with ``problem.id`` as its parent.

    Raises:
        KeyError: If ``context`` has no ``"groq_api"`` entry.
    """
    # The live API client stored in the context is not JSON-serialisable;
    # leave it out of the prompt and stringify any other non-JSON value.
    prompt_context = {key: value for key, value in context.items() if key != "groq_api"}
    type_options = " | ".join(t.value for t in SubproblemType)
    prompt = (
        "\n"
        "Decompose problem into subproblems:\n"
        f"Problem: {json.dumps(self._problem_to_dict(problem))}\n"
        f"Context: {json.dumps(prompt_context, default=str)}\n"
        "For each subproblem specify:\n"
        f"1. [Type]: {type_options}\n"
        "2. [Query]: Specific question\n"
        "3. [Dependencies]: Required solutions\n"
        "4. [Approach]: Solution strategy\n"
        "Format as:\n"
        "[S1]\n"
        "Type: ...\n"
        "Query: ...\n"
        "Dependencies: ...\n"
        "Approach: ...\n"
    )
    response = await context["groq_api"].predict(prompt)
    return self._parse_subproblems(response["answer"], problem.id, context)
async def _solve_recursive(self, problem_id: str, depth: int) -> Dict[str, Any]:
    """Recursively solve a problem and its subproblems.

    Atomic problems are solved directly. Every other type is decomposed,
    its children are solved (concurrently for PARALLEL problems with at
    least ``parallel_threshold`` children, otherwise one by one) and the
    child solutions are synthesised into one.

    Args:
        problem_id: Id of a subproblem registered in ``self.subproblems``.
        depth: Current recursion depth; the root is depth 0.

    Returns:
        The solution dictionary. When the depth limit is exceeded or the
        problem is already being solved further up the call chain, a
        ``{"success": False, "error": ...}`` dictionary is returned instead.

    Raises:
        KeyError: If ``problem_id`` is not registered.
    """
    if depth > self.max_depth:
        return {"success": False, "error": "Maximum recursion depth exceeded"}
    if problem_id in self.cycle_detection:
        return {"success": False, "error": "Cycle detected in recursive solving"}

    problem = self.subproblems[problem_id]
    self.cycle_detection.add(problem_id)
    self.depth_distribution[depth] += 1
    try:
        # The context holds the live API client, which json cannot encode;
        # leave it out of the cache key and stringify other odd values.
        cacheable_context = {
            key: value for key, value in problem.context.items() if key != "groq_api"
        }
        cache_key = f"{problem.query}:{json.dumps(cacheable_context, default=str)}"

        solution = self.solution_cache.get(cache_key)
        if solution is None:
            if problem.type == SubproblemType.ATOMIC:
                solution = await self._solve_atomic(problem)
            else:
                # Decompose and register the children
                subproblems = await self._decompose_problem(problem, problem.context)
                for sub in subproblems:
                    self.subproblems[sub.id] = sub
                    problem.children.append(sub.id)

                if problem.type == SubproblemType.PARALLEL and len(subproblems) >= self.parallel_threshold:
                    # Solve in parallel
                    tasks = [self._solve_recursive(sub.id, depth + 1) for sub in subproblems]
                    subsolutions = await asyncio.gather(*tasks)
                else:
                    # Solve sequentially
                    subsolutions = []
                    for sub in subproblems:
                        subsolutions.append(await self._solve_recursive(sub.id, depth + 1))

                solution = await self._synthesize_solutions(subsolutions, problem, problem.context)
            self.solution_cache[cache_key] = solution

        # Record the outcome on the problem for fresh and cached solutions
        # alike, so the problem tree and the metrics see the real status
        # and confidence instead of the initial PENDING / 0.0.
        problem.solution = solution
        problem.confidence = solution.get("confidence", 0.0)
        problem.status = SolutionStatus.SOLVED if solution.get("success") else SolutionStatus.FAILED
        return solution
    finally:
        self.cycle_detection.discard(problem_id)
async def _solve_atomic(self, problem: Subproblem) -> Dict[str, Any]:
    """Solve a problem directly with a single model call.

    The model client is read from ``problem.context["groq_api"]``. The
    parsed answer is recorded as an "atomic_solve" step and returned.

    Args:
        problem: The subproblem to solve without decomposition.

    Returns:
        The solution dictionary produced by ``_parse_atomic_solution``.
    """
    problem_json = json.dumps(self._problem_to_dict(problem))
    prompt = "\n".join([
        "",
        "Solve atomic problem:",
        f"Problem: {problem_json}",
        "Provide:",
        "1. Direct solution",
        "2. Confidence level",
        "3. Supporting evidence",
        "4. Alternative approaches",
        "Format as:",
        "[Solution]",
        "Answer: ...",
        "Confidence: ...",
        "Evidence: ...",
        "Alternatives: ...",
        "",
    ])

    client = problem.context["groq_api"]
    reply = await client.predict(prompt)
    parsed = self._parse_atomic_solution(reply["answer"])

    # Keep an audit record of what was solved and how confident it was.
    step = RecursiveStep(
        id=f"step_{len(self.steps)}",
        subproblem_id=problem.id,
        action="atomic_solve",
        timestamp=datetime.now(),
        result=parsed,
        metrics={"confidence": parsed.get("confidence", 0.0)},
        metadata={}
    )
    self._record_step(step)
    return parsed
async def _synthesize_solutions(self, subsolutions: List[Dict[str, Any]], problem: Subproblem, context: Dict[str, Any]) -> Dict[str, Any]:
    """Combine the solutions of the subproblems into one solution.

    Args:
        subsolutions: Solution dictionaries of the child subproblems.
        problem: The parent problem the solutions belong to.
        context: Context dictionary; must contain the model client under
            ``"groq_api"``.

    Returns:
        The synthesis dictionary produced by ``_parse_synthesis``. The
        synthesis is also recorded as a "synthesize" step.

    Raises:
        KeyError: If ``context`` has no ``"groq_api"`` entry.
    """
    # The live API client stored in the context is not JSON-serialisable;
    # leave it out of the prompt and stringify any other non-JSON value.
    prompt_context = {key: value for key, value in context.items() if key != "groq_api"}
    prompt = (
        "\n"
        "Synthesize solutions:\n"
        f"Problem: {json.dumps(self._problem_to_dict(problem))}\n"
        f"Solutions: {json.dumps(subsolutions)}\n"
        f"Context: {json.dumps(prompt_context, default=str)}\n"
        "Provide:\n"
        "1. Integrated solution\n"
        "2. Confidence assessment\n"
        "3. Integration method\n"
        "4. Quality metrics\n"
        "Format as:\n"
        "[Synthesis]\n"
        "Solution: ...\n"
        "Confidence: ...\n"
        "Method: ...\n"
        "Metrics: ...\n"
    )
    response = await context["groq_api"].predict(prompt)
    synthesis = self._parse_synthesis(response["answer"])
    self._record_step(RecursiveStep(
        id=f"step_{len(self.steps)}",
        subproblem_id=problem.id,
        action="synthesize",
        timestamp=datetime.now(),
        result=synthesis,
        metrics={"confidence": synthesis.get("confidence", 0.0)},
        metadata={"num_subsolutions": len(subsolutions)}
    ))
    return synthesis
async def _optimize_solution(self, solution: Dict[str, Any], problem: Subproblem, context: Dict[str, Any]) -> Dict[str, Any]:
    """Ask the model to polish the final solution.

    Args:
        solution: The solution produced by the recursive solve.
        problem: The problem the solution answers.
        context: Context dictionary; must contain the model client under
            ``"groq_api"``.

    Returns:
        The optimisation dictionary produced by ``_parse_optimization``,
        with ``confidence`` and ``answer`` filled in from ``solution`` when
        the optimisation response did not provide them.

    Raises:
        KeyError: If ``context`` has no ``"groq_api"`` entry.
    """
    # The live API client stored in the context is not JSON-serialisable;
    # leave it out of the prompt and stringify any other non-JSON value.
    prompt_context = {key: value for key, value in context.items() if key != "groq_api"}
    prompt = (
        "\n"
        "Optimize recursive solution:\n"
        f"Original: {json.dumps(solution)}\n"
        f"Problem: {json.dumps(self._problem_to_dict(problem))}\n"
        f"Context: {json.dumps(prompt_context, default=str)}\n"
        "Optimize for:\n"
        "1. Completeness\n"
        "2. Consistency\n"
        "3. Efficiency\n"
        "4. Clarity\n"
        "Format as:\n"
        "[Optimization]\n"
        "Answer: ...\n"
        "Improvements: ...\n"
        "Metrics: ...\n"
        "Insights: ...\n"
    )
    response = await context["groq_api"].predict(prompt)
    optimized = self._parse_optimization(response["answer"])

    # The optimisation format has no confidence field, so the parsed value
    # stays at its 0.0 default; carry over the confidence of the solution
    # that was optimised rather than reporting zero.
    if not optimized.get("confidence"):
        optimized["confidence"] = solution.get("confidence", 0.0)
    # Likewise keep the original answer if the response had no usable
    # "Answer:" line. Atomic solutions store it under "answer", syntheses
    # under "solution".
    if not optimized.get("answer"):
        optimized["answer"] = solution.get("answer") or solution.get("solution", "")
    return optimized
def _update_metrics(self, root_id: str):
    """Fold the problem tree rooted at ``root_id`` into the metrics.

    For every problem in the tree the per-type counter is incremented and
    the per-type ``success_rate`` (a running mean of confidence) is
    updated. A problem that is not SOLVED contributes 0.0, so the mean is
    taken over exactly the problems counted in ``type_distribution``.
    Previously unsolved problems increased the divisor without updating
    the mean, which made the figure inconsistent.

    Args:
        root_id: Id of the root of the tree to account for.

    Raises:
        KeyError: If a referenced problem id is not registered.
    """
    # Explicit stack instead of recursion; children are pushed in reverse
    # so problems are still visited parent-first, left to right.
    pending = [root_id]
    while pending:
        problem = self.subproblems[pending.pop()]

        count = self.type_distribution[problem.type] + 1
        self.type_distribution[problem.type] = count

        solved = problem.status == SolutionStatus.SOLVED
        score = problem.confidence if solved else 0.0
        previous = self.success_rate[problem.type]
        self.success_rate[problem.type] = (previous * (count - 1) + score) / count

        pending.extend(reversed(problem.children))
def _get_problem_tree(self, root_id: str) -> Dict[str, Any]:
    """Return the decomposition below ``root_id`` as nested dictionaries.

    Each node holds the problem's id, type value, query, status value and
    confidence, plus a ``children`` list of nodes of the same shape.

    Raises:
        KeyError: If a referenced problem id is not registered.
    """
    registry = self.subproblems

    def describe(node_id: str) -> Dict[str, Any]:
        node = registry[node_id]
        summary: Dict[str, Any] = {
            "id": node.id,
            "type": node.type.value,
            "query": node.query,
            "status": node.status.value,
            "confidence": node.confidence,
        }
        summary["children"] = list(map(describe, node.children))
        return summary

    return describe(root_id)
def _get_solution_trace(self, root_id: str) -> List[Dict[str, Any]]:
    """Return the recorded steps for ``root_id`` and all its descendants.

    Steps are returned in recording order, each serialised with
    ``_step_to_dict``. The whole subtree is covered: the earlier version
    looked only at the root and its direct children, so steps of deeper
    subproblems were missing from the trace.

    Args:
        root_id: Id of the problem whose trace is wanted.
    """
    # Collect the ids of the subtree once so each step needs only a set
    # lookup instead of a scan over the children.
    relevant = set()
    pending = [root_id]
    while pending:
        current = pending.pop()
        if current in relevant:
            continue
        relevant.add(current)
        node = self.subproblems.get(current)
        if node is not None:
            pending.extend(node.children)

    return [
        self._step_to_dict(step)
        for step in self.steps
        if step.subproblem_id in relevant
    ]
def _get_performance_metrics(self) -> Dict[str, Any]:
    """Return a plain-dict snapshot of the performance counters.

    Enum keys are replaced by their string values. Note that
    ``cache_hits`` reports the number of entries in the solution cache.
    """
    by_type: Dict[str, int] = {}
    for kind, count in self.type_distribution.items():
        by_type[kind.value] = count

    rates: Dict[str, float] = {}
    for kind, rate in self.success_rate.items():
        rates[kind.value] = rate

    snapshot: Dict[str, Any] = {}
    snapshot["depth_distribution"] = dict(self.depth_distribution)
    snapshot["type_distribution"] = by_type
    snapshot["success_rate"] = rates
    snapshot["cache_hits"] = len(self.solution_cache)
    snapshot["total_steps"] = len(self.steps)
    return snapshot
def _record_step(self, step: RecursiveStep):
    """Append ``step`` to the chronological list of reasoning steps."""
    history = self.steps
    history.append(step)
def _parse_problem_init(self, response: str, query: str, context: Dict[str, Any]) -> Subproblem:
    """Build the root subproblem from the model's initialisation answer.

    Recognised lines (after stripping whitespace):

    * ``Type: <label>`` - problem type; unknown labels are ignored and the
      COMPOSITE default is kept.
    * ``Dependencies: a, b`` - comma-separated list; empty entries are
      dropped, so a bare ``Dependencies:`` line yields an empty list.
    * ``Strategy: ...`` / ``Approach: ...`` - stored in the metadata under
      ``"strategy"`` and ``"approach"`` respectively, so neither value
      overwrites the other.

    Args:
        response: Raw text answer from the model.
        query: The original question; becomes the root's query.
        context: Context dictionary stored on the root problem.

    Returns:
        A PENDING ``Subproblem`` with id ``"root"`` and no parent.
    """
    problem_type = SubproblemType.COMPOSITE  # default
    dependencies: List[str] = []
    metadata: Dict[str, Any] = {}

    for raw_line in response.split('\n'):
        line = raw_line.strip()
        if line.startswith('Type:'):
            try:
                problem_type = SubproblemType(line[5:].strip().lower())
            except ValueError:
                # Unknown label: deliberately keep the current type.
                pass
        elif line.startswith('Dependencies:'):
            dependencies = [
                item.strip() for item in line[13:].split(',') if item.strip()
            ]
        elif line.startswith('Strategy:'):
            metadata["strategy"] = line[9:].strip()
        elif line.startswith('Approach:'):
            metadata["approach"] = line[9:].strip()

    return Subproblem(
        id="root",
        type=problem_type,
        query=query,
        context=context,
        parent_id=None,
        children=[],
        status=SolutionStatus.PENDING,
        solution=None,
        confidence=0.0,
        dependencies=dependencies,
        metadata=metadata
    )
def _parse_subproblems(self, response: str, parent_id: str, context: Dict[str, Any]) -> List[Subproblem]:
    """Parse the subproblems listed in a decomposition answer.

    The answer is read line by line. A line starting with ``[S`` closes
    the subproblem being built; a ``Type:`` line with a valid label starts
    a new one (an invalid label discards the section); ``Query:``,
    ``Dependencies:`` and ``Approach:`` lines fill in the subproblem
    currently being built and are ignored when there is none.

    Args:
        response: Raw text answer from the model.
        parent_id: Id of the problem that was decomposed; child ids are
            ``f"{parent_id}_{index}"``.
        context: Context dictionary stored on every child.

    Returns:
        The parsed subproblems in the order they appear, all PENDING.
    """
    subproblems: List[Subproblem] = []
    current: Optional[Subproblem] = None

    for raw_line in response.split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        if line.startswith('[S'):
            # Section header: finish the subproblem in progress, if any.
            if current is not None:
                subproblems.append(current)
            current = None
        elif line.startswith('Type:'):
            try:
                problem_type = SubproblemType(line[5:].strip().lower())
            except ValueError:
                # Unknown type label: skip the rest of this section.
                current = None
                continue
            current = Subproblem(
                id=f"{parent_id}_{len(subproblems)}",
                type=problem_type,
                query="",
                context=context,
                parent_id=parent_id,
                children=[],
                status=SolutionStatus.PENDING,
                solution=None,
                confidence=0.0,
                dependencies=[],
                metadata={}
            )
        elif current is not None:
            if line.startswith('Query:'):
                current.query = line[6:].strip()
            elif line.startswith('Dependencies:'):
                # Drop empty entries so "Dependencies:" does not become [''].
                current.dependencies = [
                    item.strip() for item in line[13:].split(',') if item.strip()
                ]
            elif line.startswith('Approach:'):
                current.metadata["approach"] = line[9:].strip()

    if current is not None:
        subproblems.append(current)
    return subproblems
def _parse_atomic_solution(self, response: str) -> Dict[str, Any]:
    """Parse the model's answer to an atomic problem.

    Recognised lines are ``Answer:``, ``Confidence:``, ``Evidence:`` and
    ``Alternatives:``; everything else is ignored. A confidence that is
    not a number leaves the 0.0 default in place. Evidence and
    alternatives are comma-separated lists with empty entries dropped.

    Args:
        response: Raw text answer from the model.

    Returns:
        A dictionary with ``success`` (always True), ``answer``,
        ``confidence``, ``evidence`` and ``alternatives``.
    """
    solution: Dict[str, Any] = {
        "success": True,
        "answer": "",
        "confidence": 0.0,
        "evidence": [],
        "alternatives": []
    }
    for raw_line in response.split('\n'):
        line = raw_line.strip()
        if line.startswith('Answer:'):
            solution["answer"] = line[7:].strip()
        elif line.startswith('Confidence:'):
            try:
                solution["confidence"] = float(line[11:].strip())
            except ValueError:
                # Non-numeric confidence: keep the default.
                pass
        elif line.startswith('Evidence:'):
            solution["evidence"] = [
                item.strip() for item in line[9:].split(',') if item.strip()
            ]
        elif line.startswith('Alternatives:'):
            solution["alternatives"] = [
                item.strip() for item in line[13:].split(',') if item.strip()
            ]
    return solution
def _parse_synthesis(self, response: str) -> Dict[str, Any]:
    """Parse the model's synthesis answer.

    Recognised lines are ``Solution:``, ``Confidence:``, ``Method:`` and
    ``Metrics:``. A non-numeric confidence keeps the 0.0 default; metrics
    must be a JSON object, otherwise the empty default is kept.

    Args:
        response: Raw text answer from the model.

    Returns:
        A dictionary with ``success`` (always True), ``solution``,
        ``confidence``, ``method`` and ``metrics``.
    """
    synthesis: Dict[str, Any] = {
        "success": True,
        "solution": "",
        "confidence": 0.0,
        "method": "",
        "metrics": {}
    }
    for raw_line in response.split('\n'):
        line = raw_line.strip()
        if line.startswith('Solution:'):
            synthesis["solution"] = line[9:].strip()
        elif line.startswith('Confidence:'):
            try:
                synthesis["confidence"] = float(line[11:].strip())
            except ValueError:
                # Non-numeric confidence: keep the default.
                pass
        elif line.startswith('Method:'):
            synthesis["method"] = line[7:].strip()
        elif line.startswith('Metrics:'):
            try:
                metrics = json.loads(line[8:].strip())
            except ValueError:
                # json.JSONDecodeError is a ValueError: not JSON, keep {}.
                continue
            if isinstance(metrics, dict):
                synthesis["metrics"] = metrics
    return synthesis
def _parse_optimization(self, response: str) -> Dict[str, Any]:
    """Parse the model's optimisation answer.

    Recognised lines are ``Answer:``, ``Confidence:``, ``Improvements:``,
    ``Metrics:`` and ``Insights:``. ``Confidence:`` is optional and, when
    numeric, fills the ``confidence`` entry that otherwise stays at 0.0.
    Improvements and insights are comma-separated lists with empty entries
    dropped; metrics must be a JSON object, otherwise the empty default is
    kept.

    Args:
        response: Raw text answer from the model.

    Returns:
        A dictionary with ``answer``, ``confidence``, ``improvements``,
        ``metrics`` and ``meta_insights``.
    """
    optimization: Dict[str, Any] = {
        "answer": "",
        "confidence": 0.0,
        "improvements": [],
        "metrics": {},
        "meta_insights": []
    }
    for raw_line in response.split('\n'):
        line = raw_line.strip()
        if line.startswith('Answer:'):
            optimization["answer"] = line[7:].strip()
        elif line.startswith('Confidence:'):
            try:
                optimization["confidence"] = float(line[11:].strip())
            except ValueError:
                # Non-numeric confidence: keep the default.
                pass
        elif line.startswith('Improvements:'):
            optimization["improvements"] = [
                item.strip() for item in line[13:].split(',') if item.strip()
            ]
        elif line.startswith('Metrics:'):
            try:
                metrics = json.loads(line[8:].strip())
            except ValueError:
                # json.JSONDecodeError is a ValueError: not JSON, keep {}.
                continue
            if isinstance(metrics, dict):
                optimization["metrics"] = metrics
        elif line.startswith('Insights:'):
            optimization["meta_insights"] = [
                item.strip() for item in line[9:].split(',') if item.strip()
            ]
    return optimization
def _problem_to_dict(self, problem: Subproblem) -> Dict[str, Any]:
    """Return a JSON-friendly view of ``problem`` for use in prompts.

    Enum fields are replaced by their string values. The problem's
    context and solution are not included.
    """
    serialized: Dict[str, Any] = dict(
        id=problem.id,
        type=problem.type.value,
        query=problem.query,
        parent_id=problem.parent_id,
        children=problem.children,
        status=problem.status.value,
        confidence=problem.confidence,
        dependencies=problem.dependencies,
        metadata=problem.metadata,
    )
    return serialized
def _step_to_dict(self, step: RecursiveStep) -> Dict[str, Any]:
    """Return a JSON-friendly view of ``step``.

    The timestamp is rendered with ``isoformat()``; all other fields are
    copied as they are.
    """
    recorded_at = step.timestamp.isoformat()
    serialized: Dict[str, Any] = dict(
        id=step.id,
        subproblem_id=step.subproblem_id,
        action=step.action,
        timestamp=recorded_at,
        result=step.result,
        metrics=step.metrics,
        metadata=step.metadata,
    )
    return serialized
def clear_cache(self):
    """Empty the solution cache in place (the dict object is kept)."""
    cache = self.solution_cache
    cache.clear()
def get_statistics(self) -> Dict[str, Any]:
    """Summarise the reasoning process so far.

    Returns:
        Counts of registered problems, recorded steps and cached
        solutions, plain-dict copies of the type/depth distributions and
        success rates (keys are left as they are stored), and the mean
        confidence over all registered problems (0.0 when there are none).
    """
    problems = self.subproblems
    if problems:
        total_confidence = sum(p.confidence for p in problems.values())
        average_confidence = total_confidence / len(problems)
    else:
        average_confidence = 0.0

    stats: Dict[str, Any] = {}
    stats["total_problems"] = len(problems)
    stats["total_steps"] = len(self.steps)
    stats["cache_size"] = len(self.solution_cache)
    stats["type_distribution"] = dict(self.type_distribution)
    stats["depth_distribution"] = dict(self.depth_distribution)
    stats["success_rates"] = dict(self.success_rate)
    stats["average_confidence"] = average_confidence
    return stats