"""Advanced Bayesian reasoning for probabilistic analysis.""" | |
import logging | |
from typing import Dict, Any, List, Optional, Set, Union, Type, Tuple | |
import json | |
from dataclasses import dataclass, field | |
from enum import Enum | |
from datetime import datetime | |
import numpy as np | |
from collections import defaultdict | |
from .base import ReasoningStrategy | |
@dataclass
class BayesianHypothesis:
    """Bayesian hypothesis with prior, likelihood, and posterior probabilities."""
    name: str
    prior: float
    likelihood: float
    posterior: float = 0.0
    evidence: List[Dict[str, Any]] = field(default_factory=list)


class BayesianReasoning(ReasoningStrategy):
    """
    Advanced Bayesian reasoning that:
    1. Generates hypotheses
    2. Calculates prior probabilities
    3. Updates with evidence
    4. Computes posteriors
    5. Provides probabilistic analysis

    The update rule behind steps 3 and 4 is sketched in the note below.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize Bayesian reasoning."""
        super().__init__()
        self.config = config or {}

        # Configure Bayesian parameters
        self.prior_weight = self.config.get('prior_weight', 0.3)
        self.evidence_threshold = self.config.get('evidence_threshold', 0.1)
        self.min_likelihood = self.config.get('min_likelihood', 0.01)

    async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply Bayesian reasoning to analyze probabilities and update beliefs.

        Args:
            query: The input query to reason about
            context: Additional context and parameters

        Returns:
            Dict containing reasoning results and confidence scores
        """
        try:
            # Generate hypotheses
            hypotheses = await self._generate_hypotheses(query, context)

            # Calculate priors
            priors = await self._calculate_priors(hypotheses, context)

            # Update with evidence
            posteriors = await self._update_with_evidence(
                hypotheses,
                priors,
                context
            )

            # Generate analysis
            analysis = await self._generate_analysis(posteriors, context)

            return {
                'answer': self._format_analysis(analysis),
                'confidence': self._calculate_confidence(posteriors),
                'hypotheses': hypotheses,
                'priors': priors,
                'posteriors': posteriors,
                'analysis': analysis
            }

        except Exception as e:
            logging.error(f"Bayesian reasoning failed: {str(e)}")
            return {
                'error': f"Bayesian reasoning failed: {str(e)}",
                'confidence': 0.0
            }

    async def _generate_hypotheses(
        self,
        query: str,
        context: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Generate plausible hypotheses."""
        hypotheses = []

        # Extract key terms for hypothesis generation
        terms = set(query.lower().split())

        # Generate hypotheses based on context and terms
        if 'options' in context:
            # Use provided options as hypotheses
            for option in context['options']:
                hypotheses.append({
                    'name': option,
                    'description': f"Hypothesis based on option: {option}",
                    'factors': self._extract_factors(option, terms)
                })
        else:
            # Generate default hypotheses
            hypotheses.extend([
                {
                    'name': 'primary',
                    'description': "Primary hypothesis based on direct interpretation",
                    'factors': self._extract_factors(query, terms)
                },
                {
                    'name': 'alternative',
                    'description': "Alternative hypothesis considering other factors",
                    'factors': self._generate_alternative_factors(terms)
                }
            ])

        return hypotheses

    async def _calculate_priors(
        self,
        hypotheses: List[Dict[str, Any]],
        context: Dict[str, Any]
    ) -> Dict[str, float]:
        """Calculate prior probabilities."""
        priors = {}

        # Get historical data if available
        history = context.get('history', {})
        total_cases = sum(history.values()) if history else len(hypotheses)

        for hypothesis in hypotheses:
            name = hypothesis['name']

            # Calculate prior from history or use uniform prior
            if name in history:
                priors[name] = history[name] / total_cases
            else:
                priors[name] = 1.0 / len(hypotheses)

            # Adjust prior based on factors (factor count scaled to [0, 1])
            factor_weight = min(len(hypothesis['factors']) / 10, 1.0)
            priors[name] = (
                priors[name] * (1 - self.prior_weight) +
                factor_weight * self.prior_weight
            )

        # Normalize priors so they sum to 1
        total_prior = sum(priors.values())
        if total_prior > 0:
            priors = {
                name: prob / total_prior
                for name, prob in priors.items()
            }

        return priors

    async def _update_with_evidence(
        self,
        hypotheses: List[Dict[str, Any]],
        priors: Dict[str, float],
        context: Dict[str, Any]
    ) -> Dict[str, float]:
        """Update probabilities with evidence."""
        posteriors = priors.copy()

        # Get evidence from context
        evidence = context.get('evidence', [])
        if not evidence:
            return posteriors

        for e in evidence:
            # Calculate likelihood of this evidence item for each hypothesis
            likelihoods = {}
            for hypothesis in hypotheses:
                name = hypothesis['name']
                likelihood = self._calculate_likelihood(hypothesis, e)
                likelihoods[name] = max(likelihood, self.min_likelihood)

            # Update posteriors using Bayes' rule, treating the current
            # posteriors as the priors for this evidence item
            total_probability = sum(
                likelihoods[name] * posteriors[name]
                for name in posteriors
            )

            if total_probability > 0:
                posteriors = {
                    name: (likelihoods[name] * posteriors[name]) / total_probability
                    for name in posteriors
                }

        return posteriors

    def _calculate_likelihood(
        self,
        hypothesis: Dict[str, Any],
        evidence: Dict[str, Any]
    ) -> float:
        """Calculate likelihood of evidence given hypothesis."""
        # Extract scalar evidence values as comparable factors
        evidence_factors = set(
            str(v).lower()
            for v in evidence.values()
            if isinstance(v, (str, int, float))
        )

        if not evidence_factors:
            return 0.5  # Neutral likelihood if the evidence has no usable factors

        # Likelihood is the fraction of evidence factors shared with the hypothesis
        common_factors = evidence_factors.intersection(hypothesis['factors'])
        return len(common_factors) / len(evidence_factors)

    async def _generate_analysis(
        self,
        posteriors: Dict[str, float],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Generate probabilistic analysis."""
        # Sort hypotheses by posterior probability
        ranked_hypotheses = sorted(
            posteriors.items(),
            key=lambda x: x[1],
            reverse=True
        )

        # Calculate statistics
        probs = list(posteriors.values())
        mean = float(np.mean(probs))
        std = float(np.std(probs))
        entropy = -sum(
            p * np.log2(p) if p > 0 else 0
            for p in probs
        )

        return {
            'top_hypothesis': ranked_hypotheses[0][0],
            'probability': ranked_hypotheses[0][1],
            'alternatives': [
                {'name': name, 'probability': prob}
                for name, prob in ranked_hypotheses[1:]
            ],
            'statistics': {
                'mean': mean,
                'std': std,
                'entropy': entropy
            }
        }

    def _format_analysis(self, analysis: Dict[str, Any]) -> str:
        """Format analysis into readable text."""
        sections = []

        # Top hypothesis
        sections.append(
            f"Most likely hypothesis: {analysis['top_hypothesis']} "
            f"(probability: {analysis['probability']:.2%})"
        )

        # Alternative hypotheses
        if analysis['alternatives']:
            sections.append("\nAlternative hypotheses:")
            for alt in analysis['alternatives']:
                sections.append(
                    f"- {alt['name']}: {alt['probability']:.2%}"
                )

        # Statistics
        stats = analysis['statistics']
        sections.append("\nDistribution statistics:")
        sections.append(f"- Mean probability: {stats['mean']:.2%}")
        sections.append(f"- Standard deviation: {stats['std']:.2%}")
        sections.append(f"- Entropy: {stats['entropy']:.2f} bits")

        return "\n".join(sections)

    def _calculate_confidence(self, posteriors: Dict[str, float]) -> float:
        """Calculate overall confidence score."""
        if not posteriors:
            return 0.0

        # Base confidence
        confidence = 0.5

        # Adjust based on probability distribution
        probs = list(posteriors.values())

        # Strong leading hypothesis increases confidence
        max_prob = max(probs)
        if max_prob > 0.8:
            confidence += 0.3
        elif max_prob > 0.6:
            confidence += 0.2
        elif max_prob > 0.4:
            confidence += 0.1

        # Low entropy (clear distinction) increases confidence
        entropy = -sum(p * np.log2(p) if p > 0 else 0 for p in probs)
        max_entropy = np.log2(len(probs))  # Maximum possible entropy (uniform distribution)
        if entropy < 0.3 * max_entropy:
            confidence += 0.2
        elif entropy < 0.6 * max_entropy:
            confidence += 0.1

        return min(confidence, 1.0)

    def _extract_factors(self, text: str, terms: Set[str]) -> Set[str]:
        """Extract words from text that also appear in the query terms."""
        return set(word.lower() for word in text.split() if word.lower() in terms)

    def _generate_alternative_factors(self, terms: Set[str]) -> Set[str]:
        """Generate factors for the alternative hypothesis."""
        # Simple approach: keep terms that have no substring overlap with any
        # *other* term (each word is compared against the remaining terms only)
        return set(
            word for word in terms
            if not any(
                similar in word or word in similar
                for similar in terms
                if similar != word
            )
        )
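

# Usage sketch (illustrative): a minimal way to exercise reason() end to end.
# It assumes ReasoningStrategy (imported from .base) takes no constructor
# arguments, as the bare super().__init__() call above implies, and that the
# module is run inside its package so the relative import resolves
# (for example via `python -m <package>.<module>`).
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        reasoner = BayesianReasoning(config={'prior_weight': 0.3})
        result = await reasoner.reason(
            query="Is the outage caused by a bad deploy or a network fault?",
            context={
                'options': ['bad deploy', 'network fault'],
                'evidence': [{'keyword': 'deploy'}],
            },
        )
        print(result.get('answer') or result.get('error'))

    asyncio.run(_demo())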