|
import os |
|
import random |
|
import pandas as pd |
|
import chainlit as cl |
|
from datetime import datetime |
|
from urllib.parse import quote |
|
|
|
|
|
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "False" |
|
os.environ["ABSL_LOG_LEVEL"] = "ERROR" |
|
|
|
|
|
EXCEL_PATH = os.path.abspath("MasterData SISO - Actualizada - Jan-29-2025.xlsx") |
|
AUDIO_PATH = os.path.abspath("Manual del Operador de Tienda.wav") |
|
CERTIFICATE_TEMPLATE = os.path.abspath("certificate_template_.pdf") |
|
|
|
|
|
PREGUNTAS = [ |
|
{"pregunta": "1. ¿Qué debe hacer el Operador cuando se active la alerta de la pantalla naranja? 🚨", |
|
"opciones": ["A) Ignorar la alerta ❌🚨", "B) Informar al Jefe de Tienda 👨💼", |
|
"C) Recontar el dinero 💰", "D) Cerrar el turno 🔒"], |
|
"respuesta": "B"}, |
|
|
|
{"pregunta": "2. ¿Quién verifica la cantidad de dinero? 💼💵", |
|
"opciones": ["A) El encargado de turno 👷♂️", "B) El Jefe de Tienda ❌", |
|
"C) El Supervisor 👮♂️", "D) El Operador nuevamente 🔄"], |
|
"respuesta": "A"}, |
|
|
|
{"pregunta": "3. ¿Hasta cuándo es responsable el Operador del efectivo? ⏳", |
|
"opciones": ["A) Hasta finalizar su jornada 🕔", "B) Hasta la última recogida 📦", |
|
"C) Hasta cerrar el turno 🔐", "D) Hasta entregar al Jefe 👨💼"], |
|
"respuesta": "B"}, |
|
|
|
{"pregunta": "4. ¿Qué hace el Jefe tras recoger el dinero? 💼➡️📦", |
|
"opciones": ["A) Depositar en caja 🏦", "B) Contar nuevamente 🔢", |
|
"C) Actualizar ISSWEB 💻✅", "D) Entregar al Supervisor 👮♂️"], |
|
"respuesta": "C"}, |
|
|
|
{"pregunta": "5. ¿Qué hacer con cada sobre antes de guardarlo? 📦👀", |
|
"opciones": ["A) Mostrar 5s al CCTV ⏱️✅", "B) Contar otra vez 🔢", |
|
"C) Entregar al Jefe 👨💼", "D) Guardar en bolsillo 👖"], |
|
"respuesta": "A"} |
|
] |
|
|
|
|
|
def cargar_datos(): |
|
"""Carga los datos de Excel y los devuelve en un diccionario.""" |
|
try: |
|
df = pd.read_excel(EXCEL_PATH, dtype={'SAPId': 'int32'}) |
|
for col in ['Fecha', 'Calificacion', 'Curso']: |
|
if col not in df.columns: |
|
df[col] = None |
|
return df.set_index('SAPId').to_dict('index') |
|
except Exception as e: |
|
print(f"Error cargando datos: {str(e)}") |
|
return {} |
|
|
|
def actualizar_excel(sap_id, aprobado): |
|
"""Actualiza la hoja de Excel con el estado del usuario.""" |
|
try: |
|
df = pd.read_excel(EXCEL_PATH) |
|
mask = df['SAPId'] == sap_id |
|
df.loc[mask, 'Fecha'] = datetime.now().strftime('%Y-%m-%d') |
|
df.loc[mask, 'Calificacion'] = 'aprobado' if aprobado else 'no aprobado' |
|
df.loc[mask, 'Curso'] = 'Manual De Operador De Tienda' |
|
df.to_excel(EXCEL_PATH, index=False) |
|
except Exception as e: |
|
print(f"Error actualizando Excel: {str(e)}") |
|
|
|
|
|
@cl.on_chat_start |
|
async def inicio(): |
|
"""Inicia el chat y solicita el SAP ID.""" |
|
SAP_DICT = cargar_datos() |
|
cl.user_session.set("SAP_DICT", SAP_DICT) |
|
await cl.Message("🚀 **Sistema de Certificación - Operador de Tienda**\nIngrese su SAP ID:").send() |
|
|
|
@cl.on_message |
|
async def manejar_mensaje(message: cl.Message): |
|
"""Maneja la entrada del usuario.""" |
|
SAP_DICT = cl.user_session.get("SAP_DICT") |
|
user_data = cl.user_session.get("user_data") |
|
|
|
|
|
if user_data and user_data.get("en_examen"): |
|
await procesar_respuesta(message) |
|
return |
|
|
|
|
|
try: |
|
sap_id = int(message.content) |
|
if sap_id not in SAP_DICT: |
|
return await cl.Message("⛔️ ID no registrado").send() |
|
|
|
usuario = SAP_DICT[sap_id] |
|
respuesta = f""" |
|
🔍 **Usuario identificado** 🔍 |
|
├ N° Personal: {usuario['Número de personal']} |
|
├ Status: {usuario['Status ocupación']} |
|
├ Función: {usuario['FuncionName']} |
|
└ Centro Coste: {usuario['Centro de coste']} |
|
""" |
|
|
|
cl.user_session.set("user_data", { |
|
"sap_id": sap_id, |
|
"puntaje": 0, |
|
"pregunta_actual": 0, |
|
"en_examen": True |
|
}) |
|
|
|
await cl.Message(content=respuesta).send() |
|
await mostrar_pregunta() |
|
|
|
except ValueError: |
|
await cl.Message("⚠️ Solo números permitidos (Ej: 60000001)").send() |
|
|
|
async def mostrar_pregunta(): |
|
"""Muestra la siguiente pregunta del examen.""" |
|
user_data = cl.user_session.get("user_data") |
|
num_pregunta = user_data["pregunta_actual"] |
|
|
|
if num_pregunta >= len(PREGUNTAS): |
|
await mostrar_resultado() |
|
return |
|
|
|
p = PREGUNTAS[num_pregunta] |
|
mensaje = f"📝 **Pregunta {num_pregunta + 1}**\n{p['pregunta']}\n\n" + "\n".join(p['opciones']) |
|
await cl.Message(content=mensaje + "\n\n🔘 Respuesta (A/B/C/D):").send() |
|
|
|
async def procesar_respuesta(message: cl.Message): |
|
"""Procesa la respuesta de la pregunta actual.""" |
|
user_data = cl.user_session.get("user_data") |
|
num_pregunta = user_data["pregunta_actual"] |
|
|
|
respuesta = message.content.strip().upper() |
|
if respuesta not in ['A', 'B', 'C', 'D']: |
|
await cl.Message("⚠️ **Formato incorrecto**. Respuesta = A/B/C/D").send() |
|
return |
|
|
|
correcta = PREGUNTAS[num_pregunta]['respuesta'] |
|
if respuesta == correcta: |
|
user_data["puntaje"] += 1 |
|
await cl.Message("✅ ¡Respuesta correcta! +1 punto").send() |
|
else: |
|
await cl.Message(f"❌ Respuesta incorrecta. La correcta es: {correcta}").send() |
|
|
|
user_data["pregunta_actual"] += 1 |
|
cl.user_session.set("user_data", user_data) |
|
|
|
if user_data["pregunta_actual"] < len(PREGUNTAS): |
|
await mostrar_pregunta() |
|
else: |
|
await mostrar_resultado() |
|
|
|
async def mostrar_resultado(): |
|
"""Muestra el resultado final del examen y adjunta el PDF o el audio.""" |
|
user_data = cl.user_session.get("user_data") |
|
aprobado = user_data['puntaje'] >= 3 |
|
actualizar_excel(user_data['sap_id'], aprobado) |
|
|
|
mensaje = f"🎉 **¡FELICITACIONES! APROBADO**\n" if aprobado else f"❌ **LO SENTIMOS, REPROBADO**\n" |
|
mensaje += f"Puntaje Final: {user_data['puntaje']}/5" |
|
|
|
if aprobado and os.path.exists(CERTIFICATE_TEMPLATE): |
|
await cl.Message(content=mensaje, elements=[cl.File(path=CERTIFICATE_TEMPLATE, name="Certificado de Aprobación")]).send() |
|
elif not aprobado and os.path.exists(AUDIO_PATH): |
|
await cl.Message(content=mensaje, elements=[cl.Audio(name="🎧 Material de Refuerzo", path=AUDIO_PATH, display="inline", autoplay=False)]).send() |
|
else: |
|
await cl.Message(content=mensaje + "\n⚠️ **Error: No se encontró el archivo**").send() |
|
|
|
cl.user_session.set("user_data", None) |
|
await cl.Message(content="\nIngrese nuevo SAP ID para otro examen:").send() |
|
|
|
if __name__ == "__main__": |
|
from chainlit.cli import run_chainlit |
|
run_chainlit(__file__) |
|
|