agentic-system / space /chain_of_thought.py
Cascade Bot
Added Groq streaming support and optimizations - clean version
1d75522
raw
history blame
15 kB
"""Chain of Thought reasoning implementation with advanced features."""
import logging
from typing import Dict, Any, List, Optional, Tuple
import json
from dataclasses import dataclass
from enum import Enum
from .base import ReasoningStrategy
class ThoughtType(Enum):
    """Closed set of categories a thought in the chain can belong to.

    Each member's value is the lowercase label used when a thought is
    serialized and when a model-supplied type string is looked up with
    ``ThoughtType(value)``.
    """

    # Categories the model may assign to a generated thought.
    OBSERVATION = "observation"
    ANALYSIS = "analysis"
    HYPOTHESIS = "hypothesis"
    VERIFICATION = "verification"
    CONCLUSION = "conclusion"
    REFLECTION = "reflection"
    # Category given to thoughts created from refinement suggestions.
    REFINEMENT = "refinement"
@dataclass
class Thought:
    """A single step of a reasoning chain and the data attached to it.

    All seven fields are required and are accepted positionally in the
    order they are declared below.
    """

    # Category of this step.
    type: ThoughtType
    # Text of the thought itself.
    content: str
    # Confidence score attached to the thought (the prompts ask for 0-1).
    confidence: float
    # Supporting evidence items.
    evidence: List[str]
    # Alternative perspectives recorded alongside the thought.
    alternatives: List[str]
    # Suggested follow-up steps.
    next_steps: List[str]
    # Free-form extra information keyed by name.
    metadata: Dict[str, Any]
class ChainOfThoughtStrategy(ReasoningStrategy):
    """
    Chain of Thought reasoning driven by a sequence of model prompts.

    A call to :meth:`reason` runs these rounds, each of which sends one
    prompt to the model client and parses the plain-text reply:

    1. initial observations about the query,
    2. candidate thoughts,
    3. selection/ordering of candidates into a chain,
    4. optional reflection that may append refinement thoughts,
    5. extraction of the final conclusion.

    The model client is read from ``context["groq_api"]``; it must expose
    an awaitable ``predict(prompt)`` whose result is indexable by
    ``"answer"``.

    ``min_confidence``, ``parallel_threshold``, ``learning_rate`` and
    ``strategy_weights`` are stored on the instance but are not consulted
    by any method of this class.
    """

    # Context entries that hold live objects rather than data. They are left
    # out of the JSON embedded in prompts because they cannot be serialized.
    _NON_PROMPT_CONTEXT_KEYS = ("groq_api",)

    # Confidence used when the model's value is missing or not a number.
    _DEFAULT_CONFIDENCE = 0.5

    # Confidence assigned to every thought created from a refinement.
    _REFINEMENT_CONFIDENCE = 0.8

    def __init__(self,
                 min_confidence: float = 0.7,
                 parallel_threshold: int = 3,
                 learning_rate: float = 0.1,
                 strategy_weights: Optional[Dict[str, float]] = None,
                 enable_reflection: bool = True):
        """Store configuration and start with an empty thought history.

        Args:
            min_confidence: Stored as-is; not used by this class.
            parallel_threshold: Stored as-is; not used by this class.
            learning_rate: Stored as-is; not used by this class.
            strategy_weights: Mapping of strategy name to weight. A falsy
                value (``None`` or an empty dict) selects the built-in
                defaults.
            enable_reflection: When true, :meth:`reason` runs the
                reflect-and-refine round before extracting the conclusion.
        """
        self.min_confidence = min_confidence
        self.parallel_threshold = parallel_threshold
        self.learning_rate = learning_rate
        self.strategy_weights = strategy_weights or {
            "LOCAL_LLM": 0.8,
            "CHAIN_OF_THOUGHT": 0.6,
            "TREE_OF_THOUGHTS": 0.5,
            "META_LEARNING": 0.4
        }
        # reason() reads this flag; it has to exist on every instance.
        self.enable_reflection = enable_reflection
        self.thought_history: List[Thought] = []

    async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Run the full chain-of-thought pipeline for ``query``.

        Args:
            query: The question to reason about.
            context: Extra data for the prompts. Must contain the model
                client under ``"groq_api"``.

        Returns:
            On success, a dict with ``success=True`` and the keys
            ``answer``, ``confidence``, ``reasoning_chain``,
            ``alternatives``, ``evidence`` and ``meta_insights``.
            If any step raises, the error is logged and
            ``{"success": False, "error": <message>}`` is returned instead;
            no exception propagates to the caller.
        """
        try:
            # NOTE(review): the parsed observations are not used further; the
            # chain below is built only from the generated thoughts. The call
            # is kept to preserve the original request sequence - confirm
            # whether the observations were meant to seed the chain.
            await self._initialize_chain(query, context)

            # Generate candidate thoughts, then let the model order them.
            thoughts = await self._generate_thoughts(query, context)
            chain = await self._build_chain(thoughts, context)

            # Reflect and refine
            if self.enable_reflection:
                chain = await self._reflect_and_refine(chain, context)

            # Extract conclusion
            conclusion = await self._extract_conclusion(chain, context)

            # Update thought history
            self.thought_history.extend(chain)

            return {
                "success": True,
                "answer": conclusion["answer"],
                "confidence": conclusion["confidence"],
                "reasoning_chain": [self._thought_to_dict(t) for t in chain],
                "alternatives": conclusion["alternatives"],
                "evidence": conclusion["evidence"],
                "meta_insights": conclusion["meta_insights"]
            }
        except Exception as e:
            logging.error(f"Error in chain of thought reasoning: {str(e)}")
            return {"success": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Prompt helpers
    # ------------------------------------------------------------------
    # Prompt templates below are written flush-left on purpose so that the
    # method's own indentation does not leak into the text sent to the model.

    def _context_json(self, context: Dict[str, Any]) -> str:
        """Serialize ``context`` for embedding in a prompt.

        The model client stored in the context is skipped, and any other
        value that ``json`` cannot encode is replaced by its ``str()`` form
        so that building a prompt does not fail on it.
        """
        prompt_context = {
            key: value
            for key, value in context.items()
            if key not in self._NON_PROMPT_CONTEXT_KEYS
        }
        return json.dumps(prompt_context, default=str)

    def _thoughts_json(self, thoughts: List[Thought]) -> str:
        """Serialize a list of thoughts for embedding in a prompt."""
        return json.dumps([self._thought_to_dict(t) for t in thoughts])

    async def _ask(self, context: Dict[str, Any], prompt: str) -> str:
        """Send ``prompt`` to the model client and return its answer text."""
        response = await context["groq_api"].predict(prompt)
        return response["answer"]

    async def _initialize_chain(self, query: str, context: Dict[str, Any]) -> List[Thought]:
        """Ask the model for initial observations and parse them."""
        prompt = f"""
