| """ | |
| Utility functions for FLUX Prompt Optimizer | |
| Clean, focused, and reusable utilities | |
| """ | |
| import re | |
| import logging | |
| import gc | |
| from typing import Optional, Tuple, Dict, Any, List | |
| from PIL import Image | |
| import torch | |
| import numpy as np | |
| from config import PROCESSING_CONFIG, FLUX_RULES | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| def setup_logging(level: str = "INFO") -> None: | |
| """Setup logging configuration""" | |
| logging.basicConfig( | |
| level=getattr(logging, level.upper()), | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| def optimize_image(image: Any) -> Optional[Image.Image]: | |
| """ | |
| Optimize image for processing | |
| Args: | |
| image: Input image (PIL, numpy array, or file path) | |
| Returns: | |
| Optimized PIL Image or None if failed | |
| """ | |
| if image is None: | |
| return None | |
| try: | |
| # Convert to PIL Image if necessary | |
| if isinstance(image, np.ndarray): | |
| image = Image.fromarray(image) | |
| elif isinstance(image, str): | |
| image = Image.open(image) | |
| elif not isinstance(image, Image.Image): | |
| logger.error(f"Unsupported image type: {type(image)}") | |
| return None | |
| # Convert to RGB if necessary | |
| if image.mode != 'RGB': | |
| image = image.convert('RGB') | |
| # Resize if too large | |
| max_size = PROCESSING_CONFIG["max_image_size"] | |
| if image.size[0] > max_size or image.size[1] > max_size: | |
| image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) | |
| logger.info(f"Image resized to {image.size}") | |
| return image | |
| except Exception as e: | |
| logger.error(f"Image optimization failed: {e}") | |
| return None | |
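

# Illustrative usage sketch for optimize_image (commented out, so nothing runs at
# import time). It assumes PROCESSING_CONFIG["max_image_size"] is an int such as
# 1024; the file name "sample.jpg" is a hypothetical placeholder.
#
#   img = optimize_image("sample.jpg")                      # from a file path
#   arr = np.zeros((2048, 2048, 3), dtype=np.uint8)
#   img = optimize_image(arr)                               # from a numpy array, resized to fit max_image_size
#   if img is not None:
#       print(img.size, img.mode)                           # e.g. (1024, 1024) 'RGB'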


def validate_image(image: Any) -> bool:
    """
    Validate whether an image is processable

    Args:
        image: Input image to validate

    Returns:
        True if valid, False otherwise
    """
    if image is None:
        return False
    try:
        optimized = optimize_image(image)
        return optimized is not None
    except Exception:
        return False


def clean_memory() -> None:
    """Clean up memory and GPU cache"""
    try:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        logger.debug("Memory cleaned")
    except Exception as e:
        logger.warning(f"Memory cleanup failed: {e}")


def apply_flux_rules(prompt: str, analysis_metadata: Optional[Dict[str, Any]] = None) -> str:
    """
    Apply Flux optimization rules to a prompt

    Args:
        prompt: Raw prompt text
        analysis_metadata: Optional metadata from image analysis, including camera suggestions

    Returns:
        Optimized prompt following Flux rules
    """
    if not prompt or not isinstance(prompt, str):
        return ""
    # Clean the prompt of unwanted elements
    cleaned_prompt = prompt
    for pattern in FLUX_RULES["remove_patterns"]:
        cleaned_prompt = re.sub(pattern, '', cleaned_prompt, flags=re.IGNORECASE)
    # Extract the description part only (remove CAMERA_SETUP section if present)
    description_part = _extract_description_only(cleaned_prompt)
    # Check whether BAGEL provided an intelligent camera setup
    camera_config = ""
    if analysis_metadata and analysis_metadata.get("has_camera_suggestion") and analysis_metadata.get("camera_setup"):
        # Use BAGEL's intelligent camera suggestion - clean and format it properly
        bagel_camera = analysis_metadata["camera_setup"]
        camera_config = _format_bagel_camera_suggestion(bagel_camera)
        logger.info(f"Using BAGEL camera suggestion: {camera_config}")
    else:
        # Only use the fallback if BAGEL didn't suggest anything
        camera_config = _get_fallback_camera_config(description_part.lower())
        logger.info("Using fallback camera configuration")
    # Add lighting enhancements if not already present and not covered by the camera config
    lighting_enhancement = _get_lighting_enhancement(description_part.lower(), camera_config)
    # Build the final prompt: description + camera + lighting
    final_prompt = description_part + camera_config + lighting_enhancement
    # Clean up formatting
    final_prompt = _clean_prompt_formatting(final_prompt)
    return final_prompt
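

# Illustrative sketch of apply_flux_rules (commented out; assumes FLUX_RULES in
# config defines "remove_patterns", "camera_configs" and "lighting_enhancements").
# The metadata keys shown ("has_camera_suggestion", "camera_setup") are the ones
# this module reads; the example values are hypothetical.
#
#   raw = "DESCRIPTION: A lone hiker on a mountain ridge at dawn. CAMERA_SETUP: Canon EOS R5, 24mm lens, f/8"
#   meta = {"has_camera_suggestion": True,
#           "camera_setup": "Canon EOS R5, 24mm lens, f/8"}
#   optimized = apply_flux_rules(raw, meta)
#   # -> the description text followed by ", Shot on Canon EOS R5, 24mm lens, f/8,
#   #    professional photography" plus a lighting enhancement from FLUX_RULES,
#   #    with formatting cleaned up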


def _extract_description_only(prompt: str) -> str:
    """Extract only the description part, removing camera setup sections"""
    # Remove CAMERA_SETUP section if present
    if "CAMERA_SETUP:" in prompt:
        parts = prompt.split("CAMERA_SETUP:")
        description = parts[0].strip()
    elif "2. CAMERA_SETUP" in prompt:
        parts = prompt.split("2. CAMERA_SETUP")
        description = parts[0].strip()
    else:
        description = prompt
    # Remove "DESCRIPTION:" label if present
    if description.startswith("DESCRIPTION:"):
        description = description.replace("DESCRIPTION:", "").strip()
    elif description.startswith("1. DESCRIPTION:"):
        description = description.replace("1. DESCRIPTION:", "").strip()
    # Clean up any remaining camera recommendations from the description
    description = re.sub(r'For this type of scene.*?shooting style would be.*?\.', '', description, flags=re.DOTALL)
    description = re.sub(r'I would recommend.*?aperture.*?\.', '', description, flags=re.DOTALL)
    # Remove numbered section residue (like "2.," at the end)
    description = re.sub(r'\s*\d+\.\s*,?\s*$', '', description)
    description = re.sub(r'\s*\d+\.\s*,?\s*', ' ', description)
    return description.strip()
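

# Commented example of what _extract_description_only strips (the input string is
# hypothetical and mimics the numbered two-part analysis format):
#
#   _extract_description_only("1. DESCRIPTION: A rainy city street at night. 2. CAMERA_SETUP: Sony A7 IV, 35mm")
#   # -> "A rainy city street at night."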


def _format_bagel_camera_suggestion(bagel_camera: str) -> str:
    """Format BAGEL's camera suggestion into clean FLUX format"""
    try:
        # Clean up the BAGEL suggestion
        camera_text = bagel_camera.strip()
        # Remove "CAMERA_SETUP:" if it's still there
        camera_text = re.sub(r'^CAMERA_SETUP:\s*', '', camera_text)
        # Extract key camera info using regex patterns
        camera_patterns = {
            'camera': r'(Canon EOS [^,]+|Sony A[^,]+|Leica [^,]+|Hasselblad [^,]+|Phase One [^,]+|Fujifilm [^,]+)',
            'lens': r'(\d+mm[^,]*|[^,]*lens[^,]*)',
            'aperture': r'(f/[\d.]+[^,]*)'
        }
        extracted_parts = []
        for key, pattern in camera_patterns.items():
            match = re.search(pattern, camera_text, re.IGNORECASE)
            if match:
                extracted_parts.append(match.group(1).strip())
        if extracted_parts:
            # Build clean camera config
            camera_info = ', '.join(extracted_parts)
            return f", Shot on {camera_info}, professional photography"
        else:
            # Fallback: use the first sentence of BAGEL's suggestion
            first_sentence = camera_text.split('.')[0].strip()
            if len(first_sentence) > 10:
                return f", {first_sentence}, professional photography"
            else:
                return ", professional camera setup"
    except Exception as e:
        logger.warning(f"Failed to format BAGEL camera suggestion: {e}")
        return ", professional camera setup"


def _get_fallback_camera_config(prompt_lower: str) -> str:
    """Get fallback camera configuration when BAGEL doesn't suggest one"""
    # Keyword-based detection
    if any(word in prompt_lower for word in ['street', 'urban', 'city', 'documentary', 'crowd', 'protest']):
        return FLUX_RULES["camera_configs"]["street"]
    elif any(word in prompt_lower for word in ['portrait', 'person', 'man', 'woman', 'face']) and not any(word in prompt_lower for word in ['street', 'crowd', 'scene']):
        return FLUX_RULES["camera_configs"]["portrait"]
    elif any(word in prompt_lower for word in ['landscape', 'mountain', 'nature', 'outdoor']):
        return FLUX_RULES["camera_configs"]["landscape"]
    else:
        return FLUX_RULES["camera_configs"]["default"]


def _get_lighting_enhancement(prompt_lower: str, camera_config: str) -> str:
    """Determine appropriate lighting enhancement"""
    # Don't add lighting if it is already mentioned in the prompt or camera config
    if 'lighting' in prompt_lower or 'lighting' in camera_config.lower():
        return ""
    if 'dramatic' in prompt_lower or 'chaos' in prompt_lower or 'fire' in prompt_lower:
        return FLUX_RULES["lighting_enhancements"]["dramatic"]
    elif 'portrait' in camera_config.lower():
        return FLUX_RULES["lighting_enhancements"]["portrait"]
    else:
        return FLUX_RULES["lighting_enhancements"]["default"]


def _clean_prompt_formatting(prompt: str) -> str:
    """Clean up prompt formatting"""
    if not prompt:
        return ""
    # Ensure it starts with a capital letter
    prompt = prompt.strip()
    if prompt:
        prompt = prompt[0].upper() + prompt[1:] if len(prompt) > 1 else prompt.upper()
    # Clean up spaces and commas
    prompt = re.sub(r'\s+', ' ', prompt)
    prompt = re.sub(r',\s*,+', ',', prompt)
    prompt = re.sub(r'^\s*,\s*', '', prompt)  # Remove leading commas
    prompt = re.sub(r'\s*,\s*$', '', prompt)  # Remove trailing commas
    # Remove redundant periods
    prompt = re.sub(r'\.+', '.', prompt)
    return prompt.strip()
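

# Commented example of the formatting cleanup above (hypothetical input):
#
#   _clean_prompt_formatting("  a quiet harbor at dusk,, shot on medium format,  ")
#   # -> "A quiet harbor at dusk, shot on medium format"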


def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]:
    """
    Calculate a quality score for a prompt

    Args:
        prompt: The prompt to score
        analysis_data: Optional analysis data to enhance scoring

    Returns:
        Tuple of (total_score, breakdown_dict)
    """
    if not prompt:
        return 0, {"prompt_quality": 0, "technical_details": 0, "artistic_value": 0, "flux_optimization": 0}
    breakdown = {}
    # Prompt quality score (0-30 points)
    length_score = min(20, len(prompt) // 8)  # Reward decent length
    detail_score = min(10, len(prompt.split(',')) * 2)  # Reward detail
    breakdown["prompt_quality"] = length_score + detail_score
    # Technical details score (0-25 points) - enhanced for BAGEL camera suggestions
    tech_score = 0
    tech_keywords = ['shot on', 'lens', 'photography', 'lighting', 'camera']
    for keyword in tech_keywords:
        if keyword in prompt.lower():
            tech_score += 5
    # Bonus points for BAGEL camera suggestions
    if analysis_data and analysis_data.get("has_camera_suggestion"):
        tech_score += 10  # Higher bonus for intelligent camera selection
    breakdown["technical_details"] = min(25, tech_score)
    # Artistic value score (0-25 points)
    art_keywords = ['masterful', 'professional', 'cinematic', 'dramatic', 'beautiful']
    art_score = sum(5 for keyword in art_keywords if keyword in prompt.lower())
    breakdown["artistic_value"] = min(25, art_score)
    # Flux optimization score (0-20 points)
    flux_score = 0
    # Check for camera configuration (prefer BAGEL over fallback)
    if analysis_data and analysis_data.get("has_camera_suggestion"):
        flux_score += 15  # Higher score for BAGEL suggestions
    elif any(camera in prompt for camera in FLUX_RULES["camera_configs"].values()):
        flux_score += 10  # Lower score for fallback
    # Check for lighting configuration
    if any(lighting in prompt for lighting in FLUX_RULES["lighting_enhancements"].values()):
        flux_score += 5
    breakdown["flux_optimization"] = flux_score
    # Calculate total
    total_score = sum(breakdown.values())
    return total_score, breakdown
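

# Commented scoring sketch (the prompt is hypothetical; the noted contributions
# follow the rules above):
#
#   prompt = "A quiet harbor at dusk, Shot on Sony A7 IV, 50mm lens, f/1.4, professional photography, cinematic lighting"
#   total, breakdown = calculate_prompt_score(prompt, {"has_camera_suggestion": True})
#   # breakdown["technical_details"] gains +5 per matched keyword plus the +10 BAGEL bonus (capped at 25)
#   # breakdown["flux_optimization"] gains +15 because a BAGEL camera suggestion is present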


def get_score_grade(score: int) -> Dict[str, str]:
    """
    Get grade information for a score

    Args:
        score: Numeric score

    Returns:
        Dictionary with grade and color information
    """
    from config import SCORING_CONFIG
    for threshold, grade_info in sorted(SCORING_CONFIG["grade_thresholds"].items(), reverse=True):
        if score >= threshold:
            return grade_info
    # Default to the lowest grade
    return SCORING_CONFIG["grade_thresholds"][min(SCORING_CONFIG["grade_thresholds"])]
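

# Commented sketch of the threshold lookup above. The real SCORING_CONFIG lives in
# config.py; the shape assumed here (int threshold -> grade dict) is an illustrative
# assumption, not the actual values:
#
#   # SCORING_CONFIG["grade_thresholds"] = {90: {"grade": "A", "color": "green"},
#   #                                       70: {"grade": "B", "color": "yellow"},
#   #                                       0:  {"grade": "C", "color": "red"}}
#   # get_score_grade(75) would then return the 70-threshold entry.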


def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str:
    """
    Format analysis data into a readable report

    Args:
        analysis_data: Analysis results
        processing_time: Time taken for processing

    Returns:
        Formatted markdown report
    """
    model_used = analysis_data.get("model_used", "Unknown")
    prompt_length = len(analysis_data.get("prompt", ""))
    report = f"""**FLUX OPTIMIZATION COMPLETE**

**Model:** {model_used} • **Time:** {processing_time:.1f}s • **Length:** {prompt_length} chars

**ANALYSIS SUMMARY:**
{analysis_data.get("summary", "Analysis completed successfully")}

**OPTIMIZATIONS APPLIED:**
✓ Flux camera configuration
✓ Professional lighting setup
✓ Technical photography details
✓ Artistic enhancement keywords

**Powered by Frame 0 Laboratory for MIA**"""
    return report


def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]:
    """
    Safely execute a function with error handling

    Args:
        func: Function to execute
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Tuple of (success: bool, result: Any)
    """
    try:
        result = func(*args, **kwargs)
        return True, result
    except Exception as e:
        logger.error(f"Safe execution failed for {func.__name__}: {e}")
        return False, str(e)
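

# Commented usage sketch for safe_execute:
#
#   ok, result = safe_execute(apply_flux_rules, "a foggy forest path")
#   if ok:
#       print(result)                # the optimized prompt
#   else:
#       print(f"Failed: {result}")   # the error message as a string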


def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to a specified length with an ellipsis

    Args:
        text: Text to truncate
        max_length: Maximum length

    Returns:
        Truncated text
    """
    if not text or len(text) <= max_length:
        return text
    return text[:max_length - 3] + "..."


# Export main functions
__all__ = [
    "setup_logging",
    "optimize_image",
    "validate_image",
    "clean_memory",
    "apply_flux_rules",
    "calculate_prompt_score",
    "get_score_grade",
    "format_analysis_report",
    "safe_execute",
    "truncate_text",
]