Update pdf_processing.py
Browse files- pdf_processing.py +54 -99
pdf_processing.py
CHANGED
|
@@ -2,182 +2,137 @@
|
|
| 2 |
import fitz # PyMuPDF
|
| 3 |
import pymupdf4llm
|
| 4 |
import os
|
| 5 |
-
import tempfile
|
| 6 |
import traceback
|
| 7 |
-
from typing import
|
| 8 |
from collections import Counter
|
| 9 |
|
| 10 |
-
def convert_rect_to_dict(rect: fitz.Rect) ->
|
| 11 |
"""Converts a fitz.Rect object to a dictionary."""
|
| 12 |
if not rect or not isinstance(rect, fitz.Rect):
|
| 13 |
-
print(f"Warning: Invalid rect object received: {rect}")
|
| 14 |
return None
|
| 15 |
return {
|
| 16 |
-
"x0": rect.x0,
|
| 17 |
-
"
|
| 18 |
-
"x1": rect.x1,
|
| 19 |
-
"y1": rect.y1,
|
| 20 |
-
"width": rect.width,
|
| 21 |
-
"height": rect.height
|
| 22 |
}
|
| 23 |
|
| 24 |
def try_map_issues_to_page_rects(
|
| 25 |
issues_to_map_for_context: List[Dict[str, Any]],
|
| 26 |
pdf_rects: List[fitz.Rect],
|
| 27 |
-
page_number_for_mapping: int
|
| 28 |
) -> int:
|
| 29 |
-
"""Helper function for mapping LT issues to PDF rectangles."""
|
| 30 |
mapped_count = 0
|
| 31 |
-
|
| 32 |
-
num_available_rects = len(pdf_rects)
|
| 33 |
-
limit = min(num_issues_to_try, num_available_rects)
|
| 34 |
-
|
| 35 |
for i in range(limit):
|
| 36 |
issue_to_update = issues_to_map_for_context[i]
|
| 37 |
-
if issue_to_update['is_mapped_to_pdf']:
|
| 38 |
-
continue
|
| 39 |
pdf_rect = pdf_rects[i]
|
| 40 |
coord_dict = convert_rect_to_dict(pdf_rect)
|
| 41 |
if coord_dict:
|
| 42 |
-
issue_to_update['pdf_coordinates_list'] = [coord_dict]
|
| 43 |
issue_to_update['is_mapped_to_pdf'] = True
|
| 44 |
issue_to_update['mapped_page_number'] = page_number_for_mapping
|
| 45 |
mapped_count += 1
|
| 46 |
-
else:
|
| 47 |
-
print(f" Warning: Could not convert rect for context '{issue_to_update['context_text'][:30]}...' on page {page_number_for_mapping}")
|
| 48 |
return mapped_count
|
| 49 |
|
| 50 |
-
|
| 51 |
-
def extract_pdf_text(file_input: Any) -> str:
|
| 52 |
"""
|
| 53 |
-
Extracts text from
|
| 54 |
-
|
| 55 |
-
|
| 56 |
-
(rounded) font size that accounts for the most characters in the document.
|
| 57 |
"""
|
| 58 |
-
input_temp_file_path = None # For when file_input is a stream
|
| 59 |
-
actual_path_to_process = None
|
| 60 |
original_doc = None
|
| 61 |
-
new_doc = None
|
| 62 |
-
|
| 63 |
try:
|
| 64 |
-
|
| 65 |
-
if isinstance(file_input, str):
|
| 66 |
-
actual_path_to_process = file_input
|
| 67 |
-
elif hasattr(file_input, 'read') and callable(file_input.read):
|
| 68 |
-
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file_obj:
|
| 69 |
-
input_temp_file_path = temp_file_obj.name
|
| 70 |
-
file_input.seek(0) # Ensure reading from the beginning of the stream
|
| 71 |
-
temp_file_obj.write(file_input.read())
|
| 72 |
-
actual_path_to_process = input_temp_file_path
|
| 73 |
-
else:
|
| 74 |
-
raise ValueError("Input 'file_input' must be a file path (str) or a file-like object.")
|
| 75 |
-
|
| 76 |
-
original_doc = fitz.open(actual_path_to_process)
|
| 77 |
if not original_doc.page_count:
|
| 78 |
-
print("PDF has no pages.")
|
| 79 |
-
if input_temp_file_path and os.path.exists(input_temp_file_path):
|
| 80 |
-
os.remove(input_temp_file_path) # clean up if we exit early
|
| 81 |
return ""
|
| 82 |
|
| 83 |
-
|
| 84 |
-
all_spans_details: List[Dict[str, Any]] = [] # Explicitly type for clarity
|
| 85 |
font_char_counts: Counter = Counter()
|
| 86 |
|
| 87 |
-
|
|
|
|
| 88 |
for page_num in range(original_doc.page_count):
|
| 89 |
page = original_doc[page_num]
|
| 90 |
text_dict = page.get_text("dict")
|
| 91 |
for block in text_dict.get("blocks", []):
|
| 92 |
-
if block.get("type") == 0:
|
| 93 |
for line in block.get("lines", []):
|
| 94 |
for span in line.get("spans", []):
|
| 95 |
font_name = span["font"]
|
| 96 |
font_size_rounded = int(round(span["size"]))
|
| 97 |
text = span["text"]
|
| 98 |
-
|
| 99 |
span_detail = {
|
| 100 |
-
"text": text,
|
| 101 |
-
"font_name": font_name,
|
| 102 |
"font_size_rounded": font_size_rounded,
|
| 103 |
"original_font_size": span["size"],
|
| 104 |
-
"bbox": span["bbox"],
|
| 105 |
-
"page_num": page_num
|
| 106 |
}
|
| 107 |
all_spans_details.append(span_detail)
|
| 108 |
font_char_counts[(font_name, font_size_rounded)] += len(text)
|
| 109 |
-
|
| 110 |
if not font_char_counts:
|
| 111 |
-
print("No text with font information found in PDF.")
|
| 112 |
-
# Cleanup and return if no text info
|
| 113 |
-
if original_doc: original_doc.close()
|
| 114 |
-
if input_temp_file_path and os.path.exists(input_temp_file_path):
|
| 115 |
-
os.remove(input_temp_file_path)
|
| 116 |
return ""
|
| 117 |
|
| 118 |
-
# 3. Determine Majority Font
|
| 119 |
majority_font_tuple_info = font_char_counts.most_common(1)[0]
|
| 120 |
(majority_font_name, majority_font_size_rounded) = majority_font_tuple_info[0]
|
| 121 |
char_count = majority_font_tuple_info[1]
|
| 122 |
-
print(f"Majority font
|
| 123 |
|
| 124 |
-
# 4. Create a New PDF Document with Only the Majority Font Text
|
| 125 |
new_doc = fitz.Document()
|
| 126 |
-
print("Constructing new PDF with
|
| 127 |
-
|
| 128 |
for p_num in range(original_doc.page_count):
|
| 129 |
original_page_for_dim = original_doc[p_num]
|
| 130 |
new_pdf_page = new_doc.new_page(width=original_page_for_dim.rect.width,
|
| 131 |
height=original_page_for_dim.rect.height)
|
| 132 |
-
|
| 133 |
spans_to_write = [
|
| 134 |
s_detail for s_detail in all_spans_details
|
| 135 |
if s_detail["page_num"] == p_num and \
|
| 136 |
s_detail["font_name"] == majority_font_name and \
|
| 137 |
s_detail["font_size_rounded"] == majority_font_size_rounded
|
| 138 |
]
|
| 139 |
-
|
| 140 |
for span_data in spans_to_write:
|
| 141 |
text_to_insert = span_data["text"]
|
| 142 |
original_bbox = fitz.Rect(span_data["bbox"])
|
| 143 |
font_size_for_render = span_data["original_font_size"]
|
| 144 |
-
|
| 145 |
-
|
| 146 |
-
|
| 147 |
-
|
| 148 |
-
fontsize=font_size_for_render,
|
| 149 |
-
fontname="helv", # Using Helvetica for simplicity
|
| 150 |
-
align=0
|
| 151 |
-
)
|
| 152 |
-
if insertion_result < 0:
|
| 153 |
-
print(f"Warning: Textbox insertion for '{text_to_insert[:30].replace(chr(10), ' ')}...' in rect {original_bbox} on new page {p_num} might have issues (code: {insertion_result}).")
|
| 154 |
|
| 155 |
-
print(f"New PDF constructed with {new_doc.page_count} pages.")
|
| 156 |
-
|
| 157 |
-
# 5. Convert the In-Memory Filtered PDF Document to Markdown
|
| 158 |
if new_doc.page_count > 0:
|
| 159 |
-
print(f"Converting filtered PDF Document object to Markdown
|
| 160 |
markdown_text = pymupdf4llm.to_markdown(new_doc)
|
| 161 |
else:
|
| 162 |
-
print("The new PDF
|
| 163 |
-
markdown_text = ""
|
| 164 |
|
| 165 |
-
print(f"
|
| 166 |
return markdown_text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 167 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 168 |
except Exception as e:
|
| 169 |
-
print(f"Error
|
| 170 |
-
traceback.print_exc()
|
| 171 |
return ""
|
| 172 |
finally:
|
| 173 |
-
if
|
| 174 |
-
original_doc.close()
|
| 175 |
-
if new_doc:
|
| 176 |
-
new_doc.close()
|
| 177 |
-
|
| 178 |
-
if input_temp_file_path and os.path.exists(input_temp_file_path):
|
| 179 |
-
try:
|
| 180 |
-
os.remove(input_temp_file_path)
|
| 181 |
-
print(f"Cleaned up temporary input file: {input_temp_file_path}")
|
| 182 |
-
except Exception as e_clean:
|
| 183 |
-
print(f"Error cleaning up temporary input file {input_temp_file_path}: {e_clean}")
|
|
|
|
| 2 |
import fitz # PyMuPDF
|
| 3 |
import pymupdf4llm
|
| 4 |
import os
|
|
|
|
| 5 |
import traceback
|
| 6 |
+
from typing import Any, Dict, List # Use standard List, Dict
|
| 7 |
from collections import Counter
|
| 8 |
|
| 9 |
+
def convert_rect_to_dict(rect: fitz.Rect) -> Dict[str, float] | None:
    """Serialize a ``fitz.Rect`` into a plain coordinate mapping.

    Returns ``None`` when ``rect`` is falsy or is not a ``fitz.Rect``
    instance. Otherwise returns a dict holding the rectangle's ``x0``,
    ``y0``, ``x1``, ``y1``, ``width`` and ``height`` attributes under
    keys of the same names.
    """
    is_usable = bool(rect) and isinstance(rect, fitz.Rect)
    if not is_usable:
        return None

    attribute_names = ("x0", "y0", "x1", "y1", "width", "height")
    return {name: getattr(rect, name) for name in attribute_names}
|
| 18 |
|
| 19 |
def try_map_issues_to_page_rects(
    issues_to_map_for_context: List[Dict[str, Any]],
    pdf_rects: List[fitz.Rect],
    page_number_for_mapping: int
) -> int:
    """Attach PDF rectangles to issues by position and count the new mappings.

    Issue ``i`` is paired with rectangle ``i``; pairing stops at the end of
    the shorter of the two lists. An issue whose ``'is_mapped_to_pdf'`` entry
    is already truthy is left untouched (its paired rectangle is not reused
    for another issue). For every other pair whose rectangle converts
    successfully, the issue dict is updated in place with
    ``'pdf_coordinates_list'``, ``'is_mapped_to_pdf'`` and
    ``'mapped_page_number'``.

    Returns:
        The number of issues that were newly mapped by this call.
    """
    newly_mapped = 0
    for issue, rect in zip(issues_to_map_for_context, pdf_rects):
        if issue['is_mapped_to_pdf']:
            continue
        coordinates = convert_rect_to_dict(rect)
        if not coordinates:
            # Rectangle could not be converted; leave the issue unmapped.
            continue
        issue.update(
            pdf_coordinates_list=[coordinates],
            is_mapped_to_pdf=True,
            mapped_page_number=page_number_for_mapping,
        )
        newly_mapped += 1
    return newly_mapped
|
| 37 |
|
| 38 |
+
def extract_font_filtered_markdown(pdf_path: str) -> str:
    """Return Markdown built from only the majority-font text of a PDF.

    The "majority font" is the (font name, rounded font size) pair that
    accounts for the most characters across all text spans in the document.
    Spans in that font are re-drawn, at their original bounding boxes and
    original (unrounded) size, onto blank pages of a new in-memory document
    whose pages have the same dimensions as the source pages. That document
    is then converted with ``pymupdf4llm.to_markdown``.

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.

    Returns:
        The generated Markdown, or ``""`` when the PDF has no pages, when no
        text spans are found, or when any exception occurs (the error and
        its traceback are printed instead of being raised).
    """
    original_doc = None
    new_doc = None
    try:
        original_doc = fitz.open(pdf_path)
        if not original_doc.page_count:
            print("FontFilter: PDF has no pages.")
            return ""

        # Spans are grouped by (font name, rounded size) and then by page
        # index while scanning, so the rebuild step below can fetch a page's
        # majority-font spans directly instead of rescanning every span of
        # the document once per page.
        spans_by_font: Dict[Any, Dict[int, List[Dict[str, Any]]]] = {}
        font_char_counts: Counter = Counter()

        pdf_basename = os.path.basename(pdf_path)
        print(f"FontFilter: Analyzing fonts in '{pdf_basename}' ({original_doc.page_count} pages)...")
        for page_num in range(original_doc.page_count):
            page = original_doc[page_num]
            text_dict = page.get_text("dict")
            for block in text_dict.get("blocks", []):
                if block.get("type") != 0:
                    # Only blocks of type 0 are scanned for lines/spans.
                    continue
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        font_key = (span["font"], int(round(span["size"])))
                        text = span["text"]
                        page_spans = spans_by_font.setdefault(font_key, {})
                        page_spans.setdefault(page_num, []).append({
                            "text": text,
                            "original_font_size": span["size"],
                            "bbox": span["bbox"],
                        })
                        font_char_counts[font_key] += len(text)

        if not font_char_counts:
            print("FontFilter: No text with font information found in PDF.")
            return ""

        majority_key, char_count = font_char_counts.most_common(1)[0]
        majority_font_name, majority_font_size_rounded = majority_key
        print(f"FontFilter: Majority font: Name='{majority_font_name}', RoundedSize={majority_font_size_rounded}pt ({char_count} chars).")

        majority_spans_by_page = spans_by_font[majority_key]
        new_doc = fitz.Document()
        for p_num in range(original_doc.page_count):
            original_page_for_dim = original_doc[p_num]
            new_pdf_page = new_doc.new_page(width=original_page_for_dim.rect.width,
                                            height=original_page_for_dim.rect.height)
            for span_data in majority_spans_by_page.get(p_num, ()):
                # NOTE(review): the return value is ignored. The PyMuPDF docs
                # describe a negative result as "text did not fit, nothing
                # written"; since the text is re-rendered in Helvetica inside
                # the original span bbox, some spans may be dropped silently
                # -- confirm whether a warning is wanted here.
                new_pdf_page.insert_textbox(
                    fitz.Rect(span_data["bbox"]), span_data["text"],
                    fontsize=span_data["original_font_size"],
                    fontname="helv", align=0
                )

        markdown_text = ""
        if new_doc.page_count > 0:
            markdown_text = pymupdf4llm.to_markdown(new_doc)
        else:
            print("FontFilter: The new PDF (filtered) is empty. No markdown generated.")
        return markdown_text
    except Exception as e:
        print(f"Error in extract_font_filtered_markdown for '{pdf_path}': {e}\n{traceback.format_exc()}")
        return ""
    finally:
        # Compare against None rather than relying on truthiness: a document
        # with zero pages evaluates as falsy (its length is its page count)
        # and would otherwise never be closed.
        if original_doc is not None:
            original_doc.close()
        if new_doc is not None:
            new_doc.close()
|
| 122 |
|
| 123 |
+
def extract_plain_text_from_original_pdf(pdf_path: str) -> str:
    """Return the unfiltered plain text of every page of a PDF, concatenated.

    Each page's ``get_text("text")`` output is joined in page order with no
    separator added between pages.

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.

    Returns:
        The concatenated page text, or ``""`` if any exception occurs (the
        error and its traceback are printed instead of being raised).
    """
    doc_orig_text = None
    try:
        doc_orig_text = fitz.open(pdf_path)
        return "".join(page.get_text("text") for page in doc_orig_text)
    except Exception as e:
        print(f"Error extracting plain text from original PDF '{pdf_path}': {e}\n{traceback.format_exc()}")
        return ""
    finally:
        # Compare against None rather than relying on truthiness: a document
        # with zero pages evaluates as falsy (its length is its page count)
        # and would otherwise never be closed.
        if doc_orig_text is not None:
            doc_orig_text.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|