|
|
import io
import os
from typing import Optional

import PyPDF2
import requests
from tqdm import tqdm
|
|
|
|
|
|
|
|
# Hugging Face serverless Inference API endpoint for the instruction-tuned
# Llama 3.2 1B model.
API_URL = "https://api-inference.huggingface.co/models/meta-llama/Llama-3.2-1B-Instruct"

# Bearer-token auth header. NOTE: reading os.environ with [] raises KeyError
# at import time if HUGGING_FACE_API_KEY is not set.
headers = {"Authorization": f"Bearer {os.environ['HUGGING_FACE_API_KEY']}"}
|
|
|
|
|
def query(payload, *, timeout=60.0):
    """Send a request to the Hugging Face inference API.

    Args:
        payload: JSON-serializable request body, e.g.
            {"inputs": ..., "parameters": {...}}.
        timeout: Seconds to wait for the server before raising
            requests.exceptions.Timeout. Without a timeout, requests.post
            can block indefinitely on a stalled connection, hanging the
            whole pipeline.

    Returns:
        The decoded JSON response: a list of generations on success, or an
        error dict such as {"error": ...} — callers check the shape.
    """
    response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
    return response.json()
|
|
|
|
|
def extract_text_from_pdf(pdf_content: bytes, max_pages: int = 200) -> Optional[str]:
    """Extract text from the first ``max_pages`` pages of a PDF.

    Args:
        pdf_content: Raw bytes of the PDF file.
        max_pages: Upper bound on the number of pages to process.

    Returns:
        The extracted text with pages joined by newlines, or None if the
        PDF could not be read or parsed.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))

        num_pages = len(pdf_reader.pages)
        # Report the number of pages that will actually be processed,
        # not the cap — the PDF may have fewer than max_pages pages.
        pages_to_process = min(num_pages, max_pages)
        print(f"PDF has {num_pages} total pages. Will process first {pages_to_process} pages...")

        extracted_text = []
        for page_num in range(pages_to_process):
            page = pdf_reader.pages[page_num]
            extracted_text.append(page.extract_text())
            print(f"Processed page {page_num + 1}")

        final_text = '\n'.join(extracted_text)
        print(f"\nExtraction complete! Total characters: {len(final_text)}")
        return final_text

    except Exception as e:
        # Best-effort: swallow parse/read errors and signal failure with
        # None so the caller can decide how to proceed.
        print(f"An error occurred: {str(e)}")
        return None
|
|
|
|
|
def create_screenplay_chunks(text: str, chunk_size: int = 1000) -> list:
    """Break a screenplay into chunks, preferring natural boundaries.

    The text is walked line by line. The current chunk is closed either
    when adding the next line would push it past ``chunk_size`` characters,
    or when a scene heading (an all-caps line containing 'INT.' or 'EXT.')
    is encountered, so each scene heading starts a fresh chunk.

    Args:
        text: Full screenplay text.
        chunk_size: Soft maximum chunk length in characters.

    Returns:
        List of chunk strings, each with its lines rejoined by newlines.
    """
    chunks = []
    buffer = []
    buffer_size = 0

    def _flush():
        # Close the current chunk (no-op when the buffer is empty).
        nonlocal buffer, buffer_size
        if buffer:
            chunks.append('\n'.join(buffer))
            buffer = []
            buffer_size = 0

    for raw_line in text.split('\n'):
        cost = len(raw_line) + 1  # +1 accounts for the joining newline

        # This line would overflow the chunk — close it first.
        if buffer and buffer_size + cost > chunk_size:
            _flush()

        # Scene headings always open a new chunk.
        stripped = raw_line.strip()
        if stripped.upper() == stripped and ('INT.' in raw_line or 'EXT.' in raw_line):
            _flush()

        buffer.append(raw_line)
        buffer_size += cost

    _flush()
    return chunks
|
|
|
|
|
def process_screenplay_chunk(chunk: str, chunk_num: int) -> str:
    """Clean one screenplay chunk via the inference API.

    Wraps the chunk in an instruction prompt and returns the model's
    cleaned text. On any failure — API error, unexpected response shape,
    or an empty generation — the original chunk is returned unchanged so
    the pipeline never loses content.

    Args:
        chunk: Raw screenplay text to clean.
        chunk_num: Zero-based chunk index, used only in progress messages.

    Returns:
        The cleaned text, or the original chunk on failure.
    """
    # NOTE(review): [INST] tags are the Llama-2 chat template, but API_URL
    # targets Llama-3.2 — confirm the endpoint tolerates this format.
    prompt = f"""<s>[INST]Clean this screenplay text, maintaining exact content and format:

{chunk}[/INST]"""

    try:
        response = query({
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 1000,
                "temperature": 0.01,
                "top_p": 0.95
            }
        })

        # Expect a non-empty list of generations; anything else is a failure.
        if not (isinstance(response, list) and response):
            print(f"Warning: Using original for chunk {chunk_num + 1}")
            return chunk

        result = response[0].get('generated_text', '')
        # The API echoes the prompt; keep only the model's completion.
        if '[/INST]' in result:
            result = result.split('[/INST]')[1].strip()

        if result:
            print(f"Successfully processed chunk {chunk_num + 1}")
            return result

        print(f"Warning: Using original for chunk {chunk_num + 1}")
        return chunk

    except Exception as e:
        print(f"Error processing chunk {chunk_num + 1}: {str(e)}")
        return chunk