import os
import re
from dataclasses import dataclass
from typing import List

import openai
import pandas as pd
from docx import Document
from pdfminer.high_level import extract_text
from tqdm import tqdm
from transformers import GPT2Tokenizer

# Segment size (in characters) used when chunking extracted PDF text.
EMBEDDING_SEG_LEN = 1500

COMPLETIONS_MODEL = "text-davinci-003"
# "gpt-4" is a chat/completions model, not an embedding model; the context length
# and encoding below match OpenAI's text-embedding-ada-002.
EMBEDDING_MODEL = "text-embedding-ada-002"

openai.api_key = os.environ["OPENAI_API_KEY"]

EMBEDDING_CTX_LENGTH = 8191
EMBEDDING_ENCODING = "cl100k_base"
ENCODING = "gpt2"


@dataclass
class Paragraph:
    page_num: int
    paragraph_num: int
    content: str


def read_pdf_pdfminer(file_path) -> List[Paragraph]:
    # pdfminer returns the whole document as one string, so page boundaries are lost;
    # every chunk gets page_num 0 and the text is split into EMBEDDING_SEG_LEN-character segments.
    text = extract_text(file_path).replace('\n', ' ').strip()
    paragraphs = batched(text, EMBEDDING_SEG_LEN)
    paragraphs_objs = []
    paragraph_num = 1
    for p in paragraphs:
        para = Paragraph(0, paragraph_num, p)
        paragraphs_objs.append(para)
        paragraph_num += 1
    return paragraphs_objs


def read_docx(file) -> List[Paragraph]:
    doc = Document(file)
    paragraphs = []
    for paragraph_num, paragraph in enumerate(doc.paragraphs, start=1):
        content = paragraph.text.strip()
        if content:
            para = Paragraph(1, paragraph_num, content)
            paragraphs.append(para)
    return paragraphs

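
# A hypothetical helper (not part of the original script) sketching how the
# Paragraph objects produced above can be turned into the DataFrame shape the
# functions below expect: compute_doc_embeddings() and answer_query_with_context()
# both read a 'content' column.
def paragraphs_to_dataframe(paragraphs: List[Paragraph]) -> pd.DataFrame:
    return pd.DataFrame(
        {
            "page_num": [p.page_num for p in paragraphs],
            "paragraph_num": [p.paragraph_num for p in paragraphs],
            "content": [p.content for p in paragraphs],
        }
    )
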
def count_tokens(text):
    # Loads the GPT-2 tokenizer on every call; fine for occasional use, slow in tight loops.
    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    return len(tokenizer.encode(text))

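
# A hypothetical companion to count_tokens() (not in the original script) showing
# one way to keep a segment within EMBEDDING_CTX_LENGTH tokens before embedding it.
# The GPT-2 tokenizer only approximates the cl100k_base encoding named in
# EMBEDDING_ENCODING, so this is a sketch rather than an exact limit.
def truncate_text_tokens(text, max_tokens=EMBEDDING_CTX_LENGTH):
    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    tokens = tokenizer.encode(text)
    if len(tokens) <= max_tokens:
        return text
    return tokenizer.decode(tokens[:max_tokens])
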
def batched(iterable, n):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx : min(ndx + n, l)]


def compute_doc_embeddings(df):
    embeddings = {}
    for index, row in tqdm(df.iterrows(), total=df.shape[0]):
        doc = row["content"]
        doc_embedding = get_embedding(doc)
        embeddings[index] = doc_embedding
    return embeddings


def enhanced_context_extraction(document, keywords, top_n=5):
    paragraphs = [para for para in document.split("\n") if para]

    def score_paragraph(para, keywords):
        keyword_count = sum(para.lower().count(keyword) for keyword in keywords)
        positions = [para.lower().find(keyword) for keyword in keywords if keyword in para.lower()]
        # Bonus point when at least one keyword is present; checking the list itself avoids
        # calling max() on an empty list and treats a match at position 0 as a hit.
        proximity_score = 1 if positions else 0
        return keyword_count + proximity_score

    scores = [score_paragraph(para, keywords) for para in paragraphs]
    top_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_n]
    relevant_paragraphs = [paragraphs[i] for i in top_indices]
    return " ".join(relevant_paragraphs)


def targeted_context_extraction(document, keywords, top_n=5):
    paragraphs = [para for para in document.split("\n") if para]
    scores = [sum(para.lower().count(keyword) for keyword in keywords) for para in paragraphs]
    top_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_n]
    relevant_paragraphs = [paragraphs[i] for i in top_indices]
    return " ".join(relevant_paragraphs)


def extract_page_and_clause_references(paragraph: str) -> str:
    page_matches = re.findall(r'Page (\d+)', paragraph)
    clause_matches = re.findall(r'Clause (\d+\.\d+)', paragraph)

    page_ref = f"Page {page_matches[0]}" if page_matches else ""
    clause_ref = f"Clause {clause_matches[0]}" if clause_matches else ""

    # Join only the references that were found, so the output never reads "(, Clause 4.2)".
    parts = [ref for ref in (page_ref, clause_ref) if ref]
    return f"({', '.join(parts)})" if parts else ""


def refine_answer_based_on_question(question: str, answer: str) -> str:
    if "Does the agreement contain" in question:
        if "not" in answer or "No" in answer:
            refined_answer = f"No, the agreement does not contain {answer}"
        else:
            refined_answer = f"Yes, the agreement contains {answer}"
    else:
        refined_answer = answer

    return refined_answer


def answer_query_with_context(question: str, df: pd.DataFrame, top_n_paragraphs: int = 5) -> str:
    question_words = set(question.split())

    priority_keywords = ["duration", "term", "period", "month", "year", "day", "week", "agreement", "obligation", "effective date"]

    # Score each paragraph by word overlap with the question plus hits on the priority keywords.
    df['relevance_score'] = df['content'].apply(
        lambda x: len(question_words.intersection(set(x.split()))) + sum(x.lower().count(pk) for pk in priority_keywords)
    )

    most_relevant_paragraphs = df.sort_values(by='relevance_score', ascending=False).iloc[:top_n_paragraphs]['content'].tolist()

    context = "\n\n".join(most_relevant_paragraphs)
    prompt = f"Question: {question}\n\nContext: {context}\n\nAnswer:"
    response = openai.Completion.create(model=COMPLETIONS_MODEL, prompt=prompt, max_tokens=150)
    answer = response.choices[0].text.strip()

    # Attach any page/clause references found in the selected context.
    references = extract_page_and_clause_references(context)
    answer = refine_answer_based_on_question(question, answer) + " " + references

    return answer


def get_embedding(text):
    try:
        # The pre-1.0 openai-python embeddings endpoint is openai.Embedding.create,
        # which takes the raw text via `input`; there is no openai.Embed.create.
        # Text longer than EMBEDDING_CTX_LENGTH tokens should be truncated first.
        response = openai.Embedding.create(
            model=EMBEDDING_MODEL,
            input=text,
        )
        embedding = response["data"][0]["embedding"]
    except Exception as e:
        print("Error obtaining embedding:", e)
        embedding = []
    return embedding
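
# ---------------------------------------------------------------------------
# Minimal usage sketch, shown as an illustration rather than part of the
# original pipeline. "contract.docx" and the sample question are hypothetical;
# use read_pdf_pdfminer() for PDF input instead of read_docx(), and
# paragraphs_to_dataframe() is the sketched helper defined above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    paragraphs = read_docx("contract.docx")  # hypothetical input file
    df = paragraphs_to_dataframe(paragraphs)
    # Embeddings can be precomputed and cached; the keyword-scored
    # answer_query_with_context() path does not use them directly.
    doc_embeddings = compute_doc_embeddings(df)
    print(answer_query_with_context("What is the duration of the agreement?", df))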