import os import torchaudio import time import json import shutil from pyannote.audio import Pipeline import whisper from kalbos_nustatymas import ( transcribe_text, transcribe_text_wav2vec, recognize_language ) def analizuoti_kalbetojus(pasirinktas_modelis="Wav2Vec2", failas="/tmp/ivestis.wav"): start_time = time.time() tekstas = [] lietuviskas_tekstas = "" visi_segmentai = [] tekstas.append("🔁 Įkeliama diarizacijos sistema...") diar_pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization", use_auth_token=os.getenv("HF_TOKEN") ) tekstas.append("✅ Diarizacijos modelis paruoštas.") tekstas.append(f"🔁 Įkeliamas modelis: {pasirinktas_modelis}...") whisper_modelis = None if pasirinktas_modelis == "Whisper": whisper_modelis = whisper.load_model("large") tekstas.append(f"✅ Modelis {pasirinktas_modelis} paruoštas.") tekstas.append("🧠 Atliekama diarizacija...") diar_result = diar_pipeline(failas) speaker_segments = list(diar_result.itertracks(yield_label=True)) skaiciuokle = {} transkripcijos = {} kalbos_visos = {} waveform, sample_rate = torchaudio.load(failas) TEMP_FOLDER = "/tmp/temp_segmentai" if os.path.exists(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER) os.makedirs(TEMP_FOLDER, exist_ok=True) for i, (segment, _, speaker) in enumerate(speaker_segments): if (segment.end - segment.start) < 1.0: continue output_path = os.path.join(TEMP_FOLDER, f"segment_{i}_{speaker}.wav") start_sample = int(segment.start * sample_rate) end_sample = int(segment.end * sample_rate) torchaudio.save(output_path, waveform[:, start_sample:end_sample], sample_rate) if pasirinktas_modelis == "Whisper": try: result = whisper_modelis.transcribe(output_path) text = result["text"].strip() lang = result["language"].strip().lower() if lang == "ru": retry = whisper_modelis.transcribe(output_path, language="lt") text = retry["text"].strip() lang = "lt (forced)" except Exception: text = "[KLAIDA Whisper transkripcijoje]" lang = "unknown" elif pasirinktas_modelis == "Wav2Vec2": try: best_text = "" best_lang = "unknown" longest = 0 for kalba in ["lt", "en", "de"]: try: txt = transcribe_text_wav2vec(output_path, kalba=kalba).strip() if len(txt) > longest: best_text = txt best_lang = kalba longest = len(txt) except: continue text = best_text lang = best_lang except Exception: text = "[KLAIDA Wav2Vec2 transkripcijoje]" lang = "unknown" else: text = "[Nežinomas modelis]" lang = "unknown" trukme = round(segment.end - segment.start, 2) if speaker not in skaiciuokle: skaiciuokle[speaker] = {} lang_clean = lang.replace(" (forced)", "") skaiciuokle[speaker][lang_clean] = skaiciuokle[speaker].get(lang_clean, 0) + trukme if speaker not in transkripcijos: transkripcijos[speaker] = [] transkripcijos[speaker].append({"tekstas": text, "kalba": lang, "trukme": trukme}) if speaker not in kalbos_visos: kalbos_visos[speaker] = set() kalbos_visos[speaker].add(lang_clean) max_lt_seconds = 0 kalbetojas_lt = None for speaker, kalbos in skaiciuokle.items(): lt_trukme = kalbos.get("lt", 0) if lt_trukme > max_lt_seconds: max_lt_seconds = lt_trukme kalbetojas_lt = speaker sorted_speakers = sorted(skaiciuokle.keys()) for idx, speaker in enumerate(sorted_speakers): etikete = chr(65 + idx) # A, B, C, ... for sak in transkripcijos[speaker]: if speaker == kalbetojas_lt and "lt" in sak["kalba"]: lietuviskas_tekstas += sak["tekstas"] + " " visi_segmentai.append({ "kalbetojas": etikete, "modelis": pasirinktas_modelis, "kalba": sak["kalba"], "tekstas": sak["tekstas"], "trukme": sak["trukme"] }) elapsed_time = round(time.time() - start_time, 2) minutes = int(elapsed_time) // 60 seconds = int(elapsed_time) % 60 formatted_time = f"{minutes} min. {seconds} sek." os.makedirs("rezultatai", exist_ok=True) failas = os.path.join("rezultatai", f"{pasirinktas_modelis.lower()}.json") try: with open(failas, "w", encoding="utf-8") as f: json.dump({ "modelis": pasirinktas_modelis, "apdorojimo_laikas": elapsed_time, "apdorojimo_laikas_tekstu": formatted_time, "segmentai": visi_segmentai }, f, ensure_ascii=False, indent=2) tekstas.append(f"✅ JSON failas įrašytas: {failas}") except Exception as e: tekstas.append(f"❌ Nepavyko įrašyti JSON: {str(e)}") tekstas.append(f"⏱️ Programos vykdymo trukmė: {formatted_time}") return "\n".join(tekstas), lietuviskas_tekstas.strip(), visi_segmentai