pdf2txt_parser_converter / pymupdf_test_WIP.py
ctranslate2-4you's picture
Create pymupdf_test_WIP.py
2431bf9 verified
raw
history blame
18.8 kB
# ── Standard library ────────────────────────────────────────────────────────
import os
import sys
import json
import math
import queue
import shutil
import logging
import tempfile
import threading
import subprocess
import multiprocessing
from pathlib import Path
from multiprocessing import Pool
# ── Third-party ─────────────────────────────────────────────────────────────
import fitz # PyMuPDF
import tkinter as tk
from tkinter import filedialog, messagebox
from joblib import cpu_count, Parallel, delayed
# ── Parser configuration ────────────────────────────────────────────────────
# Page-count cut-off: PDFs with at most this many pages are processed serially,
# larger ones page-by-page in a multiprocessing pool.
PARALLEL_THRESHOLD = 16 # pages – switch to multiprocessing above this
# Tolerance used when clustering ruling positions that are nearly identical.
LINE_TOLERANCE = 1 # pts for snapping nearly-identical rulings
# Grid rectangles with a smaller area than this are discarded.
MIN_RECT_AREA = 1e4 # pts² – ignore tiny rectangles
# ── pdfplumber-style clustering helpers ─────────────────────────────────────
def cluster_list(xs, tol):
    """Split values into clusters of neighbours that lie at most `tol` apart.

    The values are sorted first; a value joins the current cluster when its
    distance to the previous (sorted) value is <= `tol`, otherwise it opens a
    new cluster. Returns a list of lists in ascending order.
    """
    clusters = []
    for value in sorted(xs):
        # Compare against the last member of the most recent cluster.
        if clusters and value - clusters[-1][-1] <= tol:
            clusters[-1].append(value)
        else:
            clusters.append([value])
    return clusters
def make_cluster_dict(vals, tol):
    """Return a dict mapping every distinct value to its cluster index.

    Distinct values are clustered with `cluster_list` using `tol`; clusters
    are numbered 0, 1, 2, … in ascending order of their values.
    """
    distinct = sorted(set(vals))
    return {
        value: cluster_id
        for cluster_id, members in enumerate(cluster_list(distinct, tol))
        for value in members
    }
# ── Utility funcs ───────────────────────────────────────────────────────────
def clean_cell_text(text):
    """Normalise the text of one cell to a single, whitespace-collapsed line.

    Hyphenated line breaks ("-\\n") are joined, remaining newlines become
    spaces, and runs of whitespace are collapsed. Non-string input yields "".
    """
    if isinstance(text, str):
        dehyphenated = text.replace("-\n", "")
        single_line = dehyphenated.replace("\n", " ")
        return " ".join(single_line.split())
    return ""
def safe_join(row):
    """Return the cells of `row` as cleaned strings; `None` cells become ""."""
    cleaned = []
    for cell in row:
        if cell is None:
            cleaned.append("")
        else:
            cleaned.append(clean_cell_text(str(cell)))
    return cleaned
def clamp_bbox(bbox, page_rect):
    """Clamp the four coordinates of `bbox` to the bounds of `page_rect`.

    `bbox` is unpacked as (x0, y0, x1, y1); `page_rect` must expose the
    attributes x0, y0, x1 and y1. Returns a plain 4-tuple.
    """
    left, top, right, bottom = bbox

    def _limit(value, lower, upper):
        # Keep `value` within [lower, upper].
        return max(lower, min(value, upper))

    return (
        _limit(left, page_rect.x0, page_rect.x1),
        _limit(top, page_rect.y0, page_rect.y1),
        _limit(right, page_rect.x0, page_rect.x1),
        _limit(bottom, page_rect.y0, page_rect.y1),
    )
