import re
from typing import Dict, List, Optional

from markitdown import MarkItDown
from langchain.schema import Document

from src.processor.text_utils import VietnameseTextProcessor


class WordDocumentProcessor:
    """Process Word document content with semantic chunking using MarkItDown"""

    def __init__(self):
        self.text_processor = VietnameseTextProcessor()
        self.md_converter = MarkItDown()

        # Section patterns for Vietnamese content
        self.section_patterns = {
            "policy": r"(?:quy định|chính sách|policy)",
            "procedure": r"(?:hướng dẫn|cách|thao tác|bước)",
            "scope": r"(?:phạm vi|áp dụng|scope)",
            "fee": r"(?:phí|chiết khấu|fee|discount)",
            "timeout": r"(?:timeout|treo|đang xử lý|processing)",
        }
    def process_word_document(self, file_path: str) -> List[Document]:
        """Process Word document with semantic section chunking using MarkItDown"""
        try:
            # Convert document using MarkItDown
            result = self.md_converter.convert(file_path)
            content = result.text_content
            print(f"Document converted successfully. Content length: {len(content)}")
            print(f"First 500 characters:\n{content[:500]}...")
        except Exception as e:
            print(f"Error converting document with MarkItDown: {e}")
            return []

        documents = []

        # Extract semantic sections
        sections = self._extract_semantic_sections(content)
        for section in sections:
            processed_document = self._process_section(section, file_path)
            if processed_document:
                documents.append(processed_document)

        return documents
    def _extract_semantic_sections(self, content: str) -> List[Dict]:
        """Extract meaningful sections from Markdown content produced by MarkItDown"""
        sections = []

        # Since MarkItDown produces proper Markdown, we can use more sophisticated parsing.
        # Split on header lines (# through ###### followed by text).
        header_pattern = r"\n(?=#{1,6}\s+)"
        major_sections = re.split(header_pattern, content)

        for section_text in major_sections:
            if len(section_text.strip()) < 30:  # Skip very short sections
                continue

            section_info = self._analyze_markdown_section(section_text)
            if section_info:
                sections.append(section_info)

        # If no clear sections found, create chunks from paragraphs
        if not sections:
            sections = self._fallback_paragraph_chunking(content)

        return sections
    def _analyze_markdown_section(self, section_text: str) -> Optional[Dict]:
        """Analyze Markdown section to determine type and extract content"""
        lines = section_text.strip().split("\n")
        if not lines:
            return None

        # Find the header line (starts with #)
        header = ""
        body_start_idx = 0
        for i, line in enumerate(lines):
            if line.strip().startswith("#"):
                header = line.strip()
                body_start_idx = i + 1
                break

        # If no header found, use first line as header
        if not header and lines:
            header = lines[0].strip()
            body_start_idx = 1

        # Get body content (rest of lines)
        body_lines = lines[body_start_idx:] if body_start_idx < len(lines) else []
        body = "\n".join(body_lines).strip()

        # Skip if body is too short or contains only file references
        if len(body) < 20 or (
            len(body.split()) < 10
            and any(word in body.lower() for word in ["csv", "file", "document"])
        ):
            return None

        # Determine section type
        section_type = self._determine_section_type(header + " " + body)

        return {
            "header": header,
            "body": body,
            "type": section_type,
            "full_text": section_text.strip(),
        }
    def _fallback_paragraph_chunking(self, content: str) -> List[Dict]:
        """Fallback method to chunk content by paragraphs when no clear sections are found"""
        paragraphs = [
            p.strip()
            for p in content.split("\n\n")
            if p.strip() and len(p.strip()) > 50
        ]

        sections = []
        for i, paragraph in enumerate(paragraphs):
            # Use the first sentence as the header
            sentences = paragraph.split(".")
            header = sentences[0] if sentences else f"Section {i + 1}"
            section_type = self._determine_section_type(paragraph)

            sections.append(
                {
                    "header": header,
                    "body": paragraph,
                    "type": section_type,
                    "full_text": paragraph,
                }
            )

        return sections
    def _determine_section_type(self, text: str) -> str:
        """Determine section type based on content"""
        text_lower = text.lower()
        for section_type, pattern in self.section_patterns.items():
            if re.search(pattern, text_lower):
                return section_type

        # Default to general content
        return "general"
    def _process_section(self, section: Dict, source_file: str) -> Document:
        """Process individual section into chunk, preserving Markdown structure"""
        header = section["header"]
        body = section["body"]
        section_type = section["type"]

        # Clean and format content
        if header.startswith("#"):
            # For Markdown headers, strip the leading # marks but keep the header text
            clean_header = re.sub(r"^#+\s*", "", header).strip()
            content = f"{clean_header}\n\n{body}".strip()
        else:
            clean_header = header
            content = f"{header}\n\n{body}".strip()

        # Remove excessive whitespace and normalize
        content = re.sub(r"\n\s*\n\s*\n", "\n\n", content)  # Collapse triple+ newlines
        content = re.sub(r"[ \t]+", " ", content)  # Normalize spaces and tabs

        return Document(
            page_content=content,
            metadata={
                "doc_type": section_type,
                "section_header": clean_header,
                "category": "word_document",
                "has_markdown": (
                    "yes"
                    if any(marker in content for marker in ["#", "*", "|", "```"])
                    else "no"
                ),
                "content_length": len(content),
                "source_file": source_file,
            },
        )
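

# Minimal usage sketch (illustrative addition, not part of the original module): it
# assumes the markitdown and langchain packages are installed and that a Word file
# exists at the hypothetical path below.
if __name__ == "__main__":
    processor = WordDocumentProcessor()
    chunks = processor.process_word_document("data/sample_policy.docx")  # hypothetical path
    for doc in chunks:
        print(doc.metadata["doc_type"], "-", doc.metadata["section_header"])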