import os
import sys
import logging
import re

# Logging configuration
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    handlers=[logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__)
# Install required dependencies automatically
def install_dependencies():
    logger.info("Verifying and installing required dependencies...")
    try:
        # Try to import peft
        try:
            import peft
            logger.info(f"PEFT already installed (version {peft.__version__})")
        except ImportError:
            logger.info("Installing PEFT...")
            # Quote the requirement so the shell does not treat '>' as redirection
            os.system('pip install -q "peft>=0.6.0"')

        # Try to import bitsandbytes
        try:
            import bitsandbytes
            logger.info(f"BitsAndBytes already installed (version {bitsandbytes.__version__})")
        except ImportError:
            logger.info("Installing BitsAndBytes...")
            os.system('pip install -q "bitsandbytes>=0.41.0"')

        # Ensure other dependencies are installed
        logger.info("Verifying other dependencies...")
        os.system('pip install -q torch "transformers>=4.30.0" "accelerate>=0.20.0" gradio pillow psutil')
        logger.info("All dependencies successfully installed")

        # Re-import peft to verify
        import peft
        from peft import PeftModel, PeftConfig
        logger.info(f"PEFT correctly imported, version: {peft.__version__}")
        return True
    except Exception as e:
        logger.error(f"Error installing dependencies: {str(e)}")
        return False

# Install dependencies before importing
success = install_dependencies()
if not success:
    logger.error("Failed to install required dependencies. The application may not function properly.")
# Now that the dependencies are in place, import the modules
import torch
from transformers import BlipProcessor, BlipForConditionalGeneration, AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel, PeftConfig
from PIL import Image
import random
import gradio as gr

# Check GPU availability
use_gpu = torch.cuda.is_available()
logger.info(f"GPU available: {use_gpu}")
if use_gpu:
    logger.info(f"GPU device: {torch.cuda.get_device_name(0)}")
    try:
        logger.info(f"Total GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")
        logger.info(f"Reserved GPU memory: {torch.cuda.memory_reserved(0) / 1024**3:.2f} GB")
    except Exception:
        logger.info("Could not retrieve detailed GPU information")
# Lazy loading of models
processor, model = None, None
peft_model, tokenizer = None, None
# Custom function to generate text with the PEFT model
def generate_with_peft_model(prompt, max_new_tokens=100, temperature=0.7, top_p=0.95):
    """Generate text using the PEFT model directly, without a pipeline"""
    global peft_model, tokenizer
    if peft_model is None or tokenizer is None:
        logger.error("PEFT model or tokenizer not loaded")
        return "Error: Model not loaded. Please try again."
    try:
        # Prepare input
        inputs = tokenizer(prompt, return_tensors="pt")
        if torch.cuda.is_available():
            inputs = inputs.to("cuda")

        # Generate output (pass the attention mask explicitly to avoid
        # padding-related warnings)
        with torch.no_grad():
            output_ids = peft_model.generate(
                inputs.input_ids,
                attention_mask=inputs.attention_mask,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                temperature=temperature,
                top_p=top_p,
            )

        # Decode output
        output_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)

        # Extract the assistant response if possible
        if "<|assistant|>" in prompt and "<|assistant|>" in output_text:
            response = output_text.split("<|assistant|>")[-1].strip()
            return response

        # If we can't extract the assistant response, remove the original prompt
        if prompt in output_text:
            response = output_text[len(prompt):].strip()
            return response
        return output_text
    except Exception as e:
        logger.error(f"Error generating text with PEFT: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return f"Error: {str(e)}"
def load_models():
    """Load models only when needed"""
    global processor, model, peft_model, tokenizer
    try:
        # Load BLIP model
        logger.info("Loading BLIP model...")
        processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
        model = BlipForConditionalGeneration.from_pretrained(
            "Salesforce/blip-image-captioning-large",
            torch_dtype=torch.float32  # Use float32 for CPU
        )
        logger.info("✅ BLIP model loaded successfully")

        # Load tokenizer and base model with quantization to reduce memory
        logger.info("Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained("HuggingFaceH4/zephyr-7b-beta")
        logger.info("✅ Tokenizer loaded successfully")

        # Configure quantization options
        logger.info("Loading base model with quantization...")
        try:
            # Try to use BitsAndBytes for 8-bit quantization
            from transformers import BitsAndBytesConfig

            # Quantization configuration (the bnb_4bit_* options only apply
            # to 4-bit loading, so 8-bit needs just this flag)
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True
            )

            # Load quantized base model
            base_model = AutoModelForCausalLM.from_pretrained(
                "HuggingFaceH4/zephyr-7b-beta",
                quantization_config=quantization_config,
                device_map="auto" if use_gpu else None,
            )
            logger.info("✅ Base model loaded with 8-bit quantization")
        except Exception as e:
            logger.warning(f"Could not load model with BitsAndBytes quantization: {str(e)}")
            logger.info("Trying to load model with half precision...")

            # Load model in half precision to reduce memory usage
            base_model = AutoModelForCausalLM.from_pretrained(
                "HuggingFaceH4/zephyr-7b-beta",
                torch_dtype=torch.float16 if use_gpu else torch.float32,
                device_map="auto" if use_gpu else None,
                low_cpu_mem_usage=True,
            )
            logger.info("✅ Base model loaded in half precision")

        # Load LoRA adapter
        try:
            logger.info("Loading LoRA adapter from Hugging Face...")
            # Load the PEFT configuration (validates that the adapter exists)
            adapter_config = PeftConfig.from_pretrained("Malaji71/SARA-Zephyr")
            # Load the adapter on top of the base model
            peft_model = PeftModel.from_pretrained(
                base_model,
                "Malaji71/SARA-Zephyr"
            )
            logger.info("✅ LoRA adapter loaded successfully from Hugging Face")
            return True
        except Exception as e:
            logger.error(f"Error loading LoRA adapter from Hugging Face: {str(e)}")
            # Try to load locally
            try:
                logger.info("Trying to load LoRA adapter locally...")
                local_adapter_path = "./SARA-Zephyr"
                adapter_config = PeftConfig.from_pretrained(local_adapter_path)
                peft_model = PeftModel.from_pretrained(
                    base_model,
                    local_adapter_path
                )
                logger.info("✅ LoRA adapter loaded successfully from local storage")
                return True
            except Exception as e2:
                logger.error(f"Error loading LoRA adapter locally: {str(e2)}")
                logger.error("Could not load LoRA adapter. The application will not function properly.")
                return False
    except Exception as e:
        logger.error(f"Error loading models: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return False
# Universal Video Prompting Guide combining the SARA framework
unified_instructions = """
# 🎬 Universal Video Prompting Guide
*Compatible with Sora, Gen-4, Pika, Luma, Runway and all diffusion-based video models*

## Core Principles
✅ **Focus on MOTION, not static description**
✅ **Use positive phrasing exclusively**
✅ **Start simple, iterate progressively**
✅ **Refer to subjects in general terms** ("the subject," "the woman")
✅ **Keep prompts direct and easily understood**

## SARA Framework (Subject + Action + Reference + Atmosphere)
- **Subject (S)**: Main element to control
- **Action (A)**: Movement/transformation ([verb] + [adverb])
- **Reference (R)**: Spatial anchors ("while X remains steady")
- **Atmosphere (A)**: Context and style

**Template**: [Subject] [verb] [adverb] while [reference] [atmosphere]
**Example**: "The subject walks smoothly while background remains steady, cinematic atmosphere"
"""
def analyze_image_with_zephyr(image):
    """Analyze an image using BLIP + Zephyr for enhanced understanding"""
    logger.info("Starting image analysis...")
    if image is None:
        logger.warning("No image provided")
        return "Please upload an image first.", {}
    try:
        # Lazy-load models
        if model is None or processor is None or peft_model is None or tokenizer is None:
            logger.info("Loading models...")
            success = load_models()
            if not success:
                logger.error("Error loading models")
                return "Error: Could not load models. Please try again.", {}

        # Convert to PIL if needed
        if not isinstance(image, Image.Image):
            logger.info("Converting image to PIL format")
            image = Image.fromarray(image)

        # Get image dimensions
        width, height = image.size
        aspect_ratio = width / height
        logger.info(f"Image dimensions: {width}x{height}, Ratio: {aspect_ratio:.2f}")
        if aspect_ratio > 1.5:
            composition = "Wide landscape shot"
        elif aspect_ratio < 0.7:
            composition = "Vertical portrait shot"
        else:
            composition = "Balanced composition"

        # Generate caption with BLIP
        logger.info("Generating caption with BLIP...")
        inputs = processor(image, return_tensors="pt")
        out = model.generate(**inputs, max_length=50, num_beams=3)
        basic_caption = processor.decode(out[0], skip_special_tokens=True)
        logger.info(f"Generated caption: {basic_caption}")

        # Use Zephyr for advanced analysis
        logger.info("Performing advanced analysis with the LoRA model...")
        enhanced_analysis = analyze_scene_with_zephyr(basic_caption, aspect_ratio, composition)

        # Create comprehensive analysis
        analysis = f"""📊 **Image Analysis:**
• **Dimensions**: {width} x {height}
• **Composition**: {composition}
• **Aspect Ratio**: {aspect_ratio:.2f}
🎨 **Scene Description**:
"{basic_caption}"
🤖 **AI Enhanced Analysis**:
{enhanced_analysis['scene_interpretation']}
💡 **Motion Insights**:
{chr(10).join(f"• {insight}" for insight in enhanced_analysis['motion_insights'])}
🎯 **Recommended Approach**:
{enhanced_analysis['recommended_approach']}"""

        # Scene info for prompt generation
        scene_info = {
            'basic_description': basic_caption,
            'composition': composition,
            'aspect_ratio': aspect_ratio,
            'enhanced_analysis': enhanced_analysis
        }
        logger.info("Image analysis completed successfully")
        return analysis, scene_info
    except Exception as e:
        logger.error(f"Error in image analysis: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return f"Error analyzing image: {str(e)}", {}
def analyze_scene_with_zephyr(basic_caption, aspect_ratio, composition):
    """Use the PEFT model for advanced scene analysis"""
    logger.info("Starting scene analysis...")
    # Verify that the model is loaded
    if peft_model is None or tokenizer is None:
        logger.error("PEFT model not available")
        return {
            'scene_interpretation': "Error: Analysis model not available.",
            'motion_insights': ["Try reloading the application"],
            'recommended_approach': "Could not determine"
        }
    try:
        analysis_prompt = f"""<|system|>
You are a video prompt engineering expert specializing in the SARA framework. Analyze this image description for video creation potential.
<|user|>
Image description: "{basic_caption}"
Image composition: {composition}
Aspect ratio: {aspect_ratio:.2f}
Please provide:
1. Type of motion that would work best
2. Recommended camera movements
3. Emotional tone/style suggestions
4. Best prompting approach (SARA framework)
Be concise and practical.
<|assistant|>"""

        logger.info("Generating analysis with the PEFT model...")
        generated_text = generate_with_peft_model(
            analysis_prompt,
            max_new_tokens=200,
            temperature=0.7,
            top_p=0.95
        )
        logger.info(f"Analysis generated: {generated_text[:100]}...")

        lines = generated_text.split('\n')
        motion_insights = []
        recommended_approach = "SARA framework recommended for precise control"
        for line in lines:
            if line.strip():
                if any(keyword in line.lower() for keyword in ['motion', 'movement', 'camera', 'lighting']):
                    motion_insights.append(line.strip('- ').strip())
                elif 'sara' in line.lower() or 'gen-4' in line.lower():
                    recommended_approach = line.strip('- ').strip()
        logger.info(f"Insights extracted: {len(motion_insights)}")
        return {
            'scene_interpretation': lines[0] if lines else "Scene analysis completed",
            'motion_insights': motion_insights[:6] if motion_insights else ["Smooth cinematic movement", "Steady camera tracking", "Natural lighting transitions"],
            'recommended_approach': recommended_approach
        }
    except Exception as e:
        logger.error(f"Error in scene analysis: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return {
            'scene_interpretation': f"Analysis error: {str(e)}",
            'motion_insights': ["Error during analysis", "Try with another image"],
            'recommended_approach': "SARA framework (default)"
        }
def generate_sample_prompts_with_zephyr(scene_info=None):
    """Generate sample prompts using the PEFT model"""
    logger.info("Generating sample prompts...")
    # Verify that the model is loaded
    if peft_model is None or tokenizer is None:
        logger.error("PEFT model not available")
        return [
            "Error: Model not available. Try reloading the application.",
            "The subject walks forward smoothly while the background remains steady, cinematic atmosphere.",
            "A dramatic close-up captures the subject's expression as they speak directly to the camera."
        ]
    if scene_info and scene_info.get('basic_description'):
        try:
            # Use the PEFT model to generate contextual prompts.
            # Default to 0.0 so the :.2f format spec never receives a string.
            aspect_ratio = scene_info.get('aspect_ratio', 0.0)
            context_prompt = f"""<|system|>
Generate 3 professional video prompts using the SARA framework based on this image analysis.
<|user|>
Image description: {scene_info['basic_description']}
Composition: {scene_info.get('composition', 'Balanced')}
Aspect Ratio: {aspect_ratio:.2f}
Remember the SARA framework: Subject + Action + Reference + Atmosphere
<|assistant|>"""

            logger.info("Generating prompts for the scene...")
            generated_text = generate_with_peft_model(
                context_prompt,
                max_new_tokens=200,
                temperature=0.8,
                top_p=0.95
            )
            logger.info(f"Generated text: {generated_text[:100]}...")

            # Extract and clean prompts
            prompts = [p.strip('123.-• ') for p in generated_text.split('\n') if p.strip()]

            # Return the first 3 clean prompts
            if len(prompts) >= 3:
                logger.info(f"Prompts extracted: {len(prompts)}")
                return prompts[:3]
        except Exception as e:
            logger.error(f"Error generating sample prompts: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            # Fall through to the fallback prompts below

    # Fallback prompts if the model fails or no scene info is available
    logger.info("Using default prompts")
    base_prompts = [
        "The subject walks forward smoothly while the background remains steady, cinematic atmosphere.",
        "A dramatic close-up captures the subject's expression as they speak directly to the camera.",
        "The scene transitions with a handheld camera following the subject through a bustling environment."
    ]
    return base_prompts
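# Example of the list-cleanup step above (hypothetical model output): a line
#   "1. The subject turns slowly while the light shifts, moody atmosphere."
# comes back from strip('123.-• ') as
#   "The subject turns slowly while the light shifts, moody atmosphere"
# (note that a trailing period is also stripped, since '.' is in the strip set).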
def optimize_user_prompt_with_zephyr(user_idea, scene_info=None):
    """Optimize the user's prompt idea using the PEFT model"""
    logger.info(f"Optimizing prompt: {user_idea}")
    if not user_idea.strip():
        return "Please enter your idea first.", "No input provided"

    # Verify that the model is loaded
    if peft_model is None or tokenizer is None:
        logger.error("PEFT model not available")
        return "Error: Model not available. Try reloading the application.", "Model not loaded"

    # Create context from the scene if available
    context = ""
    if scene_info and scene_info.get('basic_description'):
        context = f"Image context: {scene_info['basic_description']}"
        logger.info(f"Using image context: {context}")
    try:
        # Build the optimization prompt
        logger.info("Preparing prompt for optimization...")

        # Detect non-ASCII input (a rough proxy for non-English text) and
        # adjust the system prompt accordingly
        non_english_pattern = re.compile(r'[^\x00-\x7F]+')
        has_non_english = bool(non_english_pattern.search(user_idea))
        if has_non_english:
            logger.info("Detected non-English input")
            optimization_prompt = f"""<|system|>
You are an expert in video prompting, specializing in the SARA framework. Transform user ideas into professional prompts compatible with AI video models like Sora, Gen-4, Pika, Runway, and Luma.
IMPORTANT: Preserve the original language of the user's idea in your response. For example, if they write in Spanish, your response should be in Spanish.
Key principles:
- Focus on MOTION, not static description
- Use positive phrasing
- Be specific about camera work
- Include lighting/atmosphere details
- Follow the SARA structure: Subject + Action + Reference + Atmosphere
<|user|>
User's idea: "{user_idea}"
{context}
Please create an optimized video prompt using the SARA framework. Respond with just the prompt in the same language as the user's input.
<|assistant|>"""
        else:
            optimization_prompt = f"""<|system|>
You are an expert in video prompting, specializing in the SARA framework. Transform user ideas into professional prompts compatible with AI video models like Sora, Gen-4, Pika, Runway, and Luma.
Key principles:
- Focus on MOTION, not static description
- Use positive phrasing
- Be specific about camera work
- Include lighting/atmosphere details
- Follow the SARA structure: Subject + Action + Reference + Atmosphere
<|user|>
User's idea: "{user_idea}"
{context}
Please create an optimized video prompt using the SARA framework. Respond with just the prompt.
<|assistant|>"""

        logger.info("Generating optimized prompt...")
        optimized = generate_with_peft_model(
            optimization_prompt,
            max_new_tokens=100,
            temperature=0.7,
            top_p=0.95
        )
        logger.info(f"Optimized prompt: {optimized}")

        # Status message is in English regardless of the input language
        return optimized, "SARA-Zephyr LoRA used successfully"
    except Exception as e:
        logger.error(f"Error optimizing prompt: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return (f"Error generating prompt: {str(e)}. Try with a simpler description.",
                f"Error: {str(e)}")
def fallback_generate_prompt(user_idea, scene_info=None):
    """Fallback to generate prompts manually if the model fails"""
    logger.info(f"Using fallback generation for: {user_idea}")
    if not user_idea.strip():
        return "Please enter your idea first.", "No input provided"

    # Manual generation based on the user's text
    words = user_idea.strip().split()
    if len(words) > 2:
        subject = "The subject"
        if any(word.lower() in ["man", "boy", "male", "guy", "father", "son", "brother"] for word in words):
            subject = "The man"
        elif any(word.lower() in ["woman", "girl", "female", "gal", "mother", "daughter", "sister"] for word in words):
            subject = "The woman"
        elif any(word.lower() in ["child", "kid", "baby", "infant", "toddler"] for word in words):
            subject = "The child"
        action = "moves naturally"
        for verb in ["walk", "run", "jump", "sit", "stand", "dance", "move", "turn", "look", "speak", "talk", "smile"]:
            if any(verb in word.lower() for word in words):
                action = verb + "s smoothly"
                break
        return f"{subject} {action} while camera remains steady, cinematic atmosphere.", "Manual generation successful"
    else:
        return "The subject moves naturally while camera remains steady, cinematic atmosphere.", "Manual generation used"
def refine_prompt_with_zephyr(current_prompt, feedback, chat_history, scene_info=None):
    """Refine a prompt based on user feedback using the PEFT model"""
    logger.info(f"Refining prompt with feedback: {feedback}")
    if not feedback.strip():
        return current_prompt, chat_history

    # Verify that the model is loaded
    if peft_model is None or tokenizer is None:
        logger.error("PEFT model not available")
        return "Error: Model not available. Try reloading the application.", chat_history

    # Create refinement context
    context = ""
    if scene_info and scene_info.get('basic_description'):
        context = f"Image context: {scene_info['basic_description']}"
    try:
        # Detect non-ASCII characters in the current prompt and feedback
        non_english_pattern = re.compile(r'[^\x00-\x7F]+')
        has_non_english_prompt = bool(non_english_pattern.search(current_prompt))
        has_non_english_feedback = bool(non_english_pattern.search(feedback))

        # Determine the response language
        preserve_language_instruction = ""
        if has_non_english_prompt or has_non_english_feedback:
            preserve_language_instruction = "IMPORTANT: Preserve the original language of the prompt in your response. For example, if the prompt is in Spanish, your refined prompt should be in Spanish."

        # Construct the refinement prompt
        refinement_prompt = f"""<|system|>
You are an expert in refining video prompts using the SARA framework. Based on the user's feedback, improve the current prompt while maintaining its core structure.
{preserve_language_instruction}
Key principles:
- Focus on MOTION, not static description
- Use positive phrasing
- Be specific about camera work
- Include lighting/atmosphere details
- Follow the SARA structure: Subject + Action + Reference + Atmosphere
<|user|>
Current prompt: "{current_prompt}"
Feedback: "{feedback}"
{context}
Please refine the prompt while keeping it under 100 words. Respond with just the refined prompt.
<|assistant|>"""

        logger.info("Generating refined prompt...")
        refined = generate_with_peft_model(
            refinement_prompt,
            max_new_tokens=100,
            temperature=0.7,
            top_p=0.95
        )
        logger.info(f"Refined prompt: {refined}")

        # Update chat history
        new_chat_history = chat_history + [[feedback, refined]]
        return refined, new_chat_history
    except Exception as e:
        logger.error(f"Error refining prompt: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return f"Error refining prompt: {str(e)}. Try with a simpler request.", chat_history
def build_custom_prompt(foundation, subject_motion, scene_motion, camera_motion, style):
    """Build a custom prompt using the SARA framework"""
    # SARA structure: [Subject] [Action] while [Reference], [Atmosphere]
    parts = []
    if foundation:
        parts.append(foundation)

    # Add motion elements
    motion_parts = []
    if subject_motion:
        motion_parts.extend(subject_motion)
    if scene_motion:
        motion_parts.extend(scene_motion)
    if motion_parts:
        parts.append(", ".join(motion_parts))

    # Reference (camera stability)
    if camera_motion:
        parts.append(f"while {camera_motion}")
    else:
        parts.append("while background remains steady")

    # Atmosphere
    if style:
        parts.append(style)
    return " ".join(parts)
def test_basic_generation():
    """Test basic generation with the PEFT model"""
    try:
        if peft_model is None or tokenizer is None:
            if not load_models():
                return "Error: Could not load PEFT model"

        # Test simple generation
        prompt = "Write a short sentence about a movie"
        result = generate_with_peft_model(prompt, max_new_tokens=20)
        return f"Test successful! Generated: {result}"
    except Exception as e:
        logger.error(f"Error in test_basic_generation: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return f"Error: {str(e)}"
def get_debug_info():
    """Get detailed debug information about the model state"""
    try:
        info = []

        # System and versions
        import platform
        import torch
        info.append(f"System: {platform.system()} {platform.version()}")
        info.append(f"Python: {platform.python_version()}")
        info.append(f"PyTorch: {torch.__version__}")
        try:
            import transformers
            info.append(f"Transformers: {transformers.__version__}")
        except Exception as e:
            info.append(f"Transformers error: {str(e)}")
        try:
            import peft
            info.append(f"PEFT: {peft.__version__}")
        except Exception as e:
            info.append(f"PEFT error: {str(e)}")

        # Model states
        info.append(f"BLIP processor loaded: {processor is not None}")
        info.append(f"BLIP model loaded: {model is not None}")
        info.append(f"PEFT model loaded: {peft_model is not None}")
        info.append(f"Tokenizer loaded: {tokenizer is not None}")
        if peft_model is not None:
            info.append(f"PEFT model type: {type(peft_model).__name__}")
            # More information about the PEFT model
            if hasattr(peft_model, 'base_model'):
                base_model_type = type(peft_model.base_model).__name__
                info.append(f"Base model type: {base_model_type}")
            if hasattr(peft_model, 'config'):
                info.append(f"Config type: {type(peft_model.config).__name__}")
                if hasattr(peft_model.config, 'model_type'):
                    info.append(f"Model type: {peft_model.config.model_type}")

            # Parameter memory, grouped by dtype
            memory_stats = {}
            for param_name, param in peft_model.named_parameters():
                dtype = str(param.dtype).split('.')[-1]
                size_mb = param.numel() * param.element_size() / (1024 * 1024)
                if dtype not in memory_stats:
                    memory_stats[dtype] = 0
                memory_stats[dtype] += size_mb
            for dtype, size_mb in memory_stats.items():
                info.append(f"Memory {dtype}: {size_mb:.2f} MB")

        # GPU info
        info.append(f"GPU available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            info.append(f"GPU device: {torch.cuda.get_device_name(0)}")
            info.append(f"Allocated memory: {torch.cuda.memory_allocated(0) / (1024**3):.2f} GB")
            info.append(f"Reserved memory: {torch.cuda.memory_reserved(0) / (1024**3):.2f} GB")

        # System memory information
        try:
            import psutil
            vm = psutil.virtual_memory()
            info.append(f"Total RAM: {vm.total / (1024**3):.2f} GB")
            info.append(f"Available RAM: {vm.available / (1024**3):.2f} GB")
            info.append(f"RAM usage percentage: {vm.percent}%")
        except ImportError:
            info.append("psutil not available for system memory information")
        return "\n".join(info)
    except Exception as e:
        logger.error(f"Error generating debug info: {str(e)}")
        return f"Error: {str(e)}"
# Conversational chat function
def chat_with_ai(user_input, chat_history, last_prompt, scene_info=None):
    """Process a user message for chat-based prompt creation or refinement"""
    logger.info(f"Processing chat message: {user_input}")
    if not user_input.strip():
        return "", chat_history, last_prompt
    try:
        # Decide whether this is a refinement of an existing prompt or a new prompt
        is_refinement = False
        if last_prompt:
            # Check whether the input looks like feedback rather than a new
            # prompt idea. Common words used in feedback:
            feedback_keywords = ['more', 'less', 'add', 'change', 'make', 'remove',
                                 'modify', 'update', 'adjust',
                                 # Spanish equivalents
                                 'más', 'menos', 'añadir', 'cambiar', 'hacer', 'quitar',
                                 'modificar', 'actualizar', 'ajustar']

            # Feedback-keyword openers, short inputs, and lowercase openings
            # are all treated as likely feedback
            stripped_input = user_input.strip()
            first_word = stripped_input.split(' ')[0].lower() if stripped_input else ""
            if (first_word in feedback_keywords or
                    len(user_input.split()) <= 4 or
                    not stripped_input[0].isupper()):
                is_refinement = True

        # Detect non-ASCII input (rough language heuristic)
        non_english_pattern = re.compile(r'[^\x00-\x7F]+')
        has_non_english = bool(non_english_pattern.search(user_input))

        if is_refinement and last_prompt:
            # Process as refinement of the previous prompt
            logger.info("Processing as refinement feedback")
            refined_prompt, _ = refine_prompt_with_zephyr(last_prompt, user_input, [], scene_info)

            # Prepare the response based on language (the Spanish reply is intentional)
            if has_non_english:
                response_text = f"Aquí está tu prompt refinado:\n\n**{refined_prompt}**\n\n¿Quieres hacer más cambios?"
            else:
                response_text = f"Here's your refined prompt:\n\n**{refined_prompt}**\n\nAny other changes you'd like to make?"
            chat_history.append([user_input, response_text])
            return "", chat_history, refined_prompt
        else:
            # Process as a new prompt creation
            logger.info("Processing as new prompt idea")
            optimized, _ = optimize_user_prompt_with_zephyr(user_input, scene_info)

            # Prepare the response based on language (the Spanish reply is intentional)
            if has_non_english:
                response_text = f"Aquí está tu prompt optimizado:\n\n**{optimized}**\n\nPuedes pedir cambios específicos o mejoras."
            else:
                response_text = f"Here's your optimized video prompt:\n\n**{optimized}**\n\nYou can ask for specific changes or improvements."
            chat_history.append([user_input, response_text])
            return "", chat_history, optimized
    except Exception as e:
        logger.error(f"Error in chat processing: {str(e)}")
        error_message = f"Error processing your message: {str(e)}. Please try again with a simpler request."
        chat_history.append([user_input, error_message])
        return "", chat_history, last_prompt
# Create the Gradio interface
def create_interface():
    """Create the Gradio interface"""
    # Pre-load models
    try:
        logger.info("Pre-loading models...")
        load_models()
    except Exception as e:
        logger.error(f"Error during preloading: {str(e)}")
        logger.info("Models will be loaded on demand")

    logger.info("Creating Gradio interface...")
    with gr.Blocks(title="AI Video Prompt Generator") as demo:
        # Header
        gr.Markdown("# 🎬 AI Video Prompt Generator - 🤖 SARA Framework")
        gr.Markdown("*Professional prompts for Sora, Gen-4, Pika, Luma, Runway and more*")

        # State variables
        scene_state = gr.State({})
        chat_history_state = gr.State([])
        last_prompt_state = gr.State("")

        with gr.Tabs():
            # Tab 1: Learning Guide
            with gr.Tab("📖 Prompting Guide"):
                gr.Markdown(unified_instructions)

                # Advanced tips
                with gr.Accordion("🎯 Advanced Tips", open=False):
                    gr.Markdown("""
## Advanced Prompting Strategies

### 🎨 Style Integration
- **Cinematography**: "Dutch angle," "Extreme close-up," "Bird's eye view"
- **Lighting**: "Golden hour," "Neon glow," "Harsh shadows," "Soft diffused light"
- **Movement Quality**: "Fluid motion," "Mechanical precision," "Organic flow"

### ⚡ Motion Types
- **Subject Motion**: Walking, running, dancing, gesturing
- **Camera Motion**: Pan, tilt, dolly, zoom, orbit, tracking
- **Environmental**: Wind, water flow, particle effects, lighting changes
""")
            # Tab 2: Image Analysis
            with gr.Tab("📷 Image Analysis"):
                with gr.Row():
                    with gr.Column(scale=1):
                        image_input = gr.Image(
                            label="Upload Image for Analysis",
                            type="pil"
                        )
                        analyze_btn = gr.Button("🔍 Analyze Image", variant="primary")
                    with gr.Column(scale=2):
                        analysis_output = gr.Markdown(label="AI Analysis Results")

                # Sample prompts section
                with gr.Group():
                    gr.Markdown("### 💡 Sample Prompts")
                    sample_btn = gr.Button("🎲 Generate Sample Prompts")
                    sample_prompts = [
                        gr.Textbox(
                            label=f"Sample {i+1}",
                            lines=2,
                            interactive=False,
                            show_copy_button=True
                        )
                        for i in range(3)
                    ]
            # Tab 3: AI Prompt Generator (conversational)
            with gr.Tab("🤖 AI Prompt Generator"):
                with gr.Row():
                    # Conversation history
                    chat_display = gr.Chatbot(
                        height=450,
                        show_copy_button=True,
                        label="AI Conversation"
                    )
                with gr.Row():
                    # Message input
                    user_message = gr.Textbox(
                        placeholder="Type your video idea or feedback here (any language)...",
                        lines=2,
                        label="Your message"
                    )
                    send_btn = gr.Button("💬 Send", variant="primary")
                with gr.Row():
                    # Action buttons
                    regenerate_btn = gr.Button("🔄 Regenerate")
                    clear_btn = gr.Button("🗑️ Clear Chat")
                    copy_last_btn = gr.Button("📋 Copy Last Prompt")

                # Debug info in a collapsible section
                with gr.Accordion("🔧 Debug Info", open=False):
                    test_btn = gr.Button("🔬 Test Basic Generation", variant="secondary")
                    test_output = gr.Textbox(
                        label="Basic Generation Test",
                        lines=2,
                        interactive=False
                    )
                    debug_btn = gr.Button("Get Debug Info")
                    debug_info = gr.Textbox(
                        label="Debug Information",
                        value="Click 'Get Debug Info' to see model status",
                        lines=8,
                        interactive=False
                    )
            # Tab 4: Custom Builder
            with gr.Tab("🛠️ Custom Builder"):
                gr.Markdown("## Build Your Custom Prompt")
                with gr.Row():
                    custom_foundation = gr.Textbox(
                        label="Foundation",
                        placeholder="The subject...",
                        lines=1
                    )
                with gr.Row():
                    subject_motion = gr.CheckboxGroup(
                        choices=[
                            "walks smoothly", "speaks clearly", "gestures naturally",
                            "moves gracefully", "turns slowly", "smiles confidently",
                            "dances rhythmically", "stands firmly", "runs energetically",
                            "sits relaxed", "laughs joyfully", "looks curiously"
                        ],
                        label="Subject Motion"
                    )
                    scene_motion = gr.CheckboxGroup(
                        choices=[
                            "dust swirls", "lighting changes", "wind effects",
                            "water movement", "atmosphere shifts", "leaves flutter",
                            "shadows elongate", "fog rolls in", "sunlight filters through",
                            "rain falls gently", "snow drifts", "crowds bustle"
                        ],
                        label="Scene Motion"
                    )
                with gr.Row():
                    camera_motion = gr.Dropdown(
                        choices=[
                            "camera remains steady", "handheld camera follows",
                            "camera pans left", "camera pans right",
                            "camera tracks forward", "camera zooms in slowly",
                            "camera pulls back", "camera orbits subject",
                            "drone shot from above", "camera tilts upward",
                            "camera moves from low angle", "camera shifts focus"
                        ],
                        label="Camera Motion",
                        value="camera remains steady"
                    )
                    style_motion = gr.Dropdown(
                        choices=[
                            "cinematic atmosphere", "documentary style", "live-action feel",
                            "dramatic lighting", "peaceful ambiance", "energetic mood",
                            "professional setting", "nostalgic tone", "futuristic environment",
                            "golden hour warmth", "neon-lit urban setting", "minimalist aesthetic",
                            "high-contrast look", "soft-focused dreamlike quality"
                        ],
                        label="Style/Atmosphere",
                        value="cinematic atmosphere"
                    )
                build_custom_btn = gr.Button("🎨 Build Custom Prompt", variant="secondary")
                custom_output = gr.Textbox(
                    label="Your Custom Prompt",
                    lines=3,
                    interactive=True,
                    show_copy_button=True
                )
        # Event handlers for the Image Analysis tab
        analyze_btn.click(
            fn=analyze_image_with_zephyr,
            inputs=[image_input],
            outputs=[analysis_output, scene_state]
        )
        sample_btn.click(
            fn=generate_sample_prompts_with_zephyr,
            inputs=[scene_state],
            outputs=sample_prompts
        )

        # Event handlers for the AI Prompt Generator tab (chat)
        send_btn.click(
            fn=chat_with_ai,
            inputs=[user_message, chat_history_state, last_prompt_state, scene_state],
            outputs=[user_message, chat_history_state, last_prompt_state]
        )
        # Also trigger on the Enter key
        user_message.submit(
            fn=chat_with_ai,
            inputs=[user_message, chat_history_state, last_prompt_state, scene_state],
            outputs=[user_message, chat_history_state, last_prompt_state]
        )

        # Regenerate the last prompt
        def regenerate_last_prompt(chat_history, scene_info):
            if not chat_history:
                return chat_history, ""
            # Get the last user message
            last_user_msg = chat_history[-1][0]
            new_prompt, _ = optimize_user_prompt_with_zephyr(last_user_msg, scene_info)
            # Add the regenerated response to the chat
            chat_history.append(["🔄 Regenerate", f"Here's an alternative version:\n\n**{new_prompt}**"])
            return chat_history, new_prompt

        regenerate_btn.click(
            fn=regenerate_last_prompt,
            inputs=[chat_history_state, scene_state],
            outputs=[chat_history_state, last_prompt_state]
        )

        # Clear chat button
        clear_btn.click(
            fn=lambda: ([], ""),
            inputs=[],
            outputs=[chat_history_state, last_prompt_state]
        )

        # Copy last prompt button
        def copy_last_prompt(last_prompt):
            if last_prompt:
                return gr.update(value=f"Copied: {last_prompt[:20]}...")
            return gr.update(value="No prompt to copy")

        copy_last_btn.click(
            fn=copy_last_prompt,
            inputs=[last_prompt_state],
            outputs=[test_output]
        )

        # Diagnostic buttons
        test_btn.click(
            fn=test_basic_generation,
            inputs=[],
            outputs=[test_output]
        )
        debug_btn.click(
            fn=get_debug_info,
            inputs=[],
            outputs=[debug_info]
        )

        # Mirror the chat history state into the chat display whenever it changes
        chat_history_state.change(
            fn=lambda history: history,
            inputs=[chat_history_state],
            outputs=[chat_display]
        )

        # Event handler for the Custom Builder tab
        build_custom_btn.click(
            fn=build_custom_prompt,
            inputs=[custom_foundation, subject_motion, scene_motion, camera_motion, style_motion],
            outputs=[custom_output]
        )
    return demo
# Launch the app
if __name__ == "__main__":
    print("🎬 Starting AI Video Prompt Generator with SARA LoRA Adapter...")
    print(f"📊 Status: {'GPU' if use_gpu else 'CPU'} Mode Enabled")
    print("🔧 Loading models (this may take a few minutes)...")
    try:
        demo = create_interface()
        print("✅ Interface created successfully!")
        print("🚀 Launching application...")
        demo.launch(
            share=True,
            server_name="0.0.0.0",
            server_port=7860,
            debug=True,
            show_error=True
        )
    except Exception as e:
        print(f"❌ Error launching app: {e}")
        print("🔧 Make sure you have sufficient CPU resources and all dependencies installed.")
        print("📦 Required packages:")
        print('  pip install torch transformers gradio pillow accelerate bitsandbytes "peft>=0.6.0"')

        # Alternative launch attempt
        print("\n🔄 Attempting alternative launch...")
        try:
            # Try to install the necessary dependencies
            import subprocess
            print("📦 Installing/updating necessary dependencies...")
            subprocess.call(["pip", "install", "-U", "transformers", "accelerate", "peft>=0.6.0", "huggingface_hub", "bitsandbytes"])
            demo = create_interface()
            demo.launch(
                share=False,
                server_name="127.0.0.1",
                server_port=7860,
                debug=False
            )
        except Exception as e2:
            print(f"❌ Alternative launch failed: {e2}")
            print("\n💡 Troubleshooting tips:")
            print("1. Ensure CPU resources are sufficient.")
            print("2. Check CPU usage: top or htop")
            print("3. Try reducing model precision: set torch_dtype=torch.float16")
            print("4. Monitor memory usage: free -h")