Spaces:
Running
Running
import gradio as gr | |
import os | |
import asyncio | |
import nest_asyncio | |
from datetime import datetime | |
from typing import Optional, Dict, Any | |
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent | |
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination | |
from autogen_agentchat.teams import SelectorGroupChat | |
from autogen_ext.models.openai import OpenAIChatCompletionClient | |
from autogen_ext.agents.web_surfer import MultimodalWebSurfer | |
# Enable nested event loops | |
nest_asyncio.apply() | |
class AIShoppingAnalyzer: | |
def __init__(self, api_key: str): | |
self.api_key = api_key | |
os.environ["OPENAI_API_KEY"] = api_key | |
self.model_client = OpenAIChatCompletionClient(model="gpt-4o") | |
self.termination = MaxMessageTermination(max_messages=20) | TextMentionTermination("TERMINATE") | |
def create_websurfer(self) -> MultimodalWebSurfer: | |
"""Initialize the web surfer agent for e-commerce research""" | |
description = ( | |
"E-commerce research specialist that:\n" | |
"1. Searches multiple retailers for product options\n" | |
"2. Compares prices and reviews\n" | |
"3. Checks product specifications and availability\n" | |
"4. Analyzes website structure and findability\n" | |
"5. Detects and analyzes structured data (Schema.org, JSON-LD, Microdata)\n" | |
"6. Evaluates product markup and rich snippets\n" | |
"7. Checks for proper semantic HTML and data organization" | |
) | |
return MultimodalWebSurfer( | |
name="websurfer_agent", | |
model_client=self.model_client, | |
description=description, | |
headless=True, | |
to_save_screenshots=True, # Save screenshots for analysis | |
use_ocr=True, # Enable OCR for better text extraction | |
to_resize_viewport=True, # Ensure proper viewport sizing | |
debug_dir="debug_logs" # Save debug information | |
) | |
def create_assistant(self) -> AssistantAgent: | |
"""Initialize the shopping assistant agent""" | |
system_message = ( | |
"You are an expert shopping assistant and e-commerce analyst. " | |
"Analyze websites and provide reports in this format:\n\n" | |
"π E-COMMERCE ANALYSIS REPORT\n" | |
"============================\n" | |
"Site: {url}\n" | |
"Date: {date}\n\n" | |
"π FINDABILITY SCORE: [β β β β β]\n" | |
"-----------------------------\n" | |
"β’ Category Organization\n" | |
"β’ Navigation Structure\n" | |
"β’ Filter Systems\n\n" | |
"π INFORMATION QUALITY: [β β β β β]\n" | |
"------------------------------\n" | |
"β’ Product Details\n" | |
"β’ Image Quality\n" | |
"β’ Technical Specs\n" | |
"β’ Structured Data\n\n" | |
"π NAVIGATION & SEARCH: [β β β β β]\n" | |
"------------------------------\n" | |
"β’ Search Features\n" | |
"β’ User Experience\n" | |
"β’ Mobile Design\n\n" | |
"π° PRICING TRANSPARENCY: [β β β β β]\n" | |
"------------------------------\n" | |
"β’ Price Display\n" | |
"β’ Special Offers\n" | |
"β’ Comparison Tools\n\n" | |
"π OVERALL ASSESSMENT\n" | |
"-------------------\n" | |
"[Summary]\n\n" | |
"π§ TECHNICAL INSIGHTS\n" | |
"-------------------\n" | |
"[Technical Details]" | |
) | |
return AssistantAgent( | |
name="assistant_agent", | |
description="E-commerce shopping advisor and website analyzer", | |
system_message=system_message, | |
model_client=self.model_client | |
) | |
def create_team(self, websurfer_agent: MultimodalWebSurfer, assistant_agent: AssistantAgent) -> SelectorGroupChat: | |
"""Set up the team of agents""" | |
user_proxy = UserProxyAgent( | |
name="user_proxy", | |
description="A user looking for product recommendations" | |
) | |
return SelectorGroupChat( | |
participants=[websurfer_agent, assistant_agent, user_proxy], | |
selector_prompt="""You are coordinating a shopping assistance system. The following roles are available: | |
{roles} | |
Given the conversation history {history}, select the next role from {participants}. | |
- The websurfer_agent searches products and analyzes website structure | |
- The assistant_agent analyzes findings and makes recommendations | |
- The user_proxy provides input when needed | |
Return only the role name.""", | |
model_client=self.model_client, | |
termination_condition=self.termination | |
) | |
async def analyze_site(self, | |
website_url: str, | |
product_category: str, | |
specific_product: Optional[str] = None) -> str: | |
"""Run the analysis with proper cleanup""" | |
websurfer = None | |
try: | |
# Set up the analysis query | |
query = ( | |
f"Analyze the e-commerce experience for {website_url} focusing on:\n" | |
f"1. Product findability in the {product_category} category\n" | |
"2. Product information quality\n" | |
"3. Navigation and search functionality\n" | |
"4. Price visibility and comparison features" | |
) | |
if specific_product: | |
query += f"\n5. Detailed analysis of this specific product: {specific_product}" | |
# Initialize agents with proper configuration | |
websurfer = self.create_websurfer() | |
assistant = self.create_assistant() | |
team = self.create_team(websurfer, assistant) | |
try: | |
result = [] | |
async for message in team.run_stream(task=query): | |
if isinstance(message, str): | |
result.append(message) | |
else: | |
result.append(str(message)) | |
return "\n".join(result) | |
except EOFError: | |
return "Analysis completed with some limitations. Please try again if results are incomplete." | |
except Exception as e: | |
return f"Analysis error: {str(e)}" | |
finally: | |
if websurfer: | |
try: | |
# Properly close the browser | |
await websurfer.close() | |
print("Browser closed successfully") | |
except Exception as e: | |
print(f"Error closing browser: {str(e)}") | |
def create_gradio_interface() -> gr.Blocks: | |
"""Create the Gradio interface for the AI Shopping Analyzer""" | |
css = """ | |
@import url('https://fonts.googleapis.com/css2?family=Open+Sans:wght@300;400;600;700&display=swap'); | |
body { | |
font-family: 'Open Sans', sans-serif !important; | |
} | |
.dashboard-container { | |
border: 1px solid #e0e5ff; | |
border-radius: 8px; | |
background-color: #ffffff; | |
} | |
.token-header { | |
font-size: 1.25rem; | |
font-weight: 600; | |
margin-top: 1rem; | |
margin-bottom: 0.5rem; | |
} | |
.feature-button { | |
display: inline-block; | |
margin: 0.25rem; | |
padding: 0.5rem 1rem; | |
background-color: #f3f4f6; | |
border: 1px solid #e5e7eb; | |
border-radius: 0.375rem; | |
font-size: 0.875rem; | |
} | |
.feature-button:hover { | |
background-color: #e5e7eb; | |
} | |
.gr-form { | |
background: transparent !important; | |
border: none !important; | |
box-shadow: none !important; | |
} | |
.gr-input, .gr-textarea { | |
border: 1px solid #e5e7eb !important; | |
border-radius: 6px !important; | |
padding: 8px 12px !important; | |
font-size: 14px !important; | |
transition: all 0.2s !important; | |
} | |
.gr-input:focus, .gr-textarea:focus { | |
border-color: #3452DB !important; | |
outline: none !important; | |
box-shadow: 0 0 0 2px rgba(52, 82, 219, 0.2) !important; | |
} | |
.gr-button { | |
background-color: #3452DB !important; | |
color: white !important; | |
border-radius: 6px !important; | |
padding: 8px 16px !important; | |
font-size: 14px !important; | |
font-weight: 600 !important; | |
transition: all 0.2s !important; | |
} | |
.gr-button:hover { | |
background-color: #2a41af !important; | |
} | |
.analysis-output { | |
background: white; | |
padding: 20px; | |
border-radius: 8px; | |
border: 1px solid #e0e5ff; | |
margin-top: 20px; | |
font-family: 'Open Sans', sans-serif; | |
} | |
.analysis-output h1 { | |
font-size: 1.5em; | |
font-weight: bold; | |
margin-bottom: 1em; | |
color: #1a1a1a; | |
} | |
.analysis-output h2 { | |
font-size: 1.25em; | |
font-weight: 600; | |
margin-top: 1.5em; | |
margin-bottom: 0.5em; | |
color: #2a2a2a; | |
border-bottom: 2px solid #e0e5ff; | |
padding-bottom: 0.5em; | |
} | |
.analysis-output h3 { | |
font-size: 1.1em; | |
font-weight: 600; | |
margin-top: 1em; | |
margin-bottom: 0.5em; | |
color: #3a3a3a; | |
} | |
.analysis-output ul { | |
margin-left: 1.5em; | |
margin-bottom: 1em; | |
list-style-type: none; | |
} | |
.analysis-output li { | |
margin-bottom: 0.8em; | |
position: relative; | |
line-height: 1.6; | |
} | |
.analysis-output li:before { | |
content: "β’"; | |
position: absolute; | |
left: -1.2em; | |
color: #3452DB; | |
} | |
.analysis-output p { | |
margin-bottom: 1em; | |
line-height: 1.6; | |
color: #4a4a4a; | |
} | |
.analysis-output code { | |
background: #f3f4f6; | |
padding: 0.2em 0.4em; | |
border-radius: 4px; | |
font-size: 0.9em; | |
color: #3452DB; | |
} | |
/* Star rating styles */ | |
.star-rating { | |
color: #3452DB; | |
letter-spacing: 2px; | |
} | |
/* Section dividers */ | |
.section-divider { | |
border-top: 1px solid #e0e5ff; | |
margin: 2em 0; | |
} | |
/* Score indicators */ | |
.score-indicator { | |
background: #f8f9ff; | |
padding: 0.5em 1em; | |
border-radius: 4px; | |
border-left: 4px solid #3452DB; | |
margin: 1em 0; | |
} | |
/* Special formatting for emojis */ | |
.emoji-icon { | |
font-size: 1.2em; | |
margin-right: 0.5em; | |
vertical-align: middle; | |
} | |
""" | |
def format_markdown_report(report_text: str) -> str: | |
"""Format the report text with proper Markdown and styling""" | |
# Extract just the report content using markers | |
try: | |
start_marker = "π E-COMMERCE ANALYSIS REPORT" | |
end_marker = "TECHNICAL INSIGHTS" | |
# Find the report content | |
start_idx = report_text.find(start_marker) | |
if start_idx == -1: | |
return "Error: Could not find report content" | |
# Extract and clean the report | |
report_lines = [] | |
in_report = False | |
for line in report_text.split('\n'): | |
if start_marker in line: | |
in_report = True | |
report_lines.append("# " + line.strip()) | |
continue | |
if in_report: | |
# Skip empty lines | |
if not line.strip(): | |
continue | |
# Format section headers | |
if any(emoji in line for emoji in ['π', 'π', 'π', 'π°', 'π', 'π§']): | |
if ":" in line: | |
title, score = line.split(":", 1) | |
report_lines.append(f"\n## {title.strip()}") | |
if score.strip(): | |
report_lines.append(f"**Score: {score.strip()}**\n") | |
else: | |
report_lines.append(f"\n## {line.strip()}\n") | |
continue | |
# Format bullet points | |
if line.strip().startswith('β’'): | |
report_lines.append(line.replace('β’', '-')) | |
continue | |
# Add other lines as is | |
report_lines.append(line.strip()) | |
# Join the lines and clean up the formatting | |
report_text = '\n'.join(report_lines) | |
# Clean up multiple blank lines | |
report_text = '\n'.join(line for line, _ in itertools.groupby(report_text.split('\n'))) | |
# Ensure proper spacing around headers and bullet points | |
report_text = re.sub(r'\n#{1,2} ', r'\n\n# ', report_text) | |
report_text = re.sub(r'\n- ', r'\n\n- ', report_text) | |
return report_text | |
except Exception as e: | |
return f"Error formatting report: {str(e)}" | |
async def run_analysis(api_key: str, | |
website_url: str, | |
product_category: str, | |
specific_product: str) -> str: | |
"""Handle the analysis submission""" | |
if not api_key.startswith("sk-"): | |
return "Please enter a valid OpenAI API key (should start with 'sk-')" | |
if not website_url: | |
return "Please enter a website URL" | |
if not product_category: | |
return "Please specify a product category" | |
try: | |
analyzer = AIShoppingAnalyzer(api_key) | |
result = await analyzer.analyze_site( | |
website_url=website_url, | |
product_category=product_category, | |
specific_product=specific_product if specific_product else None | |
) | |
return format_markdown_report(result) | |
except Exception as e: | |
return f"Error during analysis: {str(e)}" | |
with gr.Blocks(css=css) as demo: | |
gr.HTML(""" | |
<div class="dashboard-container p-6"> | |
<h1 class="text-2xl font-bold mb-2">AI Shopping Agent Analyzer</h1> | |
<p class="text-gray-600 mb-6">Analyze how your e-commerce site performs with AI shoppers</p> | |
</div> | |
""") | |
with gr.Row(): | |
# Left column for inputs | |
with gr.Column(scale=1): | |
api_key = gr.Textbox( | |
label="OpenAI API Key", | |
placeholder="sk-...", | |
type="password", | |
container=True | |
) | |
website_url = gr.Textbox( | |
label="Website URL", | |
placeholder="https://your-store.com", | |
container=True | |
) | |
product_category = gr.Textbox( | |
label="Product Category", | |
placeholder="e.g., Electronics, Clothing, etc.", | |
container=True | |
) | |
specific_product = gr.Textbox( | |
label="Specific Product (Optional)", | |
placeholder="e.g., Blue Widget Model X", | |
container=True | |
) | |
analyze_button = gr.Button( | |
"Analyze Site", | |
size="lg" | |
) | |
# Right column for output | |
with gr.Column(scale=1): | |
analysis_output = gr.Markdown( | |
value="Results will appear here...", | |
label="Analysis Results", | |
elem_classes="analysis-output", | |
show_copy_button=True, | |
line_breaks=True | |
) | |
analyze_button.click( | |
fn=run_analysis, | |
inputs=[api_key, website_url, product_category, specific_product], | |
outputs=analysis_output | |
) | |
return demo | |
if __name__ == "__main__": | |
print("Setting up Playwright...") | |
try: | |
import subprocess | |
subprocess.run( | |
["playwright", "install", "chromium"], | |
check=True, | |
capture_output=True, | |
text=True | |
) | |
except Exception as e: | |
print(f"Warning: Playwright setup encountered an issue: {str(e)}") | |
print("Starting Gradio interface...") | |
demo = create_gradio_interface() | |
demo.launch() |