import json
from typing import List, Dict, Any
from openai import OpenAI
import concurrent.futures
from utils import SafeProgress
from api_utils import get_openai_client

# System prompt sent with every expansion request. It steers the model toward
# describing what the item *is* (category, form, usage) rather than the brand.
_SYSTEM_PROMPT = (
    "You are a product description expert. Your task is to convert brand names "
    "or abbreviated product mentions into clear product descriptions that help "
    "categorization models understand WHAT the item is, not just who makes it. "
    "Focus on the product category, form, and usage rather than brand attributes. "
    "When brands are mentioned (like Green Mountain), identify the actual product "
    "type they represent (coffee pods) rather than describing the brand itself. "
    "Always provide the most common product interpretation and avoid mentioning "
    "brands in your description when possible. "
)

# Structured-output configuration: the reply must be a JSON object with a
# single required string field, "expanded_description". Built once at import
# time and only read afterwards, so it is shared by all worker threads.
_DESCRIPTION_FORMAT = {
    "format": {
        "type": "json_schema",
        "name": "product_description",
        "schema": {
            "type": "object",
            "properties": {
                "expanded_description": {
                    "type": "string",
                    "description": "A concise description of the product, if multiple interpretations are possible, provide the most common one.",
                },
            },
            "required": ["expanded_description"],
            "additionalProperties": False,
        },
        "strict": True,
    }
}


def expand_product_descriptions(products: List[str], max_workers: int = 5, progress=None) -> Dict[str, str]:
    """
    Expand product descriptions using OpenAI's structured output.

    Each distinct product name is sent to the model once, concurrently, on a
    thread pool. Repeated names in ``products`` share a single request, since
    the result is keyed by product name anyway.

    Args:
        products: List of product names to expand. Duplicates are allowed.
        max_workers: Maximum number of concurrent API calls (passed to
            ``ThreadPoolExecutor``).
        progress: Optional progress tracking object, wrapped in
            ``SafeProgress``. It is reported 0.1 before work starts, moves
            linearly up to 0.9 as requests complete, and ends at 1.0.

    Returns:
        Dictionary mapping each distinct product name to its expanded
        description. If the request or response parsing fails for a product,
        its value is ``"<product> - No expanded description available."``; the
        error is printed rather than raised.
    """
    progress_tracker = SafeProgress(progress, desc="Expanding product descriptions")

    # Set up OpenAI client
    openai_client = get_openai_client()

    def process_product(product):
        """Request one expansion; return (product, description or fallback)."""
        try:
            response = openai_client.responses.create(
                model="gpt-4o-mini",
                input=[
                    {"role": "system", "content": _SYSTEM_PROMPT},
                    {"role": "user", "content": f'Describe "{product}" to an embedding model categorizing products'},
                ],
                text=_DESCRIPTION_FORMAT,
            )

            # The structured reply is a JSON document in the response text.
            result = json.loads(response.output_text)
            return product, result["expanded_description"]
        except Exception as e:
            # Best effort: one failed product must not abort the whole run.
            print(f"Error expanding description for '{product}': {e}")
            return product, f"{product} - No expanded description available."

    # Results are keyed by product name, so a repeated name would only cost an
    # extra API call whose answer gets overwritten. dict.fromkeys drops the
    # repeats while keeping first-seen order.
    unique_products = list(dict.fromkeys(products))
    total_products = len(unique_products)
    progress_tracker(0.1, desc=f"Processing {total_products} products")

    expanded_descriptions = {}

    # Use thread pool for concurrent API calls
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_product = {
            executor.submit(process_product, product): product
            for product in unique_products
        }

        finished = concurrent.futures.as_completed(future_to_product)
        for completed, future in enumerate(finished, start=1):
            # Completed requests span the 0.1 -> 0.9 portion of the progress bar.
            progress_percent = 0.1 + (0.8 * completed / total_products)
            progress_tracker(progress_percent, desc=f"Expanded {completed}/{total_products} products")

            product = future_to_product[future]
            try:
                _, expanded_description = future.result()
                expanded_descriptions[product] = expanded_description
            except Exception as e:
                # process_product already traps its own errors; this is a last
                # line of defence for failures raised by the future itself.
                print(f"Error processing expansion for '{product}': {e}")
                expanded_descriptions[product] = product  # Fallback to original product name

    progress_tracker(1.0, desc="Expansion complete")
    return expanded_descriptions