product_ingredient_demo / openai_expansion.py
esilver's picture
Fixed pipelines bugs
39f78ce
raw
history blame
4.52 kB
import json
from typing import List, Dict, Any
from openai import OpenAI
import concurrent.futures
from utils import SafeProgress
from api_utils import get_openai_client
def expand_product_descriptions(products: List[str],
                                max_workers: int = 5,
                                progress=None) -> Dict[str, str]:
    """
    Expand product names into category-oriented descriptions using OpenAI's
    structured output.

    Each distinct product name is sent to the ``gpt-4o-mini`` model through
    ``responses.create`` with a strict JSON schema, and the
    ``expanded_description`` field of the reply is collected. Requests are
    issued concurrently from a thread pool.

    Args:
        products: List of product names to expand. Duplicate names are
            requested only once, since the result is keyed by name.
        max_workers: Maximum number of concurrent API calls
        progress: Optional progress tracking object; it is wrapped in
            ``SafeProgress`` and updated as requests complete.

    Returns:
        Dictionary mapping original product names to expanded descriptions.
        When a request fails, the value for that product is
        ``"<product> - No expanded description available."``.
        An empty (or ``None``) ``products`` returns ``{}`` immediately,
        without creating a client or emitting progress updates.
    """
    # Nothing to expand: avoid building a client and a thread pool for no work.
    if not products:
        return {}

    progress_tracker = SafeProgress(progress, desc="Expanding product descriptions")

    # Set up OpenAI client
    openai_client = get_openai_client()

    # The system prompt is identical for every request, so build it once
    # instead of once per worker call.
    system_prompt = (
        "You are a product description expert. Your task is to convert brand names or abbreviated product mentions into clear product descriptions that help\n"
        "categorization models understand WHAT the item is, not just who makes it. Focus on the product category, form, and usage rather\n"
        "than brand attributes. When brands are mentioned (like Green Mountain), identify the actual product type they represent (coffee pods)\n"
        "rather than describing the brand itself. Always provide the most common product interpretation and avoid mentioning brands in your description when possible.\n"
    )

    def process_product(product: str) -> str:
        """Return the expanded description for one product, or a fallback string on failure."""
        try:
            response = openai_client.responses.create(
                model="gpt-4o-mini",
                input=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f'Describe "{product}" to an embedding model categorizing products'},
                ],
                text={
                    "format": {
                        "type": "json_schema",
                        "name": "product_description",
                        "schema": {
                            "type": "object",
                            "properties": {
                                "expanded_description": {
                                    "type": "string",
                                    "description": "A concise description of the product, if multiple interpretations are possible, provide the most common one."
                                },
                            },
                            "required": ["expanded_description"],
                            "additionalProperties": False
                        },
                        "strict": True
                    }
                }
            )
            # The reply text is a JSON document matching the schema above.
            return json.loads(response.output_text)["expanded_description"]
        except Exception as e:
            # Deliberately broad: one failed request (API error, malformed
            # JSON, missing key) must not abort the whole batch.
            print(f"Error expanding description for '{product}': {e}")
            return f"{product} - No expanded description available."

    # Results are keyed by product name, so repeated names would only trigger
    # redundant API calls. dict.fromkeys de-duplicates while keeping order.
    unique_products = list(dict.fromkeys(products))
    total_products = len(unique_products)
    progress_tracker(0.1, desc=f"Processing {total_products} products")

    expanded_descriptions: Dict[str, str] = {}

    # Use thread pool for concurrent API calls
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_product = {
            executor.submit(process_product, product): product
            for product in unique_products
        }
        finished = concurrent.futures.as_completed(future_to_product)
        for done_count, future in enumerate(finished, start=1):
            product = future_to_product[future]
            try:
                expanded_description = future.result()
            except Exception as e:
                # Defensive: process_product already handles its own errors,
                # so this only triggers if the future itself fails.
                print(f"Error processing expansion for '{product}': {e}")
                expanded_description = product  # Fallback to original product name
            expanded_descriptions[product] = expanded_description

            # Report after the result is stored; the 0.1-0.9 range is spread
            # evenly over the requests.
            progress_percent = 0.1 + (0.8 * done_count / total_products)
            progress_tracker(progress_percent, desc=f"Expanded {done_count}/{total_products} products")

    progress_tracker(1.0, desc="Expansion complete")
    return expanded_descriptions