""" | |
French Conversation Tutor - Main Application | |
Practice French through natural conversation with Mr. Mistral! | |
""" | |
import gradio as gr | |
import numpy as np | |
import os | |
import io | |
import wave | |
import tempfile | |
import time | |
from datetime import datetime | |
from typing import List, Dict, Tuple | |
import re | |
import random | |
import shutil | |
from dotenv import load_dotenv | |
import soundfile as sf # Added missing import | |
# Load environment variables
load_dotenv()

# Model imports
from mistralai import Mistral
import google.generativeai as genai
from groq import Groq
import openai

# Load API keys
mistral_api_key = os.environ.get("MISTRAL_API_KEY")
gemini_api_key = os.environ.get("GEMINI_API_KEY")
groq_api_key = os.environ.get("GROQ_API_KEY")
openai_api_key = os.environ.get("OPENAI_API_KEY")

# Debug: Check if keys are loaded
print(f"Mistral API key loaded: {'Yes' if mistral_api_key else 'No'}")
print(f"Gemini API key loaded: {'Yes' if gemini_api_key else 'No'}")
print(f"Groq API key loaded: {'Yes' if groq_api_key else 'No'}")
print(f"OpenAI API key loaded: {'Yes' if openai_api_key else 'No'}")

# Track which LLM is serving responses; updated at runtime by the handlers.
# BUG FIX: this was previously re-assigned to "Unknown" *after* client
# selection below, clobbering the "Mistral AI" / "Google Gemini (Fallback)"
# value chosen during initialization.
current_llm = "Unknown"

# Initialize clients
mistral_client = None
if mistral_api_key:
    mistral_client = Mistral(api_key=mistral_api_key)
    current_llm = "Mistral AI"
elif gemini_api_key:
    genai.configure(api_key=gemini_api_key)
    current_llm = "Google Gemini (Fallback)"
else:
    raise ValueError("Neither MISTRAL_API_KEY nor GEMINI_API_KEY found in environment variables.")

# Initialize Gemini for fallback even if Mistral is primary
if gemini_api_key and mistral_api_key:
    genai.configure(api_key=gemini_api_key)

if not groq_api_key:
    raise ValueError("GROQ_API_KEY not found in environment variables.")
groq_client = Groq(api_key=groq_api_key)

# Global list of generated TTS files. Files are tracked (rather than deleted
# immediately) so the browser can finish fetching them; pruned later by
# cleanup_old_audio_files().
temp_audio_files = []
def cleanup_old_audio_files():
    """Prune old temp TTS files, keeping the 20 most recent tracked entries.

    A file is only deleted once it is older than 60 seconds, so a file that
    the browser is still fetching is never removed mid-serve. Entries whose
    file has already disappeared (e.g. OS temp cleanup) are dropped from the
    tracking list so it cannot grow without bound.
    """
    global temp_audio_files
    if len(temp_audio_files) <= 20:
        return
    # Only the entries beyond the 20 newest are candidates for deletion.
    for file_path in temp_audio_files[:-20]:
        try:
            if not os.path.exists(file_path):
                # File already gone -- BUG FIX: previously the stale entry
                # stayed in the list forever.
                temp_audio_files.remove(file_path)
                continue
            file_age = datetime.now().timestamp() - os.path.getmtime(file_path)
            if file_age > 60:  # only delete files older than 60 seconds
                os.remove(file_path)
                temp_audio_files.remove(file_path)
        except OSError:
            # Best-effort cleanup: a failed deletion must never break the app.
            # (Narrowed from a bare `except:` that also swallowed
            # KeyboardInterrupt/SystemExit.)
            pass
