| """ | |
| SEO Analyzer UI using Gradio, Web Crawler, and OpenAI | |
| """ | |
| import gradio as gr | |
| import logging | |
| import json | |
| from typing import Dict, List, Any, Tuple, Optional | |
| from urllib.parse import urlparse | |
| import tldextract | |
| from openai import OpenAI | |
| import time | |
| import os | |
| import threading | |
| import queue | |
| import shutil | |
| import uuid | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime | |
| import tempfile | |
| from crawler import Crawler | |
| from frontier import URLFrontier | |
| from models import URL, Page | |
| import config | |
| from run_crawler import reset_databases | |
| from dotenv import load_dotenv, find_dotenv | |
| load_dotenv(find_dotenv()) | |

# Check if we're in deployment mode (e.g., Hugging Face Spaces)
IS_DEPLOYMENT = os.getenv('DEPLOYMENT', 'false').lower() == 'true'
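
# Deployment mode swaps the persistence layer: crawled pages and URLs are kept
# in the in-memory storage defined below and crawl artifacts go to a temporary
# directory, whereas local mode relies on the MongoDB/Redis backends that
# reset_databases() (from run_crawler) is expected to manage.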

# Custom CSS for better styling
CUSTOM_CSS = """
.container {
    max-width: 1200px !important;
    margin: auto;
    padding: 20px;
}
.header {
    text-align: center;
    margin-bottom: 2rem;
}
.header h1 {
    color: #2d3748;
    font-size: 2.5rem;
    font-weight: 700;
    margin-bottom: 1rem;
}
.header p {
    color: #4a5568;
    font-size: 1.1rem;
    max-width: 800px;
    margin: 0 auto;
}
.input-section {
    background: #f7fafc;
    border-radius: 12px;
    padding: 24px;
    margin-bottom: 24px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.analysis-section {
    background: white;
    border-radius: 12px;
    padding: 24px;
    margin-top: 24px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.log-section {
    font-family: monospace;
    background: #1a202c;
    color: #e2e8f0;
    padding: 16px;
    border-radius: 8px;
    margin-top: 24px;
}
/* Custom styling for inputs */
.input-container {
    background: white;
    padding: 16px;
    border-radius: 8px;
    margin-bottom: 16px;
}
/* Custom styling for the slider */
.slider-container {
    padding: 12px;
    background: white;
    border-radius: 8px;
}
/* Custom styling for buttons */
.primary-button {
    background: #4299e1 !important;
    color: white !important;
    padding: 12px 24px !important;
    border-radius: 8px !important;
    font-weight: 600 !important;
    transition: all 0.3s ease !important;
}
.primary-button:hover {
    background: #3182ce !important;
    transform: translateY(-1px) !important;
}
/* Markdown output styling */
.markdown-output {
    font-family: system-ui, -apple-system, sans-serif;
    line-height: 1.6;
}
.markdown-output h1 {
    color: #2d3748;
    border-bottom: 2px solid #e2e8f0;
    padding-bottom: 0.5rem;
}
.markdown-output h2 {
    color: #4a5568;
    margin-top: 2rem;
}
.markdown-output h3 {
    color: #718096;
    margin-top: 1.5rem;
}
/* Progress bar styling */
.progress-bar {
    height: 8px !important;
    border-radius: 4px !important;
    background: #ebf8ff !important;
}
.progress-bar-fill {
    background: #4299e1 !important;
    border-radius: 4px !important;
}
/* Add some spacing between sections */
.gap {
    margin: 2rem 0;
}
"""


# Create a custom handler that will store logs in a queue
class QueueHandler(logging.Handler):
    def __init__(self, log_queue):
        super().__init__()
        self.log_queue = log_queue

    def emit(self, record):
        log_entry = self.format(record)
        try:
            self.log_queue.put_nowait(f"{datetime.now().strftime('%H:%M:%S')} - {log_entry}")
        except queue.Full:
            pass  # Ignore if queue is full
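
# The handler above mirrors every log record into a bounded queue so the Gradio
# UI can drain and display them per request; dropping records when the queue is
# full keeps logging from ever blocking the crawl.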

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format='%(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.info(f"IS_DEPLOYMENT: {IS_DEPLOYMENT}")


class InMemoryStorage:
    """Simple in-memory storage for deployment mode"""

    def __init__(self):
        self.urls = {}
        self.pages = {}

    def reset(self):
        self.urls.clear()
        self.pages.clear()

    def add_url(self, url_obj):
        self.urls[url_obj.url] = url_obj

    def add_page(self, page_obj):
        self.pages[page_obj.url] = page_obj

    def get_url(self, url):
        return self.urls.get(url)

    def get_page(self, url):
        return self.pages.get(url)
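
# InMemoryStorage only needs to cover the calls made in deployment mode:
# analyze_website() reads pages back via get_page(), and the Crawler created
# with Crawler(storage=...) is assumed to write through add_url()/add_page().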

class SEOAnalyzer:
    """
    SEO Analyzer that combines web crawler with OpenAI analysis
    """

    def __init__(self, api_key: str):
        """Initialize SEO Analyzer"""
        self.client = OpenAI(api_key=api_key)
        self.crawler = None
        self.crawled_pages = []
        self.pages_crawled = 0
        self.max_pages = 0
        self.crawl_complete = threading.Event()
        self.log_queue = queue.Queue(maxsize=1000)
        self.session_id = str(uuid.uuid4())
        self.storage = InMemoryStorage() if IS_DEPLOYMENT else None

        # Add queue handler to logger
        queue_handler = QueueHandler(self.log_queue)
        queue_handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
        logger.addHandler(queue_handler)
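
        # Note: each SEOAnalyzer instance attaches its own QueueHandler to the
        # module logger and never detaches it, so repeated analyses in the same
        # process also mirror records into the queues of earlier instances
        # (harmless here: those queues are bounded and full-queue puts are ignored).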

    def _setup_session_storage(self) -> Tuple[str, str, str]:
        """
        Set up session-specific storage directories

        Returns:
            Tuple of (storage_path, html_path, log_path)
        """
        # Create session-specific paths
        session_storage = os.path.join(config.STORAGE_PATH, self.session_id)
        session_html = os.path.join(session_storage, "html")
        session_logs = os.path.join(session_storage, "logs")

        # Create directories
        os.makedirs(session_storage, exist_ok=True)
        os.makedirs(session_html, exist_ok=True)
        os.makedirs(session_logs, exist_ok=True)

        logger.info(f"Created session storage at {session_storage}")
        return session_storage, session_html, session_logs

    def _cleanup_session_storage(self):
        """Clean up session-specific storage"""
        session_path = os.path.join(config.STORAGE_PATH, self.session_id)
        try:
            if os.path.exists(session_path):
                shutil.rmtree(session_path)
                logger.info(f"Cleaned up session storage at {session_path}")
        except Exception as e:
            logger.error(f"Error cleaning up session storage: {e}")

    def _reset_storage(self):
        """Reset storage based on deployment mode"""
        if IS_DEPLOYMENT:
            self.storage.reset()
        else:
            reset_databases()

    def analyze_website(self, url: str, max_pages: int = 10, progress: gr.Progress = gr.Progress()) -> Tuple[str, List[Dict], str]:
        """
        Crawl website and analyze SEO using OpenAI

        Args:
            url: Seed URL to crawl
            max_pages: Maximum number of pages to crawl
            progress: Gradio progress indicator

        Returns:
            Tuple of (overall analysis, list of page-specific analyses, log output)
        """
        temp_dir = None  # Only set in deployment mode; checked during cleanup
        try:
            # Reset state
            self.crawled_pages = []
            self.pages_crawled = 0
            self.max_pages = max_pages
            self.crawl_complete.clear()

            # Set up storage
            if IS_DEPLOYMENT:
                # Use temporary directory for file storage in deployment
                temp_dir = tempfile.mkdtemp()
                session_storage = temp_dir
                session_html = os.path.join(temp_dir, "html")
                session_logs = os.path.join(temp_dir, "logs")
                os.makedirs(session_html, exist_ok=True)
                os.makedirs(session_logs, exist_ok=True)
            else:
                session_storage, session_html, session_logs = self._setup_session_storage()

            # Update config paths for this session
            config.HTML_STORAGE_PATH = session_html
            config.LOG_PATH = session_logs

            # Clear log queue
            while not self.log_queue.empty():
                self.log_queue.get_nowait()

            logger.info(f"Starting analysis of {url} with max_pages={max_pages}")

            # Reset storage
            logger.info("Resetting storage...")
            self._reset_storage()
            logger.info("Storage reset completed")

            # Create new crawler instance with appropriate storage
            logger.info("Creating crawler instance...")
            if IS_DEPLOYMENT:
                # In deployment mode, use in-memory storage
                self.crawler = Crawler(storage=self.storage)
                # Set frontier to use memory mode
                self.crawler.frontier = URLFrontier(use_memory=True)
            else:
                # In local mode, use MongoDB and Redis
                self.crawler = Crawler()
            logger.info("Crawler instance created successfully")

            # Extract domain for filtering
            domain = self._extract_domain(url)
            logger.info(f"Analyzing domain: {domain}")

            # Add seed URL and configure domain filter
            self.crawler.add_seed_urls([url])
            config.ALLOWED_DOMAINS = [domain]
            logger.info("Added seed URL and configured domain filter")

            # Override the crawler's _process_url method to capture pages
            original_process_url = self.crawler._process_url

            def wrapped_process_url(url_obj):
                if self.pages_crawled >= self.max_pages:
                    self.crawler.running = False  # Signal crawler to stop
                    self.crawl_complete.set()
                    return

                original_process_url(url_obj)

                # Get the page based on storage mode
                if IS_DEPLOYMENT:
                    # In deployment mode, get page from in-memory storage
                    page = self.storage.get_page(url_obj.url)
                    if page:
                        _, metadata = self.crawler.parser.parse(page)
                        self.crawled_pages.append({
                            'url': url_obj.url,
                            'content': page.content,
                            'metadata': metadata
                        })
                        self.pages_crawled += 1
                        logger.info(f"Crawled page {self.pages_crawled}/{max_pages}: {url_obj.url}")
                else:
                    # In local mode, get page from MongoDB
                    page_data = self.crawler.pages_collection.find_one({'url': url_obj.url})
                    if page_data and page_data.get('content'):
                        _, metadata = self.crawler.parser.parse(Page(**page_data))
                        self.crawled_pages.append({
                            'url': url_obj.url,
                            'content': page_data['content'],
                            'metadata': metadata
                        })
                        self.pages_crawled += 1
                        logger.info(f"Crawled page {self.pages_crawled}/{max_pages}: {url_obj.url}")

                if self.pages_crawled >= self.max_pages:
                    self.crawler.running = False  # Signal crawler to stop
                    self.crawl_complete.set()

            self.crawler._process_url = wrapped_process_url
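
            # The wrapper installed above piggybacks on the crawler's own
            # fetch/parse pipeline: each processed URL is counted, its parsed
            # metadata is captured for the OpenAI prompts, and the crawl is
            # stopped by clearing crawler.running once max_pages is reached.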

            def run_crawler():
                try:
                    # Skip signal handler registration
                    self.crawler.running = True
                    with ThreadPoolExecutor(max_workers=1) as executor:
                        try:
                            futures = [executor.submit(self.crawler._crawl_worker)]
                            for future in futures:
                                future.result()
                        except Exception as e:
                            logger.error(f"Error in crawler worker: {e}")
                        finally:
                            self.crawler.running = False
                            self.crawl_complete.set()
                except Exception as e:
                    logger.error(f"Error in run_crawler: {e}")
                    self.crawl_complete.set()

            # Start crawler in a thread
            crawler_thread = threading.Thread(target=run_crawler)
            crawler_thread.daemon = True
            crawler_thread.start()

            # Wait for completion or timeout with progress updates
            timeout = 300  # 5 minutes
            start_time = time.time()
            last_progress = 0
            while not self.crawl_complete.is_set() and time.time() - start_time < timeout:
                current_progress = min(0.8, self.pages_crawled / max_pages)
                if current_progress != last_progress:
                    progress(current_progress, f"Crawled {self.pages_crawled}/{max_pages} pages")
                    last_progress = current_progress
                time.sleep(0.1)  # More frequent updates

            if time.time() - start_time >= timeout:
                logger.warning("Crawler timed out")
                self.crawler.running = False

            # Wait for thread to finish
            crawler_thread.join(timeout=10)

            # Restore original method
            self.crawler._process_url = original_process_url

            # Collect all logs
            logs = []
            while not self.log_queue.empty():
                logs.append(self.log_queue.get_nowait())
            log_output = "\n".join(logs)

            if not self.crawled_pages:
                if IS_DEPLOYMENT and temp_dir:
                    shutil.rmtree(temp_dir, ignore_errors=True)
                else:
                    self._cleanup_session_storage()
                return "No pages were successfully crawled.", [], log_output

            logger.info("Starting OpenAI analysis...")
            progress(0.9, "Analyzing crawled pages with OpenAI...")

            # Analyze crawled pages with OpenAI
            overall_analysis = self._get_overall_analysis(self.crawled_pages)

            progress(0.95, "Generating page-specific analyses...")
            page_analyses = self._get_page_analyses(self.crawled_pages)

            logger.info("Analysis complete")
            progress(1.0, "Analysis complete")

            # Format the results
            formatted_analysis = f"""
# SEO Analysis Report for {domain}
## Overall Analysis
{overall_analysis}
## Page-Specific Analyses
"""
            for page_analysis in page_analyses:
                formatted_analysis += f"""
### {page_analysis['url']}
{page_analysis['analysis']}
"""

            # Clean up all resources
            logger.info("Cleaning up resources...")
            if IS_DEPLOYMENT:
                shutil.rmtree(temp_dir, ignore_errors=True)
                self.storage.reset()
            else:
                self._cleanup_session_storage()
                self._reset_storage()
            logger.info("All resources cleaned up")

            return formatted_analysis, page_analyses, log_output

        except Exception as e:
            logger.error(f"Error analyzing website: {e}")
            # Clean up all resources even on error
            if IS_DEPLOYMENT:
                if temp_dir:
                    shutil.rmtree(temp_dir, ignore_errors=True)
                self.storage.reset()
            else:
                self._cleanup_session_storage()
                self._reset_storage()

            # Collect all logs
            logs = []
            while not self.log_queue.empty():
                logs.append(self.log_queue.get_nowait())
            log_output = "\n".join(logs)

            return f"Error analyzing website: {str(e)}", [], log_output

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL"""
        extracted = tldextract.extract(url)
        return f"{extracted.domain}.{extracted.suffix}"

    def _get_overall_analysis(self, pages: List[Dict]) -> str:
        """Get overall SEO analysis using OpenAI"""
        try:
            # Prepare site overview for analysis
            site_overview = {
                'num_pages': len(pages),
                'pages': [{
                    'url': page['url'],
                    'metadata': page['metadata']
                } for page in pages]
            }

            # Create analysis prompt
            prompt = f"""
You are an expert SEO consultant. Analyze this website's SEO based on the crawled data:
{json.dumps(site_overview, indent=2)}
Provide a comprehensive SEO analysis including:
1. Overall site structure and navigation
2. Common SEO issues across pages
3. Content quality and optimization
4. Technical SEO recommendations
5. Priority improvements
Format your response in Markdown.
"""

            # Get analysis from OpenAI
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are an expert SEO consultant providing detailed website analysis."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=2000
            )

            return response.choices[0].message.content

        except Exception as e:
            logger.error(f"Error getting overall analysis: {e}")
            return f"Error generating overall analysis: {str(e)}"

    def _get_page_analyses(self, pages: List[Dict]) -> List[Dict]:
        """Get page-specific SEO analyses using OpenAI"""
        page_analyses = []

        for page in pages:
            try:
                # Create page analysis prompt
                prompt = f"""
Analyze this page's SEO:
URL: {page['url']}
Metadata: {json.dumps(page['metadata'], indent=2)}
Provide specific recommendations for:
1. Title and meta description
2. Heading structure
3. Content optimization
4. Internal linking
5. Technical improvements
Format your response in Markdown.
"""

                # Get analysis from OpenAI
                response = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are an expert SEO consultant providing detailed page analysis."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.7,
                    max_tokens=1000
                )

                page_analyses.append({
                    'url': page['url'],
                    'analysis': response.choices[0].message.content
                })

                # Sleep to respect rate limits
                time.sleep(1)

            except Exception as e:
                logger.error(f"Error analyzing page {page['url']}: {e}")
                page_analyses.append({
                    'url': page['url'],
                    'analysis': f"Error analyzing page: {str(e)}"
                })

        return page_analyses
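
    # Page analyses are requested sequentially with a 1-second pause between
    # calls as a simple rate-limit guard, so total runtime grows roughly
    # linearly with the number of crawled pages.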


def create_ui() -> gr.Blocks:
    """Create Gradio interface"""

    def analyze(url: str, api_key: str, max_pages: int, progress: gr.Progress = gr.Progress()) -> Tuple[str, str]:
        """Gradio interface function"""
        try:
            # Initialize analyzer
            analyzer = SEOAnalyzer(api_key)

            # Run analysis with progress updates
            analysis, _, logs = analyzer.analyze_website(url, max_pages, progress)

            # Start with the logs returned by the analysis, then append any
            # records that arrived after the queue was last drained
            log_output = logs + "\n" if logs else ""
            while not analyzer.log_queue.empty():
                try:
                    log_output += analyzer.log_queue.get_nowait() + "\n"
                except queue.Empty:
                    break

            # Set progress to complete
            progress(1.0, "Analysis complete")

            # Return results
            return analysis, log_output

        except Exception as e:
            error_msg = f"Error: {str(e)}"
            logger.error(error_msg)
            return error_msg, error_msg
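
    # A fresh SEOAnalyzer is built per request, so each click gets its own
    # session_id, log queue, and (in deployment mode) in-memory storage rather
    # than sharing state between concurrent users.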

    # Create markdown content for the about section
    about_markdown = """
# 🔍 SEO Analyzer Pro
Analyze your website's SEO performance using advanced crawling and AI technology.

### Features:
- 🕷️ Intelligent Web Crawling
- 🧠 AI-Powered Analysis
- 📊 Comprehensive Reports
- 📈 Performance Insights

### How to Use:
1. Enter your website URL
2. Provide your OpenAI API key
3. Choose how many pages to analyze
4. Click Analyze and watch the magic happen!

### What You'll Get:
- Detailed SEO analysis
- Content quality assessment
- Technical recommendations
- Performance insights
- Actionable improvements
"""

    # Create the interface with custom styling
    with gr.Blocks(css=CUSTOM_CSS) as iface:
        gr.Markdown(about_markdown)

        with gr.Row():
            with gr.Column(scale=2):
                with gr.Group(elem_classes="input-section"):
                    gr.Markdown("### 🌐 Enter Website Details")
                    url_input = gr.Textbox(
                        label="Website URL",
                        placeholder="https://example.com",
                        elem_classes="input-container",
                        info="Enter the full URL of the website you want to analyze (e.g., https://example.com)"
                    )
                    api_key = gr.Textbox(
                        label="OpenAI API Key",
                        placeholder="sk-...",
                        type="password",
                        elem_classes="input-container",
                        info="Your OpenAI API key is required for AI-powered analysis. Keep this secure!"
                    )
                    max_pages = gr.Slider(
                        minimum=1,
                        maximum=50,
                        value=10,
                        step=1,
                        label="Maximum Pages to Crawl",
                        elem_classes="slider-container",
                        info="Choose how many pages to analyze. More pages = more comprehensive analysis but takes longer"
                    )
                    analyze_btn = gr.Button(
                        "🚀 Analyze Website",
                        elem_classes="primary-button"
                    )

        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="analysis-section"):
                    gr.Markdown("### 📊 Analysis Results")
                    analysis_output = gr.Markdown(
                        label="SEO Analysis",
                        elem_classes="markdown-output"
                    )

        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes="log-section"):
                    gr.Markdown("### 📋 Process Logs")
                    logs_output = gr.Textbox(
                        label="Logs",
                        lines=10,
                        elem_classes="log-output"
                    )

        # Connect the button click to the analyze function
        analyze_btn.click(
            fn=analyze,
            inputs=[url_input, api_key, max_pages],
            outputs=[analysis_output, logs_output],
        )

    return iface


if __name__ == "__main__":
    # Create base storage directory if it doesn't exist
    os.makedirs(config.STORAGE_PATH, exist_ok=True)

    # Create and launch UI
    ui = create_ui()
    ui.launch(
        share=False,
        server_name="0.0.0.0",
        show_api=False,
        show_error=True,
    )
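
# Running locally (a sketch of the expected setup, not verified against this
# repo's config): the non-deployment path assumes the MongoDB/Redis services
# referenced by config and run_crawler are reachable. To use the self-contained
# in-memory mode instead, set the environment variable first, e.g.:
#   DEPLOYMENT=true python app.py
# (assuming this module is saved as app.py; adjust the filename as needed).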