"""Shared helpers for API clients, parallel batch processing and LLM ranking.

Provides factories for OpenAI / Voyage AI clients, a thread-pool based batch
processor, a generic structured-output OpenAI query, and two helpers that use
an OpenAI model to score candidate ingredients / food categories for a product.
"""

import concurrent.futures
import json
import os
from typing import Any, Callable, Dict, List, Optional, Tuple

from openai import OpenAI
import voyageai

from utils import SafeProgress  # noqa: F401  NOTE(review): unused in this module; presumably re-exported — verify before removing


# Centralized API clients

def get_openai_client() -> OpenAI:
    """Return an OpenAI client using the ``OPENAI_API_KEY`` environment variable."""
    api_key = os.environ.get("OPENAI_API_KEY")
    return OpenAI(api_key=api_key)


def get_voyage_client() -> "voyageai.Client":
    """Return a Voyage AI client built with the library's default configuration."""
    # NOTE(review): no key is passed explicitly; presumably voyageai reads it
    # from the environment — confirm against the voyageai docs.
    return voyageai.Client()


# General batch processing utilities

def process_batch(items_batch: List[Any], processor_func: Callable) -> Dict:
    """Apply ``processor_func`` to every item of one batch, sequentially.

    Args:
        items_batch: Items to process.
        processor_func: Callable taking one item and returning a
            ``(key, value)`` pair.

    Returns:
        Dictionary mapping each returned key to its value. When
        ``processor_func`` raises for an item, the error is printed and the
        item itself is stored as the key with an empty list as its value, so
        one failing item does not abort the rest of the batch.
    """
    results = {}
    for item in items_batch:
        try:
            key, value = processor_func(item)
            results[key] = value
        except Exception as e:
            print(f"Error processing batch item '{item}': {e}")
            # NOTE(review): the failing item is used as a dict key, so it must
            # be hashable; otherwise the TypeError escapes and the caller loses
            # the whole batch.
            results[item] = []
    return results


def process_in_parallel(
    items: List[Any],
    processor_func: Callable,
    max_workers: int = 10,
    progress_tracker: Any = None,
    progress_start: float = 0.0,
    progress_end: float = 1.0,
    progress_desc: str = "Processing in parallel",
) -> Dict:
    """Process ``items`` in batches on a thread pool and merge the results.

    Args:
        items: Items to process; an empty list yields an empty dict.
        processor_func: Callable taking one item and returning ``(key, value)``
            (see ``process_batch``).
        max_workers: Maximum number of worker threads. It is clamped to the
            range ``[1, len(items)]``.
        progress_tracker: Optional callable invoked as
            ``progress_tracker(fraction, desc=...)`` after each batch finishes.
        progress_start: Progress fraction reported as the starting point (0.0-1.0).
        progress_end: Progress fraction reported once all batches are done (0.0-1.0).
        progress_desc: Prefix for the progress description.

    Returns:
        Combined results of all batches. Batches are merged in completion order
        with ``dict.update``, so when batches produce the same key the one that
        finishes last wins. A batch that raises is printed and skipped.
    """
    # Guard: with no items (or max_workers <= 0) the worker count would become
    # 0 and the batch-size division below would raise ZeroDivisionError.
    if not items:
        return {}
    max_workers = max(1, min(max_workers, len(items)))

    # Split items into contiguous batches.
    batch_size = max(1, len(items) // max_workers)
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    batch_count = len(batches)

    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_batch = {
            executor.submit(process_batch, batch, processor_func): index
            for index, batch in enumerate(batches)
        }
        for done, future in enumerate(concurrent.futures.as_completed(future_to_batch), start=1):
            batch_index = future_to_batch[future]

            if progress_tracker:
                progress_percent = progress_start + ((progress_end - progress_start) * done / batch_count)
                progress_tracker(progress_percent, desc=f"{progress_desc}: batch {batch_index+1}/{batch_count}")

            try:
                results.update(future.result())
            except Exception as e:
                print(f"Error processing batch {batch_index}: {e}")

    return results


def openai_structured_query(
    prompt: str,
    system_message: str = "You are a helpful assistant.",
    schema: Optional[dict] = None,
    model: str = "o3-mini",
    client=None,
    schema_name: str = "structured_output",
) -> dict:
    """Send a system + user prompt and parse the schema-constrained JSON reply.

    Args:
        prompt: The user prompt.
        system_message: The system message that guides the model.
        schema: JSON schema for the structured output, sent with
            ``strict: True``. It defaults to None, but a real schema is expected:
            None is forwarded to the API unchanged.
        model: OpenAI model name.
        client: Optional pre-configured client; created with
            ``get_openai_client()`` when None.
        schema_name: Name attached to the schema in the request.

    Returns:
        The parsed JSON response as a dictionary.

    Raises:
        Exception: any error from the API call or from ``json.loads`` is
            printed and re-raised unchanged.
    """
    if client is None:
        client = get_openai_client()

    try:
        response = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt},
            ],
            text={
                "format": {
                    "type": "json_schema",
                    "name": schema_name,
                    "schema": schema,
                    "strict": True,
                }
            },
        )
        return json.loads(response.output_text)
    except Exception as e:
        print(f"Error in OpenAI structured query: {e}")
        raise