def get_system_prompt():
    """Return the fixed system prompt that constrains the tutor LLM.

    The prompt forces single-sentence replies in a strict 3-line shape
    (French sentence / (pronunciation) / [translation]) and forbids the
    model from inventing dialogue on the student's behalf. The same shape
    is later enforced on model output by validate_response_format().
    """
    return """You are Mr. Mistral, a French tutor having a conversation with ONE student.
CRITICAL: You are ONLY the tutor. The student will speak to you, and you respond ONLY to what they actually said.
NEVER:
- Create dialogue for the student
- Imagine what the student might say
- Write "You:" or "Student:" or any dialogue
- Continue the conversation by yourself
ALWAYS:
- Wait for the student's actual input
- Respond with ONE French sentence only
- Use exactly 3 lines:
French sentence
(pronunciation)
[translation]
Example - if student says "Bonjour":
Bonjour! Comment allez-vous?
(bohn-ZHOOR! koh-mahn tah-lay VOO?)
[Hello! How are you?]
ONE sentence response only. NO additional dialogue."""
def validate_response_format(response: str) -> Tuple[bool, str]:
    """Coerce an LLM reply into the required 3-line tutor format.

    First strips any hallucinated student dialogue ("You:", "Student:", ...)
    and rhetorical-question lines, then tries to locate the French sentence,
    its (pronunciation) line and its [translation] line.

    Returns:
        (True, formatted_reply) when a French line could be identified, with
        placeholder pronunciation/translation lines substituted if missing;
        (False, original_response) otherwise.
    """
    dialogue_markers = ('you:', 'user:', 'student:', 'me:', 'moi:')

    kept = []
    for raw in response.strip().split('\n'):
        candidate = raw.strip()
        if not candidate:
            continue
        lowered = candidate.lower()
        # Drop lines where the model speaks *for* the student.
        if any(marker in lowered for marker in dialogue_markers):
            continue
        if 'what do you' in lowered or "qu'est-ce que" in lowered:
            continue
        kept.append(candidate)

    french = None
    pronunciation = None
    translation = None
    for idx, candidate in enumerate(kept):
        if pronunciation is None and '(' in candidate and ')' in candidate:
            pronunciation = candidate
            # The French sentence usually sits directly above the guide.
            if idx > 0 and french is None:
                french = kept[idx - 1]
        elif translation is None and '[' in candidate and ']' in candidate:
            translation = candidate

    if french is None:
        # Fall back to the first line free of any markup characters.
        for candidate in kept:
            if candidate and all(ch not in candidate for ch in '()[]*'):
                french = candidate
                break

    if not french:
        return False, response

    if pronunciation is None:
        pronunciation = "(pronunciation guide not available)"
    if translation is None:
        translation = "[translation not available]"
    return True, f"{french}\n{pronunciation}\n{translation}"
