|
import streamlit as st |
|
import pdfplumber |
|
import pandas as pd |
|
import re |
|
import spacy |
|
import torch |
|
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, AutoModelForTokenClassification, pipeline |
|
import base64 |
|
import io |
|
from datetime import datetime |
|
import json |
|
|
|
from pathlib import Path |
|
import os |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Page configuration and header ------------------------------------------
# NOTE(review): the page icon below looks like a mis-decoded emoji; the
# original character cannot be recovered from this file, so it is kept as-is.
st.set_page_config(page_title="Regulatory Report Checker", page_icon="π", layout="wide")

st.title("Regulatory Report Checker")

# Short description of what the app extracts from a filing.
intro_markdown = """
This application analyzes SEC filings (10-K, 13F, etc.) to extract:
- Regulatory obligations
- Risk statements
- Regulatory agency references
- Potential violations
"""
st.markdown(intro_markdown)

# Content-Security-Policy <meta> tag emitted as raw HTML into the page body.
csp_meta_markup = """
<meta http-equiv="Content-Security-Policy" content="default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; frame-src 'self';">
"""
st.markdown(csp_meta_markup, unsafe_allow_html=True)
|
|
|
|
|
|
|
|
|
def display_pdf(file, height=350):
    """Render a PDF inline by embedding it as a base64 ``data:`` URI in an iframe.

    Args:
        file: Either a filesystem path (``str``) or a binary file-like object
            that supports ``read()`` and ``seek()``.
        height: Height of the iframe in pixels.

    Returns:
        None. The iframe is written to the page with ``st.markdown``; when a
        path is given and the file does not exist, an error message is shown
        instead and nothing is rendered.
    """
    if isinstance(file, str):
        # Try the open directly instead of checking for existence first, so
        # there is no window between the check and the read.
        try:
            with open(file, "rb") as f:
                pdf_bytes = f.read()
        except FileNotFoundError:
            st.error("Selected PDF not found.")
            return
    else:
        # Rewind before reading: a stream that was already consumed would
        # otherwise yield no bytes and an empty preview.
        file.seek(0)
        pdf_bytes = file.read()
        # Rewind again so later readers of the same object start at the top.
        file.seek(0)

    base64_pdf = base64.b64encode(pdf_bytes).decode("utf-8")

    pdf_display = f"""
    <iframe
        src="data:application/pdf;base64,{base64_pdf}"
        width="100%"
        height="{height}px"
        style="border: 1px solid #ccc; border-radius: 10px;"
        type="application/pdf">
    </iframe>
    """
    st.markdown(pdf_display, unsafe_allow_html=True)
|
|
|
|
|
# Button label -> path of the sample PDF that the button selects.
# NOTE(review): the leading "π" in each label looks like a mis-decoded emoji;
# kept unchanged because the original character is not recoverable here.
sample_pdfs = {
    "π Meridian Financial Services, Inc. Annual Report (10-K)": "example.pdf",
    "π Annual Report (10-K)": "Mock_Form_10K.pdf",
    "π Sample Investment Holdings (13F)": "Mock_Form_13F.pdf",
}

# On the first run of a session, preselect the first sample for the preview.
if "selected_pdf" not in st.session_state:
    first_sample_path = next(iter(sample_pdfs.values()))
    st.session_state["selected_pdf"] = first_sample_path
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Sidebar: analysis settings ---------------------------------------------
st.sidebar.header("Analysis Settings")

# Checkpoint used for extractive question answering.
# "distilbert-base-uncased" is the base checkpoint, not one fine-tuned for
# question answering, so it is no longer the preselected default: the
# SQuAD-distilled model (index 2) is. All three options remain selectable.
nlp_model = st.sidebar.selectbox(
    "Select NLP Model",
    ["distilbert-base-uncased", "deepset/deberta-v3-base-squad2", "distilbert-base-cased-distilled-squad"],
    index=2,
)

