Spaces:
Running
Running
import os | |
import os.path as osp | |
import sys | |
import tempfile | |
import gradio as gr | |
import librosa | |
import soundfile | |
import torch | |
import torch.nn.functional as F | |
import torchaudio | |
from huggingface_hub import snapshot_download | |
from moviepy import VideoFileClip | |
from pydub import AudioSegment | |
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, AutoTokenizer, pipeline | |
from src.internvl.eval import load_video | |
from src.moviedubber.infer.utils_infer import ( | |
cfg_strength, | |
chunk_text, | |
nfe_step, | |
sway_sampling_coef, | |
) | |
from src.moviedubber.infer.video_preprocess import VideoFeatureExtractor | |
from src.moviedubber.infer_with_mmlm_result import concat_movie_with_audio, get_spk_emb, load_models | |
from src.moviedubber.model.utils import convert_char_to_pinyin | |
sys.path.insert(0, "src/third_party") | |
sys.path.append("src/third_party/BigVGAN") | |
from InternVL.internvl_chat.internvl.model.internvl_chat.modeling_internvl_chat import InternVLChatModel # type: ignore | |
def load_asr_model(model_id="openai/whisper-large-v3-turbo"): | |
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32 | |
model = AutoModelForSpeechSeq2Seq.from_pretrained( | |
model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True | |
).to(device) | |
processor = AutoProcessor.from_pretrained(model_id) | |
pipe = pipeline( | |
"automatic-speech-recognition", | |
model=model, | |
tokenizer=processor.tokenizer, | |
feature_extractor=processor.feature_extractor, | |
torch_dtype=torch_dtype, | |
device=device, | |
) | |
return pipe | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
repo_local_path = snapshot_download(repo_id="woak-oa/DeepDubber-V1") | |
mmlm_path = osp.join(repo_local_path, "mmlm") | |
mmlm = InternVLChatModel.from_pretrained( | |
mmlm_path, | |
torch_dtype=torch.bfloat16, | |
low_cpu_mem_usage=True, | |
use_flash_attn=False, | |
) | |
mmlm = mmlm.eval().to(device) | |
tokenizer = AutoTokenizer.from_pretrained(mmlm_path, trust_remote_code=True, use_fast=False) | |
generation_config = dict(max_new_tokens=1024, do_sample=False) | |
ema_model, vocoder, ort_session = load_models(repo_local_path, device=device) | |
asr_pipe = load_asr_model() | |
videofeature_extractor = VideoFeatureExtractor(device=device) | |
out_dir = "./output" | |
if not os.path.exists(out_dir): | |
os.makedirs(out_dir) | |
def deepdubber(video_path: str, subtitle_text: str, audio_path: str = None) -> str: | |
pixel_values, num_patches_list = load_video(video_path, num_segments=8, max_num=1) | |
pixel_values = pixel_values.to(torch.bfloat16).to(device) | |
video_prefix = "".join([f"Frame{i + 1}: <image>\n" for i in range(len(num_patches_list))]) | |
question = ( | |
video_prefix | |
+ "What is the voice-over category for this video? Options: A. dialogue, B. monologue, C. narration." | |
) | |
response = mmlm.chat( | |
tokenizer, | |
pixel_values, | |
question, | |
generation_config, | |
num_patches_list=num_patches_list, | |
history=None, | |
return_history=False, | |
) | |
try: | |
response = response.split("<REASONING>")[1].split("</REASONING>")[0].strip() | |
except Exception as e: | |
print(f"Error: {e}, response: {response}") | |
response = response.strip()[0] | |
print(f"Starting deepdubber with video_path: {video_path} and subtitle_text: {subtitle_text}") | |
gen_clip = videofeature_extractor.extract_features(video_path) | |
gen_text = subtitle_text | |
clip = VideoFileClip(video_path) | |
gen_audio_len = int(clip.duration * 24000 // 256) | |
gen_clip = gen_clip.unsqueeze(0).to(device=device, dtype=torch.float32).transpose(1, 2) | |
gen_clip = F.interpolate(gen_clip, size=(gen_audio_len,), mode="linear", align_corners=False).transpose(1, 2) | |
ref_audio_len = None | |
if audio_path is not None: | |
print("reference audio is not None, dubbing with reference audio") | |
if audio_path.endswith(".mp3"): | |
audio = AudioSegment.from_mp3(audio_path) | |
wav_file = audio_path.replace(".mp3", ".wav") | |
audio.export(wav_file, format="wav") | |
else: | |
wav_file = audio_path | |
ref_text = asr_pipe(librosa.load(wav_file, sr=16000)[0], generate_kwargs={"language": "english"})["text"] | |
ref_text = ref_text.replace("\n", " ").replace("\r", " ") | |
print(f"Reference text: {ref_text}") | |
spk_emb = get_spk_emb(wav_file, ort_session) | |
spk_emb = torch.tensor(spk_emb).to(device=device, dtype=torch.float32).unsqueeze(0).unsqueeze(0) | |
audio_data, sr = torchaudio.load(wav_file) | |
resampler = torchaudio.transforms.Resample(sr, 24000) | |
if sr != 24000: | |
audio_data = resampler(audio_data) | |
if audio_data.shape[0] > 1: | |
audio_data = torch.mean(audio_data, dim=0, keepdim=True) | |
audio_data = audio_data.to(device) | |
ref_audio_len = int(audio_data.shape[-1] // 256) | |
ref_clip = torch.zeros((1, ref_audio_len, 768)).to(device=device) | |
gen_clip = torch.cat((gen_clip, ref_clip), dim=1) | |
gen_audio_len = ref_audio_len + gen_audio_len | |
gen_text = ref_text + " " + gen_text | |
else: | |
spk_emb = torch.zeros((1, 1, 192)).to(device=device) | |
audio_data = torch.zeros((1, gen_audio_len, 100)).to(device=device) | |
gen_text_batches = chunk_text(gen_text, max_chars=1024) | |
final_text_list = convert_char_to_pinyin(gen_text_batches) | |
with torch.inference_mode(): | |
generated, _ = ema_model.sample( | |
cond=audio_data, | |
text=final_text_list, | |
clip=gen_clip, | |
spk_emb=spk_emb, | |
duration=gen_audio_len, | |
steps=nfe_step, | |
cfg_strength=cfg_strength, | |
sway_sampling_coef=sway_sampling_coef, | |
no_ref_audio=False, | |
) | |
generated = generated.to(torch.float32) | |
if ref_audio_len is not None: | |
generated = generated[:, ref_audio_len:, :] | |
generated_mel_spec = generated.permute(0, 2, 1) | |
generated_wave = vocoder(generated_mel_spec) | |
generated_wave = generated_wave.squeeze().cpu().numpy() | |
# using a temporary wav file to save the generated audio | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav", dir="./output") as temp_wav_file: | |
temp_wav_path = temp_wav_file.name | |
soundfile.write(temp_wav_path, generated_wave, samplerate=24000) | |
concated_video = concat_movie_with_audio(temp_wav_path, video_path, ".") | |
# Ensure the temporary file is deleted after use | |
os.remove(temp_wav_path) | |
print(f"Deepdubber completed successfully, output path: {concated_video}") | |
return response, concated_video | |
def process_video_dubbing(video_path: str, subtitle_text: str, audio_path: str = None) -> str: | |
try: | |
if not os.path.exists(video_path): | |
raise ValueError("Video file does not exist") | |
if not subtitle_text.strip(): | |
raise ValueError("Subtitle text cannot be empty") | |
if audio_path is None: | |
audio_path = "datasets/CoTMovieDubbing/GT.wav" | |
print(f"Processing video: {video_path}") | |
res, output_path = deepdubber(video_path, subtitle_text, audio_path) | |
return res, output_path | |
except Exception as e: | |
print(f"Error in process_video_dubbing: {e}") | |
return None, None | |
def create_ui(): | |
with gr.Blocks(title="DeepDubber-V1") as app: | |
gr.Markdown("# DeepDubber-V1\nUpload your video file and enter the subtitle you want to dub") | |
with gr.Row(): | |
video_input = gr.Video(label="Upload video") | |
subtitle_input = gr.Textbox( | |
label="Enter the subtitle", placeholder="Enter the subtitle to be dubbed...", lines=5 | |
) | |
audio_input = gr.Audio(label="Upload speech prompt (Optional)", type="filepath") | |
process_btn = gr.Button("Start Dubbing") | |
with gr.Row(): | |
output_response = gr.Textbox(label="Response", placeholder="Response from MMLM", lines=5) | |
output_video = gr.Video(label="Dubbed Video") | |
# add some examples | |
examples = [ | |
[ | |
"datasets/CoTMovieDubbing/demo/v01input.mp4", | |
"it isn't simply a question of creating a robot who can love", | |
"datasets/CoTMovieDubbing/demo/speech_prompt_01.mp3", | |
], | |
[ | |
"datasets/CoTMovieDubbing/demo/v02input.mp4", | |
"Me, I'd be happy with one who's not... fixed.", | |
"datasets/CoTMovieDubbing/demo/speech_prompt_02.mp3", | |
], | |
[ | |
"datasets/CoTMovieDubbing/demo/v03input.mp4", | |
"Man, Papi. What am I gonna do?", | |
"datasets/CoTMovieDubbing/demo/speech_prompt_03.mp3", | |
], | |
] | |
process_btn.click( | |
fn=process_video_dubbing, | |
inputs=[video_input, subtitle_input, audio_input], | |
outputs=[output_response, output_video], | |
) | |
gr.Examples(examples=examples, inputs=[video_input, subtitle_input, audio_input]) | |
return app | |
if __name__ == "__main__": | |
app = create_ui() | |
app.launch(allowed_paths=["./output", "./datasets"]) | |