Spaces:
Sleeping
Sleeping
from typing import Optional, Iterator, Any | |
from tqdm import tqdm as tqdm_original | |
import sys | |
import pickle | |
import json | |
import os | |
class SafeProgress: | |
"""Wrapper for progress tracking that handles both tqdm (console) and Gradio progress""" | |
def __init__(self, progress_obj=None, desc="Processing", track_tqdm=True): | |
self.progress = progress_obj | |
self.desc = desc | |
self.track_tqdm = track_tqdm | |
self.console_progress = None | |
def __call__(self, value, desc=None): | |
"""Update progress indicator directly with a value""" | |
if desc is None: | |
desc = self.desc | |
# Update Gradio progress if available | |
if self.progress is not None: | |
try: | |
self.progress(value, desc=desc) | |
except Exception as e: | |
print(f"Progress update error: {e}") | |
# Always show console progress | |
if value < 1.0 and self.console_progress is None: | |
# Initialize console progress bar | |
self.console_progress = tqdm_original(total=100, desc=desc, file=sys.stdout) | |
self.console_progress.update(int(value * 100)) | |
elif value < 1.0: | |
# Update existing console progress bar | |
current = int(value * 100) | |
previous = self.console_progress.n | |
if current > previous: | |
self.console_progress.update(current - previous) | |
self.console_progress.set_description(desc) | |
elif self.console_progress is not None: | |
# Complete and close the progress bar | |
self.console_progress.update(100 - self.console_progress.n) | |
self.console_progress.close() | |
self.console_progress = None | |
def tqdm(self, iterable, desc=None, total=None): | |
"""Wrap an iterable with a progress bar for iteration""" | |
if desc is None: | |
desc = self.desc | |
# Track with Gradio if available | |
if self.progress is not None: | |
if hasattr(self.progress, 'tqdm'): | |
# Use Gradio's tqdm if available | |
for item in self.progress.tqdm(iterable, desc=desc, total=total): | |
yield item | |
return | |
# Always provide console progress bar | |
length = total if total is not None else len(iterable) if hasattr(iterable, "__len__") else None | |
with tqdm_original(iterable, desc=desc, total=length, file=sys.stdout) as pbar: | |
# Track progress in Gradio manually if needed | |
i = 0 | |
for item in pbar: | |
if self.progress is not None and length: | |
self.progress((i + 1) / length, desc=desc) | |
yield item | |
i += 1 | |
def load_embeddings(embeddings_path): | |
"""Load ingredient embeddings from pickle file""" | |
print(f"Loading ingredient embeddings from {embeddings_path}") | |
with open(embeddings_path, "rb") as f: | |
ingredients_embeddings = pickle.load(f) | |
print(f"Loaded {len(ingredients_embeddings)} ingredient embeddings") | |
return ingredients_embeddings | |
def preprocess_product_for_matching(product, progress=None, description=None): | |
""" | |
Preprocess a product for ingredient matching. | |
Args: | |
product (dict): Product dictionary containing at minimum 'name' and 'ingredients' | |
progress (SafeProgress, optional): Progress bar to update | |
description (str, optional): Description for progress update | |
Returns: | |
dict: Processed product with normalized fields ready for matching | |
""" | |
try: | |
# Extract essential product info | |
processed_product = { | |
'id': product.get('id', ''), | |
'name': product.get('name', '').strip(), | |
'ingredients': product.get('ingredients', '').strip(), | |
'image_url': product.get('image_url', ''), | |
'url': product.get('url', ''), | |
} | |
# Skip products without ingredients | |
if not processed_product['ingredients']: | |
if progress: | |
progress.update(1, description=f"{description}: Skipping product without ingredients") | |
return None | |
# Normalize ingredients text | |
processed_product['ingredients'] = processed_product['ingredients'].replace('\n', ' ').strip() | |
# Additional preprocessing could be added here | |
if progress: | |
progress.update(1, description=f"{description}: Processed {processed_product['name']}") | |
return processed_product | |
except Exception as e: | |
if progress: | |
progress.update(1, description=f"{description}: Error processing product: {str(e)}") | |
return None | |
# Keep these color utility functions in utils.py as they're generic helpers: | |
def get_confidence_color(score): | |
"""Get color based on confidence score""" | |
if score >= 0.8: | |
return "#1a8a38" # Strong green | |
elif score >= 0.65: | |
return "#4caf50" # Medium green | |
elif score >= 0.5: | |
return "#8bc34a" # Light green | |
else: | |
return "#9e9e9e" # Gray | |
def get_confidence_bg_color(score): | |
"""Get background color for confidence badge based on score""" | |
if score >= 0.8: | |
return "#2e7d32" # Dark green | |
elif score >= 0.65: | |
return "#558b2f" # Medium green | |
elif score >= 0.5: | |
return "#9e9d24" # Light green/yellow | |
else: | |
return "#757575" # Gray | |
def get_confidence_text_color(score): | |
"""Get text color that's readable on the confidence background""" | |
if score >= 0.5: | |
return "#ffffff" # White text on dark backgrounds | |
else: | |
return "#f5f5f5" # Light gray on gray background | |
# Remove any UI formatting-specific functions that now exist in ui_formatters.py: | |
# - format_categories_html | |
# - Any other UI formatting functions |