# Entity categories passed to perform_ner().
entity_types = st.sidebar.multiselect(
    "Entity Types to Extract",
    ["Obligation", "Regulatory Agency", "Risk", "Deadline", "Penalty", "Amount"],
    default=["Obligation", "Regulatory Agency", "Risk"],
)

qa_mode = st.sidebar.checkbox("Enable Question Answering", value=True)

# Always defined, so code that runs later can reference it even when question
# answering is switched off.
custom_questions = []

if qa_mode:
    default_questions = [
        "What are the regulatory obligations mentioned?",
        "Are there any violations or risk statements?",
        "What regulatory agencies are mentioned?",
        "What are the compliance deadlines?",
    ]

    st.sidebar.subheader("Custom Questions")

    # One editable input per default question; inputs left blank are dropped.
    for i, default_q in enumerate(default_questions):
        q = st.sidebar.text_input(f"Question {i+1}", value=default_q)
        if q:
            custom_questions.append(q)

    new_q = st.sidebar.text_input("Additional Question")
    if new_q:
        custom_questions.append(new_q)

st.sidebar.subheader("Risk Keywords")
default_risk_keywords = "non-compliance, penalty, violation, risk, fine, investigation, audit, failure, breach, warning"
risk_keywords = st.sidebar.text_area("Enter risk keywords (comma separated)", value=default_risk_keywords)
# Drop empty entries (trailing comma, doubled commas, empty box): an empty
# keyword becomes the pattern r'\b\b', which matches any text containing a word.
risk_keywords_list = [keyword.strip() for keyword in risk_keywords.split(",") if keyword.strip()]

# Minimum QA score an answer needs in order to be reported.
confidence_threshold = st.sidebar.slider("Confidence Threshold", 0.0, 1.0, 0.5)
|
|
|
|
|
|
|
|
|
@st.cache_data
def extract_text_from_pdf(pdf_file):
    """Extract the text of every page of a PDF with pdfplumber.

    Args:
        pdf_file: Whatever ``pdfplumber.open`` accepts; callers in this file
            pass a binary file-like object.

    Returns:
        A ``(full_text, text_by_page)`` tuple. ``text_by_page`` maps 1-based
        page numbers to page text and omits pages for which no text was
        extracted; ``full_text`` is those page texts joined by blank lines.
    """
    with pdfplumber.open(pdf_file) as pdf:
        extracted = [page.extract_text() for page in pdf.pages]

    # Keep only pages that produced text, numbered from 1.
    text_by_page = {
        page_number: page_text
        for page_number, page_text in enumerate(extracted, start=1)
        if page_text
    }

    return "\n\n".join(text_by_page.values()), text_by_page
|
|
|
|
|
def highlight_risk_terms(text, risk_terms):
    """Wrap every whole-word occurrence of a risk term in red/bold markup.

    Matching is case-insensitive. The matched text is kept exactly as it
    appears in ``text``; only the ``**:red[...]**`` wrapper is added.

    Args:
        text: The text to annotate.
        risk_terms: Iterable of keyword strings. Blank entries are ignored.

    Returns:
        ``text`` with each match wrapped as ``**:red[match]**``. If no usable
        term is supplied, ``text`` is returned unchanged.
    """
    # Drop blanks (r'\b\b' would match at every word boundary) and duplicates.
    cleaned = dict.fromkeys(term.strip() for term in risk_terms if term and term.strip())
    if not cleaned:
        return text

    # Longest first, so a longer phrase wins over a shorter term that starts
    # at the same position.
    ordered = sorted(cleaned, key=len, reverse=True)

    # One alternation applied in a single pass: markup inserted for one term
    # is never re-scanned for another (e.g. the keyword "red" no longer
    # matches inside ":red[...]").
    pattern = re.compile(
        r'\b(?:' + '|'.join(re.escape(term) for term in ordered) + r')\b',
        re.IGNORECASE,
    )

    # A callable replacement keeps the document's own casing and avoids
    # treating the term as a regex replacement template.
    return pattern.sub(lambda match: f"**:red[{match.group(0)}]**", text)
