# Spaces: Running
import asyncio
import base64
import json
import os
import re
import time
import uuid
import wave
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import gradio as gr
import httpx
from mistralai import Mistral
from newspaper import Article
from scrapling.fetchers import Fetcher
from trafilatura import extract
# The Mistral API key is read from the environment; a missing
# MISTRAL_API_KEY raises KeyError as soon as this module is loaded.
api_key = os.environ["MISTRAL_API_KEY"]
# Shared Mistral client used by the OCR and chat-completion helpers in this file.
client = Mistral(api_key=api_key)
def get_text_from_document(document_url: str) -> str:
    """Run Mistral OCR on a document URL and return its text.

    Args:
        document_url: URL of the document to send to the OCR endpoint.

    Returns:
        The markdown of every OCR page concatenated in order, each page
        preceded by a ``--- Page N ---`` header (N starts at 1) and followed
        by a blank line.
    """
    document = {"type": "document_url", "document_url": document_url}
    ocr_result = client.ocr.process(
        model="mistral-ocr-latest",
        document=document,
        include_image_base64=False,
    )
    return "".join(
        f"--- Page {number} ---\n{page.markdown}\n\n"
        for number, page in enumerate(ocr_result.pages, start=1)
    )
def get_text_from_link(link: str) -> Optional[str]:
    """Extract readable article text from a web page.

    Two extractors are tried in order. The page is first fetched with
    scrapling's ``Fetcher`` and passed to trafilatura's ``extract``; if that
    raises or yields nothing, newspaper's ``Article`` is used instead. Both
    stages are best-effort: failures are printed and never propagated.

    Args:
        link: URL of the page to scrape.

    Returns:
        The extracted text (including whatever metadata the successful
        extractor provided), or ``None`` when both extractors fail.
    """
    # NOTE(review): `link` is fetched server-side exactly as given; if it can
    # come from untrusted users, consider restricting schemes/hosts.
    try:
        page = Fetcher.get(link, stealthy_headers=True, follow_redirects=True)
        content = extract(page.html_content, with_metadata=True)
        if content:
            return content
    except Exception as e:
        # Deliberately broad: any fetch/parse failure falls through to the
        # second extractor.
        print(f"Trafilatura extraction failed for {link}: {e}")

    try:
        article = Article(link)
        article.download()
        article.parse()

        metadata_parts = [f"#Title: {article.title}\n"]
        if article.authors:
            metadata_parts.append(f"Authors: {', '.join(article.authors)}\n")
        if article.publish_date:
            metadata_parts.append(f"Published: {article.publish_date}\n")
        # NOTE(review): presumably keywords/summary are only populated after
        # `article.nlp()` has been called - confirm whether these two
        # branches can ever trigger.
        if article.keywords:
            metadata_parts.append(f"Keywords: {', '.join(article.keywords)}\n")
        if article.summary:
            metadata_parts.append(f"Summary: {article.summary}\n\n")
        return "".join(metadata_parts) + article.text
    except Exception as e:
        print(f"Newspaper extraction failed for {link}: {e}")

    return None
def just_text(text: str) -> str:
    """Return ``text`` unchanged after checking that it is not empty.

    Args:
        text: Raw text supplied by the caller.

    Returns:
        The same ``text`` object.

    Raises:
        ValueError: If ``text`` is falsy (empty string or ``None``).
    """
    if text:
        return text
    raise ValueError("Input text cannot be empty")
def build_prompt(text: str) -> str:
    """Build the LLM prompt that turns ``text`` into a two-host dialogue.

    The prompt is the source text, followed by the style instructions, and
    finally the JSON skeleton the model is asked to fill in (a
    ``conversation`` list of ``speaker``/``text`` entries for Olivia and
    Brian).

    Args:
        text: Source material the podcast should discuss.

    Returns:
        The complete prompt string.
    """
    json_skeleton = "\n".join(
        (
            "{",
            '"conversation": [',
            '{"speaker": "Olivia", "text": ""},',
            '{"speaker": "Brian", "text": ""}',
            "]",
            "}",
        )
    )
    # The leading empty entry reproduces the blank line that separates the
    # source text from the instructions.
    instructions = "\n".join(
        (
            "",
            "Turn the text above into a casual podcast conversation between two hosts.",
            "- Use a relaxed, informal tone like you're chatting with a friend",
            "- Include natural conversation fillers like 'you know', 'I mean', 'like'",
            "- Feel free to go off on brief relevant tangents or share quick personal takes",
            "- Keep the back-and-forth flowing naturally",
            "- Cover the key points but maintain a conversational style",
            "- Aim for about 1 minute of casual discussion.",
            "Output in this JSON format:",
        )
    )
    return f"{text}\n{instructions}\n{json_skeleton}"
