# Standard libraries
import base64
import io
import json
import os
import uuid
import re
import tempfile
import time
from os import getenv
from typing import Any
from typing import Dict
from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from io import BytesIO
from copy import deepcopy
import hashlib
# Third-party libraries
import requests
import streamlit as st
#import streamlit.components.v1 as components
#from audiorecorder import audiorecorder
from openai import OpenAI
from pydub import AudioSegment
import warnings
# Ignore DeprecationWarning
warnings.filterwarnings("ignore", category=DeprecationWarning)
from dotenv import load_dotenv
# Load environment variables from the .env file
load_dotenv()
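# NOTE (assumption): the third-party clients imported below (e.g. the OpenAI SDK) are expected to
# read their credentials from the environment loaded here (typically OPENAI_API_KEY in .env).
# This is inferred from the imports, not stated explicitly in this file.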
from var_app import __version__
from var_app import LANGUAGES_EMOJI
from var_app import SUPPORTED_LANGUAGES
from var_app import CHAT_FILES_UPLOAD_ALLOWED_TYPES
from core.core import translations
from core.core import get_translation
from core.converter import convert_iso6391_to_language_name
from core.converter import convert_language_name_to_iso6391
from core.files import read_file
from core.text_to_speech import openai_tts
from core.DetectLanguage import detect_language
#from core.speech_to_text import huggingface_endpoints_stt
from core.speech_to_text import transcribe_audio
from core.audio_files import concatenate_audio_files
from core.audio_files import split_audio
from core.text_to_speech import process_tts_message
from core.files import load_ui_language
from core.core import process_message
from core.core import init_process_mode
from core.moderation import api_moderation_openai_text
from core.audio_isolation import isolate_audio
def hash_file(file):
    """Return the MD5 hex digest of an uploaded file's content (used as an identifier, not for security)."""
    hasher = hashlib.md5()
    buf = file.read()
    hasher.update(buf)
    file.seek(0)  # rewind so the file can be read again later
    return hasher.hexdigest()
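# hash_file() is used below in main_page() to de-duplicate uploads by content: files whose hash is
# already present in st.session_state.changed_uploaded_files are not read or transcribed again.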
def callback_change_edited_text(key, value, modified_text):
    """Write the user's edits back into the session-state copy of the uploaded file."""
    if value["type"] in ["txt"]:
        st.session_state.changed_uploaded_files[key]["bytes_data"] = st.session_state[modified_text].encode()
    elif value["type"] in ["wav", "mp3"]:
        st.session_state.changed_uploaded_files[key]["audio_transcription"] = st.session_state[modified_text]
    #print(st.session_state.changed_uploaded_files[key])
#def concatenate_files():
#    sorted_files = sorted(uploaded_files, key=lambda x: x['order'])
#    concatenated_text = ""
#    for file in sorted_files:
#        if 'edited_text' in file:
#            concatenated_text += file['edited_text'] + "\n"
#    return concatenated_text
def save_attachment(attachment):
    """Save the attachment to disk and return its path."""
    # Create a folder for attachments if it does not exist
    attachments_dir = 'attachments'
    os.makedirs(attachments_dir, exist_ok=True)
    # Generate a unique file name
    file_extension = os.path.splitext(attachment.name)[1]
    filename = f"{uuid.uuid4()}{file_extension}"
    file_path = os.path.join(attachments_dir, filename)
    # Save the file
    with open(file_path, 'wb') as f:
        f.write(attachment.getbuffer())
    return file_path
# At the top of the script, right after the imports
st.set_page_config(
    page_title=f"DEMORRHA - (v{__version__})",
    page_icon="👹",
    layout="wide",
    initial_sidebar_state="collapsed"
)
def stt_settings(state__stt_voice_isolation):
    with st.expander(f"{get_translation('parametres_stt')}",
                     expanded=True,
                     icon="🎤"):
        set__stt_voice_isolation = st.checkbox(
            get_translation("isolation_voix"),
            value=state__stt_voice_isolation
        )
        if st.button("Submit"):
            st.session_state.stt_voice_isolation = set__stt_voice_isolation
            st.rerun()
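# NOTE (assumption about intended UX): stt_settings() and tts_settings() are only called while the
# sidebar button that opened them returns True, so their "Submit" button lives inside an
# `if st.button(...)` block. In Streamlit this pattern is fragile: on the rerun triggered by the
# inner click, the outer button is False again and the panel disappears. A more robust sketch would
# persist an "open" flag in st.session_state (e.g. a hypothetical st.session_state["show_stt_settings"])
# and render the panel from that flag instead of from the button's return value.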
def tts_settings(name__tts_voice,
                 state__tts_with_text,
                 state__tts_with_audio,
                 state__autoplay_tts):
    with st.expander(f"{get_translation('parametres_tts')}",
                     expanded=True,
                     icon="🔊"):
        tts_voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
        set__tts_voice = st.selectbox(
            get_translation("choix_voix_tts"),
            options=tts_voices,
            index=tts_voices.index(name__tts_voice)
        )
        set__tts_with_text = st.checkbox(
            get_translation("activer_tts_texte"),
            value=state__tts_with_text
        )
        set__tts_with_audio = st.checkbox(
            get_translation("activer_tts_audio"),
            value=state__tts_with_audio
        )
        set__autoplay_tts = st.checkbox(
            get_translation("lecture_auto_tts"),
            value=state__autoplay_tts
        )
        if st.button("Submit"):
            st.session_state.autoplay_tts = set__autoplay_tts
            st.session_state.enable_tts_for_input_from_audio_record = set__tts_with_audio
            st.session_state.enable_tts_for_input_from_text_field = set__tts_with_text
            st.session_state.tts_voice = set__tts_voice
            st.rerun()
def recorder_released():
    if "rec_widget" in st.session_state:
        # st.audio_input returns a file-like object when a recording exists, otherwise None
        audio_recorded = bool(st.session_state.rec_widget)
        if audio_recorded:
            audio = AudioSegment.from_wav(io.BytesIO(st.session_state.rec_widget.getvalue()))
            st.write(f"Frame rate: {audio.frame_rate}, Frame width: {audio.frame_width}, Duration: {audio.duration_seconds} seconds")
            if st.session_state.stt_voice_isolation:
                # Isolate the voice from background noise
                audio = isolate_audio(audio)
            if not st.session_state.language_detected:
                # 1. Check whether audio.duration_seconds exceeds 600 seconds (10 minutes)
                if audio.duration_seconds > 600:
                    # PyDub handles time in milliseconds
                    ten_minutes = 10 * 60 * 1000
                    first_ten_minutes_audio = audio[:ten_minutes]
                else:
                    # Less than ten minutes: keep the whole recording (the variable name is kept for consistency)
                    first_ten_minutes_audio = deepcopy(audio)
                with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_extract:
                    first_ten_minutes_audio.export(tmp_extract, format="mp3")
                    tmp_extract.close()
                    # Transcribe without passing the `language` argument to transcribe_audio,
                    # then run detect_language on the transcription to identify the spoken language.
                    # Transcribe the first ten minutes of audio to text
                    st.session_state.language_detected = detect_language(
                        input_text=transcribe_audio(tmp_extract),
                        temperature=0.2,
                        context_window=512,
                        model="gpt-4o-mini"
                    )
                first_ten_minutes_audio = AudioSegment.empty()
                st.markdown(
                    f"- {get_translation('langue_detectee')} {convert_iso6391_to_language_name(st.session_state.language_detected)}"
                )
            # ##############################################################
            try:
                with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
                    audio.export(tmp_file, format="mp3")
                    tmp_file.close()
                    # Transcribe the audio to text
                    st.session_state.transcription = transcribe_audio(
                        tmp_file,
                        language=st.session_state.language_detected
                    )
                audio = AudioSegment.empty()
                st.markdown(
                    f"🎤 {get_translation('transcription_audio')} {st.session_state.transcription}"
                )
                st.session_state.audio_list = []
                for cursor_selected_lang in st.session_state.selected_languages:
                    st.session_state.target_language = cursor_selected_lang["iso-639-1"]
                    st.session_state.full_response = ""
                    # Initialize the processing mode for the current target language
                    st.session_state.system_prompt, st.session_state.operation_prompt = init_process_mode(
                        from_lang=(
                            st.session_state.language_detected
                            if st.session_state.language_detected
                            else convert_language_name_to_iso6391(st.session_state.interface_language)
                        ),
                        to_lang=st.session_state.target_language
                    )
                    with st.chat_message("assistant", avatar="👻"):
                        message_placeholder = st.empty()
                        st.session_state.response_generator = process_message(
                            st.session_state.transcription,
                            st.session_state.operation_prompt,
                            st.session_state.system_prompt
                        )
                        for response_chunk in st.session_state.response_generator:
                            message_placeholder.markdown(response_chunk)
                        st.session_state.end_response = st.session_state.response_generator.close()
                        if st.session_state.full_response != "":
                            message_placeholder.markdown(st.session_state.full_response)
                            if st.session_state.enable_tts_for_input_from_audio_record:
                                st.session_state.tts_audio, st.session_state.tts_duration = process_tts_message(st.session_state.full_response)
                                if st.session_state.tts_audio:
                                    st.session_state.audio_list.append(
                                        (st.session_state.tts_audio,
                                         st.session_state.tts_duration)
                                    )
                if st.session_state.audio_list:
                    st.session_state.final_audio = concatenate_audio_files(st.session_state.audio_list)
                    with st.container(border=True):
                        # Generate a unique file name
                        st.session_state.timestamp = time.strftime("%Y%m%d-%H%M%S")
                        st.session_state.langues = "_".join([lang["iso-639-1"] for lang in st.session_state.selected_languages])
                        st.session_state.nom_fichier = f"reponse_audio_{st.session_state.langues}_{st.session_state.timestamp}.mp3"
                        st.audio(st.session_state.final_audio,
                                 format="audio/mp3",
                                 autoplay=st.session_state.autoplay_tts)
                        st.download_button(
                            label=f"📥 {get_translation('telecharger_audio')}",
                            data=st.session_state.final_audio,
                            file_name=st.session_state.nom_fichier,
                            mime="audio/mp3",
                            use_container_width=True,
                            type="primary",
                            key=f"download_button_{st.session_state.langues}_{st.session_state.timestamp}",
                        )
            except Exception as e:
                st.error(f"[AUDIO] - {get_translation('erreur_importation_audio')}: {str(e)}")
def main_page():
    """Main page of the application."""
    if "ui_chat_input_disabled" not in st.session_state:
        st.session_state.ui_chat_input_disabled = False
    if "ui_audio_input_disabled" not in st.session_state:
        st.session_state.ui_audio_input_disabled = False
    if "ui_filesuploader_disabled" not in st.session_state:
        st.session_state.ui_filesuploader_disabled = False
    # Dictionary storing the modified uploaded files
    if 'changed_uploaded_files' not in st.session_state:
        st.session_state.changed_uploaded_files = {}
    # Dictionary storing the edited content of files
    if 'edited_texts' not in st.session_state:
        st.session_state.edited_texts = {}
    # List storing the audio files
    if 'audio_files' not in st.session_state:
        st.session_state.audio_files = []
    # Initialize the session-state variables
    if "ui_loaded" not in st.session_state:
        st.session_state["ui_loaded"] = False
    if "language_detected" not in st.session_state:
        st.session_state["language_detected"] = None
    if "process_mode" not in st.session_state:
        st.session_state["process_mode"] = "translation"
    if "target_language" not in st.session_state:
        st.session_state.target_language = "en"
    if "selected_languages" not in st.session_state:
        st.session_state.selected_languages = [
            {"language": "English", "iso-639-1": "en"}
        ]
    if "interface_language_select" not in st.session_state:
        st.session_state.interface_language_select = "English"  # Default language
    if "stt_voice_isolation" not in st.session_state:
        st.session_state["stt_voice_isolation"] = False
    if "enable_tts_for_input_from_audio_record" not in st.session_state:
        st.session_state["enable_tts_for_input_from_audio_record"] = False
    if "autoplay_tts" not in st.session_state:
        st.session_state["autoplay_tts"] = False
    if "enable_tts_for_input_from_text_field" not in st.session_state:
        st.session_state["enable_tts_for_input_from_text_field"] = False
    if "tts_voice" not in st.session_state:
        st.session_state["tts_voice"] = "onyx"
    # Initialize the message history (holds the system prompt)
    if "messages" not in st.session_state:
        st.session_state.messages = []
    def on_languages_change() -> None:
        """Callback for changes to the destination language(s)."""
        selected_language_names: List[str] = st.session_state.language_selector
        st.session_state.selected_languages = [
            {"language": lang, "iso-639-1": convert_language_name_to_iso6391(lang)}
            for lang in selected_language_names
        ]
    # Sidebar configuration
    with st.sidebar:
        st.logo("img/logo_2.png", icon_image="img/logo_2.png")
        st.header(get_translation("sidebar_titre"))
        st.write("#### Settings")
        if st.button("Speech-To-Text"):
            stt_settings(state__stt_voice_isolation=st.session_state.stt_voice_isolation)
        if st.button("Text-To-Speech"):
            tts_settings(
                name__tts_voice=st.session_state.tts_voice,
                state__tts_with_text=st.session_state.enable_tts_for_input_from_text_field,
                state__tts_with_audio=st.session_state.enable_tts_for_input_from_audio_record,
                state__autoplay_tts=st.session_state.autoplay_tts
            )
        with st.expander(f"{get_translation('a_propos')}",
                         expanded=False,
                         icon="ℹ️"):
            st.subheader(f"version: {__version__}")
            st.info(get_translation("info_app"))
        with st.expander(f"{get_translation('selection_langue')}",
                         expanded=True,
                         icon="🌐"):
            # Multi-select for the destination languages
            st.multiselect(
                label=get_translation("langues_destination"),
                placeholder=get_translation("placeholder_langues"),
                options=SUPPORTED_LANGUAGES,
                default=["English"],
                key="language_selector",
                max_selections=4,
                on_change=on_languages_change,
                format_func=lambda lang: f"{LANGUAGES_EMOJI.get(lang, '')} {lang}"
            )
    if st.session_state.get('show_report_form', False):
        # show_report_form()
        pass
    else:
        with st.container(border=True):
            # Text chat input
            st.session_state.user_input = st.chat_input(
                get_translation("entrez_message"),
                disabled=st.session_state.ui_chat_input_disabled
            )
            # File uploader
            st.session_state.uploaded_files = st.file_uploader(
                "Choose files to upload",
                accept_multiple_files=True,
                type=CHAT_FILES_UPLOAD_ALLOWED_TYPES,
                key="chat_files_upload",
                disabled=st.session_state.ui_filesuploader_disabled
            )
            if st.session_state.uploaded_files is not None:
                new_uploaded_files_hashes = []
                for file in st.session_state.uploaded_files:
                    uploaded_file_hash = hash_file(file)
                    new_uploaded_files_hashes.append(uploaded_file_hash)
                    if uploaded_file_hash not in st.session_state.changed_uploaded_files.keys():
                        uploaded_file_name = file.name
                        uploaded_file_type = os.path.splitext(file.name)[1].lstrip('.')
                        st.session_state.changed_uploaded_files[uploaded_file_hash] = {
                            "name": uploaded_file_name,
                            "type": uploaded_file_type,
                            "bytes_data": file.read()
                        }
                        if uploaded_file_type in ["wav", "mp3"]:
                            # Use from_file so both wav and mp3 uploads are decoded correctly
                            audio = AudioSegment.from_file(io.BytesIO(st.session_state.changed_uploaded_files[uploaded_file_hash]["bytes_data"]))
                            with tempfile.NamedTemporaryFile(suffix=f".{uploaded_file_type}", delete=False) as tmp_file:
                                audio.export(tmp_file, format=uploaded_file_type)
                                tmp_file.close()
                                st.session_state.changed_uploaded_files[uploaded_file_hash]["audio_transcription"] = transcribe_audio(
                                    filepath=tmp_file.name
                                )
                            audio = AudioSegment.empty()
                # Drop entries for files that have been removed from the uploader
                changed_uploaded_files = {}
                for file_hash in st.session_state.changed_uploaded_files.keys():
                    if file_hash in new_uploaded_files_hashes:
                        changed_uploaded_files[file_hash] = st.session_state.changed_uploaded_files[file_hash]
                st.session_state.changed_uploaded_files = changed_uploaded_files
                for key, value in st.session_state.changed_uploaded_files.items():
                    if key in new_uploaded_files_hashes:
                        with st.container(border=True):
                            if value["type"] in ["txt"]:
                                st.write(f"**filename:** {value['name']}")
                                st.write(f"**filetype:** {value['type']}")
                                file_content = value["bytes_data"].decode()
                                with st.expander("View file content"):
                                    st.text_area(f"Edit text - {value['name']}",
                                                 value=file_content,
                                                 key=value["name"],
                                                 on_change=callback_change_edited_text,
                                                 args=(key, value, value["name"])
                                                 )
                            elif value["type"] in ["wav", "mp3"]:
                                st.write(f"**Filename:** {value['name']}")
                                audio = AudioSegment.from_file(io.BytesIO(value["bytes_data"]))
                                st.write(f"Frame rate: {audio.frame_rate}, Frame width: {audio.frame_width}, Duration: {audio.duration_seconds} seconds")
                                st.audio(value["bytes_data"],
                                         format=f"audio/{value['type']}",
                                         autoplay=False
                                         )
                                if "audio_transcription" in value.keys():
                                    audio_file_transcription = value["audio_transcription"]
                                    with st.expander("View audio transcription"):
                                        st.text_area(f"Edit text - {value['name']}",
                                                     value=audio_file_transcription,
                                                     key=value["name"],
                                                     on_change=callback_change_edited_text,
                                                     args=(key, value, value["name"])
                                                     )
            # Audio recording input
            st.audio_input(
                "Record a voice message",
                on_change=recorder_released,
                key="rec_widget",
                disabled=st.session_state.ui_audio_input_disabled
            )
        if st.session_state.user_input:
            # Run the moderation check
            moderation_result = api_moderation_openai_text(st.session_state.user_input)
            if moderation_result.get("flagged"):
                st.error("Your message was flagged as inappropriate and cannot be processed.")
                return  # Stop processing if the message is inappropriate
            elif "error" in moderation_result:
                st.error(moderation_result["error"])
                return  # Handle moderation errors
            # Reset the previous state
            st.session_state.full_response = ""
            with st.chat_message("user", avatar="👤"):
                st.markdown(st.session_state.user_input)
            # Process the user's text message
            if st.session_state.language_detected is None:
                st.session_state.language_detected = detect_language(
                    input_text=st.session_state.user_input,
                    temperature=0.01,
                    context_window=512,
                    model="gpt-4o"
                )
            st.session_state.audio_list = []
            for cursor_selected_lang in st.session_state.selected_languages:
                st.session_state.target_language = cursor_selected_lang["iso-639-1"]
                target_language_name = cursor_selected_lang["language"]
                # Reset the messages before processing a new input
                st.session_state.messages = []
                st.session_state.full_response = ""
                # Initialize the processing mode for the current target language
                st.session_state.system_prompt, st.session_state.operation_prompt = init_process_mode(
                    from_lang=(
                        st.session_state.language_detected
                        if st.session_state.language_detected
                        else convert_language_name_to_iso6391(st.session_state.interface_language)
                    ),
                    to_lang=st.session_state.target_language
                )
                # Show an error and raise if the system or operation prompt is empty
                if (not st.session_state.system_prompt) or (not st.session_state.operation_prompt):
                    st.error("Error: the system prompt or the operation prompt is empty.")
                    raise ValueError("The system and operation prompts cannot be empty.")
                with st.status(f'({target_language_name}) - {get_translation("traduction_en_cours")}', expanded=True) as response_status:
                    with st.chat_message("assistant", avatar="👻"):
                        message_placeholder = st.empty()
                        st.session_state.response_generator = process_message(
                            st.session_state.user_input,
                            st.session_state.operation_prompt,
                            st.session_state.system_prompt
                        )
                        response_status.update(label=f'({target_language_name}) - {get_translation("traduction_en_cours")}', state="running", expanded=True)
                        for response_chunk in st.session_state.response_generator:
                            message_placeholder.markdown(response_chunk)
                        st.session_state.end_response = st.session_state.response_generator.close()  # Retrieve the complete response at the end
                        if st.session_state.full_response != "":
                            message_placeholder.markdown(st.session_state.full_response)
                            if st.session_state.enable_tts_for_input_from_text_field:
                                response_status.update(label=f'({target_language_name}) - {get_translation("traduction_terminee")} ; {get_translation("synthese_vocale_en_cours")}', state="running", expanded=False)
                                st.session_state.tts_audio, st.session_state.tts_duration = process_tts_message(st.session_state.full_response)
                                del st.session_state.full_response
                                if st.session_state.tts_audio:
                                    st.audio(base64.b64decode(st.session_state.tts_audio.encode()), format="audio/mp3", autoplay=False)
                                    st.session_state.audio_list.append((base64.b64decode(st.session_state.tts_audio.encode()), st.session_state.tts_duration))
                                    response_status.update(label=f'({target_language_name}) - {get_translation("traduction_terminee")} ; {get_translation("synthese_vocale_terminee")}', state="complete", expanded=False)
                                else:
                                    response_status.update(label=f'({target_language_name}) - {get_translation("erreur_synthese_vocale")}', state="error", expanded=False)
                            else:
                                response_status.update(label=f'({target_language_name}) - {get_translation("traduction_terminee")}', state="complete", expanded=False)
                        else:
                            response_status.update(label=f'({target_language_name}) - {get_translation("erreur_traduction")}', state="error", expanded=False)
            if st.session_state.audio_list:
                with st.status(f"{get_translation('concatenation_audio_en_cours')}", expanded=False) as audio_status:
                    audio_status.update(label=f"{get_translation('concatenation_audio_en_cours')}", state="running", expanded=False)
                    try:
                        st.session_state.final_audio = concatenate_audio_files(st.session_state.audio_list)
                        with st.container(border=True):
                            # Generate a unique file name
                            st.session_state.timestamp = time.strftime("%Y%m%d-%H%M%S")
                            st.session_state.langues = "_".join([lang["iso-639-1"] for lang in st.session_state.selected_languages])
                            st.session_state.nom_fichier = f"reponse_audio_{st.session_state.langues}_{st.session_state.timestamp}.mp3"
                            st.audio(st.session_state.final_audio, format="audio/mp3", autoplay=st.session_state.autoplay_tts)
                            st.download_button(
                                label=f"📥 {get_translation('telecharger_audio')}",
                                data=st.session_state.final_audio,
                                file_name=st.session_state.nom_fichier,
                                mime="audio/mp3",
                                use_container_width=True,
                                type="primary",
                                key=f"download_button_{st.session_state.langues}_{st.session_state.timestamp}",
                            )
                        audio_status.update(label=f"{get_translation('concatenation_audio_terminee')}", state="complete", expanded=True)
                    except Exception as e:
                        st.error(f"{get_translation('erreur_concatenation_audio')} : {str(e)}")
                        audio_status.update(label=f"{get_translation('erreur_concatenation_audio')} : {str(e)}", state="error", expanded=True)
main_page()