# agenticAi/agents/code_analysis_agent.py
# Cline
# Initial commit
# 0af0a55
import ast
from typing import Dict, Any, List
from loguru import logger
import os
import re
class CodeAnalysisAgent:
    """Agent that runs lightweight static analysis over Python source code.

    Files are parsed with :mod:`ast` to list functions and classes, estimate
    complexity and flag a handful of bug, vulnerability and code-smell
    heuristics. Per-file results can be aggregated into a repository summary.
    """

    # Handler id returned by ``logger.add`` once the file sink is registered.
    # Kept on the class so that creating several agents does not attach the
    # same log file repeatedly (which would duplicate every log line).
    _log_sink_id = None

    # Node types that each add one to the complexity estimate.
    _BRANCH_NODES = (
        ast.If,
        ast.While,
        ast.For,
        ast.AsyncFor,
        ast.With,
        ast.AsyncWith,
        ast.Try,
    )

    # Both ``def`` and ``async def`` count as functions.
    _FUNCTION_NODES = (ast.FunctionDef, ast.AsyncFunctionDef)

    # Text heuristics used by ``detect_vulnerabilities``.
    _SQL_PATTERN = re.compile(r"SELECT \* FROM .* WHERE .*")
    _CREDENTIAL_PATTERN = re.compile(
        r"password\s*=\s*['\"].*['\"]", re.IGNORECASE)

    # Thresholds are counts of top-level statements in the body, not lines.
    _MAX_FUNCTION_STATEMENTS = 50
    _MAX_CLASS_STATEMENTS = 100

    def __init__(self):
        """Initialize the Code Analysis Agent."""
        logger.info("Initializing CodeAnalysisAgent")
        self.capabilities = [
            "code_analysis",
            "static_analysis",
            "repository_analysis",
            "code_summarization",
            "vulnerability_detection",
            "debugging",
            "bug_detection",
            "code_smell_detection",
        ]
        self.setup_logger()

    @staticmethod
    def _read_source(file_path: str) -> str:
        """Return the text of ``file_path`` decoded as UTF-8."""
        # An explicit encoding keeps results independent of the platform's
        # default locale encoding.
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

    @staticmethod
    def _is_open_call(node: ast.AST) -> bool:
        """Return True when ``node`` is a direct call to the name ``open``."""
        return (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == "open"
        )

    def _find_unmanaged_opens(self, tree: ast.AST) -> List[str]:
        """Report ``open(...)`` calls that are not a ``with`` context expression."""
        # First pass: remember every open() call that is directly managed by
        # a ``with`` / ``async with`` item. A separate pass is used because
        # ast.walk makes no promise about the order nodes are yielded in.
        managed = set()
        for node in ast.walk(tree):
            if isinstance(node, (ast.With, ast.AsyncWith)):
                for item in node.items:
                    if self._is_open_call(item.context_expr):
                        managed.add(item.context_expr)
        # Second pass: anything left over may never be closed.
        return [
            f"Line {node.lineno}: Potential unclosed file"
            for node in ast.walk(tree)
            if self._is_open_call(node) and node not in managed
        ]

    async def find_bugs(self, file_path: str) -> List[str]:
        """Find potential bugs in a code file using static analysis.

        Args:
            file_path: Path of the Python file to inspect.

        Returns:
            A list of human-readable findings. If the file cannot be read or
            parsed, the error is logged and the findings collected so far
            (possibly none) are returned.
        """
        logger.info(f"Finding bugs in file {file_path}")
        bugs = []
        try:
            tree = ast.parse(self._read_source(file_path))
            # Example: Find unclosed files
            bugs.extend(self._find_unmanaged_opens(tree))
            # Add more bug detection patterns here
        except Exception as e:
            # Best effort: a broken file must not abort the caller.
            logger.error(f"Error finding bugs in file {file_path}: {str(e)}")
        return bugs

    def setup_logger(self):
        """Configure logging for the agent.

        The file sink is added only once per process; later calls are no-ops.
        """
        if CodeAnalysisAgent._log_sink_id is None:
            CodeAnalysisAgent._log_sink_id = logger.add(
                "logs/code_analysis_agent.log", rotation="500 MB")

    async def analyze_repository(self, repo_path: str) -> Dict[str, Any]:
        """Analyze a code repository.

        Args:
            repo_path: Root directory to scan for Python files.

        Returns:
            On success, a dict with ``"status": "success"``, the per-file
            ``"results"`` and a ``"summary"``. On failure, a dict with
            ``"status": "error"`` and the error ``"message"``.
        """
        logger.info(f"Analyzing repository at {repo_path}")
        try:
            code_files = self.collect_code_files(repo_path)
            analysis_results = [
                await self.analyze_file(file_path) for file_path in code_files
            ]
            summary = self.generate_repository_summary(analysis_results)
            logger.info(f"Finished analyzing repository at {repo_path}")
            return {
                "status": "success",
                "results": analysis_results,
                "summary": summary,
            }
        except Exception as e:
            logger.error(f"Error analyzing repository {repo_path}: {str(e)}")
            return {
                "status": "error",
                "message": str(e),
            }

    def collect_code_files(self, repo_path: str) -> List[str]:
        """Collect all code files in a repository.

        Args:
            repo_path: Root directory to walk.

        Returns:
            Paths of every ``.py`` file under ``repo_path``, in a
            deterministic (sorted) traversal order.
        """
        code_files = []
        for root, dirs, files in os.walk(repo_path):
            # Sorting in place fixes the order os.walk descends in.
            dirs.sort()
            code_files.extend(
                os.path.join(root, name)
                for name in sorted(files)
                if name.endswith(".py")  # Currently only supports Python files
            )
        return code_files

    async def analyze_file(self, file_path: str) -> Dict[str, Any]:
        """Analyze a single code file.

        Args:
            file_path: Path of the Python file to analyze.

        Returns:
            A dict with ``"file_path"``, ``"functions"``, ``"classes"``,
            ``"complexity"`` and ``"vulnerabilities"``. If reading or parsing
            fails, a dict with ``"file_path"`` and ``"error"`` instead.
        """
        logger.info(f"Analyzing file {file_path}")
        try:
            source_code = self._read_source(file_path)
            # Basic static analysis using AST
            tree = ast.parse(source_code)
            # Extract functions (sync and async) and classes
            functions = [
                node.name for node in ast.walk(tree)
                if isinstance(node, self._FUNCTION_NODES)
            ]
            classes = [
                node.name for node in ast.walk(tree)
                if isinstance(node, ast.ClassDef)
            ]
            return {
                "file_path": file_path,
                "functions": functions,
                "classes": classes,
                # Basic complexity analysis (Cyclomatic Complexity)
                "complexity": self.calculate_complexity(tree),
                # Identify potential vulnerabilities (basic example)
                "vulnerabilities": self.detect_vulnerabilities(source_code),
            }
        except Exception as e:
            logger.error(f"Error analyzing file {file_path}: {str(e)}")
            return {
                "file_path": file_path,
                "error": str(e),
            }

    def calculate_complexity(self, tree: ast.AST) -> int:
        """Calculate the Cyclomatic Complexity of a code snippet.

        This is an estimate: it starts at 1 and adds 1 for every ``if``,
        ``while``, ``for``, ``with`` and ``try`` node (including the async
        variants) found anywhere in ``tree``.
        """
        return 1 + sum(
            1 for node in ast.walk(tree)
            if isinstance(node, self._BRANCH_NODES)
        )

    def detect_vulnerabilities(self, source_code: str) -> List[str]:
        """Detect potential vulnerabilities in the code (basic example).

        Args:
            source_code: Raw text of the file to scan.

        Returns:
            One message per matching heuristic (at most one of each kind).
        """
        vulnerabilities = []
        # Detect SQL injection patterns
        if self._SQL_PATTERN.search(source_code):
            vulnerabilities.append("Potential SQL injection vulnerability")
        # Detect hardcoded credentials
        if self._CREDENTIAL_PATTERN.search(source_code):
            vulnerabilities.append("Potential hardcoded credentials")
        return vulnerabilities

    def detect_code_smells(self, tree: ast.AST) -> List[str]:
        """Detect potential code smells in the code.

        Flags functions (sync or async) whose body has more than 50 top-level
        statements and classes whose body has more than 100.

        Returns:
            Findings for long functions first, then for large classes.
        """
        long_functions = []
        large_classes = []
        for node in ast.walk(tree):
            if isinstance(node, self._FUNCTION_NODES):
                # Example: Long function
                if len(node.body) > self._MAX_FUNCTION_STATEMENTS:
                    long_functions.append(
                        f"Line {node.lineno}: Long function")
            elif isinstance(node, ast.ClassDef):
                # Example: Large class
                if len(node.body) > self._MAX_CLASS_STATEMENTS:
                    large_classes.append(f"Line {node.lineno}: Large class")
        return long_functions + large_classes

    def generate_repository_summary(
            self, analysis_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate a summary of the repository analysis.

        Args:
            analysis_results: Per-file dicts as returned by ``analyze_file``.

        Returns:
            Totals for files, functions, classes and vulnerabilities, plus
            the average complexity. The average covers only the files that
            have a ``"complexity"`` value, so files that failed to analyze
            do not drag it toward zero; it is ``0`` when there are none.
        """
        complexities = [
            result["complexity"] for result in analysis_results
            if "complexity" in result
        ]
        average_complexity = (
            sum(complexities) / len(complexities) if complexities else 0
        )
        return {
            "total_files": len(analysis_results),
            "total_functions": sum(
                len(result.get("functions", []))
                for result in analysis_results),
            "total_classes": sum(
                len(result.get("classes", []))
                for result in analysis_results),
            "average_complexity": average_complexity,
            "total_vulnerabilities": sum(
                len(result.get("vulnerabilities", []))
                for result in analysis_results),
        }