Spaces:
Running
Running
| import os | |
| import subprocess | |
| REST_IP = os.environ['REST_IP'] | |
| SERVICE_PORT = int(os.environ['SERVICE_PORT']) | |
| TRANSLATION_APIKEY_URL = os.environ['TRANSLATION_APIKEY_URL'] | |
| GOOGLE_APPLICATION_CREDENTIALS = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] | |
| subprocess.call(f"wget --no-check-certificate -O {GOOGLE_APPLICATION_CREDENTIALS} {TRANSLATION_APIKEY_URL}", shell=True) | |
| TOXICITY_THRESHOLD = float(os.getenv('TOXICITY_THRESHOLD', 0.7)) | |
| import gradio as gr | |
| from toxicity_estimator import PerspectiveAPI | |
| from translator import Translator | |
| from client_rest import RestAPIApplication | |
| from pathlib import Path | |
| import argparse | |
| import threading | |
| import yaml | |
| TITLE = Path("docs/title.txt").read_text() | |
| DESCRIPTION = Path("docs/description.md").read_text() | |
| class GradioApplication: | |
| def __init__(self, rest_ip, rest_port, max_seed): | |
| self.lang_list = { | |
| 'ko': 'ko_KR', | |
| 'en': 'en_US', | |
| 'ja': 'ja_JP', | |
| 'zh': 'zh_CN', | |
| 'zh-CN': 'zh_CN' | |
| } | |
| self.background_list = [None, | |
| "background_image/cvpr.png", | |
| "background_image/black.png", | |
| "background_image/river.mp4", | |
| "background_image/sky.mp4"] | |
| self.perspective_api = PerspectiveAPI() | |
| self.translator = Translator() | |
| self.rest_application = RestAPIApplication(rest_ip, rest_port) | |
| self.output_dir = Path("output_file") | |
| inputs = prepare_input() | |
| outputs = prepare_output() | |
| self.iface = gr.Interface(fn=self.infer, | |
| title=TITLE, | |
| description=DESCRIPTION, | |
| inputs=inputs, | |
| outputs=outputs, | |
| allow_flagging='never', | |
| article=Path("docs/article.md").read_text()) | |
| self.max_seed = max_seed | |
| self._file_seed = 0 | |
| self.lock = threading.Lock() | |
| def _get_file_seed(self): | |
| return f"{self._file_seed % self.max_seed:02d}" | |
| def _reset_file_seed(self): | |
| self._file_seed = 0 | |
| def _counter_file_seed(self): | |
| with self.lock: | |
| self._file_seed += 1 | |
| def get_lang_code(self, lang): | |
| return self.lang_list[lang] | |
| def get_background_data(self, background_index): | |
| # get background filename and its extension | |
| data_path = self.background_list[background_index] | |
| if data_path is not None: | |
| with open(data_path, 'rb') as rf: | |
| background_data = rf.read() | |
| is_video_background = str(data_path).endswith(".mp4") | |
| else: | |
| background_data = None | |
| is_video_background = False | |
| return background_data, is_video_background | |
| def return_format(toxicity_prob, target_text, lang_dest, video_filename, detail=""): | |
| return {'Toxicity': toxicity_prob}, f"Language: {lang_dest}\nText: {target_text}\nDetails: {detail}", str(video_filename) | |
| def infer(self, text, lang, duration_rate, action, background_index): | |
| self._counter_file_seed() | |
| print(f"File Seed: {self._file_seed}") | |
| toxicity_prob = 0.0 | |
| target_text = "" | |
| lang_dest = "" | |
| video_filename = "vacant.mp4" | |
| # Toxicity estimation | |
| try: | |
| toxicity_prob = self.perspective_api.get_score(text) | |
| except Exception as e: # when Perspective API doesn't work | |
| pass | |
| if toxicity_prob > TOXICITY_THRESHOLD: | |
| detail = "Sorry, it seems that the input text is too toxic." | |
| return self.return_format(toxicity_prob, target_text, lang_dest, video_filename, detail=detail) | |
| # Google Translate API | |
| try: | |
| target_text, lang_dest = self.translator.get_translation(text, lang) | |
| except Exception as e: | |
| target_text = "" | |
| lang_dest = "" | |
| detail = f"Error from language translation: ({e})" | |
| return self.return_format(toxicity_prob, target_text, lang_dest, video_filename, detail=detail) | |
| try: | |
| self.translator.length_check(lang_dest, target_text) # assertion check | |
| except AssertionError as e: | |
| return self.return_format(toxicity_prob, target_text, lang_dest, video_filename, detail=str(e)) | |
| lang_rpc_code = self.get_lang_code(lang_dest) | |
| # Video Inference | |
| background_data, is_video_background = self.get_background_data(background_index) | |
| video_data = self.rest_application.get_video(target_text, lang_rpc_code, duration_rate, action.lower(), | |
| background_data, is_video_background) | |
| print(f"Video data size: {len(video_data)}") | |
| video_filename = self.output_dir / f"{self._file_seed:02d}.mkv" | |
| with open(video_filename, "wb") as video_file: | |
| video_file.write(video_data) | |
| return self.return_format(toxicity_prob, target_text, lang_dest, video_filename) | |
| def run(self, server_port=7860, share=False): | |
| try: | |
| self.iface.launch(height=900, | |
| share=share, server_port=server_port, | |
| enable_queue=True) | |
| except KeyboardInterrupt: | |
| gr.close_all() | |
| def prepare_input(): | |
| text_input = gr.Textbox(lines=2, | |
| placeholder="Type your text with English, Chinese, Korean, and Japanese.", | |
| value="Hello, this is demonstration for talking face generation " | |
| "with multilingual text-to-speech.", | |
| label="Text") | |
| lang_input = gr.Radio(['Korean', 'English', 'Japanese', 'Chinese'], | |
| type='value', | |
| value=None, | |
| label="Language") | |
| duration_rate_input = gr.Slider(minimum=0.8, | |
| maximum=1.2, | |
| step=0.01, | |
| value=1.0, | |
| label="Duration (The bigger the value, the slower the speech)") | |
| action_input = gr.Radio(['Default', 'Hand', 'BothHand', 'HandDown', 'Sorry'], | |
| type='value', | |
| value='Default', | |
| label="Select an action ...") | |
| background_input = gr.Radio(['None', 'CVPR', 'Black', 'River', 'Sky'], | |
| type='index', | |
| value='None', | |
| label="Select a background image/video ...") | |
| return [text_input, lang_input, duration_rate_input, | |
| action_input, background_input] | |
| def prepare_output(): | |
| toxicity_output = gr.Label(num_top_classes=1, label="Toxicity (from Perspective API)") | |
| translation_result_otuput = gr.Textbox(type="str", label="Translation Result") | |
| video_output = gr.Video(format='mp4') | |
| return [toxicity_output, translation_result_otuput, video_output] | |
| def parse_args(): | |
| parser = argparse.ArgumentParser( | |
| description='GRADIO DEMO for talking face generation submitted to CVPR2022') | |
| parser.add_argument('-p', '--port', dest='gradio_port', type=int, default=7860, help="Port for gradio") | |
| parser.add_argument('--rest_ip', type=str, default=REST_IP, help="IP for REST API") | |
| parser.add_argument('--rest_port', type=int, default=SERVICE_PORT, help="Port for REST API") | |
| parser.add_argument('--max_seed', type=int, default=20, help="Max seed for saving video") | |
| parser.add_argument('--share', action='store_true', help='get publicly sharable link') | |
| args = parser.parse_args() | |
| return args | |
| if __name__ == '__main__': | |
| args = parse_args() | |
| gradio_application = GradioApplication(args.rest_ip, args.rest_port, args.max_seed) | |
| gradio_application.run(server_port=args.gradio_port, share=args.share) | |