import hashlib
import io
import logging
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional

import openai

logger = logging.getLogger(__name__)


class VoiceSynthesizer:
    """Handles text-to-speech conversion for lecture content using OpenAI TTS."""

    def __init__(self, openai_api_key: str):
        self.client = openai.OpenAI(api_key=openai_api_key)
        self.supported_voices = [
            "alloy", "echo", "fable", "onyx", "nova", "shimmer"
        ]
        self.default_voice = "nova"

    def set_api_key(self, api_key: str) -> None:
        """Set the OpenAI API key dynamically (replaces the underlying client)."""
        self.client = openai.OpenAI(api_key=api_key)

    def synthesize_lecture(self, lecture_content: str,
                           voice: Optional[str] = None,
                           output_path: Optional[str] = None) -> Dict[str, Any]:
        """
        Convert lecture text to speech using OpenAI TTS.

        Args:
            lecture_content: The lecture text to convert
            voice: Voice to use (alloy, echo, fable, onyx, nova, shimmer);
                   falls back to the default voice if unsupported/None
            output_path: Where to save the audio file; a deterministic path
                         under "output/" is derived from the content if omitted

        Returns:
            Dict with success status, file path, and metadata. On failure the
            dict has 'success': False, an 'error' message, 'file_path': None
            and 'duration': 0 (no exception escapes this method).
        """
        try:
            if not lecture_content.strip():
                return {
                    'success': False,
                    'error': 'No content provided for synthesis',
                    'file_path': None,
                    'duration': 0
                }

            # Validate and set voice
            selected_voice = voice if voice in self.supported_voices else self.default_voice

            # Prepare content for TTS (remove markdown formatting)
            clean_content = self._clean_content_for_tts(lecture_content)

            # Split content into chunks if too long (OpenAI TTS caps input
            # at 4096 characters per request; 4000 leaves headroom)
            chunks = self._split_content(clean_content, max_length=4000)

            if not output_path:
                # Use a stable content digest rather than hash(): str hashes
                # are salted per process, so hash() would produce a different
                # (possibly negative) filename for the same content each run.
                content_digest = hashlib.md5(
                    lecture_content.encode("utf-8")
                ).hexdigest()[:16]
                output_path = os.path.join(
                    "output", f"lecture_audio_{content_digest}.mp3"
                )

            # Ensure output directory exists; dirname is "" for a bare
            # filename and makedirs("") would raise, so guard it.
            output_dir = os.path.dirname(output_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

            if len(chunks) == 1:
                # Single chunk - direct synthesis
                response = self.client.audio.speech.create(
                    model="tts-1",
                    voice=selected_voice,
                    input=chunks[0],
                    response_format="mp3"
                )
                # Save the audio file
                with open(output_path, "wb") as f:
                    f.write(response.content)
            else:
                # Multiple chunks - synthesize and combine
                self._synthesize_multiple_chunks(chunks, selected_voice, output_path)

            # Get file size and estimate duration
            file_size = os.path.getsize(output_path)
            estimated_duration = self._estimate_audio_duration(clean_content)

            return {
                'success': True,
                'file_path': output_path,
                'voice': selected_voice,
                'duration': estimated_duration,
                'file_size': file_size,
                'chunks_count': len(chunks)
            }

        except Exception as e:
            logger.error(f"Voice synthesis failed: {str(e)}")
            return {
                'success': False,
                'error': str(e),
                'file_path': None,
                'duration': 0
            }

    def _clean_content_for_tts(self, content: str) -> str:
        """Strip markdown formatting so the TTS engine reads plain prose."""
        # Remove markdown headers
        content = re.sub(r'^#{1,6}\s+', '', content, flags=re.MULTILINE)

        # Remove markdown emphasis
        content = re.sub(r'\*\*(.*?)\*\*', r'\1', content)  # Bold
        content = re.sub(r'\*(.*?)\*', r'\1', content)      # Italic

        # Remove markdown links, keeping only the link text
        content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', content)

        # Remove horizontal rules
        content = re.sub(r'^---+$', '', content, flags=re.MULTILINE)

        # Clean up extra whitespace
        content = re.sub(r'\n{3,}', '\n\n', content)
        content = re.sub(r' {2,}', ' ', content)

        # Add pauses for better speech flow ("..." reads as a longer pause
        # between sections)
        content = re.sub(r'\n\n', '\n\n... \n\n', content)

        return content.strip()

    def _split_content(self, content: str, max_length: int = 4000) -> list:
        """
        Split content into chunks suitable for the TTS API.

        Splits on sentence boundaries first; a single sentence longer than
        max_length falls back to word-level splitting, and a single word
        longer than max_length is hard-sliced (previously its tail past
        max_length was silently dropped).
        """
        if len(content) <= max_length:
            return [content]

        chunks = []
        current_chunk = ""
        for sentence in content.split('. '):
            # Check if adding this sentence would exceed the limit
            if len(current_chunk) + len(sentence) + 2 > max_length:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                    current_chunk = sentence + ". "
                else:
                    # Single sentence is too long, split by words
                    word_chunk = ""
                    for word in sentence.split():
                        if len(word_chunk) + len(word) + 1 > max_length:
                            if word_chunk:
                                chunks.append(word_chunk.strip())
                                word_chunk = word + " "
                            else:
                                # Pathological single word longer than the
                                # limit: emit it in full-size slices instead
                                # of truncating and losing the remainder.
                                for i in range(0, len(word), max_length):
                                    chunks.append(word[i:i + max_length])
                                word_chunk = ""
                        else:
                            word_chunk += word + " "
                    current_chunk = word_chunk + ". " if word_chunk else ""
            else:
                current_chunk += sentence + ". "

        if current_chunk:
            chunks.append(current_chunk.strip())

        return [chunk for chunk in chunks if chunk.strip()]

    def _synthesize_multiple_chunks(self, chunks: list, voice: str,
                                    output_path: str) -> None:
        """
        Synthesize each chunk to a temp file, then concatenate them into
        output_path. Raw byte concatenation of MP3 frames is lossy metadata-
        wise but playable by most decoders.
        """
        temp_files = []
        try:
            # Synthesize each chunk
            for i, chunk in enumerate(chunks):
                temp_file = tempfile.NamedTemporaryFile(
                    delete=False, suffix=f"_chunk_{i}.mp3"
                )
                temp_files.append(temp_file.name)
                temp_file.close()

                response = self.client.audio.speech.create(
                    model="tts-1",
                    voice=voice,
                    input=chunk,
                    response_format="mp3"
                )
                with open(temp_file.name, "wb") as f:
                    f.write(response.content)

            # Combine audio files (simple concatenation for MP3)
            with open(output_path, "wb") as outfile:
                for temp_file in temp_files:
                    with open(temp_file, "rb") as infile:
                        shutil.copyfileobj(infile, outfile)
        finally:
            # Clean up temporary files; best-effort, so only swallow
            # filesystem errors — never KeyboardInterrupt/SystemExit.
            for temp_file in temp_files:
                try:
                    os.unlink(temp_file)
                except OSError:
                    pass

    def _estimate_audio_duration(self, content: str) -> int:
        """Estimate audio duration in seconds based on content length."""
        # Average speaking rate: ~150 words per minute
        word_count = len(content.split())
        duration_minutes = word_count / 150
        return int(duration_minutes * 60)

    def get_available_voices(self) -> Dict[str, str]:
        """Get list of available voices with descriptions."""
        return {
            "alloy": "Neutral, balanced voice",
            "echo": "Crisp, clear voice",
            "fable": "Warm, engaging voice",
            "onyx": "Deep, authoritative voice",
            "nova": "Pleasant, professional voice (default)",
            "shimmer": "Bright, energetic voice"
        }

    def validate_voice(self, voice: str) -> bool:
        """Validate if the provided voice is supported."""
        return voice in self.supported_voices