# ── Improved table detection with snapping ─────────────────────────────────
def detect_table_bboxes(page: fitz.Page, tol=LINE_TOLERANCE):
    """
    Detect table rectangles on `page` from its vector drawings.

    Steps:
    1. Collect stroked paths whose bounding box is very thin (< 2 pt high
       for horizontal rulings, < 2 pt wide for vertical rulings).
    2. Snap their positions together with tolerance `tol`.
    3. Form a grid from the averaged row and column positions.
    4. Keep grid rectangles whose area is at least MIN_RECT_AREA.

    Returns a list[fitz.Rect]; the list is empty when the page has no
    horizontal or no vertical rulings.
    """
    horiz_y, vert_x = [], []
    for d in page.get_drawings():
        # PyMuPDF reports the path type as a string: "s" (stroke),
        # "f" (fill) or "fs" (both). Comparing against the integer 1 never
        # matched, so every path used to be skipped. Fill-only paths are
        # still ignored, in keeping with the "stroke only" intent.
        if "s" not in str(d.get("type", "")):
            continue
        # The bounding box of a path is stored under "rect"; "bbox" is only
        # consulted as a fallback for dicts that use that key instead.
        bbox = d.get("rect", d.get("bbox"))
        if bbox is None:
            continue
        x0, y0, x1, y1 = bbox
        if abs(y1 - y0) < 2:  # horizontal line
            horiz_y.append((y0 + y1) / 2)
        elif abs(x1 - x0) < 2:  # vertical line
            vert_x.append((x0 + x1) / 2)
    if not horiz_y or not vert_x:
        return []

    row_map = make_cluster_dict(horiz_y, tol)
    col_map = make_cluster_dict(vert_x, tol)

    # Average the positions that fell into the same cluster.
    rows, cols = {}, {}
    for y in horiz_y:
        rows.setdefault(row_map[y], []).append(y)
    for x in vert_x:
        cols.setdefault(col_map[x], []).append(x)
    row_pos = sorted(sum(v) / len(v) for v in rows.values())
    col_pos = sorted(sum(v) / len(v) for v in cols.values())

    # One candidate rectangle per pair of adjacent rows and adjacent columns.
    rects = []
    for r0, r1 in zip(row_pos, row_pos[1:]):
        for c0, c1 in zip(col_pos, col_pos[1:]):
            rect = fitz.Rect(c0, r0, c1, r1)
            if rect.get_area() >= MIN_RECT_AREA:
                rects.append(rect)

    # Remove duplicates / contained rects
    unique = []
    for rect in rects:
        if not any(u.contains(rect) or rect.contains(u) for u in unique):
            unique.append(rect)
    return unique
# ── Table extraction (simple text grouping) ────────────────────────────────
def extract_table(page: fitz.Page, table_rect: fitz.Rect):
    """Turn the words inside `table_rect` into a list of row dicts.

    A word counts as inside when its top-left corner (w[0], w[1]) lies within
    the rectangle. Words are grouped into text lines (a new line starts when
    a word's y differs by more than 5 from the line's first word). The first
    line is used as the header; every following line becomes one dict keyed
    by that header. Returns [] when no word falls inside the rectangle.
    """
    inside = []
    for word in page.get_text("words"):
        within_x = table_rect.x0 <= word[0] <= table_rect.x1
        within_y = table_rect.y0 <= word[1] <= table_rect.y1
        if within_x and within_y:
            inside.append(word)
    inside.sort(key=lambda word: (word[1], word[0]))  # reading order: y, then x

    # Group consecutive words into lines, anchored on each line's first word.
    lines = []
    anchor_y = None
    for word in inside:
        if anchor_y is None or abs(word[1] - anchor_y) > 5:
            lines.append([word])
            anchor_y = word[1]
        else:
            lines[-1].append(word)
    if not lines:
        return []

    header_text, *body_texts = [
        " ".join(word[4] for word in line) for line in lines
    ]
    headers = safe_join([header_text])
    body_rows = [safe_join([body_text]) for body_text in body_texts]
    return [dict(zip(headers, body_row)) for body_row in body_rows]
