demorrha / core /speech_to_text.py
rick
corrections d'erreurs dans le code...
00f58d9 unverified
raw
history blame
8.54 kB
#coding: utf-8
# Importation des bibliothèques nécessaires selon les bonnes pratiques PEP8
import requests # Pour envoyer des requêtes HTTP à l'API
import json # Pour traiter les réponses JSON de l'API
from os import getenv
from pydub import AudioSegment
from openai import OpenAI
from io import BytesIO
#from typing import Any
#from typing import Dict
from typing import IO
from typing import List
from typing import Optional
#from typing import Tuple
from typing import Union
from core.DetectLanguage import detect_language
def huggingface_endpoints_stt(fichier_audio: str) -> str:
# Définir l'URL de l'endpoint d'inférence sur Hugging Face
API_URL = f"{getenv('hf_endpoint_whisper_large_v3_turbo')}"
# Inclure votre token d'accès Hugging Face dans les en-têtes de la requête
headers = {
"Authorization": f"Bearer {getenv('HF_API_TOKEN')}"
}
"""
Envoie un fichier audio au modèle Whisper et renvoie la transcription textuelle.
Arguments:
fichier_audio (str): Chemin vers le fichier audio à envoyer pour la transcription.
Retour:
str: Texte transcrit à partir de l'audio.
"""
# Ajouter le type de contenu audio à l'en-tête de la requête
headers["Content-Type"] = f"audio/{fichier_audio.split('.')[-1]}"
# Ouvrir le fichier audio en mode binaire
with open(fichier_audio, "rb") as audio:
# Envoyer une requête POST à l'API avec le fichier audio
response = requests.post(API_URL, headers=headers, data=audio)
# Vérifier si la requête a réussi (code 200)
if response.status_code == 200:
# Extraire la transcription du texte de la réponse JSON
transcription = json.loads(response.content.decode("utf-8"))
return transcription.get("text", "Pas de transcription disponible.")
else:
# En cas d'erreur, afficher le code de statut et le message
raise Exception(f"Erreur API: {response.status_code}, {response.text}")
# ############################################################
def transcribe_audio(filepath: Union[str, IO], language: Optional[str] = None) -> str:
"""
Transcrit un fichier audio temporaire en texte.
Args:
filepath Chemin vers le fichier audio temporaire à transcrire.
language (Optional[str]): La langue de l'audio. Par défaut None.
Returns:
str: Le texte transcrit.
"""
max_size_mb = 25
client = OpenAI(api_key=getenv("OPENAI_API_KEY"))
try:
transcriptions = []
with open(filepath if isinstance(filepath, str) else filepath.name, "rb") as f:
# filepath peut etre un chemin vers un fichier audio ou un objet IO
# verifier si le fichier audio fait plus de 25 Mo
# Diviser l'audio en segments de taille maximale
#segments = split_audio(f, max_size_mb)
f.seek(0)
audio = AudioSegment.from_file(f)
duration_ms = len(audio)
segment_duration_ms = int(
(max_size_mb * 1024 * 1024 * 8) /
(audio.frame_rate * audio.sample_width * audio.channels)
)
for start in range(0, duration_ms, segment_duration_ms):
end = min(start + segment_duration_ms, duration_ms)
segment = audio[start:end]
buffer = BytesIO()
segment.export(buffer, format="mp3")
buffer.seek(0)
if not( language ):
response = client.audio.transcriptions.create(
model="whisper-1",
file=("audio.mp3", buffer),
response_format="text"
)
else:
response = client.audio.transcriptions.create(
model="whisper-1",
file=("audio.mp3", buffer),
language=language,
response_format="text"
)
transcriptions.append(response)
return " ".join(transcriptions)
except Exception as e:
print(f"Erreur lors de la transcription de l'audio : {e}")
return ""
# ############################################################
def translate_audio(filepath: Union[str, IO]) -> str:
"""
Traduit un fichier audio temporaire en Anglais.
Args:
filepath Chemin vers le fichier audio temporaire à traduire.
Returns:
str: Le texte traduit.
"""
max_size_mb = 25
translated_text = []
client = OpenAI(api_key=getenv("OPENAI_API_KEY"))
try:
with open(filepath if isinstance(filepath, str) else filepath.name, "rb") as f:
# filepath peut etre un chemin vers un fichier audio ou un objet IO
f.seek(0)
audio = AudioSegment.from_file(f)
duration_ms = len(audio)
segment_duration_ms = int(
(max_size_mb * 1024 * 1024 * 8) /
(audio.frame_rate * audio.sample_width * audio.channels)
)
for start in range(0, duration_ms, segment_duration_ms):
end = min(start + segment_duration_ms, duration_ms)
segment = audio[start:end]
buffer = BytesIO()
segment.export(buffer, format="mp3")
buffer.seek(0)
translation = client.audio.translations.create(
model="whisper-1",
file=("audio.mp3", buffer)
)
translated_text.append(translation)
return " ".join(translated_text)
except Exception as e:
print(f"Erreur lors de la traduction de l'audio : {e}")
return ""
# ############################################################
class SpeechToText(object):
def __init__(self,
api_key: str):
self.api_key = api_key
self.client = OpenAI(api_key=self.api_key)
def aquire_audio(self,
filepath: Union[str, IO, List[Union[str, IO]]]):
"""
Integrer la detection de langue :
Ajoute un appel a la fonction detect_language juste apres l'aquisition de l'audio et avant de choisir entre transcrire ou traduire.
"""
if isinstance(filepath, str):
file_paths = [filepath]
elif isinstance(filepath, IO):
file_paths = [filepath.name]
else:
file_paths = [f'{file_path}' if isinstance(filepath, List) and isinstance(file_path, str) else file_path.name for file_path in filepath]
# create the list 'file_streams'
file_streams = [open(filepath, "rb") for filepath in file_paths]
def create_assistant():
return self.client.beta.assistants.create(
name="Audio Language Detector",
instructions=" ".join([
"Act as an language detection function for an audio file.",
"You are the assistant designed to detect the language of an audio file.",
"This assistant is designed to detect the language of an audio file.",
"You receive an audio file as input, and you analyze it to determine the language spoken in the audio.",
"The assistant will return the detected language of the audio in ISO 639-1 format.",
""
]),
model="gpt-4o",
tools=[{"type": "file_search"}]
)
def create_vector_store():
return self.client.beta.vector_stores.create(
name="Audio Language Detection"
)
assistant = create_assistant()
vectore_store = create_vector_store()
file_batch = self.client.beta.vector_stores.file_batches.upload_and_poll(
vector_store_id=vectore_store.id,
files=file_streams
)
# update the assistant to use the vector store
assistant = self.client.beta.assistants.update(
assistant_id=assistant.id,
tool_ressources={"file_search": {"vector_store_ids": [vectore_store.id]}}
)
## Create a thread
### Upload the user provided audio
message_file = self.client.files.create(
file=open(file_paths[0], "rb")
)