|
|
|
import gradio as gr |
|
import torch |
|
import requests |
|
from PIL import Image |
|
import numpy as np |
|
import os |
|
from tqdm import tqdm |
|
|
|
|
|
|
|
from diffusers import FluxControlNetPipeline, ControlNetModel, FluxPipeline |
|
|
|
|
|
|
|
def download_file(url, local_filename): |
|
"""Скачивает файл по URL с индикатором прогресса.""" |
|
print(f"Скачиваю {url} в {local_filename}...") |
|
if os.path.exists(local_filename): |
|
print(f"Файл уже существует: {local_filename}. Пропускаю скачивание.") |
|
return local_filename |
|
|
|
try: |
|
response = requests.get(url, stream=True) |
|
response.raise_for_status() |
|
|
|
total_size_in_bytes = int(response.headers.get('content-length', 0)) |
|
block_size = 8192 |
|
|
|
if total_size_in_bytes > 0: |
|
progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True, desc=f"Скачивание {local_filename}") |
|
else: |
|
print("Размер файла неизвестен, скачивание без индикатора прогресса.") |
|
progress_bar = None |
|
|
|
with open(local_filename, 'wb') as f: |
|
for chunk in response.iter_content(chunk_size=block_size): |
|
if progress_bar: |
|
progress_bar.update(len(chunk)) |
|
f.write(chunk) |
|
|
|
if progress_bar: |
|
progress_bar.close() |
|
|
|
print(f"Скачивание завершено: {local_filename}") |
|
return local_filename |
|
except requests.exceptions.RequestException as e: |
|
print(f"Ошибка скачивания {url}: {e}") |
|
return None |
|
except Exception as e: |
|
print(f"Произошла другая ошибка при скачивании: {e}") |
|
return None |
|
|
|
|
|
|
|
CIVITAI_FLUX_FUSION_URL = "https://civitai.com/api/download/models/936565?type=Model&format=SafeTensor&fp=fp8" |
|
|
|
LOCAL_FLUX_FUSION_FILENAME = "flux_fusion_v2_fp8.safetensors" |
|
|
|
|
|
CONTROLNET_FLUX_MODEL_ID = "ABDALLALSWAITI/FLUX.1-dev-ControlNet-Union-Pro-2.0-fp8" |
|
|
|
|
|
pipeline = None |
|
downloaded_base_model_path = None |
|
|
|
|
|
print("Начинаю скачивание базовой модели с Civitai...") |
|
downloaded_base_model_path = download_file(CIVITAI_FLUX_FUSION_URL, LOCAL_FLUX_FUSION_FILENAME) |
|
|
|
|
|
|
|
def load_pipeline_components(base_model_path, controlnet_model_id): |
|
""" |
|
Загружает ControlNet с HF и пытается собрать пайплайн FLUX, |
|
используя локальный SafeTensor как базовую модель. |
|
""" |
|
if not base_model_path or not os.path.exists(base_model_path): |
|
print(f"Ошибка загрузки: Файл базовой модели не найден по пути: {base_model_path}") |
|
return None |
|
|
|
print(f"Загрузка ControlNet модели FLUX с HF Hub: {controlnet_model_id}") |
|
try: |
|
|
|
controlnet = ControlNetModel.from_pretrained(controlnet_model_id, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32) |
|
except Exception as e: |
|
print(f"Ошибка загрузки ControlNet модели с HF Hub: {controlnet_model_id}. Проверьте ID или соединение.") |
|
print(f"Ошибка: {e}") |
|
return None |
|
|
|
print(f"Попытка собрать пайплайн FLUX ControlNet, используя локальный файл: {base_model_path} как базовую модель.") |
|
print("ВНИМАНИЕ: Загрузка FLUX пайплайна из одиночного SafeTensor файла методом from_single_file") |
|
print("не является стандартной и может вызвать ошибки совместимости.") |
|
|
|
try: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
base_pipe = FluxPipeline.from_single_file( |
|
base_model_path, |
|
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, |
|
|
|
|
|
) |
|
print("Успешно загружен базовый FLUX пайплайн из SafeTensor файла методом from_single_file (если это сообщение видно).") |
|
|
|
|
|
|
|
|
|
print("Собираю финальный FluxControlNetPipeline...") |
|
|
|
|
|
try: |
|
controlnet_pipe = FluxControlNetPipeline( |
|
transformer=base_pipe.transformer, |
|
vae=base_pipe.vae, |
|
text_encoder=base_pipe.text_encoder, |
|
tokenizer=base_pipe.tokenizer, |
|
scheduler=base_pipe.scheduler, |
|
controlnet=controlnet, |
|
feature_extractor=base_pipe.feature_extractor if hasattr(base_pipe, 'feature_extractor') else None, |
|
image_processor=base_pipe.image_processor if hasattr(base_pipe, 'image_processor') else None, |
|
) |
|
|
|
|
|
print(f"Финальный планировщик: {type(controlnet_pipe.scheduler).__name__}") |
|
|
|
|
|
del base_pipe |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
print("Память GPU очищена после создания ControlNet пайплайна.") |
|
|
|
|
|
if torch.cuda.is_available(): |
|
controlnet_pipe = controlnet_pipe.to("cuda") |
|
print("Финальный пайплайн FLUX ControlNet перемещен на GPU.") |
|
else: |
|
print("GPU не найдено. Пайплайн на CPU.") |
|
|
|
print("Сборка финального пайплайна FLUX ControlNet завершена успешно.") |
|
return controlnet_pipe |
|
|
|
except Exception as e: |
|
print(f"Ошибка при сборке финального FluxControlNetPipeline: {e}") |
|
print("Проверьте, что базовая модель, загруженная из SafeTensor, содержит все компоненты FLUX (transformer, vae, text_encoder и т.д.).") |
|
print("Возможно, from_single_file не смог загрузить все необходимые компоненты FLUX из этого файла.") |
|
return None |
|
|
|
|
|
except Exception as e: |
|
print(f"Критическая ошибка при попытке загрузить базовый FLUX пайплайн из файла {base_model_path}: {e}") |
|
print("Наиболее вероятно, этот файл SafeTensor несовместим с методами загрузки FLUX в diffusers.") |
|
print("Возможно, файл поврежден или не содержит ожидаемой структуры FLUX.") |
|
return None |
|
|
|
|
|
|
|
if downloaded_base_model_path and os.path.exists(downloaded_base_model_path): |
|
pipeline = load_pipeline_components(downloaded_base_model_path, CONTROLNET_FLUX_MODEL_ID) |
|
else: |
|
print("Пропуск загрузки пайплайна из-за ошибки скачивания или отсутствия файла.") |
|
pipeline = None |
|
|
|
|
|
|
|
|
|
|
|
def generate_image_gradio(controlnet_image: np.ndarray, prompt: str, negative_prompt: str = "", guidance_scale: float = 5.0, num_inference_steps: int = 4, controlnet_conditioning_scale: float = 1.0): |
|
""" |
|
Генерирует изображение с использованием FLUX ControlNet. |
|
Принимает изображение NumPy, текст промта и другие параметры. |
|
Возвращает сгенерированное изображение в формате PIL Image. |
|
""" |
|
if pipeline is None: |
|
print("Попытка генерации, но пайплайн модели не загружен.") |
|
return None, "Ошибка: Пайплайн модели не загружен. Проверьте логи Space." |
|
|
|
if controlnet_image is None: |
|
return None, "Ошибка: необходимо загрузить изображение для ControlNet." |
|
|
|
print(f"Генерация изображения FLUX с промтом: '{prompt}'") |
|
print(f"Размер входного изображения для ControlNet: {controlnet_image.shape}") |
|
|
|
input_image_pil = Image.fromarray(controlnet_image).convert("RGB") |
|
|
|
|
|
try: |
|
|
|
|
|
output = pipeline( |
|
prompt=prompt, |
|
image=input_image_pil, |
|
negative_prompt=negative_prompt, |
|
guidance_scale=guidance_scale, |
|
num_inference_steps=num_inference_steps, |
|
controlnet_conditioning_scale=controlnet_conditioning_scale, |
|
|
|
|
|
) |
|
|
|
generated_image_pil = output.images[0] |
|
|
|
print("Генерация FLUX завершена.") |
|
return generated_image_pil, "Успех!" |
|
except Exception as e: |
|
print(f"Ошибка при генерации FLUX: {e}") |
|
return None, f"Ошибка при генерации FLUX: {e}" |
|
|
|
|
|
|
|
|
|
input_image_comp = gr.Image(type="numpy", label="Изображение для ControlNet (набросок, карта глубины и т.д.)") |
|
prompt_comp = gr.Textbox(label="Промт (Prompt)") |
|
negative_prompt_comp = gr.Textbox(label="Негативный промт (Negative Prompt)") |
|
|
|
guidance_scale_comp = gr.Slider(minimum=0.0, maximum=10.0, value=5.0, step=0.1, label="Степень соответствия промту (Guidance Scale)") |
|
|
|
num_inference_steps_comp = gr.Slider(minimum=1, maximum=20, value=4, step=1, label="Количество шагов (Inference Steps) [для FLUX Fusion V2 обычно 4]") |
|
controlnet_conditioning_scale_comp = gr.Slider(minimum=0.0, maximum=2.0, value=1.0, step=0.05, label="Вес ControlNet (ControlNet Scale)") |
|
|
|
output_image_comp = gr.Image(type="pil", label="Сгенерированное изображение") |
|
status_text_comp = gr.Textbox(label="Статус") |
|
|
|
|
|
|
|
interface = gr.Interface( |
|
fn=generate_image_gradio, |
|
inputs=[ |
|
input_image_comp, |
|
prompt_comp, |
|
negative_prompt_comp, |
|
guidance_scale_comp, |
|
num_inference_steps_comp, |
|
controlnet_conditioning_scale_comp |
|
], |
|
outputs=[output_image_comp, status_text_comp], |
|
title="FLUX ControlNet Interface (Attempt with Civitai SafeTensor)", |
|
description="Загрузите изображение для ControlNet, введите промт и нажмите 'Generate'. Попытка использовать SafeTensor 'Flux Fusion V2' с Civitai как базовую модель FLUX с ControlNet с HF." |
|
) |
|
|
|
|