# Hugging Face Spaces status banner captured with this file: "Running on Zero".
# NOTE: the statements below are kept in their original order on purpose.
# Several of them have import-time side effects, so do not re-sort this block.
import spaces
import torch
import gradio as gr
from transformers import pipeline, AutoModel, LlamaTokenizer, LlamaForCausalLM, InstructBlipForConditionalGeneration, InstructBlipProcessor
from transformers.image_utils import load_image
import numpy as np
#import yaml
#import os
import requests
import nltk
import scipy.io.wavfile
import os
import subprocess
from huggingface_hub import hf_hub_download
# Runs before llama_cpp is imported; presumably llama.sh installs it -- TODO confirm.
subprocess.run(['bash','llama.sh'])
from llama_cpp import Llama
os.environ["SAFETENSORS_FAST_GPU"] = "1"
# NOTE(review): os.putenv() does not update os.environ, so code in this process
# that reads os.environ will not see this variable; only child processes will.
os.putenv("HF_HUB_ENABLE_HF_TRANSFER","1")
from espnet2.bin.tts_inference import Text2Speech
# Coordinates of a GGML Vicuna checkpoint. Within this file they are only
# referenced by the disabled llama.cpp experiment recorded below.
repo_id = "Sosaka/Vicuna-7B-4bit-ggml"
filename = "vicuna-7B-1.1-ggml_q4_0-ggjt_v3.bin"
cache_dir = "~/.cache/huggingface/hub"

# Disabled llama.cpp experiment, kept for reference. It used to live in a bare
# string literal; it is written as comments so it cannot be mistaken for code.
#
# hf_hub_download(repo_id=repo_id, filename=filename, cache_dir=cache_dir)
# llm = Llama(
#     model_path="~/.cache/huggingface/hub/vicuna-7B-1.1-ggml_q4_0-ggjt_v3.bin",
#     n_gpu_layers=-1,  # Uncomment to use GPU acceleration
#     # seed=1337,  # Uncomment to set a specific seed
#     n_ctx=4096,  # Uncomment to increase the context window
# )
# llm = Llama.from_pretrained(
#     repo_id="Sosaka/Vicuna-7B-4bit-ggml",
#     filename="vicuna-7B-1.1-ggml_q4_0-ggjt_v3.bin",
#     n_gpu_layers=-1,  # Uncomment to use GPU acceleration
#     n_ctx=4096,
#     verbose=False,
# )

# Download each NLTK resource only when it is not already available locally.
for resource_path, package in (
    ('taggers/averaged_perceptron_tagger_eng', 'averaged_perceptron_tagger_eng'),
    ('corpora/cmudict', 'cmudict'),
):
    try:
        nltk.data.find(resource_path)
    except LookupError:
        nltk.download(package)

# Speech-recognition pipeline shared by every request.
ASR_MODEL_NAME = "openai/whisper-medium.en"
asr_pipe = pipeline(
    task="automatic-speech-recognition",
    model=ASR_MODEL_NAME,
    chunk_length_s=30,
    device='cuda' if torch.cuda.is_available() else 'cpu',  # Use GPU if available
)

# NOTE(review): these ids are picked by position from the end of the
# tokenizer's special-token list, so they depend on that list's ordering.
# Confirm they are the intended task tokens for ASR_MODEL_NAME.
all_special_ids = asr_pipe.tokenizer.all_special_ids
transcribe_token_id = all_special_ids[-5]
translate_token_id = all_special_ids[-6]
def _preload_and_load_models():
    """Load the Vicuna tokenizer and model into module-level globals.

    Sets ``vicuna_tokenizer`` and ``vicuna_model``. The model is placed on
    CUDA with float16 weights.
    """
    global vicuna_tokenizer, vicuna_model
    #VICUNA_MODEL_NAME = "EleutherAI/gpt-neo-2.7B"  # Or another model
    VICUNA_MODEL_NAME = "lmsys/vicuna-13b-v1.5"  # Or another model
    #VICUNA_MODEL_NAME = "lmsys/vicuna-7b-v1.5"  # Or another model
    vicuna_tokenizer = LlamaTokenizer.from_pretrained(VICUNA_MODEL_NAME)
    # Ask for float16 at load time. Loading in the default dtype and casting
    # afterwards would first materialise a full-precision copy of the weights,
    # only to throw it away.
    vicuna_model = LlamaForCausalLM.from_pretrained(
        VICUNA_MODEL_NAME,
        torch_dtype=torch.float16,
    ).to('cuda')
# Instantiate every model once at import time so requests only run inference.
_preload_and_load_models()

# Text-to-speech model used to read the tutor's answer aloud.
tts = Text2Speech.from_pretrained("espnet/kan-bayashi_ljspeech_vits", device='cuda')

