# processor.py
"""
Main processing orchestrator that ties together all the manufacturing priority logic.
This module provides high-level functions that both the CLI and Gradio interfaces can use.
"""
import os
from datetime import datetime
from typing import Dict, Optional, Tuple

import pandas as pd

from config import WEIGHTS
from output_writer import save_with_instructions
from priority_logic import REQUIRED_COLS, compute_priority
from sheet_reader import list_sheets, read_sheet


class ManufacturingProcessor:
    """
    Main processor class for manufacturing priority calculations.
    Encapsulates all the logic needed to process Excel files and generate priority rankings.
    """

    def __init__(self, weights: Optional[Dict[str, int]] = None):
        """Initialize the processor with scoring weights."""
        self.weights = weights or WEIGHTS.copy()
        self.validate_weights()

    def validate_weights(self) -> None:
        """Ensure the weights sum to 100."""
        total = sum(self.weights.values())
        if total != 100:
            raise ValueError(f"Weights must sum to 100, got {total}")

    def get_file_info(self, file_path: str) -> Dict:
        """Get basic information about the Excel file."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        try:
            sheets = list_sheets(file_path)
            file_size = os.path.getsize(file_path)
            return {
                "file_path": file_path,
                "file_name": os.path.basename(file_path),
                "file_size": file_size,
                "sheets": sheets,
                "sheet_count": len(sheets),
            }
        except Exception as e:
            raise RuntimeError(f"Error reading file info: {e}") from e

    def validate_sheet_data(self, df: pd.DataFrame) -> Dict:
        """Validate that the sheet has the required columns and usable data."""
        # Normalize column names (stray whitespace is common in hand-edited sheets)
        df_norm = df.copy()
        df_norm.columns = [str(c).strip() for c in df_norm.columns]
        # Check required columns
        missing_cols = [col for col in REQUIRED_COLS if col not in df_norm.columns]
        # Basic data validation
        validation_result = {
            "valid": len(missing_cols) == 0,
            "missing_columns": missing_cols,
            "available_columns": list(df_norm.columns),
            "row_count": len(df),
            "empty_rows": int(df.isnull().all(axis=1).sum()),
            "data_issues": [],
        }
        if validation_result["valid"]:
            # Check for data quality issues
            try:
                # Check the date column
                date_col = "Oldest Product Required First"
                date_issues = pd.to_datetime(df_norm[date_col], errors="coerce").isnull().sum()
                if date_issues > 0:
                    validation_result["data_issues"].append(
                        f"{date_issues} invalid dates in '{date_col}'"
                    )
                # Check the quantity column
                qty_col = "Quantity of Each Component"
                qty_numeric = pd.to_numeric(df_norm[qty_col], errors="coerce")
                qty_issues = qty_numeric.isnull().sum()
                if qty_issues > 0:
                    validation_result["data_issues"].append(
                        f"{qty_issues} non-numeric values in '{qty_col}'"
                    )
                # Flag required columns that are completely empty
                for col in REQUIRED_COLS:
                    if col in df_norm.columns:
                        empty_count = df_norm[col].isnull().sum()
                        if empty_count == len(df_norm):
                            validation_result["data_issues"].append(
                                f"Column '{col}' is completely empty"
                            )
            except Exception as e:
                validation_result["data_issues"].append(f"Data validation error: {e}")
        return validation_result
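
    # Validation sketch against a toy frame. The column names below are the
    # real ones checked above; the data values are made up. If REQUIRED_COLS
    # lists additional columns, "valid" would be False and the quality checks
    # would be skipped:
    #
    #     df = pd.DataFrame({
    #         "Oldest Product Required First": ["2024-01-02", "not a date"],
    #         "Quantity of Each Component": [10, "n/a"],
    #     })
    #     report = ManufacturingProcessor().validate_sheet_data(df)
    #     # report["data_issues"] would flag 1 invalid date and 1 non-numeric qty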

    def process_file(
        self,
        file_path: str,
        sheet_name: str,
        min_qty: int = 50,
        custom_weights: Optional[Dict[str, int]] = None,
    ) -> Tuple[pd.DataFrame, Dict]:
        """
        Process a single sheet from an Excel file and return prioritized results.

        Returns:
            Tuple of (processed_dataframe, processing_info)
        """
        # Use custom weights if provided, otherwise fall back to the instance weights
        if custom_weights:
            weights = custom_weights.copy()
            if sum(weights.values()) != 100:
                raise ValueError("Custom weights must sum to 100")
        else:
            weights = self.weights
        # Read the data
        df = read_sheet(file_path, sheet_name)
        if df is None or df.empty:
            raise ValueError("Sheet is empty or could not be read")
        # Validate the data
        validation = self.validate_sheet_data(df)
        if not validation["valid"]:
            raise ValueError(
                f"Data validation failed: missing columns {validation['missing_columns']}"
            )
        # Run the priority calculation
        try:
            processed_df = compute_priority(df, min_qty=min_qty, weights=weights)
        except Exception as e:
            raise RuntimeError(f"Priority calculation failed: {e}") from e
        # Assemble processing info for logging and output
        processing_info = {
            "timestamp": datetime.now().isoformat(),
            "file_name": os.path.basename(file_path),
            "sheet_name": sheet_name,
            "weights_used": weights,
            "min_quantity": min_qty,
            "total_products": len(df),
            "products_above_threshold": int(processed_df["QtyThresholdOK"].sum()),
            "highest_priority_score": processed_df["PriorityScore"].max(),
            "lowest_priority_score": processed_df["PriorityScore"].min(),
            "validation_info": validation,
        }
        return processed_df, processing_info
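
    # Usage sketch (the workbook and sheet names are hypothetical):
    #
    #     df, info = processor.process_file("orders.xlsx", "Q1", min_qty=25)
    #     print(info["products_above_threshold"], "of", info["total_products"],
    #           "products meet the quantity threshold")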

    def save_results(
        self,
        processed_df: pd.DataFrame,
        output_path: str,
        processing_info: Dict,
    ) -> str:
        """Save the processed results with full documentation."""
        try:
            save_with_instructions(
                processed_df,
                output_path,
                min_qty=processing_info["min_quantity"],
                weights=processing_info["weights_used"],
            )
            # Add a processing log sheet
            self._add_processing_log(output_path, processing_info)
            return output_path
        except Exception as e:
            raise RuntimeError(f"Failed to save results: {e}") from e

    def _add_processing_log(self, output_path: str, processing_info: Dict) -> None:
        """Append a processing log sheet to the output file."""
        try:
            # Open the existing workbook and add (or replace) the log sheet.
            # Every row is padded to two cells so the DataFrame stays rectangular.
            with pd.ExcelWriter(
                output_path, mode="a", engine="openpyxl", if_sheet_exists="replace"
            ) as writer:
                weights = processing_info["weights_used"]
                log_data = [
                    ["PROCESSING LOG", ""],
                    ["", ""],
                    ["Processing Timestamp", processing_info["timestamp"]],
                    ["Source File", processing_info["file_name"]],
                    ["Sheet Processed", processing_info["sheet_name"]],
                    ["", ""],
                    ["SETTINGS USED", ""],
                    ["Age Weight", f"{weights['AGE_WEIGHT']}%"],
                    ["Component Weight", f"{weights['COMPONENT_WEIGHT']}%"],
                    ["Manual Weight", f"{weights['MANUAL_WEIGHT']}%"],
                    ["Minimum Quantity", processing_info["min_quantity"]],
                    ["", ""],
                    ["RESULTS SUMMARY", ""],
                    ["Total Products", processing_info["total_products"]],
                    ["Above Threshold", processing_info["products_above_threshold"]],
                    ["Highest Priority Score", f"{processing_info['highest_priority_score']:.4f}"],
                    ["Lowest Priority Score", f"{processing_info['lowest_priority_score']:.4f}"],
                ]
                if processing_info["validation_info"]["data_issues"]:
                    log_data.append(["", ""])
                    log_data.append(["DATA ISSUES FOUND", ""])
                    for issue in processing_info["validation_info"]["data_issues"]:
                        log_data.append(["", issue])
                log_df = pd.DataFrame(log_data, columns=["Parameter", "Value"])
                log_df.to_excel(writer, sheet_name="Processing_Log", index=False)
        except Exception as e:
            # If adding the log fails, don't fail the whole operation
            print(f"Warning: Could not add processing log: {e}")


# Convenience functions for easy import
def quick_process(
    file_path: str,
    sheet_name: str,
    output_path: Optional[str] = None,
    min_qty: int = 50,
    weights: Optional[Dict[str, int]] = None,
) -> str:
    """
    Quick processing function that handles the full workflow.

    Args:
        file_path: Path to the Excel file
        sheet_name: Name of the sheet to process
        output_path: Where to save results (auto-generated next to the input if omitted)
        min_qty: Minimum quantity threshold
        weights: Custom weights dict (optional)

    Returns:
        Path to the generated output file
    """
    processor = ManufacturingProcessor(weights)
    # Process the data (the processor already carries the weights)
    processed_df, processing_info = processor.process_file(file_path, sheet_name, min_qty)
    # Generate an output path if one was not provided
    if output_path is None:
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        output_dir = os.path.dirname(file_path)
        output_path = os.path.join(output_dir, f"{base_name}_PRIORITY.xlsx")
    # Save the results
    return processor.save_results(processed_df, output_path, processing_info)
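

# Usage sketch for quick_process (paths and weights are hypothetical):
#
#     out = quick_process(
#         "orders.xlsx", "Q1", min_qty=25,
#         weights={"AGE_WEIGHT": 50, "COMPONENT_WEIGHT": 30, "MANUAL_WEIGHT": 20},
#     )
#     print(f"Results written to {out}")  # e.g. orders_PRIORITY.xlsx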


def get_file_preview(file_path: str, sheet_name: str, max_rows: int = 5) -> Dict:
    """
    Get a preview of the file data for validation purposes.

    Returns:
        Dict containing preview info and sample data
    """
    processor = ManufacturingProcessor()
    # Get file info
    file_info = processor.get_file_info(file_path)
    # Read sample data
    df = read_sheet(file_path, sheet_name)
    sample_df = df.head(max_rows) if df is not None else pd.DataFrame()
    # Validate the data
    validation = processor.validate_sheet_data(df) if df is not None else {"valid": False}
    return {
        "file_info": file_info,
        "sample_data": sample_df,
        "validation": validation,
        "preview_rows": len(sample_df),
    }
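

if __name__ == "__main__":
    # A minimal smoke-test sketch tying the module together. The default file
    # and sheet names below are hypothetical placeholders; pass real ones on
    # the command line.
    import sys

    path = sys.argv[1] if len(sys.argv) > 1 else "orders.xlsx"
    sheet = sys.argv[2] if len(sys.argv) > 2 else "Sheet1"
    preview = get_file_preview(path, sheet)
    print(f"Sheets in workbook: {preview['file_info']['sheets']}")
    print(f"Previewed {preview['preview_rows']} rows; valid: {preview['validation']['valid']}")
    if preview["validation"]["valid"]:
        print(f"Results written to {quick_process(path, sheet)}")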