# NOTE(review): removed web-page scrape residue ("Spaces:" / "Sleeping" ×2)
# that preceded the code — it is not Python and made the file unparseable.
# Standard library
import os
from typing import Any, Dict, List

# Third-party: HTTP clients, PDF parsing, markdown rendering, UI
import httpx
import requests
import PyPDF2
import chainlit as cl
from markdown import markdown

# LangChain building blocks for retrieval-augmented QA
from langchain.chains import RetrievalQAWithSourcesChain
from langchain.prompts.chat import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    SystemMessagePromptTemplate,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
# --- Deepseek API configuration ---
# The API key comes from the environment; never hard-code credentials.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY")

# REST endpoints for the embeddings and chat-completion services.
EMBEDDINGS_URL = "https://api.deepseek.com/v1/embeddings"
CHAT_URL = "https://api.deepseek.com/v1/chat/completions"
class DeepseekEmbeddings:
    """Minimal embeddings client for the Deepseek REST API.

    Exposes the two methods LangChain vector stores expect from an
    embeddings object: ``embed_documents`` (bulk) and ``embed_query``
    (single text).  The original class was missing ``embed_query``,
    which broke query-time retrieval from Chroma.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a batch of texts; returns one vector per input text.

        Raises:
            ValueError: if the API responds with a non-200 status.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        data = {
            "input": texts,
            "model": "deepseek-embedding",
            "encoding_type": "float",
        }
        # Timeout added so a stalled API call cannot hang the app forever.
        response = requests.post(
            EMBEDDINGS_URL, json=data, headers=headers, timeout=60
        )
        if response.status_code == 200:
            return [item["embedding"] for item in response.json()["data"]]
        raise ValueError(f"Error en embeddings: {response.text}")

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string (required by LangChain retrievers)."""
        return self.embed_documents([text])[0]
class DeepseekChat:
    """Async chat-completion client for the Deepseek REST API."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        # Shared connection pool; the original never released it — call
        # aclose() when the client is no longer needed.
        self.client = httpx.AsyncClient()

    async def agenerate(self, messages: List[Dict[str, str]], **kwargs) -> str:
        """Send *messages* to the chat endpoint and return the reply text.

        Args:
            messages: OpenAI-style ``{"role": ..., "content": ...}`` dicts.

        Raises:
            ValueError: if the API responds with a non-200 status.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        data = {
            "model": "deepseek-chat",
            "messages": messages,
            "temperature": 0.3,
            "max_tokens": 2000,
            "top_p": 0.9,
        }
        # Timeout added so a stalled API call cannot hang the event loop task.
        response = await self.client.post(
            CHAT_URL, json=data, headers=headers, timeout=60
        )
        if response.status_code == 200:
            return response.json()['choices'][0]['message']['content']
        raise ValueError(f"Error en el chat: {response.text}")

    async def aclose(self) -> None:
        """Release the underlying HTTP connection pool (fixes the leak)."""
        await self.client.aclose()
# System prompt (Spanish): positions the model as a conflict-management
# expert that answers in markdown, cites sources, and flags coverage gaps.
# {summaries} is filled by RetrievalQAWithSourcesChain with retrieved chunks.
system_template = """Eres un experto en gestión de conflictos con habilidades avanzadas de análisis. Puedes:
1. Responder preguntas generales y técnicas
2. Generar tablas comparativas en markdown
3. Analizar documentos en profundidad
4. Combinar múltiples fuentes de información
Instrucciones:
- Usa formato markdown para tablas y listas
- Para preguntas técnicas, usa los documentos como fuente principal
- Incluye siempre fuentes relevantes
- Si no hay información suficiente, indica qué aspectos no están cubiertos en los documentos
Contexto documental:
{summaries}"""

# System + human turns assembled into the chat prompt used by the QA chain.
messages = [
    SystemMessagePromptTemplate.from_template(system_template),
    HumanMessagePromptTemplate.from_template(
        "**Pregunta:** {question}\n**Respuesta (usar markdown si es necesario):**"
    ),
]
prompt = ChatPromptTemplate.from_messages(messages)

