Spaces:
Running
Running
import os | |
from _utils.gerar_relatorio_modelo_usuario.prompts import ( | |
prompt_auxiliar_do_contextual_prompt, | |
create_prompt_auxiliar_do_contextual_prompt, | |
) | |
from _utils.bubble_integrations.obter_arquivo import get_pdf_from_bubble | |
from _utils.chains.Chain_class import Chain | |
from _utils.handle_files import return_document_list_with_llama_parser | |
from _utils.prompts.Prompt_class import Prompt | |
from _utils.splitters.Splitter_class import Splitter | |
from setup.easy_imports import PyPDFLoader | |
from langchain_openai import ChatOpenAI | |
from typing import List, Dict, Tuple, Optional | |
from anthropic import Anthropic, AsyncAnthropic | |
import logging | |
from langchain.schema import Document | |
from llama_index import Document as Llama_Index_Document | |
import asyncio | |
from langchain.prompts import PromptTemplate | |
from typing import List | |
from multiprocessing import Process, Barrier, Queue | |
from dataclasses import dataclass | |
from langchain_core.messages import HumanMessage | |
from asgiref.sync import sync_to_async | |
from setup.easy_imports import ChatPromptTemplate, ChatOpenAI | |
from _utils.gerar_relatorio_modelo_usuario.llm_calls import aclaude_answer, agpt_answer | |
from _utils.gerar_relatorio_modelo_usuario.prompts import contextual_prompt | |
from _utils.models.gerar_relatorio import ( | |
ContextualizedChunk, | |
DocumentChunk, | |
RetrievalConfig, | |
) | |
from _utils.prompts.Prompt_class import prompt as prompt_obj | |
lista_contador = [] | |
class ContextualRetriever: | |
def __init__( | |
self, config: RetrievalConfig, claude_api_key: str, claude_context_model: str | |
): | |
self.config = config | |
# self.claude_client = Anthropic(api_key=claude_api_key) | |
self.claude_client = AsyncAnthropic(api_key=claude_api_key) | |
self.logger = logging.getLogger(__name__) | |
self.bm25 = None | |
self.claude_context_model = claude_context_model | |
async def llm_generate_context( | |
self, page_text: str, chunk: DocumentChunk, resumo_auxiliar | |
) -> str: | |
"""Generate contextual description using ChatOpenAI""" | |
try: | |
print("COMEÇOU A REQUISIÇÃO") | |
prompt = contextual_prompt(page_text, resumo_auxiliar, chunk.content) | |
# response = await aclaude_answer( | |
# self.claude_client, self.claude_context_model, prompt | |
# ) | |
response = await agpt_answer(prompt) | |
return response | |
except Exception as e: | |
self.logger.error( | |
f"Context generation failed for chunk {chunk.chunk_id}: {str(e)}" | |
) | |
return "" | |
# def gerar_resumo_auxiliar_do_contextual_embedding(self): | |
# prompt = Prompt().create_prompt_template( | |
# "", prompt_auxiliar_do_contextual_prompt | |
# ) | |
# Chain(prompt, ChatOpenAI()) | |
# return | |
async def create_contextualized_chunk( | |
self, chunk, single_page_text, response_auxiliar_summary | |
): | |
lista_contador.append(0) | |
print("contador: ", len(lista_contador)) | |
# Código comentado abaixo é para ler as páginas ao redor da página atual do chunk | |
# page_content = "" | |
# for i in range( | |
# max(0, chunk.page_number - 1), | |
# min(len(single_page_text), chunk.page_number + 2), | |
# ): | |
# page_content += single_page_text[i].page_content if single_page_text[i] else "" | |
page_number = chunk.page_number - 1 | |
page_content = single_page_text[page_number].page_content | |
context = await self.llm_generate_context( | |
page_content, chunk, response_auxiliar_summary | |
) | |
return ContextualizedChunk( | |
content=chunk.content, | |
page_number=chunk.page_number, | |
chunk_id=chunk.chunk_id, | |
start_char=chunk.start_char, | |
end_char=chunk.end_char, | |
context=context, | |
) | |
async def contextualize_all_chunks( | |
self, full_text_as_array: List[Document], chunks: List[DocumentChunk] | |
) -> List[ContextualizedChunk]: | |
"""Add context to all chunks""" | |
contextualized_chunks = [] | |
lista_contador = [] | |
full_text = "" | |
for x in full_text_as_array: | |
full_text += x.page_content | |
# prompt_auxiliar_summary = prompt_obj.create_prompt_template( | |
# "", prompt_auxiliar_do_contextual_prompt | |
# ).invoke({"PROCESSO_JURIDICO": full_text}) | |
# response_auxiliar_summary = await ChatOpenAI(max_tokens=128000).ainvoke( | |
# prompt_auxiliar_summary | |
# ) | |
prompt_auxiliar_summary = create_prompt_auxiliar_do_contextual_prompt(full_text) | |
print("\n\n\nprompt_auxiliar_summary: ", prompt_auxiliar_summary) | |
response_auxiliar_summary = await aclaude_answer( | |
self.claude_client, self.claude_context_model, prompt_auxiliar_summary | |
) | |
print("\n\n\n\nresponse_auxiliar_summary: ", response_auxiliar_summary) | |
async with asyncio.TaskGroup() as tg: | |
tasks = [ | |
tg.create_task( | |
self.create_contextualized_chunk( | |
chunk, full_text_as_array, response_auxiliar_summary | |
) | |
) | |
for chunk in chunks | |
] | |
contextualized_chunks = [task.result() for task in tasks] | |
return contextualized_chunks | |
async def get_full_text_and_all_PDFs_chunks( | |
listaPDFs: List[str], | |
splitterObject: Splitter, | |
should_use_llama_parse: bool, | |
): | |
all_PDFs_chunks = [] | |
pages: List[Document] = [] | |
# Load and process document | |
for pdf_path in listaPDFs: | |
if should_use_llama_parse: | |
pages = pages + await return_document_list_with_llama_parser(pdf_path) | |
else: | |
pages = pages + get_pdf_from_bubble(pdf_path) | |
chunks = splitterObject.load_and_split_document(pdf_path, pages) | |
all_PDFs_chunks = all_PDFs_chunks + chunks | |
# Get full text for contextualization | |
# loader = PyPDFLoader(pdf_path) | |
# full_text = "" | |
# full_text = " ".join([page.page_content for page in pages]) | |
return all_PDFs_chunks, pages # , full_text | |
async def contextualize_chunk_based_on_serializer( | |
serializer, contextual_retriever: ContextualRetriever, pages, all_PDFs_chunks | |
): | |
if serializer["should_have_contextual_chunks"]: | |
contextualized_chunks = await contextual_retriever.contextualize_all_chunks( | |
pages, all_PDFs_chunks | |
) | |
chunks_passados = contextualized_chunks | |
is_contextualized_chunk = True | |
else: | |
chunks_passados = all_PDFs_chunks | |
is_contextualized_chunk = False | |
return chunks_passados, is_contextualized_chunk | |