Initialize chain of thought for query:
Query: {query}
Context: {self._context_json(context)}
Provide initial observations:
1. Key elements in query
2. Relevant context factors
3. Initial hypotheses
4. Potential approaches
Format as:
[O1] Element: ... | Relevance: ... | Confidence: ...
[O2] Context: ... | Impact: ... | Confidence: ...
[O3] Hypothesis: ... | Support: ... | Confidence: ...
[O4] Approach: ... | Rationale: ... | Confidence: ...
"""
        return self._parse_observations(await self._ask(context, prompt))

    async def _generate_thoughts(self, query: str, context: Dict[str, Any]) -> List[Thought]:
        """Generate candidate thoughts for the chain."""
        type_options = " | ".join(t.value for t in ThoughtType)
        prompt = f"""
Generate thoughts for query analysis:
Query: {query}
Context: {self._context_json(context)}
For each thought provide:
1. [Type]: {type_options}
2. [Content]: Main thought
3. [Evidence]: Supporting evidence
4. [Alternatives]: Alternative perspectives
5. [Next]: Potential next steps
6. [Confidence]: 0-1 score
Format as:
[T1]
Type: ...
Content: ...
Evidence: ...
Alternatives: ...
Next: ...
Confidence: ...
"""
        return self._parse_thoughts(await self._ask(context, prompt))

    async def _build_chain(self, thoughts: List[Thought], context: Dict[str, Any]) -> List[Thought]:
        """Ask the model to select and order candidate thoughts into a chain."""
        prompt = f"""
Build coherent thought chain:
Thoughts: {self._thoughts_json(thoughts)}
Context: {self._context_json(context)}
For each step specify:
1. Selected thought
2. Reasoning for selection
3. Connection to previous
4. Expected impact
Format as:
[S1]
Thought: ...
Reason: ...
Connection: ...
Impact: ...
"""
        return self._parse_chain(await self._ask(context, prompt), thoughts)

    async def _reflect_and_refine(self, chain: List[Thought], context: Dict[str, Any]) -> List[Thought]:
        """Ask the model to critique the chain and append its refinements."""
        prompt = f"""
