import os
import base64
import json
import requests
from typing import Dict, List, Any, Optional
import fitz  # PyMuPDF
from PIL import Image
import io
import re
from dataclasses import dataclass, asdict
from pathlib import Path
from datetime import datetime


@dataclass
class TextBlock:
    """A single positioned span of text extracted from a PDF page."""

    text: str
    x: float  # left edge, PDF points
    y: float  # top edge, PDF points
    width: float
    height: float
    font_size: float
    font_name: str
    is_bold: bool = False
    is_italic: bool = False
    block_id: str = ""  # stable locator, e.g. "p0-b1-l2-s3"

    def to_dict(self) -> Dict[str, Any]:
        """Convert TextBlock to dictionary"""
        return asdict(self)


@dataclass
class ImageData:
    """An embedded image extracted from a PDF page, serialized as base64 PNG."""

    index: int
    base64_data: str
    bbox: tuple  # (x0, y0, x1, y1) placement rectangle, PDF points
    width: float
    height: float
    format: str = "PNG"

    def to_dict(self) -> Dict[str, Any]:
        """Convert ImageData to dictionary"""
        return asdict(self)


@dataclass
class TableData:
    """A table detected on a PDF page."""

    bbox: tuple  # (x0, y0, x1, y1), PDF points
    data: List[List[str]]  # row-major cell text
    rows: int
    columns: int

    def to_dict(self) -> Dict[str, Any]:
        """Convert TableData to dictionary"""
        return asdict(self)


