File size: 11,079 Bytes
a6ce454
dcb2a99
 
a6ce454
dcb2a99
a6ce454
 
dcb2a99
a6ce454
 
dcb2a99
 
 
a6ce454
 
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
 
dcb2a99
 
a6ce454
 
 
 
 
 
 
 
 
 
dcb2a99
 
 
 
a6ce454
dcb2a99
 
 
a6ce454
 
 
 
 
dcb2a99
a6ce454
dcb2a99
 
 
a6ce454
 
 
 
 
 
dcb2a99
a6ce454
dcb2a99
a6ce454
 
 
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
dcb2a99
a6ce454
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
 
dcb2a99
 
a6ce454
 
 
 
 
 
 
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
dcb2a99
a6ce454
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dcb2a99
a6ce454
 
 
 
dcb2a99
a6ce454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
"""Advanced Bayesian reasoning for probabilistic analysis."""

import logging
from typing import Dict, Any, List, Optional, Set, Union, Type, Tuple
import json
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import numpy as np
from collections import defaultdict

from .base import ReasoningStrategy

@dataclass
class BayesianHypothesis:
    """Record holding one named hypothesis and its probability values.

    NOTE(review): nothing in the visible part of this module constructs or
    reads this class, so the field meanings below are inferred from the
    field names only -- confirm against the actual callers.
    """
    name: str  # identifier of the hypothesis
    prior: float  # presumably the probability before evidence -- TODO confirm
    likelihood: float  # presumably P(evidence | hypothesis) -- TODO confirm
    posterior: float = 0.0  # defaults to 0.0 when not supplied
    # default_factory gives every instance its own fresh, empty list
    evidence: List[Dict[str, Any]] = field(default_factory=list)

