import json
from typing import List, Dict, Any
from openai import OpenAI
import concurrent.futures
from utils import SafeProgress
from api_utils import get_openai_client


def expand_product_descriptions(
    products: List[str],
    max_workers: int = 5,
    progress=None,
    model: str = "gpt-4o-mini",
    max_output_tokens: int = 100,
) -> Dict[str, str]:
    """
    Expand product names into richer descriptions using OpenAI structured output.

    Each distinct product name is sent to the OpenAI Responses API, one call per
    product, executed concurrently on a thread pool. A strict JSON schema
    constrains the model to return a single ``expanded_description`` string.
    Any per-product failure (API error, invalid/truncated JSON, missing key,
    empty description) is printed and replaced by a fallback string, so one bad
    item never aborts the whole run.

    Args:
        products: List of product names to expand. Duplicate names are sent to
            the API only once.
        max_workers: Maximum number of concurrent API calls.
        progress: Optional progress tracking object (wrapped in SafeProgress).
        model: OpenAI model name used for the expansion.
        max_output_tokens: Upper bound on generated tokens per product. If this
            limit truncates the JSON output, parsing fails and the fallback
            description is used for that product.

    Returns:
        Dictionary mapping each distinct original product name to its expanded
        description, or to "<product> - No expanded description available."
        when expansion failed.
    """
    progress_tracker = SafeProgress(progress, desc="Expanding product descriptions")

    # Set up OpenAI client (shared by all worker threads, as before)
    openai_client = get_openai_client()

    def fallback_description(product: str) -> str:
        # Single source of truth for the failure text, used by both error paths.
        return f"{product} - No expanded description available."

    def process_product(product: str):
        """Call the API for one product; return (product, description), never raise."""
        try:
            response = openai_client.responses.create(
                model=model,
                max_output_tokens=max_output_tokens,
                input=[
                    {"role": "system", "content": """You are a product description expert. Your task is to expand product names into descriptions that would help an embedding model categorize them correctly. """},
                    {"role": "user", "content": f'Describe "{product}" to an embedding model categorizing products'}
                ],
                text={
                    "format": {
                        "type": "json_schema",
                        "name": "product_description",
                        "schema": {
                            "type": "object",
                            "properties": {
                                "expanded_description": {
                                    "type": "string",
                                    "description": "An expanded description of the product that includes its category, type, common ingredients or components, and typical use cases."
                                }
                            },
                            "required": ["expanded_description"],
                            "additionalProperties": False
                        },
                        "strict": True
                    }
                }
            )
            # Parse the structured response. A response cut off by
            # max_output_tokens is not valid JSON, so json.loads raises and the
            # except clause below substitutes the fallback.
            result = json.loads(response.output_text)
            description = result["expanded_description"]
            if not description.strip():
                # The schema only guarantees a string, not a useful one.
                raise ValueError("model returned an empty description")
            return product, description
        except Exception as e:
            # Deliberate best-effort: report and fall back instead of failing the batch.
            print(f"Error expanding description for '{product}': {e}")
            return product, fallback_description(product)

    # Results are keyed by product name, so duplicate names would only cause
    # redundant (billed) API calls. Dedupe while keeping first-seen order.
    unique_products = list(dict.fromkeys(products))
    total_products = len(unique_products)
    progress_tracker(0.1, desc=f"Processing {total_products} products")

    expanded_descriptions: Dict[str, str] = {}
    # One API call per product, run concurrently; the work is I/O-bound, so threads suffice.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_product = {
            executor.submit(process_product, product): product
            for product in unique_products
        }
        for done, future in enumerate(concurrent.futures.as_completed(future_to_product), start=1):
            product = future_to_product[future]
            # Map completion onto the 0.1-0.9 progress range; 1.0 is reserved for the end.
            progress_percent = 0.1 + (0.8 * done / total_products)
            progress_tracker(progress_percent, desc=f"Expanded {done}/{total_products} products")
            try:
                _, expanded_description = future.result()
            except Exception as e:
                # process_product already catches Exception; this is a last-resort
                # guard and uses the same fallback text for consistency.
                print(f"Error processing expansion for '{product}': {e}")
                expanded_description = fallback_description(product)
            expanded_descriptions[product] = expanded_description

    progress_tracker(1.0, desc="Expansion complete")
    return expanded_descriptions