def generate_scenario():
    """Pick a random conversation topic and format it as the opening scenario.

    Returns a markdown string with the topic name, four helper phrases (each
    "French (pronunciation) [translation]") and the tutor's opening question
    in the same 3-line format used throughout the app. Topics are hard-coded,
    so no LLM call is made here.
    """
    try:
        # List of diverse topics
        # NOTE: the non-ASCII sequences below (e.g. "Γ©") are preserved
        # exactly as stored; they are the accented French characters.
        topics = [
            {
                "name": "Daily Routine",
                "phrases": [
                    "Je me rΓ©veille Γ ... (zhuh muh ray-vay ah) [I wake up at...]",
                    "Je prends le petit dΓ©jeuner (zhuh prahn luh puh-tee day-zhuh-nay) [I have breakfast]",
                    "Je travaille de... Γ ... (zhuh trah-vay duh... ah) [I work from... to...]",
                    "Le soir, je... (luh swahr, zhuh) [In the evening, I...]"
                ],
                "opening": "Γ quelle heure vous levez-vous le matin?\n(ah kel uhr voo luh-vay voo luh mah-tahn?)\n[What time do you get up in the morning?]"
            },
            {
                "name": "Favorite Foods",
                "phrases": [
                    "Mon plat prΓ©fΓ©rΓ© est... (mohn plah pray-fay-ray ay) [My favorite dish is...]",
                    "J'adore... (zhah-dohr) [I love...]",
                    "Je n'aime pas... (zhuh nehm pah) [I don't like...]",
                    "C'est dΓ©licieux! (say day-lee-see-uh) [It's delicious!]"
                ],
                "opening": "Quel est votre plat prΓ©fΓ©rΓ©?\n(kel ay voh-truh plah pray-fay-ray?)\n[What is your favorite dish?]"
            },
            {
                "name": "Work and Career",
                "phrases": [
                    "Je travaille comme... (zhuh trah-vay kohm) [I work as...]",
                    "Mon bureau est... (mohn bew-roh ay) [My office is...]",
                    "J'aime mon travail (zhehm mohn trah-vay) [I like my job]",
                    "Mes collègues sont... (may koh-lehg sohn) [My colleagues are...]"
                ],
                "opening": "Qu'est-ce que vous faites comme travail?\n(kess-kuh voo feht kohm trah-vay?)\n[What do you do for work?]"
            },
            {
                "name": "Music and Hobbies",
                "phrases": [
                    "J'Γ©coute... (zhay-koot) [I listen to...]",
                    "Mon chanteur prΓ©fΓ©rΓ© est... (mohn shahn-tuhr pray-fay-ray ay) [My favorite singer is...]",
                    "Je joue de... (zhuh zhoo duh) [I play (instrument)...]",
                    "Dans mon temps libre... (dahn mohn tahn lee-bruh) [In my free time...]"
                ],
                "opening": "Quel type de musique aimez-vous?\n(kel teep duh mew-zeek ay-may voo?)\n[What type of music do you like?]"
            },
            {
                "name": "Weekend Plans",
                "phrases": [
                    "Ce weekend, je vais... (suh wee-kehnd, zhuh vay) [This weekend, I'm going to...]",
                    "J'aimerais... (zheh-muh-ray) [I would like to...]",
                    "Avec mes amis... (ah-vek may zah-mee) [With my friends...]",
                    "Γa sera amusant! (sah suh-rah ah-mew-zahn) [It will be fun!]"
                ],
                "opening": "Qu'est-ce que vous faites ce weekend?\n(kess-kuh voo feht suh wee-kehnd?)\n[What are you doing this weekend?]"
            },
            {
                "name": "Family and Friends",
                "phrases": [
                    "Ma famille habite... (mah fah-mee ah-beet) [My family lives...]",
                    "J'ai... frères/soeurs (zhay... frehr/suhr) [I have... brothers/sisters]",
                    "Mon meilleur ami... (mohn may-yuhr ah-mee) [My best friend...]",
                    "Nous aimons... ensemble (noo zeh-mohn... ahn-sahm-bluh) [We like to... together]"
                ],
                "opening": "Parlez-moi de votre famille!\n(pahr-lay mwah duh voh-truh fah-mee!)\n[Tell me about your family!]"
            },
            {
                "name": "Weather and Seasons",
                "phrases": [
                    "Il fait beau/mauvais (eel feh boh/moh-veh) [The weather is nice/bad]",
                    "J'aime l'Γ©tΓ©/l'hiver (zhehm lay-tay/lee-vehr) [I like summer/winter]",
                    "Il pleut souvent (eel pluh soo-vahn) [It rains often]",
                    "Ma saison prΓ©fΓ©rΓ©e est... (mah seh-zohn pray-fay-ray ay) [My favorite season is...]"
                ],
                "opening": "Quel temps fait-il aujourd'hui?\n(kel tahn feh-teel oh-zhoor-dwee?)\n[What's the weather like today?]"
            },
            {
                "name": "Travel and Vacations",
                "phrases": [
                    "J'ai visitΓ©... (zhay vee-zee-tay) [I visited...]",
                    "Je voudrais aller Γ ... (zhuh voo-dray ah-lay ah) [I would like to go to...]",
                    "En vacances, je... (ahn vah-kahns, zhuh) [On vacation, I...]",
                    "C'Γ©tait magnifique! (say-teh mahn-yee-feek) [It was magnificent!]"
                ],
                "opening": "OΓΉ aimez-vous voyager?\n(oo ay-may voo vwah-yah-zhay?)\n[Where do you like to travel?]"
            }
        ]
        # Select a random topic
        selected_topic = random.choice(topics)
        # Format the scenario directly without using LLM
        scenario = f"""**Topic: {selected_topic['name']}**
**Helpful phrases:**
- {selected_topic['phrases'][0]}
- {selected_topic['phrases'][1]}
- {selected_topic['phrases'][2]}
- {selected_topic['phrases'][3]}
{selected_topic['opening']}"""
        return scenario
    except Exception as e:
        # Defensive only: the topic data is static, so this path should not
        # trigger in practice.
        return f"Error generating scenario: {str(e)}"