def extract_conversation(text: str) -> Dict:
    """Ask the LLM to rewrite ``text`` as a podcast dialogue and parse the JSON.

    The request is retried up to three times when the API call fails, the
    reply is not valid JSON, or the JSON lacks a ``conversation`` key.

    Args:
        text: Source material to turn into a conversation.

    Returns:
        The parsed JSON object; it is guaranteed to contain a
        ``conversation`` key.

    Raises:
        RuntimeError: If no usable reply was obtained after all attempts.
            The last underlying error is attached as ``__cause__``.
    """
    prompt = build_prompt(text)
    max_retries = 3
    # Fallback only: finds a JSON object embedded in surrounding prose. It
    # handles a single level of brace nesting, so it is tried only after
    # parsing the whole reply has failed.
    embedded_json = re.compile(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}")
    last_error: Exception = ValueError("No attempt was made")

    for _ in range(max_retries):
        try:
            chat_completion = client.chat.complete(
                model="codestral-latest",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a helpful assistant.",
                    },
                    {
                        "role": "user",
                        "content": prompt,
                    },
                ],
                response_format={
                    "type": "json_object",
                },
            )
            content = chat_completion.choices[0].message.content
            if not isinstance(content, str):
                raise ValueError("No valid JSON found in response")

            try:
                # JSON mode should return a bare JSON document; parsing it
                # directly also copes with braces inside dialogue text.
                result = json.loads(content)
            except ValueError:
                json_match = embedded_json.search(content)
                if not json_match:
                    raise ValueError("No valid JSON found in response")
                result = json.loads(json_match.group())

            if not isinstance(result, dict) or "conversation" not in result:
                raise ValueError("Response JSON missing 'conversation' key")
            return result
        except Exception as e:
            # Retry boundary: API, network and parsing errors are all
            # retried; the last one is reported below.
            last_error = e

    raise RuntimeError(
        f"Failed to extract conversation after {max_retries} attempts: {last_error}"
    ) from last_error
async def generate_audio(text: str, voice: str, file_out_path: str) -> str:
    """Synthesize ``text`` with the remote Kokoro TTS endpoint and save it.

    Args:
        text: The line of dialogue to speak.
        voice: Voice identifier passed to the endpoint (e.g. ``"af_heart"``).
        file_out_path: Where the returned audio bytes are written.

    Returns:
        ``file_out_path``, once the audio has been written.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with a 4xx/5xx status.
            Without this check an error body would be saved as if it were
            audio and only fail later when the file is read back.
        httpx.HTTPError: On transport problems such as timeouts.
    """
    url = "https://eswardivi--kokoro-api-kokoro-generate.modal.run/"
    querystring = {"text": text, "voice": voice}
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Connection": "keep-alive",
    }
    # Named `http_client` so it does not shadow the module-level Mistral
    # `client`. The request carries no body; everything is in the query string.
    async with httpx.AsyncClient() as http_client:
        response = await http_client.post(
            url, headers=headers, params=querystring, timeout=90.0
        )
        response.raise_for_status()

    with open(file_out_path, "wb") as f:
        f.write(response.content)
    return file_out_path
def merge_audio_files(audio_files: List[str]) -> str:
    """Concatenate WAV files into one new WAV file and delete the inputs.

    The output takes its channel count, sample width and frame rate from the
    first input; frames of every input are appended in list order without
    conversion, so all inputs are expected to share that format.

    Args:
        audio_files: Paths of the WAV files to merge, in playback order.

    Returns:
        Path of the merged file, a randomly named ``<uuid4>.wav`` in the
        current working directory.

    Raises:
        ValueError: If ``audio_files`` is empty.
        wave.Error: If an input is not a readable WAV file.
        OSError: If a file cannot be opened, written or removed.
    """
    if not audio_files:
        raise ValueError("audio_files must contain at least one file to merge")

    merged_file = f"{uuid.uuid4()}.wav"
    with wave.open(audio_files[0], "rb") as first_wav:
        params = first_wav.getparams()

    try:
        # The context manager closes the writer (and patches the header
        # with the real frame count) even if reading an input fails.
        with wave.open(merged_file, "wb") as merged_audio:
            merged_audio.setparams(params)
            for audio_file in audio_files:
                with wave.open(audio_file, "rb") as wav_file:
                    merged_audio.writeframes(
                        wav_file.readframes(wav_file.getnframes())
                    )
    except Exception:
        # Do not leave a truncated output behind.
        if os.path.exists(merged_file):
            os.remove(merged_file)
        raise

    # Inputs are removed only after the merge succeeded, so a failure does
    # not destroy segments that were already consumed. `dict.fromkeys`
    # de-duplicates while keeping order, so a repeated path is removed once.
    for audio_file in dict.fromkeys(audio_files):
        os.remove(audio_file)
    return merged_file
async def wake_up_api():
    """Ping the TTS service's wake-up endpoint and report the outcome.

    Sends a GET request (90 second timeout) and prints whether the endpoint
    answered with HTTP 200. Nothing is returned; request errors propagate to
    the caller.
    """
    wake_url = "https://eswardivi--kokoro-api-kokoro-wake-up.modal.run/"
    async with httpx.AsyncClient() as http_client:
        reply = await http_client.get(wake_url, timeout=90.0)
    print("API is awake" if reply.status_code == 200 else "API is not awake Yet")
