#!/usr/bin/env python3
#
# Copyright      2023  Xiaomi Corp.        (authors: Fangjun Kuang)
#
# See LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# References:
# https://gradio.app/docs/#dropdown

import logging
import shutil
import tempfile
import time
import urllib.request
from datetime import datetime

import gradio as gr
import torch
from pydub import AudioSegment

from separate import get_file, load_audio, load_model, separate

# File names of the sample songs offered in the "Examples" panels of the UI.
examples = [
    "yesterday-once-more-Carpenters.mp3",
    "das-beste-Silbermond.mp3",
    "hotel-in-california.mp3",
    "起风了.mp3",
]

# Obtain every sample through get_file() (from the "test_wavs" subfolder of the
# "csukuangfj/spleeter-torch" repo) and copy it to a file of the same name,
# relative to the current working directory.
for name in examples:
    shutil.copyfile(
        get_file("csukuangfj/spleeter-torch", name, subfolder="test_wavs"),
        name,
    )


def build_html_output(s: str, style: str = "result_item_success"):
    """Wrap ``s`` in the nested ``<div>`` markup used for result panels.

    Args:
      s:
        Text or HTML fragment placed inside the inner ``<div>``. It is
        inserted verbatim; no escaping is performed.
      style:
        Extra CSS class added next to ``result_item`` on the inner ``<div>``.

    Returns:
      The HTML snippet as a string.
    """
    rows = (
        "",
        "    <div class='result'>",
        f"        <div class='result_item {style}'>",
        f"          {s}",
        "        </div>",
        "    </div>",
        "    ",
    )
    return "\n".join(rows)


