from memory import ConversationMemory from config import Config from transformers import AutoTokenizer, AutoModelForCausalLM import torch, gc import unicodedata from typing import Dict, Tuple, Optional, Any import re import pandas as pd import sqlite3 class MojicaAgent: def __init__(self, config: Config): self.config = config self.memory = ConversationMemory() self.essential_columns = [ { "name": "Descripcion", "type": "TEXT", "description": "Nombre del producto", }, {"name": "Cantidad", "type": "REAL", "description": "Unidades vendidas"}, {"name": "Cliente", "type": "TEXT", "description": "Código de cliente"}, { "name": "Razon Social", "type": "TEXT", "description": "Nombre completo del cliente", }, {"name": "Ciudad", "type": "TEXT", "description": "Ciudad del cliente"}, { "name": "Fecha", "type": "TEXT", "description": "Fecha de venta (YYYY-MM-DD)", }, {"name": "Neto", "type": "REAL", "description": "Valor neto de la venta"}, ] self._initialize_model() self.schema = self._load_schema() def _initialize_model(self): def try_load_model(): self.tokenizer = AutoTokenizer.from_pretrained(self.config.MODEL_NAME) self.model = AutoModelForCausalLM.from_pretrained( self.config.MODEL_NAME, device_map="auto", torch_dtype="auto", trust_remote_code=True, ).eval() try: try_load_model() except torch.cuda.OutOfMemoryError: gc.collect() torch.cuda.empty_cache() torch.cuda.ipc_collect() try_load_model() def _load_schema(self) -> Dict: connection = sqlite3.connect(self.config.DB_PATH) cursor = connection.cursor() cursor.execute(f"PRAGMA table_info({self.config.TABLE_NAME})") columns = [ {"name": column[1], "type": column[2]} for column in cursor.fetchall() ] schema = {"table_name": self.config.TABLE_NAME, "columns": columns} connection.close() return schema def _generate_sql_prompt(self, question: str) -> str: memory_context = self.memory.get_context(question) table_name = self.schema["table_name"] # 1. Detectar tipo de pregunta question_type = ( "PRODUCTOS" if "producto" in question.lower() else ( "CERO" if "cero" in question.lower() else ( "CIUDADES" if "ciudad" in question.lower() else ( "DINERO" if "dinero" in question.lower() else "CLIENTES" if "cliente" in question.lower() else "GENERAL" ) ) ) ) # 2. Ejemplos dinámicos examples = { "PRODUCTOS": ( "-- P: 'Top 10 productos más vendidos'\n" 'SELECT "Descripcion", SUM("Cantidad") AS total_vendido\n' f'FROM "{table_name}"\n' 'WHERE "Descripcion" IS NOT NULL\n' 'GROUP BY "Descripcion"\n' "ORDER BY total_vendido DESC\n" "LIMIT 10;\n\n" "-- P: 'Producto con mayor valor neto'\n" 'SELECT "Descripcion", SUM("Neto") AS valor_total\n' f'FROM "{table_name}"\n' 'WHERE "Descripcion" IS NOT NULL\n' 'GROUP BY "Descripcion"\n' "ORDER BY valor_total DESC\n" "LIMIT 1;" "--P: ' ¿En que ciudad es mas comprado el producto Queso Gouda Importado Chileno Colun?'\n" 'SELECT "Descripcion", "Ciudad", SUM("Cantidad") AS total_vendido\n' f'FROM "{table_name}"\n' 'WHERE "Descripcion" LIKE \'%Queso Gouda%\' AND "Cliente" IS NOT NULL\n' 'GROUP BY "Ciudad"\n' "ORDER BY total_vendido DESC\n" "LIMIT 1;\n\n" ), "CERO": ( "-- P: '¿Cuántos clientes cero tenemos en total (al día de hoy)?'\n" 'SELECT COUNT(DISTINCT "Cliente") AS clientes_cero\n' f'FROM "{table_name}"\n' 'WHERE "Cliente" IS NOT NULL\n' 'AND "Cliente" NOT IN (\n' ' SELECT DISTINCT "Cliente"\n' f' FROM "{table_name}"\n' " WHERE DATE(\"Fecha\") > DATE('now', '-28 day')\n" ");\n\n" "-- P: '¿Cuántos clientes cero tenemos en Guadalajara (al día de hoy)?'\n" 'SELECT COUNT(DISTINCT "Cliente") AS clientes_cero\n' f'FROM "{table_name}"\n' 'WHERE "Cliente" IS NOT NULL\n' "AND \"Ciudad\" LIKE '%Guadalajara%'\n" 'AND "Cliente" NOT IN (\n' ' SELECT DISTINCT "Cliente"\n' f' FROM "{table_name}"\n' " WHERE DATE(\"Fecha\") > DATE('now', '-28 day')\n" ");\n\n" "-- P: '¿Cuántos clientes cero tuvimos en Abril?" 'SELECT COUNT(DISTINCT "Cliente") AS clientes_cero\n' f'FROM "{table_name}"\n' 'WHERE "Cliente" IS NOT NULL\n' "AND DATE(\"Fecha\") <= DATE('2025-04-30')\n" 'AND "Cliente" NOT IN (\n' ' SELECT DISTINCT "Cliente"\n' f' FROM "{table_name}"\n' " WHERE DATE(\"Fecha\") BETWEEN DATE('2025-04-30','-27 day') AND DATE('2025-04-30')\n" ");\n\n" ), "DINERO": ( "-- P: '¿Cuánto dinero nos ha generado el producto queso adobera?'\n" 'SELECT "Descripcion", SUM("Neto") AS valor_total\n' f'FROM "{table_name}"\n' 'WHERE "Descripcion" LIKE \'%queso adobera%\' AND "Cliente" IS NOT NULL\n' 'GROUP BY "Descripcion"\n' "ORDER BY valor_total DESC\n" "LIMIT 1;\n\n" "-- P: '¿Cuánto dinero nos ha generado el cliente C1261?'\n" 'SELECT "Cliente", SUM("Neto") AS valor_total\n' f'FROM "{table_name}"\n' 'WHERE "Cliente" = \'C1261\' AND "Cliente" IS NOT NULL\n' 'GROUP BY "Cliente"\n' "ORDER BY valor_total DESC\n" "LIMIT 1;\n\n" "-- P: '¿Cuánto dinero nos ha generado la ciudad de Zapopan?'\n" 'SELECT "Ciudad", SUM("Neto") AS valor_total\n' f'FROM "{table_name}"\n' 'WHERE "Ciudad" LIKE \'%Zapopan%\' AND "Cliente" IS NOT NULL\n' 'GROUP BY "Ciudad"\n' "ORDER BY valor_total DESC\n" "LIMIT 1;\n\n" ), "CIUDADES": ( "-- P: '¿Cuál es la ciudad que más nos compra?'\n" 'SELECT "Ciudad", SUM("Cantidad") AS total_comprado\n' f'FROM "{table_name}"\n' 'WHERE "Ciudad" IS NOT NULL AND "Cliente" IS NOT NULL\n' 'GROUP BY "Ciudad"\n' "ORDER BY total_comprado DESC\n" "LIMIT 1;\n\n" "-- P: '¿Cuál es la ciudad con más clientes registrados?'\n" 'SELECT "Ciudad", COUNT(DISTINCT "Cliente") AS cantidad_clientes\n' f'FROM "{table_name}"\n' 'WHERE "Ciudad" IS NOT NULL AND "Cliente" IS NOT NULL\n' 'GROUP BY "Ciudad"\n' "ORDER BY cantidad_clientes DESC\n" "LIMIT 1;\n\n" "-- P: '¿Cuál es la ciudad que más dinero nos genera?'\n" 'SELECT "Ciudad", SUM("Neto") AS valor_total\n' f'FROM "{table_name}"\n' 'WHERE "Ciudad" IS NOT NULL AND "Cliente" IS NOT NULL\n' 'GROUP BY "Ciudad"\n' "ORDER BY valor_total DESC\n" "LIMIT 1;\n\n" "-- P: '¿Cuál es la ciudad que menos dinero nos genera?'\n" 'SELECT "Ciudad", SUM("Neto") AS valor_total\n' f'FROM "{table_name}"\n' 'WHERE "Ciudad" IS NOT NULL AND "Cliente" IS NOT NULL\n' 'GROUP BY "Ciudad"\n' "ORDER BY valor_total ASC\n" "LIMIT 1;\n\n" ), "CLIENTES": ( "-- P: 'El cliente con mayor valor neto'\n" 'SELECT "Cliente", SUM("Neto") AS valor_total\n' f'FROM "{table_name}"\n' "WHERE \"Cliente\" IS NOT NULL AND \"Fecha\" BETWEEN '2025-01-01' AND '2025-12-31'\n" 'GROUP BY "Cliente"\n' "ORDER BY valor_total DESC\n" "LIMIT 1;\n\n" "-- P: '¿Cuántos clientes tenemos en Guadalajara?'\n" 'SELECT COUNT(DISTINCT "Cliente") AS cantidad_clientes\n' f'FROM "{table_name}"\n' 'WHERE "Ciudad" LIKE \'%Guadalajara%\' AND "Cliente" IS NOT NULL;\n\n' "-- P: 'El cliente con menor valor neto'\n" 'SELECT "Cliente", SUM("Neto") AS valor_total\n' f'FROM "{table_name}"\n' "WHERE \"Cliente\" IS NOT NULL AND \"Fecha\" BETWEEN '2025-01-01' AND '2025-12-31'\n" 'GROUP BY "Cliente"\n' "ORDER BY valor_total ASC\n" "LIMIT 1;\n\n" "-- P: 'Clientes con menos compras en marzo'\n" 'SELECT "Cliente", COUNT(*) AS total_compras\n' f'FROM "{table_name}"\n' "WHERE \"Cliente\" IS NOT NULL AND strftime('%m', \"Fecha\") = '03'\n" 'GROUP BY "Cliente"\n' "ORDER BY total_compras ASC\n" "LIMIT 10;\n\n" "-- P: 'Clientes de Guadalajara con más compras'\n" 'SELECT "Cliente", "Razon Social", COUNT(*) AS total_compras\n' f'FROM "{table_name}"\n' 'WHERE "Cliente" IS NOT NULL AND "Ciudad" = \'Guadalajara\'\n' 'GROUP BY "Cliente", "Razon Social"\n' "ORDER BY total_compras DESC\n" "LIMIT 10;" "-- P: '¿Cuál es la ciudad con más clientes registrados?'\n" 'SELECT "Ciudad", COUNT(DISTINCT "Cliente") AS cantidad_clientes\n' f'FROM "{table_name}"\n' 'WHERE "Ciudad" IS NOT NULL AND "Cliente" IS NOT NULL\n' 'GROUP BY "Ciudad"\n' "ORDER BY cantidad_clientes DESC\n" "LIMIT 1;\n\n" "-- P: 'Un cliente que no nos ha comprado en las últimas 2 semanas'\n" 'SELECT "Cliente"\n' f'FROM "{table_name}"\n' 'WHERE "Cliente" IS NOT NULL\n' 'AND "Cliente" NOT IN (\n' ' SELECT DISTINCT "Cliente"\n' f' FROM "{table_name}"\n' " WHERE DATE(\"Fecha\") >= DATE('now', '-14 day')\n" ")\n" 'GROUP BY "Cliente"\n' "LIMIT 1;\n\n" "-- P: 'Un cliente que no nos ha comprado en las últimas 3 semanas'\n" 'SELECT "Cliente"\n' f'FROM "{table_name}"\n' 'WHERE "Cliente" IS NOT NULL\n' 'AND "Cliente" NOT IN (\n' ' SELECT DISTINCT "Cliente"\n' f' FROM "{table_name}"\n' " WHERE DATE(\"Fecha\") >= DATE('now', '-14 day')\n" ")\n" 'GROUP BY "Cliente"\n' "LIMIT 1;\n\n" "-- P: '¿Cuántos clientes cero tenemos en total?'\n" 'SELECT COUNT(DISTINCT "Cliente") AS clientes_cero\n' f'FROM "{table_name}"\n' 'WHERE "Cliente" IS NOT NULL\n' 'AND "Cliente" NOT IN (\n' ' SELECT DISTINCT "Cliente"\n' f' FROM "{table_name}"\n' " WHERE DATE(\"Fecha\") >= DATE('now', '-28 day')\n" ");\n\n" ), "GENERAL": ( "-- P: 'Ventas totales por mes'\n" 'SELECT strftime(\'%m\', "Fecha") AS mes, SUM("Neto") AS ventas\n' f'FROM "{table_name}"\n' "WHERE mes IS NOT NULL\n" "GROUP BY mes\n" "ORDER BY mes;\n\n" "-- P: 'Producto menos vendido en 2025'\n" 'SELECT "Descripcion", SUM("Cantidad") AS total_vendido\n' f'FROM "{table_name}"\n' "WHERE \"Descripcion\" IS NOT NULL AND \"Fecha\" BETWEEN '2025-01-01' AND '2025-12-31'\n" 'GROUP BY "Descripcion"\n' "ORDER BY total_vendido ASC\n" "LIMIT 1;" ), } # 4. Prompt final con nueva regla return ( f""" ### TAREA ### Generar SOLO código SQL para la pregunta, usando EXCLUSIVAMENTE la tabla: "{table_name}" ### COLUMNAS RELEVANTES ### """ + "\n".join( [ f"- {col['name']} ({col['type']}): {col['description']}" for col in self.essential_columns ] ) + f""" ### CONTEXTO (Últimas interacciones) ### {memory_context if memory_context else "Sin historial relevante"} ### EJEMPLOS ({question_type}) ### {examples[question_type]} ### REGLAS CRÍTICAS ### - Usar siempre nombres exactos de columnas - Usar solo las columnas listadas - Prohibido inventar columnas - Para el nombre del cliente, usar SIEMPRE "Razon Social". - Para un mes específico usar: strftime('%m', "Fecha") = 'MM' - Para cantidades usar SUM("Cantidad"), para dinero usar SUM("Neto") - Agrupar por la dimensión principal (producto/cliente) - Ordenar DESC para 'más/mayor', ASC para 'menos/menor' - Contesta siempre en el idioma en el que se te pregunta no traduzcas. - Año actual: 2025 - No inventes columnas o tablas que no existan - Para preguntas sobre clientes cero, SIEMPRE usar la subconsulta NOT IN con las últimas 4 semanas. - Si se menciona una ciudad, incluir el filtro AND "Ciudad" LIKE '%...%' - Usa LIMIT cuando se te pida un numero finito de datos - Para 'más vendido' usar SUM("Cantidad"), para 'mayor valor' usar SUM("Neto") - Usar "Razon Social" cuando pregunten por el nombre del cliente - Usar "Ciudad" para filtrar o agrupar por ubicación - Queda estrictamente prohibido usar acentos - **Siempre excluir valores nulos con 'IS NOT NULL' en las columnas usadas en WHERE, GROUP BY u ORDER BY** - Para preguntas sobre ciudad SIEMPRE incluir "Ciudad" en la query - Para busquedas por Descripcion siempre usar LIKE - Mandar solo la cantidad de rows que el usuario pide. ### PREGUNTA ACTUAL ### \"\"\"{question}\"\"\" ### SQL: """ ) def _generate_analysis_prompt(self, question: str, result: Any) -> str: # print(str(result)[:1000]) # print(result.to_dict(orient="records")) try: data_json = result.to_dict(orient="records") except: data_json = result return f""" Eres un analista de datos. Explica los siguientes resultados de manera clara y concisa. Pregunta del usuario: {question} Datos obtenidos: {data_json} Reglas OBLIGATORIAS: - NO inventes números, porcentajes ni comparaciones. - SOLO usa los valores que aparecen en Datos. - Si existe la clave 'clientes_cero', debes repetir EXACTAMENTE su valor, formateado, y nada más. - Responde en UNA sola oración, clara y breve. - Escribe cantidades con separador de miles (ej. 12,345). - No agregues interpretación extra si no hay más columnas. Respuesta: """ def _clean_analysis_output(self, ouput: str) -> Optional[str]: pattern = r"Respuesta:([\s\S]+)" match = re.search(pattern, ouput) if match: return match.group(1).strip() else: return "Sin análisis" def _clean_sql_output(self, output: str) -> Optional[str]: # Encuentra todas las posibles queries completas que terminen en ; sql_matches = list( re.finditer( r"(SELECT|WITH|INSERT|UPDATE|DELETE)[\s\S]+?;", output, re.IGNORECASE ) ) if not sql_matches: return None # Tomar la última query encontrada sql = sql_matches[-1].group(0).strip() # Seguridad: bloquear queries peligrosas if any( cmd in sql.upper() for cmd in ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER"] ): return None # Asegurar que termine en ; if not sql.endswith(";"): sql += ";" # ──────────────────────────────── # 1. Quitar acentos de toda la query # ──────────────────────────────── def remove_accents(text: str) -> str: return "".join( c for c in unicodedata.normalize("NFKD", text) if not unicodedata.combining(c) ) sql = remove_accents(sql) # ──────────────────────────────── # 2. Agregar LIMIT si no existe # ──────────────────────────────── # Buscar si ya hay un LIMIT en la query # if not re.search(r"\bLIMIT\s+\d+", sql, re.IGNORECASE): # # Insertar antes del último punto y coma # sql = sql[:-1] + " LIMIT 1;" # puedes cambiar 100 por el valor default que quieras validate_sql = self._validate_and_correct_sql(sql) return validate_sql def _validate_and_correct_sql(self, sql: str) -> str: connection = sqlite3.connect(self.config.DB_PATH) cursor = connection.cursor() cursor.execute(f'PRAGMA table_info("{self.config.TABLE_NAME}")') real_columns = [row[1] for row in cursor.fetchall()] column_lower_map = {col.lower(): col for col in real_columns} aliases = { "city": "Ciudad", "client": "Cliente", "razon_social": "Razon Social", "razón social": "Razon Social", "Sales": "sells", '"Date"': "Fecha", "mojica_Clientes": "sells", "value_total": "valor_total", "strstrftime": "strftime", } alias_map = {k.lower(): v for k, v in aliases.items()} pattern = r"\b\w+\b" def replace_column(m): candidate = m.group(0) # Palabra encontrada key = candidate.lower() # ¿Es una columna? corrected = column_lower_map.get(key) if corrected: return corrected # ¿Es una alias? corrected = alias_map.get(key) if corrected is not None: return corrected return candidate # si no encuentra nada, lo deja igual connection.close() return re.sub(pattern, replace_column, sql).replace("\\", "") def _execute_sql(self, sql: str) -> Any: try: connection = sqlite3.connect(self.config.DB_PATH) result = pd.read_sql_query(sql, connection) connection.close() return result except Exception as e: return f"Error: {str(e)}" def consult(self, question: str) -> Tuple[str, Any]: sql_prompt = self._generate_sql_prompt(question) tokenized_input = self.tokenizer( sql_prompt, return_tensors="pt", truncation=True, max_length=self.config.MAX_TOKENS, ).to(self.config.DEVICE) with torch.no_grad(): tokenized_output_model = self.model.generate( **tokenized_input, max_new_tokens=self.config.MAX_NEW_TOKENS, temperature=0.2, top_p=0.95, top_k=50, repetition_penalty=1.1, do_sample=True, pad_token_id=self.tokenizer.eos_token_id, ) output_model = self.tokenizer.decode( tokenized_output_model[0], skip_special_tokens=True ) sql = self._clean_sql_output(output_model) # print("-----------------------------------------------------") # print(output_model) # print(sql) # print("-----------------------------------------------------") # * Ejecución de SQL y generación de analisis result = self._execute_sql(sql) analysis_prompt = self._generate_analysis_prompt(question, result) analyzed_token_input = self.tokenizer( analysis_prompt, return_tensors="pt", truncation=True, max_length=self.config.MAX_TOKENS, ).to(self.config.DEVICE) with torch.no_grad(): tokenized_analysis_output_model = self.model.generate( **analyzed_token_input, max_new_tokens=self.config.MAX_NEW_TOKENS, temperature=0.65, ) analysis = self.tokenizer.decode( tokenized_analysis_output_model[0], skip_special_tokens=True ) analysis = self._clean_analysis_output(analysis) self.memory.add_interaction(question, sql, result) return sql, result