import os
import json
import uuid
import traceback
import logging
from pathlib import Path
import numpy as np
import pandas as pd
from typing import Dict, Any, Optional

# Import the base intelligent agent.
# Adjust this import as necessary if you move IntelligentAgent to a separate module.
logger = logging.getLogger(__name__)
# NOTE(review): "app.py" is resolved as submodule "py" of package "app"; if
# IntelligentAgent lives in a plain app.py file this probably needs to be
# "from app import IntelligentAgent" -- confirm against the project layout.
from app.py import IntelligentAgent


class FileManagementAgent(IntelligentAgent):
    """Agent that maps a natural-language task onto a simple file operation.

    The task text is scanned for keywords to choose one of four operations
    (create, read, list, delete) and for a file name or directory to act on.
    Every operation returns a dict with at least a human-readable ``"text"``
    entry; failures additionally carry an ``"error"`` entry.

    NOTE(review): file names taken from the task text are used as-is (no
    confinement to a base directory). If tasks can come from untrusted users,
    create/read/delete should be restricted to a sandbox directory.
    """

    # Checked in order; the first group with a keyword contained in the
    # lower-cased task decides the operation.
    _OPERATION_KEYWORDS = (
        ("create", ("create", "make", "generate", "write")),
        ("read", ("read", "open", "show", "display", "content")),
        ("list", ("list", "find", "directory", "folder", "files in")),
        ("delete", ("delete", "remove")),
    )
    _FILE_EXTENSIONS = ('.txt', '.json', '.csv', '.md', '.py', '.html', '.js', '.css')
    _FILE_KEYWORDS = ("file", "named", "called", "filename")
    _DIRECTORY_TERMS = ("directory", "folder", "in")
    # Punctuation trimmed from both ends of a word taken from the task text.
    _STRIP_CHARS = ':"\'.,;'

    def __init__(self, hub):
        super().__init__("file_management", hub)

    def process_task(self, task: str) -> Dict[str, Any]:
        """Run the file operation described by ``task``.

        Args:
            task: Free-form request, e.g. ``"create a json file named report"``.

        Returns:
            A result dict. ``"text"`` is always present; successful operations
            add ``"operation"`` plus operation-specific keys, failed ones add
            ``"error"``.
        """
        logger.info(f"FileManagementAgent processing: {task}")
        task_lower = task.lower()

        operation = self._detect_operation(task_lower)
        filename = self._extract_filename(task, task_lower)

        if operation == "create":
            # Only creation may invent a file name; read/delete must be told
            # which file to touch.
            return self._create_file(task, filename or self._default_filename(task_lower))
        if operation == "read":
            return self._read_file(filename)
        if operation == "list":
            return self._list_files(task, task_lower)
        if operation == "delete":
            return self._delete_file(filename)
        return {"text": f"Unknown operation requested in task: {task}"}

    @classmethod
    def _detect_operation(cls, task_lower: str) -> str:
        """Return the first operation whose keyword occurs in the task, else "unknown"."""
        for operation, keywords in cls._OPERATION_KEYWORDS:
            if any(keyword in task_lower for keyword in keywords):
                return operation
        return "unknown"

    @staticmethod
    def _infer_extension(task_lower: str) -> str:
        """Pick a file extension from format words mentioned in the task."""
        if "json" in task_lower:
            return ".json"
        if "csv" in task_lower:
            return ".csv"
        if "python" in task_lower or "py" in task_lower:
            return ".py"
        return ".txt"

    @classmethod
    def _extract_filename(cls, task: str, task_lower: str) -> Optional[str]:
        """Find a file name in the task text, or return None if there is none.

        A word containing a known extension wins; otherwise the word following
        "file"/"named"/"called"/"filename" is used, with an extension inferred
        from the task when the word has none.
        """
        words = task.split()

        for word in words:
            lowered = word.lower()
            if any(ext in lowered for ext in cls._FILE_EXTENSIONS):
                return word.strip(cls._STRIP_CHARS)

        for index, word in enumerate(words[:-1]):
            if word.lower() not in cls._FILE_KEYWORDS:
                continue
            candidate = words[index + 1].strip(cls._STRIP_CHARS)
            # "file called notes": skip "called" so the name is "notes", and
            # ignore words that were nothing but punctuation.
            if not candidate or candidate.lower() in cls._FILE_KEYWORDS:
                continue
            if '.' not in candidate:
                candidate += cls._infer_extension(task_lower)
            return candidate

        return None

    @staticmethod
    def _default_filename(task_lower: str) -> str:
        """Generate a random file name for a create request that named no file."""
        suffix = uuid.uuid4().hex[:6]
        if "json" in task_lower:
            return f"data_{suffix}.json"
        if "csv" in task_lower:
            return f"data_{suffix}.csv"
        if "python" in task_lower or "py" in task_lower:
            return f"script_{suffix}.py"
        if "log" in task_lower:
            return f"log_{suffix}.txt"
        return f"file_{suffix}.txt"

    @staticmethod
    def _build_content(task: str, filename: str) -> str:
        """Produce sample content matching the extension of ``filename``."""
        if filename.endswith('.json'):
            return json.dumps({
                "name": "Sample Data",
                "description": task,
                "created": pd.Timestamp.now().isoformat(),
                "values": [1, 2, 3, 4, 5],
                "metadata": {"source": "FileManagementAgent", "version": "1.0"}
            }, indent=2)

        if filename.endswith('.csv'):
            rows = ["id,name,value,timestamp\n"]
            rows.extend(
                f"{i+1},Item{i+1},{np.random.randint(1, 100)},{pd.Timestamp.now().isoformat()}\n"
                for i in range(5)
            )
            return "".join(rows)

        if filename.endswith('.py'):
            script_lines = [
                f"# Generated Python Script: {filename}",
                f"# Created: {pd.Timestamp.now().isoformat()}",
                f"# Description: {task}",
                "",
                "def main():",
                "    print(\"Hello from the FileManagementAgent!\")",
                "    data = [1, 2, 3, 4, 5]",
                "    result = sum(data)",
                "    print(f\"Sample calculation: sum(data) = {result}\")",
                "    return result",
                "",
                "if __name__ == \"__main__\":",
                "    main()",
                "",
            ]
            return "\n".join(script_lines)

        return (
            f"File created by FileManagementAgent\n"
            f"Created: {pd.Timestamp.now().isoformat()}\n"
            f"Based on request: {task}\n\nThis is sample content."
        )

    def _create_file(self, task: str, filename: str) -> Dict[str, Any]:
        """Write sample content to ``filename`` and record the action in memory."""
        content = self._build_content(task, filename)
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(content)
            result = {
                "text": f"Successfully created file: {filename}",
                "operation": "create",
                "filename": filename,
                "size": len(content),
                "preview": content[:200] + "..." if len(content) > 200 else content
            }
            # self.memory is not defined in this file; presumably provided by
            # IntelligentAgent -- verify against the base class.
            self.memory.add_short_term({"operation": "create", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
            self.memory.add_long_term(f"file:{filename}", {"operation": "create", "type": Path(filename).suffix, "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except Exception as e:
            error_msg = f"Error creating file {filename}: {str(e)}"
            logger.error(error_msg)
            return {"text": error_msg, "error": str(e)}

    def _read_file(self, filename: Optional[str]) -> Dict[str, Any]:
        """Return the text content of ``filename`` or an explanatory message."""
        if not filename:
            return {"text": "Please specify a filename to read."}
        if not Path(filename).exists():
            return {"text": f"File '{filename}' not found."}
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                content = f.read()
            result = {"text": f"Content of {filename}:\n\n{content}", "operation": "read", "filename": filename, "content": content, "size": len(content)}
            self.memory.add_short_term({"operation": "read", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except Exception as e:
            error_msg = f"Error reading file {filename}: {str(e)}"
            logger.error(error_msg)
            return {"text": error_msg, "error": str(e)}

    @classmethod
    def _resolve_directory(cls, task: str) -> str:
        """Return the directory named after "directory"/"folder"/"in", else ".".

        Matching is done on whole words of the original (case-preserved) task,
        so "in" inside "find" is ignored and mixed-case directory names work.
        The first following word that is an existing directory wins.
        """
        words = task.split()
        for index, word in enumerate(words[:-1]):
            if word.lower().strip(cls._STRIP_CHARS) not in cls._DIRECTORY_TERMS:
                continue
            # Drop quotes and trailing punctuation but keep leading dots so
            # "." , ".." and "./src" survive.
            token = words[index + 1].strip('"\'').rstrip(':,;')
            # Second candidate handles a sentence-ending period ("in src.").
            for candidate in (token, token.rstrip('.')):
                if candidate and Path(candidate).is_dir():
                    return candidate
        return "."

    def _list_files(self, task: str, task_lower: str) -> Dict[str, Any]:
        """List directory entries, optionally filtered by a mentioned extension."""
        try:
            directory = self._resolve_directory(task)

            extension_filter = next(
                (ext for ext in self._FILE_EXTENSIONS if ext in task_lower), None
            )

            files = list(Path(directory).glob('*' + (extension_filter or '')))

            # Group entries by suffix; stat each entry only once.
            file_groups: Dict[str, Any] = {}
            for entry in files:
                info = entry.stat()
                file_groups.setdefault(entry.suffix, []).append({
                    "name": entry.name,
                    "size": info.st_size,
                    "modified": pd.Timestamp(info.st_mtime, unit='s').isoformat()
                })

            text_parts = [
                f"Found {len(files)} files"
                + (f" with extension {extension_filter}" if extension_filter else "")
                + f" in {directory}:\n\n"
            ]
            for ext, group in file_groups.items():
                text_parts.append(f"{ext} files ({len(group)}):\n")
                for file_info in sorted(group, key=lambda x: x["name"]):
                    size_kb = file_info["size"] / 1024
                    text_parts.append(f"- {file_info['name']} ({size_kb:.1f} KB, modified: {file_info['modified']})\n")
                text_parts.append("\n")
            response_text = "".join(text_parts)

            result = {"text": response_text, "operation": "list", "directory": directory, "file_count": len(files), "files": file_groups}
            self.memory.add_short_term({"operation": "list", "directory": directory, "file_count": len(files), "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except Exception as e:
            error_msg = f"Error listing files: {str(e)}"
            logger.error(error_msg)
            return {"text": error_msg, "error": str(e)}

    def _delete_file(self, filename: Optional[str]) -> Dict[str, Any]:
        """Delete ``filename`` and record the action, or explain why not."""
        if not filename:
            return {"text": "Please specify a filename to delete."}
        if not Path(filename).exists():
            return {"text": f"File '{filename}' not found."}
        try:
            os.remove(filename)
            result = {"text": f"Successfully deleted file: {filename}", "operation": "delete", "filename": filename}
            self.memory.add_short_term({"operation": "delete", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
            self.memory.add_long_term(f"file:{filename}", {"operation": "delete", "timestamp": pd.Timestamp.now().isoformat()})
            return result
        except Exception as e:
            error_msg = f"Error deleting file {filename}: {str(e)}"
            logger.error(error_msg)
            return {"text": error_msg, "error": str(e)}