def generate_podcast(input_type: str, input: str):
    """
    Generate a podcast-style conversation from various input types.

    This function takes content from a document URL, webpage link, or raw text and
    converts it into a natural-sounding podcast dialogue between two hosts. The conversation
    is then synthesized into audio using text-to-speech.

    Args:
        input_type (str): The type of input to process. Must be one of:
            - "Document": URL to a document (PDF, etc.) to extract text from
            - "Link": URL to a webpage to scrape content from
            - "Text": Raw text input to convert directly
        input (str): The actual input content matching the specified input_type:
            - For "Document": Document or arxiv URL (e.g. "https://example.com/doc.pdf")
            - For "Link": Webpage URL (e.g. "https://example.com/article")
            - For "Text": Plain text content

    Returns:
        str: Path to the generated audio file (.wav format) containing the synthesized
            podcast conversation.

    Raises:
        ValueError: If input_type is not one of the supported values, or if the
            input text cannot be extracted or is empty
        RuntimeError: If conversation extraction fails after maximum retries or
            yields no dialogue lines
    """
    # `input` shadows the builtin but is part of the public signature (it is
    # the parameter name exposed through the Gradio API), so it is kept.
    # Validate before any network traffic so bad requests fail immediately.
    if input_type not in ("Document", "Link", "Text"):
        raise ValueError(
            f"Unsupported input_type {input_type!r}; "
            "expected 'Document', 'Link' or 'Text'"
        )
    if not input or not input.strip():
        raise ValueError("Input text cannot be empty")

    voices = {"Olivia": "af_heart", "Brian": "am_fenrir"}
    default_voice = "am_fenrir"  # used for any speaker other than the two hosts
    batch_size = 8  # number of TTS requests issued concurrently
    # A per-call prefix keeps the segment files of concurrent requests from
    # overwriting each other in the shared working directory.
    run_id = uuid.uuid4().hex

    async def async_process():
        await wake_up_api()
        start_time = time.time()

        if input_type == "Document":
            text = get_text_from_document(input)
        elif input_type == "Link":
            text = get_text_from_link(input)
        else:
            text = input
        if not text:
            raise ValueError("Input text cannot be empty")
        text_time = time.time()
        print(f"Text Extracted ({text_time - start_time:.2f}s)")

        conversation = extract_conversation(text)
        conversation_time = time.time()
        print(f"Conversation Extracted ({conversation_time - text_time:.2f}s)")

        messages = conversation["conversation"]
        if not messages:
            raise RuntimeError("Conversation extraction returned no dialogue lines")

        # One segment per dialogue line. They are read back with the `wave`
        # module when merged, hence the .wav suffix.
        segment_paths = [
            f"{run_id}_output_{index}.wav" for index in range(len(messages))
        ]
        try:
            for start in range(0, len(messages), batch_size):
                stop = start + batch_size
                await asyncio.gather(
                    *(
                        generate_audio(
                            message["text"],
                            voices.get(message.get("speaker"), default_voice),
                            path,
                        )
                        for message, path in zip(
                            messages[start:stop], segment_paths[start:stop]
                        )
                    )
                )
            audio_time = time.time()
            print(f"Audio Generated ({audio_time - conversation_time:.2f}s)")

            merged_audio = merge_audio_files(segment_paths)
        finally:
            # Merging normally deletes the segments; this removes whatever
            # is left when synthesis or merging failed part-way.
            for path in segment_paths:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass

        merge_time = time.time()
        print(f"Merged Audio Generated ({merge_time - audio_time:.2f}s)")
        print(f"Total Time: {merge_time - start_time:.2f}s")
        return merged_audio

    return asyncio.run(async_process())
# Gradio UI: input type selector and input box on the left, generated audio
# on the right. The heading and button emoji were mis-decoded UTF-8 and are
# restored here.
with gr.Blocks(title="Podcast Generator") as demo:
    gr.Markdown(
        """
        # 🎙️ Podcast Generator
        Generate engaging podcast conversations from documents, links, or text input.
        """
    )
    with gr.Row():
        with gr.Column(scale=1):
            input_type = gr.Dropdown(
                choices=["Document", "Link", "Text"],
                label="Input Type",
                value="Document",
                interactive=True,
            )
            input_text = gr.Textbox(
                label="Input", placeholder="Enter Document URL, Link or Text", lines=5
            )
            generate_btn = gr.Button("Generate Podcast 🎧", variant="primary")
        with gr.Column(scale=1):
            output_audio = gr.Audio(label="Generated Podcast")
    # The click handler is also exposed as the "generate" API endpoint.
    generate_btn.click(
        fn=generate_podcast,
        inputs=[input_type, input_text],
        outputs=output_audio,
        api_name="generate",
    )

# Start the app with the MCP server enabled.
demo.launch(mcp_server=True)