import json from typing import List, Dict, Any from openai import OpenAI import concurrent.futures from utils import SafeProgress from api_utils import get_openai_client def expand_product_descriptions(products: List[str], max_workers: int = 5, progress=None) -> Dict[str, str]: """ Expand product descriptions using OpenAI's structured output Args: products: List of product names to expand max_workers: Maximum number of concurrent API calls progress: Optional progress tracking object Returns: Dictionary mapping original product names to expanded descriptions """ progress_tracker = SafeProgress(progress, desc="Expanding product descriptions") # Set up OpenAI client openai_client = get_openai_client() expanded_descriptions = {} def process_product(product): try: response = openai_client.responses.create( # model="o3-mini", model="gpt-4o-mini", # max_output_tokens=100, # reasoning={"effort": "low"}, input=[ {"role": "system", "content": """You are a product description expert. Your task is to expand product names into a one sentence descriptions that would help an embedding model categorize them correctly. if multiple variations are possible, provide the most common one. instead of listing the different variations. For example, if the product is in the fresh aisle mostly, it is not frozen. """}, {"role": "user", "content": f'Describe "{product}" to an embedding model categorizing products'} ], text={ "format": { "type": "json_schema", "name": "product_description", "schema": { "type": "object", "properties": { "expanded_description": { "type": "string", "description": "A concise description of the product, if multiple interpretations are possible, provide the most common one." }, }, "required": ["expanded_description"], "additionalProperties": False }, "strict": True } } ) # Parse the response result = json.loads(response.output_text) return product, result["expanded_description"] except Exception as e: print(f"Error expanding description for '{product}': {e}") return product, f"{product} - No expanded description available." # Process in batches for better parallelism total_products = len(products) progress_tracker(0.1, desc=f"Processing {total_products} products") # Use thread pool for concurrent API calls with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_product = {executor.submit(process_product, product): i for i, product in enumerate(products)} for i, future in enumerate(concurrent.futures.as_completed(future_to_product)): progress_percent = 0.1 + (0.8 * (i+1) / total_products) product_index = future_to_product[future] progress_tracker(progress_percent, desc=f"Expanded {i+1}/{total_products} products") try: original_product, expanded_description = future.result() expanded_descriptions[original_product] = expanded_description except Exception as e: product = products[product_index] print(f"Error processing expansion for '{product}': {e}") expanded_descriptions[product] = product # Fallback to original product name progress_tracker(1.0, desc="Expansion complete") return expanded_descriptions