"""Gradio app that turns a video URL into an SRT transcript and chapter list.

Pipeline: download the audio track with yt-dlp, transcribe it with a Whisper
model through the Hugging Face ``pipeline`` API, render the timestamped chunks
as SRT, then ask a hosted text-generation model to propose video chapters.

Importing this module loads the speech model, logs in to Hugging Face (the
``HF_TOKEN`` environment variable must be set) and launches the Gradio UI.
"""
import gc
import logging
import math
import os
from typing import Any, Dict

import gradio as gr
import torch
import yt_dlp
from huggingface_hub import InferenceClient, login
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

# Set up basic logging.
logging.basicConfig(level=logging.INFO)


# -------------------------------
# Download Audio from Video URL
# -------------------------------
def download_audio(url: str) -> str:
    """Download the audio track of a video URL and convert it to MP3.

    Args:
        url: Address of the video to fetch; anything yt-dlp can resolve.

    Returns:
        Path of the MP3 file produced by the FFmpeg post-processor.

    Raises:
        RuntimeError: If yt-dlp fails to download or convert the audio. The
            original exception is chained as the cause.
    """
    ydl_opts = {
        'format': 'bestaudio/best',
        # A watch URL that also carries a playlist parameter would otherwise
        # expand to the whole playlist, for which prepare_filename() below
        # does not describe a single media file.
        'noplaylist': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            downloaded_name = ydl.prepare_filename(info)
    except Exception as e:
        logging.error("Error downloading audio: %s", e)
        raise RuntimeError("Audio download failed") from e

    # prepare_filename() reports the pre-conversion name (e.g. .webm/.m4a);
    # the post-processor writes the same stem with an .mp3 extension.
    # splitext only looks at the final path component, so dots in directory
    # names are left alone.
    audio_file = os.path.splitext(downloaded_name)[0] + '.mp3'
    logging.info("Audio downloaded successfully: %s", audio_file)
    return audio_file


# ---------------------------------------
# Set Up Speech Recognition Model & Pipe
# ---------------------------------------
if torch.cuda.is_available():
    model_device = "cuda"
    pipeline_device = 0  # GPU device index for Hugging Face pipeline.
    torch_dtype = torch.float16
    speech_model_id = "openai/whisper-large-v3-turbo"
    batch_size = 16
    stride_length_s_tuple = (4, 2)
else:
    # Without a GPU, fall back to the smallest checkpoint and small batches.
    model_device = "cpu"
    pipeline_device = -1  # CPU for pipeline.
    torch_dtype = torch.float32
    speech_model_id = "openai/whisper-tiny"
    batch_size = 2
    stride_length_s_tuple = None

try:
    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        speech_model_id,
        torch_dtype=torch_dtype,
        low_cpu_mem_usage=True,
        use_safetensors=True,
    )
    model.to(model_device)
    processor = AutoProcessor.from_pretrained(speech_model_id)
except Exception as e:
    logging.error("Error loading the speech model: %s", e)
    raise

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=pipeline_device,
)


# --------------------------------------
# Transcription and SRT Conversion
# --------------------------------------
def transcribe_audio(audio_path: str, batch_size: int) -> Dict[str, Any]:
    """Transcribe an audio file with the module-level speech pipeline.

    Args:
        audio_path: Path of the audio file to transcribe.
        batch_size: Number of audio chunks passed to the model per batch.

    Returns:
        The pipeline result. It is requested with ``return_timestamps=True``;
        ``convert_to_srt`` reads its ``"chunks"`` and ``"text"`` entries.

    Raises:
        Exception: Whatever the pipeline raises; it is logged and re-raised.
    """
    try:
        return pipe(
            audio_path,
            chunk_length_s=10,
            stride_length_s=stride_length_s_tuple,
            batch_size=batch_size,
            return_timestamps=True,
        )
    except Exception as e:
        logging.error("Error during transcription: %s", e)
        raise


def seconds_to_srt_time(seconds: float) -> str:
    """Convert a number of seconds to SRT time format (HH:MM:SS,mmm).

    Args:
        seconds: Offset from the start of the media, in seconds.

    Returns:
        The formatted timestamp. ``None``, non-numeric, non-finite and
        negative inputs all yield ``"00:00:00,000"``.
    """
    if (
        seconds is None
        or not isinstance(seconds, (int, float))
        or not math.isfinite(seconds)
        or seconds < 0
    ):
        return "00:00:00,000"

    # Round once on the total millisecond count. Truncating the fractional
    # part separately loses a millisecond to binary float error (1.001 would
    # print as ",000") and cannot carry into the seconds field.
    total_millis = int(round(seconds * 1000))
    total_secs, millis = divmod(total_millis, 1000)
    total_minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"


