Cascade Bot commited on
Commit
bb6f0d3
·
1 Parent(s): c7d9c30

Renamed BayesianReasoning to BayesianStrategy and fixed imports

Browse files
Files changed (2) hide show
  1. reasoning/__init__.py +2 -2
  2. reasoning/bayesian.py +157 -194
reasoning/__init__.py CHANGED
@@ -39,7 +39,7 @@ from .local_llm import LocalLLMStrategy
39
 
40
  # Advanced reasoning strategies
41
  from .multimodal import MultiModalReasoning
42
- from .bayesian import BayesianReasoning
43
  from .quantum import QuantumReasoning
44
  from .neurosymbolic import NeurosymbolicReasoning
45
  from .emergent import EmergentReasoning
@@ -72,7 +72,7 @@ __all__ = [
72
 
73
  # Advanced reasoning
74
  'MultiModalReasoning',
75
- 'BayesianReasoning',
76
  'QuantumReasoning',
77
  'NeurosymbolicReasoning',
78
  'EmergentReasoning',
 
39
 
40
  # Advanced reasoning strategies
41
  from .multimodal import MultiModalReasoning
42
+ from .bayesian import BayesianStrategy
43
  from .quantum import QuantumReasoning
44
  from .neurosymbolic import NeurosymbolicReasoning
45
  from .emergent import EmergentReasoning
 
72
 
73
  # Advanced reasoning
74
  'MultiModalReasoning',
75
+ 'BayesianStrategy',
76
  'QuantumReasoning',
77
  'NeurosymbolicReasoning',
78
  'EmergentReasoning',
reasoning/bayesian.py CHANGED
@@ -9,7 +9,7 @@ from datetime import datetime
9
  import numpy as np
10
  from collections import defaultdict
11
 
12
- from .base import ReasoningStrategy
13
 
14
  @dataclass
15
  class BayesianHypothesis:
@@ -20,9 +20,8 @@ class BayesianHypothesis:
20
  posterior: float = 0.0
21
  evidence: List[Dict[str, Any]] = field(default_factory=list)
22
 
23
- class BayesianReasoning(ReasoningStrategy):
24
- """
25
- Advanced Bayesian reasoning that:
26
  1. Generates hypotheses
27
  2. Calculates prior probabilities
28
  3. Updates with evidence
@@ -39,8 +38,15 @@ class BayesianReasoning(ReasoningStrategy):
39
  self.prior_weight = self.config.get('prior_weight', 0.3)
40
  self.evidence_threshold = self.config.get('evidence_threshold', 0.1)
41
  self.min_likelihood = self.config.get('min_likelihood', 0.01)
 
 
 
42
 
43
- async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
 
 
 
 
44
  """
45
  Apply Bayesian reasoning to analyze probabilities and update beliefs.
46
 
@@ -49,119 +55,113 @@ class BayesianReasoning(ReasoningStrategy):
49
  context: Additional context and parameters
50
 
51
  Returns:
52
- Dict containing reasoning results and confidence scores
53
  """
54
  try:
55
- # Generate hypotheses
56
- hypotheses = await self._generate_hypotheses(query, context)
57
 
58
- # Calculate priors
59
- priors = await self._calculate_priors(hypotheses, context)
60
 
61
  # Update with evidence
62
- posteriors = await self._update_with_evidence(
63
- hypotheses,
64
- priors,
65
- context
66
- )
67
 
68
  # Generate analysis
69
  analysis = await self._generate_analysis(posteriors, context)
70
 
71
- return {
72
- 'answer': self._format_analysis(analysis),
73
- 'confidence': self._calculate_confidence(posteriors),
74
- 'hypotheses': hypotheses,
75
- 'priors': priors,
76
- 'posteriors': posteriors,
77
- 'analysis': analysis
78
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
 
80
  except Exception as e:
81
- logging.error(f"Bayesian reasoning failed: {str(e)}")
82
- return {
83
- 'error': f"Bayesian reasoning failed: {str(e)}",
84
- 'confidence': 0.0
85
- }
 
 
 
 
 
 
 
 
 
86
 
87
  async def _generate_hypotheses(
88
  self,
89
  query: str,
90
  context: Dict[str, Any]
91
- ) -> List[Dict[str, Any]]:
92
  """Generate plausible hypotheses."""
93
- hypotheses = []
94
-
95
  # Extract key terms for hypothesis generation
96
- terms = set(query.lower().split())
97
 
98
- # Generate hypotheses based on context and terms
99
- if 'options' in context:
100
- # Use provided options as hypotheses
101
- for option in context['options']:
102
- hypotheses.append({
103
- 'name': option,
104
- 'description': f"Hypothesis based on option: {option}",
105
- 'factors': self._extract_factors(option, terms)
106
- })
107
- else:
108
- # Generate default hypotheses
109
- hypotheses.extend([
110
- {
111
- 'name': 'primary',
112
- 'description': "Primary hypothesis based on direct interpretation",
113
- 'factors': self._extract_factors(query, terms)
114
- },
115
- {
116
- 'name': 'alternative',
117
- 'description': "Alternative hypothesis considering other factors",
118
- 'factors': self._generate_alternative_factors(terms)
119
- }
120
- ])
121
 
122
  return hypotheses
123
 
124
  async def _calculate_priors(
125
  self,
126
- hypotheses: List[Dict[str, Any]],
127
  context: Dict[str, Any]
128
  ) -> Dict[str, float]:
129
  """Calculate prior probabilities."""
130
  priors = {}
 
131
 
132
- # Get historical data if available
133
- history = context.get('history', {})
134
- total_cases = sum(history.values()) if history else len(hypotheses)
135
-
136
- for hypothesis in hypotheses:
137
- name = hypothesis['name']
138
-
139
- # Calculate prior from history or use uniform prior
140
- if name in history:
141
- priors[name] = history[name] / total_cases
142
- else:
143
- priors[name] = 1.0 / len(hypotheses)
144
-
145
- # Adjust prior based on factors
146
- factor_weight = len(hypothesis['factors']) / 10 # Normalize factor count
147
- priors[name] = (
148
- priors[name] * (1 - self.prior_weight) +
149
- factor_weight * self.prior_weight
150
- )
151
-
152
- # Normalize priors
153
- total_prior = sum(priors.values())
154
  if total_prior > 0:
155
- priors = {
156
- name: prob / total_prior
157
- for name, prob in priors.items()
158
- }
 
 
 
 
159
 
160
  return priors
161
 
162
  async def _update_with_evidence(
163
  self,
164
- hypotheses: List[Dict[str, Any]],
165
  priors: Dict[str, float],
166
  context: Dict[str, Any]
167
  ) -> Dict[str, float]:
@@ -170,51 +170,41 @@ class BayesianReasoning(ReasoningStrategy):
170
 
171
  # Get evidence from context
172
  evidence = context.get('evidence', [])
173
- if not evidence:
174
- return posteriors
175
 
176
  for e in evidence:
177
- # Calculate likelihood for each hypothesis
178
  likelihoods = {}
179
- for hypothesis in hypotheses:
180
- name = hypothesis['name']
181
- likelihood = self._calculate_likelihood(hypothesis, e)
182
- likelihoods[name] = max(likelihood, self.min_likelihood)
183
 
184
- # Update posteriors using Bayes' rule
185
- total_probability = sum(
186
- likelihoods[name] * posteriors[name]
187
- for name in posteriors
188
- )
189
 
190
- if total_probability > 0:
191
- posteriors = {
192
- name: (likelihoods[name] * posteriors[name]) / total_probability
193
- for name in posteriors
194
- }
 
195
 
196
  return posteriors
197
 
198
- def _calculate_likelihood(
199
  self,
200
- hypothesis: Dict[str, Any],
201
  evidence: Dict[str, Any]
202
  ) -> float:
203
  """Calculate likelihood of evidence given hypothesis."""
204
- # Extract evidence factors
205
- evidence_factors = set(
206
- str(v).lower()
207
- for v in evidence.values()
208
- if isinstance(v, (str, int, float))
209
- )
210
-
211
- # Compare with hypothesis factors
212
- common_factors = evidence_factors.intersection(hypothesis['factors'])
213
 
214
- if not evidence_factors:
215
- return 0.5 # Neutral likelihood if no factors
 
216
 
217
- return len(common_factors) / len(evidence_factors)
218
 
219
  async def _generate_analysis(
220
  self,
@@ -222,104 +212,77 @@ class BayesianReasoning(ReasoningStrategy):
222
  context: Dict[str, Any]
223
  ) -> Dict[str, Any]:
224
  """Generate probabilistic analysis."""
225
- # Sort hypotheses by posterior probability
226
- ranked_hypotheses = sorted(
227
- posteriors.items(),
228
- key=lambda x: x[1],
229
- reverse=True
230
- )
231
 
232
- # Calculate statistics
233
- mean = np.mean(list(posteriors.values()))
234
- std = np.std(list(posteriors.values()))
235
- entropy = -sum(
236
- p * np.log2(p) if p > 0 else 0
237
- for p in posteriors.values()
238
- )
239
 
240
- return {
241
- 'top_hypothesis': ranked_hypotheses[0][0],
242
- 'probability': ranked_hypotheses[0][1],
243
- 'alternatives': [
244
- {'name': name, 'probability': prob}
245
- for name, prob in ranked_hypotheses[1:]
246
- ],
247
- 'statistics': {
248
- 'mean': mean,
249
- 'std': std,
250
- 'entropy': entropy
251
- }
252
- }
253
 
254
  def _format_analysis(self, analysis: Dict[str, Any]) -> str:
255
  """Format analysis into readable text."""
256
- sections = []
257
 
258
- # Top hypothesis
259
- sections.append(
260
- f"Most likely hypothesis: {analysis['top_hypothesis']} "
261
- f"(probability: {analysis['probability']:.2%})"
262
- )
263
 
264
- # Alternative hypotheses
265
- if analysis['alternatives']:
266
- sections.append("\nAlternative hypotheses:")
267
- for alt in analysis['alternatives']:
268
- sections.append(
269
- f"- {alt['name']}: {alt['probability']:.2%}"
270
- )
271
-
272
- # Statistics
273
- stats = analysis['statistics']
274
- sections.append("\nDistribution statistics:")
275
- sections.append(f"- Mean probability: {stats['mean']:.2%}")
276
- sections.append(f"- Standard deviation: {stats['std']:.2%}")
277
- sections.append(f"- Entropy: {stats['entropy']:.2f} bits")
278
 
279
- return "\n".join(sections)
280
 
281
  def _calculate_confidence(self, posteriors: Dict[str, float]) -> float:
282
  """Calculate overall confidence score."""
283
  if not posteriors:
284
  return 0.0
285
 
286
- # Base confidence
287
- confidence = 0.5
288
-
289
- # Adjust based on probability distribution
290
- probs = list(posteriors.values())
291
-
292
- # Strong leading hypothesis increases confidence
293
- max_prob = max(probs)
294
- if max_prob > 0.8:
295
- confidence += 0.3
296
- elif max_prob > 0.6:
297
- confidence += 0.2
298
- elif max_prob > 0.4:
299
- confidence += 0.1
300
-
301
- # Low entropy (clear distinction) increases confidence
302
- entropy = -sum(p * np.log2(p) if p > 0 else 0 for p in probs)
303
- max_entropy = -np.log2(1/len(probs)) # Maximum possible entropy
304
 
305
- if entropy < 0.3 * max_entropy:
306
- confidence += 0.2
307
- elif entropy < 0.6 * max_entropy:
308
- confidence += 0.1
 
 
 
 
 
309
 
310
- return min(confidence, 1.0)
311
 
312
  def _extract_factors(self, text: str, terms: Set[str]) -> Set[str]:
313
  """Extract relevant factors from text."""
314
- return set(word.lower() for word in text.split() if word.lower() in terms)
 
 
 
315
 
316
- def _generate_alternative_factors(self, terms: Set[str]) -> Set[str]:
317
  """Generate factors for alternative hypothesis."""
318
- # Simple approach: use terms not in primary hypothesis
319
- return set(
320
- word for word in terms
321
- if not any(
322
- similar in word or word in similar
323
- for similar in terms
324
- )
325
- )
 
 
9
  import numpy as np
10
  from collections import defaultdict
11
 
12
+ from .base import ReasoningStrategy, StrategyResult
13
 
14
  @dataclass
15
  class BayesianHypothesis:
 
20
  posterior: float = 0.0
21
  evidence: List[Dict[str, Any]] = field(default_factory=list)
22
 
23
+ class BayesianStrategy(ReasoningStrategy):
24
+ """Advanced Bayesian reasoning that:
 
25
  1. Generates hypotheses
26
  2. Calculates prior probabilities
27
  3. Updates with evidence
 
38
  self.prior_weight = self.config.get('prior_weight', 0.3)
39
  self.evidence_threshold = self.config.get('evidence_threshold', 0.1)
40
  self.min_likelihood = self.config.get('min_likelihood', 0.01)
41
+
42
+ # Initialize hypothesis storage
43
+ self.hypotheses: List[BayesianHypothesis] = []
44
 
45
+ async def reason(
46
+ self,
47
+ query: str,
48
+ context: Dict[str, Any]
49
+ ) -> StrategyResult:
50
  """
51
  Apply Bayesian reasoning to analyze probabilities and update beliefs.
52
 
 
55
  context: Additional context and parameters
56
 
57
  Returns:
58
+ StrategyResult containing reasoning results and confidence scores
59
  """
60
  try:
61
+ # Generate initial hypotheses
62
+ self.hypotheses = await self._generate_hypotheses(query, context)
63
 
64
+ # Calculate prior probabilities
65
+ priors = await self._calculate_priors(self.hypotheses, context)
66
 
67
  # Update with evidence
68
+ posteriors = await self._update_with_evidence(self.hypotheses, priors, context)
 
 
 
 
69
 
70
  # Generate analysis
71
  analysis = await self._generate_analysis(posteriors, context)
72
 
73
+ # Format results
74
+ answer = self._format_analysis(analysis)
75
+ confidence = self._calculate_confidence(posteriors)
76
+
77
+ return StrategyResult(
78
+ strategy_type="bayesian",
79
+ success=True,
80
+ answer=answer,
81
+ confidence=confidence,
82
+ reasoning_trace=[{
83
+ "step": "bayesian_analysis",
84
+ "hypotheses": [h.__dict__ for h in self.hypotheses],
85
+ "priors": priors,
86
+ "posteriors": posteriors,
87
+ "analysis": analysis,
88
+ "timestamp": datetime.now().isoformat()
89
+ }],
90
+ metadata={
91
+ "num_hypotheses": len(self.hypotheses),
92
+ "max_posterior": max(posteriors.values()) if posteriors else 0.0,
93
+ "config": self.config
94
+ },
95
+ performance_metrics={
96
+ "prior_weight": self.prior_weight,
97
+ "evidence_threshold": self.evidence_threshold,
98
+ "min_likelihood": self.min_likelihood
99
+ }
100
+ )
101
 
102
  except Exception as e:
103
+ logging.error(f"Bayesian reasoning error: {str(e)}")
104
+ return StrategyResult(
105
+ strategy_type="bayesian",
106
+ success=False,
107
+ answer=None,
108
+ confidence=0.0,
109
+ reasoning_trace=[{
110
+ "step": "error",
111
+ "error": str(e),
112
+ "timestamp": datetime.now().isoformat()
113
+ }],
114
+ metadata={"error": str(e)},
115
+ performance_metrics={}
116
+ )
117
 
118
  async def _generate_hypotheses(
119
  self,
120
  query: str,
121
  context: Dict[str, Any]
122
+ ) -> List[BayesianHypothesis]:
123
  """Generate plausible hypotheses."""
 
 
124
  # Extract key terms for hypothesis generation
125
+ terms = self._extract_factors(query, set())
126
 
127
+ # Generate alternative hypotheses
128
+ alternatives = self._generate_alternative_factors(terms)
129
+
130
+ # Create hypothesis objects
131
+ hypotheses = []
132
+ for name, prior in alternatives.items():
133
+ hypotheses.append(BayesianHypothesis(
134
+ name=name,
135
+ prior=prior,
136
+ likelihood=1.0 # Initial likelihood
137
+ ))
 
 
 
 
 
 
 
 
 
 
 
 
138
 
139
  return hypotheses
140
 
141
  async def _calculate_priors(
142
  self,
143
+ hypotheses: List[BayesianHypothesis],
144
  context: Dict[str, Any]
145
  ) -> Dict[str, float]:
146
  """Calculate prior probabilities."""
147
  priors = {}
148
+ total_prior = sum(h.prior for h in hypotheses)
149
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
  if total_prior > 0:
151
+ # Normalize priors
152
+ for h in hypotheses:
153
+ priors[h.name] = h.prior / total_prior
154
+ else:
155
+ # Equal priors if no information
156
+ prior = 1.0 / len(hypotheses)
157
+ for h in hypotheses:
158
+ priors[h.name] = prior
159
 
160
  return priors
161
 
162
  async def _update_with_evidence(
163
  self,
164
+ hypotheses: List[BayesianHypothesis],
165
  priors: Dict[str, float],
166
  context: Dict[str, Any]
167
  ) -> Dict[str, float]:
 
170
 
171
  # Get evidence from context
172
  evidence = context.get('evidence', [])
 
 
173
 
174
  for e in evidence:
175
+ # Calculate likelihoods
176
  likelihoods = {}
177
+ total_likelihood = 0.0
 
 
 
178
 
179
+ for h in hypotheses:
180
+ likelihood = await self._calculate_likelihood(h, e)
181
+ likelihoods[h.name] = max(likelihood, self.min_likelihood)
182
+ total_likelihood += likelihood * posteriors[h.name]
 
183
 
184
+ # Update posteriors using Bayes' rule
185
+ if total_likelihood > 0:
186
+ for h in hypotheses:
187
+ posteriors[h.name] = (
188
+ likelihoods[h.name] * posteriors[h.name] / total_likelihood
189
+ )
190
 
191
  return posteriors
192
 
193
+ async def _calculate_likelihood(
194
  self,
195
+ hypothesis: BayesianHypothesis,
196
  evidence: Dict[str, Any]
197
  ) -> float:
198
  """Calculate likelihood of evidence given hypothesis."""
199
+ # Simple likelihood calculation
200
+ # Could be enhanced with more sophisticated methods
201
+ base_likelihood = 0.5
 
 
 
 
 
 
202
 
203
+ # Adjust based on evidence strength
204
+ strength = evidence.get('strength', 0.0)
205
+ likelihood = base_likelihood * (1 + strength)
206
 
207
+ return min(1.0, max(self.min_likelihood, likelihood))
208
 
209
  async def _generate_analysis(
210
  self,
 
212
  context: Dict[str, Any]
213
  ) -> Dict[str, Any]:
214
  """Generate probabilistic analysis."""
215
+ analysis = {
216
+ 'top_hypothesis': max(posteriors.items(), key=lambda x: x[1]),
217
+ 'confidence': self._calculate_confidence(posteriors),
218
+ 'distribution': posteriors,
219
+ 'summary': []
220
+ }
221
 
222
+ # Generate summary points
223
+ for name, prob in sorted(posteriors.items(), key=lambda x: x[1], reverse=True):
224
+ analysis['summary'].append({
225
+ 'hypothesis': name,
226
+ 'probability': prob,
227
+ 'strength': 'strong' if prob > 0.7 else 'moderate' if prob > 0.3 else 'weak'
228
+ })
229
 
230
+ return analysis
 
 
 
 
 
 
 
 
 
 
 
 
231
 
232
  def _format_analysis(self, analysis: Dict[str, Any]) -> str:
233
  """Format analysis into readable text."""
234
+ top_hyp, top_prob = analysis['top_hypothesis']
235
 
236
+ text = [
237
+ f"Based on Bayesian analysis:",
238
+ f"- Most likely hypothesis: {top_hyp} (probability: {top_prob:.2f})",
239
+ "\nProbability distribution:"
240
+ ]
241
 
242
+ for item in analysis['summary']:
243
+ text.append(
244
+ f"- {item['hypothesis']}: {item['probability']:.2f} "
245
+ f"({item['strength']} evidence)"
246
+ )
 
 
 
 
 
 
 
 
 
247
 
248
+ return "\n".join(text)
249
 
250
  def _calculate_confidence(self, posteriors: Dict[str, float]) -> float:
251
  """Calculate overall confidence score."""
252
  if not posteriors:
253
  return 0.0
254
 
255
+ # Get top two probabilities
256
+ probs = sorted(posteriors.values(), reverse=True)
257
+ top_prob = probs[0]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
258
 
259
+ if len(probs) > 1:
260
+ # Consider the gap between top hypotheses
261
+ second_prob = probs[1]
262
+ margin = top_prob - second_prob
263
+
264
+ # Confidence increases with both probability and margin
265
+ confidence = (top_prob + margin) / 2
266
+ else:
267
+ confidence = top_prob
268
 
269
+ return min(1.0, max(0.0, confidence))
270
 
271
  def _extract_factors(self, text: str, terms: Set[str]) -> Set[str]:
272
  """Extract relevant factors from text."""
273
+ # Simple word-based extraction
274
+ # Could be enhanced with NLP techniques
275
+ words = text.lower().split()
276
+ return set(words).union(terms)
277
 
278
+ def _generate_alternative_factors(self, terms: Set[str]) -> Dict[str, float]:
279
  """Generate factors for alternative hypothesis."""
280
+ # Simple alternative generation
281
+ # Could be enhanced with domain knowledge
282
+ alternatives = {
283
+ 'primary': 0.6,
284
+ 'alternative': 0.3,
285
+ 'null': 0.1
286
+ }
287
+
288
+ return alternatives