"""Enhanced learning mechanisms for reasoning strategies.""" import logging from typing import Dict, Any, List, Optional, Set, Union, Type, Tuple import json from dataclasses import dataclass, field from enum import Enum from datetime import datetime import numpy as np from collections import defaultdict @dataclass class LearningEvent: """Event for strategy learning.""" strategy_type: str event_type: str data: Dict[str, Any] outcome: Optional[float] timestamp: datetime = field(default_factory=datetime.now) class LearningMode(Enum): """Types of learning modes.""" SUPERVISED = "supervised" REINFORCEMENT = "reinforcement" ACTIVE = "active" TRANSFER = "transfer" META = "meta" ENSEMBLE = "ensemble" @dataclass class LearningState: """State for learning process.""" mode: LearningMode parameters: Dict[str, Any] history: List[LearningEvent] metrics: Dict[str, float] metadata: Dict[str, Any] = field(default_factory=dict) class EnhancedLearningManager: """ Advanced learning manager that: 1. Implements multiple learning modes 2. Tracks learning progress 3. Adapts learning parameters 4. Optimizes strategy performance 5. Transfers knowledge between strategies """ def __init__(self, learning_rate: float = 0.1, exploration_rate: float = 0.2, memory_size: int = 1000): self.learning_rate = learning_rate self.exploration_rate = exploration_rate self.memory_size = memory_size # Learning states self.states: Dict[str, LearningState] = {} # Performance tracking self.performance_history: List[Dict[str, Any]] = [] self.strategy_metrics: Dict[str, List[float]] = defaultdict(list) # Knowledge transfer self.knowledge_base: Dict[str, Any] = {} self.transfer_history: List[Dict[str, Any]] = [] async def learn(self, strategy_type: str, event: LearningEvent, context: Dict[str, Any]) -> Dict[str, Any]: """Learn from strategy execution event.""" try: # Initialize or get learning state state = self._get_learning_state(strategy_type) # Select learning mode mode = await self._select_learning_mode(event, state, context) # Execute learning if mode == LearningMode.SUPERVISED: result = await self._supervised_learning(event, state, context) elif mode == LearningMode.REINFORCEMENT: result = await self._reinforcement_learning(event, state, context) elif mode == LearningMode.ACTIVE: result = await self._active_learning(event, state, context) elif mode == LearningMode.TRANSFER: result = await self._transfer_learning(event, state, context) elif mode == LearningMode.META: result = await self._meta_learning(event, state, context) elif mode == LearningMode.ENSEMBLE: result = await self._ensemble_learning(event, state, context) else: raise ValueError(f"Unsupported learning mode: {mode}") # Update state self._update_learning_state(state, result) # Record performance self._record_performance(strategy_type, result) return result except Exception as e: logging.error(f"Error in learning: {str(e)}") return { "success": False, "error": str(e), "mode": mode.value if 'mode' in locals() else None } async def _supervised_learning(self, event: LearningEvent, state: LearningState, context: Dict[str, Any]) -> Dict[str, Any]: """Implement supervised learning.""" # Extract features and labels features = await self._extract_features(event.data, context) labels = event.outcome if event.outcome is not None else 0.0 # Train model model_update = await self._update_model(features, labels, state, context) # Validate performance validation = await self._validate_model(model_update, state, context) return { "success": True, "mode": LearningMode.SUPERVISED.value, "model_update": model_update, "validation": validation, "metrics": { "accuracy": validation.get("accuracy", 0.0), "loss": validation.get("loss", 0.0) } } async def _reinforcement_learning(self, event: LearningEvent, state: LearningState, context: Dict[str, Any]) -> Dict[str, Any]: """Implement reinforcement learning.""" # Extract state and action current_state = await self._extract_state(event.data, context) action = event.data.get("action") reward = event.outcome if event.outcome is not None else 0.0 # Update policy policy_update = await self._update_policy( current_state, action, reward, state, context) # Optimize value function value_update = await self._update_value_function( current_state, reward, state, context) return { "success": True, "mode": LearningMode.REINFORCEMENT.value, "policy_update": policy_update, "value_update": value_update, "metrics": { "reward": reward, "value_error": value_update.get("error", 0.0) } } async def _active_learning(self, event: LearningEvent, state: LearningState, context: Dict[str, Any]) -> Dict[str, Any]: """Implement active learning.""" # Query selection query = await self._select_query(event.data, state, context) # Get feedback feedback = await self._get_feedback(query, context) # Update model model_update = await self._update_model_active( query, feedback, state, context) return { "success": True, "mode": LearningMode.ACTIVE.value, "query": query, "feedback": feedback, "model_update": model_update, "metrics": { "uncertainty": query.get("uncertainty", 0.0), "feedback_quality": feedback.get("quality", 0.0) } } async def _transfer_learning(self, event: LearningEvent, state: LearningState, context: Dict[str, Any]) -> Dict[str, Any]: """Implement transfer learning.""" # Source task selection source_task = await self._select_source_task(event.data, state, context) # Knowledge extraction knowledge = await self._extract_knowledge(source_task, context) # Transfer adaptation adaptation = await self._adapt_knowledge( knowledge, event.data, state, context) # Apply transfer transfer = await self._apply_transfer(adaptation, state, context) return { "success": True, "mode": LearningMode.TRANSFER.value, "source_task": source_task, "knowledge": knowledge, "adaptation": adaptation, "transfer": transfer, "metrics": { "transfer_efficiency": transfer.get("efficiency", 0.0), "adaptation_quality": adaptation.get("quality", 0.0) } } async def _meta_learning(self, event: LearningEvent, state: LearningState, context: Dict[str, Any]) -> Dict[str, Any]: """Implement meta-learning.""" # Task characterization task_char = await self._characterize_task(event.data, context) # Strategy selection strategy = await self._select_strategy(task_char, state, context) # Parameter optimization optimization = await self._optimize_parameters( strategy, task_char, state, context) # Apply meta-learning meta_update = await self._apply_meta_learning( optimization, state, context) return { "success": True, "mode": LearningMode.META.value, "task_characterization": task_char, "strategy": strategy, "optimization": optimization, "meta_update": meta_update, "metrics": { "strategy_fit": strategy.get("fit_score", 0.0), "optimization_improvement": optimization.get("improvement", 0.0) } } async def _ensemble_learning(self, event: LearningEvent, state: LearningState, context: Dict[str, Any]) -> Dict[str, Any]: """Implement ensemble learning.""" # Member selection members = await self._select_members(event.data, state, context) # Weight optimization weights = await self._optimize_weights(members, state, context) # Combine predictions combination = await self._combine_predictions( members, weights, event.data, context) return { "success": True, "mode": LearningMode.ENSEMBLE.value, "members": members, "weights": weights, "combination": combination, "metrics": { "ensemble_diversity": weights.get("diversity", 0.0), "combination_strength": combination.get("strength", 0.0) } } def _get_learning_state(self, strategy_type: str) -> LearningState: """Get or initialize learning state for strategy.""" if strategy_type not in self.states: self.states[strategy_type] = LearningState( mode=LearningMode.SUPERVISED, parameters={ "learning_rate": self.learning_rate, "exploration_rate": self.exploration_rate }, history=[], metrics={} ) return self.states[strategy_type] def _update_learning_state(self, state: LearningState, result: Dict[str, Any]): """Update learning state with result.""" # Update history state.history.append(LearningEvent( strategy_type=result.get("strategy_type", "unknown"), event_type="learning_update", data=result, outcome=result.get("metrics", {}).get("accuracy", 0.0), timestamp=datetime.now() )) # Update metrics for metric, value in result.get("metrics", {}).items(): if metric in state.metrics: state.metrics[metric] = ( 0.9 * state.metrics[metric] + 0.1 * value # Exponential moving average ) else: state.metrics[metric] = value # Adapt parameters self._adapt_parameters(state, result) def _record_performance(self, strategy_type: str, result: Dict[str, Any]): """Record learning performance.""" self.performance_history.append({ "timestamp": datetime.now().isoformat(), "strategy_type": strategy_type, "mode": result.get("mode"), "metrics": result.get("metrics", {}), "success": result.get("success", False) }) # Update strategy metrics for metric, value in result.get("metrics", {}).items(): self.strategy_metrics[f"{strategy_type}_{metric}"].append(value) # Maintain memory size if len(self.performance_history) > self.memory_size: self.performance_history = self.performance_history[-self.memory_size:] def _adapt_parameters(self, state: LearningState, result: Dict[str, Any]): """Adapt learning parameters based on performance.""" # Adapt learning rate if "accuracy" in result.get("metrics", {}): accuracy = result["metrics"]["accuracy"] if accuracy > 0.8: state.parameters["learning_rate"] *= 0.95 # Decrease if performing well elif accuracy < 0.6: state.parameters["learning_rate"] *= 1.05 # Increase if performing poorly # Adapt exploration rate if "reward" in result.get("metrics", {}): reward = result["metrics"]["reward"] if reward > 0: state.parameters["exploration_rate"] *= 0.95 # Decrease if getting rewards else: state.parameters["exploration_rate"] *= 1.05 # Increase if not getting rewards # Clip parameters to reasonable ranges state.parameters["learning_rate"] = np.clip( state.parameters["learning_rate"], 0.001, 0.5) state.parameters["exploration_rate"] = np.clip( state.parameters["exploration_rate"], 0.01, 0.5) def get_performance_metrics(self) -> Dict[str, Any]: """Get comprehensive performance metrics.""" return { "learning_states": { strategy_type: { "mode": state.mode.value, "parameters": state.parameters, "metrics": state.metrics } for strategy_type, state in self.states.items() }, "strategy_performance": { metric: { "mean": np.mean(values) if values else 0.0, "std": np.std(values) if values else 0.0, "min": min(values) if values else 0.0, "max": max(values) if values else 0.0 } for metric, values in self.strategy_metrics.items() }, "transfer_metrics": { "total_transfers": len(self.transfer_history), "success_rate": sum(1 for t in self.transfer_history if t.get("success", False)) / len(self.transfer_history) if self.transfer_history else 0 } } def clear_history(self): """Clear learning history and reset states.""" self.states.clear() self.performance_history.clear() self.strategy_metrics.clear() self.transfer_history.clear()