# Image-captioning model and its processor; the model runs in bfloat16 on CUDA.
model5 = InstructBlipForConditionalGeneration.from_pretrained(
    "Salesforce/instructblip-vicuna-7b"
).to('cuda', torch.bfloat16)
processor5 = InstructBlipProcessor.from_pretrained("Salesforce/instructblip-vicuna-7b")

# Instruction given to the captioning model for every uploaded image.
cap_prompt = (
    "Describe this image with a caption to be used for question answering."
)
# Persona prepended to every prompt sent to Vicuna.
_TUTOR_SYSTEM_PROMPT = """You are a friendly and enthusiastic tutor for young children (ages 6-9).
You answer questions clearly and simply, using age-appropriate language.
You are also a little bit silly and like to make jokes."""

# Mode used when ``answer_mode`` is missing or not one of the known keys.
_DEFAULT_ANSWER_MODE = "medium"

# Backend precision switches and ``generate`` arguments for each answer mode.
# NOTE(review): "medium" limits ``max_length``, which counts prompt tokens too,
# so a long prompt (e.g. with an image caption) leaves little or no room for
# the answer. Confirm whether ``max_new_tokens`` was intended.
_ANSWER_PROFILES = {
    "slow": {
        "tf32": False,
        "bf16_reduced": False,
        "fp16_reduced": False,
        "cudnn_deterministic": False,
        "cudnn_benchmark": True,
        "matmul_precision": "highest",
        "generate": {
            "max_new_tokens": 512,
            "min_new_tokens": 256,
            "do_sample": True,
            "low_memory": False,
        },
    },
    "medium": {
        "tf32": True,
        "bf16_reduced": False,
        "fp16_reduced": False,
        "cudnn_deterministic": False,
        "cudnn_benchmark": False,
        "matmul_precision": "high",
        "generate": {
            "max_length": 192,
            "min_new_tokens": 64,
            "do_sample": True,
            "low_memory": False,
        },
    },
    "fast": {
        "tf32": True,
        "bf16_reduced": True,
        "fp16_reduced": True,
        "cudnn_deterministic": True,
        "cudnn_benchmark": False,
        "matmul_precision": "medium",
        "generate": {
            "min_new_tokens": 16,
            "do_sample": True,
            "low_memory": True,
        },
    },
}


def _transcribe(audio_path):
    """Return the transcription of the audio file at *audio_path*."""
    asr_pipe.model.config.forced_decoder_ids = [[2, transcribe_token_id]]
    # Hand the pipeline the file path in every case so that it decodes and
    # resamples the audio itself. Passing a raw sample array would skip both.
    return asr_pipe(audio_path)["text"]


def _caption_image(img):
    """Return a caption for the uploaded image *img*, produced by ``model5``."""
    # The upload may be a plain path or an object exposing the path as ``.name``.
    image_path = getattr(img, "name", img)
    picture = load_image(image_path)  # opens the file and converts it to RGB
    # Move the inputs to the model's device and cast the floating-point
    # tensors to its dtype; a float32 image fed to a bfloat16 model fails.
    inputs = processor5(
        images=picture, text=cap_prompt, return_tensors="pt"
    ).to(model5.device, model5.dtype)
    with torch.no_grad():
        generated_ids = model5.generate(
            **inputs,
            do_sample=True,
            num_beams=1,
            max_length=96,
            min_length=64,
            top_p=0.9,
            repetition_penalty=1.0,
            length_penalty=2.0,
            temperature=0.5,
        )
    return processor5.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()


def _apply_backend_profile(profile):
    """Set the process-wide torch backend switches described by *profile*."""
    torch.backends.cuda.matmul.allow_tf32 = profile["tf32"]
    torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = profile["bf16_reduced"]
    torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = profile["fp16_reduced"]
    torch.backends.cudnn.allow_tf32 = profile["tf32"]
    torch.backends.cudnn.deterministic = profile["cudnn_deterministic"]
    torch.backends.cudnn.benchmark = profile["cudnn_benchmark"]
    torch.set_float32_matmul_precision(profile["matmul_precision"])


def _generate_answer(prompt, answer_mode):
    """Return Vicuna's continuation of *prompt* using the *answer_mode* profile."""
    # An unknown or missing mode falls back to the default instead of leaving
    # the generation step without a result.
    profile = _ANSWER_PROFILES.get(answer_mode, _ANSWER_PROFILES[_DEFAULT_ANSWER_MODE])
    _apply_backend_profile(profile)
    with torch.no_grad():
        vicuna_input = vicuna_tokenizer(prompt, return_tensors="pt").to('cuda')
        vicuna_output = vicuna_model.generate(**vicuna_input, **profile["generate"])
    # The output sequence starts with the prompt tokens. Drop them by position
    # rather than by string replacement, which breaks whenever the decoded
    # text does not reproduce the prompt character for character.
    prompt_length = vicuna_input["input_ids"].shape[-1]
    answer_ids = vicuna_output[0][prompt_length:]
    return vicuna_tokenizer.decode(answer_ids, skip_special_tokens=True).strip()


