import concurrent.futures
import json
import os
from typing import Any, Callable, Dict, List, Optional, Tuple

import voyageai
from openai import OpenAI

from utils import SafeProgress


# System prompt shared by the ranking functions. ``list_label`` must be the
# exact heading used for the option list in the user prompt, so that the rules
# refer to a list the model can actually see.
_RANKING_SYSTEM_PROMPT = """
You are a product categorization expert. Your task is to match product descriptions to the most relevant categories from the PROVIDED LIST ONLY.

CRITICAL RULES:
1. You MUST ONLY select from the exact items listed in "{list_label}" - DO NOT create or invent new categories
2. Do not combine items from the list or add any words to them
3. Choose the items from the list that best match what the product IS or CONTAINS
4. If none of the items perfectly match, choose the closest matches from the provided list

For the rankings:
- Select ONLY from the exact items in the "{list_label}" list
- Assign relevance scores from 0.0 to 1.0
- Rank the top {max_results} matching {item_noun}.
- Provide brief explanations for why each item is relevant
- Do not suggest alternatives outside the provided list

Aim to identify the specific product category a consumer would look for when shopping for this exact item.
Only include {item_noun} with relevance score >= {confidence_threshold}.

Remember: Your ONLY options are the exact items listed in "{list_label}" - no additions, modifications, or combinations.
"""


# Centralized API clients
def get_openai_client():
    """Return an OpenAI client configured from the environment.

    The API key is read with ``os.environ.get("OPENAI_API_KEY")``, so a
    missing variable is passed to the constructor as ``None`` instead of
    raising here.
    """
    return OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))


def get_voyage_client():
    """Return a Voyage AI client built with the constructor's defaults."""
    return voyageai.Client()


# General batch processing utilities
def process_batch(items_batch: List[Any], processor_func: Callable) -> Dict:
    """Process a batch of items sequentially and collect the results.

    Args:
        items_batch: Items to process.
        processor_func: Callable taking one item and returning ``(key, value)``.

    Returns:
        Dictionary mapping each returned key to its value. When the processor
        raises for an item, the error is printed and the item itself is stored
        as the key with an empty list as the value (so such an item must be
        hashable).
    """
    results = {}
    for item in items_batch:
        try:
            key, value = processor_func(item)
            results[key] = value
        except Exception as e:
            print(f"Error processing batch item '{item}': {e}")
            results[item] = []
    return results


def process_in_parallel(
    items: List[Any],
    processor_func: Callable,
    max_workers: int = 10,
    progress_tracker: Any = None,
    progress_start: float = 0.0,
    progress_end: float = 1.0,
    progress_desc: str = "Processing in parallel"
) -> Dict:
    """Process items in a thread pool while preserving the input order.

    Args:
        items: Items to process (must support ``len``).
        processor_func: Callable taking one item and returning ``(key, value)``.
        max_workers: Upper bound on the number of threads. The effective value
            is clamped to the range ``1..len(items)``.
        progress_tracker: Optional callable invoked as
            ``progress_tracker(fraction, desc=...)`` after each completed item.
        progress_start: Progress fraction reported before any item completes.
        progress_end: Progress fraction reported once every item completed.
        progress_desc: Prefix of the description passed to the tracker.

    Returns:
        Dictionary of results inserted in the order of the input items. Items
        whose processor raised (the error is printed) or returned a ``None``
        value are omitted. If several items yield the same key, the value of
        the last one in input order wins. An empty ``items`` yields ``{}``.
    """
    total_count = len(items)
    if total_count == 0:
        # ThreadPoolExecutor rejects max_workers=0, and there is nothing to do.
        return {}

    # Never more threads than items, never fewer than one.
    worker_count = max(1, min(max_workers, total_count))

    def process_with_index(idx_item_pair):
        """Run the processor on one ``(index, item)`` pair, keeping the index."""
        idx, item = idx_item_pair
        try:
            key, value = processor_func(item)
            return (idx, key, value)
        except Exception as e:
            print(f"Error processing item '{item}': {e}")
            # A None value marks the item as failed; it is filtered out below.
            return (idx, item, None)

    indexed_results = []  # (idx, key, value) tuples in completion order
    completed_count = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(process_with_index, idx_item_pair)
            for idx_item_pair in enumerate(items)
        ]

        for future in concurrent.futures.as_completed(futures):
            completed_count += 1

            if progress_tracker:
                span = progress_end - progress_start
                progress_percent = progress_start + (span * completed_count / total_count)
                progress_tracker(
                    progress_percent,
                    desc=f"{progress_desc}: {completed_count}/{total_count}"
                )

            try:
                result = future.result()
            except Exception as e:
                print(f"Error processing item: {e}")
                continue

            if result[2] is not None:
                indexed_results.append(result)

    # Restore the input order before building the (insertion-ordered) dict.
    indexed_results.sort(key=lambda entry: entry[0])
    return {key: value for _, key, value in indexed_results}


