import os
import json
import logging
import asyncio
import time
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Set, Union, Any
from dataclasses import dataclass, field
from pathlib import Path
from zipfile import ZipFile
import hashlib
import tempfile
import shutil
import subprocess
import gradio as gr
import networkx as nx
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain.prompts import PromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory
from langchain.agents import Tool, AgentType, initialize_agent
from langchain_community.llms import HuggingFacePipeline
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Load the LLM and tokenizer
def load_model():
    """Load the model and tokenizer."""
    try:
        model_name = "gpt2"  # Using a smaller model for testing
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name)
        return tokenizer, model
    except Exception as e:
        logger.error(f"Failed to load model: {str(e)}")
        raise

# Initialize models lazily
tokenizer = None
model = None
hf_pipeline = None
llm = None
def get_llm():
    """Get or initialize the language model."""
    global llm, tokenizer, model, hf_pipeline
    try:
        if llm is None:
            tokenizer, model = load_model()
            hf_pipeline = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                max_new_tokens=500,
                pad_token_id=tokenizer.eos_token_id,
                do_sample=True,  # required for temperature to take effect
                temperature=0.7,
                return_full_text=False,
            )
            llm = HuggingFacePipeline(
                pipeline=hf_pipeline,
                model_kwargs={"max_length": 2048}
            )
        return llm
    except Exception as e:
        logger.error(f"Failed to get LLM: {str(e)}")
        raise
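
# A minimal usage sketch (the prompt below is illustrative, not from the
# original code): get_llm() returns a LangChain HuggingFacePipeline, so it can
# be called like any LangChain LLM (llm.invoke(...) on recent versions,
# llm("...") on older ones).
#
#     llm = get_llm()
#     print(llm.invoke("Write a haiku about testing."))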
def get_agent(agent_type):
    """Get or initialize an agent with the specified type.

    NOTE: agent_type is currently unused; every role shares the same toolset.
    """
    try:
        llm = get_llm()
        tools = [
            Tool(
                name="Code Formatter",
                func=lambda x: subprocess.run(["black", "-"], input=x.encode(), capture_output=True).stdout.decode(),
                description="Formats code using Black.",
            ),
            Tool(
                name="API Generator",
                func=lambda x: json.dumps({"endpoints": {"example": "POST - Example endpoint."}}),
                description="Generates API details from code.",
            ),
            Tool(
                name="Task Decomposer",
                func=lambda x: json.dumps({"tasks": ["Design UI", "Develop Backend", "Test App", "Deploy App"]}),
                description="Breaks down app requirements into smaller tasks.",
            ),
        ]
        memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        agent_kwargs = {
            "extra_prompt_messages": [MessagesPlaceholder(variable_name="chat_history")],
        }
        return initialize_agent(
            tools=tools,
            llm=llm,
            agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
            verbose=True,
            memory=memory,
            agent_kwargs=agent_kwargs,
            handle_parsing_errors=True,
        )
    except Exception as e:
        logger.error(f"Failed to get agent: {str(e)}")
        raise
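
# A minimal usage sketch (hypothetical prompt): the returned conversational
# ReAct agent is driven with agent.run(...).
#
#     agent = get_agent("architect")
#     print(agent.run("Break a todo app down into development tasks"))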
# Enhanced prompt templates with more specific instructions
ui_designer_prompt = PromptTemplate(
    input_variables=["input"],
    template="""You are an expert UI Designer specializing in modern, responsive web applications.
Task: {input}
Focus on:
1. Clean, intuitive user interface
2. Responsive design principles
3. Modern UI components
4. Accessibility standards
5. Cross-browser compatibility
Generate code using:
- HTML5 semantic elements
- Modern CSS (Flexbox/Grid)
- React/Vue.js best practices
- Material UI or Tailwind CSS
Provide detailed component structure and styling."""
)

backend_developer_prompt = PromptTemplate(
    input_variables=["input"],
    template="""You are an expert Backend Developer specializing in scalable applications.
Task: {input}
Focus on:
1. RESTful API design
2. Database schema optimization
3. Security best practices
4. Error handling
5. Performance optimization
Include:
- API endpoint definitions
- Database models
- Authentication/Authorization
- Input validation
- Error handling middleware
- Rate limiting
- Logging
Use modern backend frameworks (FastAPI/Django/Express)."""
)

qa_engineer_prompt = PromptTemplate(
    input_variables=["input"],
    template="""You are an expert QA Engineer focusing on comprehensive testing.
Task: {input}
Implement:
1. Unit tests
2. Integration tests
3. API endpoint tests
4. UI component tests
5. Performance tests
Include:
- Test cases for edge cases
- Input validation tests
- Error handling tests
- Load testing scenarios
- Security testing checks"""
)

devops_engineer_prompt = PromptTemplate(
    input_variables=["input"],
    template="""You are an expert DevOps Engineer specializing in modern deployment practices.
Task: {input}
Provide:
1. Dockerfile configuration
2. Docker Compose setup
3. CI/CD pipeline configuration
4. Environment configuration
5. Monitoring setup
Include:
- Development/Production configs
- Environment variables
- Health checks
- Logging setup
- Monitoring integration
- Backup strategies"""
)
def generate_project_structure(app_name, features):
    """Generate a complete project structure based on features."""
    return f"""
{app_name}/
├── frontend/
│   ├── src/
│   │   ├── components/
│   │   ├── pages/
│   │   ├── hooks/
│   │   ├── utils/
│   │   └── styles/
│   ├── package.json
│   └── README.md
├── backend/
│   ├── src/
│   │   ├── routes/
│   │   ├── controllers/
│   │   ├── models/
│   │   ├── middleware/
│   │   └── utils/
│   ├── requirements.txt
│   └── README.md
├── tests/
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── docs/
│   ├── API.md
│   ├── SETUP.md
│   └── DEPLOYMENT.md
├── docker-compose.yml
├── .env.example
└── README.md
"""
def generate_documentation(app_name, features, api_details):
    """Generate comprehensive documentation."""
    return f"""
# {app_name}

## Overview
A modern web application with the following features:
{features}

## Quick Start
```bash
# Clone the repository
git clone <repository-url>

# Install dependencies
cd {app_name}
# Frontend
cd frontend && npm install
# Backend
cd ../backend && pip install -r requirements.txt

# Run the application
docker-compose up
```

## API Documentation
{api_details}

## Development
- Frontend: React.js with TypeScript
- Backend: Python with FastAPI
- Database: PostgreSQL
- Cache: Redis
- Testing: Jest, Pytest

## Deployment
Includes Docker configuration for easy deployment:
- Frontend container
- Backend container
- Database container
- Redis container

## Testing
```bash
# Run frontend tests
cd frontend && npm test

# Run backend tests
cd backend && pytest
```

## Contributing
Please read CONTRIBUTING.md for details on our code of conduct and the process for submitting pull requests.

## License
This project is licensed under the MIT License - see the LICENSE.md file for details.
"""
# AI Flow States and Types
class FlowState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class AgentRole(Enum):
    ARCHITECT = "architect"
    UI_DESIGNER = "ui_designer"
    BACKEND_DEVELOPER = "backend_developer"
    DATABASE_ENGINEER = "database_engineer"
    SECURITY_EXPERT = "security_expert"
    QA_ENGINEER = "qa_engineer"
    DEVOPS_ENGINEER = "devops_engineer"
    DOCUMENTATION_WRITER = "documentation_writer"

@dataclass
class AgentContext:
    """Context information for each agent in the flow."""
    role: AgentRole
    state: FlowState
    artifacts: Dict[str, str]
    dependencies: List[AgentRole]
    feedback: List[str]
class AIFlow:
    """Manages the flow of work between different AI agents."""
    def __init__(self):
        self.flow_graph = nx.DiGraph()
        self.contexts: Dict[AgentRole, AgentContext] = {}
        self.global_context = {}
        # Shared context manager used by _execute_paths and _compile_results
        self.context_manager = ContextManager()

    def initialize_flow(self):
        """Initialize the AI Flow with agent relationships and dependencies."""
        # Map each role to its downstream roles (edges point producer -> consumer)
        flow_structure = {
            AgentRole.ARCHITECT: [AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER, AgentRole.DATABASE_ENGINEER],
            AgentRole.UI_DESIGNER: [AgentRole.QA_ENGINEER],
            AgentRole.BACKEND_DEVELOPER: [AgentRole.SECURITY_EXPERT, AgentRole.QA_ENGINEER],
            AgentRole.DATABASE_ENGINEER: [AgentRole.SECURITY_EXPERT],
            AgentRole.SECURITY_EXPERT: [AgentRole.QA_ENGINEER],
            AgentRole.QA_ENGINEER: [AgentRole.DEVOPS_ENGINEER],
            AgentRole.DEVOPS_ENGINEER: [AgentRole.DOCUMENTATION_WRITER],
            AgentRole.DOCUMENTATION_WRITER: []
        }
        # Build the flow graph
        for role, successors in flow_structure.items():
            self.flow_graph.add_node(role)
            for succ in successors:
                self.flow_graph.add_edge(role, succ)
            # Initialize context for each agent
            self.contexts[role] = AgentContext(
                role=role,
                state=FlowState.PENDING,
                artifacts={},
                dependencies=successors,  # downstream roles this agent feeds into
                feedback=[]
            )
    async def execute_flow(self, requirements: str):
        """Execute the AI Flow with parallel processing where possible."""
        try:
            self.initialize_flow()
            self.global_context["requirements"] = requirements
            # Get all paths through the flow graph
            paths = list(nx.all_simple_paths(
                self.flow_graph,
                AgentRole.ARCHITECT,
                AgentRole.DOCUMENTATION_WRITER
            ))
            # Execute paths in parallel
            await self._execute_paths(paths)
            return self._compile_results()
        except Exception as e:
            logger.error(f"Flow execution failed: {str(e)}")
            raise
    async def _execute_paths(self, paths: List[List[AgentRole]]):
        """Execute all paths in the flow graph."""
        try:
            results = []
            for path in paths:
                path_results = []
                for role in path:
                    # Get the agent's prompt based on previous results
                    prompt = self._generate_prompt(role, path_results)
                    # Execute the agent's task
                    result = await self._execute_agent_task(role, prompt)
                    path_results.append(result)
                    # Store result in context
                    self.context_manager.add_memory(
                        f"{role.value}_result",
                        result,
                        {"timestamp": datetime.now()}
                    )
                results.extend(path_results)
            # Store all results in context
            self.context_manager.add_memory(
                "path_results",
                results,
                {"timestamp": datetime.now()}
            )
            return results
        except Exception as e:
            logger.error(f"Failed to execute paths: {str(e)}")
            raise
    def _generate_prompt(self, role: AgentRole, previous_results: List[str]) -> str:
        """Generate a prompt for an agent based on previous results."""
        # execute_flow stores the requirements in self.global_context
        requirements = self.global_context.get("requirements", "")
        # Base prompt with requirements
        prompt = f"Requirements: {requirements}\n\n"
        # Add context from previous results
        if previous_results:
            prompt += "Previous work:\n"
            for i, result in enumerate(previous_results):
                prompt += f"{i+1}. {result}\n"
        # Add role-specific instructions
        if role == AgentRole.ARCHITECT:
            prompt += "\nAs the Architect, design the high-level system architecture."
        elif role == AgentRole.UI_DESIGNER:
            prompt += "\nAs the UI Designer, create the user interface design."
        elif role == AgentRole.BACKEND_DEVELOPER:
            prompt += "\nAs the Backend Developer, implement the server-side logic."
        elif role == AgentRole.DATABASE_ENGINEER:
            prompt += "\nAs the Database Engineer, design the data model and storage."
        elif role == AgentRole.SECURITY_EXPERT:
            prompt += "\nAs the Security Expert, ensure security best practices."
        elif role == AgentRole.QA_ENGINEER:
            prompt += "\nAs the QA Engineer, create test cases and validation."
        elif role == AgentRole.DEVOPS_ENGINEER:
            prompt += "\nAs the DevOps Engineer, set up deployment and CI/CD."
        elif role == AgentRole.DOCUMENTATION_WRITER:
            prompt += "\nAs the Documentation Writer, create comprehensive documentation."
        return prompt
    def _compile_results(self) -> str:
        """Compile all results into a final output."""
        try:
            results = []
            # Get all results from memory (get_memory returns the stored value)
            for role in AgentRole:
                result = self.context_manager.get_memory(f"{role.value}_result")
                if result:
                    results.append(f"## {role.value}\n{result}\n")
            return "\n".join(results)
        except Exception as e:
            logger.error(f"Failed to compile results: {str(e)}")
            raise
    async def _execute_agent_task(self, role: AgentRole, prompt: str) -> str:
        """Execute a specific agent's task with the given prompt."""
        try:
            if not isinstance(role, AgentRole):
                raise ValueError(f"Unknown agent role: {role}")
            agent = get_agent(role.value)
            # Execute the agent's task off the event loop
            result = await asyncio.to_thread(agent.run, prompt)
            # Log the execution
            logger.info(f"Agent {role.value} completed task")
            return result
        except Exception as e:
            logger.error(f"Agent {role.value} failed: {str(e)}")
            raise
@dataclass
class FileContext:
    """Context for file operations and tracking."""
    path: Path
    content: str
    last_modified: datetime
    dependencies: Set[Path]
    checksum: str

    @classmethod
    def from_path(cls, path: Path):
        content = path.read_text()
        return cls(
            path=path,
            content=content,
            last_modified=datetime.fromtimestamp(path.stat().st_mtime),
            dependencies=set(),
            checksum=hashlib.md5(content.encode()).hexdigest()
        )
@dataclass
class MemoryItem:
    """Represents a single memory item in the system."""
    key: str
    value: Any
    context: dict
    timestamp: datetime
    importance: float = 1.0
    references: Set[str] = field(default_factory=set)
class ContextManager:
    """Manages real-time context awareness across the system."""
    def __init__(self):
        self.file_contexts: Dict[Path, FileContext] = {}
        self.global_context: Dict[str, Any] = {}
        self.command_history: List[Dict] = []
        self.memory_store: Dict[str, MemoryItem] = {}

    def update_file_context(self, path: Path) -> FileContext:
        """Update context for a specific file."""
        context = FileContext.from_path(path)
        self.file_contexts[path] = context
        return context

    def get_related_files(self, path: Path) -> Set[Path]:
        """Find files related to the given file."""
        if path not in self.file_contexts:
            self.update_file_context(path)
        context = self.file_contexts[path]
        return context.dependencies

    def track_command(self, command: str, args: List[str], result: Any):
        """Track command execution and results."""
        self.command_history.append({
            'command': command,
            'args': args,
            'result': result,
            'timestamp': datetime.now(),
        })

    def add_memory(self, key: str, value: Any, context: dict = None):
        """Add an item to the memory store."""
        self.memory_store[key] = MemoryItem(
            key=key,
            value=value,
            context=context or {},
            timestamp=datetime.now()
        )

    def get_memory(self, key: str) -> Any:
        """Retrieve an item's value from memory."""
        item = self.memory_store.get(key)
        return item.value if item else None
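
# A minimal usage sketch (keys and values are hypothetical) of the memory API:
# add_memory wraps the value in a MemoryItem; get_memory returns just the value.
#
#     ctx = ContextManager()
#     ctx.add_memory("architect_result", "Three-tier architecture", {"source": "demo"})
#     assert ctx.get_memory("architect_result") == "Three-tier architecture"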
class FileOperationManager:
    """Manages multi-file operations and tracking."""
    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
        self.pending_changes: Dict[Path, str] = {}

    async def edit_files(self, changes: Dict[Path, str]):
        """Apply changes to multiple files atomically."""
        try:
            # Validate all changes first
            for path, content in changes.items():
                if not self._validate_change(path, content):
                    raise ValueError(f"Invalid change for {path}")
            # Apply changes
            for path, content in changes.items():
                await self._apply_change(path, content)
            # Update contexts
            for path in changes:
                self.context_manager.update_file_context(path)
        except Exception as e:
            logger.error(f"Failed to apply multi-file changes: {str(e)}")
            raise

    def _validate_change(self, path: Path, content: str) -> bool:
        """Validate a proposed file change."""
        try:
            # Check file exists or can be created
            if not path.parent.exists():
                path.parent.mkdir(parents=True)
            # Validate syntax if it's a Python file
            if path.suffix == '.py':
                compile(content, str(path), 'exec')
            return True
        except Exception as e:
            logger.error(f"Validation failed for {path}: {str(e)}")
            return False

    async def _apply_change(self, path: Path, content: str):
        """Apply a single file change."""
        path.write_text(content)
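
# A minimal usage sketch (hypothetical path and content): edit_files validates
# every change first (compiling .py content) and only then writes the files.
#
#     fm = FileOperationManager(ContextManager())
#     asyncio.run(fm.edit_files({Path("demo/app.py"): "print('hello')\n"}))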
class CommandManager:
    """Manages command suggestions and execution."""
    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
        self.command_templates: Dict[str, str] = {}

    def suggest_commands(self, context: dict) -> List[Dict]:
        """Suggest relevant commands based on context."""
        suggestions = []
        for cmd_name, template in self.command_templates.items():
            if self._is_relevant(cmd_name, context):
                suggestions.append({
                    'command': cmd_name,
                    'template': template,
                    'confidence': self._calculate_confidence(cmd_name, context)
                })
        return sorted(suggestions, key=lambda x: x['confidence'], reverse=True)
    async def execute_command(self, command: str, args: List[str]) -> Any:
        """Execute a command and track its result."""
        try:
            # Execute the command
            result = await self._run_command(command, args)
            # Track the execution
            self.context_manager.track_command(command, args, result)
            return result
        except Exception as e:
            logger.error(f"Command execution failed: {str(e)}")
            raise

    async def _run_command(self, command: str, args: List[str]) -> str:
        """Run the command as a subprocess and return its decoded stdout."""
        proc = await asyncio.create_subprocess_exec(
            command, *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(stderr.decode())
        return stdout.decode()
    def _is_relevant(self, cmd_name: str, context: dict) -> bool:
        """Determine if a command is relevant to the current context."""
        # Implementation depends on specific rules
        return True

    def _calculate_confidence(self, cmd_name: str, context: dict) -> float:
        """Calculate confidence score for a command suggestion."""
        # Implementation depends on specific metrics
        return 1.0
class RuleSystem:
    """Manages system rules and constraints."""
    def __init__(self):
        self.rules: Dict[str, callable] = {}
        self.constraints: Dict[str, callable] = {}

    def add_rule(self, name: str, rule_func: callable):
        """Add a new rule to the system."""
        self.rules[name] = rule_func

    def add_constraint(self, name: str, constraint_func: callable):
        """Add a new constraint to the system."""
        self.constraints[name] = constraint_func

    def evaluate_rules(self, context: dict) -> Dict[str, bool]:
        """Evaluate all rules against the current context."""
        return {name: rule(context) for name, rule in self.rules.items()}

    def check_constraints(self, context: dict) -> Dict[str, bool]:
        """Check all constraints against the current context."""
        return {name: constraint(context) for name, constraint in self.constraints.items()}
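
# A minimal usage sketch (rule names and context keys are illustrative): rules
# and constraints are plain callables evaluated against a context dict.
#
#     rules = RuleSystem()
#     rules.add_rule("has_requirements", lambda ctx: bool(ctx.get("requirements")))
#     rules.add_constraint("short_input", lambda ctx: len(ctx.get("requirements", "")) < 5000)
#     print(rules.evaluate_rules({"requirements": "Build a todo app"}))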
class ProjectBuilder:
    """Handles autonomous creation of project files and folders."""
    def __init__(self, base_path: Path):
        self.base_path = Path(base_path)
        self.current_build = None
        self.file_manifest = []

    async def create_project(self, app_name: str, structure: dict) -> Path:
        """Create a new project with the specified structure."""
        try:
            # Create temporary build directory
            build_dir = Path(tempfile.mkdtemp())
            self.current_build = build_dir / app_name
            self.current_build.mkdir(parents=True)
            # Create project structure
            await self._create_structure(self.current_build, structure)
            return self.current_build
        except Exception as e:
            logger.error(f"Project creation failed: {str(e)}")
            if self.current_build and self.current_build.exists():
                shutil.rmtree(self.current_build)
            raise

    async def _create_structure(self, parent: Path, structure: dict):
        """Recursively create project structure."""
        for name, content in structure.items():
            path = parent / name
            if isinstance(content, dict):
                path.mkdir(exist_ok=True)
                await self._create_structure(path, content)
            else:
                path.write_text(str(content))
                self.file_manifest.append(path)
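
# A minimal usage sketch (hypothetical structure): dict values that are dicts
# become directories; anything else is written out as file content.
#
#     builder = ProjectBuilder(Path(tempfile.gettempdir()))
#     structure = {"src": {"main.py": "print('hello')\n"}, "README.md": "# Demo\n"}
#     build_path = asyncio.run(builder.create_project("demo_app", structure))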
class OutputManager:
    """Manages project outputs and creates downloadable artifacts."""
    def __init__(self, project_builder: ProjectBuilder):
        self.project_builder = project_builder
        self.output_dir = Path(tempfile.mkdtemp())
        self.downloads = {}

    def create_download(self, app_name: str) -> str:
        """Create a downloadable zip file of the project."""
        try:
            if not self.project_builder.current_build:
                raise ValueError("No project has been built yet")
            # Create zip file
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            zip_name = f"{app_name}_{timestamp}.zip"
            zip_path = self.output_dir / zip_name
            with ZipFile(zip_path, 'w') as zipf:
                for file_path in self.project_builder.file_manifest:
                    rel_path = file_path.relative_to(self.project_builder.current_build)
                    zipf.write(file_path, rel_path)
            # Store download info
            self.downloads[zip_name] = {
                'path': zip_path,
                'created_at': datetime.now(),
                'size': zip_path.stat().st_size
            }
            return str(zip_path)
        except Exception as e:
            logger.error(f"Failed to create download: {str(e)}")
            raise
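
# A minimal usage sketch (assumes the builder's create_project ran first, so
# the manifest is non-empty): create_download zips every manifest file.
#
#     zip_path = OutputManager(builder).create_download("demo_app")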
class EnhancedAIFlow(AIFlow):
    """Enhanced AI Flow with project building and output management."""
    def __init__(self):
        # The base class sets up flow_graph, contexts, global_context and context_manager
        super().__init__()
        self.project_builder = ProjectBuilder(Path(tempfile.mkdtemp()))
        self.output_manager = OutputManager(self.project_builder)
        self.file_manager = FileOperationManager(self.context_manager)
        self.command_manager = CommandManager(self.context_manager)
        self.rule_system = RuleSystem()
        self.requirements = ""
    def initialize_flow(self):
        """Initialize the AI Flow with agent relationships and dependencies."""
        # Create nodes for each agent role
        for role in AgentRole:
            self.flow_graph.add_node(role)
            self.contexts[role] = AgentContext(
                role=role,
                state=FlowState.PENDING,
                artifacts={},
                dependencies=[],
                feedback=[]
            )
        # Define dependencies (each role lists the roles it depends on)
        dependencies = {
            AgentRole.UI_DESIGNER: [AgentRole.ARCHITECT],
            AgentRole.BACKEND_DEVELOPER: [AgentRole.ARCHITECT],
            AgentRole.DATABASE_ENGINEER: [AgentRole.ARCHITECT, AgentRole.BACKEND_DEVELOPER],
            AgentRole.SECURITY_EXPERT: [AgentRole.ARCHITECT, AgentRole.BACKEND_DEVELOPER],
            AgentRole.QA_ENGINEER: [AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER],
            AgentRole.DEVOPS_ENGINEER: [AgentRole.BACKEND_DEVELOPER, AgentRole.DATABASE_ENGINEER],
            AgentRole.DOCUMENTATION_WRITER: [AgentRole.ARCHITECT, AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER]
        }
        # Add edges based on dependencies (dependency -> dependent)
        for role, deps in dependencies.items():
            for dep in deps:
                self.flow_graph.add_edge(dep, role)
            self.contexts[role].dependencies.extend(deps)
    def _generate_prompt(self, role: AgentRole) -> str:
        """Generate a prompt for an agent based on context and dependencies."""
        try:
            context = self.contexts[role]
            dependencies_output = []
            # Gather outputs from dependencies (limited to last 1000 chars)
            for dep_role in context.dependencies:
                dep_context = self.contexts[dep_role]
                if dep_context.state == FlowState.COMPLETED and "output" in dep_context.artifacts:
                    output = dep_context.artifacts['output']
                    if len(output) > 1000:
                        output = output[:997] + "..."
                    dependencies_output.append(f"## {dep_role.value} Output:\n{output}")
            # Build role-specific prompts (with size limits)
            role_prompts = {
                AgentRole.ARCHITECT: """Design the high-level architecture (brief overview):
Requirements: {requirements}
Focus: system design, components, tech stack, data flow, scalability""",
                AgentRole.UI_DESIGNER: """Design the UI (key elements):
Requirements: {requirements}
Previous: {dependencies}
Focus: UX, layout, responsiveness, themes""",
                AgentRole.BACKEND_DEVELOPER: """Implement core backend logic:
Requirements: {requirements}
Architecture: {dependencies}
Focus: API, business logic, validation""",
                AgentRole.DATABASE_ENGINEER: """Design data layer:
Requirements: {requirements}
Context: {dependencies}
Focus: schema, relationships, optimization""",
                AgentRole.SECURITY_EXPERT: """Review security:
Requirements: {requirements}
Context: {dependencies}
Focus: auth, data protection, best practices""",
                AgentRole.QA_ENGINEER: """Design testing:
Requirements: {requirements}
Implementation: {dependencies}
Focus: coverage, automation, edge cases""",
                AgentRole.DEVOPS_ENGINEER: """Setup deployment:
Requirements: {requirements}
Context: {dependencies}
Focus: CI/CD, infrastructure, monitoring""",
                AgentRole.DOCUMENTATION_WRITER: """Create docs:
Requirements: {requirements}
System: {dependencies}
Focus: setup, API docs, guides"""
            }
            # Get the base prompt for the role
            base_prompt = role_prompts.get(role, "")
            # Truncate requirements if too long
            requirements = self.requirements
            if len(requirements) > 1000:
                requirements = requirements[:997] + "..."
            # Format the prompt with requirements and dependencies
            formatted_prompt = base_prompt.format(
                requirements=requirements,
                dependencies="\n\n".join(dependencies_output) if dependencies_output else "No previous context available."
            )
            return formatted_prompt
        except Exception as e:
            logger.error(f"Failed to generate prompt for {role}: {str(e)}")
            raise
    async def execute_flow(self, requirements: str) -> str:
        """Execute the AI Flow and build the project."""
        try:
            # Initialize flow with requirements
            self.requirements = requirements
            self.initialize_flow()
            # Derive an app name from the first word of the requirements
            words = requirements.split()
            app_name = words[0].lower() if words else "app"
            # Execute agents in parallel where possible
            paths = list(nx.all_simple_paths(self.flow_graph, AgentRole.ARCHITECT, AgentRole.DOCUMENTATION_WRITER))
            results = await self._execute_paths(paths)
            # Generate project structure and documentation
            project_structure = generate_project_structure(app_name, self.contexts[AgentRole.ARCHITECT].artifacts)
            documentation = generate_documentation(app_name, requirements, self.contexts[AgentRole.DOCUMENTATION_WRITER].artifacts)
            return f"""
# {app_name.title()} - Generated Application

## Project Structure
```
{project_structure}
```

## Documentation
{documentation}

## Next Steps
1. Review the generated architecture and components
2. Set up the development environment
3. Implement the components following the provided structure
4. Run the test suite
5. Deploy using the provided configurations

## Support
For any issues or questions, please refer to the documentation or create an issue in the repository.
"""
        except Exception as e:
            logger.error(f"Failed to execute flow: {str(e)}")
            raise
        finally:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
    async def _execute_paths(self, paths: List[List[AgentRole]]) -> List[str]:
        """Execute the flow graph, running independent agents in parallel.

        Agents are processed generation by generation in topological order so
        that an agent only runs after its dependencies have completed.
        NOTE: paths is retained for signature compatibility; ordering comes
        from the dependency graph itself.
        """
        try:
            results = []
            for generation in nx.topological_generations(self.flow_graph):
                tasks = []
                for role in generation:
                    if self.contexts[role].state == FlowState.PENDING:
                        self.contexts[role].state = FlowState.RUNNING
                        tasks.append(self._execute_agent(role))
                if not tasks:
                    continue
                # Wait for this generation of tasks to complete
                gen_results = await asyncio.gather(*tasks, return_exceptions=True)
                # Process results
                for result in gen_results:
                    if isinstance(result, Exception):
                        raise result
                results.extend(gen_results)
            return results
        except Exception as e:
            logger.error(f"Failed to execute paths: {str(e)}")
            raise
    async def _execute_agent(self, role: AgentRole) -> str:
        """Execute a single agent's tasks with enhanced context."""
        try:
            # Generate prompt
            prompt = self._generate_prompt(role)
            # Execute agent's task
            result = await self._execute_agent_task(role, prompt)
            # Update context
            self.contexts[role].state = FlowState.COMPLETED
            self.contexts[role].artifacts["output"] = result
            return result
        except Exception as e:
            logger.error(f"Failed to execute agent {role}: {str(e)}")
            self.contexts[role].state = FlowState.FAILED
            raise
    async def _execute_agent_task(self, role: AgentRole, prompt: str) -> str:
        """Execute a specific agent's task with the given prompt."""
        try:
            # Get agent
            agent = get_agent(role.value)
            # Execute the agent's task off the event loop
            result = await asyncio.to_thread(agent.run, prompt)
            return result
        except Exception as e:
            logger.error(f"Agent task execution failed for {role}: {str(e)}")
            raise
# Update the multi_agent_workflow function to use AI Flows
async def multi_agent_workflow(requirements: str) -> str:
    """Execute a multi-agent workflow using AI Flows to generate a complex app."""
    try:
        # Create AI Flow instance
        ai_flow = EnhancedAIFlow()
        # Generate the app
        result = await ai_flow.execute_flow(requirements)
        return result
    except Exception as e:
        logger.error(f"Multi-agent workflow failed: {str(e)}")
        raise
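
# A minimal usage sketch (hypothetical requirements string) for driving the
# flow outside Gradio:
#
#     report = asyncio.run(multi_agent_workflow("taskmanager with auth and CRUD"))
#     print(report)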
# Update the app_generator function to handle async execution
async def app_generator(requirements: str) -> Dict[str, str]:
    """Generate an app based on the provided requirements using AI Flows."""
    try:
        # Create AI Flow instance
        ai_flow = EnhancedAIFlow()
        # Generate the app
        result = await ai_flow.execute_flow(requirements)
        # Create downloadable output; the flow may not have written any files,
        # in which case there is nothing to zip
        try:
            download_path = ai_flow.output_manager.create_download("generated_app")
        except ValueError:
            download_path = None
        return {
            "output": result,
            "download_path": str(download_path) if download_path else None
        }
    except Exception as e:
        logger.error(f"App generation failed: {str(e)}")
        raise
class StreamHandler:
    """Handles streaming output for the Gradio interface."""
    def __init__(self):
        self.output = []
        self.current_status = ""

    def update(self, message: str, status: str = None):
        """Update the output stream."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        formatted_message = f"[{timestamp}] {message}"
        self.output.append(formatted_message)
        if status:
            self.current_status = status
        # Keep only the last 100 lines
        if len(self.output) > 100:
            self.output = self.output[-100:]
        return "\n".join(self.output), self.current_status
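
# A minimal usage sketch: update() appends a timestamped line and returns the
# full log plus the current status, which the Gradio callbacks below stream.
#
#     handler = StreamHandler()
#     log_text, status = handler.update("Build started", "Running")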
# Gradio UI
with gr.Blocks(theme=gr.themes.Soft()) as ui:
    stream_handler = StreamHandler()
    gr.Markdown("# Autonomous App Generator with AI Flow")
    gr.Markdown("""
    ## Instructions
    1. Describe the app you want to build in detail
    2. Include any specific requirements or features
    3. Click 'Generate App' to start the process
    4. Download your generated app from the provided link

    ### Example:
    ```
    Create a personal task management application with:
    - User authentication (email/password, Google OAuth)
    - Task management (CRUD, priorities, due dates, reminders)
    - Modern UI with dark/light theme
    - Real-time updates using WebSocket
    - PostgreSQL and Redis for storage
    ```
    """)
    with gr.Row():
        with gr.Column(scale=4):
            requirements_input = gr.Textbox(
                label="App Requirements",
                placeholder="Describe the app you want to build...",
                lines=10
            )
            with gr.Row():
                generate_button = gr.Button("Generate App", variant="primary")
                cancel_button = gr.Button("Cancel", variant="stop")
            status = gr.Textbox(
                label="Status",
                value="Ready",
                interactive=False
            )
        with gr.Column(scale=6):
            with gr.Tabs():
                with gr.TabItem("Output"):
                    output = gr.Markdown(
                        label="Generated App Details",
                        value="Your app details will appear here..."
                    )
                with gr.TabItem("Download"):
                    file_output = gr.File(
                        label="Download Generated App",
                        interactive=False
                    )
                with gr.TabItem("Live Log"):
                    log_output = gr.Textbox(
                        label="Generation Logs",
                        value="Logs will appear here...",
                        lines=10,
                        max_lines=20,
                        interactive=False,
                        show_copy_button=True
                    )
    async def stream_output(requirements, progress=gr.Progress()):
        """Stream the output during app generation."""
        try:
            # Initialize
            stream_handler.update("Starting app generation...", "Initializing")
            yield "Starting...", None, "Starting app generation...", "Initializing"
            # Update progress
            phases = [
                ("Analyzing requirements...", "Analyzing"),
                ("Generating architecture...", "Designing"),
                ("Creating project structure...", "Creating"),
                ("Implementing features...", "Implementing"),
                ("Finalizing...", "Finalizing")
            ]
            for msg, status in progress.tqdm(phases):
                stream_handler.update(msg, status)
                yield None, None, "\n".join(stream_handler.output), status
                await asyncio.sleep(1)  # Non-blocking sleep
            # Generate the app
            stream_handler.update("Running AI Flow system...", "Processing")
            yield None, None, "\n".join(stream_handler.output), "Processing"
            try:
                # Run the app generator with a 60-second timeout
                # (asyncio.wait_for works on Python 3.8+, unlike asyncio.timeout)
                result = await asyncio.wait_for(app_generator(requirements), timeout=60)
                # Update output with result
                if result["output"]:
                    stream_handler.update("\n" + result["output"], "Completed")
                    yield result["output"], result["download_path"], "\n".join(stream_handler.output), "Completed"
                else:
                    raise RuntimeError("No output generated")
            except asyncio.TimeoutError:
                stream_handler.update("\nApp generation timed out after 60 seconds", "Failed")
                yield None, None, "\n".join(stream_handler.output), "Failed"
                raise
        except Exception as e:
            error_msg = f"\nError: {str(e)}"
            stream_handler.update(error_msg, "Failed")
            yield None, None, "\n".join(stream_handler.output), "Failed"
            raise
        finally:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
    def cancel_generation():
        """Reset the UI after a cancel request.

        NOTE: this only updates the displayed status; it does not stop a
        generation task that is already running.
        """
        stream_handler.update("Generation cancelled by user", "Cancelled")
        return "Generation cancelled", None, "\n".join(stream_handler.output), "Cancelled"

    generate_button.click(
        stream_output,
        inputs=[requirements_input],
        outputs=[output, file_output, log_output, status],
        show_progress=True
    )
    cancel_button.click(
        cancel_generation,
        outputs=[output, file_output, log_output, status]
    )
# Run the Gradio app
if __name__ == "__main__":
    try:
        ui.launch(
            share=True,  # Enable sharing
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True
        )
    except Exception as e:
        logger.error(f"Failed to launch Gradio interface: {str(e)}")