Spaces:
Running
Running
| from dataclasses import dataclass | |
| from langchain_core.messages import HumanMessage | |
| from typing import Any, List, Dict, Literal, Tuple, Optional, Union, cast | |
| from pydantic import SecretStr | |
| from _utils.Utils_Class import UtilsClass | |
| from _utils.axiom_logs import AxiomLogs | |
| from _utils.bubble_integrations.enviar_resposta_final import enviar_resposta_final | |
| from _utils.gerar_documento_utils.contextual_retriever import ContextualRetriever | |
| from _utils.gerar_documento_utils.llm_calls import agemini_answer | |
| from _utils.gerar_documento_utils.prompts import ( | |
| create_prompt_auxiliar_do_contextual_prompt, | |
| prompt_gerar_query_dinamicamente, | |
| prompt_para_gerar_titulo, | |
| ) | |
| from _utils.langchain_utils.Chain_class import Chain | |
| from _utils.langchain_utils.LLM_class import LLM, Google_llms | |
| from _utils.langchain_utils.Prompt_class import Prompt | |
| from _utils.langchain_utils.Vector_store_class import VectorStore | |
| from _utils.utils import convert_markdown_to_HTML | |
| from gerar_documento.serializer import ( | |
| GerarDocumentoComPDFProprioSerializerData, | |
| GerarDocumentoSerializerData, | |
| ) | |
| from setup.easy_imports import ( | |
| Chroma, | |
| ChatOpenAI, | |
| PromptTemplate, | |
| BM25Okapi, | |
| Response, | |
| HuggingFaceEmbeddings, | |
| ) | |
| import logging | |
| from _utils.models.gerar_documento import ( | |
| ContextualizedChunk, | |
| DocumentChunk, | |
| RetrievalConfig, | |
| ) | |
| from cohere import Client | |
| from _utils.langchain_utils.Splitter_class import Splitter | |
| import time | |
| from setup.tokens import openai_api_key, cohere_api_key | |
| from setup.logging import Axiom | |
| import tiktoken | |
| from setup.environment import default_model | |
| def reciprocal_rank_fusion(result_lists, weights=None): | |
| """Combine multiple ranked lists using reciprocal rank fusion""" | |
| fused_scores = {} | |
| num_lists = len(result_lists) | |
| if weights is None: | |
| weights = [1.0] * num_lists | |
| for i in range(num_lists): | |
| for doc_id, score in result_lists[i]: | |
| if doc_id not in fused_scores: | |
| fused_scores[doc_id] = 0 | |
| fused_scores[doc_id] += weights[i] * score | |
| # Sort by score in descending order | |
| sorted_results = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True) | |
| return sorted_results | |
| class GerarDocumentoUtils: | |
| axiom_instance: Axiom | |
| temperature = 0.0 | |
| model = default_model | |
| def criar_output_estruturado(self, summaries: List[str | Any], sources: Any): | |
| structured_output = [] | |
| for idx, summary in enumerate(summaries): | |
| source_idx = min(idx, len(sources) - 1) | |
| structured_output.append( | |
| { | |
| "content": summary, | |
| "source": { | |
| "page": sources[source_idx]["page"], | |
| "text": sources[source_idx]["content"][:200] + "...", | |
| "context": sources[source_idx]["context"], | |
| "relevance_score": sources[source_idx]["relevance_score"], | |
| "chunk_id": sources[source_idx]["chunk_id"], | |
| }, | |
| } | |
| ) | |
| return structured_output | |
| def ultima_tentativa_requisicao(self, prompt_gerar_documento_formatado): | |
| llm = LLM() | |
| resposta = llm.open_ai().invoke(prompt_gerar_documento_formatado) | |
| documento_gerado = resposta.content.strip() # type: ignore | |
| if not documento_gerado: | |
| raise Exception( | |
| "Falha ao tentar gerar o documento final por 5 tentativas e também ao tentar na última tentativa com o chat-gpt 4o mini." | |
| ) | |
| else: | |
| return documento_gerado | |
| def create_retrieval_config( | |
| self, | |
| serializer: Union[ | |
| GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any | |
| ], | |
| ): | |
| return RetrievalConfig( | |
| num_chunks=serializer.num_chunks_retrieval, | |
| embedding_weight=serializer.embedding_weight, | |
| bm25_weight=serializer.bm25_weight, | |
| context_window=serializer.context_window, | |
| chunk_overlap=serializer.chunk_overlap, | |
| ) | |
| async def checar_se_resposta_vazia_do_documento_final( | |
| self, llm_ultimas_requests: str, prompt: str | |
| ): | |
| llm = self.select_model_for_last_requests(llm_ultimas_requests) # type: ignore | |
| documento_gerado = "" | |
| tentativas = 0 | |
| while tentativas < 5 and not documento_gerado: | |
| tentativas += 1 | |
| try: | |
| resposta = llm.invoke(prompt) | |
| if hasattr(resposta, "content") and resposta.content.strip(): # type: ignore | |
| if isinstance(resposta.content, list): | |
| resposta.content = "\n".join(resposta.content) # type: ignore | |
| documento_gerado = resposta.content.strip() # type: ignore | |
| else: | |
| print(f"Tentativa {tentativas}: resposta vazia ou inexistente.") | |
| except Exception as e: | |
| llm = self.select_model_for_last_requests("gemini-2.0-flash") | |
| print(f"Tentativa {tentativas}: erro ao invocar o modelo: {e}") | |
| time.sleep(5) | |
| if not documento_gerado: | |
| try: | |
| self.axiom_instance.send_axiom( | |
| "TENTANDO GERAR DOCUMENTO FINAL COM GPT 4o-mini COMO ÚLTIMA TENTATIVA" | |
| ) | |
| documento_gerado = self.ultima_tentativa_requisicao(prompt) | |
| except Exception as e: | |
| raise Exception( | |
| "Falha ao gerar o documento final na última tentativa." | |
| ) from e | |
| return documento_gerado | |
| def select_model_for_last_requests( | |
| self, | |
| llm_ultimas_requests: Literal[ | |
| "gpt-4o-mini", "deepseek-chat", "gemini-2.0-flash", "gemini-2.5-pro" | |
| ], | |
| ): | |
| llm_instance = LLM() | |
| if llm_ultimas_requests == "gpt-4o-mini": | |
| llm = ChatOpenAI( | |
| temperature=self.temperature, | |
| model=self.model, | |
| api_key=SecretStr(openai_api_key), | |
| ) | |
| elif llm_ultimas_requests == "deepseek-chat": | |
| llm = llm_instance.deepseek() | |
| elif llm_ultimas_requests == "gemini-2.0-flash": | |
| llm = llm_instance.google_gemini( | |
| "gemini-2.0-flash", temperature=self.temperature | |
| ) | |
| elif llm_ultimas_requests == "gemini-2.5-pro": | |
| llm = llm_instance.google_gemini( | |
| "gemini-2.5-pro", temperature=self.temperature | |
| ) | |
| elif llm_ultimas_requests == "gemini-2.5-flash": | |
| llm = llm_instance.google_gemini( | |
| "gemini-2.5-flash", temperature=self.temperature | |
| ) | |
| return llm | |
| class GerarDocumento: | |
| lista_pdfs: List[str] | |
| should_use_llama_parse: bool | |
| all_PDFs_chunks: List[DocumentChunk] | |
| full_text_as_array: List[str] | |
| isBubble: bool | |
| chunks_processados: List[ContextualizedChunk] | List[DocumentChunk] | |
| resumo_auxiliar: str | |
| gerar_documento_utils: GerarDocumentoUtils | |
| utils = UtilsClass() | |
| llm = LLM() | |
| enhanced_vector_store: tuple[Chroma, BM25Okapi, List[str]] | |
| query_gerado_dinamicamente_para_o_vector_store: str | |
| structured_output: List[Any] | |
| texto_completo_como_html: str | |
| titulo_do_documento: str | |
| encoding_tiktoken = tiktoken.get_encoding("cl100k_base") | |
| serializer: Union[ | |
| GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any | |
| ] | |
| def __init__( | |
| self, | |
| serializer: Union[ | |
| GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any | |
| ], | |
| isBubble: bool, | |
| axiom_instance: Axiom, | |
| ): | |
| self.gerar_documento_utils = GerarDocumentoUtils(axiom_instance) | |
| self.gerar_documento_utils.temperature = serializer.gpt_temperature | |
| self.config = self.gerar_documento_utils.create_retrieval_config(serializer) | |
| self.serializer = serializer | |
| self.logger = logging.getLogger(__name__) | |
| # self.prompt_auxiliar = prompt_auxiliar | |
| self.gpt_model = serializer.model | |
| self.llm_temperature = serializer.gpt_temperature | |
| self.prompt_gerar_documento = serializer.prompt_gerar_documento | |
| self.should_use_llama_parse = serializer.should_use_llama_parse | |
| self.isBubble = isBubble | |
| self.is_contextualized_chunk = serializer.should_have_contextual_chunks | |
| self.contextual_retriever = ContextualRetriever(serializer) | |
| self.llm_ultimas_requests = serializer.llm_ultimas_requests | |
| self.cohere_client = Client(cohere_api_key) | |
| self.embeddings = HuggingFaceEmbeddings(model_name=serializer.hf_embedding) | |
| self.num_k_rerank = serializer.num_k_rerank | |
| self.model_cohere_rerank = serializer.model_cohere_rerank | |
| self.splitter = Splitter(serializer.chunk_size, serializer.chunk_overlap) | |
| self.prompt_gerar_documento_etapa_2 = serializer.prompt_gerar_documento_etapa_2 | |
| self.prompt_gerar_documento_etapa_3 = serializer.prompt_gerar_documento_etapa_3 | |
| self.vector_store = VectorStore(serializer.hf_embedding) | |
| self.axiom_instance: Axiom = axiom_instance | |
| self.ax = AxiomLogs(axiom_instance) | |
| async def get_text_and_pdf_chunks(self): | |
| all_PDFs_chunks, full_text_as_array = ( | |
| await self.utils.handle_files.get_full_text_and_all_PDFs_chunks( | |
| self.lista_pdfs, | |
| self.splitter, | |
| self.should_use_llama_parse, | |
| self.isBubble, | |
| ) | |
| ) | |
| self.ax.texto_completo_pdf(full_text_as_array) | |
| self.all_PDFs_chunks = all_PDFs_chunks | |
| self.full_text_as_array = full_text_as_array | |
| return all_PDFs_chunks, full_text_as_array | |
| async def generate_chunks_processados(self): | |
| if self.is_contextualized_chunk: | |
| self.ax.inicio_requisicao_contextual() | |
| contextualized_chunks = ( | |
| await self.contextual_retriever.contextualize_all_chunks( | |
| self.all_PDFs_chunks, self.resumo_auxiliar, self.axiom_instance | |
| ) | |
| ) | |
| self.ax.fim_requisicao_contextual() | |
| chunks_processados = ( | |
| contextualized_chunks | |
| if self.is_contextualized_chunk | |
| else self.all_PDFs_chunks | |
| ) | |
| self.chunks_processados = chunks_processados | |
| if len(self.chunks_processados) == 0: | |
| self.chunks_processados = self.all_PDFs_chunks | |
| self.ax.chunks_inicialmente(self.chunks_processados) | |
| return self.chunks_processados | |
| async def generate_query_for_vector_store(self): | |
| prompt_para_gerar_query_dinamico = prompt_gerar_query_dinamicamente( | |
| cast(str, self.resumo_auxiliar) | |
| ) | |
| self.axiom_instance.send_axiom( | |
| "COMEÇANDO REQUISIÇÃO PARA GERAR O QUERY DINAMICAMENTE DO VECTOR STORE" | |
| ) | |
| response = await self.llm.google_gemini_ainvoke( | |
| prompt_para_gerar_query_dinamico, | |
| "gemini-2.0-flash", | |
| temperature=self.llm_temperature, | |
| ) | |
| self.query_gerado_dinamicamente_para_o_vector_store = cast( | |
| str, response.content | |
| ) | |
| self.axiom_instance.send_axiom( | |
| f"query_gerado_dinamicamente_para_o_vector_store: {self.query_gerado_dinamicamente_para_o_vector_store}", | |
| ) | |
| return self.query_gerado_dinamicamente_para_o_vector_store | |
| async def create_enhanced_vector_store(self): | |
| vector_store, bm25, chunk_ids = self.vector_store.create_enhanced_vector_store( | |
| self.chunks_processados, self.is_contextualized_chunk, self.axiom_instance # type: ignore | |
| ) | |
| self.enhanced_vector_store = vector_store, bm25, chunk_ids | |
| return vector_store, bm25, chunk_ids | |
| def retrieve_with_rank_fusion( | |
| self, vector_store: Chroma, bm25: BM25Okapi, chunk_ids: List[str], query: str | |
| ) -> List[Dict]: | |
| """Combine embedding and BM25 retrieval results""" | |
| try: | |
| # Get embedding results | |
| embedding_results = vector_store.similarity_search_with_score( | |
| query, k=self.config.num_chunks | |
| ) | |
| # Convert embedding results to list of (chunk_id, score) | |
| embedding_list = [ | |
| (doc.metadata["chunk_id"], 1 / (1 + score)) | |
| for doc, score in embedding_results | |
| ] | |
| # Get BM25 results | |
| tokenized_query = query.split() | |
| bm25_scores = bm25.get_scores(tokenized_query) | |
| # Convert BM25 scores to list of (chunk_id, score) | |
| bm25_list = [ | |
| (chunk_ids[i], float(score)) for i, score in enumerate(bm25_scores) | |
| ] | |
| # Sort bm25_list by score in descending order and limit to top N results | |
| bm25_list = sorted(bm25_list, key=lambda x: x[1], reverse=True)[ | |
| : self.config.num_chunks | |
| ] | |
| # Normalize BM25 scores | |
| calculo_max = max( | |
| [score for _, score in bm25_list] | |
| ) # Criei este max() pois em alguns momentos estava vindo valores 0, e reclamava que não podia dividir por 0 | |
| max_bm25 = calculo_max if bm25_list and calculo_max else 1 | |
| bm25_list = [(doc_id, score / max_bm25) for doc_id, score in bm25_list] | |
| # Pass the lists to rank fusion | |
| result_lists = [embedding_list, bm25_list] | |
| weights = [self.config.embedding_weight, self.config.bm25_weight] | |
| combined_results = reciprocal_rank_fusion(result_lists, weights=weights) | |
| return combined_results # type: ignore | |
| except Exception as e: | |
| self.logger.error(f"Error in rank fusion retrieval: {str(e)}") | |
| raise | |
| def rank_fusion_get_top_results( | |
| self, | |
| vector_store: Chroma, | |
| bm25: BM25Okapi, | |
| chunk_ids: List[str], | |
| query: str = "Summarize the main points of this document", | |
| ): | |
| # Get combined results using rank fusion | |
| ranked_results = self.retrieve_with_rank_fusion( | |
| vector_store, bm25, chunk_ids, query | |
| ) | |
| # Prepare context and track sources | |
| contexts = [] | |
| sources = [] | |
| # Get full documents for top results | |
| for chunk_id, score in ranked_results[: self.config.num_chunks]: | |
| results = vector_store.get( | |
| where={"chunk_id": chunk_id}, include=["documents", "metadatas"] | |
| ) | |
| if results["documents"]: | |
| context = results["documents"][0] | |
| metadata = results["metadatas"][0] | |
| contexts.append(context) | |
| sources.append( | |
| { | |
| "content": context, | |
| "page": metadata["page"], | |
| "chunk_id": chunk_id, | |
| "relevance_score": score, | |
| "context": metadata.get("context", ""), | |
| } | |
| ) | |
| return sources, contexts | |
| async def do_last_requests( | |
| self, | |
| ) -> List[Dict]: | |
| try: | |
| self.axiom_instance.send_axiom("COMEÇANDO A FAZER ÚLTIMA REQUISIÇÃO") | |
| vector_store, bm25, chunk_ids = self.enhanced_vector_store | |
| sources, contexts = self.rank_fusion_get_top_results( | |
| vector_store, | |
| bm25, | |
| chunk_ids, | |
| self.query_gerado_dinamicamente_para_o_vector_store, | |
| ) | |
| prompt_gerar_documento = PromptTemplate( | |
| template=cast(str, self.prompt_gerar_documento), | |
| input_variables=["context"], | |
| ) | |
| llm_ultimas_requests = self.llm_ultimas_requests | |
| prompt_instance = Prompt() | |
| context_do_prompt_primeira_etapa = "\n\n".join(contexts) | |
| prompt_primeira_etapa = prompt_gerar_documento.format( | |
| context=context_do_prompt_primeira_etapa, | |
| ) | |
| self.gerar_documento_utils.model = self.gpt_model | |
| self.gerar_documento_utils.temperature = self.llm_temperature | |
| documento_gerado = await self.gerar_documento_utils.checar_se_resposta_vazia_do_documento_final( | |
| llm_ultimas_requests, prompt_primeira_etapa | |
| ) | |
| texto_final_juntando_as_etapas = "" | |
| resposta_primeira_etapa = documento_gerado | |
| texto_final_juntando_as_etapas += resposta_primeira_etapa | |
| self.axiom_instance.send_axiom( | |
| f"RESULTADO ETAPA 1: {resposta_primeira_etapa}" | |
| ) | |
| if self.prompt_gerar_documento_etapa_2: | |
| self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 2") | |
| prompt_etapa_2 = prompt_instance.create_and_invoke_prompt( | |
| self.prompt_gerar_documento_etapa_2, | |
| dynamic_dict={"context": context_do_prompt_primeira_etapa}, | |
| ) | |
| # documento_gerado = llm.invoke(prompt_etapa_2).content | |
| documento_gerado = self.gerar_documento_utils.checar_se_resposta_vazia_do_documento_final( | |
| llm_ultimas_requests, prompt_etapa_2.to_string() | |
| ) | |
| resposta_segunda_etapa = documento_gerado | |
| texto_final_juntando_as_etapas += ( | |
| f"\n\nresposta_segunda_etapa:{resposta_segunda_etapa}" | |
| ) | |
| self.axiom_instance.send_axiom(f"RESULTADO ETAPA 2: {documento_gerado}") | |
| if self.prompt_gerar_documento_etapa_3: | |
| self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 3") | |
| prompt_etapa_3 = prompt_instance.create_and_invoke_prompt( | |
| self.prompt_gerar_documento_etapa_3, | |
| dynamic_dict={ | |
| "context": f"{resposta_primeira_etapa}\n\n{resposta_segunda_etapa}" | |
| }, | |
| ) | |
| # documento_gerado = llm.invoke(prompt_etapa_3).content | |
| documento_gerado = self.gerar_documento_utils.checar_se_resposta_vazia_do_documento_final( | |
| llm_ultimas_requests, prompt_etapa_3.to_string() | |
| ) | |
| texto_final_juntando_as_etapas += f"\n\n{documento_gerado}" | |
| self.axiom_instance.send_axiom(f"RESULTADO ETAPA 3: {documento_gerado}") | |
| # Split the response into paragraphs | |
| summaries = [ | |
| p.strip() for p in texto_final_juntando_as_etapas.split("\n\n") if p.strip() # type: ignore | |
| ] | |
| structured_output = self.gerar_documento_utils.criar_output_estruturado( | |
| summaries, sources | |
| ) | |
| self.axiom_instance.send_axiom("TERMINOU DE FAZER A ÚLTIMA REQUISIÇÃO") | |
| self.structured_output = structured_output | |
| return structured_output | |
| except Exception as e: | |
| self.logger.error(f"Error generating enhanced summary: {str(e)}") | |
| raise | |
| async def generate_complete_text(self): | |
| texto_completo = "\n\n" | |
| for x in self.structured_output: | |
| texto_completo = texto_completo + x["content"] + "\n" | |
| x["source"]["text"] = x["source"]["text"][0:200] | |
| x["source"]["context"] = x["source"]["context"][0:200] | |
| self.texto_completo_como_html = convert_markdown_to_HTML( | |
| texto_completo | |
| ).replace("resposta_segunda_etapa:", "<br><br>") | |
| self.axiom_instance.send_axiom( | |
| f"texto_completo_como_html: {self.texto_completo_como_html}" | |
| ) | |
| async def get_document_title(self): | |
| if self.is_contextualized_chunk: | |
| resumo_para_gerar_titulo = self.resumo_auxiliar | |
| else: | |
| resumo_para_gerar_titulo = self.texto_completo_como_html | |
| prompt = prompt_para_gerar_titulo(resumo_para_gerar_titulo) | |
| response = await agemini_answer( | |
| prompt, "gemini-2.0-flash-lite", temperature=self.llm_temperature | |
| ) | |
| self.titulo_do_documento = response | |
| return self.titulo_do_documento | |
| async def send_to_bubble(self): | |
| self.axiom_instance.send_axiom("COMEÇANDO A REQUISIÇÃO FINAL PARA O BUBBLE") | |
| enviar_resposta_final( | |
| self.serializer.doc_id, # type: ignore | |
| self.serializer.form_response_id, # type: ignore | |
| self.serializer.version, # type: ignore | |
| self.texto_completo_como_html, | |
| False, | |
| cast(str, self.titulo_do_documento), | |
| ) | |
| self.axiom_instance.send_axiom("TERMINOU A REQUISIÇÃO FINAL PARA O BUBBLE") | |
| async def gerar_ementa_final( | |
| self, | |
| llm_ultimas_requests: str, | |
| prompt_primeira_etapa: str, | |
| context_primeiro_prompt: str, | |
| ): | |
| llm = self.gerar_documento_utils.select_model_for_last_requests(llm_ultimas_requests) # type: ignore | |
| prompt_instance = Prompt() | |
| documento_gerado = await self.gerar_documento_utils.checar_se_resposta_vazia_do_documento_final( | |
| llm_ultimas_requests, prompt_primeira_etapa | |
| ) | |
| texto_final_juntando_as_etapas = "" | |
| resposta_primeira_etapa = documento_gerado | |
| texto_final_juntando_as_etapas += resposta_primeira_etapa | |
| self.axiom_instance.send_axiom(f"RESULTADO ETAPA 1: {resposta_primeira_etapa}") | |
| if self.prompt_gerar_documento_etapa_2: | |
| self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 2") | |
| prompt_etapa_2 = prompt_instance.create_and_invoke_prompt( | |
| self.prompt_gerar_documento_etapa_2, | |
| dynamic_dict={"context": context_primeiro_prompt}, | |
| ) | |
| documento_gerado = llm.invoke(prompt_etapa_2).content | |
| resposta_segunda_etapa = documento_gerado | |
| texto_final_juntando_as_etapas += ( | |
| f"\n\nresposta_segunda_etapa:{resposta_segunda_etapa}" | |
| ) | |
| self.axiom_instance.send_axiom(f"RESULTADO ETAPA 2: {documento_gerado}") | |
| if self.prompt_gerar_documento_etapa_3: | |
| self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 3") | |
| prompt_etapa_3 = prompt_instance.create_and_invoke_prompt( | |
| self.prompt_gerar_documento_etapa_3, | |
| dynamic_dict={ | |
| "context": f"{resposta_primeira_etapa}\n\n{resposta_segunda_etapa}" | |
| }, | |
| ) | |
| documento_gerado = llm.invoke(prompt_etapa_3).content | |
| texto_final_juntando_as_etapas += f"\n\n{documento_gerado}" | |
| self.axiom_instance.send_axiom(f"RESULTADO ETAPA 3: {documento_gerado}") | |
| return texto_final_juntando_as_etapas | |
| # Esta função gera a resposta que será usada em cada um das requisições de cada chunk | |
| async def get_response_from_auxiliar_contextual_prompt(self): | |
| llms = LLM() | |
| responses = [] | |
| current_chunk = [] | |
| current_token_count = 0 | |
| chunk_counter = 1 | |
| for part in self.full_text_as_array: | |
| part_tokens = len(self.encoding_tiktoken.encode(part)) | |
| # Check if adding this part would EXCEED the limit | |
| if current_token_count + part_tokens > 600000: | |
| # Process the accumulated chunk before it exceeds the limit | |
| chunk_text = "".join(current_chunk) | |
| print( | |
| f"\nProcessing chunk {chunk_counter} with {current_token_count} tokens" | |
| ) | |
| prompt = create_prompt_auxiliar_do_contextual_prompt(chunk_text) | |
| response = await llms.google_gemini( | |
| temperature=self.llm_temperature | |
| ).ainvoke([HumanMessage(content=prompt)]) | |
| responses.append(response.content) | |
| # Start new chunk with current part | |
| current_chunk = [part] | |
| current_token_count = part_tokens | |
| chunk_counter += 1 | |
| else: | |
| # Safe to add to current chunk | |
| current_chunk.append(part) | |
| current_token_count += part_tokens | |
| # Process the final remaining chunk | |
| if current_chunk: | |
| chunk_text = "".join(current_chunk) | |
| print( | |
| f"\nProcessing final chunk {chunk_counter} with {current_token_count} tokens" | |
| ) | |
| prompt = create_prompt_auxiliar_do_contextual_prompt(chunk_text) | |
| response = await llms.google_gemini( | |
| temperature=self.llm_temperature | |
| ).ainvoke([HumanMessage(content=prompt)]) | |
| responses.append(response.content) | |
| self.resumo_auxiliar = "".join(responses) | |
| self.ax.resumo_inicial_processo(self.resumo_auxiliar) | |
| return self.resumo_auxiliar | |
| def gerar_resposta_compilada(self): | |
| serializer = self.serializer | |
| return { | |
| "num_chunks_retrieval": serializer.num_chunks_retrieval, | |
| "embedding_weight": serializer.embedding_weight, | |
| "bm25_weight": serializer.bm25_weight, | |
| "context_window": serializer.context_window, | |
| "chunk_overlap": serializer.chunk_overlap, | |
| "num_k_rerank": serializer.num_k_rerank, | |
| "model_cohere_rerank": serializer.model_cohere_rerank, | |
| "more_initial_chunks_for_reranking": serializer.more_initial_chunks_for_reranking, | |
| "claude_context_model": serializer.claude_context_model, | |
| "gpt_temperature": serializer.gpt_temperature, | |
| "user_message": serializer.user_message, | |
| "model": serializer.model, | |
| "hf_embedding": serializer.hf_embedding, | |
| "chunk_size": serializer.chunk_size, | |
| "chunk_overlap": serializer.chunk_overlap, | |
| # "prompt_auxiliar": serializer.prompt_auxiliar, | |
| "prompt_gerar_documento": serializer.prompt_gerar_documento[0:200], | |
| } | |