TayyabTanveer's picture
update
be27a3f verified
import streamlit as st
import asyncio
import os
from IntegratedPipeline import DeepgramSTTProcessor, LanguageModelProcessor, TTSProcessor, AvatarGenerator
# Run one cycle of the pipeline: mic/file -> transcript -> LLM -> TTS -> Video generation
def run_pipeline_once(audio_file_path=None):
async def pipeline():
# If an audio file is provided, process it instead of using the mic
if audio_file_path:
stt_processor = DeepgramSTTProcessor(None)
# Assume process_file returns the transcript
transcript = stt_processor.process_file(audio_file_path)
# Skip starting the microphone task
else:
transcript_queue = asyncio.Queue()
stt_processor = DeepgramSTTProcessor(transcript_queue)
stt_task = asyncio.create_task(stt_processor.run())
# Wait for one final transcript from mic
transcript = await transcript_queue.get()
# Initialize other processors
llm_processor = LanguageModelProcessor()
tts_processor = TTSProcessor()
avatar_generator = AvatarGenerator()
# Process transcript through LLM
full_response = ""
for llm_chunk in llm_processor.process(transcript):
full_response += llm_chunk
# Generate audio and then the avatar video
audio_path = "./uploaded_audio/response_audio.wav"
path = tts_processor.speak(full_response, audio_path)
video_path = avatar_generator.generate_video(path)
if not audio_file_path:
stt_task.cancel()
await stt_processor.shutdown()
return transcript, full_response, video_path
return asyncio.run(pipeline())
st.title("SadTalker LiveAgent Interface")
uploaded_audio = st.file_uploader("Upload an audio file (mic access is unavailable on this deployment)")
if st.button("Start Processing") or uploaded_audio is not None:
audio_path = None
if uploaded_audio is not None:
# Save the uploaded audio to disk
audio_path = "./temp_uploaded_audio.wav"
with open(audio_path, "wb") as f:
f.write(uploaded_audio.getbuffer())
st.info("Processing your audio...")
transcript, response, video_path = run_pipeline_once(audio_path)
st.write("Transcript:", transcript)
st.write("LLM Response:", response)
if video_path and os.path.exists(video_path):
with open(video_path, "rb") as vid_file:
video_bytes = vid_file.read()
st.video(video_bytes)
else:
st.error("Video generation failed.")