from fastapi import FastAPI, HTTPException, Request
from pydantic import HttpUrl
from playwright.async_api import async_playwright
from urllib.parse import urljoin, urlparse
import logging
from fastapi.responses import JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from typing import Dict
import asyncio

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

app = FastAPI(title="Website Scraper API with Frontend")

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")

# Maximum number of pages to scrape
MAX_PAGES = 20


async def scrape_page(url: str, visited: set, base_domain: str) -> tuple[Dict, set]:
    """Scrape a single page for text, images, and links using Playwright."""
    try:
        logger.info(f"Starting Playwright for URL: {url}")
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
                viewport={"width": 800, "height": 600},  # Reduced viewport for performance
                bypass_csp=True  # Bypass Content Security Policy
            )
            page = await context.new_page()

            # Retry navigation with fallback
            for attempt in range(2):  # Try up to 2 times
                try:
                    logger.info(f"Navigating to {url} (Attempt {attempt + 1})")
                    await page.goto(url, wait_until="domcontentloaded", timeout=30000)  # 30s timeout
                    break  # Success, exit retry loop
                except Exception as e:
                    logger.warning(f"Navigation attempt {attempt + 1} failed for {url}: {str(e)}")
                    if attempt == 1:  # Last attempt
                        logger.error(f"All navigation attempts failed for {url}")
                        await browser.close()
                        return {}, set()
                    await asyncio.sleep(1)  # Wait before retry

            # Scroll to trigger lazy-loaded images
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await page.wait_for_timeout(2000)  # Wait for lazy-loaded content

            # Extract text content
            text_content = await page.evaluate(
                """() => document.body.innerText"""
            )
            text_content = ' '.join(text_content.split()) if text_content else ""

            # Extract images from src, data-src, and srcset
            images = await page.evaluate(
                """() => {
                    const imgElements = document.querySelectorAll('img');
                    const imgUrls = new Set();
                    imgElements.forEach(img => {
                        if (img.src) imgUrls.add(img.src);
                        if (img.dataset.src) imgUrls.add(img.dataset.src);
                        if (img.srcset) {
                            img.srcset.split(',').forEach(src => {
                                const url = src.trim().split(' ')[0];
                                if (url) imgUrls.add(url);
                            });
                        }
                    });
                    return Array.from(imgUrls);
                }"""
            )
            images = [urljoin(url, img) for img in images if img]

            # Extract links, keeping only same-domain URLs that have not been visited yet
            links = await page.evaluate(
                """() => Array.from(document.querySelectorAll('a')).map(a => a.href)"""
            )
            links = set(
                urljoin(url, link) for link in links
                if urlparse(urljoin(url, link)).netloc == base_domain
                and urljoin(url, link) not in visited
            )

            await browser.close()
            logger.info(f"Successfully scraped {url}")
            page_data = {
                "url": url,
                "text": text_content,
                "images": images
            }
            return page_data, links
    except Exception as e:
        logger.error(f"Error scraping {url}: {str(e)}")
        return {}, set()


@app.get("/scrape")
async def crawl_website(url: HttpUrl):
    """Crawl the website starting from the given URL and return scraped data for up to MAX_PAGES pages as JSON."""
    try:
        logger.info(f"Starting crawl for {url}")
        visited = set()
        to_visit = {str(url)}
        base_domain = urlparse(str(url)).netloc
        results = []

        while to_visit and len(visited) < MAX_PAGES:
            current_url = to_visit.pop()
            if current_url in visited:
                continue
            logger.info(f"Scraping: {current_url}")
            visited.add(current_url)
            page_data, new_links = await scrape_page(current_url, visited, base_domain)
            if page_data:
                results.append(page_data)
            to_visit.update(new_links)
            # Small delay to avoid overwhelming the server
            await asyncio.sleep(0.5)

        logger.info(f"Crawl completed for {url}")
        return JSONResponse(content={"pages": results})
    except Exception as e:
        logger.error(f"Scraping failed for {url}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")


@app.get("/")
async def serve_home(request: Request):
    """Serve the frontend HTML page."""
    logger.info("Serving home page")
    return templates.TemplateResponse("index.html", {"request": request})


if __name__ == "__main__":
    logger.info("Starting FastAPI server on port 7860")
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
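
# Minimal usage sketch (assumes the server is running locally on port 7860, that
# "templates/index.html" and the "static/" directory exist, and that the target
# URL below is illustrative, not part of this project):
#
#   curl "http://localhost:7860/scrape?url=https://example.com"
#
# The /scrape endpoint returns JSON shaped like:
#   {"pages": [{"url": "...", "text": "...", "images": ["...", "..."]}, ...]}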