def extract_french_for_tts(text: str) -> str:
    """Return the first plain French line of *text*, or "" if none exists.

    A "plain" line is one carrying no pronunciation parentheses, no
    translation brackets and no markdown asterisks -- i.e. the sentence the
    TTS engine should actually speak.
    """
    for raw in text.strip().split('\n'):
        candidate = raw.strip()
        if not candidate:
            continue
        if '(' in candidate or '[' in candidate or '*' in candidate:
            continue
        if candidate.startswith('**'):
            continue
        return candidate
    return ""
def process_speech_to_text(audio_tuple) -> Tuple[str, bool]:
    """Transcribe recorded audio to French text via Groq Whisper.

    Args:
        audio_tuple: Gradio numpy-format audio, i.e. (sample_rate, samples),
            or None when nothing was recorded.

    Returns:
        (transcribed_text, True) on success, (error_message, False) otherwise.
    """
    if audio_tuple is None:
        return "No audio received", False

    try:
        sample_rate, samples = audio_tuple
        # Re-encode the raw samples as an in-memory WAV for the upload.
        buffer = io.BytesIO()
        sf.write(buffer, samples, sample_rate, format='WAV')
        buffer.seek(0)
        result = groq_client.audio.transcriptions.create(
            file=("audio.wav", buffer),
            model="whisper-large-v3-turbo",
            language="fr",
        )
        return result.text, True
    except Exception as exc:
        detail = str(exc)
        # Map the common failure modes to friendlier messages.
        if "401" in detail or "Invalid API Key" in detail:
            return "Error: Invalid Groq API key. Please check your GROQ_API_KEY.", False
        if "quota" in detail.lower():
            return "Error: Groq API quota exceeded. Please check your account.", False
        return f"Error in speech recognition: {detail}", False
def generate_tutor_response(conversation_history: List[Dict], user_text: str) -> str:
    """Generate the tutor's next reply, trying Mistral first, then Gemini.

    Args:
        conversation_history: prior turns as {"role": "user"|"assistant",
            "content": str} dicts.
        user_text: the student's latest (transcribed) utterance.

    Returns:
        A 3-line tutor reply (French / pronunciation / translation), or an
        "Error: ..." string when every available LLM failed.
    """
    global current_llm

    # --- Primary: Mistral ---
    if mistral_client:
        try:
            messages = [
                {"role": "system", "content": get_system_prompt()}
            ]
            for msg in conversation_history:
                role = "user" if msg["role"] == "user" else "assistant"
                messages.append({"role": role, "content": msg["content"]})
            messages.append({"role": "user", "content": user_text})
            response = mistral_client.chat.complete(
                model="mistral-large-latest",
                messages=messages
            )
            raw_response = response.choices[0].message.content
            current_llm = "Mistral AI"
            is_valid, cleaned_response = validate_response_format(raw_response)
            if not is_valid:
                # Salvage at least the French sentence for TTS when the model
                # ignored the required 3-line format.
                french_text = extract_french_for_tts(raw_response)
                if french_text:
                    cleaned_response = f"{french_text}\n(pronunciation not available)\n[translation not available]"
            return cleaned_response
        except Exception as e:
            print(f"Mistral error: {str(e)}, falling back to Gemini")
            if not gemini_api_key:
                return f"Error: Mistral failed and no Gemini fallback available: {str(e)}"

    # --- Fallback: Gemini ---
    if gemini_api_key:
        try:
            genai.configure(api_key=gemini_api_key)
            model = genai.GenerativeModel("models/gemini-1.5-flash-latest")
            messages = [
                {"role": "user", "parts": [get_system_prompt()]}
            ]
            for msg in conversation_history:
                # BUG FIX: the Gemini API only accepts "user"/"model" content
                # roles; the stored history uses "assistant", which Gemini
                # rejects. Map it here.
                gemini_role = "user" if msg["role"] == "user" else "model"
                messages.append({"role": gemini_role, "parts": [msg["content"]]})
            messages.append({"role": "user", "parts": [user_text]})
            response = model.generate_content(messages)
            raw_response = response.text
            current_llm = "Google Gemini (Fallback)"
            is_valid, cleaned_response = validate_response_format(raw_response)
            if not is_valid:
                french_text = extract_french_for_tts(raw_response)
                if french_text:
                    cleaned_response = f"{french_text}\n(pronunciation not available)\n[translation not available]"
            return cleaned_response
        except Exception as e:
            return f"Error: Both Mistral and Gemini failed: {str(e)}"

    return "Error: No LLM available"
