FBChatBot / app /main.py
VietCat's picture
fix racing issues when sending message
4032184
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
import json
from typing import Dict, Any, List, Optional
import asyncio
from concurrent.futures import ThreadPoolExecutor
import os
import traceback
import difflib
from .config import Settings, get_settings
from .facebook import FacebookClient
from .sheets import SheetsClient
from .supabase_db import SupabaseClient
from .embedding import EmbeddingClient
from .utils import setup_logging, extract_command, extract_keywords, timing_decorator_async, timing_decorator_sync, ensure_log_dir, validate_config
from .constants import VEHICLE_KEYWORDS, SHEET_RANGE, VEHICLE_KEYWORD_TO_COLUMN
from .health import router as health_router
from .llm import create_llm_client
from .reranker import Reranker
from .request_limit_manager import RequestLimitManager
from .law_document_chunker import LawDocumentChunker
from app.channel_manager import channel_manager
app = FastAPI(title="WeBot Facebook Messenger API")
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize clients
settings = get_settings()
setup_logging(settings.log_level)
logger.info("[STARTUP] Đang lấy PORT từ biến môi trường hoặc config...")
port = int(os.environ.get("PORT", settings.port if hasattr(settings, 'port') else 7860))
logger.info(f"[STARTUP] PORT sử dụng: {port}")
logger.info("[STARTUP] Khởi tạo global RequestLimitManager...")
# Global RequestLimitManager instance - singleton
request_limit_manager = RequestLimitManager("gemini")
logger.info("[STARTUP] Khởi tạo FacebookClient...")
facebook_client = FacebookClient(settings.facebook_app_secret)
logger.info("[STARTUP] Khởi tạo SheetsClient...")
sheets_client = SheetsClient(
settings.google_sheets_credentials_file,
settings.google_sheets_token_file,
settings.conversation_sheet_id
)
logger.info("[STARTUP] Khởi tạo SupabaseClient...")
supabase_client = SupabaseClient(settings.supabase_url, settings.supabase_key)
logger.info("[STARTUP] Khởi tạo EmbeddingClient...")
embedding_client = EmbeddingClient()
# Keywords to look for in messages
VEHICLE_KEYWORDS = ["xe máy", "ô tô", "xe đạp", "xe hơi"]
# Khởi tạo LLM client (ví dụ dùng HFS, bạn có thể đổi provider tuỳ ý)
# llm_client = create_llm_client(
# provider="hfs",
# base_url="https://vietcat-gemma34b.hf.space"
# )
# Khởi tạo LLM client Gemini
llm_client = create_llm_client(
provider="gemini",
base_url=settings.gemini_base_url,
model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
)
reranker = Reranker()
# Khởi tạo LawDocumentChunker
law_chunker = LawDocumentChunker()
law_chunker.llm_client = llm_client
logger.info("[STARTUP] Mount health router...")
app.include_router(health_router)
logger.info("[STARTUP] Validate config...")
validate_config(settings)
executor = ThreadPoolExecutor(max_workers=4)
message_text = None
def flatten_timestamp(ts):
flat = []
for t in ts:
if isinstance(t, list):
flat.extend(flatten_timestamp(t))
else:
flat.append(t)
return flat
def normalize_vehicle_keyword(keyword: str) -> str:
"""
Chuẩn hoá giá trị phương tiện về đúng từ khoá gần nhất trong VEHICLE_KEYWORDS.
Nếu không khớp, trả về keyword gốc.
"""
if not keyword:
return ""
matches = difflib.get_close_matches(keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6)
if matches:
# Trả về đúng case trong VEHICLE_KEYWORDS
for k in VEHICLE_KEYWORDS:
if k.lower() == matches[0]:
return k
return keyword
@app.get("/")
async def root():
"""Endpoint root để kiểm tra trạng thái app."""
logger.info("[HEALTH] Truy cập endpoint root /")
return {"status": "ok"}
@app.get("/webhook") #
async def verify_webhook(request: Request):
"""
Xác thực webhook Facebook Messenger.
Input: request (Request) - request từ Facebook với các query params.
Output: Trả về challenge nếu verify thành công, lỗi nếu thất bại.
"""
params = dict(request.query_params)
mode = params.get("hub.mode")
token = str(params.get("hub.verify_token", ""))
challenge = str(params.get("hub.challenge", ""))
if not all([mode, token, challenge]):
raise HTTPException(status_code=400, detail="Missing parameters")
return await facebook_client.verify_webhook(
token,
challenge,
settings.facebook_verify_token
)
@app.post("/webhook")
@timing_decorator_async
async def webhook(request: Request):
"""
Nhận và xử lý message từ Facebook Messenger webhook.
Input: request (Request) - request chứa payload JSON từ Facebook.
Output: JSON status.
"""
logger.info(f"[DEBUG] Nhận message từ Facebook Messenger webhook...")
body_bytes = await request.body()
# Verify request is from Facebook
if not facebook_client.verify_signature(request, body_bytes):
raise HTTPException(status_code=403, detail="Invalid signature")
try:
body = json.loads(body_bytes)
# Kiểm tra an toàn echo
is_echo = (
isinstance(body, dict)
and "entry" in body
and isinstance(body["entry"], list)
and len(body["entry"]) > 0
and "messaging" in body["entry"][0]
and isinstance(body["entry"][0]["messaging"], list)
and len(body["entry"][0]["messaging"]) > 0
and body["entry"][0]["messaging"][0].get("message", {}).get("is_echo", False)
)
if is_echo:
logger.info(f"[DEBUG] Message is echo, skipping...")
return {"status": "ok"}
else:
message_data = facebook_client.parse_message(body)
logger.info(f"[DEBUG] message_data: {message_data}")
if not message_data:
return {"status": "ok"}
# --- Refactor: Lấy page_id, sender_id, channel, conversation, gọi message_processor ---
page_id = message_data.get("page_id")
sender_id = message_data.get("sender_id")
channel = channel_manager.get_or_create_channel("facebook", page_id)
conversation = channel.get_or_create_conversation(sender_id)
await conversation.process_message(message_data)
return {"status": "ok"}
except Exception as e:
logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail="Internal server error")
# ==================== DOCUMENT CHUNK MANAGEMENT APIs ====================
@app.delete("/api/document-chunks/clear")
@timing_decorator_async
async def delete_all_document_chunks():
"""
API xóa toàn bộ bảng document_chunks.
"""
try:
logger.info("[API] Starting delete all document chunks")
success = supabase_client.delete_all_document_chunks()
if success:
logger.info("[API] Successfully deleted all document chunks")
return {"status": "success", "message": "Đã xóa toàn bộ document chunks"}
else:
logger.error("[API] Failed to delete all document chunks")
raise HTTPException(status_code=500, detail="Lỗi khi xóa document chunks")
except Exception as e:
logger.error(f"[API] Error in delete_all_document_chunks: {e}")
raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}")
@app.post("/api/document-chunks/update")
@timing_decorator_async
async def update_specific_document(file_name: str, document_id: int):
"""
API cập nhật file xác định trong thư mục data.
Args:
file_name: Tên file trong thư mục data (ví dụ: "luat_giao_thong.txt")
document_id: ID văn bản luật
"""
try:
logger.info(f"[API] Starting update specific document: {file_name}, document_id: {document_id}")
# Kiểm tra file tồn tại
file_path = f"data/{file_name}"
if not os.path.exists(file_path):
logger.error(f"[API] File not found: {file_path}")
raise HTTPException(status_code=404, detail=f"File không tồn tại: {file_name}")
# Xóa chunks cũ của document_id này (nếu có)
logger.info(f"[API] Deleting old chunks for document_id: {document_id}")
supabase_client.delete_document_chunks_by_vanbanid(document_id)
# Xử lý văn bản mới
logger.info(f"[API] Processing document: {file_path}")
success = await law_chunker.process_law_document(file_path, document_id)
if success:
logger.info(f"[API] Successfully updated document: {file_name}")
return {
"status": "success",
"message": f"Đã cập nhật thành công văn bản: {file_name}",
"document_id": document_id,
"file_name": file_name
}
else:
logger.error(f"[API] Failed to update document: {file_name}")
raise HTTPException(status_code=500, detail=f"Lỗi khi xử lý văn bản: {file_name}")
except HTTPException:
raise
except Exception as e:
logger.error(f"[API] Error in update_specific_document: {e}")
raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}")
@app.post("/api/document-chunks/update-all")
@timing_decorator_async
async def update_all_documents():
"""
API cập nhật tự động toàn bộ file trong thư mục data.
"""
try:
logger.info("[API] Starting update all documents")
# Kiểm tra thư mục data tồn tại
data_dir = "data"
if not os.path.exists(data_dir):
logger.warning(f"[API] Data directory not found: {data_dir}")
return {
"status": "warning",
"message": "Thư mục data không tồn tại",
"processed_files": [],
"failed_files": []
}
# Lấy danh sách file .txt trong thư mục data
txt_files = [f for f in os.listdir(data_dir) if f.endswith('.txt')]
if not txt_files:
logger.warning("[API] No .txt files found in data directory")
return {
"status": "warning",
"message": "Không tìm thấy file .txt nào trong thư mục data",
"processed_files": [],
"failed_files": []
}
logger.info(f"[API] Found {len(txt_files)} .txt files to process")
processed_files = []
failed_files = []
# Xử lý từng file
for i, file_name in enumerate(txt_files, 1):
try:
logger.info(f"[API] Processing file {i}/{len(txt_files)}: {file_name}")
# Sử dụng index làm document_id (có thể thay đổi logic này)
document_id = i
# Xóa chunks cũ của document_id này (nếu có)
supabase_client.delete_document_chunks_by_vanbanid(document_id)
# Xử lý văn bản
file_path = os.path.join(data_dir, file_name)
success = await law_chunker.process_law_document(file_path, document_id)
if success:
processed_files.append({
"file_name": file_name,
"document_id": document_id,
"status": "success"
})
logger.info(f"[API] Successfully processed: {file_name}")
else:
failed_files.append({
"file_name": file_name,
"document_id": document_id,
"status": "failed",
"error": "Processing failed"
})
logger.error(f"[API] Failed to process: {file_name}")
except Exception as e:
logger.error(f"[API] Error processing {file_name}: {e}")
failed_files.append({
"file_name": file_name,
"document_id": i,
"status": "failed",
"error": str(e)
})
# Tổng kết
total_files = len(txt_files)
success_count = len(processed_files)
failed_count = len(failed_files)
logger.info(f"[API] Update all completed: {success_count}/{total_files} files processed successfully")
return {
"status": "success",
"message": f"Đã xử lý {success_count}/{total_files} files thành công",
"total_files": total_files,
"processed_files": processed_files,
"failed_files": failed_files
}
except Exception as e:
logger.error(f"[API] Error in update_all_documents: {e}")
raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}")
@app.get("/api/document-chunks/view")
@timing_decorator_async
async def view_all_document_chunks():
"""
API xem toàn bộ dữ liệu trong bảng document_chunks theo cấu trúc cây.
"""
try:
logger.info("[API] Starting view all document chunks")
# Lấy dữ liệu từ Supabase
chunks_data = supabase_client.get_all_document_chunks()
# Thống kê cơ bản
total_chunks = len(chunks_data)
unique_documents = len(set(chunk.get('vanbanid') for chunk in chunks_data if chunk.get('vanbanid')))
# Nhóm theo vanbanid
chunks_by_document = {}
for chunk in chunks_data:
vanbanid = chunk.get('vanbanid')
if vanbanid not in chunks_by_document:
chunks_by_document[vanbanid] = []
chunks_by_document[vanbanid].append(chunk)
# Thống kê chi tiết
document_stats = []
hierarchical_data = []
for vanbanid, chunks in chunks_by_document.items():
# Thống kê
document_stats.append({
"vanbanid": vanbanid,
"chunk_count": len(chunks),
"document_title": chunks[0].get('document_title', 'Unknown') if chunks else 'Unknown'
})
# Tạo cấu trúc cây cho từng văn bản
tree_structure = build_chunk_tree(chunks)
hierarchical_data.append({
"vanbanid": vanbanid,
"document_title": chunks[0].get('document_title', 'Unknown') if chunks else 'Unknown',
"chunk_count": len(chunks),
"chunks": tree_structure
})
return {
"status": "success",
"message": f"Đã lấy {total_chunks} chunks từ {unique_documents} văn bản",
"summary": {
"total_chunks": total_chunks,
"unique_documents": unique_documents,
"document_stats": document_stats
},
"data": hierarchical_data
}
except Exception as e:
logger.error(f"[API] Error in view_all_document_chunks: {e}")
raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}")
def build_chunk_tree(chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Xây dựng cấu trúc cây từ danh sách chunks phẳng.
Cách đơn giản: tìm root nodes (cha=None) trước, sau đó tìm children.
"""
if not chunks:
return []
# Debug: Kiểm tra tất cả chunks
root_count = 0
child_count = 0
for chunk in chunks:
if chunk.get('cha') is None:
root_count += 1
logger.debug(f"[TREE] Root chunk: {chunk.get('content', '')[:100]}")
else:
child_count += 1
logger.debug(f"[TREE] Child chunk: {chunk.get('content', '')[:100]} -> parent: {chunk.get('cha')}")
logger.info(f"[TREE] Found {root_count} root chunks and {child_count} child chunks from {len(chunks)} total chunks")
# Tạo dictionary để truy cập nhanh
chunks_dict = {chunk['id']: chunk for chunk in chunks}
def build_node(chunk_id: str) -> Dict[str, Any]:
"""Tạo node và tìm tất cả children của nó."""
chunk = chunks_dict[chunk_id]
# Tạo node
node = {
"id": chunk_id,
"content": chunk.get('content', ''),
"vanbanid": chunk.get('vanbanid'),
"cha": chunk.get('cha'),
"document_title": chunk.get('document_title', ''),
"article_number": chunk.get('article_number'),
"article_title": chunk.get('article_title', ''),
"clause_number": chunk.get('clause_number', ''),
"sub_clause_letter": chunk.get('sub_clause_letter', ''),
"context_summary": chunk.get('context_summary', ''),
"data": chunk, # Toàn bộ dữ liệu gốc
"children": []
}
# Tìm tất cả children của node này
children_count = 0
for other_chunk in chunks:
if other_chunk.get('cha') == chunk_id:
child_node = build_node(other_chunk['id'])
node["children"].append(child_node)
children_count += 1
logger.debug(f"[TREE] Node {chunk_id[:8]}... has {children_count} children")
return node
# Tìm tất cả root nodes (cha=None) và sắp xếp theo thứ tự xuất hiện
root_chunks = []
processed_ids = set() # Để tránh xử lý trùng lặp
for chunk in chunks:
if chunk.get('cha') is None and chunk['id'] not in processed_ids:
root_node = build_node(chunk['id'])
root_chunks.append(root_node)
processed_ids.add(chunk['id'])
logger.info(f"[TREE] Added root node: {chunk.get('content', '')[:100]}")
logger.info(f"[TREE] Built tree with {len(root_chunks)} root nodes from {len(chunks)} total chunks")
logger.info(f"[TREE] Root chunks: {[chunk.get('content', '')[:50] for chunk in root_chunks]}")
return root_chunks
@app.get("/api/document-chunks/status")
@timing_decorator_async
async def get_document_chunks_status():
"""
API lấy thông tin trạng thái của document chunks.
"""
try:
logger.info("[API] Getting document chunks status")
# Lấy thống kê từ Supabase
# Note: Cần implement method này trong SupabaseClient nếu cần
# Kiểm tra thư mục data
data_dir = "data"
txt_files = []
if os.path.exists(data_dir):
txt_files = [f for f in os.listdir(data_dir) if f.endswith('.txt')]
return {
"status": "success",
"data_directory": data_dir,
"available_files": txt_files,
"file_count": len(txt_files),
"message": f"Tìm thấy {len(txt_files)} file .txt trong thư mục data"
}
except Exception as e:
logger.error(f"[API] Error in get_document_chunks_status: {e}")
raise HTTPException(status_code=500, detail=f"Lỗi: {str(e)}")
if __name__ == "__main__":
import uvicorn
logger.info("[STARTUP] Bắt đầu chạy uvicorn server...")
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=port
)