def openai_structured_query(
    prompt: str,
    system_message: str = "You are a helpful assistant.",
    schema: Optional[dict] = None,
    model: str = "gpt-4o-mini",
    client=None,
    schema_name: str = "structured_output"
) -> dict:
    """Call the OpenAI Responses API requesting JSON-schema structured output.

    Args:
        prompt: The user prompt.
        system_message: The system message sent before the prompt.
        schema: JSON schema passed as the ``schema`` of the output format.
        model: OpenAI model name.
        client: Optional pre-configured client; one is created when omitted.
        schema_name: Name passed alongside the schema.

    Returns:
        The response's ``output_text`` parsed with ``json.loads``.

    Raises:
        Exception: Any error from the API call or from JSON parsing is printed
            and then re-raised unchanged.
    """
    if client is None:
        client = get_openai_client()

    try:
        response = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt}
            ],
            text={
                "format": {
                    "type": "json_schema",
                    "name": schema_name,
                    "schema": schema,
                    "strict": True
                }
            }
        )
        return json.loads(response.output_text)
    except Exception as e:
        print(f"Error in OpenAI structured query: {e}")
        raise


def _select_rankings(
    raw_rankings: List[dict],
    name_key: str,
    max_results: int,
    confidence_threshold: float
) -> List[Tuple[str, float]]:
    """Turn raw ranking entries into filtered, sorted ``(name, score)`` tuples.

    Entries scoring below ``confidence_threshold`` are dropped, the rest are
    sorted by score in descending order (ties keep their original order) and
    at most ``max_results`` entries are returned.
    """
    scored = [
        (entry[name_key], float(entry["relevance_score"]))
        for entry in raw_rankings
    ]
    kept = [pair for pair in scored if pair[1] >= confidence_threshold]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    # max(..., 0) keeps a negative limit from slicing entries off the end.
    return kept[:max(max_results, 0)]


def rank_ingredients_openai(
    product: str,
    candidates: List[str],
    expanded_description: Optional[str] = None,
    client=None,
    model: str = "gpt-4o-mini",
    max_results: int = 3,
    confidence_threshold: float = 0.5,
    debug: bool = False
) -> List[Tuple[str, float]]:
    """Rank candidate ingredients for a product using OpenAI.

    Args:
        product: Product name.
        candidates: Candidate ingredient names offered to the model.
        expanded_description: Optional longer product description added to
            the prompt.
        client: Optional pre-configured client; one is created when omitted.
        model: OpenAI model name.
        max_results: Maximum number of results to return.
        confidence_threshold: Minimum score a result needs to be returned.
        debug: Whether to print debug information (and a traceback on error).

    Returns:
        List of ``(ingredient, score)`` tuples sorted by score descending,
        limited to ``max_results`` entries scoring at least
        ``confidence_threshold``. Returns an empty list when ``candidates`` is
        empty or when the API call or response parsing fails (the error is
        printed).
    """
    if not candidates:
        return []

    if client is None:
        client = get_openai_client()

    if debug:
        print(f"Ranking for product: {product} with {len(candidates)} candidates")

    # Format prompt with expanded description if available
    prompt = f"Product: {product}"
    if expanded_description:
        prompt += f"\n\nExpanded description: {expanded_description}"
    prompt += f"\n\nPotential ingredients: {', '.join(candidates)}"

    ranking_schema = {
        "type": "object",
        "properties": {
            "rankings": {
                "type": "array",
                "description": f"Only the top {max_results} most relevant ingredients with scores >= {confidence_threshold}",
                "items": {
                    "type": "object",
                    "properties": {
                        "ingredient": {
                            "type": "string",
                            "description": "The name of the ingredient"
                        },
                        "relevance_score": {
                            "type": "number",
                            "description": "Score between 0 and 1 indicating relevance"
                        },
                        "explanation": {
                            "type": "string",
                            "description": "Brief explanation for the matching"
                        }
                    },
                    "required": ["ingredient", "relevance_score", "explanation"],
                    "additionalProperties": False
                }
            }
        },
        "required": ["rankings"],
        "additionalProperties": False
    }

    system_prompt = _RANKING_SYSTEM_PROMPT.format(
        list_label="Potential ingredients",
        item_noun="ingredients",
        max_results=max_results,
        confidence_threshold=confidence_threshold,
    )

    try:
        # Make the API call directly for more control
        response = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}
            ],
            text={
                "format": {
                    "type": "json_schema",
                    "name": "ingredient_ranking",
                    "schema": ranking_schema,
                    "strict": True
                }
            }
        )

        result = json.loads(response.output_text)

        # The prompt only asks the model to respect the limits; enforce them.
        ingredients = _select_rankings(
            result["rankings"], "ingredient", max_results, confidence_threshold
        )

        if debug:
            print(f"Ranking results for {product}: {len(ingredients)} ingredients")
            if ingredients:
                print(f"Top match: {ingredients[0]}")

        return ingredients

    except Exception as e:
        print(f"Error ranking ingredients for '{product}': {e}")
        if debug:
            import traceback
            traceback.print_exc()
        return []