def _top_rankings(
    result: dict,
    name_key: str,
    max_results: int,
    confidence_threshold: float,
) -> List[Tuple[str, float]]:
    """Turn a parsed ``{"rankings": [...]}`` reply into filtered, sorted pairs.

    Keeps the entries whose ``relevance_score`` is >= ``confidence_threshold``,
    orders them by score, highest first (stable, so ties keep the model's
    order), and returns at most ``max_results`` ``(entry[name_key], score)``
    pairs.
    """
    scored = [
        (entry[name_key], float(entry["relevance_score"]))
        for entry in result["rankings"]
    ]
    # The model is only *asked* to respect these limits; enforce them here.
    kept = [pair for pair in scored if pair[1] >= confidence_threshold]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    # max(0, ...) stops a negative max_results from slicing off the tail.
    return kept[:max(0, max_results)]


def rank_ingredients_openai(
    product: str,
    candidates: List[str],
    expanded_description: Optional[str] = None,
    client=None,
    model: str = "o3-mini",
    max_results: int = 3,
    confidence_threshold: float = 0.5,
    debug: bool = False,
) -> List[Tuple[str, float]]:
    """Rank candidate ingredients for a product with an OpenAI model.

    Args:
        product: Product name.
        candidates: Candidate ingredient names shown to the model.
        expanded_description: Optional extra product description appended to
            the prompt.
        client: Optional pre-configured client; created with
            ``get_openai_client()`` when None. Errors while creating it are
            not caught.
        model: OpenAI model name.
        max_results: Maximum number of pairs returned.
        confidence_threshold: Minimum relevance score a pair must have to be
            returned.
        debug: Whether to print debug info.

    Returns:
        ``(ingredient, score)`` pairs, highest score first, at most
        ``max_results`` long. The list is empty when ``candidates`` is empty or
        when the API call or response parsing fails (the error is printed).
        Ingredient names are returned as written by the model; they are not
        checked against ``candidates``.
    """
    if not candidates:
        return []

    if client is None:
        client = get_openai_client()

    if debug:
        print(f"Ranking for product: {product} with {len(candidates)} candidates")

    # Build the prompt, including the expanded description when available.
    prompt = f"Product: {product}"
    if expanded_description:
        prompt += f"\n\nExpanded description: {expanded_description}"
    prompt += f"\n\nPotential ingredients: {', '.join(candidates)}"

    ranking_schema = {
        "type": "object",
        "properties": {
            "rankings": {
                "type": "array",
                "description": f"Only the top {max_results} most relevant ingredients with scores >= {confidence_threshold}",
                "items": {
                    "type": "object",
                    "properties": {
                        "ingredient": {
                            "type": "string",
                            "description": "The name of the ingredient",
                        },
                        "relevance_score": {
                            "type": "number",
                            "description": "Score between 0 and 1 indicating relevance",
                        },
                        "explanation": {
                            "type": "string",
                            "description": "Brief explanation for the matching",
                        },
                    },
                    "required": ["ingredient", "relevance_score", "explanation"],
                    "additionalProperties": False,
                },
            }
        },
        "required": ["rankings"],
        "additionalProperties": False,
    }

    try:
        # The API is called directly (not via openai_structured_query) so that
        # a reasoning effort can be passed.
        response = client.responses.create(
            model=model,
            reasoning={"effort": "low"},  # Include effort parameter from ui_expanded_matching
            input=[
                {"role": "system", "content": f"You are a food ingredient matching expert. Rank the top {max_results} ingredient based on how well they match the given product. Only include ingredients with relevance score >= {confidence_threshold}."},
                {"role": "user", "content": prompt},
            ],
            text={
                "format": {
                    "type": "json_schema",
                    "name": "ingredient_ranking",
                    "schema": ranking_schema,
                    "strict": True,
                }
            },
        )

        result = json.loads(response.output_text)
        ingredients = _top_rankings(result, "ingredient", max_results, confidence_threshold)

        if debug:
            print(f"Ranking results for {product}: {len(ingredients)} ingredients")
            if ingredients:
                print(f"Top match: {ingredients[0]}")

        return ingredients

    except Exception as e:
        print(f"Error ranking ingredients for '{product}': {e}")
        return []


