File size: 3,005 Bytes
9d9ac6c
3a55fb8
9d9ac6c
3a55fb8
9d9ac6c
 
 
 
 
 
 
 
 
 
 
 
3a55fb8
 
3936b33
3a55fb8
3936b33
3a55fb8
3936b33
3a55fb8
 
3936b33
9d9ac6c
3a55fb8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9d9ac6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64daaa2
37ac125
 
9d9ac6c
37ac125
9d9ac6c
64daaa2
 
 
9d9ac6c
 
 
 
 
 
 
 
0389089
9d9ac6c
0389089
9d9ac6c
 
 
 
 
 
 
 
 
0389089
9d9ac6c
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import os
import subprocess as sp

import cv2
import onnxruntime
import torchaudio
import torchaudio.compliance.kaldi as kaldi
from omegaconf import OmegaConf

from src.moviedubber.infer.utils_infer import (
    load_model,
    load_vocoder,
)
from src.moviedubber.model import ControlNetDiT, DiT


def get_video_duration(video_path):
    cap = cv2.VideoCapture(video_path)

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    fps = cap.get(cv2.CAP_PROP_FPS)

    duration = total_frames / fps
    return duration


def merge_video_audio(video_path, audio_path, output_path, start_time, duration):
    command = [
        "ffmpeg",
        "-y",
        "-ss",
        str(start_time),
        "-t",
        str(duration),
        "-i",
        video_path,
        "-i",
        audio_path,
        "-c:v",
        "copy",
        "-c:a",
        "aac",
        "-map",
        "0:v:0",
        "-map",
        "1:a:0",
        "-shortest",
        "-strict",
        "experimental",
        output_path,
    ]

    try:
        sp.run(command, check=True, stdout=sp.DEVNULL, stderr=sp.DEVNULL, stdin=sp.DEVNULL)
        print(f"Successfully merged audio and video into {output_path}")
        return output_path
    except sp.CalledProcessError as e:
        print(f"Error merging audio and video: {e}")
        return None


def get_spk_emb(audio_path, ort_session):
    audio, sample_rate = torchaudio.load(str(audio_path))
    if sample_rate != 16000:
        audio = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(audio)
    feat = kaldi.fbank(audio, num_mel_bins=80, dither=0, sample_frequency=16000)
    feat = feat - feat.mean(dim=0, keepdim=True)
    embedding = (
        ort_session.run(None, {ort_session.get_inputs()[0].name: feat.unsqueeze(dim=0).cpu().numpy()})[0]
        .flatten()
        .tolist()
    )
    return embedding


def load_models(repo_local_path, device):
    model_cfg = "src/moviedubber/configs/basemodel.yaml"
    vocoder_name = "bigvgan"

    vocoder = load_vocoder(local_path="nvidia/bigvgan_v2_24khz_100band_256x", device=device)

    ckpt_path = os.path.join(repo_local_path, "mmdubber.pt")
    vocab_file = os.path.join(repo_local_path, "vocab.txt")
    campplus_path = os.path.join(repo_local_path, "campplus.onnx")

    model_cls = DiT
    model_cfg = OmegaConf.load(model_cfg).model.arch
    controlnet = ControlNetDiT

    ema_model = load_model(
        model_cls,
        model_cfg,
        ckpt_path=ckpt_path,
        mel_spec_type=vocoder_name,
        vocab_file=vocab_file,
        controlnet=controlnet,
        device=device,
    )

    option = onnxruntime.SessionOptions()
    option.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
    option.intra_op_num_threads = 1
    providers = ["CPUExecutionProvider"]
    ort_session = onnxruntime.InferenceSession(
        campplus_path,
        sess_options=option,
        providers=providers,
    )
    return ema_model, vocoder, ort_session