import os
import gc
import logging
from typing import Any, Dict

import torch
import yt_dlp
import gradio as gr
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from huggingface_hub import login, InferenceClient

# Set up basic logging.
logging.basicConfig(level=logging.INFO)
# -------------------------------
# Download Audio from Video URL
# -------------------------------
def download_audio(url: str) -> str:
    """
    Download audio from a video URL and convert it to MP3 format.
    """
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            # prepare_filename() returns the pre-conversion name, so swap the extension to .mp3.
            audio_file = ydl.prepare_filename(info)
            if not audio_file.endswith('.mp3'):
                audio_file = audio_file.rsplit('.', 1)[0] + '.mp3'
        logging.info("Audio downloaded successfully: %s", audio_file)
        return audio_file
    except Exception as e:
        logging.error("Error downloading audio: %s", e)
        raise RuntimeError("Audio download failed") from e
# ---------------------------------------
# Set Up Speech Recognition Model & Pipe
# ---------------------------------------
if torch.cuda.is_available():
    model_device = "cuda"
    pipeline_device = 0  # GPU device index for Hugging Face pipeline.
    torch_dtype = torch.float16
    speech_model_id = "openai/whisper-large-v3-turbo"
    batch_size = 16
    stride_length_s_tuple = (4, 2)
else:
    model_device = "cpu"
    pipeline_device = -1  # CPU for pipeline.
    torch_dtype = torch.float32
    speech_model_id = "openai/whisper-tiny"
    batch_size = 2
    stride_length_s_tuple = None
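# The large checkpoint is only loaded when a GPU is available; whisper-tiny keeps CPU-only
# inference practical at the cost of transcription accuracy.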
try:
    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        speech_model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
    )
    model.to(model_device)
    processor = AutoProcessor.from_pretrained(speech_model_id)
except Exception as e:
    logging.error("Error loading the speech model: %s", e)
    raise

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=pipeline_device,
)
# --------------------------------------
# Transcription and SRT Conversion
# --------------------------------------
def transcribe_audio(audio_path: str, batch_size: int) -> Dict[str, Any]:
    """
    Transcribe the audio file using the configured pipeline.
    """
    try:
        result = pipe(
            audio_path,
            chunk_length_s=10,
            stride_length_s=stride_length_s_tuple,
            batch_size=batch_size,
            return_timestamps=True,
        )
        return result
    except Exception as e:
        logging.error("Error during transcription: %s", e)
        raise
def seconds_to_srt_time(seconds: float) -> str:
    """
    Convert seconds to SRT time format (HH:MM:SS,mmm).
    """
    if seconds is None or not isinstance(seconds, (int, float)):
        return "00:00:00,000"
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    millis = int((seconds - int(seconds)) * 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"
def convert_to_srt(transcribed: Dict[str, Any]) -> str:
    """
    Convert transcription chunks into SRT format.
    """
    srt_output = []
    if "chunks" in transcribed:
        for i, chunk in enumerate(transcribed["chunks"], start=1):
            if chunk.get("timestamp") is not None:
                start_time = seconds_to_srt_time(chunk["timestamp"][0])
                end_time = seconds_to_srt_time(chunk["timestamp"][1])
                srt_output.append(f"{i}\n{start_time} --> {end_time}\n{chunk['text']}\n")
            else:
                srt_output.append(f"{i}\n{chunk['text']}\n")
        return "\n".join(srt_output)
    else:
        logging.warning("No chunks found; returning plain text.")
        return transcribed.get("text", "")
# ------------------------------
# Hugging Face Login
# ------------------------------
def hf_login() -> None:
    """
    Log in to Hugging Face using the token from environment variables.
    """
    huggingface_api_token = os.environ.get('HF_TOKEN')
    if not huggingface_api_token:
        raise ValueError("HF_TOKEN not set in environment variables.")
    login(token=huggingface_api_token)
    logging.info("Logged in to Hugging Face successfully.")

# Log in once at startup.
hf_login()
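# HF_TOKEN must be provided as an environment variable (e.g. a Space secret) before the app starts.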
# -------------------------------------------
# Generate Video Chapters from the Transcript
# -------------------------------------------
def generate_chapters(srt_text: str) -> str:
    """
    Generate video chapters from the SRT transcript using a text generation model.
    """
    chapter_model_id = "Qwen/Qwen2.5-Coder-32B-Instruct"  # or another model if desired
    client = InferenceClient(model=chapter_model_id)
    prompt = (
        "Based on the following video transcript, generate a numbered list of concise, SEO-friendly video chapters with timestamps. "
        "Keep related parts together to limit the number of chapters (up to 5-10 chapters). "
        "Each chapter should be in the format '<timestamp> <chapter title>', where the first chapter starts at 0:00. "
        "Timestamps should be in the format 'm:ss' as needed. For example:\n\n"
        "0:00 Intro\n"
        "1:34 Why the GPT wrapper is bad\n"
        "2:14 Smart users workflow\n\n"
        "Only output the chapters list in the provided format. Stop after one list.\n"
        "Transcript:\n"
        f"{srt_text}\n\n"
        "Chapters:"
    )
    generation_parameters = {
        "max_new_tokens": 300,
        "temperature": 0.5,
        "top_p": 0.95,
        "do_sample": True,
    }
    try:
        generated_text = client.text_generation(prompt, **generation_parameters)
        return generated_text
    except Exception as e:
        logging.error("Error generating chapters: %s", e)
        raise
# -------------------------------------------
# Main Processing Function for Gradio UI
# -------------------------------------------
def process_video(video_url: str):
    """
    Download, transcribe, and chapterize the video at the given URL.
    Returns the SRT transcript and the generated chapter list.
    """
    # Download audio from the provided URL.
    audio_file = download_audio(video_url)
    logging.info("Audio file saved as: %s", audio_file)
    # Transcribe the audio.
    transcribed_text = transcribe_audio(audio_file, batch_size)
    # Clean up memory.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # Convert transcription to SRT format.
    srt_text = convert_to_srt(transcribed_text)
    # Generate chapters from the SRT.
    response = generate_chapters(srt_text)
    # Extract only the chapters part and add a footer.
    cleaned_text = response.split("Chapters:")[1] if "Chapters:" in response else response
    chapters = f"{cleaned_text.strip()}\n\nGenerated with the free 'GenAI ChapterCraft' tool."
    return srt_text, chapters
# -------------------------------------------
# Gradio Interface Definition
# -------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Video Chapter Generator")
    with gr.Row():
        video_url_input = gr.Textbox(label="Video URL", placeholder="Enter video URL here", lines=1)
    with gr.Row():
        process_button = gr.Button("Process Video")
    with gr.Row():
        srt_output = gr.Textbox(label="SRT Transcript", interactive=False, lines=15, show_copy_button=True)
    with gr.Row():
        chapters_output = gr.Textbox(label="Generated Chapters", interactive=False, lines=10, show_copy_button=True)
    process_button.click(fn=process_video, inputs=video_url_input, outputs=[srt_output, chapters_output])

# Launch the Gradio app.
demo.launch()