Reflect on thought chain:
Chain: {self._thoughts_json(chain)}
Context: {self._context_json(context)}
Analyze for:
1. Logical gaps
2. Weak assumptions
3. Missing evidence
4. Alternative perspectives
Suggest refinements:
1. Additional thoughts
2. Modified reasoning
3. New connections
4. Evidence needs
Format as:
[Analysis]
Gaps: ...
Assumptions: ...
Missing: ...
Alternatives: ...
[Refinements]
Thoughts: ...
Reasoning: ...
Connections: ...
Evidence: ...
"""
        return self._apply_refinements(chain, await self._ask(context, prompt))

    async def _extract_conclusion(self, chain: List[Thought], context: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the model for the final conclusion and parse it."""
        prompt = f"""
Extract conclusion from thought chain:
Chain: {self._thoughts_json(chain)}
Context: {self._context_json(context)}
Provide:
1. Main conclusion
2. Confidence level
3. Supporting evidence
4. Alternative conclusions
5. Meta-insights gained
6. Future considerations
Format as:
[Conclusion]
Answer: ...
Confidence: ...
Evidence: ...
Alternatives: ...
[Meta]
Insights: ...
Future: ...
"""
        return self._parse_conclusion(await self._ask(context, prompt))

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _split_list(text: str) -> List[str]:
        """Split comma-separated ``text`` into stripped, non-empty items."""
        return [item.strip() for item in text.split(',') if item.strip()]

    def _parse_confidence(self, text: str) -> float:
        """Convert ``text`` to a float, falling back to the default score."""
        try:
            return float(text.strip())
        except ValueError:
            return self._DEFAULT_CONFIDENCE

    @staticmethod
    def _section_lines(response: str, section: str) -> List[str]:
        """Return the non-empty, stripped lines under ``[section]`` headers.

        A line containing ``[section]`` opens the section (the rest of that
        line is ignored). Any other line that starts with ``[`` closes it.
        Brackets inside a field value therefore do not cut the section short.
        """
        header = f'[{section}]'
        collected: List[str] = []
        inside = False
        for raw_line in response.split('\n'):
            line = raw_line.strip()
            if header in line:
                inside = True
            elif line.startswith('['):
                inside = False
            elif inside and line:
                collected.append(line)
        return collected

    def _parse_observations(self, response: str) -> List[Thought]:
        """Parse ``[O..] Key: text | Label: evidence | Confidence: n`` lines.

        Lines that do not start with ``[O``, have fewer than three
        ``|``-separated parts, or lack a ``Key: text`` head are skipped.
        """
        observations: List[Thought] = []
        for raw_line in response.split('\n'):
            line = raw_line.strip()
            if not line.startswith('[O'):
                continue
            parts = line.split('|')
            if len(parts) < 3:
                continue

            # "[O1] Element: some text" -> key "Element", content "some text"
            head = parts[0].partition(']')[2].strip()
            key, separator, content = head.partition(':')
            if not separator:
                continue  # malformed head; ignore the line instead of failing

            evidence_text = parts[1].partition(':')[2]
            confidence_text = parts[2].partition(':')[2]
            observations.append(Thought(
                type=ThoughtType.OBSERVATION,
                content=content.strip(),
                confidence=self._parse_confidence(confidence_text),
                evidence=self._split_list(evidence_text),
                alternatives=[],
                next_steps=[],
                metadata={"key": key.strip()}
            ))
        return observations

    def _parse_thoughts(self, response: str) -> List[Thought]:
        """Parse ``[T..]`` blocks of ``Label: value`` lines into thoughts.

        A thought starts at its ``Type:`` line. Field lines that appear
        while no thought is open (for example after an unknown type) are
        ignored.
        """
        thoughts: List[Thought] = []
        current: Optional[Thought] = None
        for raw_line in response.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            is_type_line = line.startswith('Type:')
            if is_type_line or line.startswith('[T'):
                # Both a block marker and a new Type line end the thought in
                # progress, so a missing marker cannot merge two thoughts.
                if current is not None:
                    thoughts.append(current)
                    current = None
                if is_type_line:
                    type_str = line[len('Type:'):].strip()
                    try:
                        thought_type = ThoughtType(type_str.lower())
                    except ValueError:
                        logging.warning(f"Invalid thought type: {type_str}")
                        continue
                    current = Thought(
                        type=thought_type,
                        content="",
                        confidence=0.0,
                        evidence=[],
                        alternatives=[],
                        next_steps=[],
                        metadata={}
                    )
                continue

            if current is None:
                continue

            label, _, value = line.partition(':')
            if label == 'Content':
                current.content = value.strip()
            elif label == 'Evidence':
                current.evidence = self._split_list(value)
            elif label == 'Alternatives':
                current.alternatives = self._split_list(value)
            elif label == 'Next':
                current.next_steps = self._split_list(value)
            elif label == 'Confidence':
                current.confidence = self._parse_confidence(value)

        if current is not None:
            thoughts.append(current)
        return thoughts

    def _parse_chain(self, response: str, thoughts: List[Thought]) -> List[Thought]:
        """Order ``thoughts`` according to the ``Thought:`` lines in ``response``.

        Each ``Thought:`` line is matched against candidate contents by exact
        text; lines that match no candidate are ignored. When several
        candidates share the same content, the last one is used.
        """
        by_content = {thought.content: thought for thought in thoughts}
        chain: List[Thought] = []
        for raw_line in response.split('\n'):
            label, _, value = raw_line.strip().partition(':')
            if label != 'Thought':
                continue
            selected = by_content.get(value.strip())
            if selected is not None:
                chain.append(selected)
        return chain

    def _apply_refinements(self, chain: List[Thought], response: str) -> List[Thought]:
        """Return a copy of ``chain`` extended with refinement thoughts.

        Only ``Thoughts:`` lines inside the ``[Refinements]`` section are
        used; the input list itself is not modified.
        """
        refined_chain = list(chain)
        for line in self._section_lines(response, 'Refinements'):
            label, _, value = line.partition(':')
            if label == 'Thoughts':
                refined_chain.extend(self._parse_refinement_thoughts(value))
        return refined_chain

    def _parse_refinement_thoughts(self, refinements: str) -> List[Thought]:
        """Turn a ``;``-separated string into refinement thoughts."""
        return [
            Thought(
                type=ThoughtType.REFINEMENT,
                content=refinement.strip(),
                confidence=self._REFINEMENT_CONFIDENCE,
                evidence=[],
                alternatives=[],
                next_steps=[],
                metadata={"refined": True}
            )
            for refinement in refinements.split(';')
            if refinement.strip()
        ]

    def _parse_conclusion(self, response: str) -> Dict[str, Any]:
        """Parse the ``[Conclusion]`` and ``[Meta]`` sections of ``response``.

        Returns:
            A dict with the keys ``answer``, ``confidence``, ``evidence``,
            ``alternatives``, ``meta_insights`` and
            ``future_considerations``. Fields missing from the response keep
            their empty defaults.
        """
        conclusion: Dict[str, Any] = {
            "answer": "",
            "confidence": 0.0,
            "evidence": [],
            "alternatives": [],
            "meta_insights": [],
            "future_considerations": []
        }

        for line in self._section_lines(response, 'Conclusion'):
            label, _, value = line.partition(':')
            if label == 'Answer':
                conclusion["answer"] = value.strip()
            elif label == 'Confidence':
                conclusion["confidence"] = self._parse_confidence(value)
            elif label == 'Evidence':
                conclusion["evidence"] = self._split_list(value)
            elif label == 'Alternatives':
                conclusion["alternatives"] = self._split_list(value)

        for line in self._section_lines(response, 'Meta'):
            label, _, value = line.partition(':')
            if label == 'Insights':
                conclusion["meta_insights"] = self._split_list(value)
            elif label == 'Future':
                conclusion["future_considerations"] = self._split_list(value)

        return conclusion

    # ------------------------------------------------------------------
    # Serialization and history
    # ------------------------------------------------------------------

    def _thought_to_dict(self, thought: Thought) -> Dict[str, Any]:
        """Convert thought to dictionary for serialization."""
        return {
            "type": thought.type.value,
            "content": thought.content,
            "confidence": thought.confidence,
            "evidence": thought.evidence,
            "alternatives": thought.alternatives,
            "next_steps": thought.next_steps,
            "metadata": thought.metadata
        }

    def get_thought_history(self) -> List[Dict[str, Any]]:
        """Get the history of all thoughts processed, as dictionaries."""
        return [self._thought_to_dict(t) for t in self.thought_history]

    def clear_history(self) -> None:
        """Clear thought history."""
        self.thought_history = []