|
|
|
|
|
def _build_entity_patterns(entity_types):
    """Build the EntityRuler pattern dicts for the selected entity types."""
    patterns = []

    def lower_tokens(phrase):
        # One {"LOWER": ...} spec per whitespace-separated word. A single
        # spec holding a multi-word phrase can never match, because a token
        # never contains a space.
        return [{"LOWER": word} for word in phrase.split()]

    if "Regulatory Agency" in entity_types:
        agencies = ["SEC", "FINRA", "CFTC", "FDIC", "Federal Reserve", "OCC", "CFPB",
                    "FTC", "IRS", "DOJ", "EPA", "FDA", "OSHA", "Securities and Exchange Commission"]
        patterns.extend({"label": "REGULATORY_AGENCY", "pattern": agency} for agency in agencies)

    if "Obligation" in entity_types:
        obligation_triggers = ["must", "required to", "shall", "obligation to", "mandated",
                               "compliance with", "comply with", "required by", "in accordance with"]
        patterns.extend(
            {"label": "OBLIGATION", "pattern": lower_tokens(trigger)}
            for trigger in obligation_triggers
        )

    if "Risk" in entity_types:
        risk_triggers = ["risk", "exposure", "vulnerable", "susceptible", "hazard",
                         "threat", "danger", "liability", "non-compliance", "violation"]
        patterns.extend({"label": "RISK", "pattern": trigger} for trigger in risk_triggers)

    if "Deadline" in entity_types:
        deadline_triggers = ["by", "due", "deadline", "within", "no later than"]
        # Trigger word(s) followed by one or more DATE-typed tokens, so the
        # span covers the whole date rather than only its first token.
        patterns.extend(
            {"label": "DEADLINE", "pattern": lower_tokens(trigger) + [{"ENT_TYPE": "DATE", "OP": "+"}]}
            for trigger in deadline_triggers
        )

    if "Penalty" in entity_types:
        penalty_triggers = ["fine", "penalty", "sanction", "enforcement", "punitive", "disciplinary"]
        patterns.extend({"label": "PENALTY", "pattern": trigger} for trigger in penalty_triggers)

    return patterns


def perform_ner(text, entity_types):
    """Find regulatory entities in ``text`` with spaCy plus rule-based patterns.

    Args:
        text: Document text to analyze.
        entity_types: Selected categories; recognized values are
            "Obligation", "Regulatory Agency", "Risk", "Deadline", "Penalty"
            and "Amount".

    Returns:
        A list of dicts with keys ``text``, ``start``, ``end`` (character
        offsets into ``text``), ``type`` (REGULATORY_AGENCY, OBLIGATION, RISK,
        DEADLINE, PENALTY or AMOUNT) and ``context`` (the entity with up to 50
        characters on either side).
    """
    nlp = spacy.load("en_core_web_sm")

    # The ruler is appended after the statistical "ner" component. Without
    # overwrite_ents, spaCy discards any rule match that overlaps an entity
    # the model already predicted: agencies tagged ORG would be lost, and the
    # DEADLINE patterns (which by construction contain DATE-typed tokens)
    # would never be added.
    ruler = nlp.add_pipe("entity_ruler", config={"overwrite_ents": True})
    ruler.add_patterns(_build_entity_patterns(entity_types))

    doc = nlp(text)

    rule_labels = {"REGULATORY_AGENCY", "OBLIGATION", "RISK", "DEADLINE", "PENALTY"}
    include_amounts = "Amount" in entity_types

    entities = []
    for ent in doc.ents:
        if ent.label_ == "MONEY":
            # Monetary amounts are reported only when "Amount" was requested.
            if not include_amounts:
                continue
            entity_type = "AMOUNT"
        elif ent.label_ in rule_labels:
            entity_type = ent.label_
        else:
            continue

        entities.append({
            "text": ent.text,
            "start": ent.start_char,
            "end": ent.end_char,
            "type": entity_type,
            "context": text[max(0, ent.start_char - 50):min(len(text), ent.end_char + 50)],
        })

    return entities
