import spaces
import os
import gc
import gradio as gr
import torch
import json
import utils
import logging
from diffusers.models import AutoencoderKL
from diffusers import StableDiffusionXLImg2ImgPipeline
from config import (
    MODELS,
    MIN_IMAGE_SIZE,
    MAX_IMAGE_SIZE,
    OUTPUT_DIR,
    DEFAULT_NEGATIVE_PROMPT,
    DEFAULT_ASPECT_RATIO,
    QUALITY_TAGS,
    examples,
    sampler_list,
    aspect_ratios,
    css,
)
import time
from typing import List, Dict, Tuple

# Enhanced logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)

# Constants
IS_COLAB = utils.is_google_colab() or os.getenv("IS_COLAB") == "1"
HF_TOKEN = os.getenv("HF_TOKEN")
CACHE_EXAMPLES = torch.cuda.is_available() and os.getenv("CACHE_EXAMPLES") == "1"

# PyTorch settings for better performance and determinism
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cuda.matmul.allow_tf32 = True

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")


class GenerationError(Exception):
    """Custom exception for generation errors."""
    pass


def validate_prompt(prompt: str) -> str:
    """Validate and clean up the input prompt."""
    if not isinstance(prompt, str):
        raise GenerationError("Prompt must be a string")
    try:
        # Ensure proper UTF-8 encoding/decoding
        prompt = prompt.encode('utf-8').decode('utf-8')
        # Add a space between "!" and ","
        prompt = prompt.replace("!,", "! ,")
    except UnicodeError:
        raise GenerationError("Invalid characters in prompt")

    # Only reject prompts that are completely empty or whitespace
    if not prompt or prompt.isspace():
        raise GenerationError("Prompt cannot be empty")

    return prompt.strip()


def validate_dimensions(width: int, height: int) -> None:
    """Validate image dimensions."""
    if not MIN_IMAGE_SIZE <= width <= MAX_IMAGE_SIZE:
        raise GenerationError(f"Width must be between {MIN_IMAGE_SIZE} and {MAX_IMAGE_SIZE}")
    if not MIN_IMAGE_SIZE <= height <= MAX_IMAGE_SIZE:
        raise GenerationError(f"Height must be between {MIN_IMAGE_SIZE} and {MAX_IMAGE_SIZE}")