# ~1000-char chunks with 200-char overlap so context survives chunk edges.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
@cl.on_chat_start
async def on_chat_start():
    """Chainlit session-start hook: index the PDFs and build the QA chain.

    Fix: the original function carried no ``@cl.on_chat_start`` decorator,
    so Chainlit never invoked it and the session was never initialized.

    On success, stores ``chain``, ``metadatas`` and ``texts`` in the user
    session for the message handler; on any failure, reports the error to
    the user and aborts setup.
    """
    await cl.Message(content="Bienvenido al sistema experto en gestión de conflictos").send()

    # Source documents shipped alongside the app.
    pdf_paths = [
        "gestion de conflictos.pdf",
        "Managing Conflict with Your Boss .pdf",
    ]

    all_texts: List[str] = []
    all_metadatas: List[Dict[str, Any]] = []
    for path in pdf_paths:
        try:
            base_name = os.path.basename(path)
            with open(path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                # extract_text() may return None for image-only pages.
                pdf_text = " ".join([page.extract_text() or "" for page in reader.pages])
            chunks = text_splitter.split_text(pdf_text)
            all_texts.extend(chunks)
            # NOTE(review): "page" is an approximation (3 chunks per page),
            # not the real PDF page the chunk came from — confirm if page
            # citations must be exact.
            all_metadatas.extend([{
                "source": base_name,
                "page": (i // 3) + 1,
            } for i, _ in enumerate(chunks)])
        except Exception as e:
            await cl.Message(content=f"Error cargando {path}: {str(e)}").send()
            return

    try:
        embeddings = DeepseekEmbeddings(DEEPSEEK_API_KEY)
        # Chroma.from_texts is blocking; run it off the event loop.
        docsearch = await cl.make_async(Chroma.from_texts)(
            all_texts,
            embeddings,
            metadatas=all_metadatas,
        )
    except Exception as e:
        await cl.Message(content=f"Error creando embeddings: {str(e)}").send()
        return

    try:
        chain = RetrievalQAWithSourcesChain.from_chain_type(
            DeepseekChat(DEEPSEEK_API_KEY),
            chain_type="stuff",
            retriever=docsearch.as_retriever(search_kwargs={"k": 3}),
            return_source_documents=True,
            chain_type_kwargs={"prompt": prompt},
        )
    except Exception as e:
        await cl.Message(content=f"Error configurando la cadena: {str(e)}").send()
        return

    # Persist per-session state for the message handler.
    cl.user_session.set("chain", chain)
    cl.user_session.set("metadatas", all_metadatas)
    cl.user_session.set("texts", all_texts)
    await cl.Message(content="Sistema listo. Puedes hacer preguntas o pedir análisis con tablas").send()
@cl.on_message
async def main(message: cl.Message):
    """Chainlit message hook: answer *message* via the retrieval QA chain.

    Fixes: (1) the original function carried no ``@cl.on_message``
    decorator, so Chainlit never routed messages to it; (2) if session
    setup failed, ``chain`` was ``None`` and the call below crashed —
    now the user gets an explicit "not ready" message instead.
    """
    query = message.content
    chain = cl.user_session.get("chain")
    if chain is None:
        # on_chat_start failed or has not finished yet.
        await cl.Message(
            content="El sistema aún no está listo. Intenta de nuevo en unos segundos."
        ).send()
        return

    try:
        res = await chain.acall(query)
        answer = res["answer"]
        # Render the model's markdown answer to HTML.
        formatted_answer = markdown(answer)

        # Map the chain's comma-separated source names back to the stored
        # chunks so we can attach one text excerpt per cited document.
        sources = res.get("sources", "")
        metadatas = cl.user_session.get("metadatas")
        texts = cl.user_session.get("texts")
        source_elements = []
        unique_sources = set()
        for src in sources.split(","):
            src = src.strip()
            if not src:
                continue
            matches = [i for i, m in enumerate(metadatas) if m["source"] == src]
            if matches:
                unique_sources.add(src)
                source_elements.append(cl.Text(
                    content=texts[matches[0]],
                    name=f"{src} (Página {metadatas[matches[0]]['page']})",
                ))
        if unique_sources:
            formatted_answer += f"\n\n**Fuentes verificadas:** {', '.join(unique_sources)}"
        await cl.Message(
            content=formatted_answer,
            elements=source_elements[:3],  # cap attachments at three excerpts
            language="markdown",
        ).send()
    except Exception as e:
        await cl.Message(content=f"Error procesando la consulta: {str(e)}").send()
if __name__ == "__main__":
    # Allow launching with "python <file>.py" as an alternative to
    # "chainlit run <file>.py".
    from chainlit.cli import run_chainlit

    run_chainlit(__file__)