|
|
|
|
|
@st.cache_resource
def load_qa_model(model_name):
    """Load a question-answering pipeline for ``model_name``.

    Args:
        model_name: Model identifier passed to ``from_pretrained`` for both
            the tokenizer and the question-answering model.

    Returns:
        A ``transformers`` "question-answering" pipeline built from that
        tokenizer and model.
    """
    return pipeline(
        "question-answering",
        tokenizer=AutoTokenizer.from_pretrained(model_name),
        model=AutoModelForQuestionAnswering.from_pretrained(model_name),
    )
|
|
|
def perform_qa(text, questions, qa_pipeline, confidence_threshold, max_chunk_chars=512):
    """Answer each question against ``text`` and keep the best-scoring answer.

    The text is split into sentences and packed into chunks of roughly
    ``max_chunk_chars`` characters. Every question is run against every
    chunk, and the highest-scoring answer at or above
    ``confidence_threshold`` is kept.

    Args:
        text: Document text to search.
        questions: Iterable of question strings.
        qa_pipeline: Callable invoked as ``qa_pipeline(question=..., context=...)``
            that returns a mapping with ``answer``, ``score``, ``start`` and
            ``end`` keys.
        confidence_threshold: Minimum score for an answer to be accepted.
        max_chunk_chars: Approximate upper bound on chunk length, in
            characters. A single sentence longer than this becomes its own
            chunk.

    Returns:
        One dict per question with keys ``question``, ``answer``,
        ``confidence`` and ``context``. When no answer qualifies, ``answer``
        is a fixed "not found" message, ``confidence`` is 0 and ``context``
        is empty.
    """
    chunks = []
    current_parts = []
    current_len = 0

    def flush():
        # Never emit an empty chunk: the pipeline rejects an empty context.
        chunk = " ".join(current_parts).strip()
        if chunk:
            chunks.append(chunk)

    for sentence in re.split(r'(?<=[.!?])\s+', text):
        # Start a new chunk only when there is something to flush; an
        # over-long first sentence must not produce a leading empty chunk.
        if current_parts and current_len + len(sentence) >= max_chunk_chars:
            flush()
            current_parts = []
            current_len = 0
        current_parts.append(sentence)
        current_len += len(sentence) + 1  # +1 for the joining space
    flush()

    results = []

    for question in questions:
        best_answer = {"answer": "", "score": 0, "context": ""}

        for chunk in chunks:
            try:
                result = qa_pipeline(question=question, context=chunk)
                if result["score"] > best_answer["score"] and result["score"] >= confidence_threshold:
                    best_answer = {
                        "answer": result["answer"],
                        "score": result["score"],
                        # Answer span plus up to 100 characters either side.
                        "context": chunk[max(0, result["start"] - 100):min(len(chunk), result["end"] + 100)],
                    }
            except Exception as e:
                # Best effort: report the failing chunk and try the rest.
                st.error(f"Error processing chunk with question '{question}': {str(e)}")
                continue

        if best_answer["answer"]:
            results.append({
                "question": question,
                "answer": best_answer["answer"],
                "confidence": best_answer["score"],
                "context": best_answer["context"],
            })
        else:
            results.append({
                "question": question,
                "answer": "No answer found with sufficient confidence.",
                "confidence": 0,
                "context": "",
            })

    return results
