Spaces:
Sleeping
Sleeping
| import os | |
| import yaml | |
| import logging | |
| import nltk | |
| import torch | |
| import torchaudio | |
| from torchaudio.transforms import SpeedPerturbation | |
| from APIs import WRITE_AUDIO, LOUDNESS_NORM | |
| from utils import fade, get_service_port | |
| from flask import Flask, request, jsonify | |
| with open('config.yaml', 'r') as file: | |
| config = yaml.safe_load(file) | |
| # Configure the logging format and level | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| # Create a FileHandler for the log file | |
| os.makedirs('services_logs', exist_ok=True) | |
| log_filename = 'services_logs/Wav-API.log' | |
| file_handler = logging.FileHandler(log_filename, mode='w') | |
| file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) | |
| # Add the FileHandler to the root logger | |
| logging.getLogger('').addHandler(file_handler) | |
| """ | |
| Initialize the AudioCraft models here | |
| """ | |
| from audiocraft.models import AudioGen, MusicGen | |
| tta_model_size = config['AudioCraft']['tta_model_size'] | |
| tta_model = AudioGen.get_pretrained(f'facebook/audiogen-{tta_model_size}') | |
| logging.info(f'AudioGen ({tta_model_size}) is loaded ...') | |
| ttm_model_size = config['AudioCraft']['ttm_model_size'] | |
| ttm_model = MusicGen.get_pretrained(f'facebook/musicgen-{ttm_model_size}') | |
| logging.info(f'MusicGen ({ttm_model_size}) is loaded ...') | |
| """ | |
| Initialize the BarkModel here | |
| """ | |
| from transformers import BarkModel, AutoProcessor | |
| SPEED = float(config['Text-to-Speech']['speed']) | |
| speed_perturb = SpeedPerturbation(32000, [SPEED]) | |
| tts_model = BarkModel.from_pretrained("suno/bark") | |
| device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| tts_model = tts_model.to(device) | |
| tts_model = tts_model.to_bettertransformer() # Flash attention | |
| SAMPLE_RATE = tts_model.generation_config.sample_rate | |
| SEMANTIC_TEMPERATURE = 0.9 | |
| COARSE_TEMPERATURE = 0.5 | |
| FINE_TEMPERATURE = 0.5 | |
| processor = AutoProcessor.from_pretrained("suno/bark") | |
| logging.info('Bark model is loaded ...') | |
| """ | |
| Initialize the VoiceFixer model here | |
| """ | |
| from voicefixer import VoiceFixer | |
| vf = VoiceFixer() | |
| logging.info('VoiceFixer is loaded ...') | |
| """ | |
| Initalize the VoiceParser model here | |
| """ | |
| from VoiceParser.model import VoiceParser | |
| vp_device = config['Voice-Parser']['device'] | |
| vp = VoiceParser(device=vp_device) | |
| logging.info('VoiceParser is loaded ...') | |
| app = Flask(__name__) | |
| def generate_audio(): | |
| # Receive the text from the POST request | |
| data = request.json | |
| text = data['text'] | |
| length = float(data.get('length', 5.0)) | |
| volume = float(data.get('volume', -35)) | |
| output_wav = data.get('output_wav', 'out.wav') | |
| logging.info(f'TTA (AudioGen): Prompt: {text}, length: {length} seconds, volume: {volume} dB') | |
| try: | |
| tta_model.set_generation_params(duration=length) | |
| wav = tta_model.generate([text]) | |
| wav = torchaudio.functional.resample(wav, orig_freq=16000, new_freq=32000) | |
| wav = wav.squeeze().cpu().detach().numpy() | |
| wav = fade(LOUDNESS_NORM(wav, volumn=volume)) | |
| WRITE_AUDIO(wav, name=output_wav) | |
| # Return success message and the filename of the generated audio | |
| return jsonify({'message': f'Text-to-Audio generated successfully | {text}', 'file': output_wav}) | |
| except Exception as e: | |
| return jsonify({'API error': str(e)}), 500 | |
| def generate_music(): | |
| # Receive the text from the POST request | |
| data = request.json | |
| text = data['text'] | |
| length = float(data.get('length', 5.0)) | |
| volume = float(data.get('volume', -35)) | |
| output_wav = data.get('output_wav', 'out.wav') | |
| logging.info(f'TTM (MusicGen): Prompt: {text}, length: {length} seconds, volume: {volume} dB') | |
| try: | |
| ttm_model.set_generation_params(duration=length) | |
| wav = ttm_model.generate([text]) | |
| wav = wav[0][0].cpu().detach().numpy() | |
| wav = fade(LOUDNESS_NORM(wav, volumn=volume)) | |
| WRITE_AUDIO(wav, name=output_wav) | |
| # Return success message and the filename of the generated audio | |
| return jsonify({'message': f'Text-to-Music generated successfully | {text}', 'file': output_wav}) | |
| except Exception as e: | |
| # Return error message if something goes wrong | |
| return jsonify({'API error': str(e)}), 500 | |
| def generate_speech(): | |
| # Receive the text from the POST request | |
| data = request.json | |
| text = data['text'] | |
| speaker_id = data['speaker_id'] | |
| speaker_npz = data['speaker_npz'] | |
| volume = float(data.get('volume', -35)) | |
| output_wav = data.get('output_wav', 'out.wav') | |
| logging.info(f'TTS (Bark): Speaker: {speaker_id}, Volume: {volume} dB, Prompt: {text}') | |
| try: | |
| # Generate audio using the global pipe object | |
| text = text.replace('\n', ' ').strip() | |
| sentences = nltk.sent_tokenize(text) | |
| silence = torch.zeros(int(0.1 * SAMPLE_RATE), device=device).unsqueeze(0) # 0.1 second of silence | |
| pieces = [] | |
| for sentence in sentences: | |
| inputs = processor(sentence, voice_preset=speaker_npz).to(device) | |
| # NOTE: you must run the line below, otherwise you will see the runtime error | |
| # RuntimeError: view size is not compatible with input tensor's size and stride (at least one dimension spans across two contiguous subspaces). Use .reshape(...) instead. | |
| inputs['history_prompt']['coarse_prompt'] = inputs['history_prompt']['coarse_prompt'].transpose(0, 1).contiguous().transpose(0, 1) | |
| with torch.inference_mode(): | |
| # TODO: min_eos_p? | |
| output = tts_model.generate( | |
| **inputs, | |
| do_sample = True, | |
| semantic_temperature = SEMANTIC_TEMPERATURE, | |
| coarse_temperature = COARSE_TEMPERATURE, | |
| fine_temperature = FINE_TEMPERATURE | |
| ) | |
| pieces += [output, silence] | |
| result_audio = torch.cat(pieces, dim=1) | |
| wav_tensor = result_audio.to(dtype=torch.float32).cpu() | |
| wav = torchaudio.functional.resample(wav_tensor, orig_freq=SAMPLE_RATE, new_freq=32000) | |
| wav = speed_perturb(wav.float())[0].squeeze(0) | |
| wav = wav.numpy() | |
| wav = LOUDNESS_NORM(wav, volumn=volume) | |
| WRITE_AUDIO(wav, name=output_wav) | |
| # Return success message and the filename of the generated audio | |
| return jsonify({'message': f'Text-to-Speech generated successfully | {speaker_id}: {text}', 'file': output_wav}) | |
| except Exception as e: | |
| # Return error message if something goes wrong | |
| return jsonify({'API error': str(e)}), 500 | |
| def fix_audio(): | |
| # Receive the text from the POST request | |
| data = request.json | |
| processfile = data['processfile'] | |
| logging.info(f'Fixing {processfile} ...') | |
| try: | |
| vf.restore(input=processfile, output=processfile, cuda=True, mode=0) | |
| # Return success message and the filename of the generated audio | |
| return jsonify({'message': 'Speech restored successfully', 'file': processfile}) | |
| except Exception as e: | |
| # Return error message if something goes wrong | |
| return jsonify({'API error': str(e)}), 500 | |
| def parse_voice(): | |
| # Receive the text from the POST request | |
| data = request.json | |
| wav_path = data['wav_path'] | |
| out_dir = data['out_dir'] | |
| logging.info(f'Parsing {wav_path} ...') | |
| try: | |
| vp.extract_acoustic_embed(wav_path, out_dir) | |
| # Return success message and the filename of the generated audio | |
| return jsonify({'message': f'Sucessfully parsed {wav_path}'}) | |
| except Exception as e: | |
| # Return error message if something goes wrong | |
| return jsonify({'API error': str(e)}), 500 | |
| if __name__ == '__main__': | |
| service_port = get_service_port() | |
| # We disable multithreading to force services to process one request at a time and avoid CUDA OOM | |
| app.run(debug=False, threaded=False, port=service_port) | |