# ── Per-page worker ────────────────────────────────────────────────────────
def process_page(args):
    """Extract the text and tables of a single page.

    `args` is a `(page_number, pdf_path)` tuple with a zero-based page
    number. The document is opened inside this function. Returns
    `(page_number, text)`, where `text` holds a "Page N" heading, the words
    outside detected tables grouped into lines, and each table as JSON. Any
    exception is converted into an "[ERROR] ..." text instead of propagating.
    """
    page_number, pdf_path = args
    try:
        with fitz.open(pdf_path) as doc:
            page = doc.load_page(page_number)
            page_rect = page.rect

            # Detect tables and serialise the non-empty ones.
            table_rects = detect_table_bboxes(page)
            table_jsons = []
            for rect in table_rects:
                table = extract_table(page, rect)
                if table:
                    table_jsons.append(
                        json.dumps(table, indent=1, ensure_ascii=False)
                    )

            # Words whose top-left corner is not inside any table box.
            tbl_boxes = [clamp_bbox(rect, page_rect) for rect in table_rects]

            def _in_table(word):
                for box in tbl_boxes:
                    if box[0] <= word[0] <= box[2] and box[1] <= word[1] <= box[3]:
                        return True
                return False

            outside = [w for w in page.get_text("words") if not _in_table(w)]
            outside.sort(key=lambda w: (w[1], w[0]))

            # Group the remaining words into lines; a new line starts when a
            # word's y differs by more than 10 from the line's first word.
            lines = []
            anchor_y = None
            for word in outside:
                if anchor_y is None or abs(word[1] - anchor_y) > 10:
                    lines.append([word[4]])
                    anchor_y = word[1]
                else:
                    lines[-1].append(word[4])
            body = "\n".join(" ".join(line) for line in lines)

            parts = [f"Page {page_number + 1}\n", body.strip() + "\n"]
            for idx, tbl in enumerate(table_jsons, 1):
                parts.append(f'"table {idx}":\n{tbl}\n')
            return page_number, "".join(parts)
    except fitz.FileDataError as e:
        return page_number, f"[ERROR] Page {page_number+1} ({pdf_path}): encrypted / unreadable – {e}"
    except Exception as e:
        return page_number, f"[ERROR] Page {page_number+1} ({pdf_path}): {e}"
# ── Document-level processing ───────────────────────────────────────────────
def process_pdf(pdf_path):
    """Convert one PDF into a `.txt` file next to it and return a status line.

    Pages are processed serially when the document has at most
    PARALLEL_THRESHOLD pages and with a multiprocessing pool otherwise. The
    per-page texts are joined in page order and written to
    `<pdf directory>/<pdf base name>.txt`.

    Returns a human-readable status string in every case: "[INFO] ..." on
    success or interruption, "[ERROR] ..." on failure. Callers print the
    returned value, so the success path returns its message as well.
    """
    try:
        if not os.path.exists(pdf_path):
            return f"[ERROR] File not found: {pdf_path}"
        print(f"[INFO] Starting processing: {pdf_path}")

        # Open once just to learn the page count; workers reopen the file.
        try:
            with fitz.open(pdf_path) as doc:
                num_pages = doc.page_count
        except fitz.FileDataError as e:
            return f"[ERROR] Cannot open PDF: {pdf_path} – {e}"
        except Exception as e:
            return f"[ERROR] General error opening PDF: {pdf_path} – {e}"

        pages = [(i, pdf_path) for i in range(num_pages)]
        if num_pages <= PARALLEL_THRESHOLD:
            results = run_serial(pages)
        else:
            results = run_parallel(pages)
        results.sort(key=lambda x: x[0])  # restore page order
        final_output = "\n".join(t for _, t in results)

        base = os.path.splitext(os.path.basename(pdf_path))[0]
        out_dir = os.path.dirname(pdf_path)
        out_path = os.path.join(out_dir, f"{base}.txt")
        with open(out_path, "w", encoding="utf-8", errors="ignore") as f:
            f.write(final_output)
        # Return (rather than print) the message: falling off the end here
        # made callers print "None" for every successful file.
        return f"[INFO] Processing complete: {out_path}"
    except (EOFError, BrokenPipeError, KeyboardInterrupt):
        return "[INFO] Processing interrupted by user."
    except Exception as e:
        return f"[ERROR] Unexpected error with '{pdf_path}': {e}"
