"""Specialized reasoning strategies for Agentic Workflow.""" import logging from typing import Dict, Any, List, Optional, Set, Union, Tuple import json from dataclasses import dataclass, field from enum import Enum from datetime import datetime import asyncio from collections import defaultdict from .base import ReasoningStrategy class TaskType(Enum): """Types of tasks in agentic workflow.""" CODE_GENERATION = "code_generation" CODE_MODIFICATION = "code_modification" CODE_REVIEW = "code_review" DEBUGGING = "debugging" ARCHITECTURE = "architecture" OPTIMIZATION = "optimization" DOCUMENTATION = "documentation" TESTING = "testing" class ResourceType(Enum): """Types of resources in agentic workflow.""" CODE_CONTEXT = "code_context" SYSTEM_CONTEXT = "system_context" USER_CONTEXT = "user_context" TOOLS = "tools" APIS = "apis" DOCUMENTATION = "documentation" DEPENDENCIES = "dependencies" HISTORY = "history" @dataclass class TaskComponent: """Component of a decomposed task.""" id: str type: TaskType description: str dependencies: List[str] resources: Dict[ResourceType, Any] constraints: List[str] priority: float metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class ResourceAllocation: """Resource allocation for a task.""" resource_type: ResourceType quantity: Union[int, float] priority: float constraints: List[str] metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class ExecutionStep: """Step in task execution.""" id: str task_id: str action: str resources: Dict[ResourceType, Any] status: str result: Optional[Dict[str, Any]] feedback: List[str] timestamp: datetime = field(default_factory=datetime.now) class TaskDecompositionStrategy(ReasoningStrategy): """ Advanced task decomposition strategy that: 1. Analyzes task complexity and dependencies 2. Breaks down tasks into manageable components 3. Identifies resource requirements 4. Establishes execution order 5. Manages constraints and priorities """ def __init__(self, max_components: int = 10): self.max_components = max_components self.components: Dict[str, TaskComponent] = {} async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: """Decompose task into components.""" try: # Analyze task task_analysis = await self._analyze_task(query, context) # Generate components components = await self._generate_components(task_analysis, context) # Establish dependencies dependency_graph = await self._establish_dependencies(components, context) # Determine execution order execution_order = await self._determine_execution_order( components, dependency_graph, context) return { "success": True, "components": [self._component_to_dict(c) for c in components], "dependency_graph": dependency_graph, "execution_order": execution_order, "metadata": { "total_components": len(components), "complexity_score": task_analysis.get("complexity_score", 0.0), "resource_requirements": task_analysis.get("resource_requirements", {}) } } except Exception as e: logging.error(f"Error in task decomposition: {str(e)}") return {"success": False, "error": str(e)} class ResourceManagementStrategy(ReasoningStrategy): """ Advanced resource management strategy that: 1. Tracks available resources 2. Allocates resources to tasks 3. Handles resource constraints 4. Optimizes resource utilization 5. Manages resource dependencies """ def __init__(self): self.allocations: Dict[str, ResourceAllocation] = {} self.utilization_history: List[Dict[str, Any]] = [] async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: """Manage resource allocation.""" try: # Analyze resource requirements requirements = await self._analyze_requirements(query, context) # Check resource availability availability = await self._check_availability(requirements, context) # Generate allocation plan allocation_plan = await self._generate_allocation_plan( requirements, availability, context) # Optimize allocations optimized_plan = await self._optimize_allocations(allocation_plan, context) return { "success": True, "allocation_plan": optimized_plan, "resource_metrics": { "utilization": self._calculate_utilization(), "efficiency": self._calculate_efficiency(), "constraints_satisfied": self._check_constraints(optimized_plan) } } except Exception as e: logging.error(f"Error in resource management: {str(e)}") return {"success": False, "error": str(e)} class ContextualPlanningStrategy(ReasoningStrategy): """ Advanced contextual planning strategy that: 1. Analyzes multiple context types 2. Generates context-aware plans 3. Handles context changes 4. Maintains context consistency 5. Optimizes for context constraints """ def __init__(self): self.context_history: List[Dict[str, Any]] = [] self.plan_adaptations: List[Dict[str, Any]] = [] async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: """Generate context-aware plan.""" try: # Analyze contexts context_analysis = await self._analyze_contexts(query, context) # Generate base plan base_plan = await self._generate_base_plan(context_analysis, context) # Adapt to contexts adapted_plan = await self._adapt_to_contexts(base_plan, context_analysis) # Validate plan validation = await self._validate_plan(adapted_plan, context) return { "success": True, "plan": adapted_plan, "context_impact": context_analysis.get("impact_assessment", {}), "adaptations": self.plan_adaptations, "validation_results": validation } except Exception as e: logging.error(f"Error in contextual planning: {str(e)}") return {"success": False, "error": str(e)} class AdaptiveExecutionStrategy(ReasoningStrategy): """ Advanced adaptive execution strategy that: 1. Monitors execution progress 2. Adapts to changes and feedback 3. Handles errors and exceptions 4. Optimizes execution flow 5. Maintains execution state """ def __init__(self): self.execution_steps: List[ExecutionStep] = [] self.adaptation_history: List[Dict[str, Any]] = [] async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: """Execute task adaptively.""" try: # Initialize execution execution_state = await self._initialize_execution(query, context) # Monitor and adapt while not self._is_execution_complete(execution_state): # Execute step step_result = await self._execute_step(execution_state, context) # Process feedback feedback = await self._process_feedback(step_result, context) # Adapt execution execution_state = await self._adapt_execution( execution_state, feedback, context) # Record step self._record_step(step_result, feedback) return { "success": True, "execution_trace": [self._step_to_dict(s) for s in self.execution_steps], "adaptations": self.adaptation_history, "final_state": execution_state } except Exception as e: logging.error(f"Error in adaptive execution: {str(e)}") return {"success": False, "error": str(e)} class FeedbackIntegrationStrategy(ReasoningStrategy): """ Advanced feedback integration strategy that: 1. Collects multiple types of feedback 2. Analyzes feedback patterns 3. Generates improvement suggestions 4. Tracks feedback implementation 5. Measures feedback impact """ def __init__(self): self.feedback_history: List[Dict[str, Any]] = [] self.improvement_history: List[Dict[str, Any]] = [] async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: """Integrate and apply feedback.""" try: # Collect feedback feedback = await self._collect_feedback(query, context) # Analyze patterns patterns = await self._analyze_patterns(feedback, context) # Generate improvements improvements = await self._generate_improvements(patterns, context) # Implement changes implementation = await self._implement_improvements(improvements, context) # Measure impact impact = await self._measure_impact(implementation, context) return { "success": True, "feedback_analysis": patterns, "improvements": improvements, "implementation_status": implementation, "impact_metrics": impact } except Exception as e: logging.error(f"Error in feedback integration: {str(e)}") return {"success": False, "error": str(e)} async def _collect_feedback(self, query: str, context: Dict[str, Any]) -> List[Dict[str, Any]]: """Collect feedback from multiple sources.""" prompt = f""" Collect feedback from: Query: {query} Context: {json.dumps(context)} Consider: 1. User feedback 2. System metrics 3. Code analysis 4. Performance data 5. Error patterns Format as: [Feedback] Source: ... Type: ... Content: ... Priority: ... """ response = await context["groq_api"].predict(prompt) return self._parse_feedback(response["answer"]) def _parse_feedback(self, response: str) -> List[Dict[str, Any]]: """Parse feedback from response.""" feedback_items = [] current = None for line in response.split('\n'): line = line.strip() if not line: continue if line.startswith('[Feedback]'): if current: feedback_items.append(current) current = { "source": "", "type": "", "content": "", "priority": 0.0 } elif current: if line.startswith('Source:'): current["source"] = line[7:].strip() elif line.startswith('Type:'): current["type"] = line[5:].strip() elif line.startswith('Content:'): current["content"] = line[8:].strip() elif line.startswith('Priority:'): try: current["priority"] = float(line[9:].strip()) except: pass if current: feedback_items.append(current) return feedback_items