class BayesianReasoning(ReasoningStrategy):
    """
    Advanced Bayesian reasoning that:
    1. Generates hypotheses
    2. Calculates prior probabilities
    3. Updates with evidence
    4. Computes posteriors
    5. Provides probabilistic analysis
    """
    
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize Bayesian reasoning."""
        super().__init__()
        self.config = config or {}
        
        # Configure Bayesian parameters
        self.prior_weight = self.config.get('prior_weight', 0.3)
        self.evidence_threshold = self.config.get('evidence_threshold', 0.1)
        self.min_likelihood = self.config.get('min_likelihood', 0.01)
    
    async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply Bayesian reasoning to analyze probabilities and update beliefs.
        
        Args:
            query: The input query to reason about
            context: Additional context and parameters
            
        Returns:
            Dict containing reasoning results and confidence scores
        """
        try:
            # Generate hypotheses
            hypotheses = await self._generate_hypotheses(query, context)
            
            # Calculate priors
            priors = await self._calculate_priors(hypotheses, context)
            
            # Update with evidence
            posteriors = await self._update_with_evidence(
                hypotheses,
                priors,
                context
            )
            
            # Generate analysis
            analysis = await self._generate_analysis(posteriors, context)
            
            return {
                'answer': self._format_analysis(analysis),
                'confidence': self._calculate_confidence(posteriors),
                'hypotheses': hypotheses,
                'priors': priors,
                'posteriors': posteriors,
                'analysis': analysis
            }
            
        except Exception as e:
            logging.error(f"Bayesian reasoning failed: {str(e)}")
            return {
                'error': f"Bayesian reasoning failed: {str(e)}",
                'confidence': 0.0
            }
    
    async def _generate_hypotheses(
        self,
        query: str,
        context: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Generate plausible hypotheses."""
        hypotheses = []
        
        # Extract key terms for hypothesis generation
        terms = set(query.lower().split())
        
        # Generate hypotheses based on context and terms
        if 'options' in context:
            # Use provided options as hypotheses
            for option in context['options']:
                hypotheses.append({
                    'name': option,
                    'description': f"Hypothesis based on option: {option}",
                    'factors': self._extract_factors(option, terms)
                })
        else:
            # Generate default hypotheses
            hypotheses.extend([
                {
                    'name': 'primary',
                    'description': "Primary hypothesis based on direct interpretation",
                    'factors': self._extract_factors(query, terms)
                },
                {
                    'name': 'alternative',
                    'description': "Alternative hypothesis considering other factors",
                    'factors': self._generate_alternative_factors(terms)
                }
            ])
        
        return hypotheses
    
    async def _calculate_priors(
        self,
        hypotheses: List[Dict[str, Any]],
        context: Dict[str, Any]
    ) -> Dict[str, float]:
        """Calculate prior probabilities."""
        priors = {}
        
        # Get historical data if available
        history = context.get('history', {})
        total_cases = sum(history.values()) if history else len(hypotheses)
        
        for hypothesis in hypotheses:
            name = hypothesis['name']
            
            # Calculate prior from history or use uniform prior
            if name in history:
                priors[name] = history[name] / total_cases
            else:
                priors[name] = 1.0 / len(hypotheses)
            
            # Adjust prior based on factors
            factor_weight = len(hypothesis['factors']) / 10  # Normalize factor count
            priors[name] = (
                priors[name] * (1 - self.prior_weight) +
                factor_weight * self.prior_weight
            )
        
        # Normalize priors
        total_prior = sum(priors.values())
        if total_prior > 0:
            priors = {
                name: prob / total_prior
                for name, prob in priors.items()
            }
        
        return priors
    
    async def _update_with_evidence(
        self,
        hypotheses: List[Dict[str, Any]],
        priors: Dict[str, float],
        context: Dict[str, Any]
    ) -> Dict[str, float]:
        """Update probabilities with evidence."""
        posteriors = priors.copy()
        
        # Get evidence from context
        evidence = context.get('evidence', [])
        if not evidence:
            return posteriors
        
        for e in evidence:
            # Calculate likelihood for each hypothesis
            likelihoods = {}
            for hypothesis in hypotheses:
                name = hypothesis['name']
                likelihood = self._calculate_likelihood(hypothesis, e)
                likelihoods[name] = max(likelihood, self.min_likelihood)
            
            # Update posteriors using Bayes' rule
            total_probability = sum(
                likelihoods[name] * posteriors[name]
                for name in posteriors
            )
            
            if total_probability > 0:
                posteriors = {
                    name: (likelihoods[name] * posteriors[name]) / total_probability
                    for name in posteriors
                }
        
        return posteriors
    
    def _calculate_likelihood(
        self,
        hypothesis: Dict[str, Any],
        evidence: Dict[str, Any]
    ) -> float:
        """Calculate likelihood of evidence given hypothesis."""
        # Extract evidence factors
        evidence_factors = set(
            str(v).lower()
            for v in evidence.values()
            if isinstance(v, (str, int, float))
        )
        
        # Compare with hypothesis factors
        common_factors = evidence_factors.intersection(hypothesis['factors'])
        
        if not evidence_factors:
            return 0.5  # Neutral likelihood if no factors
        
        return len(common_factors) / len(evidence_factors)
    
    async def _generate_analysis(
        self,
        posteriors: Dict[str, float],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Generate probabilistic analysis."""
        # Sort hypotheses by posterior probability
        ranked_hypotheses = sorted(
            posteriors.items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        # Calculate statistics
        mean = np.mean(list(posteriors.values()))
        std = np.std(list(posteriors.values()))
        entropy = -sum(
            p * np.log2(p) if p > 0 else 0
            for p in posteriors.values()
        )
        
        return {
            'top_hypothesis': ranked_hypotheses[0][0],
            'probability': ranked_hypotheses[0][1],
            'alternatives': [
                {'name': name, 'probability': prob}
                for name, prob in ranked_hypotheses[1:]
            ],
            'statistics': {
                'mean': mean,
                'std': std,
                'entropy': entropy
            }
        }
    
    def _format_analysis(self, analysis: Dict[str, Any]) -> str:
        """Format analysis into readable text."""
        sections = []
        
        # Top hypothesis
        sections.append(
            f"Most likely hypothesis: {analysis['top_hypothesis']} "
            f"(probability: {analysis['probability']:.2%})"
        )
        
        # Alternative hypotheses
        if analysis['alternatives']:
            sections.append("\nAlternative hypotheses:")
            for alt in analysis['alternatives']:
                sections.append(
                    f"- {alt['name']}: {alt['probability']:.2%}"
                )
        
        # Statistics
        stats = analysis['statistics']
        sections.append("\nDistribution statistics:")
        sections.append(f"- Mean probability: {stats['mean']:.2%}")
        sections.append(f"- Standard deviation: {stats['std']:.2%}")
        sections.append(f"- Entropy: {stats['entropy']:.2f} bits")
        
        return "\n".join(sections)
    
    def _calculate_confidence(self, posteriors: Dict[str, float]) -> float:
        """Calculate overall confidence score."""
        if not posteriors:
            return 0.0
        
        # Base confidence
        confidence = 0.5
        
        # Adjust based on probability distribution
        probs = list(posteriors.values())
        
        # Strong leading hypothesis increases confidence
        max_prob = max(probs)
        if max_prob > 0.8:
            confidence += 0.3
        elif max_prob > 0.6:
            confidence += 0.2
        elif max_prob > 0.4:
            confidence += 0.1
        
        # Low entropy (clear distinction) increases confidence
        entropy = -sum(p * np.log2(p) if p > 0 else 0 for p in probs)
        max_entropy = -np.log2(1/len(probs))  # Maximum possible entropy
        
        if entropy < 0.3 * max_entropy:
            confidence += 0.2
        elif entropy < 0.6 * max_entropy:
            confidence += 0.1
        
        return min(confidence, 1.0)
    
    def _extract_factors(self, text: str, terms: Set[str]) -> Set[str]:
        """Extract relevant factors from text."""
        return set(word.lower() for word in text.split() if word.lower() in terms)
    
    def _generate_alternative_factors(self, terms: Set[str]) -> Set[str]:
        """Generate factors for alternative hypothesis."""
        # Simple approach: use terms not in primary hypothesis
        return set(
            word for word in terms
            if not any(
                similar in word or word in similar
                for similar in terms
            )
        )