import streamlit as st
import asyncio
import edge_tts
import time
import os
import uuid
import firebase_admin
from firebase_admin import credentials, firestore
from openai import OpenAI
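
# Assumed dependencies, based on the imports above:
#   pip install streamlit edge-tts firebase-admin openai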

# ---- Firebase setup ----
if not firebase_admin._apps:
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()
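
# Assumes a Firebase service-account key is deployed next to the app as
# firebase-service-account.json; the _apps guard stops the SDK from being
# re-initialised on every Streamlit rerun.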

# ---- OpenAI setup ----
openai_key = os.getenv("openai_key")
assistant_id = os.getenv("assistant_id")
client = OpenAI(api_key=openai_key)
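
# Assumes both values are supplied as environment variables (e.g. host
# secrets named "openai_key" and "assistant_id"); the first API call fails
# if either is missing.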

# ---- Edge TTS voices ----
VOICE_OPTIONS = {
    "Jenny (US, Female)": "en-US-JennyNeural",
    "Aria (US, Female)": "en-US-AriaNeural",
    "Ryan (UK, Male)": "en-GB-RyanNeural",
    "Natasha (AU, Female)": "en-AU-NatashaNeural",
    "William (AU, Male)": "en-AU-WilliamNeural",
    "Libby (UK, Female)": "en-GB-LibbyNeural",
    "Leah (SA, Female)": "en-ZA-LeahNeural",
    "Luke (SA, Male)": "en-ZA-LukeNeural",
}
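
# Standard Edge TTS short voice names; the full catalogue can be listed with
# the bundled CLI, e.g. `edge-tts --list-voices`.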

# --- Streamlit Config ---
st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide")

# --- User/session state ---
if "user_id" not in st.session_state:
    st.session_state["user_id"] = str(uuid.uuid4())
user_id = st.session_state["user_id"]

if "mute_voice" not in st.session_state:
    st.session_state["mute_voice"] = False
if "last_tts_text" not in st.session_state:
    st.session_state["last_tts_text"] = ""
if "last_audio_path" not in st.session_state:
    st.session_state["last_audio_path"] = ""
if "selected_voice" not in st.session_state:
    st.session_state["selected_voice"] = "Jenny (US, Female)"

# --- Sidebar for Voice Selection ---
with st.sidebar:
    st.markdown("### Voice Settings")
    selected_voice = st.selectbox(
        "Select assistant voice",
        list(VOICE_OPTIONS.keys()),
        index=list(VOICE_OPTIONS.keys()).index(st.session_state["selected_voice"]),
    )
    st.session_state["selected_voice"] = selected_voice

# --- Branding & Styling ---
st.markdown("""
    <style>
    .block-container {padding-top: 1rem; padding-bottom: 0rem;}
    header {visibility: hidden;}
    .stChatMessage { max-width: 85%; border-radius: 12px; padding: 8px; margin-bottom: 10px; }
    .stChatMessage[data-testid="stChatMessage-user"] { background: #f0f0f0; color: #000000; }
    .stChatMessage[data-testid="stChatMessage-assistant"] { background: #e3f2fd; color: #000000; }
    .lor-logo { vertical-align: middle; }  /* renamed from .lt-logo to match the class used below */
    </style>
""", unsafe_allow_html=True)

st.markdown("""
    <div style='text-align: center; margin-top: 20px; margin-bottom: -10px;'>
        <span style='display: inline-flex; align-items: center; gap: 8px;'>
            <img src='https://lortechnologies.com/wp-content/uploads/2023/03/LOR-Online-Logo.svg' width='100' class='lor-logo'/>
            <span style='font-size: 12px; color: gray;'>Powered by LOR Technologies</span>
        </span>
    </div>
""", unsafe_allow_html=True)

# --- Firestore helpers ---
def get_or_create_thread_id():
    doc_ref = db.collection("users").document(user_id)
    doc = doc_ref.get()
    if doc.exists:
        return doc.to_dict()["thread_id"]
    else:
        thread = client.beta.threads.create()
        doc_ref.set({"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP})
        return thread.id
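
# NOTE: user_id is a fresh uuid4 per browser session (set above), so each
# session gets its own Firestore document and its own OpenAI thread.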

def save_message(role, content):
    db.collection("users").document(user_id).collection("messages").add({
        "role": role,
        "content": content,
        "timestamp": firestore.SERVER_TIMESTAMP,
    })

def display_chat_history():
    messages = db.collection("users").document(user_id).collection("messages").order_by("timestamp").stream()
    assistant_icon_html = "<img src='/static/lorain.jpg' width='20' style='vertical-align:middle;'/>"
    for msg in list(messages)[::-1]:
        data = msg.to_dict()
        if data["role"] == "user":
            st.markdown(f"<div class='stChatMessage' data-testid='stChatMessage-user'>👤 <strong>You:</strong> {data['content']}</div>", unsafe_allow_html=True)
        else:
            st.markdown(f"<div class='stChatMessage' data-testid='stChatMessage-assistant'>{assistant_icon_html} <strong>LORAIN:</strong> {data['content']}</div>", unsafe_allow_html=True)

# --- Edge TTS synth ---
async def edge_tts_synthesize(text, voice, user_id):
    out_path = f"output_{user_id}.mp3"
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(out_path)
    return out_path
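
# Standalone usage sketch (hypothetical values, outside Streamlit):
#   asyncio.run(edge_tts_synthesize("Hello!", "en-US-JennyNeural", "demo"))
# would write output_demo.mp3 next to the script.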

def synthesize_voice(text, voice_key, user_id):
    voice = VOICE_OPTIONS[voice_key]
    out_path = f"output_{user_id}.mp3"
    # Only synthesize if the text changed, the file is missing, or the voice changed
    if (
        st.session_state["last_tts_text"] != text
        or not os.path.exists(out_path)
        or st.session_state.get("last_voice") != voice
    ):
        with st.spinner(f"Generating voice ({voice_key})..."):
            asyncio.run(edge_tts_synthesize(text, voice, user_id))
        st.session_state["last_tts_text"] = text
        st.session_state["last_audio_path"] = out_path
        st.session_state["last_voice"] = voice
    return out_path
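
# NOTE: asyncio.run() gives this synchronous Streamlit script a one-off event
# loop for the async edge_tts call, and the session-state checks above act as
# a small cache so unchanged text is not re-synthesised.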

# --- Main Chat UI ---
input_col, clear_col = st.columns([9, 1])
with input_col:
    user_input = st.chat_input("Type your message here...")
with clear_col:
    if st.button("🗑️", key="clear-chat", help="Clear Chat"):
        try:
            user_doc_ref = db.collection("users").document(user_id)
            for msg in user_doc_ref.collection("messages").stream():
                msg.reference.delete()
            user_doc_ref.delete()
            st.session_state.clear()
            st.rerun()
        except Exception as e:
            st.error(f"Failed to clear chat: {e}")

thread_id = get_or_create_thread_id()
display_chat_history()

if user_input:
    # --- OpenAI Assistant Response ---
    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
    save_message("user", user_input)

    with st.spinner("Thinking and typing... 💬"):
        run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)
        # Poll until the run reaches a terminal state; checking the failure
        # states avoids spinning forever if the run never completes.
        while True:
            run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if run_status.status == "completed":
                break
            if run_status.status in ("failed", "cancelled", "expired"):
                st.error(f"Assistant run ended with status: {run_status.status}")
                st.stop()
            time.sleep(1)

    messages_response = client.beta.threads.messages.list(thread_id=thread_id)
    latest_response = sorted(messages_response.data, key=lambda x: x.created_at)[-1]
    assistant_message = latest_response.content[0].text.value
    save_message("assistant", assistant_message)

    # --- TTS: Speak unless muted ---
    mute_voice = st.session_state.get("mute_voice", False)
    audio_path = None
    if not mute_voice and assistant_message.strip():
        audio_path = synthesize_voice(assistant_message, st.session_state["selected_voice"], user_id)
        st.session_state["last_audio_path"] = audio_path
        st.audio(audio_path, format="audio/mp3", autoplay=True)
    elif mute_voice:
        st.info("🔇 Voice is muted. Click Unmute below to enable assistant speech.")

    # --- Controls (Mute/Unmute/Replay) ---
    col1, col2 = st.columns([1, 1])
    with col1:
        if not mute_voice and st.button("🔇 Mute Voice"):
            st.session_state["mute_voice"] = True
            st.rerun()
        elif mute_voice and st.button("🔊 Unmute Voice"):
            st.session_state["mute_voice"] = False
            st.rerun()
    with col2:
        # Replay button: always available if last_audio_path exists
        if st.session_state.get("last_audio_path") and os.path.exists(st.session_state["last_audio_path"]):
            if st.button("🔁 Replay Voice"):
                st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=True)

    # Brief pause, then rerun so the new exchange appears in the chat history
    time.sleep(0.2)
    st.rerun()

else:
    # Idle state: always show the last audio with replay if available
    if st.session_state.get("last_audio_path") and os.path.exists(st.session_state["last_audio_path"]) and not st.session_state["mute_voice"]:
        st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=False)
        # Controls: only show Replay when idle
        if st.button("🔁 Replay Last Voice"):
            st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=True)
    # Show mute/unmute in the idle state too
    if not st.session_state["mute_voice"]:
        if st.button("🔇 Mute Voice"):
            st.session_state["mute_voice"] = True
            st.rerun()
    else:
        if st.button("🔊 Unmute Voice"):
            st.session_state["mute_voice"] = False
            st.rerun()
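
# To run locally (assuming this file is saved as app.py):
#   streamlit run app.py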