@dataclass
class PageData:
    """All content extracted from a single PDF page."""

    page_number: int  # 1-based
    text_blocks: List[TextBlock]
    images: List[ImageData]
    tables: List[TableData]
    page_width: float
    page_height: float
    word_count: int = 0
    character_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert PageData to dictionary"""
        return {
            "page_number": self.page_number,
            "text_blocks": [block.to_dict() for block in self.text_blocks],
            "images": [img.to_dict() for img in self.images],
            "tables": [table.to_dict() for table in self.tables],
            "page_width": self.page_width,
            "page_height": self.page_height,
            "word_count": self.word_count,
            "character_count": self.character_count,
        }


class PDFToJSONConverter:
    """Convert a PDF document into a structured JSON representation.

    All extraction is done locally with PyMuPDF. The Hugging Face token,
    model names and inference URL are reserved for optional model-based
    enhancement, which is not yet implemented (see process_pdf_to_json).
    """

    def __init__(self, huggingface_token: str = None):
        self.hf_token = huggingface_token
        # BUGFIX: the original always created an "Authorization" key whose
        # value was None when no token was supplied; `requests` raises on
        # None header values. Only include the header when a token exists.
        self.hf_headers = (
            {"Authorization": f"Bearer {huggingface_token}"}
            if huggingface_token
            else {}
        )
        self.models = {
            "document_layout": "microsoft/layoutlm-base-uncased",
            "table_detection": "microsoft/table-transformer-detection",
            "ocr": "microsoft/trocr-base-printed",
            "math_detection": "facebook/detr-resnet-50",
        }
        self.hf_inference_url = "https://api-inference.huggingface.co/models"

    def pdf_to_base64(self, pdf_path: str) -> str:
        """Convert PDF file to base64 string"""
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except Exception as e:
            raise Exception(f"Error converting PDF to base64: {str(e)}")

    def extract_pdf_content(self, pdf_path: str) -> Dict[str, Any]:
        """Extract all content from PDF and return structured data"""
        doc = None
        try:
            if not os.path.exists(pdf_path):
                raise FileNotFoundError(f"PDF file not found: {pdf_path}")
            doc = fitz.open(pdf_path)
            if doc is None:
                raise RuntimeError("Failed to open PDF document")
            if doc.page_count == 0:
                raise ValueError("PDF document has no pages")
            print(f"šŸ“„ PDF opened successfully: {doc.page_count} pages")

            pages_data = []
            document_stats = {
                "total_pages": doc.page_count,
                "total_words": 0,
                "total_characters": 0,
                "total_images": 0,
                "total_tables": 0,
            }

            for page_num in range(doc.page_count):
                try:
                    page = doc[page_num]
                    print(f"šŸ”„ Processing page {page_num + 1}/{doc.page_count}")

                    # Extract text blocks: the "dict" extractor gives detailed
                    # font/position data; fall back to coarse block extraction
                    # if it fails for this page.
                    try:
                        page_dict = page.get_text("dict")
                        text_blocks = self._extract_text_blocks_from_dict(page_dict, page_num)
                    except Exception as e:
                        print(f"āš ļø Dict method failed for page {page_num + 1}, falling back to simple text extraction: {e}")
                        text_blocks = self._extract_text_blocks_simple(page, page_num)

                    images = self._extract_images_safely(page, doc, page_num)
                    tables = self._detect_tables_safely(page)
                    page_rect = page.rect

                    # Per-page statistics derived from the extracted text.
                    page_text = " ".join(block.text for block in text_blocks)
                    word_count = len(page_text.split())
                    char_count = len(page_text)

                    page_data = PageData(
                        page_number=page_num + 1,
                        text_blocks=text_blocks,
                        images=images,
                        tables=tables,
                        page_width=page_rect.width,
                        page_height=page_rect.height,
                        word_count=word_count,
                        character_count=char_count,
                    )
                    pages_data.append(page_data)

                    document_stats["total_words"] += word_count
                    document_stats["total_characters"] += char_count
                    document_stats["total_images"] += len(images)
                    document_stats["total_tables"] += len(tables)
                except Exception as e:
                    print(f"āŒ Error processing page {page_num + 1}: {e}")
                    # Keep page numbering intact by emitting an empty page
                    # (595x842 pt = A4 portrait) for pages that failed.
                    empty_page = PageData(
                        page_number=page_num + 1,
                        text_blocks=[],
                        images=[],
                        tables=[],
                        page_width=595,
                        page_height=842,
                        word_count=0,
                        character_count=0,
                    )
                    pages_data.append(empty_page)

            result = {
                "document_info": {
                    "filename": os.path.basename(pdf_path),
                    "file_size": os.path.getsize(pdf_path),
                    "conversion_timestamp": self._get_current_timestamp(),
                    "converter_version": "1.0.0",
                },
                "document_statistics": document_stats,
                "pages": [page.to_dict() for page in pages_data],
            }
            return result
        except Exception as e:
            raise Exception(f"Error extracting PDF content: {str(e)}")
        finally:
            if doc is not None:
                try:
                    doc.close()
                    print("āœ… PDF document closed successfully")
                except Exception as e:
                    print(f"āš ļø Error closing PDF document: {e}")

    def _extract_text_blocks_from_dict(self, page_dict: dict, page_num: int) -> List[TextBlock]:
        """Extract text blocks from page dictionary with detailed formatting"""
        text_blocks = []
        for block_idx, block in enumerate(page_dict.get("blocks", [])):
            if "lines" not in block:  # image blocks carry no "lines" key
                continue
            for line_idx, line in enumerate(block["lines"]):
                for span_idx, span in enumerate(line["spans"]):
                    text_content = span.get("text", "").strip()
                    if not text_content:
                        continue
                    bbox = span["bbox"]
                    font_name = span.get("font", "Arial")
                    flags = span.get("flags", 0)
                    # BUGFIX: wrap in bool() — the original stored the raw
                    # bitmask result (16 / 2) in bool-annotated fields, which
                    # serialized to JSON as numbers instead of true/false.
                    # Bits 4 and 1 of the span flags mark bold and italic.
                    is_bold = bool("bold" in font_name.lower() or flags & 16)
                    is_italic = bool("italic" in font_name.lower() or flags & 2)
                    text_blocks.append(TextBlock(
                        text=text_content,
                        x=round(bbox[0], 2),
                        y=round(bbox[1], 2),
                        width=round(bbox[2] - bbox[0], 2),
                        height=round(bbox[3] - bbox[1], 2),
                        font_size=round(span.get("size", 12), 2),
                        font_name=font_name,
                        is_bold=is_bold,
                        is_italic=is_italic,
                        block_id=f"p{page_num}-b{block_idx}-l{line_idx}-s{span_idx}",
                    ))
        return text_blocks

    def _extract_text_blocks_simple(self, page, page_num: int) -> List[TextBlock]:
        """Fallback method for text extraction"""
        text_blocks = []
        try:
            blocks_data = page.get_text("blocks")
            for block_idx, block in enumerate(blocks_data):
                # block tuple: (x0, y0, x1, y1, text, block_no, block_type);
                # type 0 means a text block.
                if block[6] == 0:
                    text = block[4].strip()
                    if text:
                        x0, y0, x1, y1 = block[0], block[1], block[2], block[3]
                        lines = text.split('\n')
                        # No per-line geometry here: approximate by splitting
                        # the block height evenly across its lines.
                        line_height = (y1 - y0) / max(len(lines), 1)
                        for line_idx, line in enumerate(lines):
                            if line.strip():
                                text_blocks.append(TextBlock(
                                    text=line.strip(),
                                    x=round(x0, 2),
                                    y=round(y0 + (line_idx * line_height), 2),
                                    width=round(x1 - x0, 2),
                                    height=round(line_height, 2),
                                    font_size=12.0,
                                    font_name="Arial",
                                    is_bold=False,
                                    is_italic=False,
                                    block_id=f"p{page_num}-simple-b{block_idx}-l{line_idx}",
                                ))
        except Exception as e:
            print(f"āš ļø Simple text block extraction failed: {e}")
        return text_blocks

    def _extract_images_safely(self, page, doc, page_num) -> List[ImageData]:
        """Extract images from page and return structured data"""
        images = []
        try:
            image_list = page.get_images(full=True)
            for img_index, img_info in enumerate(image_list):
                try:
                    xref = img_info[0]
                    img_rects = list(page.get_image_rects(xref))
                    if not img_rects:
                        continue  # image not actually placed on this page
                    bbox = img_rects[0]
                    pix = fitz.Pixmap(doc, xref)
                    # BUGFIX: CMYK (and other >=4-component) images cannot be
                    # written as PNG and were silently dropped. Convert them
                    # to RGB first so they are included in the output.
                    if pix.n - pix.alpha >= 4:
                        pix = fitz.Pixmap(fitz.csRGB, pix)
                    img_data = pix.tobytes("png")
                    img_base64 = base64.b64encode(img_data).decode()
                    images.append(ImageData(
                        index=img_index,
                        base64_data=img_base64,
                        bbox=(round(bbox.x0, 2), round(bbox.y0, 2),
                              round(bbox.x1, 2), round(bbox.y1, 2)),
                        width=round(bbox.x1 - bbox.x0, 2),
                        height=round(bbox.y1 - bbox.y0, 2),
                        format="PNG",
                    ))
                    pix = None  # release pixmap memory promptly
                except Exception as e:
                    print(f"āš ļø Error extracting image {img_index} on page {page_num+1}: {e}")
                    continue
        except Exception as e:
            print(f"āš ļø General error in image extraction for page {page_num+1}: {e}")
        return images

    def _detect_tables_safely(self, page) -> List[TableData]:
        """Extract tables from page and return structured data"""
        tables = []
        try:
            tabs = page.find_tables()
            for tab_index, tab in enumerate(tabs):
                try:
                    table_data = tab.extract()
                    if table_data:
                        # Normalize cells to stripped strings and drop rows
                        # that are entirely empty.
                        cleaned_data = []
                        for row in table_data:
                            cleaned_row = [str(cell).strip() if cell else "" for cell in row]
                            if any(cleaned_row):
                                cleaned_data.append(cleaned_row)
                        if cleaned_data:
                            tables.append(TableData(
                                bbox=(round(tab.bbox.x0, 2), round(tab.bbox.y0, 2),
                                      round(tab.bbox.x1, 2), round(tab.bbox.y1, 2)),
                                data=cleaned_data,
                                rows=len(cleaned_data),
                                columns=max(len(row) for row in cleaned_data) if cleaned_data else 0,
                            ))
                except Exception as e:
                    print(f"āš ļø Error extracting table {tab_index}: {e}")
                    continue
        except Exception as e:
            print(f"āš ļø General error in table detection: {e}")
        return tables

    def convert_to_json(self, pdf_content: Dict[str, Any],
                        output_path: str = None,
                        pretty_print: bool = True,
                        include_base64_images: bool = True) -> str:
        """Convert PDF content to JSON format"""
        print("šŸ”„ Converting to JSON format...")
        try:
            # BUGFIX: the original did a shallow .copy() and then mutated the
            # nested image dicts in place, clobbering the caller's pdf_content
            # when include_base64_images=False. Rebuild the affected nesting
            # instead so the input is never modified.
            json_content = dict(pdf_content)
            json_content["conversion_options"] = {
                "pretty_print": pretty_print,
                "include_base64_images": include_base64_images,
                "json_schema_version": "1.0",
            }
            if not include_base64_images:
                json_content["pages"] = [
                    {
                        **page,
                        "images": [
                            {
                                **image,
                                "base64_data": "[Base64 data removed - set include_base64_images=True to include]",
                            }
                            for image in page["images"]
                        ],
                    }
                    for page in pdf_content["pages"]
                ]

            if pretty_print:
                json_string = json.dumps(json_content, indent=2, ensure_ascii=False)
            else:
                json_string = json.dumps(json_content, ensure_ascii=False)

            # Saving is best-effort: a write failure is reported but the JSON
            # string is still returned to the caller.
            if output_path:
                try:
                    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
                    with open(output_path, 'w', encoding='utf-8') as f:
                        f.write(json_string)
                    print(f"āœ… JSON saved to: {output_path}")
                    print(f"šŸ“Š File size: {len(json_string):,} characters")
                except Exception as e:
                    print(f"āš ļø Error saving JSON to {output_path}: {e}")
            return json_string
        except Exception as e:
            raise Exception(f"Error converting to JSON: {str(e)}")

    def create_json_summary(self, pdf_content: Dict[str, Any]) -> Dict[str, Any]:
        """Create a summary of the PDF content without full data"""
        summary = {
            "document_info": pdf_content.get("document_info", {}),
            "document_statistics": pdf_content.get("document_statistics", {}),
            "page_summaries": [],
        }
        for page in pdf_content.get("pages", []):
            page_summary = {
                "page_number": page["page_number"],
                "text_blocks_count": len(page["text_blocks"]),
                "images_count": len(page["images"]),
                "tables_count": len(page["tables"]),
                "word_count": page["word_count"],
                "character_count": page["character_count"],
                "page_dimensions": {
                    "width": page["page_width"],
                    "height": page["page_height"],
                },
                # First three blocks, capped at 200 chars, as a preview.
                "sample_text": " ".join([block["text"] for block in page["text_blocks"][:3]])[:200] + "..."
                               if page["text_blocks"] else "",
            }
            summary["page_summaries"].append(page_summary)
        return summary

    def _get_current_timestamp(self) -> str:
        """Get current timestamp as string"""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def process_pdf_to_json(self, pdf_path: str,
                            output_path: str = None,
                            pretty_print: bool = True,
                            include_base64_images: bool = True,
                            create_summary: bool = False,
                            use_hf_models: bool = False) -> str:
        """Main method to process PDF and convert to JSON"""
        print(f"šŸš€ Processing PDF to JSON: {pdf_path}")
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        print("šŸ“„ Extracting PDF content...")
        pdf_content = self.extract_pdf_content(pdf_path)

        # Hugging Face enhancement is a stub for now; failures must never
        # abort the local conversion.
        if use_hf_models and self.hf_token:
            print("šŸ¤– Attempting to enhance with Hugging Face models...")
            try:
                print("Note: Hugging Face model integration requires further implementation.")
            except Exception as e:
                print(f"āš ļø Hugging Face enhancement failed: {e}")

        print("šŸ”„ Converting to JSON...")
        json_content = self.convert_to_json(
            pdf_content, output_path, pretty_print, include_base64_images
        )

        if create_summary and output_path:
            # BUGFIX: the original used str.replace('.json', ...), which
            # substituted every '.json' anywhere in the path and silently
            # reused (and overwrote) the main output path when the extension
            # differed. Derive the sibling name with pathlib instead.
            out = Path(output_path)
            summary_path = str(out.with_name(out.stem + '_summary.json'))
            summary_data = self.create_json_summary(pdf_content)
            summary_json = json.dumps(summary_data, indent=2, ensure_ascii=False)
            try:
                with open(summary_path, 'w', encoding='utf-8') as f:
                    f.write(summary_json)
                print(f"āœ… Summary JSON saved to: {summary_path}")
            except Exception as e:
                print(f"āš ļø Error saving summary: {e}")

        print("āœ… Processing complete!")
        return json_content


def main():
    """Main function to demonstrate PDF to JSON conversion"""
    # Set your Hugging Face token if needed
    HF_TOKEN = os.getenv("HF_API_TOKEN")

    converter = PDFToJSONConverter(huggingface_token=HF_TOKEN)

    pdf_path = "new-pdf.pdf"  # Change this to your PDF file path
    output_path = "converted_document.json"  # Output JSON file path

    try:
        json_content = converter.process_pdf_to_json(
            pdf_path=pdf_path,
            output_path=output_path,
            pretty_print=True,               # Format JSON with indentation
            include_base64_images=True,      # Include image data (set False to reduce file size)
            create_summary=True,             # Create additional summary file
            use_hf_models=False,             # Set to True if you want to use HuggingFace models
        )
        print(f"āœ… Successfully converted '{pdf_path}' to '{output_path}'")
        print(f"šŸ“Š JSON length: {len(json_content):,} characters")
        print(f"šŸ“„ Open '{output_path}' to view the structured JSON data!")

        # Optional: Print first 500 characters of JSON as preview
        print("\nšŸ“‹ JSON Preview (first 500 characters):")
        print("-" * 50)
        print(json_content[:500] + "..." if len(json_content) > 500 else json_content)
    except FileNotFoundError as e:
        print(f"āŒ Error: {e}")
        print("Please ensure the PDF file exists at the specified path.")
    except Exception as e:
        print(f"āŒ An unexpected error occurred: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()