def run_serial(pages):
    """Run `process_page` on each entry of `pages` in order; return the results as a list."""
    results = []
    for page_args in pages:
        results.append(process_page(page_args))
    return results
def run_parallel(pages):
    """Run `process_page` over `pages` in a multiprocessing pool.

    The pool size is the CPU count minus two (at least one), capped at the
    number of pages. Returns the list produced by `Pool.map`.
    """
    available = max(1, cpu_count() - 2)
    cores = min(available, len(pages))
    print(f"Starting parallel processing with {cores} cores…")
    with Pool(cores) as pool:
        results = pool.map(process_page, pages)
    return results
# ── Batch CLI entrypoint ────────────────────────────────────────────────────
def process_pdfs_main():
    """Command-line entry point: process every PDF path given in `sys.argv[1:]`.

    Existing files are split by page count: PDFs with at most
    PARALLEL_THRESHOLD pages are processed concurrently (one joblib job per
    file), the remaining ones one after another. Each status returned by
    `process_pdf` is printed.
    """
    pdfs = sys.argv[1:]
    if not pdfs:
        print("No PDF files provided.")
        return

    small, large = [], []
    for path in pdfs:
        if not os.path.exists(path):
            print(f"File not found: {path}")
            continue
        try:
            with fitz.open(path) as doc:
                bucket = small if doc.page_count <= PARALLEL_THRESHOLD else large
                bucket.append(path)
        except fitz.FileDataError:
            print(f"[ERROR] Password-protected PDF skipped: {path}")
        except Exception as e:
            print(f"[ERROR] Error opening {path}: {e}")

    if small:
        cores = min(max(1, cpu_count() - 2), len(small))
        print(f"\n[Phase 1] Parallel processing of {len(small)} small PDFs with {cores} cores …")
        jobs = (delayed(process_pdf)(path) for path in small)
        for status in Parallel(n_jobs=cores)(jobs):
            print(status)

    for path in large:
        print(f"\n[Phase 2] Processing large PDF: {os.path.basename(path)}")
        print(process_pdf(path))
