"""
PDF β†’ Summary β†’ Audio β†’ Talk to PDF β†’ Diagram
All Hugging Face APIs.
"""

import os
import tempfile
from typing import List, Optional

import fitz  # PyMuPDF
import requests
import gradio as gr

# ================== Config ==================
CHUNK_CHARS = 20000
HF_SUMMARY_MODEL = "Groq/Llama-3-Groq-8B-Tool-Use"  # text-generation
HF_TTS_MODEL = "espnet/kan-bayashi_ljspeech_vits"
HF_IMAGE_MODEL = "runwayml/stable-diffusion-v1-5"
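# NOTE: the three model IDs above are example choices, not hard requirements; any
# text-generation, text-to-speech and text-to-image model served by the Inference
# API can be substituted. Hosted availability changes over time, so a 404 from the
# API usually means the model is no longer deployed there.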

# Simple module-level cache holding the most recently processed PDF's text.
pdf_text_storage = {"text": "", "processed": False}

# ================== Utils ==================
def extract_text_from_pdf(file_path: str) -> str:
    """Return the plain text of every page in the PDF, pages separated by blank lines."""
    doc = fitz.open(file_path)
    text = "\n\n".join(page.get_text("text") for page in doc)
    doc.close()
    return text.strip()

def chunk_text(text: str, max_chars: int) -> List[str]:
    """Split text into chunks of at most max_chars, preferring newline/space boundaries."""
    if not text:
        return []
    parts, start, L = [], 0, len(text)
    while start < L:
        end = min(start + max_chars, L)
        if end < L:
            # Back up to the nearest newline (or space) so chunks do not split mid-word.
            back = text.rfind("\n", start, end)
            if back == -1:
                back = text.rfind(" ", start, end)
            if back != -1 and back > start:
                end = back
        piece = text[start:end].strip()
        if piece:  # skip whitespace-only pieces
            parts.append(piece)
        start = end
    return parts
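# Informal example of the splitting behaviour: chunk_text("a" * 45_000, 20_000)
# yields pieces of roughly 20k, 20k and 5k characters (no whitespace to back up
# to); real PDF text instead breaks at the nearest newline or space.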

# ================== Hugging Face Summarization ==================
def summarize_chunk_hf(chunk: str, hf_token: str) -> str:
    # Parameter renamed from "chunk_text" so it no longer shadows the helper above.
    headers = {"Authorization": f"Bearer {hf_token}"}
    payload = {
        "inputs": f"Summarize the following text into a concise paragraph (~180 words max):\n\n{chunk}",
        "parameters": {"max_new_tokens": 800, "temperature": 0.2}
    }
    resp = requests.post(f"https://api-inference.huggingface.co/models/{HF_SUMMARY_MODEL}", headers=headers, json=payload, timeout=120)
    resp.raise_for_status()
    output = resp.json()
    if isinstance(output, list) and output and isinstance(output[0], dict) and "generated_text" in output[0]:
        return output[0]["generated_text"]
    return str(output)
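# NOTE: while a model is cold-loading, the Inference API answers 503 with a JSON
# body containing "estimated_time"; raise_for_status() surfaces that as an
# exception here. A common mitigation (not wired in above, just an option) is to
# retry after that delay or to send the "x-wait-for-model: true" request header.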

def summarize_document(extracted_text: str, hf_token: str) -> str:
    if len(extracted_text) <= CHUNK_CHARS:
        return summarize_chunk_hf(extracted_text, hf_token)
    chunks = chunk_text(extracted_text, CHUNK_CHARS)
    summaries = []
    for ch in chunks:
        try:
            summaries.append(summarize_chunk_hf(ch, hf_token))
        except Exception as e:
            summaries.append(f"(error summarizing chunk: {str(e)})")
    final_prompt = "Combine and refine the following summaries into a single clear summary (200-300 words):\n\n" + " ".join(summaries)
    return summarize_chunk_hf(final_prompt, hf_token)
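# Map-reduce summarization: each chunk is summarized independently, then the
# per-chunk summaries are merged with one more summarize_chunk_hf() call. That
# final call re-wraps the joined summaries in the per-chunk prompt, which works
# but could be swapped for a dedicated "combine" prompt.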

# ================== Hugging Face TTS ==================
def hf_tts(summary_text: str, hf_token: str, model: str = HF_TTS_MODEL) -> str:
    url = f"https://api-inference.huggingface.co/models/{model}"
    headers = {"Authorization": f"Bearer {hf_token}"}
    payload = {"inputs": summary_text}
    resp = requests.post(url, headers=headers, json=payload, timeout=120)
    resp.raise_for_status()
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    tmp.write(resp.content)
    tmp.close()
    return tmp.name
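# The response body is raw audio bytes. The ".wav" suffix is an assumption: some
# hosted TTS models answer with other formats (e.g. FLAC), in which case
# resp.headers.get("Content-Type") can be inspected to pick the right extension.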

# ================== Talk to PDF ==================
def ask_pdf_question(question: str, hf_token: str) -> str:
    if not pdf_text_storage["processed"]:
        return "❌ Please process a PDF first!"
    if not question.strip():
        return "❌ Please enter a question!"
    prompt = f"Here is the PDF content:\n\n{pdf_text_storage['text'][:15000]}\n\nUser Question: {question}\nAnswer strictly based on PDF content."
    headers = {"Authorization": f"Bearer {hf_token}"}
    # Greedy decoding; some Inference API backends reject temperature=0 outright.
    payload = {"inputs": prompt, "parameters": {"max_new_tokens": 500, "do_sample": False}}
    resp = requests.post(f"https://api-inference.huggingface.co/models/{HF_SUMMARY_MODEL}", headers=headers, json=payload, timeout=120)
    resp.raise_for_status()
    output = resp.json()
    if isinstance(output, list) and output and isinstance(output[0], dict) and "generated_text" in output[0]:
        return f"🤖 {output[0]['generated_text'].strip()}"
    return str(output)

# ================== Diagram via HF ==================
def generate_diagram(summary: str, hf_token: str) -> Optional[str]:
    headers = {"Authorization": f"Bearer {hf_token}"}
    payload = {"inputs": f"detailed diagram, clean illustration of: {summary[:500]}"}
    resp = requests.post(f"https://api-inference.huggingface.co/models/{HF_IMAGE_MODEL}", headers=headers, json=payload, timeout=60)
    if resp.status_code == 200 and len(resp.content) > 1000:
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
        tmp.write(resp.content)
        tmp.close()
        return tmp.name
    # Fallback: the API returned an error (e.g. a small JSON error body caught by
    # the size check above). Returning None lets gr.Image show an empty slot
    # instead of choking on a non-image file path.
    return None

# ================== Main Pipeline ==================
def process_pdf_pipeline(pdf_file, hf_token):
    try:
        if not hf_token.strip():
            return "❌ Missing Hugging Face token!", None, None, "Process a PDF first!"
        if pdf_file is None:
            return "❌ Please upload a PDF!", None, None, "Process a PDF first!"

        pdf_path = pdf_file.name if hasattr(pdf_file, "name") else str(pdf_file)
        text = extract_text_from_pdf(pdf_path)
        if not text.strip():
            return "❌ PDF contains no extractable text!", None, None, "Process a PDF first!"

        pdf_text_storage["text"] = text
        pdf_text_storage["processed"] = True

        summary = summarize_document(text, hf_token)
        audio_path = hf_tts(summary, hf_token)
        diagram_path = generate_diagram(summary, hf_token)

        return summary, audio_path, diagram_path, "✅ PDF processed!"
    except Exception as e:
        pdf_text_storage["processed"] = False
        return f"❌ Error: {str(e)}", None, None, "Process a PDF first!"

# ================== Gradio UI ==================
def build_ui():
    hf_token_env = os.environ.get("HF_TOKEN", "")

    with gr.Blocks(title="🔥 PDF AI Pipeline") as demo:
        gr.Markdown("## 🔥 Hugging Face PDF Processor")

        with gr.Row():
            with gr.Column(scale=1):
                pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"])
                hf_token = gr.Textbox(label="HF Token", value=hf_token_env, type="password")
                process_btn = gr.Button("🚀 PROCESS PDF")
            with gr.Column(scale=2):
                summary_output = gr.Textbox(label="Summary", lines=12)
                audio_output = gr.Audio(label="Audio", type="filepath")
                diagram_output = gr.Image(label="Diagram", interactive=False)

        gr.Markdown("## πŸ’¬ Chat with PDF")
        question_input = gr.Textbox(label="Your Question")
        ask_btn = gr.Button("πŸ“¨ ASK")
        chat_output = gr.Textbox(label="Response", lines=8)

        process_btn.click(
            fn=process_pdf_pipeline,
            inputs=[pdf_input, hf_token],
            outputs=[summary_output, audio_output, diagram_output, gr.Textbox(label="Status")]
        )

        ask_btn.click(
            fn=ask_pdf_question,
            inputs=[question_input, hf_token],
            outputs=[chat_output]
        )

        question_input.submit(
            fn=ask_pdf_question,
            inputs=[question_input, hf_token],
            outputs=[chat_output]
        )

    return demo

if __name__ == "__main__":
    demo = build_ui()
    demo.launch(share=True, debug=True)
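# Quick start (a sketch, not shipped tooling): the app needs PyMuPDF, requests and
# gradio installed, plus a Hugging Face token either pasted into the UI or exported
# as HF_TOKEN before launch, e.g.:
#   pip install pymupdf requests gradio
#   export HF_TOKEN=hf_xxx    # placeholder token
#   python app.py             # assumes this file is saved as app.py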