def _synthesize_speech(text):
    """Return ``(sample_rate, samples)`` for *text*, or ``None`` on failure."""
    if not text:
        return None
    try:
        with torch.no_grad():
            wav = tts(text)["wav"]
        samples = wav.cpu().numpy()
        peak = np.abs(samples).max()
        if peak > 0:  # leave an all-zero signal alone instead of dividing by zero
            samples = samples / peak
        #sf.write('generated_audio.wav', audio_arr, SAMPLE_RATE) # Removed writing to file
        return (tts.fs, samples)
    except requests.exceptions.RequestException as e:
        print(f"Error in Hugging Face API request: {e}")
        return None
    except Exception as e:
        print(f"Error in speech synthesis: {e}")
        return None


# Request the GPU for the duration of the call when running on ZeroGPU; the
# decorator is documented as having no effect in other environments.
@spaces.GPU
def process_audio(img, microphone, audio_upload, state, answer_mode):  # Added audio_upload
    """Transcribe a spoken question, answer it with Vicuna and speak the answer.

    Args:
        img: Optional uploaded image. When given, a generated caption of it is
            added to the prompt.
        microphone: Path of the microphone recording, or a falsy value.
            Takes priority over *audio_upload*.
        audio_upload: Path of an uploaded audio file, or a falsy value.
        state: Conversation text accumulated so far.
        answer_mode: ``"fast"``, ``"medium"`` or ``"slow"``; any other value is
            treated as ``"medium"``.

    Returns:
        A tuple ``(conversation, conversation, audio)``. ``conversation`` is
        the updated conversation text, returned twice (display and state).
        ``audio`` is ``(sample_rate, samples)`` or ``None`` when synthesis
        failed. Without any audio input the result is ``(state, state, None)``.
    """
    audio_source = microphone or audio_upload
    if not audio_source:
        return state, state, None  # No audio input

    text = _transcribe(audio_source)

    prompt = f"{_TUTOR_SYSTEM_PROMPT}\nUser: {text}"
    if img is not None:
        generated_text = _caption_image(img)
        print(generated_text)
        prompt = f"{_TUTOR_SYSTEM_PROMPT}\nImage: {generated_text}\nUser: {text}"

    vicuna_response = _generate_answer(prompt, answer_mode)
    updated_state = (state or "") + "\nUser: " + text + "\n" + "Tutor: " + vicuna_response
    audio_output = _synthesize_speech(vicuna_response)
    return updated_state, updated_state, audio_output
# Gradio front end. Either audio component changing runs the full
# transcribe -> answer -> speak pipeline.
# NOTE(review): the original nesting of this layout was lost. Placing the three
# input components in the row is an assumption -- confirm against the
# intended design.
with gr.Blocks(title="Whisper, Vicuna, & TTS Demo") as demo:  # Updated title
    gr.Markdown("# Speech-to-Text-to-Speech Demo with Vicuna and Hugging Face TTS")
    gr.Markdown("Speak into your microphone, get a transcription, Vicuna will process it, and then you'll hear the result!")
    with gr.Tab("Transcribe & Synthesize"):
        with gr.Row():  # Added a row for better layout
            image = gr.File(label="Image Prompt (Optional)")
            mic_input = gr.Audio(sources="microphone", type="filepath", label="Speak Here", elem_id="mic_audio")
            audio_upload = gr.Audio(sources="upload", type="filepath", label="Or Upload Audio File")  # Added upload component
        transcription_output = gr.Textbox(lines=5, label="Transcription and Vicuna Response")
        audio_output = gr.Audio(label="Synthesized Speech", type="numpy", autoplay=True)
        answer_mode = gr.Radio(["fast", "medium", "slow"], value='medium')
        transcription_state = gr.State(value="")

        # Both events call the same function with the same components.
        pipeline_inputs = [image, mic_input, audio_upload, transcription_state, answer_mode]  # Include audio_upload
        pipeline_outputs = [transcription_output, transcription_state, audio_output]

        mic_input.change(
            fn=process_audio,
            inputs=pipeline_inputs,
            outputs=pipeline_outputs,
        )
        audio_upload.change(  # Added change event for upload
            fn=process_audio,
            inputs=pipeline_inputs,
            outputs=pipeline_outputs,
            api_name='/api/predict',
        )

if __name__ == '__main__':
    demo.launch(share=False)