def _new_audio_path() -> str:
    """Create a fresh temp directory and return a timestamped .mp3 path in it."""
    temp_dir = tempfile.mkdtemp()
    return os.path.join(temp_dir, f"audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3")


def text_to_speech(text: str) -> str:
    """Synthesize the French line of *text* to an MP3 and return its path.

    Tries Groq TTS first and falls back to gTTS on any failure. Returns None
    when *text* has no plain French line or when both engines fail.
    """
    global temp_audio_files
    if not text:
        return None
    # BUG FIX: extract the French line *before* the try block. Previously the
    # gTTS fallback referenced `french_text` from inside the failed try, so an
    # exception raised before that assignment caused a NameError in the
    # fallback path instead of a clean failure.
    french_text = extract_french_for_tts(text)
    if not french_text:
        return None
    try:
        # NOTE(review): "tts-1"/"alloy" are OpenAI model/voice identifiers --
        # confirm the Groq speech endpoint actually accepts them; if not, this
        # branch always falls through to gTTS.
        tts_response = groq_client.audio.speech.create(
            model="tts-1",
            voice="alloy",
            input=french_text
        )
        temp_path = _new_audio_path()
        with open(temp_path, "wb") as f:
            f.write(tts_response.content)
        temp_audio_files.append(temp_path)
        cleanup_old_audio_files()
        return temp_path
    except Exception as e:
        error_msg = str(e)
        if "401" in error_msg or "Invalid API Key" in error_msg:
            print("Groq TTS Error: Invalid API key, falling back to gTTS")
        else:
            print(f"Groq TTS Error: {error_msg}, falling back to gTTS")
        # Fallback: Google Translate TTS (network call, no API key required).
        try:
            from gtts import gTTS
            tts = gTTS(text=french_text, lang='fr')
            temp_path = _new_audio_path()
            tts.save(temp_path)
            temp_audio_files.append(temp_path)
            cleanup_old_audio_files()
            return temp_path
        except Exception as e2:
            print(f"gTTS Fallback Error: {str(e2)}")
            return None
def analyze_conversation(full_transcript: List[Dict]) -> str:
    """Produce an end-of-session feedback report for the whole conversation.

    Prefers Mistral; falls back to Gemini when Mistral is unavailable or
    errors. Returns an "Error: ..." string when no LLM could produce the
    analysis.
    """
    global current_llm

    transcript_text = "\n".join(
        f"{msg['role']}: {msg['content']}" for msg in full_transcript
    )
    analysis_prompt = """Analyze this French conversation and provide:\n1. Grammar corrections with specific examples\n2. Pronunciation tips for common mistakes\n3. Vocabulary suggestions to improve fluency\n4. Overall assessment with encouragement\n\nBe specific, constructive, and encouraging. Format clearly with sections."""
    user_prompt = f"Analyze this conversation:\n{transcript_text}"

    # Primary: Mistral.
    if mistral_client:
        try:
            reply = mistral_client.chat.complete(
                model="mistral-large-latest",
                messages=[
                    {"role": "system", "content": analysis_prompt},
                    {"role": "user", "content": user_prompt},
                ],
            )
            current_llm = "Mistral AI"
            return reply.choices[0].message.content
        except Exception as exc:
            # Fall through to the Gemini branch below.
            print(f"Mistral error in analysis: {str(exc)}, falling back to Gemini")

    # Fallback: Gemini.
    if gemini_api_key:
        try:
            genai.configure(api_key=gemini_api_key)
            gemini_model = genai.GenerativeModel("models/gemini-1.5-flash-latest")
            reply = gemini_model.generate_content([
                {"role": "user", "parts": [analysis_prompt]},
                {"role": "user", "parts": [user_prompt]},
            ])
            current_llm = "Google Gemini (Fallback)"
            return reply.text
        except Exception as exc:
            return f"Error generating analysis: {str(exc)}"

    return "Error: No LLM available for analysis"