@spaces.GPU
def generate(
    prompt: str,
    negative_prompt: str = DEFAULT_NEGATIVE_PROMPT,
    seed: int = 0,
    custom_width: int = 1024,
    custom_height: int = 1024,
    guidance_scale: float = 6.0,
    num_inference_steps: int = 25,
    sampler: str = "Euler a",
    model_name: str = "v14",
    aspect_ratio_selector: str = DEFAULT_ASPECT_RATIO,
    use_upscaler: bool = False,
    upscaler_strength: float = 0.55,
    upscale_by: float = 1.5,
    add_quality_tags: bool = True,
    progress: gr.Progress = gr.Progress(track_tqdm=True),
) -> Tuple[List[str], Dict]:
    """Generate images based on the given parameters."""
    start_time = time.time()
    upscaler_pipe = None
    backup_scheduler = None
    pipe = None  # initialized up front so the finally block can reference it safely

    try:
        # Memory management
        torch.cuda.empty_cache()
        gc.collect()

        # Input validation
        prompt = validate_prompt(prompt)
        if negative_prompt:
            negative_prompt = negative_prompt.encode('utf-8').decode('utf-8')
        validate_dimensions(custom_width, custom_height)

        # Set up generation
        generator = utils.seed_everything(seed)
        width, height = utils.aspect_ratio_handler(
            aspect_ratio_selector,
            custom_width,
            custom_height,
        )

        # Process prompts
        if add_quality_tags:
            prompt = QUALITY_TAGS.format(prompt=prompt)

        prompt, negative_prompt = utils.preprocess_prompt(prompt, negative_prompt)
        width, height = utils.preprocess_image_dimensions(width, height)

        # Set up the pipeline, swapping in the requested sampler
        pipe = pipes[model_name]
        backup_scheduler = pipe.scheduler
        pipe.scheduler = utils.get_scheduler(pipe.scheduler.config, sampler)
        if use_upscaler:
            # Reuse the loaded components for a second img2img (hires-fix) pass
            upscaler_pipe = StableDiffusionXLImg2ImgPipeline(**pipe.components)

        # Prepare metadata
        metadata = {
            "prompt": prompt,
            "negative_prompt": negative_prompt,
            "resolution": f"{width} x {height}",
            "guidance_scale": guidance_scale,
            "num_inference_steps": num_inference_steps,
            "seed": seed,
            "sampler": sampler,
            "Model": "WAI NSFW illustrious SDXL " + model_name,
            "Model hash": "BDB59BAC77D94AE7A55FF893170F9554C3F349E48A1B73C0C17C0B7C6F4D41A2",
        }

        if use_upscaler:
            new_width = int(width * upscale_by)
            new_height = int(height * upscale_by)
            metadata["use_upscaler"] = {
                "upscale_method": "nearest-exact",
                "upscaler_strength": upscaler_strength,
                "upscale_by": upscale_by,
                "new_resolution": f"{new_width} x {new_height}",
            }
        else:
            metadata["use_upscaler"] = None

        logger.info(f"Starting generation with parameters: {json.dumps(metadata, indent=4)}")

        # Generate images
        if use_upscaler:
            # First pass: render latents at the base resolution
            latents = pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                width=width,
                height=height,
                guidance_scale=guidance_scale,
                num_inference_steps=num_inference_steps,
                generator=generator,
                output_type="latent",
            ).images
            # Second pass: upscale the latents, then refine them with img2img
            upscaled_latents = utils.upscale(latents, "nearest-exact", upscale_by)
            images = upscaler_pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                image=upscaled_latents,
                guidance_scale=guidance_scale,
                num_inference_steps=num_inference_steps,
                strength=upscaler_strength,
                generator=generator,
                output_type="pil",
            ).images
        else:
            images = pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                width=width,
                height=height,
                guidance_scale=guidance_scale,
                num_inference_steps=num_inference_steps,
                generator=generator,
                output_type="pil",
            ).images

        # Save images
        image_paths = []
        if images:
            total = len(images)
            for idx, image in enumerate(images, 1):
                progress(idx / total, desc="Saving images...")
                path = utils.save_image(image, metadata, OUTPUT_DIR, IS_COLAB)
                image_paths.append(path)
                logger.info(f"Image {idx}/{total} saved as {path}")

        generation_time = time.time() - start_time
        logger.info(f"Generation completed successfully in {generation_time:.2f} seconds")
        metadata["generation_time"] = f"{generation_time:.2f}s"

        return image_paths, metadata

    except GenerationError as e:
        logger.warning(f"Generation validation error: {str(e)}")
        raise gr.Error(str(e))
    except Exception as e:
        logger.exception("Unexpected error during generation")
        raise gr.Error(f"Generation failed: {str(e)}")
    finally:
        # Cleanup
        torch.cuda.empty_cache()
        gc.collect()

        if upscaler_pipe is not None:
            del upscaler_pipe
        if backup_scheduler is not None and pipe is not None:
            pipe.scheduler = backup_scheduler
        utils.free_memory()


# Model initialization
pipes = {}  # defined unconditionally so the CPU path can populate it too
if torch.cuda.is_available():
    try:
        logger.info("Loading VAE and pipelines...")
        vae = AutoencoderKL.from_pretrained(
            "madebyollin/sdxl-vae-fp16-fix",
            torch_dtype=torch.float16,
        )
        for model in MODELS:
            pipes[model['name']] = utils.load_pipeline(model['path'], device, vae=vae)
            logger.info(f"Pipeline for {model['name']} loaded successfully on GPU!")
    except Exception as e:
        logger.error(f"Error loading VAE, falling back to the default VAE: {e}")
        for model in MODELS:
            if model['name'] not in pipes:
                pipes[model['name']] = utils.load_pipeline(model['path'], device)
                logger.info(f"Pipeline for {model['name']} loaded successfully on GPU!")
else:
    logger.warning("CUDA not available, running on CPU")
    # If any environment key contains "VSCODE", skip loading the model on CPU
    skip = any("VSCODE" in key for key in os.environ)
    if not skip:
        logger.info("Loading pipeline on CPU...")
        # Load by model path and register under the model name so that
        # generate() can look the pipeline up in `pipes`
        pipes[MODELS[0]['name']] = utils.load_pipeline(MODELS[0]['path'], torch.device("cpu"))
        logger.info("Pipeline loaded successfully on CPU!")

with gr.Blocks(css=css, theme="Nymbo/Nymbo_Theme_5") as demo:
    gr.HTML(
        """