from fastapi import FastAPI, Request, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from loguru import logger import json from typing import Dict, Any, List, Optional import asyncio from concurrent.futures import ThreadPoolExecutor import os import traceback import difflib from .config import Settings, get_settings from .facebook import FacebookClient from .sheets import SheetsClient from .supabase_db import SupabaseClient from .embedding import EmbeddingClient from .utils import setup_logging, extract_command, extract_keywords, timing_decorator_async, timing_decorator_sync, ensure_log_dir, validate_config from .constants import VEHICLE_KEYWORDS, SHEET_RANGE, VEHICLE_KEYWORD_TO_COLUMN from .health import router as health_router from .llm import create_llm_client from .reranker import Reranker from .request_limit_manager import RequestLimitManager from .law_document_chunker import LawDocumentChunker from app.channel_manager import channel_manager app = FastAPI(title="WeBot Facebook Messenger API") # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize clients settings = get_settings() setup_logging(settings.log_level) logger.info("[STARTUP] Đang lấy PORT từ biến môi trường hoặc config...") port = int(os.environ.get("PORT", settings.port if hasattr(settings, 'port') else 7860)) logger.info(f"[STARTUP] PORT sử dụng: {port}") logger.info("[STARTUP] Khởi tạo global RequestLimitManager...") # Global RequestLimitManager instance - singleton request_limit_manager = RequestLimitManager("gemini") logger.info("[STARTUP] Khởi tạo FacebookClient...") facebook_client = FacebookClient(settings.facebook_app_secret) logger.info("[STARTUP] Khởi tạo SheetsClient...") sheets_client = SheetsClient( settings.google_sheets_credentials_file, settings.google_sheets_token_file, settings.conversation_sheet_id ) logger.info("[STARTUP] Khởi tạo SupabaseClient...") supabase_client = SupabaseClient(settings.supabase_url, settings.supabase_key) logger.info("[STARTUP] Khởi tạo EmbeddingClient...") embedding_client = EmbeddingClient() # Keywords to look for in messages VEHICLE_KEYWORDS = ["xe máy", "ô tô", "xe đạp", "xe hơi"] # Khởi tạo LLM client (ví dụ dùng HFS, bạn có thể đổi provider tuỳ ý) # llm_client = create_llm_client( # provider="hfs", # base_url="https://vietcat-gemma34b.hf.space" # ) # Khởi tạo LLM client Gemini llm_client = create_llm_client( provider="gemini", base_url=settings.gemini_base_url, model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash" ) reranker = Reranker() # Khởi tạo LawDocumentChunker law_chunker = LawDocumentChunker() law_chunker.llm_client = llm_client logger.info("[STARTUP] Mount health router...") app.include_router(health_router) logger.info("[STARTUP] Validate config...") validate_config(settings) executor = ThreadPoolExecutor(max_workers=4) message_text = None def flatten_timestamp(ts): flat = [] for t in ts: if isinstance(t, list): flat.extend(flatten_timestamp(t)) else: flat.append(t) return flat def normalize_vehicle_keyword(keyword: str) -> str: """ Chuẩn hoá giá trị phương tiện về đúng từ khoá gần nhất trong VEHICLE_KEYWORDS. Nếu không khớp, trả về keyword gốc. """ if not keyword: return "" matches = difflib.get_close_matches(keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6) if matches: # Trả về đúng case trong VEHICLE_KEYWORDS for k in VEHICLE_KEYWORDS: if k.lower() == matches[0]: return k return keyword @app.get("/") async def root(): """Endpoint root để kiểm tra trạng thái app.""" logger.info("[HEALTH] Truy cập endpoint root /") return {"status": "ok"} @app.get("/webhook") # async def verify_webhook(request: Request): """ Xác thực webhook Facebook Messenger. Input: request (Request) - request từ Facebook với các query params. Output: Trả về challenge nếu verify thành công, lỗi nếu thất bại. """ params = dict(request.query_params) mode = params.get("hub.mode") token = str(params.get("hub.verify_token", "")) challenge = str(params.get("hub.challenge", "")) if not all([mode, token, challenge]): raise HTTPException(status_code=400, detail="Missing parameters") return await facebook_client.verify_webhook( token, challenge, settings.facebook_verify_token ) @app.post("/webhook") @timing_decorator_async async def webhook(request: Request): """ Nhận và xử lý message từ Facebook Messenger webhook. Input: request (Request) - request chứa payload JSON từ Facebook. Output: JSON status. """ logger.info(f"[DEBUG] Nhận message từ Facebook Messenger webhook...") body_bytes = await request.body() # Verify request is from Facebook if not facebook_client.verify_signature(request, body_bytes): raise HTTPException(status_code=403, detail="Invalid signature") try: body = json.loads(body_bytes) # Kiểm tra an toàn echo is_echo = ( isinstance(body, dict) and "entry" in body and isinstance(body["entry"], list) and len(body["entry"]) > 0 and "messaging" in body["entry"][0] and isinstance(body["entry"][0]["messaging"], list) and len(body["entry"][0]["messaging"]) > 0 and body["entry"][0]["messaging"][0].get("message", {}).get("is_echo", False) ) if is_echo: logger.info(f"[DEBUG] Message is echo, skipping...") return {"status": "ok"} else: message_data = facebook_client.parse_message(body) logger.info(f"[DEBUG] message_data: {message_data}") if not message_data: return {"status": "ok"} # --- Refactor: Lấy page_id, sender_id, channel, conversation, gọi message_processor --- page_id = message_data.get("page_id") sender_id = message_data.get("sender_id") channel = channel_manager.get_or_create_channel("facebook", page_id) conversation = channel.get_or_create_conversation(sender_id) await conversation.process_message(message_data) return {"status": "ok"} except Exception as e: logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}") raise HTTPException(status_code=500, detail="Internal server error") # ==================== DOCUMENT CHUNK MANAGEMENT APIs ==================== @app.delete("/api/document-chunks/clear") @timing_decorator_async async def delete_all_document_chunks(): """ API xóa toàn bộ bảng document_chunks. """ try: logger.info("[API] Starting delete all document chunks") success = supabase_client.delete_all_document_chunks() if success: logger.info("[API] Successfully deleted all document chunks") return {"status": "success", "message": "Đã xóa toàn bộ document chunks"} else: logger.error("[API] Failed to delete all document chunks") raise HTTPException(status_code=500, detail="Lỗi khi xóa document chunks") except Exception as e: logger.error(f"[API] Error in delete_all_document_chunks: {e}") raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") @app.post("/api/document-chunks/update") @timing_decorator_async async def update_specific_document(file_name: str, document_id: int): """ API cập nhật file xác định trong thư mục data. Args: file_name: Tên file trong thư mục data (ví dụ: "luat_giao_thong.txt") document_id: ID văn bản luật """ try: logger.info(f"[API] Starting update specific document: {file_name}, document_id: {document_id}") # Kiểm tra file tồn tại file_path = f"data/{file_name}" if not os.path.exists(file_path): logger.error(f"[API] File not found: {file_path}") raise HTTPException(status_code=404, detail=f"File không tồn tại: {file_name}") # Xóa chunks cũ của document_id này (nếu có) logger.info(f"[API] Deleting old chunks for document_id: {document_id}") supabase_client.delete_document_chunks_by_vanbanid(document_id) # Xử lý văn bản mới logger.info(f"[API] Processing document: {file_path}") success = await law_chunker.process_law_document(file_path, document_id) if success: logger.info(f"[API] Successfully updated document: {file_name}") return { "status": "success", "message": f"Đã cập nhật thành công văn bản: {file_name}", "document_id": document_id, "file_name": file_name } else: logger.error(f"[API] Failed to update document: {file_name}") raise HTTPException(status_code=500, detail=f"Lỗi khi xử lý văn bản: {file_name}") except HTTPException: raise except Exception as e: logger.error(f"[API] Error in update_specific_document: {e}") raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") @app.post("/api/document-chunks/update-all") @timing_decorator_async async def update_all_documents(): """ API cập nhật tự động toàn bộ file trong thư mục data. """ try: logger.info("[API] Starting update all documents") # Kiểm tra thư mục data tồn tại data_dir = "data" if not os.path.exists(data_dir): logger.warning(f"[API] Data directory not found: {data_dir}") return { "status": "warning", "message": "Thư mục data không tồn tại", "processed_files": [], "failed_files": [] } # Lấy danh sách file .txt trong thư mục data txt_files = [f for f in os.listdir(data_dir) if f.endswith('.txt')] if not txt_files: logger.warning("[API] No .txt files found in data directory") return { "status": "warning", "message": "Không tìm thấy file .txt nào trong thư mục data", "processed_files": [], "failed_files": [] } logger.info(f"[API] Found {len(txt_files)} .txt files to process") processed_files = [] failed_files = [] # Xử lý từng file for i, file_name in enumerate(txt_files, 1): try: logger.info(f"[API] Processing file {i}/{len(txt_files)}: {file_name}") # Sử dụng index làm document_id (có thể thay đổi logic này) document_id = i # Xóa chunks cũ của document_id này (nếu có) supabase_client.delete_document_chunks_by_vanbanid(document_id) # Xử lý văn bản file_path = os.path.join(data_dir, file_name) success = await law_chunker.process_law_document(file_path, document_id) if success: processed_files.append({ "file_name": file_name, "document_id": document_id, "status": "success" }) logger.info(f"[API] Successfully processed: {file_name}") else: failed_files.append({ "file_name": file_name, "document_id": document_id, "status": "failed", "error": "Processing failed" }) logger.error(f"[API] Failed to process: {file_name}") except Exception as e: logger.error(f"[API] Error processing {file_name}: {e}") failed_files.append({ "file_name": file_name, "document_id": i, "status": "failed", "error": str(e) }) # Tổng kết total_files = len(txt_files) success_count = len(processed_files) failed_count = len(failed_files) logger.info(f"[API] Update all completed: {success_count}/{total_files} files processed successfully") return { "status": "success", "message": f"Đã xử lý {success_count}/{total_files} files thành công", "total_files": total_files, "processed_files": processed_files, "failed_files": failed_files } except Exception as e: logger.error(f"[API] Error in update_all_documents: {e}") raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") @app.get("/api/document-chunks/view") @timing_decorator_async async def view_all_document_chunks(): """ API xem toàn bộ dữ liệu trong bảng document_chunks theo cấu trúc cây. """ try: logger.info("[API] Starting view all document chunks") # Lấy dữ liệu từ Supabase chunks_data = supabase_client.get_all_document_chunks() # Thống kê cơ bản total_chunks = len(chunks_data) unique_documents = len(set(chunk.get('vanbanid') for chunk in chunks_data if chunk.get('vanbanid'))) # Nhóm theo vanbanid chunks_by_document = {} for chunk in chunks_data: vanbanid = chunk.get('vanbanid') if vanbanid not in chunks_by_document: chunks_by_document[vanbanid] = [] chunks_by_document[vanbanid].append(chunk) # Thống kê chi tiết document_stats = [] hierarchical_data = [] for vanbanid, chunks in chunks_by_document.items(): # Thống kê document_stats.append({ "vanbanid": vanbanid, "chunk_count": len(chunks), "document_title": chunks[0].get('document_title', 'Unknown') if chunks else 'Unknown' }) # Tạo cấu trúc cây cho từng văn bản tree_structure = build_chunk_tree(chunks) hierarchical_data.append({ "vanbanid": vanbanid, "document_title": chunks[0].get('document_title', 'Unknown') if chunks else 'Unknown', "chunk_count": len(chunks), "chunks": tree_structure }) return { "status": "success", "message": f"Đã lấy {total_chunks} chunks từ {unique_documents} văn bản", "summary": { "total_chunks": total_chunks, "unique_documents": unique_documents, "document_stats": document_stats }, "data": hierarchical_data } except Exception as e: logger.error(f"[API] Error in view_all_document_chunks: {e}") raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") def build_chunk_tree(chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Xây dựng cấu trúc cây từ danh sách chunks phẳng. Cách đơn giản: tìm root nodes (cha=None) trước, sau đó tìm children. """ if not chunks: return [] # Debug: Kiểm tra tất cả chunks root_count = 0 child_count = 0 for chunk in chunks: if chunk.get('cha') is None: root_count += 1 logger.debug(f"[TREE] Root chunk: {chunk.get('content', '')[:100]}") else: child_count += 1 logger.debug(f"[TREE] Child chunk: {chunk.get('content', '')[:100]} -> parent: {chunk.get('cha')}") logger.info(f"[TREE] Found {root_count} root chunks and {child_count} child chunks from {len(chunks)} total chunks") # Tạo dictionary để truy cập nhanh chunks_dict = {chunk['id']: chunk for chunk in chunks} def build_node(chunk_id: str) -> Dict[str, Any]: """Tạo node và tìm tất cả children của nó.""" chunk = chunks_dict[chunk_id] # Tạo node node = { "id": chunk_id, "content": chunk.get('content', ''), "vanbanid": chunk.get('vanbanid'), "cha": chunk.get('cha'), "document_title": chunk.get('document_title', ''), "article_number": chunk.get('article_number'), "article_title": chunk.get('article_title', ''), "clause_number": chunk.get('clause_number', ''), "sub_clause_letter": chunk.get('sub_clause_letter', ''), "context_summary": chunk.get('context_summary', ''), "data": chunk, # Toàn bộ dữ liệu gốc "children": [] } # Tìm tất cả children của node này children_count = 0 for other_chunk in chunks: if other_chunk.get('cha') == chunk_id: child_node = build_node(other_chunk['id']) node["children"].append(child_node) children_count += 1 logger.debug(f"[TREE] Node {chunk_id[:8]}... has {children_count} children") return node # Tìm tất cả root nodes (cha=None) và sắp xếp theo thứ tự xuất hiện root_chunks = [] processed_ids = set() # Để tránh xử lý trùng lặp for chunk in chunks: if chunk.get('cha') is None and chunk['id'] not in processed_ids: root_node = build_node(chunk['id']) root_chunks.append(root_node) processed_ids.add(chunk['id']) logger.info(f"[TREE] Added root node: {chunk.get('content', '')[:100]}") logger.info(f"[TREE] Built tree with {len(root_chunks)} root nodes from {len(chunks)} total chunks") logger.info(f"[TREE] Root chunks: {[chunk.get('content', '')[:50] for chunk in root_chunks]}") return root_chunks @app.get("/api/document-chunks/status") @timing_decorator_async async def get_document_chunks_status(): """ API lấy thông tin trạng thái của document chunks. """ try: logger.info("[API] Getting document chunks status") # Lấy thống kê từ Supabase # Note: Cần implement method này trong SupabaseClient nếu cần # Kiểm tra thư mục data data_dir = "data" txt_files = [] if os.path.exists(data_dir): txt_files = [f for f in os.listdir(data_dir) if f.endswith('.txt')] return { "status": "success", "data_directory": data_dir, "available_files": txt_files, "file_count": len(txt_files), "message": f"Tìm thấy {len(txt_files)} file .txt trong thư mục data" } except Exception as e: logger.error(f"[API] Error in get_document_chunks_status: {e}") raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}") if __name__ == "__main__": import uvicorn logger.info("[STARTUP] Bắt đầu chạy uvicorn server...") uvicorn.run( "app.main:app", host="0.0.0.0", port=port )