def process_url(url: str):
    """Download the audio file at ``url`` and run source separation on it.

    Args:
      url:
        Address of an audio file, as typed into the URL textbox. It is
        user-controlled, so only ``http`` and ``https`` URLs are accepted.

    Returns:
      A 3-tuple matching the three output components bound to this handler.
      On success it is whatever ``process`` returns. On failure it is
      ``(None, None, html)``, where ``html`` is an error panel built by
      ``build_html_output``.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    if not url:
        return (
            None,
            None,
            build_html_output(
                "Please first input a URL and then click "
                "the button 'Submit for separation'",
                "result_item_error",
            ),
        )

    logging.info(f"Processing URL: {url}")

    # urlretrieve() would also honour schemes such as file://, which would let
    # a visitor point the server at its own local files. Reject them up front.
    scheme = urlsplit(url).scheme.lower()
    if scheme not in ("http", "https"):
        return (
            None,
            None,
            build_html_output(
                "Only http:// and https:// URLs are supported",
                "result_item_error",
            ),
        )

    try:
        # The temporary file only holds the download; it is removed when the
        # ``with`` block exits, i.e. after ``process`` has returned.
        with tempfile.NamedTemporaryFile() as f:
            urllib.request.urlretrieve(url, f.name)
            return process(in_filename=f.name)
    except Exception as e:
        logging.exception("Failed to process URL")
        # Three values are required: vocals, accompaniment, info panel.
        return None, None, build_html_output(str(e), "result_item_error")


def process_uploaded_file(in_filename: str):
    """Run source separation on a file uploaded through the UI.

    Args:
      in_filename:
        Path of the uploaded file, or ``None``/empty if nothing was uploaded.

    Returns:
      A 3-tuple matching the three output components bound to this handler.
      On success it is whatever ``process`` returns. On failure it is
      ``(None, None, html)``, where ``html`` is an error panel built by
      ``build_html_output``.
    """
    if not in_filename:
        return (
            None,
            None,
            build_html_output(
                "Please first upload a file and then click "
                'the button "submit for separation"',
                "result_item_error",
            ),
        )

    logging.info(f"Processing uploaded file: {in_filename}")
    try:
        return process(in_filename=in_filename)
    except Exception as e:
        logging.exception("Failed to process uploaded file")
        # Three values are required: vocals, accompaniment, info panel.
        return None, None, build_html_output(str(e), "result_item_error")


def process_microphone(in_filename: str):
    """Run source separation on a recording made with the microphone widget.

    Args:
      in_filename:
        Path of the recorded file, or ``None``/empty if nothing was recorded.

    Returns:
      A 3-tuple matching the three output components bound to this handler.
      On success it is whatever ``process`` returns. On failure it is
      ``(None, None, html)``, where ``html`` is an error panel built by
      ``build_html_output``.
    """
    if not in_filename:
        return (
            None,
            None,
            build_html_output(
                "Please first click 'Record from microphone', speak, "
                "click 'Stop recording', and then "
                "click the button 'submit for separation'",
                "result_item_error",
            ),
        )

    logging.info(f"Processing microphone: {in_filename}")
    try:
        return process(in_filename=in_filename)
    except Exception as e:
        logging.exception("Failed to process microphone recording")
        # Three values are required: vocals, accompaniment, info panel.
        return None, None, build_html_output(str(e), "result_item_error")


def _export_stereo_mp3(wave: torch.Tensor, filename: str, sample_rate: int) -> None:
    """Write one separated stem to ``filename`` as a 128 kbit/s MP3.

    Args:
      wave:
        A tensor returned by ``separate``. It is transposed, scaled by 32768
        and serialized as 2-channel 16-bit PCM.
        NOTE(review): assumes float samples nominally in [-1, 1] -- confirm
        against ``separate``.
      filename:
        Destination path of the MP3 file.
      sample_rate:
        Frame rate, in Hz, declared for the exported audio.
    """
    # Clamp before the cast: a sample at (or slightly above) +1.0 scales to
    # 32768, which does not fit into int16.
    pcm = (wave.t() * 32768).clamp(-32768, 32767).to(torch.int16)
    sound = AudioSegment(
        data=pcm.numpy().tobytes(),
        sample_width=2,
        frame_rate=sample_rate,
        channels=2,
    )
    sound.export(filename, format="mp3", bitrate="128k")


@torch.no_grad()
def process(in_filename: str):
    """Split an audio file into a vocals track and an accompaniment track.

    The two results are written next to the input as
    ``<in_filename>-vocals.mp3`` and ``<in_filename>-accompaniment.mp3``.

    Args:
      in_filename:
        Path of the audio file to separate.

    Returns:
      A 3-tuple ``(vocals_filename, accompaniment_filename, html)`` where
      ``html`` is a panel reporting the input duration, the processing time
      and the real-time factor (RTF).

    Raises:
      ValueError: If the loaded audio contains no samples.
    """
    logging.info(f"in_filename: {in_filename}")

    # The duration computation and the export both hard-code 44.1 kHz.
    # NOTE(review): presumably load_audio() resamples to this rate -- confirm.
    sample_rate = 44100

    waveform = load_audio(in_filename)
    duration = waveform.shape[0] / sample_rate  # in seconds
    if duration == 0:
        # Fail early with a clear message instead of dividing by zero when
        # the RTF is computed after the (expensive) separation.
        raise ValueError("The input audio is empty")

    vocals = load_model("vocals.pt")
    accompaniment = load_model("accompaniment.pt")

    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    logging.info(f"Started at {started_at}")

    start = time.time()
    vocals_wave, accompaniment_wave = separate(vocals, accompaniment, waveform)
    end = time.time()

    # Take a fresh timestamp; re-formatting the start time would log the
    # wrong moment as the finish time.
    finished_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")

    vocals_filename = in_filename + "-vocals.mp3"
    _export_stereo_mp3(vocals_wave, vocals_filename, sample_rate)

    accompaniment_filename = in_filename + "-accompaniment.mp3"
    _export_stereo_mp3(accompaniment_wave, accompaniment_filename, sample_rate)

    elapsed = end - start
    rtf = elapsed / duration

    logging.info(f"Finished at {finished_at}. Elapsed: {elapsed: .3f} s")

    info = f"""
    Input duration  : {duration: .3f} s <br/>
    Processing time: {elapsed: .3f} s <br/>
    RTF: {elapsed: .3f}/{duration: .3f} = {rtf:.3f} <br/>
    """
    logging.info(info)

    return vocals_filename, accompaniment_filename, build_html_output(info)


# Markdown heading rendered at the top of the page.
title = "# Music source separation with Spleeter in PyTorch"

# css style is copied from
# https://huggingface.co/spaces/alphacep/asr/blob/main/app.py#L113
#
# One rule per entry; the empty first and last entries produce the leading and
# trailing newline of the stylesheet.
css = "\n".join(
    (
        "",
        ".result {display:flex;flex-direction:column}",
        ".result_item {padding:15px;margin-bottom:8px;border-radius:15px;width:100%}",
        ".result_item_success {background-color:mediumaquamarine;color:white;align-self:start}",
        ".result_item_error {background-color:#ff7070;color:white;align-self:start}",
        "",
    )
)


# The whole UI: a title followed by three tabs (upload, microphone, URL). Each
# tab has one input, a submit button, an HTML info panel and two audio players
# for the separated vocals and accompaniment.
demo = gr.Blocks(css=css)


with demo:
    gr.Markdown(title)

    with gr.Tabs():
        with gr.TabItem("Upload from disk"):
            uploaded_file = gr.Audio(
                sources=["upload"],  # Choose between "microphone", "upload"
                type="filepath",
                label="Upload from disk",
            )
            upload_button = gr.Button("Submit for separation")
            uploaded_html_info = gr.HTML(label="Info")

            uploaded_vocals = gr.Audio(label="vocals")
            uploaded_accompaniment = gr.Audio(label="accompaniment")

            gr.Examples(
                examples=examples,
                inputs=[uploaded_file],
                outputs=[uploaded_vocals, uploaded_accompaniment, uploaded_html_info],
                fn=process_uploaded_file,
            )

        with gr.TabItem("Record from microphone"):
            microphone = gr.Audio(
                sources=["microphone"],  # Choose between "microphone", "upload"
                type="filepath",
                label="Record from microphone",
            )

            record_button = gr.Button("Submit for separation")
            recorded_html_info = gr.HTML(label="Info")

            recorded_vocals = gr.Audio(label="vocals")
            recorded_accompaniment = gr.Audio(label="accompaniment")

            gr.Examples(
                examples=examples,
                inputs=[microphone],
                outputs=[recorded_vocals, recorded_accompaniment, recorded_html_info],
                fn=process_microphone,
            )

        with gr.TabItem("From URL"):
            url_textbox = gr.Textbox(
                max_lines=1,
                placeholder="URL to an audio file",
                label="URL",
                interactive=True,
            )

            url_button = gr.Button("Submit for separation")
            url_html_info = gr.HTML(label="Info")

            url_vocals = gr.Audio(label="vocals")
            url_accompaniment = gr.Audio(label="accompaniment")

            gr.Examples(
                examples=[
                    "https://huggingface.co/csukuangfj/spleeter-torch/resolve/main/test_wavs/yesterday-once-more-Carpenters.mp3",
                    "https://huggingface.co/csukuangfj/spleeter-torch/resolve/main/test_wavs/das-beste-Silbermond.mp3",
                    "https://huggingface.co/csukuangfj/spleeter-torch/resolve/main/test_wavs/hotel-in-california.mp3",
                ],
                inputs=[url_textbox],
                # Report into this tab's own info panel, not the microphone
                # tab's one.
                outputs=[url_vocals, url_accompaniment, url_html_info],
                fn=process_url,
            )

        # Wire each submit button to its handler. Every handler feeds three
        # components: vocals player, accompaniment player, info panel.
        upload_button.click(
            process_uploaded_file,
            inputs=[uploaded_file],
            outputs=[uploaded_vocals, uploaded_accompaniment, uploaded_html_info],
        )

        record_button.click(
            process_microphone,
            inputs=[microphone],
            outputs=[recorded_vocals, recorded_accompaniment, recorded_html_info],
        )

        url_button.click(
            process_url,
            inputs=[url_textbox],
            outputs=[url_vocals, url_accompaniment, url_html_info],
        )

# Limit PyTorch to one thread for each of its two thread-count settings
# (set_num_threads first, then set_num_interop_threads).
for _set_thread_count in (torch.set_num_threads, torch.set_num_interop_threads):
    _set_thread_count(1)

# Turn off three private torch._C JIT switches, in this order: profiling
# executor, profiling mode, graph-executor optimization.
for _jit_switch in (
    torch._C._jit_set_profiling_executor,
    torch._C._jit_set_profiling_mode,
    torch._C._set_graph_executor_optimize,
):
    _jit_switch(False)

if __name__ == "__main__":
    # Log at INFO level with timestamp, level and source location, then start
    # serving the UI.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s",
    )

    demo.launch()