# Standard libraries import base64 import io import json import os import re import tempfile import time from os import getenv from typing import Any from typing import Dict from typing import IO from typing import List from typing import Optional from typing import Tuple from typing import Union # Third-party libraries import requests import streamlit as st from audiorecorder import audiorecorder from openai import OpenAI from pydub import AudioSegment def load_ui_language(file_path: Optional[str] = "ui_lang_support.json") -> Dict[str, Any]: """ Charge les traductions de l'interface utilisateur à partir d'un fichier JSON. Args: file_path (Optional[str]): Chemin vers le fichier JSON contenant les traductions. Returns: Dict[str, Any]: Un dictionnaire contenant les traductions de l'interface utilisateur. """ try: with open(file_path, 'r', encoding='utf-8') as file: return json.load(file) except FileNotFoundError: print(f"{get_translation('erreur_fichier_non_trouve')} {file_path}") return {} except json.JSONDecodeError: print(f"{get_translation('erreur_lecture_fichier')} JSON decoding error") return {} except IOError as e: print(f"{get_translation('erreur_lecture_fichier')} {e}") return {} # Dictionary to store translations translations = load_ui_language() def get_translation(key: str) -> str: """ Obtient la traduction pour une clé donnée basée sur la langue d'interface sélectionnée. Args: key (str): La clé de traduction. Returns: str: Le texte traduit. """ return translations[st.session_state.interface_language][key] # OpenAI client configuration with API key client = OpenAI(api_key=getenv("OPENAI_API_KEY")) def read_file(file_name: str) -> str: """ Lit et retourne le contenu des fichiers texte. Args: file_name (str): Le nom du fichier à lire. Returns: str: Le contenu du fichier ou un message d'erreur. """ try: with open(file_name, 'r', encoding='utf-8') as file: content = file.read() return content except FileNotFoundError: return f"{get_translation('erreur_fichier_non_trouve')} {file_name}" except IOError as e: return f"{get_translation('erreur_lecture_fichier')} {str(e)}" def split_audio(audio_file: str, max_size_mb: int = 25) -> List[str]: """ Divise un fichier audio en segments de 25 Mo ou moins. Args: audio_file (str): Chemin vers le fichier audio. max_size_mb (int): Taille maximale de chaque segment en Mo. Returns: List[str]: Liste des chemins vers les segments audio divisés. """ try: audio = AudioSegment.from_wav(audio_file) duration_ms = len(audio) segment_duration_ms = int( (max_size_mb * 1024 * 1024 * 8) / (audio.frame_rate * audio.sample_width * audio.channels) ) segments = [] for start in range(0, duration_ms, segment_duration_ms): end = min(start + segment_duration_ms, duration_ms) segment = audio[start:end] with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_segment: segment.export(temp_segment.name, format="wav") segments.append(temp_segment.name) return segments except IOError as e: print(f"Erreur lors de la lecture ou de l'écriture du fichier audio : {e}") return [] except ValueError as e: print(f"Erreur de valeur lors du traitement de l'audio : {e}") return [] # Fonction modifiée pour transcrire l'audio en texte def transcribe_audio(audio_file: IO, language: Optional[str] = None) -> str: """ Transcrit un fichier audio en texte. Args: audio_file (IO): Le fichier audio à transcrire. language (Optional[str]): La langue de l'audio. Par défaut None. Returns: str: Le texte transcrit. """ max_size_mb = 25 file_size_mb = os.path.getsize(audio_file.name) / (1024 * 1024) try: if file_size_mb > max_size_mb: segments = split_audio(audio_file.name, max_size_mb) full_transcript = "" for segment in segments: with open(segment, "rb") as audio_segment: transcript = client.audio.transcriptions.create( model="whisper-1", file=audio_segment, language=language ) full_transcript += f"{transcript.text} " os.unlink(segment) # Supprime le fichier temporaire return full_transcript.strip() else: with open(audio_file.name, "rb") as audio_file: transcript = client.audio.transcriptions.create( model="whisper-1", file=audio_file, language=language ) return transcript.text except IOError as e: print(f"Erreur d'entrée/sortie lors de la transcription : {e}") return "" except client.APIError as e: print(f"Erreur API lors de la transcription : {e}") return "" # Fonction pour détecter la langue d'un texte donné def detect_language(input_text: str, temperature: float = 0.01) -> str: """ Détecte la langue d'un texte donné. Args: input_text (str): Le texte dont il faut détecter la langue. temperature (float): La température pour le modèle de langage. Par défaut à 0.01. Returns: str: La langue détectée au format ISO-639-1. Raises: ValueError: Si la réponse de l'API est invalide. requests.RequestException: En cas d'erreur de communication avec l'API. """ system_prompt = ( "Agissez comme une fonction de détection de langue. " "Je fournirai du texte dans n'importe quelle langue, et vous détecterez sa langue. " "Fournissez le résultat de votre détection au format ISO-639-1. " "Votre réponse doit représenter l'argument `language` et ne contenir " "que sa valeur sous forme de chaîne. " "Fournir la langue d'entrée au format ISO-639-1 améliorera la précision et la latence." ) try: response = client.chat.completions.create( model="gpt-4o-mini", temperature=temperature, messages=[ { "role": "system", "content": system_prompt }, { "role": "user", "content": input_text } ] ) detected_language = response.choices[0].message.content if not detected_language: raise ValueError("La réponse de l'API est vide") return detected_language except requests.RequestException as e: raise requests.RequestException(f"Erreur de communication avec l'API : {str(e)}") except Exception as e: raise ValueError(f"Erreur inattendue lors de la détection de la langue : {str(e)}") def get_duration_pydub(audio_file: str) -> float: """ Obtient la durée d'un fichier audio en utilisant pydub. Args: audio_file (str): Chemin vers le fichier audio. Returns: float: Durée du fichier audio en secondes. """ try: audio = AudioSegment.from_file(audio_file) return audio.duration_seconds except FileNotFoundError: print(f"Erreur : Le fichier audio '{audio_file}' n'a pas été trouvé.") return 0.0 except Exception as e: print(f"Erreur lors de la lecture du fichier audio : {str(e)}") return 0.0 def text_to_speech(text: str) -> Tuple[Optional[bytes], float]: """ Convertit du texte en parole en utilisant l'API OpenAI. Args: text (str): Le texte à convertir en parole. Returns: Tuple[Optional[bytes], float]: Un tuple contenant les octets audio et la durée de l'audio en secondes. """ try: response = client.audio.speech.create( model="tts-1", voice=st.session_state.tts_voice, input=text ) # Sauvegarde l'audio dans un fichier temporaire with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio: response.stream_to_file(temp_audio.name) # Lit le contenu du fichier audio with open(temp_audio.name, "rb") as audio_file: audio_bytes = audio_file.read() # Obtient la durée de l'audio en secondes audio_duration = get_duration_pydub(temp_audio.name) return audio_bytes, audio_duration except Exception as e: print(f"Erreur lors de la conversion texte-parole : {str(e)}") return None, 0.0 def concatenate_audio_files(audio_list: List[Tuple[bytes, float]]) -> Optional[bytes]: """ Concatène plusieurs fichiers audio avec des effets sonores. Args: audio_list (List[Tuple[bytes, float]]): Une liste de tuples, chacun contenant des octets audio et la durée. Returns: Optional[bytes]: L'audio concaténé sous forme d'octets, ou None en cas d'erreur. """ # Créer un segment audio vide final_audio = AudioSegment.empty() try: # Charger les effets sonores begin_sound = AudioSegment.from_mp3( "sound-effects/voice-message-play-begin/voice-message-play-begin-1.mp3" ) end_sound = AudioSegment.from_mp3( "sound-effects/voice-message-play-ending/voice-message-play-ending-1.mp3" ) # 5 secondes de silence silence = AudioSegment.silent(duration=1500) # 1500 ms = 1.5 secondes for audio_bytes, _ in audio_list: # Convertir les octets en un segment audio segment = AudioSegment.from_mp3(io.BytesIO(audio_bytes)) # Ajouter le son de début, le segment TTS, le son de fin et le silence final_audio += begin_sound + segment + end_sound + silence # Convertir le segment audio final en octets buffer = io.BytesIO() final_audio.export(buffer, format="mp3") return buffer.getvalue() except IOError as e: print(f"Erreur lors de la lecture ou de l'écriture des fichiers audio : {e}") return None except Exception as e: print(f"Une erreur inattendue s'est produite : {e}") return None def process_message( message: str, operation_prompt: str = "", tts_enabled: bool = False ) -> Tuple[Optional[bytes], Optional[float]]: """ Traite les messages des utilisateurs et génère une réponse. Args: message (str): Le message d'entrée de l'utilisateur. operation_prompt (str, optional): Prompt supplémentaire pour l'opération. Par défaut "". tts_enabled (bool, optional): Si la synthèse vocale est activée. Par défaut False. Returns: Tuple[Optional[bytes], Optional[float]]: Un tuple contenant l'audio TTS et sa durée, ou (None, None) si TTS est désactivé ou en cas d'erreur. """ payload_content = f'{operation_prompt} :\n"""\n{message}\n"""' st.session_state.messages.append({"role": "user", "content": payload_content}) with st.chat_message("user"): st.markdown(message) with st.chat_message("assistant"): message_placeholder = st.empty() full_response = "" try: for response in client.chat.completions.create( model="gpt-4o-mini", messages=st.session_state.messages, stream=True, temperature=0.1): full_response += (response.choices[0].delta.content or "") message_placeholder.markdown(full_response + "▌") # Utiliser regex pour supprimer les trois premiers et derniers guillemets doubles full_response = re.sub(r'^"{3}|"{3}$', '', full_response.strip()) message_placeholder.markdown(full_response) except Exception as e: st.error(f"Une erreur s'est produite lors de la génération de la réponse : {e}") return None, None st.session_state.messages.append( {"role": "assistant", "content": full_response} ) if tts_enabled: try: tts_audio, tts_duration = text_to_speech(full_response) return tts_audio, tts_duration except Exception as e: st.error(f"Une erreur s'est produite lors de la conversion texte-parole : {e}") return None, None return None, None class GlobalSystemPrompts: """Class to store global system prompts.""" @staticmethod def linguascribe(): """ Retrieve the system prompt for the Linguascribe feature. Returns: str: The system prompt for Linguascribe. """ try: system_prompt = read_file('linguascribe.prompt') return system_prompt except FileNotFoundError: print("Le fichier 'linguascribe.prompt' n'a pas été trouvé.") return "" except IOError as e: print(f"Erreur lors de la lecture du fichier 'linguascribe.prompt': {e}") return "" # Function to configure the translation mode def set_translation_mode(from_lang: str, dest_lang: str) -> Tuple[str, str]: """ Configure les prompts globaux pour le mode de traduction. Args: from_lang (str): La langue source. dest_lang (str): La langue de destination. Returns: Tuple[str, str]: Un tuple contenant le prompt système et le prompt d'opération. """ system_prompt = GlobalSystemPrompts.linguascribe() operation_prompt = f"Translate({from_lang} to {dest_lang})" return system_prompt, operation_prompt # List of languages supported by the application SUPPORTED_LANGUAGES = [ "Afrikaans", "Arabic", "Armenian", "Azerbaijani", "Belarusian", "Bosnian", "Bulgarian", "Catalan", "Chinese", "Croatian", "Czech", "Danish", "Dutch", "English", "Estonian", "Finnish", "French", "Galician", "German", "Greek", "Hebrew", "Hindi", "Hungarian", "Icelandic", "Indonesian", "Italian", "Japanese", "Kannada", "Kazakh", "Korean", "Latvian", "Lithuanian", "Macedonian", "Malay", "Marathi", "Maori", "Nepali", "Norwegian", "Persian", "Polish", "Portuguese", "Romanian", "Russian", "Serbian", "Slovak", "Slovenian", "Spanish", "Swahili", "Swedish", "Tagalog", "Tamil", "Thai", "Turkish", "Ukrainian", "Urdu", "Vietnamese", "Welsh" ] def convert_language_name_to_iso6391(language_data: Union[str, Dict[str, str]]) -> str: """ Convertit un nom de langue en son code ISO 639-1. Args: language_data (Union[str, Dict[str, str]]): Le nom de la langue ou un dictionnaire contenant le nom de la langue. Returns: str: Le code ISO 639-1 pour la langue donnée, ou 'en' si non trouvé. """ # Dictionnaire associant les noms de langues aux codes ISO 639-1 language_to_iso: Dict[str, str] = { "Afrikaans": "af", "Arabic": "ar", "Armenian": "hy", "Azerbaijani": "az", "Belarusian": "be", "Bosnian": "bs", "Bulgarian": "bg", "Catalan": "ca", "Chinese": "zh", "Croatian": "hr", "Czech": "cs", "Danish": "da", "Dutch": "nl", "English": "en", "Estonian": "et", "Finnish": "fi", "French": "fr", "Galician": "gl", "German": "de", "Greek": "el", "Hebrew": "he", "Hindi": "hi", "Hungarian": "hu", "Icelandic": "is", "Indonesian": "id", "Italian": "it", "Japanese": "ja", "Kannada": "kn", "Kazakh": "kk", "Korean": "ko", "Latvian": "lv", "Lithuanian": "lt", "Macedonian": "mk", "Malay": "ms", "Marathi": "mr", "Maori": "mi", "Nepali": "ne", "Norwegian": "no", "Persian": "fa", "Polish": "pl", "Portuguese": "pt", "Romanian": "ro", "Russian": "ru", "Serbian": "sr", "Slovak": "sk", "Slovenian": "sl", "Spanish": "es", "Swahili": "sw", "Swedish": "sv", "Tagalog": "tl", "Tamil": "ta", "Thai": "th", "Turkish": "tr", "Ukrainian": "uk", "Urdu": "ur", "Vietnamese": "vi", "Welsh": "cy" } # Vérifier si language_data est un dictionnaire if isinstance(language_data, dict): language_name = language_data.get('language', '') else: language_name = language_data try: # Retourner le code ISO 639-1 correspondant au nom de la langue return language_to_iso[language_name] except KeyError: # Gérer spécifiquement l'exception KeyError print(f"Langue non trouvée : {language_name}") return "en" # Par défaut 'en' si la langue n'est pas trouvée def on_languages_change() -> None: """Fonction de rappel pour le changement de langue(s) de destination.""" selected_language_names: List[str] = st.session_state.language_selector st.session_state.selected_languages = [ {"language": lang, "iso-639-1": convert_language_name_to_iso6391(lang)} for lang in selected_language_names ] def init_process_mode() -> Tuple[str, str]: """ Initialise le mode de traitement pour la traduction si nécessaire. Returns: Tuple[str, str]: Un tuple contenant le prompt système et le prompt d'opération. """ if st.session_state["process_mode"] == "translation": system_prompt, operation_prompt = set_translation_mode( from_lang=st.session_state.language_detected, dest_lang=st.session_state.target_language ) return system_prompt, operation_prompt return "", "" # Fonction principale de l'application def main(): """Fonction principale qui configure et exécute l'application Streamlit.""" st.title("------- DEMORRHA -------") # Initialisation des variables d'état de session if "language_detected" not in st.session_state: st.session_state["language_detected"] = None if "process_mode" not in st.session_state: st.session_state["process_mode"] = "translation" if "target_language" not in st.session_state: st.session_state.target_language = "en" if "selected_languages" not in st.session_state: st.session_state.selected_languages = [ {"language": "English", "iso-639-1": "en"} ] if "enable_tts_for_input_from_text_field" not in st.session_state: st.session_state["enable_tts_for_input_from_text_field"] = True if "enable_tts_for_input_from_audio_record" not in st.session_state: st.session_state["enable_tts_for_input_from_audio_record"] = True if "interface_language" not in st.session_state: st.session_state.interface_language = "French" # Langue par défaut system_prompt, operation_prompt = init_process_mode() # Initialisation de l'historique des messages avec le prompt système if "messages" not in st.session_state: st.session_state.messages = [] # Vérification de l'existence d'un message système dans st.session_state.messages if not any(message["role"] == "system" for message in st.session_state.messages): st.session_state.messages.insert(0, {"role": "system", "content": system_prompt}) with st.container(border=True): # Interface utilisateur pour le chat textuel if user_input := st.chat_input(get_translation("entrez_message")): # Traitement du message texte de l'utilisateur if st.session_state.language_detected is None: st.session_state.language_detected = detect_language( input_text=user_input, temperature=0.01 ) audio_list = [] for cursor_selected_lang in st.session_state.selected_languages: st.session_state.target_language = cursor_selected_lang["iso-639-1"] # Initialisation du mode de traitement pour la langue cible actuelle system_prompt, operation_prompt = init_process_mode() # Traitement du message utilisateur pour la langue cible actuelle try: tts_audio, tts_duration = process_message( user_input, operation_prompt=f"{operation_prompt}", tts_enabled=st.session_state.enable_tts_for_input_from_text_field ) if tts_audio is not None: audio_list.append((tts_audio, tts_duration)) except Exception as e: st.error(f"Erreur lors du traitement du message : {str(e)}") if audio_list: try: final_audio = concatenate_audio_files(audio_list) with st.container(border=True): st.audio(final_audio, format="audio/mp3", autoplay=True) st.download_button( label=get_translation("telecharger_audio"), data=final_audio, file_name="audio_reponse.mp3", mime="audio/mp3" ) except Exception as e: st.error(f"Erreur lors de la concaténation des fichiers audio : {str(e)}") with st.container(border=True): # Interface utilisateur pour l'enregistrement audio st.write(get_translation("enregistrez_message")) audio = audiorecorder( start_prompt=get_translation("cliquez_enregistrer"), stop_prompt=get_translation("cliquez_arreter"), pause_prompt=get_translation("cliquez_pause"), show_visualizer=True, key="vocal_chat_input" ) # Traitement de l'entrée audio de l'utilisateur if len(audio) > 0: try: with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio: audio.export(temp_audio.name, format="wav") transcription = transcribe_audio(temp_audio, language=st.session_state.language_detected) os.unlink(temp_audio.name) # Suppression du fichier temporaire if st.session_state.language_detected is None: st.session_state.language_detected = detect_language( input_text=transcription, temperature=0.01 ) st.write(get_translation("langue_detectee").format(st.session_state.language_detected)) st.write(get_translation("transcription").format(transcription)) audio_list = [] for cursor_selected_lang in st.session_state.selected_languages: st.session_state.target_language = cursor_selected_lang["iso-639-1"] # Initialisation du mode de traitement pour la langue cible actuelle system_prompt, operation_prompt = init_process_mode() # Traitement du message utilisateur pour la langue cible actuelle try: tts_audio, tts_duration = process_message( transcription, operation_prompt=f"{operation_prompt}", tts_enabled=st.session_state.enable_tts_for_input_from_audio_record ) if tts_audio is not None: audio_list.append((tts_audio, tts_duration)) except Exception as e: st.error(f"Erreur lors du traitement du message audio : {str(e)}") if audio_list: try: final_audio = concatenate_audio_files(audio_list) with st.container(border=True): st.audio(final_audio, format="audio/mp3", autoplay=True) # Ajout d'un bouton de téléchargement pour l'audio final st.download_button( label=get_translation("telecharger_audio"), data=final_audio, file_name="audio_concatene.mp3", mime="audio/mp3" ) except Exception as e: st.error(f"Erreur lors de la concaténation des fichiers audio : {str(e)}") except Exception as e: st.error(f"Erreur lors du traitement de l'audio : {str(e)}") # Configuration de la barre latérale with st.sidebar: st.logo("img/logo_2.png", icon_image="img/logo_2.png") st.header(get_translation("sidebar_titre")) st.markdown(f"## {get_translation('a_propos')}") st.info(get_translation("info_app")) with st.container(border=True): st.subheader(get_translation("langue_interface")) # Sélection de la langue de l'interface st.selectbox( label=get_translation("choix_langue_interface"), options=list(translations.keys()), key="interface_language", index=( list(translations.keys()).index("French") if "interface_language" not in st.session_state else list(translations.keys()).index(st.session_state.interface_language) ) ) with st.container(border=True): # Conteneur pour la sélection de langue st.subheader(get_translation("selection_langue")) # Sélection multiple des langues de destination st.multiselect( label=get_translation("langues_destination"), placeholder=get_translation("placeholder_langues"), options=SUPPORTED_LANGUAGES, default=["English"], key="language_selector", max_selections=4, on_change=on_languages_change ) with st.container(border=True): st.subheader(get_translation("parametres_tts")) st.selectbox( get_translation("choix_voix_tts"), options=["alloy", "echo", "fable", "onyx", "nova", "shimmer"], index=3, # "onyx" est à l'index 3 key="tts_voice" ) st.checkbox( get_translation("activer_tts_texte"), key="enable_tts_for_input_from_text_field" ) st.checkbox( get_translation("activer_tts_audio"), key="enable_tts_for_input_from_audio_record" ) # Point d'entrée de l'application if __name__ == "__main__": main()