def create_app():
    """Build and return the Gradio Blocks UI for the tutor.

    Layout: a control-panel sidebar (scale 3) with start/new-topic buttons,
    current topic, post-session analysis and status, next to a conversation
    column (scale 7) with the chat transcript, progress, mic input and the
    tutor's audio player. All conversation data lives in gr.State objects so
    each browser session is independent.
    """
    with gr.Blocks(title="French Tutor", theme=gr.themes.Soft()) as app:
        # State management (per-session)
        conversation_state = gr.State([])   # LLM chat history: [{"role", "content"}, ...]
        exchange_count = gr.State(0)        # completed exchanges; session ends at 3
        full_transcript = gr.State([])      # full transcript kept for the final analysis
        current_scenario = gr.State("")     # scenario text shown at conversation start
        gr.Markdown("# π«π· French Conversation Tutor")
        gr.Markdown("Practice French through natural conversation! (3 exchanges per session)")
        # Model info banner.
        # NOTE(review): current_llm is read once at build time here; after each
        # exchange the banner is refreshed via process_user_audio's outputs.
        with gr.Row():
            model_info = gr.Markdown(
                f"**π€ Models:** LLM: {current_llm} | STT: Groq Whisper | TTS: gTTS",
                elem_id="model-info"
            )
        # Main layout with two columns
        with gr.Row():
            # Left sidebar (30% width)
            with gr.Column(scale=3):
                gr.Markdown("## π Control Panel")
                # Start/New Topic buttons
                start_btn = gr.Button("Start New Conversation", variant="primary", size="lg")
                new_topic_btn = gr.Button("π² Generate New Topic & Restart", variant="secondary", visible=False)
                # Topic display in sidebar
                with gr.Group():
                    gr.Markdown("### Current Topic")
                    sidebar_scenario = gr.Markdown("Click 'Start' to begin", elem_id="sidebar-scenario")
                # Analysis section in sidebar (revealed once 3 exchanges are done)
                with gr.Group(visible=False) as analysis_group:
                    gr.Markdown("### π Your Analysis")
                    analysis_box = gr.Markdown()
                    restart_btn = gr.Button("π Start Another Conversation", variant="secondary", size="lg")
                # Status in sidebar
                status_text = gr.Textbox(
                    label="System Status",
                    value="Ready to start",
                    interactive=False
                )
            # Right main content (70% width)
            with gr.Column(scale=7):
                # Conversation interface (hidden until a conversation starts)
                with gr.Column(visible=False) as conversation_ui:
                    gr.Markdown("## π¬ Conversation")
                    # Chat display - always visible
                    chat_display = gr.Markdown(value="", elem_id="chat-display")
                    # Progress indicator
                    progress_text = gr.Textbox(
                        label="Progress",
                        value="Ready to start",
                        interactive=False
                    )
                    # Audio interface: mic recording plus an explicit send button
                    with gr.Row():
                        audio_input = gr.Audio(
                            sources=["microphone"],
                            type="numpy",
                            label="π€ Record your response in French"
                        )
                        record_btn = gr.Button("Send Response", variant="primary")
                    # Tutor's audio response (autoplays when a new file arrives)
                    audio_output = gr.Audio(
                        label="π Tutor's Response",
                        type="filepath",
                        autoplay=True
                    )

        # NOTE(review): this helper is not referenced anywhere in the file --
        # presumably a leftover from an earlier wiring; confirm and remove.
        def reset_conversation_states():
            """Helper to reset all conversation states"""
            return [], 0, [], "", gr.update(value=None)

        def start_conversation(scenario_text=None):
            """Initialize a new conversation.

            Returns a 15-tuple whose order must match the outputs list of the
            start_btn/new_topic_btn/restart_btn click handlers below.
            """
            # Reset global state
            global current_llm  # declared for parity with other handlers; not reassigned here
            print("Starting new conversation...")
            # Generate scenario if not provided
            if scenario_text is None:
                scenario = generate_scenario()
            else:
                scenario = scenario_text
            # Extract the tutor's first message for audio
            audio_path = text_to_speech(scenario)
            if audio_path is None:
                audio_path = gr.update()  # No change to audio output
            # Format the scenario for display
            scenario_display = scenario.strip()
            # Create fresh empty states
            new_conversation_state = []
            new_full_transcript = []
            new_exchange_count = 0
            print(f"Reset states - Exchange count: {new_exchange_count}, History length: {len(new_conversation_state)}")
            return (
                gr.update(visible=True),        # conversation_ui
                scenario_display,               # sidebar_scenario
                scenario,                       # current_scenario state
                "",                             # clear chat_display
                new_exchange_count,             # reset exchange_count
                new_conversation_state,         # reset conversation_state
                new_full_transcript,            # reset full_transcript
                audio_path,                     # play initial audio
                "Ready to start - 3 exchanges to go",  # progress
                gr.update(visible=False),       # hide analysis_group
                gr.update(visible=False),       # hide start_btn
                gr.update(visible=True),        # show new_topic_btn
                gr.update(value=None),          # clear audio input
                gr.update(interactive=True),    # enable record button
                "Ready to start"                # status text
            )

        def generate_new_topic_and_start():
            """Generate a new topic and start the conversation"""
            scenario = generate_scenario()
            # Return all the values that start_conversation returns
            result = start_conversation(scenario)
            # Update the progress text (index 8 == progress_text in the outputs list)
            result_list = list(result)
            result_list[8] = "New topic generated! Ready to start - 3 exchanges to go"
            return tuple(result_list)

        def process_user_audio(audio, chat_text, exchanges, history, transcript, scenario):
            """Process user's audio input and generate response.

            Pipeline: transcribe (Groq Whisper) -> LLM reply -> TTS -> update
            chat text, histories and counters. Returns a 9-tuple matching the
            record_btn.click outputs list below.
            """
            global current_llm
            print(f"Processing audio - Exchange count: {exchanges}, History length: {len(history) if history else 0}")
            # Ensure exchange count is an integer
            if exchanges is None:
                exchanges = 0
            # Check if conversation is complete
            if exchanges >= 3:
                return (
                    chat_text, exchanges, history, transcript,
                    "Conversation complete! Check your analysis in the sidebar.",
                    f"Exchange {exchanges} of 3 - Complete!",
                    gr.update(), gr.update(value=None),
                    gr.update()  # no change to model_info
                )
            # Ensure states are properly initialized
            if history is None:
                history = []
            if transcript is None:
                transcript = []
            if chat_text is None:
                chat_text = ""
            # Check for audio
            if audio is None:
                return (
                    chat_text, exchanges, history, transcript,
                    "Please record audio first",
                    f"Exchange {exchanges} of 3",
                    gr.update(), gr.update(value=None),
                    gr.update()  # no change to model_info
                )
            # Transcribe user's speech
            user_text, success = process_speech_to_text(audio)
            if not success:
                return (
                    chat_text, exchanges, history, transcript,
                    user_text,  # Error message
                    f"Exchange {exchanges} of 3",
                    gr.update(), gr.update(value=None),
                    gr.update()  # no change to model_info
                )
            # Update chat display with user's message
            if chat_text:
                chat_text += f"\n\n**You:** {user_text}"
            else:
                # First message - include scenario context
                chat_text = f"{scenario}\n\n---\n\n**You:** {user_text}"
            # Get tutor's response
            tutor_response = generate_tutor_response(history, user_text)
            # Generate audio for tutor's response
            audio_path = text_to_speech(tutor_response)
            if audio_path is None:
                audio_path = gr.update()  # No change to audio output
            # Update chat display with tutor's response
            chat_text += f"\n\n**Mr. Mistral:**\n{tutor_response}"
            # Update conversation history (for context)
            history.append({"role": "user", "content": user_text})
            history.append({"role": "assistant", "content": tutor_response})
            # Update transcript (for analysis)
            transcript.extend([
                {"role": "user", "content": user_text},
                {"role": "assistant", "content": tutor_response}
            ])
            # Increment exchange counter
            exchanges += 1
            # Check if this was the last exchange
            if exchanges >= 3:
                progress_msg = "Exchange 3 of 3 - Complete! Analysis ready."
            else:
                progress_msg = f"Exchange {exchanges} of 3 - Keep going!"
            # Update model info
            model_info_text = f"**π€ Models:** LLM: {current_llm} | STT: Groq Whisper | TTS: gTTS"
            # Return updated state
            return (
                chat_text,
                exchanges,
                history,
                transcript,
                f"Great! {progress_msg}",
                progress_msg,
                audio_path,
                gr.update(value=None),          # Clear audio input properly
                gr.update(value=model_info_text)  # Update model info
            )

        def show_analysis_if_complete(exchanges, transcript):
            """Show analysis in sidebar if conversation is complete.

            Runs as a chained step after every exchange; only acts once the
            third exchange has been recorded.
            """
            if exchanges >= 3:
                analysis = analyze_conversation(transcript)
                return (
                    gr.update(visible=True, value=analysis),  # analysis_box with content
                    gr.update(visible=True),                  # analysis_group
                    gr.update(interactive=False),             # disable record button
                    gr.update(visible=False)                  # hide new topic button
                )
            return (
                gr.update(),                  # no change to analysis_box
                gr.update(),                  # no change to analysis_group
                gr.update(interactive=True),  # keep record button enabled
                gr.update()                   # no change to new topic button
            )

        # Initialize API on load
        def check_initialization():
            """Summarize which backends came up, for the status textbox."""
            status_msgs = []
            if mistral_client:
                status_msgs.append("β Mistral AI ready")
            if gemini_api_key:
                status_msgs.append("β Gemini fallback ready")
            # NOTE(review): module init raises when the Groq key is absent, so
            # groq_client is always truthy by the time this runs.
            if groq_client:
                status_msgs.append("β Groq STT ready")
            status_msgs.append("β gTTS ready")
            if not status_msgs:
                return "β No APIs initialized!"
            return " | ".join(status_msgs)

        app.load(
            fn=check_initialization,
            outputs=status_text
        )
        # Start conversation
        start_btn.click(
            fn=start_conversation,
            outputs=[
                conversation_ui, sidebar_scenario, current_scenario,
                chat_display, exchange_count, conversation_state,
                full_transcript, audio_output, progress_text,
                analysis_group, start_btn, new_topic_btn,
                audio_input, record_btn, status_text
            ]
        )
        # Generate new topic and start conversation
        new_topic_btn.click(
            fn=generate_new_topic_and_start,
            outputs=[
                conversation_ui, sidebar_scenario, current_scenario,
                chat_display, exchange_count, conversation_state,
                full_transcript, audio_output, progress_text,
                analysis_group, start_btn, new_topic_btn,
                audio_input, record_btn, status_text
            ]
        )
        # Process user audio, then (chained) reveal the analysis when done
        record_btn.click(
            fn=process_user_audio,
            inputs=[
                audio_input, chat_display, exchange_count,
                conversation_state, full_transcript, current_scenario
            ],
            outputs=[
                chat_display, exchange_count, conversation_state,
                full_transcript, status_text, progress_text,
                audio_output, audio_input, model_info
            ],
            queue=False  # Disable queueing to avoid state issues
        ).then(
            fn=show_analysis_if_complete,
            inputs=[exchange_count, full_transcript],
            outputs=[analysis_box, analysis_group, record_btn, new_topic_btn],
            queue=False  # Disable queueing to avoid state issues
        )
        # Restart conversation (from the analysis panel)
        restart_btn.click(
            fn=start_conversation,
            outputs=[
                conversation_ui, sidebar_scenario, current_scenario,
                chat_display, exchange_count, conversation_state,
                full_transcript, audio_output, progress_text,
                analysis_group, start_btn, new_topic_btn,
                audio_input, record_btn, status_text
            ]
        )
    return app
# Launch the app
if __name__ == "__main__":
    try:
        app = create_app()
        app.launch()
    except Exception as e:
        # Print the full traceback, not just the message, so startup failures
        # (bad API keys, Gradio incompatibilities) are actually diagnosable.
        import traceback
        traceback.print_exc()
        print(f"Failed to start app: {e}")