import nltk
from nltk.tokenize import sent_tokenize
from typing import List, Dict, Optional
import re

try:
    from llama_index.core.schema import TextNode
except ImportError:
    # Minimal stand-in so the module still works without llama_index installed.
    class TextNode:
        def __init__(self, text: str, metadata: Optional[Dict] = None):
            self.text = text
            self.metadata = metadata if metadata is not None else {}

        def __repr__(self):
            return f"TextNode(text='{self.text[:50]}...', metadata={self.metadata})"

# Ensure the 'punkt' sentence tokenizer is available for sent_tokenize.
try:
    nltk.data.find('tokenizers/punkt')
except Exception:
    try:
        nltk.download('punkt', quiet=True)
    except Exception as e:
        print(f"Warning: Failed to download nltk 'punkt' tokenizer. Error: {e}")


def pre_segment_into_major_units(text: str) -> List[Dict[str, str]]:
    """Segment text into major units based on patterns like 'Unit X: Title'."""
    keywords = ["Unit", "Chapter", "Section", "Module", "Part"]
    keyword_pattern = "|".join(keywords)
    try:
        unit_delimiters = list(re.finditer(
            r"^((?:%s)\s*\d+:\s*.*?)(?=\n|$)" % keyword_pattern,
            text,
            re.MULTILINE | re.IGNORECASE
        ))
    except re.error as e:
        print(f"Regex error in pre_segment_into_major_units: {e}")
        unit_delimiters = []

    # No unit headings found: treat the whole document as a single unit.
    if not unit_delimiters:
        if text.strip():
            return [{
                "title_line": "Full Document Content",
                "content": text.strip(),
                "is_primary_unit": False
            }]
        return []

    segmented_units = []
    for i, match_obj in enumerate(unit_delimiters):
        unit_title_line = match_obj.group(1).strip()
        content_start_index = match_obj.end()
        # Each unit's content runs from the end of its heading line to the
        # start of the next heading (or the end of the document).
        if i + 1 < len(unit_delimiters):
            content_end_index = unit_delimiters[i + 1].start()
        else:
            content_end_index = len(text)
        unit_content = text[content_start_index:content_end_index].strip()
        if unit_content:
            segmented_units.append({
                "title_line": unit_title_line,
                "content": unit_content,
                "is_primary_unit": True
            })
    return segmented_units


def smart_chunk_with_content_awareness(
    text: str,
    max_chunk_chars: int = 6000,
    overlap_chars: int = 200,
    metadata: Optional[Dict] = None
) -> List[TextNode]:
    """Split text into chunks along paragraph boundaries, with sentence-level overlap."""
    if not text.strip():
        return []

    raw_paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    if not raw_paragraphs:
        raw_paragraphs = [text.strip()]

    chunks = []
    current_chunk_content = ""

    for para_text in raw_paragraphs:
        # Paragraphs that exceed the limit on their own are split separately.
        if len(para_text) > max_chunk_chars:
            if current_chunk_content.strip():
                chunks.append(TextNode(text=current_chunk_content, metadata=dict(metadata or {})))
                current_chunk_content = ""
            # Split the large paragraph at sentence boundaries.
            chunks.extend(_split_oversized_paragraph(para_text, max_chunk_chars, metadata))
            continue

        # Check whether adding this paragraph would exceed the limit.
        separator_len = len("\n\n") if current_chunk_content else 0
        if current_chunk_content and (len(current_chunk_content) + separator_len + len(para_text) > max_chunk_chars):
            chunks.append(TextNode(text=current_chunk_content, metadata=dict(metadata or {})))
            # Carry trailing sentences of the finished chunk into the next one.
            overlap_text = _extract_overlap_content(current_chunk_content, overlap_chars)
            current_chunk_content = overlap_text
            if current_chunk_content and para_text:
                current_chunk_content += "\n\n" + para_text
            elif para_text:
                current_chunk_content = para_text
        else:
            # Append the paragraph to the current chunk.
            if current_chunk_content:
                current_chunk_content += "\n\n" + para_text
            else:
                current_chunk_content = para_text

    if current_chunk_content.strip():
        chunks.append(TextNode(text=current_chunk_content, metadata=dict(metadata or {})))

    return chunks


def _split_oversized_paragraph(para_text: str, max_chunk_chars: int, metadata: Optional[Dict]) -> List[TextNode]:
    """Split an oversized paragraph at sentence boundaries when possible."""
    try:
        sentences = sent_tokenize(para_text)
    except Exception:
        # Fallback: split at fixed character offsets.
        return [TextNode(text=para_text[i:i + max_chunk_chars], metadata=dict(metadata or {}))
                for i in range(0, len(para_text), max_chunk_chars)]

    chunks = []
    current_content = ""
    for sentence in sentences:
        if len(sentence) > max_chunk_chars:
            # An extremely long single sentence: flush the current chunk,
            # then split the sentence at fixed character offsets.
            if current_content:
                chunks.append(TextNode(text=current_content, metadata=dict(metadata or {})))
                current_content = ""
            for i in range(0, len(sentence), max_chunk_chars):
                chunk_text = sentence[i:i + max_chunk_chars]
                chunks.append(TextNode(text=chunk_text, metadata=dict(metadata or {})))
        elif current_content and len(current_content) + len(sentence) + 1 > max_chunk_chars:
            chunks.append(TextNode(text=current_content, metadata=dict(metadata or {})))
            current_content = sentence
        else:
            current_content += (" " if current_content else "") + sentence

    if current_content:
        chunks.append(TextNode(text=current_content, metadata=dict(metadata or {})))
    return chunks


def _extract_overlap_content(current_chunk_content: str, overlap_chars: int) -> str:
    """Return the trailing sentences of a chunk (up to overlap_chars) to reuse as overlap."""
    if overlap_chars <= 0 or not current_chunk_content:
        return ""
    try:
        sentences = sent_tokenize(current_chunk_content)
        temp_overlap_content = ""
        # Walk backwards through the sentences, prepending whole sentences
        # while they still fit within the overlap budget.
        for s_idx in range(len(sentences) - 1, -1, -1):
            s = sentences[s_idx]
            test_length = len(s) + len(temp_overlap_content) + (1 if temp_overlap_content else 0)
            if test_length <= overlap_chars:
                temp_overlap_content = s + (" " if temp_overlap_content else "") + temp_overlap_content
            else:
                # If even the final sentence is too long, keep only its tail.
                if not temp_overlap_content and len(s) > overlap_chars:
                    temp_overlap_content = s[-overlap_chars:]
                break
        return temp_overlap_content.strip()
    except Exception:
        # Fallback: plain character-based overlap.
        if len(current_chunk_content) > overlap_chars:
            return current_chunk_content[-overlap_chars:]
        return current_chunk_content
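

# Example usage: a minimal sketch of how the two passes fit together, assuming
# the source document uses "Unit N: Title" style headings. The sample text,
# parameter values, and the "unit_title" metadata key are illustrative only.
if __name__ == "__main__":
    sample_text = (
        "Unit 1: Introduction\n"
        "This unit introduces the course.\n\n"
        "It spans two short paragraphs.\n\n"
        "Unit 2: Methods\n"
        "This unit describes the methodology in more detail."
    )

    # Pass 1: split the document into major units by heading pattern.
    units = pre_segment_into_major_units(sample_text)

    # Pass 2: chunk each unit's content, carrying the unit title as metadata.
    for unit in units:
        nodes = smart_chunk_with_content_awareness(
            unit["content"],
            max_chunk_chars=500,
            overlap_chars=100,
            metadata={"unit_title": unit["title_line"]},
        )
        for node in nodes:
            print(node)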