import asyncio
from typing import Any, Dict, List
from urllib.parse import urljoin, urlparse

import aiohttp
from bs4 import BeautifulSoup
from loguru import logger

from utils.llm_orchestrator import LLMOrchestrator


class WebBrowsingAgent:
    def __init__(self, llm_api_key: str):
        """Initialize the Web Browsing Agent."""
        logger.info("Initializing WebBrowsingAgent")
        self.llm_orchestrator = LLMOrchestrator(llm_api_key)
        self.session = None
        self.setup_logger()
        self.visited_urls = set()
        self.headers = {
            'User-Agent': (
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                'AppleWebKit/537.36 (KHTML, like Gecko) '
                'Chrome/91.0.4472.124 Safari/537.36'
            )
        }
        self.capabilities = [
            "web_browsing",
            "data_collection",
            "content_processing",
            "information_extraction",
            "link_crawling",
        ]

    def setup_logger(self):
        """Configure logging for the agent."""
        logger.add("logs/web_browsing_agent.log", rotation="500 MB")

    async def initialize(self):
        """Initialize the aiohttp session."""
        logger.info("Initializing aiohttp session")
        if not self.session:
            self.session = aiohttp.ClientSession(headers=self.headers)

    async def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a web browsing task."""
        logger.info(f"Executing task: {task}")
        await self.initialize()

        if 'url' not in task:
            logger.error("URL not provided in task")
            raise ValueError("URL not provided in task")

        try:
            content = await self.collect_data(task['url'])
            processed_data = await self.process_content(content, task)
            logger.info(f"Successfully executed task: {task}")
            return {
                'status': 'success',
                'data': processed_data,
                'url': task['url'],
            }
        except Exception as e:
            logger.error(f"Error executing task: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'url': task['url'],
            }

    async def collect_data(self, url: str, retries: int = 3,
                           delay: int = 1) -> Dict[str, Any]:
        """Collect data from a URL with error handling and retries."""
        for attempt in range(retries):
            try:
                async with self.session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        soup = BeautifulSoup(html, 'html.parser')

                        # Extract various types of content
                        text_content = soup.get_text(separator=' ', strip=True)
                        links = [link.get('href')
                                 for link in soup.find_all('a', href=True)]
                        images = [img.get('src')
                                  for img in soup.find_all('img', src=True)]

                        # Process links to get absolute URLs
                        processed_links = [urljoin(url, link) for link in links]

                        logger.info(f"Successfully collected data from {url}")
                        return {
                            'url': url,
                            'text_content': text_content,
                            'links': processed_links,
                            'images': images,
                            'status_code': response.status,
                            'headers': dict(response.headers),
                        }
                    else:
                        logger.error(
                            f"HTTP {response.status}: Failed to fetch {url} "
                            f"on attempt {attempt + 1}")
                        if attempt < retries - 1:
                            # Exponential backoff
                            await asyncio.sleep(delay * (2 ** attempt))
                        else:
                            raise Exception(
                                f"HTTP {response.status}: Failed to fetch "
                                f"{url} after multiple retries")
            except aiohttp.ClientError as e:
                logger.error(
                    f"Network error on attempt {attempt + 1} for {url}: {str(e)}")
                if attempt < retries - 1:
                    # Exponential backoff
                    await asyncio.sleep(delay * (2 ** attempt))
                else:
                    raise Exception(
                        f"Network error: Failed to fetch {url} after multiple retries")
            # In aiohttp 3.x, HttpProcessingError lives in aiohttp.http_exceptions
            except aiohttp.http_exceptions.HttpProcessingError as e:
                logger.error(
                    f"HTTP processing error on attempt {attempt + 1} for {url}: {str(e)}")
                if attempt < retries - 1:
                    # Exponential backoff
                    await asyncio.sleep(delay * (2 ** attempt))
                else:
                    raise Exception(
                        f"HTTP processing error: Failed to fetch {url} after multiple retries")
            except Exception as e:
                logger.error(
                    f"Unexpected error on attempt {attempt + 1} for {url}: {str(e)}")
                if attempt < retries - 1:
                    # Exponential backoff
                    await asyncio.sleep(delay * (2 ** attempt))
                else:
                    raise Exception(
                        f"Unexpected error: Failed to fetch {url} after multiple retries")

    async def process_content(self, content: Dict[str, Any],
                              task: Dict[str, Any]) -> Dict[str, Any]:
        """Process collected content using the LLM."""
        logger.info(f"Processing content for {content['url']}")
        try:
            # Generate a summary of the content
            summary = await self.llm_orchestrator.generate_completion(
                f"Summarize the following content:\n{content['text_content'][:1000]}..."
            )

            # Extract key information based on task requirements
            extracted_info = await self.extract_relevant_information(content, task)

            logger.info(f"Successfully processed content for {content['url']}")
            return {
                'summary': summary,
                'extracted_info': extracted_info,
                'metadata': {
                    'url': content['url'],
                    'num_links': len(content['links']),
                    'num_images': len(content['images']),
                },
            }
        except Exception as e:
            logger.error(f"Error processing content: {str(e)}")
            raise

    async def extract_relevant_information(self, content: Dict[str, Any],
                                           task: Dict[str, Any]) -> Dict[str, Any]:
        """Extract relevant information based on task requirements."""
        logger.info(f"Extracting relevant information for {content['url']}")

        # Use the LLM to extract specific information based on task requirements
        prompt = f"""
        Extract relevant information from the following content based on these requirements:

        Task requirements: {task.get('requirements', 'general information')}

        Content:
        {content['text_content'][:1500]}...
        """
        extracted_info = await self.llm_orchestrator.generate_completion(prompt)

        logger.info(f"Successfully extracted information for {content['url']}")
        return {'extracted_information': extracted_info}

    async def crawl_links(self, base_url: str,
                          max_depth: int = 2) -> List[Dict[str, Any]]:
        """Crawl links starting from a base URL up to a maximum depth."""
        logger.info(f"Crawling links from {base_url} up to depth {max_depth}")
        results = []

        async def crawl(url: str, depth: int):
            if depth > max_depth or url in self.visited_urls:
                return
            self.visited_urls.add(url)
            try:
                content = await self.collect_data(url)
                results.append(content)
                if depth < max_depth:
                    tasks = []
                    for link in content['links']:
                        # Only follow unvisited http(s) links; skip mailto:,
                        # javascript:, and other non-fetchable schemes.
                        if link in self.visited_urls:
                            continue
                        if urlparse(link).scheme not in ('http', 'https'):
                            continue
                        tasks.append(crawl(link, depth + 1))
                    await asyncio.gather(*tasks)
            except Exception as e:
                logger.error(f"Error crawling {url}: {str(e)}")

        await crawl(base_url, 0)
        logger.info(f"Finished crawling links from {base_url}")
        return results

    async def shutdown(self):
        """Cleanup resources."""
        logger.info("Shutting down WebBrowsingAgent")
        if self.session:
            await self.session.close()
            self.session = None
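

# ---------------------------------------------------------------------------
# Example usage: a minimal sketch, not part of the agent itself. It assumes an
# API key is exposed via a (hypothetical) LLM_API_KEY environment variable and
# that https://example.com is reachable; adjust both for your environment.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import os

    async def _demo():
        agent = WebBrowsingAgent(llm_api_key=os.environ["LLM_API_KEY"])
        try:
            # Fetch a single page, then summarize and extract information.
            result = await agent.execute({
                "url": "https://example.com",
                "requirements": "general information",
            })
            if result["status"] == "success":
                print(result["data"]["summary"])
            else:
                print(f"Task failed: {result['error']}")
        finally:
            # Always release the underlying aiohttp session.
            await agent.shutdown()

    asyncio.run(_demo())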