# built-in import os import subprocess import logging import re import random import string import requests import sys import warnings # external import spaces import torch import gradio as gr import numpy as np from lxml.html import fromstring #from transformers import pipeline from torch import multiprocessing as mp, nn #from torch.multiprocessing import Pool #from pathos.multiprocessing import ProcessPool as Pool #from pathos.threading import ThreadPool as Pool #from diffusers.pipelines.flux import FluxPipeline from diffusers.utils import export_to_gif, load_image from diffusers.models.modeling_utils import ModelMixin from huggingface_hub import hf_hub_download from safetensors.torch import load_file, save_file from diffusers import DiffusionPipeline, AnimateDiffPipeline, MotionAdapter, EulerDiscreteScheduler, DDIMScheduler, StableDiffusionXLPipeline, UNet2DConditionModel, AutoencoderKL, UNet3DConditionModel #import jax #import jax.numpy as jnp from numba import njit as cpu1, jit as cpu2, cuda from numba.cuda import jit as gpu # optimization: # @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) # logging warnings.filterwarnings("ignore") root = logging.getLogger() root.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) formatter = logging.Formatter('\n >>> [%(levelname)s] %(asctime)s %(name)s: %(message)s\n') handler.setFormatter(formatter) root.addHandler(handler) handler2 = logging.StreamHandler(sys.stderr) handler2.setLevel(logging.DEBUG) formatter = logging.Formatter('\n >>> [%(levelname)s] %(asctime)s %(name)s: %(message)s\n') handler2.setFormatter(formatter) root.addHandler(handler2) # spawn mp.set_start_method("spawn", force=True) # data last_motion=None dtype = torch.float16 result=[] device = "cuda" #repo = "ByteDance/AnimateDiff-Lightning" #ckpt = f"animatediff_lightning_{step}step_diffusers.safetensors" base = "emilianJR/epiCRealism" #base = "SG161222/Realistic_Vision_V6.0_B1_noVAE" vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse").to(device, dtype=dtype) #unet = UNet2DConditionModel.from_config("emilianJR/epiCRealism",subfolder="unet").to(device, dtype).load_state_dict(load_file(hf_hub_download("emilianJR/epiCRealism", "unet/diffusion_pytorch_model.safetensors"), device=device), strict=False) adapter = MotionAdapter.from_pretrained("guoyww/animatediff-motion-adapter-v1-5-3", torch_dtype=dtype, device=device) fast=True fps=10 time=1 width=384 height=768 step=40 accu=10 css=""" input, input::placeholder { text-align: center !important; } *, *::placeholder { font-family: Suez One !important; } h1,h2,h3,h4,h5,h6 { width: 100%; text-align: center; } footer { display: none !important; } #col-container { margin: 0 auto; max-width: 15cm; } .image-container { aspect-ratio: """+str(width)+"/"+str(height)+""" !important; } .dropdown-arrow { display: none !important; } *:has(>.btn) { display: flex; justify-content: space-evenly; align-items: center; } .btn { display: flex; } """ js=""" function custom(){ document.querySelector("div#prompt input").setAttribute("maxlength","38") document.querySelector("div#prompt2 input").setAttribute("maxlength","38") } """ # functionality @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) def run(cmd, assert_success=False, capture_output=False, env=None, dry_run=False): tx = cuda.threadIdx.x bx = cuda.blockIdx.x dx = cuda.blockDim.x pos = tx + bx * dx if dry_run: print(f"--> {cmd}") result = 1 else: result = subprocess.run(cmd, shell=True, capture_output=capture_output, env=env) if assert_success and result.returncode != 0: logging.error( f"Command '{cmd}' failed with exit status code '{result.returncode}'. Exiting..." ) sys.exit() return result @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) def translate(text,lang): tx = cuda.threadIdx.x bx = cuda.blockIdx.x dx = cuda.blockDim.x pos = tx + bx * dx if text == None or lang == None: return "" text = re.sub(f'[{string.punctuation}]', '', re.sub('[\s+]', ' ', text)).lower().strip() lang = re.sub(f'[{string.punctuation}]', '', re.sub('[\s+]', ' ', lang)).lower().strip() if text == "" or lang == "": return "" if len(text) > 38: raise Exception("Translation Error: Too long text!") user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15' ] padded_chars = re.sub("[(^\-)(\-$)]","",text.replace("","-").replace("- -"," ")).strip() query_text = f'Please translate {padded_chars}, into {lang}' url = f'https://www.google.com/search?q={query_text}' resp = requests.get( url = url, headers = { 'User-Agent': random.choice(user_agents) } ) content = resp.content html = fromstring(content) translated = text try: src_lang = html.xpath('//*[@class="source-language"]')[0].text_content().lower().strip() trgt_lang = html.xpath('//*[@class="target-language"]')[0].text_content().lower().strip() src_text = html.xpath('//*[@id="tw-source-text"]/*')[0].text_content().lower().strip() trgt_text = html.xpath('//*[@id="tw-target-text"]/*')[0].text_content().lower().strip() if trgt_lang == lang: translated = trgt_text except: print(f'Translation Warning: Failed To Translate!') ret = re.sub(f'[{string.punctuation}]', '', re.sub('[\s+]', ' ', translated)).lower().strip() print(ret) return ret @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) def generate_random_string(length): tx = cuda.threadIdx.x bx = cuda.blockIdx.x dx = cuda.blockDim.x pos = tx + bx * dx characters = string.ascii_letters + string.digits return ''.join(random.choice(characters) for _ in range(length)) @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) @spaces.GPU(duration=65) def Piper(image,positive,negative,motion): tx = cuda.threadIdx.x bx = cuda.blockIdx.x dx = cuda.blockDim.x pos = tx + bx * dx global last_motion global ip_loaded if last_motion != motion: pipe.unload_lora_weights() if motion != "": pipe.load_lora_weights(motion, adapter_name="motion") pipe.fuse_lora() pipe.set_adapters(["motion"], [0.7]) last_motion = motion pipe.to(device,dtype) if negative=="": return pipe( prompt=positive, height=height, width=width, ip_adapter_image=image.convert("RGB").resize((width,height)), num_inference_steps=step, guidance_scale=accu, num_frames=(fps*time) ) return pipe( prompt=positive, negative_prompt=negative, height=height, width=width, ip_adapter_image=image.convert("RGB").resize((width,height)), num_inference_steps=step, guidance_scale=accu, num_frames=(fps*time) ) @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) def infer(pm): tx = cuda.threadIdx.x bx = cuda.blockIdx.x dx = cuda.blockDim.x pos = tx + bx * dx print("infer: started") p1 = pm["p"] name = generate_random_string(12)+".png" neg = pm["n"] if neg != "": neg = f"{neg} where in the image" _do = ['photographed', 'realistic', 'dynamic poze', 'deep field', 'reasonable', "natural", 'rough', 'best quality', 'focused', "highly detailed"] if p1 != "": _do.append(f"a new {p1} content in the image") posi = ", ".join(_do) if pm["i"] == None: return None out = Piper(pm["i"],posi,neg,pm["m"]) export_to_gif(out.frames[0],name,fps=fps) return name @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) def handle(i,m,p1,p2,*result): tx = cuda.threadIdx.x bx = cuda.blockIdx.x dx = cuda.blockDim.x pos = tx + bx * dx p1_en = translate(p1,"english") p2_en = translate(p2,"english") pm = {"p":p1_en,"n":p2_en,"m":m,"i":i} ln = len(result) rng = list(range(ln)) arr = [pm for _ in rng] #with Pool(f'{ ln }:ppn=2', queue='productionQ', timelimit='5:00:00', workdir='.') as pool: #return pool.map(infer,arr) ret = [] for _ in range(ln): ret.append(infer,pm) return ret @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) def ui(): tx = cuda.threadIdx.x bx = cuda.blockIdx.x dx = cuda.blockDim.x pos = tx + bx * dx with gr.Blocks(theme=gr.themes.Soft(),css=css,js=js) as demo: with gr.Column(elem_id="col-container"): gr.Markdown(f""" # MULTI-LANGUAGE IMAGE GENERATOR """) with gr.Row(): img = gr.Image(label="STATIC PHOTO",show_label=True,container=True,type="pil") with gr.Row(): prompt = gr.Textbox( elem_id="prompt", placeholder="INCLUDE", container=False, max_lines=1 ) with gr.Row(): prompt2 = gr.Textbox( elem_id="prompt2", placeholder="EXCLUDE", container=False, max_lines=1 ) with gr.Row(): motion = gr.Dropdown( label='CAMERA', show_label=True, container=True, choices=[ ("(No Effect)", ""), ("Zoom in", "guoyww/animatediff-motion-lora-zoom-in"), ("Zoom out", "guoyww/animatediff-motion-lora-zoom-out"), ("Tilt up", "guoyww/animatediff-motion-lora-tilt-up"), ("Tilt down", "guoyww/animatediff-motion-lora-tilt-down"), ("Pan left", "guoyww/animatediff-motion-lora-pan-left"), ("Pan right", "guoyww/animatediff-motion-lora-pan-right"), ("Roll left", "guoyww/animatediff-motion-lora-rolling-anticlockwise"), ("Roll right", "guoyww/animatediff-motion-lora-rolling-clockwise"), ], value="", interactive=True ) with gr.Row(): run_button = gr.Button("START",elem_classes="btn",scale=0) with gr.Row(): result.append(gr.Image(interactive=False,elem_classes="image-container", label="Result", show_label=False, type='filepath', show_share_button=False)) result.append(gr.Image(interactive=False,elem_classes="image-container", label="Result", show_label=False, type='filepath', show_share_button=False)) gr.on( triggers=[run_button.click, prompt.submit, prompt2.submit], fn=handle,inputs=[img,motion,prompt,prompt2,*result],outputs=result ) demo.queue().launch() @gpu(cache=True) # @cpu1(cache=True,nopython=True,parallel=True) # @cpu2(cache=True,nopython=True,parallel=True) # @cpu1(cache=True) # @cpu2(cache=True) def pre(): tx = cuda.threadIdx.x bx = cuda.blockIdx.x dx = cuda.blockDim.x pos = tx + bx * dx pipe = AnimateDiffPipeline.from_pretrained(base, vae=vae, motion_adapter=adapter, torch_dtype=dtype).to(device) pipe.scheduler = DDIMScheduler( clip_sample=False, beta_start=0.00085, beta_end=0.012, beta_schedule="linear", timestep_spacing="trailing", steps_offset=1 ) pipe.unet.load_state_dict(load_file(hf_hub_download(repo, ckpt), device=device), strict=False) pipe.load_ip_adapter("h94/IP-Adapter", subfolder="models", weight_name="ip-adapter_sd15.bin") pipe.enable_vae_slicing() pipe.enable_free_init(method="butterworth", use_fast_sampling=fast) # entry if __name__ == "__main__": pre[32,32]() ui[32,32]() # end