product_ingredient_demo / api_utils.py
esilver's picture
updated prompt
7b33b6d
raw
history blame
16.9 kB
import os
import concurrent.futures
from typing import List, Dict, Callable, Any, Tuple
from openai import OpenAI
import voyageai
from utils import SafeProgress
import json
# Centralized API clients
def get_openai_client():
"""Get a configured OpenAI client"""
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
return OpenAI(api_key=OPENAI_API_KEY)
def get_voyage_client():
"""Get a configured Voyage AI client"""
return voyageai.Client()
# General batch processing utilities
def process_batch(items_batch: List[Any], processor_func: Callable) -> Dict:
"""
Process a batch of items using the provided processor function
Args:
items_batch: List of items to process
processor_func: Function that processes a single item and returns (key, value)
Returns:
Dictionary of processing results
"""
results = {}
for item in items_batch:
try:
key, value = processor_func(item)
results[key] = value
except Exception as e:
print(f"Error processing batch item '{item}': {e}")
results[item] = []
return results
def process_in_parallel(
items: List[Any],
processor_func: Callable,
max_workers: int = 10,
progress_tracker: Any = None,
progress_start: float = 0.0,
progress_end: float = 1.0,
progress_desc: str = "Processing in parallel"
) -> Dict:
"""
Process items in parallel using thread pool while preserving original order
Args:
items: List of items to process
processor_func: Function that processes a single item
max_workers: Maximum number of threads
progress_tracker: Optional progress tracking object
progress_start: Starting progress percentage (0.0-1.0)
progress_end: Ending progress percentage (0.0-1.0)
progress_desc: Description for the progress tracker
Returns:
Combined results dictionary with preserved input order
"""
# Ensure reasonable number of workers
max_workers = min(max_workers, len(items))
# Track original item positions
ordered_items = [(idx, item) for idx, item in enumerate(items)]
# Define a wrapper function to preserve order
def process_with_index(idx_item_pair):
idx, item = idx_item_pair
try:
key, value = processor_func(item)
return (idx, key, value)
except Exception as e:
print(f"Error processing item '{item}': {e}")
return (idx, item, None)
# Process items in parallel
indexed_results = [] # Store (idx, key, value) tuples
completed_count = 0
total_count = len(items)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_with_index, item_pair) for item_pair in ordered_items]
for future in concurrent.futures.as_completed(futures):
completed_count += 1
# Update progress if tracker provided
if progress_tracker:
progress_percent = progress_start + ((progress_end - progress_start) * completed_count / total_count)
progress_tracker(progress_percent, desc=f"{progress_desc}: {completed_count}/{total_count}")
try:
result = future.result()
if result[2] is not None: # (idx, key, value) - check if value is not None
indexed_results.append(result)
except Exception as e:
print(f"Error processing item: {e}")
# Sort results by original index and create ordered dictionary
ordered_results = {}
for idx, key, value in sorted(indexed_results, key=lambda x: x[0]):
ordered_results[key] = value
return ordered_results
def openai_structured_query(
prompt: str,
system_message: str = "You are a helpful assistant.",
schema: dict = None,
model: str = "gpt-4o-mini",
client=None,
schema_name: str = "structured_output"
) -> dict:
"""
Make an OpenAI API call with structured output format
Args:
prompt: The user prompt
system_message: The system message to guide the model
schema: JSON schema for structured output
model: OpenAI model to use
client: Optional pre-configured client, otherwise will be created
schema_name: Name for the schema
Returns:
Parsed JSON response as dictionary
"""
if client is None:
client = get_openai_client()
try:
response = client.responses.create(
model=model,
input=[
{"role": "system", "content": system_message},
{"role": "user", "content": prompt}
],
text={
"format": {
"type": "json_schema",
"name": schema_name,
"schema": schema,
"strict": True
}
}
)
# Parse the response
return json.loads(response.output_text)
except Exception as e:
print(f"Error in OpenAI structured query: {e}")
raise
def rank_ingredients_openai(
product: str,
candidates: List[str],
expanded_description: str = None,
client=None,
model: str = "gpt-4o-mini",
max_results: int = 3,
confidence_threshold: float = 0.5,
debug: bool = False
) -> List[Tuple[str, float]]:
"""
Rank ingredients for a product using OpenAI
Args:
product: Product name
candidates: List of candidate ingredients
expanded_description: Optional expanded product description
client: Optional pre-configured client
model: OpenAI model to use
max_results: Maximum number of results to return
confidence_threshold: Minimum confidence threshold
debug: Whether to print debug info
Returns:
List of (ingredient, confidence) tuples
"""
if not candidates:
return []
if client is None:
client = get_openai_client()
if debug:
print(f"Ranking for product: {product} with {len(candidates)} candidates")
# Format prompt with expanded description if available
prompt = f"Product: {product}"
if expanded_description:
prompt += f"\n\nExpanded description: {expanded_description}"
prompt += f"\n\nPotential ingredients: {', '.join(candidates)}"
# Define the ranking schema
ranking_schema = {
"type": "object",
"properties": {
"rankings": {
"type": "array",
"description": f"Only the top {max_results} most relevant ingredients with scores >= {confidence_threshold}",
"items": {
"type": "object",
"properties": {
"ingredient": {
"type": "string",
"description": "The name of the ingredient"
},
"relevance_score": {
"type": "number",
"description": "Score between 0 and 1 indicating relevance"
},
"explanation": {
"type": "string",
"description": "Brief explanation for the matching"
}
},
"required": ["ingredient", "relevance_score", "explanation"],
"additionalProperties": False
}
}
},
"required": ["rankings"],
"additionalProperties": False
}
try:
# Make the API call directly for more control
response = client.responses.create(
model=model,
# reasoning={"effort": "low"},
input=[
{"role": "system", "content": f"""
You are a product categorization expert. Your task is to match product descriptions to the most relevant categories from the PROVIDED LIST ONLY.
CRITICAL RULES:
1. You MUST ONLY select from the exact items listed in "Potential ingredients" - DO NOT create or invent new categories
2. Do not combine items from the list or add any words to them
3. Choose the items from the list that best match what the product IS or CONTAINS
4. If none of the items perfectly match, choose the closest matches from the provided list
For the rankings:
- Select ONLY from the exact items in the "Potential ingredients" list
- Assign relevance scores from 0.0 to 1.0
- Rank the top {max_results} matching ingredients.
- Provide brief explanations for why each item is relevant
- Do not suggest alternatives outside the provided list
Aim to identify the specific product category a consumer would look for when shopping for this exact item.
Only include ingredients with relevance score >= {confidence_threshold}.
Remember: Your ONLY options are the exact items listed in "Potential ingredients" - no additions, modifications, or combinations.
"""},
{"role": "user", "content": prompt}
],
text={
"format": {
"type": "json_schema",
"name": "ingredient_ranking",
"schema": ranking_schema,
"strict": True
}
}
)
# Parse the response
result = json.loads(response.output_text)
# Process ranking results
ingredients = []
for item in result["rankings"]:
ingredient = item["ingredient"]
score = float(item["relevance_score"])
ingredients.append((ingredient, score))
if debug:
print(f"Ranking results for {product}: {len(ingredients)} ingredients")
if ingredients:
print(f"Top match: {ingredients[0]}")
return ingredients
except Exception as e:
print(f"Error ranking ingredients for '{product}': {e}")
return []
def rank_categories_openai(
product: str,
categories: dict,
expanded_description: str = None,
client=None,
model: str = "gpt-4o-mini",
max_results: int = 5,
confidence_threshold: float = 0.5,
debug: bool = False
) -> List[Tuple[str, float]]:
"""
Rank food categories for a product using OpenAI
Args:
product: Product name
categories: Dictionary of category data
expanded_description: Optional expanded product description
client: Optional pre-configured client
model: OpenAI model to use
max_results: Maximum number of results to return
confidence_threshold: Minimum confidence threshold
debug: Whether to print debug info
Returns:
List of (category, confidence) tuples
"""
if not categories:
return []
if client is None:
client = get_openai_client()
if debug:
print(f"Category ranking for product: {product}")
# Format categories for the prompt - handle both string and dict formats
categories_text = ""
for category_id, category_data in categories.items():
if isinstance(category_data, str):
# Simple string description
# print(f"Category data: {category_data}, format: {type(category_data)}")
# categories_text += f"- {category_id}: {category_data}\n"
categories_text += category_id + "\n"
# elif isinstance(category_data, dict) and 'description' in category_data:
# print(f"Category data: {category_data}, format: {type(category_data)}")
# # Dictionary with description field
# categories_text += f"- {category_id}: {category_data['description']}\n"
else:
# Default case - just use the ID
categories_text += f"- {category_id}\n"
# categories_text += f"- {category_id}\n"
# Format prompt with expanded description if available
prompt = f"Product: {product}"
if expanded_description:
prompt += f"\n\nExpanded description: {expanded_description}"
prompt += f"\n\nAvailable food categories:\n{categories_text}"
# Define the ranking schema
ranking_schema = {
"type": "object",
"properties": {
"rankings": {
"type": "array",
"description": f"Only the top most relevant category with scores >= {confidence_threshold}",
"items": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": "Reasoning, , step by step, first weigh options, then consider the best match"
},
"category": {
"type": "string",
"description": "The name of the food category"
},
"relevance_score": {
"type": "number",
"description": "Score between 0 and 1 indicating relevance"
},
},
"required": ["category", "relevance_score", "reasoning"],
# "required": ["category", "relevance_score", "explanation"],
"additionalProperties": False
}
}
},
"required": ["rankings"],
"additionalProperties": False
}
try:
# Make the API call
response = client.responses.create(
model=model,
# reasoning={"effort": "low"},
input=[
{"role": "system", "content": f"""
You are a product categorization expert. Your task is to match product descriptions to the most relevant categories from the PROVIDED LIST ONLY.
CRITICAL RULES:
1. You MUST ONLY select from the exact items listed in "Potential ingredients" - DO NOT create or invent new categories
2. Do not combine items from the list or add any words to them
3. Choose the items from the list that best match what the product IS or CONTAINS
4. If none of the items perfectly match, choose the closest matches from the provided list
For the rankings:
- Select ONLY from the exact items in the "Potential ingredients" list
- Assign relevance scores from 0.0 to 1.0
- Rank the top {max_results} matching ingredients.
- Provide brief explanations for why each item is relevant
- Do not suggest alternatives outside the provided list
Aim to identify the specific product category a consumer would look for when shopping for this exact item.
Only include ingredients with relevance score >= {confidence_threshold}.
Remember: Your ONLY options are the exact items listed in "Potential ingredients" - no additions, modifications, or combinations.
"""},
{"role": "user", "content": prompt}
],
text={
"format": {
"type": "json_schema",
"name": "category_ranking",
"schema": ranking_schema,
"strict": True
}
}
)
# Parse the response
result = json.loads(response.output_text)
# Process ranking results
categories = []
for item in result["rankings"]:
category = item["category"]
score = float(item["relevance_score"])
categories.append((category, score))
if debug:
print(f"Category results for {product}: {len(categories)} categories")
if categories:
print(f"Top match: {categories[0]}")
return categories
except Exception as e:
print(f"Error categorizing {product}: {e}")
if debug:
import traceback
traceback.print_exc()
return []