import json
import numpy as np
from typing import Dict, List, Tuple, Any

from category_matching import hybrid_category_matching
from similarity import hybrid_ingredient_matching, compute_similarities
from api_utils import process_in_parallel, rank_ingredients_openai
from ui_formatters import format_comparison_html, create_results_container
from utils import SafeProgress
from chicory_api import call_chicory_parser
from embeddings import create_product_embeddings

def compare_ingredient_methods(products: List[str], ingredients_dict: Dict[str, Any],
                               embedding_top_n: int = 20, final_top_n: int = 3,
                               confidence_threshold: float = 0.5, match_type: str = "ingredients",
                               progress=None, expanded_descriptions=None) -> Dict[str, Dict[str, List[Tuple]]]:
    """
    Compare multiple ingredient/category matching methods on the same products.

    Args:
        products: List of product names to process
        ingredients_dict: Dictionary with ingredient embeddings
        embedding_top_n: Number of top ingredients to retrieve using embeddings
        final_top_n: Number of final results to show for each method
        confidence_threshold: Minimum score threshold for final results
        match_type: Type of matching to perform ('ingredients' or 'categories')
        progress: Optional progress tracking object
        expanded_descriptions: Optional mapping of product name to expanded description text

    Returns:
        Dictionary mapping products to methods and their results
    """
    # Debug info: confirm expanded descriptions were supplied and look sane
    if expanded_descriptions:
        print(f"Expanded descriptions provided with {len(expanded_descriptions)} entries")
        # Check a sample product to confirm it has expanded text
        if products and products[0] in expanded_descriptions:
            sample = expanded_descriptions[products[0]]
            print(f"Sample expansion for '{products[0]}': {sample[:50]}...")
        else:
            print(f"WARNING: First product '{products[0] if products else 'None'}' not found in expanded descriptions")

    progress_tracker = SafeProgress(progress, desc="Comparing matching methods")
    # Step 1: Generate embeddings for all products (used by multiple methods)
    progress_tracker(0.1, desc="Generating product embeddings")

    # Use expanded descriptions for the embeddings if available
    if expanded_descriptions:
        expanded_product_texts = [expanded_descriptions.get(p, p) for p in products]
        product_embeddings = create_product_embeddings(expanded_product_texts, progress=progress_tracker,
                                                       original_products=products)  # Keep original product IDs
    else:
        product_embeddings = create_product_embeddings(products, progress=progress_tracker)

    # Step 2: Get embedding-based candidates for all products
    progress_tracker(0.2, desc="Finding embedding candidates")
    similarities = compute_similarities(ingredients_dict, product_embeddings)

    # Keep only the top N candidates per product
    embedding_results = {}
    for product, product_similarities in similarities.items():
        embedding_results[product] = product_similarities[:embedding_top_n]
    # Step 3: Process with Chicory Parser
    progress_tracker(0.3, desc="Running Chicory Parser")
    chicory_results = call_chicory_parser(products, progress=progress_tracker)
    # Initialize the result structure: one entry per product, one list per method
    comparison_results = {}
    for product in products:
        comparison_results[product] = {
            "base": [],
            "voyage": [],
            "chicory": [],
            "openai": []
        }

        # Add basic embedding results
        if product in embedding_results:
            base_results = []
            for name, score in embedding_results[product]:
                if score >= confidence_threshold:
                    base_results.append((name, score))
            comparison_results[product]["base"] = base_results[:final_top_n]

        # Process Chicory results
        chicory_matches = []
        if product in chicory_results:
            chicory_data = chicory_results[product]
            if isinstance(chicory_data, dict):
                # Handle different response formats based on match type
                ingredient = chicory_data.get("ingredient", "")
                confidence = chicory_data.get("confidence", 0)
                if ingredient and confidence >= confidence_threshold:
                    chicory_matches.append((ingredient, confidence))
        comparison_results[product]["chicory"] = chicory_matches
    # Step 4: Process with Voyage AI
    progress_tracker(0.4, desc="Processing with Voyage AI")

    # Define processing function for Voyage
    def process_voyage(product):
        try:
            # Get candidates from embedding results
            candidates = []
            if product in embedding_results:
                candidates = embedding_results[product]
            if not candidates:
                print(f"No candidates found for product: {product}")
                return product, []

            # Build a single-product expansion dict if expanded descriptions are available
            expanded_product_desc = None
            if expanded_descriptions and product in expanded_descriptions:
                expanded_product_desc = {product: expanded_descriptions.get(product)}

            # Rerank using Voyage
            try:
                if match_type == "ingredients":
                    # Convert candidates to the dictionary format expected by hybrid_ingredient_matching
                    ingredient_dict = {}
                    for c in candidates:
                        if c[0] in ingredients_dict:  # Take the embedding from the original dictionary
                            ingredient_dict[c[0]] = ingredients_dict[c[0]]
                    results = hybrid_ingredient_matching(
                        [product],  # Pass as a single-product list
                        ingredient_dict,
                        expanded_descriptions=expanded_product_desc
                    )
                else:
                    # Convert candidates to the format expected by hybrid_category_matching
                    candidate_dict = {c[0]: c[0] for c in candidates}
                    results = hybrid_category_matching(
                        products=[product],
                        categories=candidate_dict,
                        embedding_top_n=embedding_top_n,
                        final_top_n=final_top_n,
                        confidence_threshold=confidence_threshold,
                        expanded_descriptions=expanded_descriptions
                    )

                # If the matcher returned a dict keyed by product, pull out this product's list
                if isinstance(results, dict):
                    results = results.get(product, [])
                # Ensure results are in the expected (name, score) format
                formatted_results = []
                for r in results[:final_top_n]:
                    if isinstance(r, dict) and "name" in r and "score" in r:
                        # Convert score to float to ensure type compatibility
                        try:
                            score = float(r["score"])
                            if score >= confidence_threshold:
                                formatted_results.append((r["name"], score))
                        except (ValueError, TypeError):
                            print(f"Invalid score format in result: {r}")
                    elif isinstance(r, tuple) and len(r) >= 2:
                        try:
                            # Category matching may return 3-element tuples (id, description, score)
                            if len(r) >= 3:
                                score = float(r[2])  # Score is the third element
                                name = r[0]          # Use the category ID as the name
                            else:
                                # Standard 2-element tuple (name, score)
                                score = float(r[1])
                                name = r[0]
                            if score >= confidence_threshold:
                                formatted_results.append((name, score))
                        except (ValueError, TypeError):
                            print(f"Invalid score format in tuple: {r}")

                return product, formatted_results
            except Exception as e:
                print(f"Error in Voyage AI reranking for {product}: {str(e)}")
                # Fall back to the raw embedding results
                return product, [(c[0], c[1]) for c in candidates[:final_top_n]
                                 if c[1] >= confidence_threshold]
        except Exception as e:
            print(f"Error processing {product} with Voyage: {str(e)}")
            # Return an empty result as the ultimate fallback
            return product, []

    # Process all products with Voyage in parallel
    voyage_results = process_in_parallel(
        items=products,
        processor_func=process_voyage,
        max_workers=min(20, len(products)),
        progress_tracker=progress_tracker,
        progress_start=0.4,
        progress_end=0.65,
        progress_desc="Voyage AI"
    )

    # Merge Voyage results into the comparison results
    for product, results in voyage_results.items():
        if product in comparison_results:
            comparison_results[product]["voyage"] = results

    # Step 5: Process with OpenAI
    progress_tracker(0.7, desc="Running OpenAI processing in parallel")

    # Define processing function for OpenAI
    def process_openai(product):
        try:
            # Get candidates from embedding results
            candidates = []
            if product in embedding_results:
                candidates = embedding_results[product]
            if not candidates:
                return product, []

            # Extract just the candidate names for OpenAI
            candidate_names = [c[0] for c in candidates]

            # Use the expanded description if one is available
            expanded_text = expanded_descriptions.get(product, product) if expanded_descriptions else product

            # Use the appropriate ranking function for the match type
            if match_type == "ingredients":
                ranked_candidates = rank_ingredients_openai(
                    product=product,
                    candidates=candidate_names,
                    expanded_description=expanded_text
                )
            else:
                # For categories, use the category-specific ranking function
                from api_utils import rank_categories_openai
                # rank_categories_openai expects a dictionary of categories
                categories_dict = {name: name for name in candidate_names}
                ranked_candidates = rank_categories_openai(
                    product=product,
                    categories=categories_dict,
                    expanded_description=expanded_text
                )

            return product, [(c[0], c[1]) for c in ranked_candidates[:final_top_n]
                             if c[1] >= confidence_threshold]
        except Exception as e:
            print(f"Error processing {product} with OpenAI: {str(e)}")
            return product, []

    # Process all products with OpenAI in parallel
    openai_results = process_in_parallel(
        items=products,
        processor_func=process_openai,
        max_workers=min(20, len(products)),
        progress_tracker=progress_tracker,
        progress_start=0.7,
        progress_end=0.95,
        progress_desc="OpenAI"
    )

    # Merge OpenAI results into the comparison results
    for product, results in openai_results.items():
        if product in comparison_results:
            comparison_results[product]["openai"] = results

    # After running every method, normalize all results to (name, score) tuples
    for product, method_results in comparison_results.items():
        for method in method_results:
            formatted_results = []
            for item in method_results[method]:
                if isinstance(item, tuple) and len(item) >= 2:
                    formatted_results.append((str(item[0]), float(item[1])))
                elif isinstance(item, dict):
                    if "ingredient" in item:
                        name = item["ingredient"]
                    elif "category" in item:
                        name = item["category"]
                    else:
                        name = str(item)
                    if "relevance_score" in item:
                        score = float(item["relevance_score"])
                    elif "confidence" in item:
                        score = float(item["confidence"])
                    else:
                        score = 0.0
                    formatted_results.append((name, score))
                else:
                    formatted_results.append((str(item), 0.0))
            method_results[method] = formatted_results

    progress_tracker(1.0, desc="Comparison complete")
    return comparison_results
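
# Example usage (illustrative sketch; the product name below is a placeholder, and
# the embeddings path is the one loaded by compare_ingredient_methods_ui below):
#
#   from utils import load_embeddings
#   ingredients = load_embeddings("data/ingredient_embeddings_voyageai.pkl")
#   results = compare_ingredient_methods(
#       products=["organic diced tomatoes"],
#       ingredients_dict=ingredients,
#       embedding_top_n=20,
#       final_top_n=3,
#       confidence_threshold=0.5,
#       match_type="ingredients",
#   )
#   # results["organic diced tomatoes"] maps each method name
#   # ("base", "voyage", "chicory", "openai") to a list of (name, score) tuples.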


def compare_ingredient_methods_ui(product_input, embedding_top_n=20,
                                  final_top_n=3, confidence_threshold=0.5,
                                  match_type="categories", use_expansion=False, progress=None):
    """
    Compare multiple ingredient matching methods on the same products and format the results as HTML.

    Args:
        product_input: Text input with product names, one per line
        embedding_top_n: Number of top ingredients to retrieve using embeddings
        final_top_n: Number of final results to show for each method
        confidence_threshold: Minimum score threshold for final results
        match_type: Type of matching to perform ('ingredients' or 'categories')
        use_expansion: Whether to use description expansion
        progress: Optional progress tracking object

    Returns:
        HTML formatted comparison results
    """
    from utils import load_embeddings

    progress_tracker = SafeProgress(progress, desc="Comparing matching methods")
    progress_tracker(0.1, desc="Processing input")

    # Split the text input by lines and drop empty lines
    if not product_input:
        return "Please enter at least one product."

    product_names = [p.strip() for p in product_input.split('\n') if p.strip()]
    if not product_names:
        return "Please enter at least one product."

    # Load the appropriate embeddings for the selected match type
    try:
        progress_tracker(0.2, desc="Loading embeddings")
        if match_type == "ingredients":
            embeddings_path = "data/ingredient_embeddings_voyageai.pkl"
            embeddings_dict = load_embeddings(embeddings_path)
            header_text = f"Comparing {len(product_names)} products using multiple ingredient matching methods."
        else:  # categories
            embeddings_path = "data/category_embeddings.pickle"
            embeddings_dict = load_embeddings(embeddings_path)
            header_text = f"Comparing {len(product_names)} products using multiple category matching methods."

        # Optionally expand product descriptions before matching
        expanded_products = None
        print("USE EXPANSION:", use_expansion)
        if use_expansion:
            from openai_expansion import expand_product_descriptions
            progress_tracker(0.25, desc="Expanding product descriptions")
            expanded_products = expand_product_descriptions(product_names, progress=progress_tracker)
            # Note the expansion in the results header
            header_text = f"Comparing {len(product_names)} products using multiple {match_type} matching methods WITH expanded descriptions."

        progress_tracker(0.3, desc="Comparing methods")
        comparison_results = compare_ingredient_methods(
            products=product_names,
            ingredients_dict=embeddings_dict,
            embedding_top_n=embedding_top_n,
            final_top_n=final_top_n,
            confidence_threshold=confidence_threshold,
            match_type=match_type,
            progress=progress_tracker,
            expanded_descriptions=expanded_products
        )
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        return f"<div style='color: red;'>Error comparing methods: {str(e)}<br><pre>{error_details}</pre></div>"

    # Format results as HTML using the centralized formatters
    progress_tracker(0.9, desc="Formatting results")
    result_elements = []
    for product in product_names:
        if product in comparison_results:
            expanded_text = expanded_products.get(product, "") if expanded_products else ""
            result_elements.append(format_comparison_html(
                product,
                comparison_results[product],
                expanded_description=expanded_text
            ))

    output_html = create_results_container(
        result_elements,
        header_text=header_text
    )

    progress_tracker(1.0, desc="Complete")
    return output_html
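

# Minimal manual test sketch (assumes the embedding pickle files referenced above
# exist locally and the Chicory/Voyage/OpenAI backends are configured; the product
# names here are placeholders):
if __name__ == "__main__":
    html = compare_ingredient_methods_ui(
        product_input="organic diced tomatoes\nwhole wheat spaghetti",
        embedding_top_n=20,
        final_top_n=3,
        confidence_threshold=0.5,
        match_type="categories",
        use_expansion=False,
    )
    print(html[:500])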