# ── Tkinter GUI ─────────────────────────────────────────────────────────────
class FileManager:
    """Tkinter window for choosing PDF files and running the parser on them.

    The window shows the selected files, a preview of the `.txt` file that
    belongs to the selected PDF, and a progress log. The parser itself runs
    as a subprocess of this same script (with the PDF paths as arguments);
    its output is read on a worker thread and forwarded to the progress log.
    """

    def __init__(self, master):
        """Build all widgets inside `master` (the Tk root window)."""
        self.master = master
        master.title("Parser-Sevenof9 — PyMuPDF")
        self.files, self.last_selected = [], None
        # Handle of the running parser subprocess, or None.
        self.parser_proc = None
        # True from "Start Parser" until the worker thread has finished, so a
        # second run cannot be launched on top of an active one.
        self._parser_running = False

        # File list with scrollbar
        tk.Label(master, text="Selected PDF files:").pack(pady=5)
        list_frame = tk.Frame(master)
        list_frame.pack(pady=5)
        sb_list = tk.Scrollbar(list_frame)
        self.listbox = tk.Listbox(
            list_frame, selectmode=tk.MULTIPLE, width=80, height=6,
            yscrollcommand=sb_list.set,
        )
        sb_list.config(command=self.listbox.yview)
        self.listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        sb_list.pack(side=tk.RIGHT, fill=tk.Y)
        self.listbox.bind("<<ListboxSelect>>", self.show_text)
        self.listbox.bind("<Button-1>", self.on_click)
        self.listbox.bind("<Shift-Button-1>", self.on_shift_click)

        # Right-click context menu
        self.ctx = tk.Menu(master, tearoff=0)
        self.ctx.add_command(label="Remove selected", command=self.remove_file)
        self.listbox.bind("<Button-3>", self._show_context_menu)

        # File and parser buttons
        btn_frame = tk.Frame(master)
        btn_frame.pack(pady=10)
        tk.Button(btn_frame, text="Add Folder", command=self.add_folder).pack(side=tk.LEFT, padx=5)
        tk.Button(btn_frame, text="Select Files", command=self.add_file).pack(side=tk.LEFT, padx=5)
        tk.Button(btn_frame, text="Remove Selected", command=self.remove_file).pack(side=tk.LEFT, padx=5)
        tk.Button(btn_frame, text="Remove All", command=self.remove_all).pack(side=tk.LEFT, padx=5)
        tk.Button(master, text="Stop", command=self.stop_parser).pack(pady=5)
        tk.Button(master, text="Start Parser", command=self.start_parser).pack(pady=10)

        # Text preview
        tx_frame = tk.Frame(master)
        tx_frame.pack(padx=10, pady=5)
        sb_text = tk.Scrollbar(tx_frame)
        self.text = tk.Text(tx_frame, height=15, width=100, wrap=tk.WORD, yscrollcommand=sb_text.set)
        sb_text.config(command=self.text.yview)
        self.text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        sb_text.pack(side=tk.RIGHT, fill=tk.Y)

        # Progress log (kept read-only except while inserting)
        tk.Label(master, text="Progress:").pack()
        prog_frame = tk.Frame(master)
        prog_frame.pack(padx=10, pady=5)
        sb_prog = tk.Scrollbar(prog_frame)
        self.prog = tk.Text(prog_frame, height=8, width=100, state=tk.DISABLED, yscrollcommand=sb_prog.set)
        sb_prog.config(command=self.prog.yview)
        self.prog.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        sb_prog.pack(side=tk.RIGHT, fill=tk.Y)

    # ── Listbox helpers ───────────────────────────────────────────────────
    def _show_context_menu(self, e):
        """Pop up the context menu at the pointer when something is selected."""
        if self.listbox.curselection():
            self.ctx.tk_popup(e.x_root, e.y_root)

    def on_click(self, e):
        """Select only the clicked entry and show its text preview."""
        idx = self.listbox.nearest(e.y)
        self.listbox.selection_clear(0, tk.END)
        self.listbox.selection_set(idx)
        self.last_selected = idx
        self.show_text(None)
        return "break"  # suppress the default listbox click handling

    def on_shift_click(self, e):
        """Select the range between the last clicked entry and this one."""
        idx = self.listbox.nearest(e.y)
        if self.last_selected is None:
            self.last_selected = idx
        lo, hi = sorted((self.last_selected, idx))
        self.listbox.selection_clear(0, tk.END)
        for i in range(lo, hi + 1):
            self.listbox.selection_set(i)
        return "break"

    # ── File ops ─────────────────────────────────────────────────────────
    def _add_path(self, path):
        """Append `path` to the file list and the listbox unless already present."""
        if path not in self.files:
            self.files.append(path)
            self.listbox.insert(tk.END, path)

    def add_folder(self):
        """Ask for a folder and add every `.pdf` file found beneath it."""
        folder = filedialog.askdirectory(title="Select Folder")
        if not folder:
            return
        for root, _, fs in os.walk(folder):
            for f in fs:
                if f.lower().endswith(".pdf"):
                    self._add_path(os.path.join(root, f))

    def add_file(self):
        """Ask for one or more PDF files and add them to the list."""
        chosen = filedialog.askopenfilenames(
            title="Select PDF Files", filetypes=[("PDF Files", "*.pdf")]
        )
        for p in chosen:
            self._add_path(p)

    def remove_file(self):
        """Remove the selected entries from the list and clear the preview."""
        sel = self.listbox.curselection()
        if not sel:
            messagebox.showwarning("Notice", "Please select an entry to remove.")
            return
        # Delete from the end so the remaining indices stay valid.
        for idx in reversed(sel):
            self.listbox.delete(idx)
            del self.files[idx]
        # The remembered shift-click anchor may no longer point at the same entry.
        self.last_selected = None
        self.text.delete(1.0, tk.END)

    def remove_all(self):
        """Remove every entry from the list and clear the preview."""
        self.listbox.delete(0, tk.END)
        self.files.clear()
        self.last_selected = None
        self.text.delete(1.0, tk.END)

    # ── Parser control ───────────────────────────────────────────────────
    def start_parser(self):
        """Start the parser on a worker thread unless one is already running."""
        if not self.files:
            messagebox.showinfo("No Files", "Please select at least one file.")
            return
        if self._parser_running:
            # A second launch would overwrite `parser_proc` and leave the
            # first subprocess unreachable for the Stop button.
            self.append_prog("Parser is already running.\n")
            return
        self._parser_running = True
        self.prog.config(state=tk.NORMAL)
        self.prog.delete(1.0, tk.END)
        self.prog.insert(tk.END, "Starting parser…\n")
        self.prog.config(state=tk.DISABLED)
        threading.Thread(target=self.run_parser).start()

    def stop_parser(self):
        """Terminate the parser subprocess if it is still running."""
        # Read the attribute once: the worker thread resets it to None when
        # the subprocess ends.
        proc = self.parser_proc
        if proc is not None and proc.poll() is None:
            proc.terminate()
            self.append_prog("Parser process was stopped.\n")
        else:
            self.append_prog("No active parser process to stop.\n")

    def run_parser(self):
        """Run this script as a subprocess on the selected files (worker thread).

        The subprocess output is forwarded line by line to the progress log;
        a message box reports success or failure based on the return code.
        """
        try:
            self.parser_proc = subprocess.Popen(
                [sys.executable, __file__] + self.files,
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                text=True, encoding="utf-8", errors="ignore", bufsize=4096,
            )
            for line in self.parser_proc.stdout:
                self.append_prog(line)
            self.parser_proc.stdout.close()
            self.parser_proc.wait()
            if self.parser_proc.returncode == 0:
                self.append_prog("\nParser finished successfully.\n")
                self.shell_msg("Parser Done", "The parser was executed successfully.")
            else:
                self.append_prog("\nError while running the parser.\n")
                self.shell_msg("Error", "Error while running the parser.")
        except Exception as e:
            self.append_prog(f"Error: {e}\n")
            self.shell_msg("Error", f"Execution error:\n{e}")
        finally:
            self.parser_proc = None
            self._parser_running = False

    # ── GUI helpers ──────────────────────────────────────────────────────
    def append_prog(self, txt):
        """Schedule `txt` to be appended to the progress log via `after`."""
        self.prog.after(0, lambda: self._ins(txt))

    def _ins(self, txt):
        """Append `txt` to the read-only progress log and scroll to the end."""
        self.prog.config(state=tk.NORMAL)
        self.prog.insert(tk.END, txt)
        self.prog.see(tk.END)
        self.prog.config(state=tk.DISABLED)

    def shell_msg(self, title, msg):
        """Schedule an info message box with `title` and `msg` via `after`."""
        self.master.after(0, lambda: messagebox.showinfo(title, msg))

    def show_text(self, _):
        """Show the `.txt` file that belongs to the first selected PDF."""
        sel = self.listbox.curselection()
        if not sel:
            return
        path = self.files[sel[0]]
        txt = os.path.splitext(path)[0] + ".txt"
        self.text.delete(1.0, tk.END)
        if os.path.exists(txt):
            try:
                with open(txt, "r", encoding="utf-8", errors="ignore") as f:
                    self.text.insert(tk.END, f.read())
            except Exception as e:
                self.text.insert(tk.END, f"Error loading text file:\n{e}")
        else:
            self.text.insert(tk.END, "[No corresponding .txt file found]")
# ── Main guard ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Called first so multiprocessing also works from a frozen executable.
    multiprocessing.freeze_support()
    if len(sys.argv) <= 1:
        # No arguments: open the GUI.
        root = tk.Tk()
        FileManager(root)
        root.mainloop()
    else:
        # PDF paths on the command line: run the batch parser.
        process_pdfs_main()