| """ | |
| Web API for the web crawler. | |
| This module provides a FastAPI-based web API for controlling and monitoring the web crawler. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import logging | |
| import datetime | |
| from typing import List, Dict, Any, Optional | |
| from fastapi import FastAPI, HTTPException, Query, Path, BackgroundTasks, Depends | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel, HttpUrl, Field | |
| import uvicorn | |
| from crawler import Crawler | |
| from models import URL, URLStatus, Priority | |
| import config | |
| # Configure logging | |
| logging.basicConfig( | |
| level=getattr(logging, config.LOG_LEVEL), | |
| format=config.LOG_FORMAT | |
| ) | |
| logger = logging.getLogger(__name__) | |

# Create FastAPI app
app = FastAPI(
    title="Web Crawler API",
    description="API for controlling and monitoring the web crawler",
    version="1.0.0"
)

# Enable CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global crawler instance
crawler = None


def get_crawler() -> Crawler:
    """Get or initialize the crawler instance"""
    global crawler
    if crawler is None:
        crawler = Crawler()
    return crawler

# API Models
class SeedURL(BaseModel):
    url: HttpUrl
    priority: Optional[str] = Field(
        default="NORMAL",
        description="URL priority (VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW)"
    )


class SeedURLs(BaseModel):
    urls: List[SeedURL]
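
# Illustrative request body for POST /seed matching the two models above (the URL
# and priority values here are placeholders, not values taken from the crawler):
#
#   {"urls": [{"url": "https://example.com", "priority": "HIGH"}]}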


class CrawlerStatus(BaseModel):
    running: bool
    paused: bool
    start_time: Optional[float] = None
    uptime_seconds: Optional[float] = None
    pages_crawled: int
    pages_failed: int
    urls_discovered: int
    urls_filtered: int
    domains_crawled: int
    frontier_size: int


class CrawlerConfig(BaseModel):
    max_depth: int = Field(..., description="Maximum crawl depth")
    max_workers: int = Field(..., description="Maximum number of worker threads")
    delay_between_requests: float = Field(..., description="Delay between requests to the same domain (seconds)")


class PageDetail(BaseModel):
    url: str
    domain: str
    status_code: int
    content_type: str
    content_length: int
    crawled_at: str
    is_seed: bool
    depth: int
    title: Optional[str] = None
    description: Optional[str] = None


class URLDetail(BaseModel):
    url: str
    normalized_url: str
    domain: str
    status: str
    priority: str
    depth: int
    parent_url: Optional[str] = None
    last_crawled: Optional[str] = None
    error: Optional[str] = None
    retries: int


class DomainStats(BaseModel):
    domain: str
    pages_count: int
    successful_requests: int
    failed_requests: int
    avg_page_size: float
    content_types: Dict[str, int]
    status_codes: Dict[str, int]


# API Routes
@app.get("/")
async def read_root():
    """Root endpoint"""
    return {
        "name": "Web Crawler API",
        "version": "1.0.0",
        "description": "API for controlling and monitoring the web crawler",
        "endpoints": {
            "GET /": "This help message",
            "GET /status": "Get crawler status",
            "GET /stats": "Get crawler statistics",
            "GET /config": "Get crawler configuration",
            "PUT /config": "Update crawler configuration",
            "POST /start": "Start the crawler",
            "POST /stop": "Stop the crawler",
            "POST /pause": "Pause the crawler",
            "POST /resume": "Resume the crawler",
            "GET /pages": "List crawled pages",
            "GET /pages/{url}": "Get page details",
            "GET /urls": "List discovered URLs",
            "GET /urls/{url}": "Get URL details",
            "POST /seed": "Add seed URLs",
            "GET /domains": "Get domain statistics",
            "GET /domains/{domain}": "Get statistics for a specific domain",
        }
    }


@app.get("/status")
async def get_status(crawler: Crawler = Depends(get_crawler)):
    """Get crawler status"""
    status = {
        "running": crawler.running,
        "paused": crawler.paused,
        "start_time": crawler.stats.get('start_time'),
        "uptime_seconds": time.time() - crawler.stats.get('start_time', time.time()) if crawler.running else None,
        "pages_crawled": crawler.stats.get('pages_crawled', 0),
        "pages_failed": crawler.stats.get('pages_failed', 0),
        "urls_discovered": crawler.stats.get('urls_discovered', 0),
        "urls_filtered": crawler.stats.get('urls_filtered', 0),
        "domains_crawled": len(crawler.stats.get('domains_crawled', set())),
        "frontier_size": crawler.frontier.size()
    }
    return status


@app.get("/stats")
async def get_stats(crawler: Crawler = Depends(get_crawler)):
    """Get detailed crawler statistics"""
    stats = crawler.stats.copy()

    # Convert sets to lists for JSON serialization
    for key, value in stats.items():
        if isinstance(value, set):
            stats[key] = list(value)

    # Add uptime
    if stats.get('start_time'):
        stats['uptime_seconds'] = time.time() - stats['start_time']
        stats['uptime_formatted'] = str(datetime.timedelta(seconds=int(stats['uptime_seconds'])))

    # Add DNS cache statistics if available
    try:
        dns_stats = crawler.dns_resolver.get_stats()
        stats['dns_cache'] = dns_stats
    except Exception as e:
        logger.warning(f"Failed to get DNS stats: {e}")
        stats['dns_cache'] = {'error': 'Stats not available'}

    # Add frontier statistics if available
    try:
        stats['frontier_size'] = crawler.frontier.size()
        if hasattr(crawler.frontier, 'get_stats'):
            frontier_stats = crawler.frontier.get_stats()
            stats['frontier'] = frontier_stats
        else:
            stats['frontier'] = {'size': crawler.frontier.size()}
    except Exception as e:
        logger.warning(f"Failed to get frontier stats: {e}")
        stats['frontier'] = {'error': 'Stats not available'}

    return stats


@app.get("/config")
async def get_config():
    """Get crawler configuration"""
    return {
        "max_depth": config.MAX_DEPTH,
        "max_workers": config.MAX_WORKERS,
        "delay_between_requests": config.DELAY_BETWEEN_REQUESTS
    }


@app.put("/config")
async def update_config(
    crawler_config: CrawlerConfig,
    crawler: Crawler = Depends(get_crawler)
):
    """Update crawler configuration"""
    # Update configuration
    config.MAX_DEPTH = crawler_config.max_depth
    config.MAX_WORKERS = crawler_config.max_workers
    config.DELAY_BETWEEN_REQUESTS = crawler_config.delay_between_requests
    return crawler_config
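
# Illustrative request body for PUT /config matching the CrawlerConfig model (the
# numbers are placeholder values, not the defaults defined in config.py):
#
#   {"max_depth": 3, "max_workers": 8, "delay_between_requests": 1.0}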


@app.post("/start")
async def start_crawler(
    background_tasks: BackgroundTasks,
    num_workers: Optional[int] = Query(None, description="Number of worker threads"),
    async_mode: bool = Query(False, description="Whether to use async mode"),
    crawler: Crawler = Depends(get_crawler)
):
    """Start the crawler"""
    if crawler.running:
        return {"status": "Crawler is already running"}

    # Start crawler in background
    def start_crawler_task():
        try:
            crawler.start(num_workers=num_workers, async_mode=async_mode)
        except Exception as e:
            logger.error(f"Error starting crawler: {e}")

    background_tasks.add_task(start_crawler_task)
    return {"status": "Crawler starting in background"}


@app.post("/stop")
async def stop_crawler(crawler: Crawler = Depends(get_crawler)):
    """Stop the crawler"""
    if not crawler.running:
        return {"status": "Crawler is not running"}
    crawler.stop()
    return {"status": "Crawler stopped"}


@app.post("/pause")
async def pause_crawler(crawler: Crawler = Depends(get_crawler)):
    """Pause the crawler"""
    if not crawler.running:
        return {"status": "Crawler is not running"}
    if crawler.paused:
        return {"status": "Crawler is already paused"}
    crawler.pause()
    return {"status": "Crawler paused"}


@app.post("/resume")
async def resume_crawler(crawler: Crawler = Depends(get_crawler)):
    """Resume the crawler"""
    if not crawler.running:
        return {"status": "Crawler is not running"}
    if not crawler.paused:
        return {"status": "Crawler is not paused"}
    crawler.resume()
    return {"status": "Crawler resumed"}


@app.get("/pages")
async def list_pages(
    limit: int = Query(10, ge=1, le=100, description="Number of pages to return"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
    domain: Optional[str] = Query(None, description="Filter by domain"),
    status_code: Optional[int] = Query(None, description="Filter by HTTP status code"),
    crawler: Crawler = Depends(get_crawler)
):
    """List crawled pages"""
    # Build query
    query = {}
    if domain:
        query['domain'] = domain
    if status_code:
        query['status_code'] = status_code

    # Execute query
    try:
        pages = list(crawler.db.pages_collection.find(
            query,
            {'_id': 0}
        ).skip(offset).limit(limit))

        # Count total pages matching query
        total_count = crawler.db.pages_collection.count_documents(query)

        return {
            "pages": pages,
            "total": total_count,
            "limit": limit,
            "offset": offset
        }
    except Exception as e:
        logger.error(f"Error listing pages: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/pages/{url}")
async def get_page(
    url: str,
    include_content: bool = Query(False, description="Include page content"),
    crawler: Crawler = Depends(get_crawler)
):
    """Get page details"""
    try:
        # Decode URL from path parameter (callers replace '/' with '___')
        url = url.replace("___", "/")

        # Find page in database
        page = crawler.db.pages_collection.find_one({'url': url}, {'_id': 0})
        if not page:
            raise HTTPException(status_code=404, detail="Page not found")

        # Load content if requested
        if include_content:
            try:
                if crawler.use_s3:
                    content = crawler._load_content_s3(url)
                else:
                    content = crawler._load_content_disk(url)
                if content:
                    page['content'] = content
            except Exception as e:
                logger.error(f"Error loading content for {url}: {e}")
                page['content'] = None

        return page
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting page {url}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/urls")
async def list_urls(
    limit: int = Query(10, ge=1, le=100, description="Number of URLs to return"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
    status: Optional[str] = Query(None, description="Filter by URL status"),
    domain: Optional[str] = Query(None, description="Filter by domain"),
    priority: Optional[str] = Query(None, description="Filter by priority"),
    crawler: Crawler = Depends(get_crawler)
):
    """List discovered URLs"""
    # Build query
    query = {}
    if status:
        query['status'] = status
    if domain:
        query['domain'] = domain
    if priority:
        query['priority'] = priority

    # Execute query
    try:
        urls = list(crawler.db.urls_collection.find(
            query,
            {'_id': 0}
        ).skip(offset).limit(limit))

        # Count total URLs matching query
        total_count = crawler.db.urls_collection.count_documents(query)

        return {
            "urls": urls,
            "total": total_count,
            "limit": limit,
            "offset": offset
        }
    except Exception as e:
        logger.error(f"Error listing URLs: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/urls/{url}")
async def get_url(
    url: str,
    crawler: Crawler = Depends(get_crawler)
):
    """Get URL details"""
    try:
        # Decode URL from path parameter (callers replace '/' with '___')
        url = url.replace("___", "/")

        # Find URL in database
        url_obj = crawler.db.urls_collection.find_one({'url': url}, {'_id': 0})
        if not url_obj:
            raise HTTPException(status_code=404, detail="URL not found")
        return url_obj
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting URL {url}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/seed")
async def add_seed_urls(
    seed_urls: SeedURLs,
    crawler: Crawler = Depends(get_crawler)
):
    """Add seed URLs to the frontier"""
    try:
        urls_added = 0
        for seed in seed_urls.urls:
            url = str(seed.url)
            priority = getattr(Priority, seed.priority, Priority.NORMAL)

            # Create URL object
            url_obj = URL(
                url=url,
                status=URLStatus.PENDING,
                priority=priority,
                depth=0  # Seed URLs are at depth 0
            )

            # Add to frontier
            if crawler.frontier.add_url(url_obj):
                # Save URL to database
                crawler.db.urls_collection.update_one(
                    {'url': url},
                    {'$set': url_obj.dict()},
                    upsert=True
                )
                urls_added += 1
                logger.info(f"Added seed URL: {url}")

        return {"status": "success", "urls_added": urls_added}
    except Exception as e:
        logger.error(f"Error adding seed URLs: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/domains")
async def list_domains(
    limit: int = Query(10, ge=1, le=100, description="Number of domains to return"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
    crawler: Crawler = Depends(get_crawler)
):
    """Get domain statistics"""
    try:
        # Get domains with counts
        domain_counts = crawler.db.pages_collection.aggregate([
            {"$group": {
                "_id": "$domain",
                "pages_count": {"$sum": 1},
                "avg_page_size": {"$avg": "$content_length"}
            }},
            {"$sort": {"pages_count": -1}},
            {"$skip": offset},
            {"$limit": limit}
        ])

        # Get total domains count
        total_domains = len(crawler.stats.get('domains_crawled', set()))

        # Format result
        domains = []
        for domain in domain_counts:
            domains.append({
                "domain": domain["_id"],
                "pages_count": domain["pages_count"],
                "avg_page_size": domain["avg_page_size"]
            })

        return {
            "domains": domains,
            "total": total_domains,
            "limit": limit,
            "offset": offset
        }
    except Exception as e:
        logger.error(f"Error listing domains: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/domains/{domain}")
async def get_domain_stats(
    domain: str,
    crawler: Crawler = Depends(get_crawler)
):
    """Get statistics for a specific domain"""
    try:
        # Get basic domain stats
        domain_stats = crawler.db.pages_collection.aggregate([
            {"$match": {"domain": domain}},
            {"$group": {
                "_id": "$domain",
                "pages_count": {"$sum": 1},
                "successful_requests": {"$sum": {"$cond": [{"$lt": ["$status_code", 400]}, 1, 0]}},
                "failed_requests": {"$sum": {"$cond": [{"$gte": ["$status_code", 400]}, 1, 0]}},
                "avg_page_size": {"$avg": "$content_length"}
            }}
        ]).next()

        # Get content type distribution
        content_types = crawler.db.pages_collection.aggregate([
            {"$match": {"domain": domain}},
            {"$group": {
                "_id": "$content_type",
                "count": {"$sum": 1}
            }}
        ])
        content_type_map = {}
        for ct in content_types:
            content_type_map[ct["_id"]] = ct["count"]

        # Get status code distribution
        status_codes = crawler.db.pages_collection.aggregate([
            {"$match": {"domain": domain}},
            {"$group": {
                "_id": "$status_code",
                "count": {"$sum": 1}
            }}
        ])
        status_code_map = {}
        for sc in status_codes:
            status_code_map[str(sc["_id"])] = sc["count"]

        # Format result
        result = {
            "domain": domain,
            "pages_count": domain_stats["pages_count"],
            "successful_requests": domain_stats["successful_requests"],
            "failed_requests": domain_stats["failed_requests"],
            "avg_page_size": domain_stats["avg_page_size"],
            "content_types": content_type_map,
            "status_codes": status_code_map
        }
        return result
    except StopIteration:
        # Domain not found (the aggregation returned no documents)
        raise HTTPException(status_code=404, detail=f"Domain '{domain}' not found")
    except Exception as e:
        logger.error(f"Error getting domain stats for {domain}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
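

# --- Illustrative client sketch ---
# Not part of the API itself: a minimal example of how the endpoints above could be
# exercised from Python using only the standard library. It assumes the server is
# reachable at http://localhost:8000 and uses a placeholder seed URL; nothing here
# runs on import -- call demo_client() manually if you want to try it.
def demo_client(base_url: str = "http://localhost:8000") -> None:
    """Add a seed URL, start the crawler, and print its status (sketch)."""
    import urllib.request

    # POST /seed with a body matching the SeedURLs model
    seed_body = json.dumps(
        {"urls": [{"url": "https://example.com", "priority": "HIGH"}]}
    ).encode()
    seed_req = urllib.request.Request(
        f"{base_url}/seed",
        data=seed_body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(seed_req) as resp:
        print(resp.read().decode())

    # POST /start with four worker threads, then GET /status
    start_req = urllib.request.Request(f"{base_url}/start?num_workers=4", method="POST")
    with urllib.request.urlopen(start_req) as resp:
        print(resp.read().decode())
    with urllib.request.urlopen(f"{base_url}/status") as resp:
        print(resp.read().decode())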


if __name__ == "__main__":
    # Run the API server
    uvicorn.run(
        "api:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )