Spaces:
Sleeping
Sleeping
| from typing import Dict, List, Optional | |
| import logging | |
| from datetime import datetime, timedelta | |
| from .model_manager import ModelManager | |
| from .config import Config | |
| import gc | |
| logger = logging.getLogger(__name__) | |
class CodeReview:
    """A single code-review request together with its eventual results.

    Holds the submitted code, the parsed suggestion sections, and the
    timing metrics that the reviewer fills in after the model responds.
    """

    def __init__(self, code: str, language: str, review_id: str):
        # Identity and creation time of this review.
        self.review_id = review_id
        self.timestamp = datetime.now()
        # The submitted source and its language tag, kept verbatim.
        self.code = code
        self.language = language
        # Populated later, once the model response has been parsed.
        self.suggestions: List[Dict] = []
        self.metrics: Dict = {}
class CodeReviewer:
    """Runs LLM-backed code reviews and keeps a bounded in-memory history.

    History is capped at Config.MAX_HISTORY_ITEMS entries and pruned of
    reviews older than Config.HISTORY_RETENTION_DAYS at most once per hour.
    """

    def __init__(self, model_manager: ModelManager):
        self.model_manager = model_manager
        # Oldest-first list of completed reviews; trimmed in _add_to_history
        # and _cleanup_old_reviews.
        self.review_history: List[CodeReview] = []
        self._last_cleanup = datetime.now()

    def _create_review_prompt(self, code: str, language: str) -> str:
        """Create a structured prompt for code review.

        The prompt is deliberately terse to reduce token usage; the four
        section headers are exactly what _parse_review_response() scans for.
        """
        return f"""Review this {language} code. List specific points in these sections:
Issues:
Improvements:
Best Practices:
Security:
Code:
```{language}
{code}
```"""

    def review_code(self, code: str, language: str, review_id: str) -> CodeReview:
        """Perform a code review using the LLM.

        Args:
            code: Source code to review (truncated if over the input limit).
            language: Language name, interpolated into the prompt.
            review_id: Caller-supplied identifier for this review.

        Returns:
            A CodeReview carrying parsed suggestions and timing metrics.

        Raises:
            Exception: any failure from the model call is logged and re-raised.
        """
        try:
            start_time = datetime.now()

            # Drop reviews past the retention window (runs at most hourly).
            self._cleanup_old_reviews()

            # Truncate overly long input BEFORE creating the review object,
            # so the stored review.code matches what the model actually saw.
            max_code_length = Config.MAX_INPUT_LENGTH - 200  # reserve room for the prompt text
            if len(code) > max_code_length:
                code = code[:max_code_length] + "\n# ... (code truncated for length)"

            review = CodeReview(code, language, review_id)

            prompt = self._create_review_prompt(code, language)
            response = self.model_manager.generate_text(
                prompt,
                max_new_tokens=Config.MAX_OUTPUT_LENGTH
            )

            # Structure the free-text response into the four known sections.
            review.suggestions = self._parse_review_response(response)

            end_time = datetime.now()
            review.metrics = {
                'response_time': (end_time - start_time).total_seconds(),
                'code_length': len(code),
                'suggestion_count': sum(len(section['items']) for section in review.suggestions)
            }

            self._add_to_history(review)
            # Reclaim transient buffers from the generation step.
            gc.collect()
            return review
        except Exception as e:
            # exception() logs the traceback; lazy %-args avoid f-string work.
            logger.exception("Error during code review: %s", e)
            raise

    def _parse_review_response(self, response: str) -> List[Dict]:
        """Parse the LLM response into structured sections.

        Returns:
            A list of four dicts, one per required section in fixed order,
            each shaped {'type': <section name>, 'items': [<bullet>, ...]}.
            Sections absent from the response come back with empty items.
        """
        sections: List[Dict] = []
        current_section: Optional[Dict] = None
        required_sections = ['Issues', 'Improvements', 'Best Practices', 'Security']
        try:
            for line in response.split('\n'):
                line = line.strip()
                if not line:
                    continue
                # A line starting with a known header opens a new section.
                for section in required_sections:
                    if line.lower().startswith(section.lower()):
                        current_section = {'type': section, 'items': []}
                        sections.append(current_section)
                        break
                # Any other non-empty line becomes a bullet in the open
                # section (leading '-'/'*' markers stripped, duplicates skipped).
                if current_section and line.strip('-* ') and not any(
                    line.lower().startswith(s.lower()) for s in required_sections
                ):
                    item = line.strip('-* ')
                    if item and item not in current_section['items']:
                        current_section['items'].append(item)
        except Exception as e:
            # Best-effort parsing: a malformed response still yields the
            # empty-section skeleton below instead of failing the review.
            logger.exception("Error parsing response: %s", e)

        # Guarantee every required section is present, preserving order.
        result = []
        for section_type in required_sections:
            found = next((s for s in sections if s['type'] == section_type), None)
            if found is None:
                found = {'type': section_type, 'items': []}
            result.append(found)
        return result

    def _add_to_history(self, review: CodeReview) -> None:
        """Append a review, evicting oldest entries beyond the size cap."""
        self.review_history.append(review)
        # Trim the front in one slice delete instead of repeated O(n) pop(0).
        excess = len(self.review_history) - Config.MAX_HISTORY_ITEMS
        if excess > 0:
            del self.review_history[:excess]

    def _cleanup_old_reviews(self) -> None:
        """Drop reviews older than the retention period, at most once per hour."""
        now = datetime.now()
        if (now - self._last_cleanup) > timedelta(hours=1):
            cutoff_date = now - timedelta(days=Config.HISTORY_RETENTION_DAYS)
            self.review_history = [r for r in self.review_history if r.timestamp > cutoff_date]
            self._last_cleanup = now
            gc.collect()

    def get_review_metrics(self) -> Dict:
        """Calculate aggregate metrics from the current review history."""
        if not self.review_history:
            return {
                'total_reviews': 0,
                'avg_response_time': 0.0,
                'avg_suggestions': 0.0,
                'reviews_today': 0
            }
        total_reviews = len(self.review_history)
        today = datetime.now().date()
        return {
            'total_reviews': total_reviews,
            'avg_response_time': sum(r.metrics['response_time'] for r in self.review_history) / total_reviews,
            'avg_suggestions': sum(r.metrics['suggestion_count'] for r in self.review_history) / total_reviews,
            'reviews_today': sum(1 for r in self.review_history if r.timestamp.date() == today)
        }

    def get_review_history(self, limit: Optional[int] = None) -> List[CodeReview]:
        """Get review history with optional limit.

        Note: a falsy limit (None or 0) returns a copy of the full history;
        both branches return a new list so callers cannot mutate internals.
        """
        if limit:
            return self.review_history[-limit:]
        return self.review_history.copy()