def rank_categories_openai(
    product: str,
    categories: dict,
    expanded_description: Optional[str] = None,
    client=None,
    model: str = "gpt-4o-mini",
    max_results: int = 5,
    confidence_threshold: float = 0.5,
    debug: bool = False
) -> List[Tuple[str, float]]:
    """Rank food categories for a product using OpenAI.

    Args:
        product: Product name.
        categories: Dictionary keyed by category ID. Only the keys are sent to
            the model; the values merely select the line format (see below).
        expanded_description: Optional longer product description added to
            the prompt.
        client: Optional pre-configured client; one is created when omitted.
        model: OpenAI model name.
        max_results: Maximum number of results to return.
        confidence_threshold: Minimum score a result needs to be returned.
        debug: Whether to print debug information (and a traceback on error).

    Returns:
        List of ``(category, score)`` tuples sorted by score descending,
        limited to ``max_results`` entries scoring at least
        ``confidence_threshold``. Returns an empty list when ``categories`` is
        empty or when the API call or response parsing fails (the error is
        printed).
    """
    if not categories:
        return []

    if client is None:
        client = get_openai_client()

    if debug:
        print(f"Category ranking for product: {product}")

    # Only category IDs are listed. IDs whose value is a plain string are
    # written bare, all others as "- <id>" bullets.
    # NOTE(review): the two line formats differ only by the bullet prefix;
    # kept as-is so the prompt text sent for existing callers is unchanged.
    category_lines = []
    for category_id, category_data in categories.items():
        if isinstance(category_data, str):
            category_lines.append(f"{category_id}\n")
        else:
            category_lines.append(f"- {category_id}\n")
    categories_text = "".join(category_lines)

    # Format prompt with expanded description if available
    prompt = f"Product: {product}"
    if expanded_description:
        prompt += f"\n\nExpanded description: {expanded_description}"
    prompt += f"\n\nAvailable food categories:\n{categories_text}"

    ranking_schema = {
        "type": "object",
        "properties": {
            "rankings": {
                "type": "array",
                "description": f"Only the top most relevant category with scores >= {confidence_threshold}",
                "items": {
                    "type": "object",
                    "properties": {
                        "reasoning": {
                            "type": "string",
                            "description": "Reasoning, , step by step, first weigh options, then consider the best match"
                        },
                        "category": {
                            "type": "string",
                            "description": "The name of the food category"
                        },
                        "relevance_score": {
                            "type": "number",
                            "description": "Score between 0 and 1 indicating relevance"
                        },
                    },
                    "required": ["category", "relevance_score", "reasoning"],
                    "additionalProperties": False
                }
            }
        },
        "required": ["rankings"],
        "additionalProperties": False
    }

    # The label must match the "Available food categories" heading used in the
    # user prompt above; otherwise the rules point at a list that is not there.
    system_prompt = _RANKING_SYSTEM_PROMPT.format(
        list_label="Available food categories",
        item_noun="categories",
        max_results=max_results,
        confidence_threshold=confidence_threshold,
    )

    try:
        response = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}
            ],
            text={
                "format": {
                    "type": "json_schema",
                    "name": "category_ranking",
                    "schema": ranking_schema,
                    "strict": True
                }
            }
        )

        result = json.loads(response.output_text)

        # Separate name so the ``categories`` argument is not shadowed.
        ranked_categories = _select_rankings(
            result["rankings"], "category", max_results, confidence_threshold
        )

        if debug:
            print(f"Category results for {product}: {len(ranked_categories)} categories")
            if ranked_categories:
                print(f"Top match: {ranked_categories[0]}")

        return ranked_categories

    except Exception as e:
        print(f"Error categorizing {product}: {e}")
        if debug:
            import traceback
            traceback.print_exc()
        return []