|
|
|
|
|
def get_download_link(data, filename, text):
    """Build an HTML ``<a>`` element that downloads ``data`` as a file.

    Args:
        data: A ``pandas.DataFrame`` (serialized as CSV without the index) or
            any JSON-serializable object (serialized as indented JSON).
        filename: Name offered to the browser through the ``download``
            attribute.
        text: Visible link text.

    Returns:
        The anchor markup as a string, with the payload embedded as a base64
        ``data:`` URI.

    Raises:
        TypeError: If ``data`` is not a DataFrame and cannot be serialized by
            ``json.dumps``.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    if isinstance(data, pd.DataFrame):
        payload = data.to_csv(index=False)
        mime_type = "text/csv"
    else:
        payload = json.dumps(data, indent=4)
        mime_type = "application/json"

    b64 = base64.b64encode(payload.encode("utf-8")).decode("ascii")

    # Escape the caller-supplied pieces so quotes or angle brackets in a file
    # name or label cannot break out of the attribute or element.
    safe_filename = escape(filename, quote=True)
    safe_text = escape(text)

    return f'<a href="data:{mime_type};charset=utf-8;base64,{b64}" download="{safe_filename}">{safe_text}</a>'
|
|
|
|
|
|
|
# --- Upload / sample selection (right) and document preview (left) ----------
preview_col, upload_col = st.columns([1, 1])

with upload_col:
    st.header("Upload Document")
    uploaded_file = st.file_uploader("Upload SEC Filing (PDF)", type=["pdf"])

    st.markdown("### Or choose a sample:")
    st.markdown(
        "In case the preview is not working, you can find these samples at [Notion](https://www.notion.so/Sample-Mock-Documents-for-Analysis-1d14cfc2eb35804cafa7e7db7531b1b8?pvs=4)")

    sample_cols = st.columns(len(sample_pdfs))

    # st.button reports True only on the script run triggered by the click,
    # so the choice is recorded in session state. Without that, the sample
    # (and its analysis) would vanish on the next rerun, e.g. after changing
    # any widget further down the page.
    clicked_sample = None
    for column, (label, file_path) in zip(sample_cols, sample_pdfs.items()):
        with column:
            if st.button(label):
                clicked_sample = file_path

    if clicked_sample is not None:
        st.session_state["selected_pdf"] = clicked_sample
        st.session_state["sample_active"] = True

    # A sample is loaded on the click itself, and on later runs whenever no
    # file is present in the uploader and a sample was explicitly chosen.
    use_sample = clicked_sample is not None or (
        uploaded_file is None and st.session_state.get("sample_active", False)
    )
    if use_sample:
        sample_path = st.session_state["selected_pdf"]
        try:
            with open(sample_path, "rb") as f:
                sample_stream = io.BytesIO(f.read())
        except FileNotFoundError:
            st.error(f"Sample file {sample_path} not found.")
        else:
            # Give the in-memory stream a name, as uploaded files have one.
            sample_stream.name = sample_path
            uploaded_file = sample_stream

with preview_col:
    st.header("Document Preview")

    if uploaded_file:
        display_pdf(uploaded_file, height=400)
    elif st.session_state["selected_pdf"]:
        display_pdf(st.session_state["selected_pdf"], height=400)
    else:
        st.info("Upload a PDF or select a sample to preview.")
|
|
|
|
|
|
|
# --- Main analysis: runs only when a document is available ------------------
if uploaded_file:
    # Rewind: the preview has already read the stream.
    if hasattr(uploaded_file, 'seek'):
        uploaded_file.seek(0)

    with st.spinner("Processing PDF file..."):
        full_text, text_by_page = extract_text_from_pdf(uploaded_file)

    st.success(f"Successfully extracted text from {len(text_by_page)} pages")

    with st.expander("View Extracted Text"):
        page_selection = st.selectbox(
            "Select page to view",
            ["All"] + list(text_by_page.keys())
        )

        if page_selection == "All":
            st.text_area("Full Text", full_text, height=300)
        else:
            st.text_area(f"Page {page_selection}", text_by_page[page_selection], height=300)

    st.header("Analysis Results")

    ner_tab, qa_tab, risk_tab, summary_tab = st.tabs(["Entity Recognition", "Question Answering", "Risk Analysis", "Summary"])

    # Results shared between tabs get explicit defaults up front, so the
    # summary and export code never has to probe locals() for them.
    qa_results = []
    risk_paragraphs = []
    overall_risk = None
    risk_level = None

    # (keyword, whole-word regex) pairs, built once. Empty keywords are
    # skipped because r'\b\b' would match every paragraph.
    active_keywords = [keyword for keyword in risk_keywords_list if keyword]
    keyword_patterns = [
        (keyword, r'\b' + re.escape(keyword) + r'\b')
        for keyword in active_keywords
    ]

    # ---- Entity recognition ----
    with ner_tab:
        with st.spinner("Performing Entity Recognition..."):
            entities = perform_ner(full_text, entity_types)

        if entities:
            # Group entities by type, keeping first-seen order.
            entities_by_type = {}
            for entity in entities:
                entities_by_type.setdefault(entity["type"], []).append(entity)

            for entity_type, type_entities in entities_by_type.items():
                st.subheader(f"{entity_type} Entities")

                df = pd.DataFrame([{
                    "Text": e["text"],
                    "Context": e["context"]
                } for e in type_entities])

                st.dataframe(df, use_container_width=True)

                st.markdown(
                    get_download_link(
                        df,
                        f"{entity_type.lower()}_entities.csv",
                        f"Download {entity_type} Entities as CSV"
                    ),
                    unsafe_allow_html=True
                )
        else:
            st.info("No entities detected. Try adjusting the entity types in the sidebar.")

    # ---- Question answering ----
    with qa_tab:
        if qa_mode:
            with st.spinner("Please note: Response times may take up to a minute due to CPU usage on the free tier of Hugging Face."):
                # Broad catch on purpose: a model-loading or inference
                # failure is shown in this tab instead of aborting the page.
                try:
                    qa_pipeline = load_qa_model(nlp_model)
                    qa_results = perform_qa(full_text, custom_questions, qa_pipeline, confidence_threshold)

                    for result in qa_results:
                        st.subheader(result["question"])

                        if result["confidence"] > 0:
                            st.markdown(f"**Answer:** {result['answer']}")
                            st.markdown(f"**Confidence:** {result['confidence']:.2f}")

                            with st.expander("Show Context"):
                                highlighted_context = result["context"].replace(
                                    result["answer"],
                                    f"**:blue[{result['answer']}]**"
                                )
                                st.markdown(highlighted_context)
                        else:
                            st.info("No answer found with sufficient confidence.")

                    qa_df = pd.DataFrame(qa_results)
                    st.markdown(
                        get_download_link(
                            qa_df,
                            "qa_results.csv",
                            "Download QA Results as CSV"
                        ),
                        unsafe_allow_html=True
                    )
                except Exception as e:
                    st.error(f"Error performing question answering: {str(e)}")
        else:
            st.info("Question Answering is disabled. Enable it from the sidebar.")

    # ---- Risk keyword analysis ----
    with risk_tab:
        with st.spinner("Analyzing Risk Keywords..."):
            for para in re.split(r'\n\n+', full_text):
                # One pass per paragraph: which keywords occur at least once?
                matched_keywords = [
                    keyword for keyword, pattern in keyword_patterns
                    if re.search(pattern, para, re.IGNORECASE)
                ]
                if not matched_keywords:
                    continue

                keyword_count = len(matched_keywords)
                risk_paragraphs.append({
                    "paragraph": para,
                    "keyword_count": keyword_count,
                    # Score saturates at 1.0 once ten distinct keywords match.
                    "risk_score": min(1.0, keyword_count / 10),
                    "highlighted_text": highlight_risk_terms(para, active_keywords)
                })

        if risk_paragraphs:
            risk_paragraphs.sort(key=lambda x: x["risk_score"], reverse=True)

            st.subheader(f"Found {len(risk_paragraphs)} Paragraphs with Risk Keywords")

            # Overall risk is the mean score of the (up to) five riskiest paragraphs.
            top_paragraphs = risk_paragraphs[:5]
            overall_risk = sum(p["risk_score"] for p in top_paragraphs) / len(top_paragraphs)

            st.subheader("Document Risk Assessment")
            st.progress(overall_risk)
            risk_level = "Low" if overall_risk < 0.4 else "Medium" if overall_risk < 0.7 else "High"
            risk_color = {"Low": "green", "Medium": "orange", "High": "red"}[risk_level]
            st.markdown(f"**Risk Level: :{risk_color}[{risk_level}]** (Score: {overall_risk:.2f})")

            for i, para in enumerate(risk_paragraphs):
                with st.expander(f"Risk Paragraph {i+1} (Score: {para['risk_score']:.2f})"):
                    st.markdown(para["highlighted_text"])

            risk_df = pd.DataFrame([{
                "Risk Score": p["risk_score"],
                "Keyword Count": p["keyword_count"],
                "Paragraph": p["paragraph"]
            } for p in risk_paragraphs])

            st.markdown(
                get_download_link(
                    risk_df,
                    "risk_paragraphs.csv",
                    "Download Risk Analysis as CSV"
                ),
                unsafe_allow_html=True
            )
        else:
            st.info("No risk keywords found in the document.")

    # ---- Executive summary and combined export ----
    with summary_tab:
        st.subheader("Executive Summary")

        summary_points = []

        if entities:
            entity_counts = {}
            for entity in entities:
                entity_counts[entity["type"]] = entity_counts.get(entity["type"], 0) + 1

            entity_summary = ", ".join(f"{count} {entity_type}" for entity_type, count in entity_counts.items())
            summary_points.append(f"Found {entity_summary}.")

        if risk_paragraphs:
            top_risk = risk_paragraphs[0]
            # Computed outside the f-string: a backslash inside an f-string
            # replacement field is a SyntaxError before Python 3.12.
            top_keywords = [
                keyword for keyword, pattern in keyword_patterns
                if re.search(pattern, top_risk["paragraph"], re.IGNORECASE)
            ]
            summary_points.append(
                f"Highest risk section identified with score {top_risk['risk_score']:.2f} "
                f"containing keywords: {', '.join(top_keywords)}."
            )

        if overall_risk is not None:
            summary_points.append(f"Overall document risk level: {risk_level}.")

        if qa_mode and qa_results:
            best_qa = max(qa_results, key=lambda x: x["confidence"])
            if best_qa["confidence"] > 0:
                summary_points.append(f"Key finding: In response to '{best_qa['question']}', the document states '{best_qa['answer']}' (confidence: {best_qa['confidence']:.2f}).")

        if summary_points:
            for point in summary_points:
                # Bullet restored from its mis-decoded form ("β’").
                st.markdown(f"• {point}")
        else:
            st.info("Not enough data to generate a summary. Try adjusting analysis parameters.")

        # Everything computed above, bundled for a single JSON download. The
        # display-only highlighted text is left out of the export.
        all_results = {
            "filename": uploaded_file.name,
            "analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "entities": entities,
            "qa_results": qa_results,
            "risk_paragraphs": [{k: v for k, v in p.items() if k != 'highlighted_text'} for p in risk_paragraphs],
            "summary_points": summary_points
        }

        st.markdown(
            get_download_link(
                all_results,
                f"regulatory_analysis_{datetime.now().strftime('%Y%m%d%H%M%S')}.json",
                "Download Complete Analysis Results (JSON)"
            ),
            unsafe_allow_html=True
        )
else:
    # No document yet: show a short explanation of the workflow.
    st.info("Upload a PDF file to begin analysis. The tool will extract text and perform NLP analysis to identify regulatory obligations, risks, and more.")

    st.subheader("What This Tool Does")

    col1, col2, col3 = st.columns(3)

    with col1:
        st.markdown("**1. Extract Text**")
        st.markdown("Upload SEC filings and extract all text content from PDFs.")

    with col2:
        st.markdown("**2. Analyze Content**")
        st.markdown("Use NLP to identify regulatory entities, answer questions, and flag risk language.")

    with col3:
        st.markdown("**3. Export Results**")
        st.markdown("Download structured analysis results for review by your legal and compliance teams.")
|
|
|
|
|
# --- Footer: horizontal rule followed by project links ----------------------
footer_links = """
[GitHub Repository](https://koulmesahil.github.io/) | [LinkedIn](https://www.linkedin.com/in/sahilkoul123/)
"""
st.markdown("---")
st.markdown(footer_links)