import os import subprocess import torch import numpy as np import gradio as gr from pyannote.audio import Pipeline from transformers import WhisperProcessor, WhisperForConditionalGeneration from pydub import AudioSegment import datetime import logging import tempfile import shutil # Настройка логирования logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Проверка доступности GPU device = torch.device("cuda" if torch.cuda.is_available() else "cpu") logger.info(f"Using device: {device}") # Загрузка моделей processor = WhisperProcessor.from_pretrained("openai/whisper-medium") #whisper-large-v2 model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-medium").to(device) # Загрузка токена из переменной окружения hf_token = os.environ.get("HUGGINGFACE_TOKEN") if not hf_token: raise ValueError("HUGGINGFACE_TOKEN не найден в переменных окружения") diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token) diarization_pipeline.to(device) def check_ffmpeg_codecs(): try: result = subprocess.run(['ffmpeg', '-codecs'], capture_output=True, text=True) codecs = result.stdout required_codecs = { 'libvpx': False, 'libvpx-vp9': False, 'libvorbis': False, 'libopus': False, 'libx264': False, 'aac': False } for codec in required_codecs: if codec in codecs: required_codecs[codec] = True return required_codecs except Exception as e: logger.error(f"Error checking FFmpeg codecs: {str(e)}") return None def extract_audio_from_video(video_path, audio_path): try: command = f'ffmpeg -i "{video_path}" -vn -acodec pcm_s16le -ar 16000 -ac 1 "{audio_path}"' result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True) logger.info(f"FFmpeg command executed successfully: {command}") return True except subprocess.CalledProcessError as e: logger.error(f"Error executing FFmpeg command: {e}") logger.error(f"FFmpeg stderr: {e.stderr}") return False def transcribe_audio(audio_path): logger.info(f"Starting transcription of {audio_path}") # Загрузка аудио audio = AudioSegment.from_file(audio_path) audio = audio.set_frame_rate(16000).set_channels(1) # Проверка длительности аудио duration_seconds = len(audio) / 1000 logger.info(f"Audio duration: {duration_seconds} seconds") # Преобразование аудио в numpy array audio_array = np.array(audio.get_array_of_samples()).flatten().astype(np.float32) / 32768.0 # Разделение аудио на части с перекрытием max_chunk_size = 30 * 16000 # 30 секунд при частоте дискретизации 16 кГц overlap = 3 * 16000 # 1 секунда перекрытия chunks = [] start = 0 while start < len(audio_array): end = min(start + max_chunk_size, len(audio_array)) chunk = audio_array[start:end] chunks.append(chunk) start += max_chunk_size - overlap transcription = "" for i, chunk in enumerate(chunks): logger.info(f"Processing chunk {i+1}/{len(chunks)}") input_features = processor(chunk, sampling_rate=16000, return_tensors="pt").input_features.to(device) # Генерация транскрипции with torch.no_grad(): predicted_ids = model.generate(input_features, language="russian") chunk_transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0] transcription += chunk_transcription + " " # Постобработка для удаления дублирующихся фраз из-за перекрытия sentences = transcription.split('.') unique_sentences = [] for sentence in sentences: if sentence.strip() and sentence.strip() not in unique_sentences: unique_sentences.append(sentence.strip()) final_transcription = '. '.join(unique_sentences) + '.' logger.info(f"Transcription completed. Length: {len(final_transcription)}") return final_transcription.strip() def transcribe_and_diarize(video_path): logger.info(f"Processing video: {video_path}") # Создаем временную директорию для аудио файла with tempfile.TemporaryDirectory() as temp_dir: audio_path = os.path.join(temp_dir, "temp_audio.wav") # Извлечение аудио из видео if not extract_audio_from_video(video_path, audio_path): return "Ошибка при извлечении аудио из видео", None try: # Диаризация diarization = diarization_pipeline(audio_path) # Транскрибация transcription = transcribe_audio(audio_path) if not transcription: return "Не удалось получить транскрипцию", None # Объединение результатов final_transcript = [] for turn, _, speaker in diarization.itertracks(yield_label=True): start_time = turn.start end_time = turn.end segment_text = transcription[int(start_time * 100):int(end_time * 100)] # Используем индексы как приблизительные позиции в тексте if segment_text.strip(): # Проверка, что сегмент не пустой final_transcript.append(f"Говорящий {speaker} ({start_time:.2f} - {end_time:.2f}): {segment_text}") if not final_transcript: logger.warning("No speech segments detected") return "Речь не обнаружена", None # Сохранение транскрипции в файл timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"transcription_{timestamp}.txt" with open(filename, "w", encoding="utf-8") as f: f.write("\n".join(final_transcript)) logger.info(f"Transcription saved to {filename}") return "\n".join(final_transcript), filename except Exception as e: logger.error(f"Error during transcription: {str(e)}") return f"Произошла ошибка при обработке: {str(e)}", None def check_system_requirements(): codec_support = check_ffmpeg_codecs() if codec_support is None: return "Не удалось проверить поддержку кодеков. Убедитесь, что FFmpeg установлен корректно." messages = [] if all(codec_support.values()): messages.append("Система поддерживает все необходимые кодеки для обработки MP4 и WebM файлов.") else: messages.append("Внимание: Некоторые необходимые кодеки не обнаружены.") for codec, supported in codec_support.items(): if not supported: messages.append(f"- Кодек {codec} не обнаружен. Возможны проблемы при обработке некоторых файлов.") return "\n".join(messages) def process_video(video_file): system_info = check_system_requirements() if video_file is None: return system_info, "", None else: # Копируем файл во временную директорию с безопасным именем with tempfile.TemporaryDirectory() as temp_dir: safe_filename = "input_video" + os.path.splitext(video_file.name)[1] temp_video_path = os.path.join(temp_dir, safe_filename) shutil.copy(video_file.name, temp_video_path) transcription, file_output = transcribe_and_diarize(temp_video_path) return system_info, transcription, file_output # Обновленный интерфейс Gradio iface = gr.Interface( fn=process_video, inputs=gr.File(label="Загрузите видео (MP4 или WebM)"), outputs=[ gr.Markdown(label="Системные требования"), gr.Textbox(label="Транскрипция"), gr.File(label="Сохраненный файл транскрипции") ], title="Транскрибация MP4 и WebM видеозвонков с диаризацией", description="Загрузите видео в формате MP4 или WebM для получения транскрипции с указанием говорящих. Результат будет сохранен в текстовый файл. Информация о системных требованиях отображается при запуске.", examples=[], cache_examples=False, allow_flagging="never", theme = gr.themes.Soft() ) if __name__ == "__main__": iface.launch(share=True, debug=True)