Spaces:
Runtime error
Runtime error
| # utils.py | |
| import os | |
| import logging | |
| import regex as re | |
| import json | |
| from typing import List, Optional | |
| from schemas.chat import Message | |
| from redis.asyncio import Redis | |
| import bcrypt | |
| from datetime import datetime, timedelta | |
| from jose import jwt | |
| from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES | |
| from typing import List, Dict, Optional | |
| from unidecode import unidecode | |
| from db.mongoDB import mongo_db | |
| import secrets | |
| from fastapi import HTTPException, status | |
| from langchain_community.chat_message_histories import RedisChatMessageHistory | |
| from redis.exceptions import RedisError | |
| logger = logging.getLogger(__name__) | |
| async def save_chat_to_redis( | |
| r: Redis, # Hoặc redis.asyncio.Redis | |
| chat_id: str, | |
| user_question_content: str, # Nội dung câu hỏi gốc hoặc đã xử lý (tùy bạn) | |
| assistant_answer_content: str, | |
| user_question_timestamp: datetime, # Cung cấp timestamp | |
| assistant_answer_timestamp: datetime # Cung cấp timestamp | |
| ) -> bool: | |
| """ | |
| Lưu tin nhắn mới của người dùng và trợ lý vào Redis với định dạng chuẩn hóa. | |
| Cập nhật 'updated_at' và 'message_count' trong metadata. | |
| """ | |
| if not all([chat_id, user_question_content, assistant_answer_content]): | |
| logger.error("chat_id, user_question_content, và assistant_answer_content không được rỗng.") | |
| # raise ValueError("Đầu vào không hợp lệ.") # Hoặc trả về False | |
| return False | |
| messages_key = f"conversation_messages:{chat_id}" | |
| meta_key = f"conversation_meta:{chat_id}" | |
| try: | |
| # Tạo Pydantic models cho tin nhắn | |
| user_message = Message(role="user", content=user_question_content, timestamp=user_question_timestamp) | |
| assistant_message = Message(role="assistant", content=assistant_answer_content, timestamp=assistant_answer_timestamp) | |
| # Sử dụng pipeline cho các thao tác Redis | |
| pipe = await r.pipeline() | |
| await pipe.rpush(messages_key, user_message.model_dump_json()) # Pydantic V2 | |
| await pipe.rpush(messages_key, assistant_message.model_dump_json()) # Pydantic V2 | |
| # Hoặc .json() cho Pydantic V1 | |
| # Đặt TTL cho key messages nếu nó mới được tạo (hoặc luôn refresh TTL) | |
| # Nếu bạn muốn TTL chỉ đặt một lần, bạn cần kiểm tra sự tồn tại của key trước | |
| # hoặc kiểm tra llen trước khi push (phức tạp hơn với pipeline). | |
| # Cách đơn giản là luôn đặt lại TTL. | |
| await pipe.expire(messages_key, 86400) # 24 giờ | |
| # Cập nhật metadata | |
| await pipe.hset(meta_key, "updated_at", assistant_answer_timestamp.isoformat()) | |
| await pipe.hincrby(meta_key, "message_count", 2) # Tăng số lượng tin nhắn | |
| await pipe.expire(meta_key, 86400) # Refresh TTL cho meta | |
| await pipe.execute() | |
| logger.info(f"Đã lưu 2 tin nhắn mới vào {messages_key} và cập nhật {meta_key}.") | |
| return True | |
| except RedisError as e: | |
| logger.error(f"Lỗi Redis khi lưu tin nhắn cho chat_id {chat_id}: {e}", exc_info=True) | |
| raise # Re-raise để service xử lý HTTPException | |
| except Exception as e: | |
| logger.error(f"Lỗi không mong muốn khi lưu vào Redis cho chat_id {chat_id}: {e}", exc_info=True) | |
| # raise # Hoặc trả về False tùy theo cách bạn muốn xử lý | |
| return False | |
| async def get_redis_history(r: Redis, chat_id: str, max_messages: int = 100) -> List[Message]: | |
| """ | |
| Lấy lịch sử hội thoại từ Redis với định dạng chuẩn hóa. | |
| Args: | |
| r (Redis): Đối tượng Redis client. | |
| chat_id (str): ID của hội thoại. | |
| max_messages (int): Số tin nhắn tối đa trả về (mặc định 100). | |
| Returns: | |
| List[Message]: Danh sách tin nhắn (role, content, timestamp). | |
| Raises: | |
| ValueError: Nếu chat_id rỗng. | |
| redis.RedisError: Nếu có lỗi khi tương tác với Redis. | |
| """ | |
| # Kiểm tra đầu vào | |
| if not chat_id: | |
| logger.error("chat_id không được rỗng") | |
| raise ValueError("chat_id là bắt buộc") | |
| messages_key = f"conversation_messages:{chat_id}" | |
| try: | |
| # Kiểm tra kết nối Redis | |
| r.ping() | |
| # Lấy tin nhắn (giới hạn max_messages từ cuối) | |
| history_raw =await r.lrange(messages_key, -max_messages, -1) | |
| chat_history = [] | |
| for item in history_raw: | |
| try: | |
| parsed = json.loads(item) | |
| if not isinstance(parsed, dict): | |
| logger.warning(f"Tin nhắn không phải dict trong {messages_key}: {item}") | |
| continue | |
| # Kiểm tra các trường bắt buộc | |
| role = parsed.get("role") | |
| content = parsed.get("content") | |
| timestamp = parsed.get("timestamp") | |
| if not all([role, content, timestamp]): | |
| logger.warning(f"Tin nhắn thiếu trường trong {messages_key}: {parsed}") | |
| continue | |
| # Đảm bảo role hợp lệ | |
| if role not in ["user", "assistant"]: | |
| logger.warning(f"Role không hợp lệ trong {messages_key}: {role}") | |
| continue | |
| chat_history.append(Message( | |
| role=role, | |
| content=content, | |
| timestamp=timestamp | |
| )) | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Lỗi parse JSON trong {messages_key}: {item}, lỗi: {e}") | |
| continue | |
| except Exception as e: | |
| logger.error(f"Lỗi xử lý tin nhắn trong {messages_key}: {item}, lỗi: {e}") | |
| continue | |
| logger.info(f"Lấy {len(chat_history)} tin nhắn từ {messages_key}") | |
| return chat_history | |
| except r.RedisError as e: | |
| logger.error(f"Lỗi khi lấy lịch sử từ Redis cho chat_id {chat_id}: {e}") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Lỗi không mong muốn khi lấy lịch sử từ Redis cho chat_id {chat_id}: {e}") | |
| return [] | |
| async def delete_chat_from_redis(r: Redis, chat_id: str) -> bool: | |
| """ | |
| Xóa dữ liệu hội thoại và metadata từ Redis. | |
| Args: | |
| r (Redis): Đối tượng Redis client. | |
| chat_id (str): ID của hội thoại. | |
| Returns: | |
| bool: True nếu xóa thành công, False nếu thất bại. | |
| Raises: | |
| ValueError: Nếu chat_id rỗng. | |
| redis.RedisError: Nếu có lỗi khi tương tác với Redis. | |
| """ | |
| # Kiểm tra đầu vào | |
| if not chat_id: | |
| logger.error("chat_id không được rỗng") | |
| raise ValueError("chat_id là bắt buộc") | |
| redis_key = f"conversation:{chat_id}" | |
| meta_key = f"chat:{chat_id}:meta" | |
| try: | |
| # Kiểm tra kết nối Redis | |
| await r.ping() | |
| # Kiểm tra sự tồn tại của các key | |
| keys_to_delete = [] | |
| if await r.exists(redis_key): | |
| keys_to_delete.append(redis_key) | |
| if await r.exists(meta_key): | |
| keys_to_delete.append(meta_key) | |
| if not keys_to_delete: | |
| logger.info(f"Không tìm thấy dữ liệu cho chat_id {chat_id} trong Redis") | |
| return True # Không có gì để xóa, coi như thành công | |
| # Xóa các key | |
| deleted_count = await r.delete(*keys_to_delete) | |
| logger.info(f"Đã xóa {deleted_count} key cho chat_id {chat_id}: {keys_to_delete}") | |
| return True | |
| except r.RedisError as e: | |
| logger.error(f"Lỗi khi xóa dữ liệu từ Redis cho chat_id {chat_id}: {e}") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Lỗi không mong muốn khi xóa dữ liệu từ Redis cho chat_id {chat_id}: {e}") | |
| return False | |
| def hash_password(password: str) -> str: | |
| return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') | |
| def verify_password(password: str, hashed: str) -> bool: | |
| return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) | |
| def create_access_token(data: dict, expires_delta: timedelta = None): | |
| to_encode = data.copy() | |
| expire = datetime.now() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) | |
| to_encode.update({"exp": expire}) | |
| return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| async def create_refresh_token(email: str) -> str: | |
| """ | |
| Tạo và lưu refresh token vào cơ sở dữ liệu. | |
| Args: | |
| email (str): Địa chỉ email của người dùng. | |
| Returns: | |
| str: Refresh token được tạo. | |
| Raises: | |
| HTTPException: Nếu có lỗi khi lưu token. | |
| """ | |
| try: | |
| # Generate refresh token | |
| refresh_token = secrets.token_urlsafe(32) | |
| expire = datetime.now() + timedelta(days=7) | |
| # Store refresh token in database | |
| result = await mongo_db.users.update_one( | |
| {"email": email.lower()}, | |
| { | |
| "$set": { | |
| "refresh_token": refresh_token, | |
| "refresh_token_expiry": expire, | |
| "refresh_token_timestamp": datetime.now(), | |
| "revoked": False | |
| } | |
| } | |
| ) | |
| if result.modified_count != 1: | |
| logger.error(f"Không thể lưu refresh token cho email: {email}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Lỗi khi lưu refresh token." | |
| ) | |
| logger.info(f"Refresh token created for email: {email}") | |
| return refresh_token | |
| except Exception as e: | |
| logger.error(f"Lỗi khi tạo refresh token: {str(e)}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Lỗi hệ thống khi tạo refresh token." | |
| ) | |
| def load_legal_dictionary(path: str = 'legal_terms.json') -> list: | |
| with open(path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return data['dictionary'] | |
| def is_definition_question(query: str) -> bool: | |
| definition_keywords = ["là gì", "định nghĩa", "nghĩa là gì", "hiểu thế nào", "khái niệm"] | |
| query_lower = query.lower() | |
| return any(keyword in query_lower for keyword in definition_keywords) | |
| def normalize_text_for_matching(text: str) -> str: | |
| """ | |
| Chuẩn hóa text cho việc so khớp: chữ thường, loại bỏ ký tự đặc biệt (chỉ giữ chữ và số), | |
| loại bỏ dấu tiếng Việt, chuẩn hóa khoảng trắng. | |
| """ | |
| if not text or not isinstance(text, str): | |
| return "" | |
| text_no_diacritics = unidecode(text.lower()) # Chuyển không dấu và chữ thường | |
| # Loại bỏ tất cả ký tự không phải là chữ cái hoặc số hoặc khoảng trắng | |
| text_alphanumeric = re.sub(r'[^\w\s]', '', text_no_diacritics, flags=re.UNICODE) | |
| return re.sub(r'\s+', ' ', text_alphanumeric).strip() | |
| def search_term_in_dictionary(query: str, dictionary: List[Dict]) -> Optional[Dict]: | |
| """ | |
| Tìm kiếm thuật ngữ trong từ điển. | |
| Chỉ tìm nếu là câu hỏi định nghĩa. | |
| Cải thiện logic so khớp. | |
| """ | |
| if not is_definition_question(query): | |
| logger.debug(f"'{query}' không phải câu hỏi định nghĩa, bỏ qua tìm từ điển.") | |
| return None | |
| if not dictionary: | |
| logger.warning("Từ điển rỗng, không thể tìm kiếm.") | |
| return None | |
| # Cố gắng trích xuất thuật ngữ chính từ câu hỏi định nghĩa | |
| # Ví dụ: "Khái niệm hợp đồng lao động là gì?" -> "hợp đồng lao động" | |
| # Đây là một regex đơn giản, có thể cần tinh chỉnh | |
| term_to_search_raw = query | |
| match = re.match(r"^(.*?)\s+(là gì|định nghĩa|nghĩa là gì|hiểu thế nào|khái niệm)\??$", query.lower().strip(), re.IGNORECASE) | |
| if match: | |
| term_to_search_raw = match.group(1).strip() | |
| logger.info(f"Trích xuất thuật ngữ từ câu hỏi định nghĩa: '{term_to_search_raw}'") | |
| query_normalized_for_match = normalize_text_for_matching(term_to_search_raw) | |
| if not query_normalized_for_match: | |
| logger.debug("Thuật ngữ tìm kiếm rỗng sau khi chuẩn hóa.") | |
| return None | |
| logger.info(f"Tìm kiếm thuật ngữ đã chuẩn hóa (không dấu): '{query_normalized_for_match}'") | |
| # Sắp xếp từ điển theo độ dài thuật ngữ giảm dần (để ưu tiên khớp cụm dài hơn) | |
| # và chuẩn hóa thuật ngữ từ điển một lần | |
| normalized_dictionary = [] | |
| for entry in dictionary: | |
| term = entry.get("term") | |
| if term and isinstance(term, str): | |
| normalized_dictionary.append({ | |
| "original_entry": entry, | |
| "normalized_term": normalize_text_for_matching(term) | |
| }) | |
| # Sắp xếp theo độ dài thuật ngữ đã chuẩn hóa giảm dần | |
| # Điều này giúp "an toàn lao động" được khớp trước "an toàn" hoặc "lao động" | |
| # nếu query là "an toàn lao động là gì" | |
| normalized_dictionary.sort(key=lambda x: len(x["normalized_term"]), reverse=True) | |
| # Tìm kiếm khớp chính xác (sau khi chuẩn hóa cả query và term từ điển) | |
| for item in normalized_dictionary: | |
| if item["normalized_term"] == query_normalized_for_match: | |
| logger.info(f"Tìm thấy khớp chính xác (sau chuẩn hóa): '{item['original_entry']['term']}'") | |
| return item["original_entry"] | |
| # Tìm kiếm "chứa" (thuật ngữ từ điển là một phần của query đã chuẩn hóa) | |
| # Điều này hữu ích nếu query_normalized_for_match dài hơn thuật ngữ từ điển | |
| # Ví dụ: query_normalized = "dinh nghia an toan lao dong", term_normalized = "an toan lao dong" | |
| for item in normalized_dictionary: | |
| if item["normalized_term"] and item["normalized_term"] in query_normalized_for_match: | |
| logger.info(f"Tìm thấy khớp 'chứa' (từ điển trong query): '{item['original_entry']['term']}' (query norm: '{query_normalized_for_match}')") | |
| return item["original_entry"] | |
| logger.info(f"Không tìm thấy thuật ngữ '{query_normalized_for_match}' trong từ điển.") | |
| return None | |
| def minimal_preprocess_for_llm(text: str) -> str: | |
| """ | |
| Thực hiện tiền xử lý tối thiểu trước khi đưa vào LLM. | |
| Chỉ chuẩn hóa khoảng trắng và chuyển thành chữ thường. | |
| """ | |
| if not text or not text.strip(): | |
| # Vẫn cần kiểm tra input rỗng | |
| raise ValueError("Input không được rỗng") | |
| # 1. Chuẩn hóa khoảng trắng | |
| processed_text = re.sub(r'\s+', ' ', text).strip() | |
| # 2. Chuyển thành chữ thường để nhất quán | |
| processed_text = processed_text.lower() | |
| return processed_text | |
| async def save_chat_to_mongo(conversations_collection,chat_id: str, user_email: str,user_question_content: str, # Nội dung câu hỏi | |
| assistant_answer_content: str, # Nội dung trả lời | |
| user_question_timestamp: datetime, | |
| assistant_answer_timestamp: datetime): | |
| user_message = { | |
| "role": "user", | |
| "content": user_question_content, | |
| "timestamp": user_question_timestamp | |
| } | |
| assistant_message = { | |
| "role": "assistant", | |
| "content": assistant_answer_content, | |
| "timestamp": assistant_answer_timestamp | |
| } | |
| conversation = conversations_collection.find_one({"conversation_id": chat_id}) | |
| if not conversation: | |
| conversation = { | |
| "user_id": user_email, | |
| "conversation_id": chat_id, | |
| "messages": [user_message, assistant_message], | |
| "created_at": datetime.now(), | |
| "updated_at": datetime.now() | |
| } | |
| conversations_collection.insert_one(conversation) | |
| else: | |
| conversations_collection.update_one( | |
| {"conversation_id": chat_id}, | |
| { | |
| "$push": {"messages": {"$each": [user_message, assistant_message]}}, | |
| "$set": {"updated_at": datetime.now()} | |
| } | |
| ) | |
| async def get_langchain_chat_history(app_state, chat_id: str) -> RedisChatMessageHistory: | |
| """ | |
| Retrieves and synchronizes chat history for Langchain. | |
| """ | |
| redis_url = os.environ.get("REDIS_URL_LANGCHAIN", os.environ.get("REDIS_URL")) | |
| if not redis_url: | |
| raise ValueError("Redis URL for chat history is required.") | |
| # Đây là history mà Langchain sẽ sử dụng để đọc/ghi | |
| langchain_chat_history = RedisChatMessageHistory( # Hoặc RedisChatMessageHistoryAsync | |
| url=redis_url, | |
| session_id=chat_id, | |
| ttl=86400, # 1 day | |
| ) | |
| # Đồng bộ hóa: Lấy từ key "source of truth" của chúng ta và nạp vào key của Langchain | |
| messages_key = f"conversation_messages:{chat_id}" | |
| # Sử dụng await nếu redis client của app_state là async | |
| raw_messages_from_our_redis = app_state.redis.lrange(messages_key, 0, -1) | |
| # Xóa history cũ trong key của Langchain để tránh trùng lặp khi đồng bộ | |
| # Nếu dùng RedisChatMessageHistoryAsync: await langchain_chat_history.aclear() | |
| langchain_chat_history.clear() # Cho bản đồng bộ | |
| for msg_json_bytes in raw_messages_from_our_redis: | |
| msg_data = json.loads(msg_json_bytes) | |
| message = Message(**msg_data) # Validate | |
| if message.role == "user": | |
| # Nếu dùng RedisChatMessageHistoryAsync: await langchain_chat_history.aadd_user_message(message.content) | |
| langchain_chat_history.add_user_message(message.content) | |
| elif message.role == "assistant": | |
| # await langchain_chat_history.aadd_ai_message(message.content) | |
| langchain_chat_history.add_ai_message(message.content) | |
| return langchain_chat_history | |
| # api/utils.py | |
| import hashlib | |
| logger = logging.getLogger(__name__) | |
| def calculate_file_hash(filepath: str) -> str: | |
| sha256_hash = hashlib.sha256() | |
| with open(filepath, "rb") as f: | |
| for byte_block in iter(lambda: f.read(4096), b""): | |
| sha256_hash.update(byte_block) | |
| return sha256_hash.hexdigest() | |
| # def check_if_hash_exists(file_hash: str) -> bool: | |
| # if not os.path.exists(config.PROCESSED_HASH_LOG): | |
| # return False | |
| # try: | |
| # with open(config.PROCESSED_HASH_LOG, "r") as f: | |
| # processed_hashes = {line.strip() for line in f} | |
| # return file_hash in processed_hashes | |
| # except IOError as e: | |
| # logger.error(f"Could not read hash log file: {e}") | |
| # return False | |