Update pdf_processing.py
Browse files- pdf_processing.py +119 -18
pdf_processing.py
CHANGED
|
@@ -5,6 +5,7 @@ import os
|
|
| 5 |
import tempfile
|
| 6 |
import traceback
|
| 7 |
from typing import Tuple, Optional, List, Dict, Any
|
|
|
|
| 8 |
|
| 9 |
def convert_rect_to_dict(rect: fitz.Rect) -> Optional[Dict[str, float]]:
|
| 10 |
"""Converts a fitz.Rect object to a dictionary."""
|
|
@@ -46,37 +47,137 @@ def try_map_issues_to_page_rects(
|
|
| 46 |
print(f" Warning: Could not convert rect for context '{issue_to_update['context_text'][:30]}...' on page {page_number_for_mapping}")
|
| 47 |
return mapped_count
|
| 48 |
|
|
|
|
| 49 |
def extract_pdf_text(file_input: Any) -> str:
|
| 50 |
-
"""
|
| 51 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 52 |
actual_path_to_process = None
|
|
|
|
|
|
|
|
|
|
| 53 |
try:
|
|
|
|
| 54 |
if isinstance(file_input, str):
|
| 55 |
actual_path_to_process = file_input
|
| 56 |
elif hasattr(file_input, 'read') and callable(file_input.read):
|
| 57 |
-
|
| 58 |
-
|
| 59 |
-
|
| 60 |
-
|
| 61 |
-
|
| 62 |
-
actual_path_to_process = temp_file_path_for_pymupdf4llm
|
| 63 |
else:
|
| 64 |
raise ValueError("Input 'file_input' must be a file path (str) or a file-like object.")
|
| 65 |
|
| 66 |
-
|
| 67 |
-
|
| 68 |
-
|
| 69 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 70 |
|
| 71 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 72 |
|
| 73 |
-
print(f"Total
|
| 74 |
return markdown_text
|
| 75 |
-
|
| 76 |
except Exception as e:
|
| 77 |
-
print(f"Error
|
| 78 |
traceback.print_exc()
|
| 79 |
return ""
|
| 80 |
finally:
|
| 81 |
-
if
|
| 82 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 5 |
import tempfile
|
| 6 |
import traceback
|
| 7 |
from typing import Tuple, Optional, List, Dict, Any
|
| 8 |
+
from collections import Counter
|
| 9 |
|
| 10 |
def convert_rect_to_dict(rect: fitz.Rect) -> Optional[Dict[str, float]]:
|
| 11 |
"""Converts a fitz.Rect object to a dictionary."""
|
|
|
|
| 47 |
print(f" Warning: Could not convert rect for context '{issue_to_update['context_text'][:30]}...' on page {page_number_for_mapping}")
|
| 48 |
return mapped_count
|
| 49 |
|
| 50 |
+
# The function is modified as requested.
|
| 51 |
def extract_pdf_text(file_input: Any) -> str:
|
| 52 |
+
"""
|
| 53 |
+
Extracts text from a PDF, filters it to include only the majority font,
|
| 54 |
+
and then converts this filtered text to Markdown using PyMuPDF4LLM.
|
| 55 |
+
The "majority font" is defined by the combination of font name and
|
| 56 |
+
(rounded) font size that accounts for the most characters in the document.
|
| 57 |
+
"""
|
| 58 |
+
input_temp_file_path = None # For when file_input is a stream
|
| 59 |
actual_path_to_process = None
|
| 60 |
+
original_doc = None
|
| 61 |
+
new_doc = None # The new document we will build
|
| 62 |
+
|
| 63 |
try:
|
| 64 |
+
# 1. Handle Input to get actual_path_to_process
|
| 65 |
if isinstance(file_input, str):
|
| 66 |
actual_path_to_process = file_input
|
| 67 |
elif hasattr(file_input, 'read') and callable(file_input.read):
|
| 68 |
+
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file_obj:
|
| 69 |
+
input_temp_file_path = temp_file_obj.name
|
| 70 |
+
file_input.seek(0) # Ensure reading from the beginning of the stream
|
| 71 |
+
temp_file_obj.write(file_input.read())
|
| 72 |
+
actual_path_to_process = input_temp_file_path
|
|
|
|
| 73 |
else:
|
| 74 |
raise ValueError("Input 'file_input' must be a file path (str) or a file-like object.")
|
| 75 |
|
| 76 |
+
original_doc = fitz.open(actual_path_to_process)
|
| 77 |
+
if not original_doc.page_count:
|
| 78 |
+
print("PDF has no pages.")
|
| 79 |
+
if input_temp_file_path and os.path.exists(input_temp_file_path):
|
| 80 |
+
os.remove(input_temp_file_path) # clean up if we exit early
|
| 81 |
+
return ""
|
| 82 |
+
|
| 83 |
+
# 2. Collect Font Data & Text from all Spans
|
| 84 |
+
all_spans_details: List[Dict[str, Any]] = [] # Explicitly type for clarity
|
| 85 |
+
font_char_counts: Counter = Counter()
|
| 86 |
+
|
| 87 |
+
print(f"Original PDF ('{os.path.basename(actual_path_to_process if isinstance(actual_path_to_process, str) else 'stream')}') has {original_doc.page_count} pages. Analyzing fonts...")
|
| 88 |
+
for page_num in range(original_doc.page_count):
|
| 89 |
+
page = original_doc[page_num]
|
| 90 |
+
text_dict = page.get_text("dict")
|
| 91 |
+
for block in text_dict.get("blocks", []):
|
| 92 |
+
if block.get("type") == 0: # Process only text blocks (type 0)
|
| 93 |
+
for line in block.get("lines", []):
|
| 94 |
+
for span in line.get("spans", []):
|
| 95 |
+
font_name = span["font"]
|
| 96 |
+
font_size_rounded = int(round(span["size"]))
|
| 97 |
+
text = span["text"]
|
| 98 |
+
|
| 99 |
+
span_detail = {
|
| 100 |
+
"text": text,
|
| 101 |
+
"font_name": font_name,
|
| 102 |
+
"font_size_rounded": font_size_rounded,
|
| 103 |
+
"original_font_size": span["size"],
|
| 104 |
+
"bbox": span["bbox"],
|
| 105 |
+
"page_num": page_num
|
| 106 |
+
}
|
| 107 |
+
all_spans_details.append(span_detail)
|
| 108 |
+
font_char_counts[(font_name, font_size_rounded)] += len(text)
|
| 109 |
+
|
| 110 |
+
if not font_char_counts:
|
| 111 |
+
print("No text with font information found in PDF.")
|
| 112 |
+
# Cleanup and return if no text info
|
| 113 |
+
if original_doc: original_doc.close()
|
| 114 |
+
if input_temp_file_path and os.path.exists(input_temp_file_path):
|
| 115 |
+
os.remove(input_temp_file_path)
|
| 116 |
+
return ""
|
| 117 |
+
|
| 118 |
+
# 3. Determine Majority Font
|
| 119 |
+
majority_font_tuple_info = font_char_counts.most_common(1)[0]
|
| 120 |
+
(majority_font_name, majority_font_size_rounded) = majority_font_tuple_info[0]
|
| 121 |
+
char_count = majority_font_tuple_info[1]
|
| 122 |
+
print(f"Majority font combination: Name='{majority_font_name}', RoundedSize={majority_font_size_rounded}pt (with {char_count} characters).")
|
| 123 |
+
|
| 124 |
+
# 4. Create a New PDF Document with Only the Majority Font Text
|
| 125 |
+
new_doc = fitz.Document()
|
| 126 |
+
print("Constructing new PDF with text from majority font only...")
|
| 127 |
+
|
| 128 |
+
for p_num in range(original_doc.page_count):
|
| 129 |
+
original_page_for_dim = original_doc[p_num]
|
| 130 |
+
new_pdf_page = new_doc.new_page(width=original_page_for_dim.rect.width,
|
| 131 |
+
height=original_page_for_dim.rect.height)
|
| 132 |
+
|
| 133 |
+
spans_to_write = [
|
| 134 |
+
s_detail for s_detail in all_spans_details
|
| 135 |
+
if s_detail["page_num"] == p_num and \
|
| 136 |
+
s_detail["font_name"] == majority_font_name and \
|
| 137 |
+
s_detail["font_size_rounded"] == majority_font_size_rounded
|
| 138 |
+
]
|
| 139 |
+
|
| 140 |
+
for span_data in spans_to_write:
|
| 141 |
+
text_to_insert = span_data["text"]
|
| 142 |
+
original_bbox = fitz.Rect(span_data["bbox"])
|
| 143 |
+
font_size_for_render = span_data["original_font_size"]
|
| 144 |
+
|
| 145 |
+
insertion_result = new_pdf_page.insert_textbox(
|
| 146 |
+
original_bbox,
|
| 147 |
+
text_to_insert,
|
| 148 |
+
fontsize=font_size_for_render,
|
| 149 |
+
fontname="helv", # Using Helvetica for simplicity
|
| 150 |
+
align=0
|
| 151 |
+
)
|
| 152 |
+
if insertion_result < 0:
|
| 153 |
+
print(f"Warning: Textbox insertion for '{text_to_insert[:30].replace(chr(10), ' ')}...' in rect {original_bbox} on new page {p_num} might have issues (code: {insertion_result}).")
|
| 154 |
|
| 155 |
+
print(f"New PDF constructed with {new_doc.page_count} pages.")
|
| 156 |
+
|
| 157 |
+
# 5. Convert the In-Memory Filtered PDF Document to Markdown
|
| 158 |
+
if new_doc.page_count > 0:
|
| 159 |
+
print(f"Converting filtered PDF Document object to Markdown using pymupdf4llm...")
|
| 160 |
+
markdown_text = pymupdf4llm.to_markdown(new_doc)
|
| 161 |
+
else:
|
| 162 |
+
print("The new PDF document (filtered) is empty. No markdown will be generated.")
|
| 163 |
+
markdown_text = ""
|
| 164 |
|
| 165 |
+
print(f"Total Markdown text length from filtered PDF: {len(markdown_text)} characters.")
|
| 166 |
return markdown_text
|
| 167 |
+
|
| 168 |
except Exception as e:
|
| 169 |
+
print(f"Error in extract_pdf_text: {str(e)}")
|
| 170 |
traceback.print_exc()
|
| 171 |
return ""
|
| 172 |
finally:
|
| 173 |
+
if original_doc:
|
| 174 |
+
original_doc.close()
|
| 175 |
+
if new_doc:
|
| 176 |
+
new_doc.close()
|
| 177 |
+
|
| 178 |
+
if input_temp_file_path and os.path.exists(input_temp_file_path):
|
| 179 |
+
try:
|
| 180 |
+
os.remove(input_temp_file_path)
|
| 181 |
+
print(f"Cleaned up temporary input file: {input_temp_file_path}")
|
| 182 |
+
except Exception as e_clean:
|
| 183 |
+
print(f"Error cleaning up temporary input file {input_temp_file_path}: {e_clean}")
|