def _format_category_line(category_id: Any, category_data: Any) -> str:
    """Render one ``- id: description`` prompt line (or ``- id`` with no description)."""
    if isinstance(category_data, str):
        return f"- {category_id}: {category_data}\n"
    if isinstance(category_data, dict) and 'description' in category_data:
        return f"- {category_id}: {category_data['description']}\n"
    return f"- {category_id}\n"


def rank_categories_openai(
    product: str,
    categories: dict,
    expanded_description: Optional[str] = None,
    client=None,
    model: str = "o3-mini",
    max_results: int = 5,
    confidence_threshold: float = 0.5,
    debug: bool = False,
) -> List[Tuple[str, float]]:
    """Rank food categories for a product with an OpenAI model.

    Args:
        product: Product name.
        categories: Mapping of category id to either a description string, a
            dict with a ``'description'`` key, or anything else (only the id is
            then shown to the model).
        expanded_description: Accepted for signature parity with
            ``rank_ingredients_openai`` but currently NOT included in the prompt.
        client: Optional pre-configured client; created with
            ``get_openai_client()`` when None. Errors while creating it are
            not caught.
        model: OpenAI model name.
        max_results: Maximum number of pairs returned.
        confidence_threshold: Minimum relevance score a pair must have to be
            returned.
        debug: Whether to print debug info (and a traceback on failure).

    Returns:
        ``(category, score)`` pairs, highest score first, at most
        ``max_results`` long. The list is empty when ``categories`` is empty or
        when the API call or response parsing fails (the error is printed).
    """
    if not categories:
        return []

    if client is None:
        client = get_openai_client()

    if debug:
        print(f"Category ranking for product: {product}")

    categories_text = "".join(
        _format_category_line(category_id, category_data)
        for category_id, category_data in categories.items()
    )

    # NOTE: expanded_description is deliberately left out of this prompt.
    prompt = f"Product: {product}"
    prompt += f"\n\nAvailable food categories:\n{categories_text}"

    # "reasoning" is listed first so that, presumably, the model writes its
    # reasoning before committing to a category — confirm if order matters.
    ranking_schema = {
        "type": "object",
        "properties": {
            "rankings": {
                "type": "array",
                "description": f"Only the top most relevant category with scores >= {confidence_threshold}",
                "items": {
                    "type": "object",
                    "properties": {
                        "reasoning": {
                            "type": "string",
                            "description": "Reasoning, , step by step, first weigh options, then consider the best match",
                        },
                        "category": {
                            "type": "string",
                            "description": "The name of the food category",
                        },
                        "relevance_score": {
                            "type": "number",
                            "description": "Score between 0 and 1 indicating relevance",
                        },
                    },
                    "required": ["category", "relevance_score", "reasoning"],
                    "additionalProperties": False,
                },
            }
        },
        "required": ["rankings"],
        "additionalProperties": False,
    }

    try:
        # NOTE: unlike rank_ingredients_openai, no `reasoning` effort is sent.
        response = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": f"You are a food categorization expert. Think this through step by step: Rank the top category based on how well it match the given product. Only include categories with relevance score >= {confidence_threshold}."},
                {"role": "user", "content": prompt},
            ],
            text={
                "format": {
                    "type": "json_schema",
                    "name": "category_ranking",
                    "schema": ranking_schema,
                    "strict": True,
                }
            },
        )

        result = json.loads(response.output_text)
        # A new name avoids shadowing the `categories` parameter.
        ranked = _top_rankings(result, "category", max_results, confidence_threshold)

        if debug:
            print(f"Category results for {product}: {len(ranked)} categories")
            if ranked:
                print(f"Top match: {ranked[0]}")

        return ranked

    except Exception as e:
        print(f"Error categorizing {product}: {e}")
        if debug:
            import traceback
            traceback.print_exc()
        return []