# -*- coding: utf-8 -*-
"""Pibit.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1Lug_ARdBOTu2e87sThol1luVksA0986T
"""

import re
import json
import datetime
from collections import defaultdict, Counter
from typing import List, Dict, Tuple, Set, Optional
import unicodedata
import math

import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots


class PibitInsuranceTokenizer:
    """
    Specialized tokenizer for insurance domain documents, designed for Pibit.ai's
    underwriting automation platform. Handles loss run documents, policy documents,
    claims data, and other insurance-specific text processing needs.
    """

    def __init__(self, vocab_size=15000, model_type="insurance_bpe"):
        self.vocab_size = vocab_size
        self.model_type = model_type
        # NOTE (reconstruction): the original angle-bracket special tokens were stripped
        # from this export; the list below is a reconstruction. <UNK>, </w>, <AMOUNT>,
        # <DATE>, <POLICY> and <PERCENT> are required by the methods below; the remaining
        # entries are plausible domain placeholders.
        self.special_tokens = [
            "<PAD>", "<UNK>", "<BOS>", "<EOS>", "<MASK>", "</w>",
            "<AMOUNT>", "<DATE>", "<POLICY>", "<PERCENT>",
            "<CLAIM>", "<COVERAGE>", "<DEDUCTIBLE>", "<PREMIUM>",
            "<LIMIT>", "<INSURED>", "<CARRIER>", "<LOSS>"
        ]
        self.vocab = {}
        self.id_to_token = {}
        self.token_frequencies = Counter()
        self.merges = []
        self.bpe_ranks = {}
        # COMPUTATION-LIGHT: Compiling regex patterns once at initialization is highly efficient.
        # This avoids re-compiling the same pattern for every function call.
        self.insurance_patterns = self._load_insurance_patterns()
        self.financial_pattern = re.compile(r'\$[\d,]+(?:\.\d{2})?')
        self.policy_pattern = re.compile(r'[A-Z]{2,4}[\d\-]{6,12}')
        self.date_pattern = re.compile(r'\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}|\d{4}[\/\-]\d{1,2}[\/\-]\d{1,2}')
        self.percentage_pattern = re.compile(r'\d+(?:\.\d+)?%')
        self._initialize_special_tokens()

    def _load_insurance_patterns(self) -> Dict[str, List[str]]:
        """Load insurance domain-specific patterns and terminology."""
        return {
            'coverage_types': ['general liability', 'workers compensation', 'property coverage',
                               'commercial auto', 'cyber liability', 'professional liability',
                               'directors officers', 'employment practices', 'umbrella coverage',
                               'commercial crime', 'boiler machinery', 'builders risk'],
            'claim_types': ['bodily injury', 'property damage', 'medical payments', 'personal injury',
                            'products liability', 'completed operations', 'fire damage', 'theft',
                            'vandalism', 'water damage', 'slip and fall', 'motor vehicle accident',
                            'workplace injury'],
            'risk_factors': ['hazardous materials', 'high risk activity', 'prior claims',
                             'safety violations', 'regulatory issues', 'financial distress',
                             'industry classification', 'geographic risk', 'seasonal business',
                             'new venture', 'construction defects', 'product recall'],
            'financial_terms': ['deductible', 'premium', 'limit', 'retention', 'aggregate',
                                'occurrence', 'claims made', 'prior acts', 'extended reporting',
                                'loss ratio', 'experience modification', 'rate', 'exposure'],
            'underwriting_terms': ['risk assessment', 'loss run', 'acord forms', 'submission',
                                   'renewal', 'policy period', 'effective date', 'expiration',
                                   'carrier', 'excess', 'reinsurance', 'facultative', 'treaty',
                                   'reserve', 'incurred', 'paid', 'outstanding', 'ibnr']
        }

    def _initialize_special_tokens(self):
        """Initialize special tokens in vocabulary."""
        for i, token in enumerate(self.special_tokens):
            self.vocab[token] = i
            self.id_to_token[i] = token
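    # Illustrative matches for the compiled patterns above (editor-added sketch; the
    # example strings are hypothetical and simply show what each regex is meant to catch):
    #   financial_pattern  -> "$25,000", "$1,000,000.00"
    #   policy_pattern     -> "GL2024-001234", "CP2024-567890"
    #   date_pattern       -> "03/15/2024", "2024-11-08"
    #   percentage_pattern -> "12%", "0.85%"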
    def _preprocess_text(self, text: str) -> str:
        """
        Insurance-specific text preprocessing.
        Normalizes financial amounts, dates, policy numbers, and other entities.
        """
        # COMPUTATION-LIGHT: Unicode normalization and regex substitutions are very fast C-level operations.
        text = unicodedata.normalize('NFKC', text)
        # NOTE (reconstruction): the placeholder tokens below were stripped from the
        # export; they mirror the entity types extracted in _extract_insurance_entities.
        text = self.financial_pattern.sub('<AMOUNT>', text)
        text = self.date_pattern.sub('<DATE>', text)
        text = self.policy_pattern.sub('<POLICY>', text)
        text = self.percentage_pattern.sub('<PERCENT>', text)
        text = self._normalize_insurance_terms(text)
        return text.strip()

    def _normalize_insurance_terms(self, text: str) -> str:
        """Normalize insurance-specific terminology."""
        abbreviations = {
            r'\bGL\b': 'general liability',
            r'\bWC\b': 'workers compensation',
            r'\bAuto\b': 'automobile',
            r'\bD&O\b': 'directors officers',
            r'\bE&O\b': 'errors omissions',
            r'\bEPLI\b': 'employment practices liability',
            r'\bBI\b': 'bodily injury',
            r'\bPD\b': 'property damage',
            r'\bMP\b': 'medical payments',
            r'\bTPA\b': 'third party administrator',
            r'\bMGA\b': 'managing general agent',
            r'\bACV\b': 'actual cash value',
            r'\bRCV\b': 'replacement cost value'
        }
        for abbrev, full_form in abbreviations.items():
            text = re.sub(abbrev, full_form, text, flags=re.IGNORECASE)
        return text

    def _extract_insurance_entities(self, text: str) -> List[Tuple[str, str]]:
        """Extract insurance-specific entities from text."""
        # COMPUTATION-LIGHT: Finding all matches with `finditer` is highly optimized.
        entities = []
        for match in self.financial_pattern.finditer(text):
            entities.append(('AMOUNT', match.group()))
        for match in self.date_pattern.finditer(text):
            entities.append(('DATE', match.group()))
        for match in self.policy_pattern.finditer(text):
            entities.append(('POLICY', match.group()))
        for match in self.percentage_pattern.finditer(text):
            entities.append(('PERCENTAGE', match.group()))
        return entities

    def _tokenize_with_domain_awareness(self, text: str) -> List[str]:
        """
        Domain-aware tokenization that preserves insurance terminology.
        """
        # COMPUTATION-LIGHT: A single pass with regex to get initial tokens.
        word_pattern = r"[a-zA-Z]+(?:'[a-zA-Z]+)?|[0-9]+(?:\.[0-9]+)?|[^\w\s]"
        tokens = re.findall(word_pattern, text.lower())

        # COMPUTATION-LIGHT: A single while loop to merge compound terms.
        # Its complexity is linear O(n) with respect to the number of tokens.
        merged_tokens = []
        i = 0
        while i < len(tokens):
            found_compound = False
            for length in [3, 2]:
                if i + length <= len(tokens):
                    candidate = ' '.join(tokens[i:i + length])
                    for category, terms in self.insurance_patterns.items():
                        if candidate in terms:
                            merged_tokens.append(candidate.replace(' ', '_'))
                            i += length
                            found_compound = True
                            break
                if found_compound:
                    break
            if not found_compound:
                merged_tokens.append(tokens[i])
                i += 1
        return merged_tokens
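    # Worked example of the compound-term merge above (editor-added sketch):
    #   input : "general liability coverage with prior claims"
    #   the regex pass yields ["general", "liability", "coverage", "with", "prior", "claims"];
    #   the 2-token window "general liability" matches insurance_patterns['coverage_types']
    #   and is emitted as "general_liability", and "prior claims" (a 'risk_factors' entry)
    #   becomes "prior_claims", giving
    #   ["general_liability", "coverage", "with", "prior_claims"].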
    def _get_word_frequencies_insurance(self, texts: List[str]) -> Dict[str, int]:
        """Get word frequencies with insurance domain emphasis."""
        # COMPUTATION-LIGHT: Dictionary lookups and updates are very fast, close to constant time O(1) on average.
        word_freqs = defaultdict(int)
        for text in texts:
            preprocessed_text = self._preprocess_text(text)
            tokens = self._tokenize_with_domain_awareness(preprocessed_text)
            for token in tokens:
                # NOTE (reconstruction): '</w>' is the end-of-word marker; the original
                # marker was stripped from the export.
                token_chars = ' '.join(list(token)) + ' </w>'
                word_freqs[token_chars] += 1
                if self._is_insurance_term(token):
                    word_freqs[token_chars] += 2
        return word_freqs

    def _is_insurance_term(self, token: str) -> bool:
        """Check if token is an insurance-specific term."""
        token_lower = token.lower().replace('_', ' ')
        for category, terms in self.insurance_patterns.items():
            if token_lower in terms:
                return True
        insurance_keywords = {'claim', 'policy', 'premium', 'deductible', 'coverage', 'liability',
                              'underwrite', 'insured', 'carrier', 'risk', 'loss', 'damage',
                              'accident', 'incident', 'hazard', 'peril', 'exposure', 'limit'}
        return token_lower in insurance_keywords

    def _get_pairs(self, word: List[str]) -> Set[Tuple[str, str]]:
        """Get all adjacent pairs in a word (given as a list of symbols)."""
        pairs = set()
        prev_char = word[0]
        for char in word[1:]:
            pairs.add((prev_char, char))
            prev_char = char
        return pairs

    def _merge_word(self, word: str, pair: Tuple[str, str]) -> str:
        """Merge a specific pair in a word."""
        return ' '.join(word.split()).replace(f'{pair[0]} {pair[1]}', f'{pair[0]}{pair[1]}')

    def _train_insurance_bpe(self, texts: List[str]) -> None:
        """
        # COMPUTATION-HEAVY: This is the most intensive part of the code.
        # BPE training involves multiple loops over the vocabulary and pairs, which can be slow,
        # especially as the vocabulary and number of merges grow.
        # This should only be run ONCE during setup, not during user interaction.
        """
        word_freqs = self._get_word_frequencies_insurance(texts)
        vocab = set()
        for word in word_freqs.keys():
            vocab.update(word.split())
        for category, terms in self.insurance_patterns.items():
            for term in terms:
                vocab.add(term.replace(' ', '_'))

        num_merges = self.vocab_size - len(self.vocab) - len(vocab)
        for merge_idx in range(num_merges):
            pairs = defaultdict(int)
            for word, freq in word_freqs.items():
                word_pairs = self._get_pairs(word.split())
                for pair in word_pairs:
                    pairs[pair] += freq
            if not pairs:
                break
            best_pair = max(pairs, key=pairs.get)

            new_word_freqs = {}
            for word, freq in word_freqs.items():
                new_word = self._merge_word(word, best_pair)
                new_word_freqs[new_word] = freq
            word_freqs = new_word_freqs

            self.merges.append(best_pair)
            self.bpe_ranks[best_pair] = merge_idx
            merged_token = best_pair[0] + best_pair[1]
            vocab.add(merged_token)

    def _apply_bpe(self, word: str) -> List[str]:
        """Apply BPE merges to a word."""
        if len(word) == 1:
            return [word]
        word_tokens = list(word)
        # NOTE (reconstruction): same '</w>' end-of-word marker as in _get_word_frequencies_insurance.
        word = ' '.join(word_tokens) + ' </w>'
        while True:
            pairs = self._get_pairs(word.split())
            if not pairs:
                break
            bigram = min(pairs, key=lambda pair: self.bpe_ranks.get(pair, float('inf')))
            if bigram not in self.bpe_ranks:
                break
            word = self._merge_word(word, bigram)
        return word.split()
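    # Tiny worked example of one BPE training step (editor-added sketch; the counts
    # below are hypothetical, not taken from the bundled documents):
    #   word_freqs = {"c l a i m </w>": 4, "c l a i m s </w>": 2}
    #   pair counts: ("c","l")=6, ("l","a")=6, ("a","i")=6, ("i","m")=6,
    #                ("m","</w>")=4, ("m","s")=2, ("s","</w>")=2
    #   max() picks one of the tied best pairs, e.g. ("c","l"); _merge_word turns the
    #   keys into "cl a i m </w>" / "cl a i m s </w>", and ("c","l") is recorded in
    #   self.merges and self.bpe_ranks. _apply_bpe later replays merges in rank order.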
    def train(self, texts: List[str]) -> None:
        """Train the insurance domain tokenizer."""
        if self.model_type == "insurance_bpe":
            self._train_insurance_bpe(texts)
            all_tokens = set()
            for text in texts:
                preprocessed = self._preprocess_text(text)
                tokens = self._tokenize_with_domain_awareness(preprocessed)
                for token in tokens:
                    bpe_tokens = self._apply_bpe(token)
                    all_tokens.update(bpe_tokens)
        else:
            all_tokens = set()
            for text in texts:
                preprocessed = self._preprocess_text(text)
                tokens = self._tokenize_with_domain_awareness(preprocessed)
                all_tokens.update(tokens)
                for token in tokens:
                    self.token_frequencies[token] += 3 if self._is_insurance_term(token) else 1

        if len(all_tokens) > self.vocab_size - len(self.special_tokens):
            if self.model_type != "insurance_bpe":
                insurance_terms = [t for t in all_tokens if self._is_insurance_term(t)]
                other_terms = sorted([t for t in all_tokens if not self._is_insurance_term(t)],
                                     key=lambda x: self.token_frequencies[x], reverse=True)
                max_others = self.vocab_size - len(self.special_tokens) - len(insurance_terms)
                all_tokens = insurance_terms + other_terms[:max_others]

        start_idx = len(self.special_tokens)
        for i, token in enumerate(sorted(list(all_tokens))):
            token_id = start_idx + i
            self.vocab[token] = token_id
            self.id_to_token[token_id] = token

    def tokenize(self, text: str) -> List[str]:
        """Tokenize insurance document text."""
        preprocessed = self._preprocess_text(text)
        tokens = self._tokenize_with_domain_awareness(preprocessed)
        if self.model_type == "insurance_bpe":
            result = []
            for token in tokens:
                if token in self.vocab:
                    result.append(token)
                else:
                    result.extend(self._apply_bpe(token))
            return result
        return tokens

    def encode(self, text: str) -> List[int]:
        """Encode text to token IDs."""
        tokens = self.tokenize(text)
        # NOTE (reconstruction): unknown tokens fall back to the <UNK> id.
        return [self.vocab.get(token, self.vocab["<UNK>"]) for token in tokens]

    def decode(self, token_ids: List[int]) -> str:
        """Decode token IDs back to text."""
        tokens = [self.id_to_token[tid] for tid in token_ids if tid in self.id_to_token]
        # NOTE (reconstruction): strip the '</w>' end-of-word marker and restore spaces
        # in merged compound terms.
        text = ' '.join(tokens).replace(' </w>', '').replace('_', ' ')
        # Clean up special tokens that shouldn't be in the final text
        for special in self.special_tokens:
            if special not in ["<AMOUNT>", "<DATE>", "<POLICY>", "<PERCENT>"]:
                text = text.replace(special, '')
        return text.strip()

    def analyze_document(self, text: str) -> Dict:
        """Analyze insurance document and extract key information."""
        # COMPUTATION-LIGHT: Analysis is fast as it reuses efficient tokenization and regex methods.
        entities = self._extract_insurance_entities(text)
        tokens = self.tokenize(text)
        if not tokens:
            return {'document_type': 'Unknown', 'total_tokens': 0, 'insurance_terms': 0,
                    'insurance_term_ratio': 0, 'entities': [], 'key_terms': [],
                    'risk_score': 0, 'confidence': 0}

        insurance_term_count = sum(1 for token in tokens if self._is_insurance_term(token))
        doc_type = self._identify_document_type(text, tokens)
        risk_score = self._calculate_risk_score(text, tokens, entities)

        return {
            'document_type': doc_type,
            'total_tokens': len(tokens),
            'insurance_terms': insurance_term_count,
            'insurance_term_ratio': insurance_term_count / len(tokens),
            'entities': entities,
            'key_terms': list(set([t for t in tokens if self._is_insurance_term(t)]))[:20],
            'risk_score': risk_score,
            'confidence': min(0.95, insurance_term_count / len(tokens) * 2)
        }

    def _calculate_risk_score(self, text: str, tokens: List[str],
                              entities: List[Tuple[str, str]]) -> float:
        """Calculate risk score based on document content."""
        risk_score = 0.5
        high_risk_terms = ['prior claims', 'safety violations', 'hazardous materials',
                           'high risk activity', 'regulatory issues', 'financial distress']
        for term in high_risk_terms:
            if term.replace(' ', '_') in tokens or term in text.lower():
                risk_score += 0.1

        amounts = [float(re.sub(r'[\$,]', '', entity[1])) for entity in entities
                   if entity[0] == 'AMOUNT' and re.sub(r'[\$,]', '', entity[1]).replace('.', '', 1).isdigit()]
        if amounts:
            max_amount = max(amounts)
            if max_amount > 1000000:
                risk_score += 0.2
            elif max_amount > 100000:
                risk_score += 0.1
        return min(1.0, max(0.0, risk_score))
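    # Example of the additive risk score above (editor-added sketch):
    #   base 0.5
    #   + 0.1 because "prior claims" appears in the text
    #   + 0.2 because the largest extracted AMOUNT exceeds $1,000,000
    #   = 0.8, which analyze_insurance_document reports as HIGH (score > 0.7).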
    def _identify_document_type(self, text: str, tokens: List[str]) -> str:
        """Identify the type of insurance document."""
        doc_indicators = {
            'loss_run': ['loss_run', 'claims_history', 'paid_losses', 'incurred', 'reserves',
                         'loss_ratio'],
            'policy': ['policy_number', 'effective_date', 'expiration_date', 'coverage_limit',
                       'policy_period'],
            'claim': ['claim_number', 'date_of_loss', 'claimant', 'adjuster', 'claim_details'],
            'submission': ['submission', 'application', 'proposal', 'quote_request', 'underwriting'],
            'certificate': ['certificate', 'evidence_of_coverage', 'additional_insured']
        }
        scores = {doc_type: sum(2 if ind.replace(' ', '_') in tokens
                                else 1 if ind.replace('_', ' ') in text.lower()
                                else 0
                                for ind in indicators)
                  for doc_type, indicators in doc_indicators.items()}
        if not scores or max(scores.values()) == 0:
            return 'general_insurance'
        return max(scores, key=scores.get)

    def get_vocab_size(self) -> int:
        return len(self.vocab)
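# --- Editor-added usage sketch ---
# A minimal, self-contained helper that exercises the public API of a trained
# tokenizer. It is not part of the original notebook and is never called
# automatically; invoke it manually, e.g. _demo_tokenizer_roundtrip(tokenizer),
# after the global instance below has been trained.
def _demo_tokenizer_roundtrip(tk: "PibitInsuranceTokenizer") -> None:
    sample = ("Claim 1: Slip and fall incident, Date of Loss: 03/15/2024, "
              "Paid: $25,000, Status: Closed")
    tokens = tk.tokenize(sample)       # domain-aware tokens plus BPE pieces
    ids = tk.encode(sample)            # token IDs; unknown pieces fall back to <UNK>
    print("tokens :", tokens[:15])
    print("ids    :", ids[:15])
    print("decoded:", tk.decode(ids))  # entity placeholders such as <AMOUNT> are kept
    print("doc type:", tk.analyze_document(sample)['document_type'])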
# --- SINGLE GLOBAL INSTANCE ---
# The tokenizer is created and trained only ONCE when the script starts.
# All functions will now use this single, pre-trained instance.
print("Initializing and training the Pibit.ai Insurance Tokenizer... Please wait.")
tokenizer = PibitInsuranceTokenizer(vocab_size=8000, model_type="insurance_bpe")

# Default training documents
default_training_docs = [
    "Loss Run Report for Policy GL2024-001234. Effective Date: 01/01/2024 to 12/31/2024. Insured: ABC Manufacturing Company. Coverage: General Liability, $1,000,000 per occurrence, $2,000,000 aggregate. Claims History: Claim 1: Slip and fall incident, Date of Loss: 03/15/2024, Paid: $25,000, Incurred: $45,000, Status: Closed. Claim 2: Product liability claim, Date of Loss: 07/22/2024, Paid: $0, Incurred: $75,000, Status: Open, Reserves: $75,000. Total Paid Losses: $25,000. Total Incurred Losses: $120,000. Loss Ratio: 12%. Experience Modification Factor: 0.85",
    "Workers Compensation Submission. Applicant: XYZ Construction LLC. Policy Period: 01/01/2025 to 01/01/2026. Industry Code: 5645 - Residential Construction. Annual Payroll: $2,500,000. Prior Coverage: Carrier ABC, Premium: $125,000, Deductible: $5,000. Claims Experience: 3 claims in past 3 years. Workplace injury, back strain: $15,000 paid. Fall from height accident: $85,000 incurred. Repetitive motion injury: $12,000 paid. Risk Factors: High-risk construction activities, prior OSHA violations. Underwriting Notes: Requires safety program implementation.",
    "Commercial Property Loss Notice. Policy Number: CP2024-567890. Insured: Downtown Office Building LLC. Date of Loss: 11/08/2024. Cause of Loss: Water damage from burst pipe. Coverage Details: Building Coverage: $5,000,000. Business Personal Property: $500,000. Deductible: $10,000. Claim Details: Estimated Repair Cost: $125,000. Business Interruption Loss: $25,000. Adjuster: John Smith, License #12345. Initial Reserve: $150,000.",
    "Underwriting Risk Assessment Report. Account: Tech Startup Solutions Inc. Line of Business: Cyber Liability Insurance. Requested Limits: $5,000,000 per claim, $10,000,000 aggregate. Risk Factors Analysis: Industry: Technology/Software Development. Revenue: $15,000,000 annually. Security Measures: Multi-factor authentication: Yes. Encryption protocols: AES-256. Employee training: Quarterly. Incident response plan: In place. Prior Claims: None reported. Competitive Premium Quote: $45,000 annually. Recommended Deductible: $25,000."
]

tokenizer.train(default_training_docs)
print("Tokenizer is ready!")


# --- Gradio App Functions ---

def create_analysis_plots(analysis_data):
    """Create visualization plots for document analysis."""
    fig_gauge = go.Figure(go.Indicator(
        mode="gauge+number",
        value=analysis_data['risk_score'] * 100,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': "Risk Score"},
        gauge={'axis': {'range': [None, 100]},
               'bar': {'color': "#2E86AB"},
               'steps': [{'range': [0, 40], 'color': "lightgreen"},
                         {'range': [40, 70], 'color': "yellow"},
                         {'range': [70, 100], 'color': "lightcoral"}]}))
    fig_gauge.update_layout(height=300, margin=dict(l=20, r=20, t=50, b=20))

    insurance_tokens = analysis_data['insurance_terms']
    other_tokens = analysis_data['total_tokens'] - insurance_tokens
    fig_pie = px.pie(values=[insurance_tokens, other_tokens],
                     names=['Insurance Terms', 'Other Terms'],
                     title='Token Distribution',
                     color_discrete_sequence=['#FF6B6B', '#4ECDC4'])
    fig_pie.update_layout(height=300, margin=dict(l=20, r=20, t=50, b=20))
    return fig_gauge, fig_pie


def analyze_insurance_document(text):
    """
    Main function to analyze insurance documents.
    This now uses the single, globally-trained tokenizer and is very fast.
    """
    if not text.strip():
        return "Please enter some text to analyze.", go.Figure(), go.Figure(), pd.DataFrame(), ""

    # The core change: No more retraining! Just analyze.
    analysis = tokenizer.analyze_document(text)

    summary = f"""
## 📊 Pibit.ai Insurance Document Analysis Report

### 🏢 Document Classification
- **Document Type**: {analysis['document_type'].title().replace('_', ' ')}
- **Analysis Confidence**: {analysis['confidence']:.1%}

### 📈 Token Analysis
- **Total Tokens**: {analysis['total_tokens']:,}
- **Insurance-Specific Terms**: {analysis['insurance_terms']:,}
- **Domain Relevance**: {analysis['insurance_term_ratio']:.1%}

### âš ī¸ Risk Assessment
- **Risk Score**: {analysis['risk_score']:.2f} / 1.00
- **Risk Level**: {"🔴 HIGH" if analysis['risk_score'] > 0.7 else "🟡 MEDIUM" if analysis['risk_score'] > 0.4 else "đŸŸĸ LOW"}

### đŸˇī¸ Entities Detected
{len(analysis['entities'])} entities found:
"""
    for entity_type, entity_value in analysis['entities'][:10]:
        summary += f"- **{entity_type}**: {entity_value}\n"
    if len(analysis['entities']) > 10:
        summary += f"- ... and {len(analysis['entities']) - 10} more\n"

    summary += f"\n### 🔑 Key Insurance Terms\n"
    summary += ", ".join([f"`{term.replace('_', ' ')}`" for term in analysis['key_terms']])

    fig_gauge, fig_pie = create_analysis_plots(analysis)
    entities_df = pd.DataFrame(analysis['entities'], columns=['Entity Type', 'Value'])

    tokens = tokenizer.tokenize(text[:500])
    tokenization_example = f"**Sample Tokenization** (first 500 characters):\n\n{' | '.join(tokens[:20])}"
    if len(tokens) > 20:
        tokenization_example += f" | ... ({len(tokens)} total tokens)"

    return summary, fig_gauge, fig_pie, entities_df, tokenization_example
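# Editor-added smoke-test sketch: the analysis function can also be exercised outside
# the Gradio UI, e.g. against the first bundled training document. It is not called
# automatically.
def _smoke_test_analysis() -> None:
    summary, _gauge, _pie, entities_df, _sample = analyze_insurance_document(default_training_docs[0])
    print(summary[:300])
    print(entities_df.head())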
def tokenize_text(text):
    """Tokenize text and return tokens."""
    if not text.strip():
        return "Please enter some text to tokenize."
    tokens = tokenizer.tokenize(text)
    token_ids = tokenizer.encode(text)
    result = f"**Tokens ({len(tokens)}):**\n{' | '.join(tokens)}\n\n**Token IDs:**\n{' '.join(map(str, token_ids[:50]))}"
    if len(token_ids) > 50:
        result += f" ... ({len(token_ids)} total IDs)"
    return result


def get_tokenizer_stats():
    """Get tokenizer statistics."""
    vocab_size = tokenizer.get_vocab_size()
    insurance_terms = sum(1 for token in tokenizer.vocab.keys() if tokenizer._is_insurance_term(token))
    return f"""
## 🔧 Pibit.ai Insurance Tokenizer Statistics
- **Total Vocabulary Size**: {vocab_size:,}
- **Insurance-Specific Terms**: {insurance_terms:,}
- **Special Tokens**: {len(tokenizer.special_tokens)}
- **Model Type**: {tokenizer.model_type}
"""


# Create the Gradio interface
with gr.Blocks(theme=gr.themes.Soft(), title="Pibit.ai Insurance Tokenizer") as demo:
    # NOTE (reconstruction): the original HTML markup in this header was stripped from
    # the export; the minimal <div>/<h1>/<p> wrapper below preserves the original text.
    gr.HTML("""
        <div style="text-align: center;">
            <h1>🏢 Pibit.ai Insurance Tokenizer</h1>
            <p>Specialized NLP Tokenizer for Property & Casualty Insurance Documents</p>
        </div>
    """)

    with gr.Tabs():
        with gr.Tab("📊 Document Analysis"):
            with gr.Row():
                with gr.Column(scale=2):
                    input_text = gr.Textbox(lines=15,
                                            placeholder="Paste your insurance document here...",
                                            label="📄 Insurance Document Text")
                    analyze_btn = gr.Button("🔍 Analyze Document", variant="primary", size="lg")
                with gr.Column(scale=3):
                    analysis_output = gr.Markdown(label="📋 Analysis Report")
            with gr.Row():
                risk_gauge = gr.Plot(label="âš ī¸ Risk Assessment")
                token_pie = gr.Plot(label="đŸĨ§ Token Distribution")
            entities_table = gr.DataFrame(label="đŸˇī¸ Detected Entities")
            tokenization_sample = gr.Markdown(label="🔧 Tokenization Sample")

            # The custom_training input has been removed to fix the performance issue.
            analyze_btn.click(analyze_insurance_document,
                              inputs=[input_text],
                              outputs=[analysis_output, risk_gauge, token_pie,
                                       entities_table, tokenization_sample])

            gr.Examples(
                examples=[
                    ["Loss Run Report for Policy GL2024-001234\nEffective Date: 01/01/2024 to 12/31/2024\nInsured: ABC Manufacturing Company\nCoverage: General Liability, $1,000,000 per occurrence, $2,000,000 aggregate\nClaims History:\nClaim 1: Slip and fall incident, Date of Loss: 03/15/2024, Paid: $25,000, Incurred: $45,000, Status: Closed"],
                    ["Workers Compensation Submission\nApplicant: XYZ Construction LLC\nPolicy Period: 01/01/2025 to 01/01/2026\nAnnual Payroll: $2,500,000\nRisk Factors: High-risk construction activities, prior OSHA violations"],
                    ["Underwriting Risk Assessment Report\nAccount: Tech Startup Solutions Inc.\nLine of Business: Cyber Liability Insurance\nRequested Limits: $5,000,000 per claim\nSecurity Measures:\n- Multi-factor authentication: Yes\n- Incident response plan: In place\nPrior Claims: None reported"],
                ],
                inputs=input_text
            )

        with gr.Tab("🔧 Tokenization Tool"):
            with gr.Row():
                with gr.Column():
                    tokenize_input = gr.Textbox(lines=8,
                                                placeholder="Enter text to tokenize...",
                                                label="📝 Text to Tokenize")
                    tokenize_btn = gr.Button("🔧 Tokenize", variant="primary")
                with gr.Column():
                    tokenize_output = gr.Markdown(label="đŸŽ¯ Tokenization Results")
            tokenize_btn.click(tokenize_text, inputs=tokenize_input, outputs=tokenize_output)

        with gr.Tab("â„šī¸ Tokenizer Info"):
            tokenizer_info = gr.Markdown()
            demo.load(get_tokenizer_stats, inputs=None, outputs=tokenizer_info)


if __name__ == "__main__":
    demo.launch(debug=True)