def convert_to_srt(transcribed: Dict[str, Any]) -> str:
    """Convert transcription chunks into SRT format.

    Args:
        transcribed: Transcription result. When it has a ``"chunks"`` list,
            each chunk must provide ``"text"`` and may provide a
            ``"timestamp"`` pair of (start, end) seconds.

    Returns:
        The SRT document, one numbered cue per chunk. If there are no chunks,
        the plain ``"text"`` entry (or an empty string) is returned instead.
    """
    chunks = transcribed.get("chunks")
    if chunks is None:
        logging.warning("No chunks found; returning plain text.")
        return transcribed.get("text", "")

    srt_output = []
    for i, chunk in enumerate(chunks, start=1):
        # Chunk text usually carries a leading space from tokenization.
        text = chunk['text'].strip()
        timestamp = chunk.get("timestamp")
        if timestamp is None:
            srt_output.append(f"{i}\n{text}\n")
            continue

        start, end = timestamp[0], timestamp[1]
        if end is None:
            # The final chunk can come back without an end time; reuse the
            # start so the cue never ends before it begins.
            end = start
        start_time = seconds_to_srt_time(start)
        end_time = seconds_to_srt_time(end)
        srt_output.append(f"{i}\n{start_time} --> {end_time}\n{text}\n")
    return "\n".join(srt_output)


# ------------------------------
# Hugging Face Login Adjustment
# ------------------------------
def hf_login() -> None:
    """Log in to Hugging Face using the token from environment variables.

    Raises:
        ValueError: If the ``HF_TOKEN`` environment variable is unset or empty.
    """
    huggingface_api_token = os.environ.get('HF_TOKEN')
    if not huggingface_api_token:
        raise ValueError("HF_TOKEN not set in environment variables.")
    login(token=huggingface_api_token)
    logging.info("Logged in to Hugging Face successfully.")


# Log in once (this can be done at startup)
hf_login()


# -------------------------------------------
# Generate Video Chapters from the Transcript
# -------------------------------------------
def generate_chapters(srt_text: str) -> str:
    """Generate video chapters from an SRT transcript with a hosted LLM.

    Args:
        srt_text: Transcript text, embedded verbatim into the prompt.

    Returns:
        The text produced by the model for the chapter list.

    Raises:
        Exception: Whatever the inference client raises; it is logged and
            re-raised.
    """
    chapter_model_id = "Qwen/Qwen2.5-Coder-32B-Instruct"  # or another model if desired
    client = InferenceClient(model=chapter_model_id)

    prompt = (
        "Based on the following video transcript, generate a numbered list of concise, SEO-friendly video chapters with timestamps. "
        "Keep related parts together to limit the number of chapters (up to 5-10 chapters). "
        # The placeholder was previously an empty ' ', which left the model
        # without a description of the expected line format.
        "Each chapter should be in the format '<timestamp> <title>', where the first chapter starts at 0:00. "
        "Timestamps should be in the format 'm:ss' as needed. For example:\n\n"
        "0:00 Intro\n"
        "1:34 Why the GPT wrapper is bad\n"
        "2:14 Smart users workflow\n\n"
        "Only output the chapters list in the provided format. Stop after one list.\n"
        "Transcript:\n"
        f"{srt_text}\n\n"
        "Chapters:"
    )

    generation_parameters = {
        "max_new_tokens": 300,
        "temperature": 0.5,
        "top_p": 0.95,
        "do_sample": True,
    }

    try:
        return client.text_generation(prompt, **generation_parameters)
    except Exception as e:
        logging.error("Error generating chapters: %s", e)
        raise


# -------------------------------------------
# Main Processing Function for Gradio UI
# -------------------------------------------
def process_video(video_url: str):
    """Run the full URL -> transcript -> chapters pipeline for the UI.

    Args:
        video_url: Video address typed into the Gradio textbox.

    Returns:
        A ``(srt_text, chapters)`` tuple matching the two output textboxes.

    Raises:
        gr.Error: If no URL was entered.
        RuntimeError: If the audio download fails.
    """
    if not video_url or not video_url.strip():
        raise gr.Error("Please enter a video URL.")

    # Download audio from the provided URL.
    audio_file = download_audio(video_url.strip())
    logging.info("Audio file saved as: %s", audio_file)

    try:
        # Transcribe the audio.
        transcribed_text = transcribe_audio(audio_file, batch_size)
    finally:
        # The MP3 is only needed for transcription; delete it so repeated
        # requests do not fill the disk.
        try:
            os.remove(audio_file)
        except OSError as e:
            logging.warning("Could not remove audio file %s: %s", audio_file, e)

        # Clean up memory.
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    # Convert transcription to SRT format.
    srt_text = convert_to_srt(transcribed_text)

    # Generate chapters from the SRT.
    response = generate_chapters(srt_text)

    # Extract only the chapters part and add a footer
    cleaned_text = response.split("Chapters:")[1] if "Chapters:" in response else response
    chapters = f"{cleaned_text.strip()}\n\nGenerated using free 'GenAI ChapterCraft' tool."

    return srt_text, chapters


# -------------------------------------------
# Gradio Interface Definition
# -------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Video Chapter Generator")

    with gr.Row():
        video_url_input = gr.Textbox(label="Video URL", placeholder="Enter video URL here", lines=1)

    with gr.Row():
        process_button = gr.Button("Process Video")

    with gr.Row():
        srt_output = gr.Textbox(label="SRT Transcript", interactive=False, lines=15, show_copy_button=True)

    with gr.Row():
        chapters_output = gr.Textbox(label="Generated Chapters", interactive=False, lines=10, show_copy_button=True)

    process_button.click(fn=process_video, inputs=video_url_input, outputs=[srt_output, chapters_output])

# Launch the Gradio app
demo.launch()