dubswayAgenticV2 / app /utils /whisper_llm.py
peace2024's picture
chatbot add
22d1e2b
raw
history blame
5.05 kB
import os
import logging
import requests
import tempfile
import torch
from transformers import pipeline
from faster_whisper import WhisperModel
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_community.vectorstores import FAISS
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import User
# Setup logger
logger = logging.getLogger("app.utils.whisper_llm")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# Whisper Model Initialization
def get_whisper_model():
if torch.cuda.is_available():
device = "cuda"
compute_type = "float32"
logger.info("βœ… GPU detected: Using CUDA with float32 compute")
else:
device = "cpu"
compute_type = "int8"
logger.warning("⚠️ GPU not available: Falling back to CPU with int8 compute")
try:
model = WhisperModel("base", device=device, compute_type=compute_type)
logger.info(f"πŸ“¦ Loaded Faster-Whisper model on {device} with compute_type={compute_type}")
return model
except Exception as e:
logger.error(f"❌ Failed to load Whisper model: {e}")
raise
whisper_model = get_whisper_model()
# Summarizer
try:
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
logger.info("πŸ“¦ Hugging Face summarizer pipeline loaded successfully.")
except Exception as e:
logger.error(f"❌ Failed to load summarization pipeline: {e}")
raise
# Chunked summarization
def summarize_in_chunks(text, chunk_size=800, overlap=100):
summaries = []
words = text.split()
step = chunk_size - overlap
for i in range(0, len(words), step):
chunk = " ".join(words[i:i + chunk_size])
if len(chunk.strip()) == 0:
continue
try:
result = summarizer(chunk, max_length=256, min_length=64, do_sample=False)
summaries.append(result[0]['summary_text'])
except Exception as e:
logger.error(f"❌ Chunk summarization failed: {e}")
return " ".join(summaries)
# Async user fetch using AsyncSession
async def get_user(user_id: int, db: AsyncSession):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
# 🧠 Core analyzer function with per-user FAISS ingestion
async def analyze(video_url: str, user_id: int, db: AsyncSession):
user = await get_user(user_id, db)
if not user:
raise ValueError(f"❌ User with ID {user_id} not found in database.")
logger.info(f"πŸ“₯ Starting video analysis for user: {user.email} (ID: {user.id})")
# Step 1: Download video to temp file
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
with requests.get(video_url, stream=True, timeout=60) as response:
response.raise_for_status()
for chunk in response.iter_content(chunk_size=8192):
tmp.write(chunk)
tmp_path = tmp.name
logger.info(f"🎞️ Video saved to temp file: {tmp_path}")
except Exception as e:
logger.error(f"❌ Failed to download video: {e}")
raise
# Step 2: Transcribe
try:
logger.info("🧠 Transcribing audio with Faster-Whisper...")
segments, _ = whisper_model.transcribe(tmp_path)
text = " ".join(segment.text for segment in segments)
logger.info(f"βœ… Transcription completed. Length: {len(text)} characters.")
except Exception as e:
logger.error(f"❌ Transcription failed: {e}")
raise
# Step 3: Summarize
try:
logger.info("πŸ“ Summarizing transcript with Hugging Face model...")
summary = summarize_in_chunks(text)
logger.info("βœ… Summarization completed.")
except Exception as e:
logger.error(f"❌ Summarization failed: {e}")
raise
# Step 4: Save to FAISS store
try:
logger.info("πŸ“Š Creating/updating FAISS vector store for user...")
documents = [Document(page_content=summary)]
embeddings = OpenAIEmbeddings()
user_vector_path = f"vector_store/user_{user_id}"
os.makedirs(user_vector_path, exist_ok=True)
if os.path.exists(os.path.join(user_vector_path, "index.faiss")):
vector_store = FAISS.load_local(user_vector_path, embeddings)
vector_store.add_documents(documents)
else:
vector_store = FAISS.from_documents(documents, embeddings)
vector_store.save_local(user_vector_path)
logger.info(f"βœ… Vector store saved at: {user_vector_path}")
except Exception as e:
logger.error(f"❌ Failed to create vector store: {e}")
raise
return text, summary