FBChatBot / app /message_processor.py
VietCat's picture
refactor message flow on purposes
968d612
from typing import Dict, Any, List
import asyncio
import traceback
from loguru import logger
from .constants import SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES
from .utils import get_random_message
from .facebook import FacebookClient
from app.config import get_settings
import re
class MessageProcessor:
def __init__(self, channel, sender_id):
self.channel = channel
self.sender_id = sender_id
# FacebookClient riêng cho từng conversation
self.facebook = FacebookClient(
app_secret=get_settings().facebook_app_secret,
page_id=channel.page_id,
page_token=channel.get_page_token(),
sender_id=sender_id
)
async def process_message(self, message_data: Dict[str, Any]):
# Refactor logic từ main.py vào đây
# Lưu ý: self.channel.supabase, self.channel.llm, ...
if not message_data or not isinstance(message_data, dict):
logger.error(f"[ERROR] Invalid message_data: {message_data}")
return
required_fields = ["sender_id", "page_id", "text", "timestamp"]
for field in required_fields:
if field not in message_data:
logger.error(f"[ERROR] Missing field {field} in message_data: {message_data}")
return
sender_id = message_data["sender_id"]
page_id = message_data["page_id"]
message_text = message_data["text"]
timestamp = message_data["timestamp"]
attachments = message_data.get('attachments', [])
logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message")
# Nếu không có message_text và attachments, không xử lý
if not message_text and not attachments:
logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...")
return
# Get conversation history (run in thread pool)
loop = asyncio.get_event_loop()
sheets_client = self.channel.get_sheets_client()
history = []
history = await loop.run_in_executor(
None, lambda: sheets_client.get_conversation_history(sender_id, page_id)
)
logger.info(f"[DEBUG] history: {history}")#
log_kwargs = {
'conversation_id': None,
'recipient_id': sender_id,
'page_id': page_id,
'originaltext': message_text,
'originalcommand': '',
'originalcontent': '',
'originalattachments': attachments,
'originalvehicle': '',
'originalaction': '',
'originalpurpose': '',
'timestamp': [timestamp],
'isdone': False
}
logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}")
conv = None
if history:
# 1. Chặn duplicate message (trùng sender_id, page_id, timestamp)
for row in history:
row_timestamps = self.flatten_timestamp(row.get('timestamp', []))
if isinstance(row_timestamps, list) and len(row_timestamps) == 1 and isinstance(row_timestamps[0], list):
row_timestamps = row_timestamps[0]
if (
str(timestamp) in [str(ts) for ts in row_timestamps]
and str(row.get('recipient_id')) == str(sender_id)
and str(row.get('page_id')) == str(page_id)
):
logger.info("[DUPLICATE] Message duplicate, skipping log.")
return
conv = {
'conversation_id': row.get('conversation_id'),
'recipient_id': row.get('recipient_id'),
'page_id': row.get('page_id'),
'originaltext': row.get('originaltext'),
'originalcommand': row.get('originalcommand'),
'originalcontent': row.get('originalcontent'),
'originalattachments': row.get('originalattachments'),
'originalvehicle': row.get('originalvehicle'),
'originalaction': row.get('originalaction'),
'originalpurpose': row.get('originalpurpose'),
'timestamp': row_timestamps,
'isdone': row.get('isdone')
}
else:
# 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản
conv = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**log_kwargs))
if not conv:
logger.error("Không thể tạo conversation mới!")
return
else:
logger.info(f"[DEBUG] Message history: {conv}")
for key, value in log_kwargs.items():
if value not in (None, "", []) and conv.get(key) in (None, "", []):
conv[key] = value
# Thêm timestamp mới nếu chưa có
conv['timestamp'] = self.flatten_timestamp(conv['timestamp'])
if timestamp not in conv['timestamp']:
conv['timestamp'].append(timestamp)
logger.info(f"[DEBUG] Message history sau update: {conv}")
await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
# Get page access token (cache)
page_token = self.channel.get_page_token()
# Không cần update context FacebookClient nữa
if page_token:
logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}")
else:
logger.info(f"[DEBUG] page_token: None")
logger.error(f"No access token found for page {message_data['page_id']}")
return
# Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần
try:
await self.facebook.send_message(message=get_random_message(PROCESSING_STATUS_MESSAGES))
except Exception as e:
if "expired" in str(e).lower():
logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
self.channel.invalidate_page_token()
page_token = self.channel.get_page_token(force_refresh=True)
# Có thể update lại page_token cho self.facebook nếu cần
self.facebook.page_token = page_token
# await self.facebook.send_message(message="Ok, để mình check. Bạn chờ mình chút xíu nhé!")
else:
raise
# Extract command and keywords
from app.utils import extract_command, extract_keywords
from app.constants import VEHICLE_KEYWORDS
command, remaining_text = extract_command(message_text)
# Sử dụng LLM để phân tích message_text và extract keywords, mục đích, hành vi vi phạm
llm_analysis = await self.channel.llm.analyze(message_text)
logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}")
muc_dich = None
hanh_vi_vi_pham = None
if isinstance(llm_analysis, dict):
keywords = [self.normalize_vehicle_keyword(llm_analysis.get('phuong_tien', ''))]
muc_dich = llm_analysis.get('muc_dich')
hanh_vi_vi_pham = llm_analysis.get('hanh_vi_vi_pham')
elif isinstance(llm_analysis, list) and len(llm_analysis) > 0:
keywords = [self.normalize_vehicle_keyword(llm_analysis[0].get('phuong_tien', ''))]
muc_dich = llm_analysis[0].get('muc_dich')
hanh_vi_vi_pham = llm_analysis[0].get('hanh_vi_vi_pham')
else:
keywords = extract_keywords(message_text, VEHICLE_KEYWORDS)
hanh_vi_vi_pham = message_text
for kw in keywords:
hanh_vi_vi_pham = hanh_vi_vi_pham.replace(kw, "")
hanh_vi_vi_pham = hanh_vi_vi_pham.strip()
logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi_vi_pham} - Mục đích: {muc_dich}")
# await self.channel.facebook.send_message(message=f"... đang tìm kiếm quy định liên quan đến {hanh_vi_vi_pham} .....")
# 4. Update lại conversation với thông tin đầy đủ
update_kwargs = {
'conversation_id': conv['conversation_id'],
'recipient_id': sender_id,
'page_id': page_id,
'originaltext': message_text,
'originalcommand': command,
'originalcontent': remaining_text,
'originalattachments': attachments,
'originalvehicle': ','.join(keywords),
'originalaction': hanh_vi_vi_pham,
'originalpurpose': muc_dich,
'timestamp': self.flatten_timestamp(conv['timestamp']),
'isdone': False
}
for key, value in update_kwargs.items():
if value not in (None, "", []) and conv.get(key) in (None, "", []):
conv[key] = value
logger.info(f"[DEBUG] Message history update cuối cùng: {conv}")
# 5. Rẽ nhánh xử lý theo mục đích (muc_dich)
# Lấy muc_dich từ history nếu có, hoặc từ message mới phân tích
muc_dich_to_use = None
if history and conv.get('originalpurpose'):
muc_dich_to_use = conv.get('originalpurpose')
else:
muc_dich_to_use = muc_dich
logger.info(f"[DEBUG] Định hướng mục đích xử lý: {muc_dich_to_use}")
response = None
if not command:
if muc_dich_to_use == "hỏi về mức phạt":
response = await self.handle_muc_phat(conv, message_text, page_token, sender_id)
elif muc_dich_to_use == "hỏi về quy tắc giao thông":
response = await self.handle_quy_tac(conv, message_text)
elif muc_dich_to_use == "hỏi về báo hiệu đường bộ":
response = await self.handle_bao_hieu(conv, message_text)
elif muc_dich_to_use == "hỏi về quy trình xử lý vi phạm giao thông":
response = await self.handle_quy_trinh(conv, message_text)
elif muc_dich_to_use == "thông tin cá nhân của AI":
response = await self.handle_ca_nhan(conv, message_text)
else:
response = await self.handle_khac(conv, message_text)
else:
# Có command
if command == "xong":
post_url = await self.create_facebook_post(page_token, conv['recipient_id'], [conv])
if post_url:
response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}"
else:
response = "Đã xảy ra lỗi khi tạo bài viết. Vui lòng thử lại sau."
conv['isdone'] = True
else:
response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất."
conv['isdone'] = False
# 6. Gửi response và cập nhật final state
await self.facebook.send_message(message=response)
if hasattr(self.channel, 'sheets'):
await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
return
def flatten_timestamp(self, ts):
flat = []
for t in ts:
if isinstance(t, list):
flat.extend(self.flatten_timestamp(t))
else:
flat.append(t)
return flat
def normalize_vehicle_keyword(self, keyword: str) -> str:
from app.constants import VEHICLE_KEYWORDS
import difflib
if not keyword:
return ""
matches = difflib.get_close_matches(keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6)
if matches:
for k in VEHICLE_KEYWORDS:
if k.lower() == matches[0]:
return k
return keyword
async def process_business_logic(self, log_kwargs: Dict[str, Any], page_token: str) -> str:
command = log_kwargs.get('originalcommand', '')
vehicle = log_kwargs.get('originalvehicle', '')
action = log_kwargs.get('originalaction', '')
message = log_kwargs.get('originaltext', '')
# Tách vehicle thành list keywords
keywords = [kw.strip() for kw in vehicle.split(',') if kw.strip()]
if not command:
if keywords:
# Có thông tin phương tiện
if action:
logger.info(f"[DEBUG] tạo embedding: {action}")
embedding = await self.channel.embedder.create_embedding(action)
logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})")
matches = self.channel.supabase.match_documents(
embedding,
vehicle_keywords=keywords,
user_question=action
)
logger.info(f"[DEBUG] matches: {matches}")
if matches:
response = await self.format_search_results(message, matches, page_token, log_kwargs['recipient_id'])
else:
response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp."
else:
logger.info(f"[DEBUG] Không có hành vi vi phạm: {message}")
response = "Xin lỗi, tôi không tìm thấy thông tin về hành vi vi phạm trong câu hỏi của bạn."
log_kwargs['isdone'] = True
else:
# Không có thông tin phương tiện
response = "Vui lòng cho biết loại phương tiện bạn cần tìm (xe máy, ô tô...)"
log_kwargs['isdone'] = False
else:
# Có command
if command == "xong":
post_url = await self.create_facebook_post(page_token, log_kwargs['recipient_id'], [log_kwargs])
if post_url:
response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}"
else:
response = "Đã xảy ra lỗi khi tạo bài viết. Vui lòng thử lại sau."
log_kwargs['isdone'] = True
else:
response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất."
log_kwargs['isdone'] = False
return response
async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
if not matches:
return "Không tìm thấy kết quả phù hợp."
await self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES))
try:
reranked = await self.channel.reranker.rerank(question, matches, top_k=5)
if reranked:
# Gửi Facebook message sau khi hoàn thành
# Tạm comment đi để test
# if self.facebook:
# try:
# message = get_random_message(BATCH_STATUS_MESSAGES)
# await self.facebook.send_message(message=f"... {message} ...")
# except Exception as e:
# logger.error(f"[RERANK][FACEBOOK] Error sending batch message: {e}")
matches = reranked
except Exception as e:
logger.error(f"[RERANK] Lỗi khi rerank: {e}")
top = None
top_result_text = ""
full_result_text = ""
def arr_to_str(arr, sep=", "):
if not arr:
return ""
if isinstance(arr, list):
return sep.join([str(x) for x in arr if x not in (None, "")])
return str(arr)
for i, match in enumerate(matches, 1):
if not top or (match.get('similarity', 0) > top.get('similarity', 0)):
top = match
full_result_text += f"\n{(match.get('structure') or '').strip()}:\n"
tieude = (match.get('tieude') or '').strip()
noidung = (match.get('noidung') or '').strip()
hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ')
full_result_text += f"Thực hiện hành vi:\n{hanhvi}"
canhantu = arr_to_str(match.get('canhantu'))
canhanden = arr_to_str(match.get('canhanden'))
if canhantu or canhanden:
full_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ"
tochuctu = arr_to_str(match.get('tochuctu'))
tochucden = arr_to_str(match.get('tochucden'))
if tochuctu or tochucden:
full_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ"
hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
if hpbsnoidung:
full_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
if bpkpnoidung:
full_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
impounding = match.get('impounding')
if impounding:
full_result_text += f"\nTạm giữ phương tiên: 07 ngày"
if top and (top.get('tieude') or top.get('noidung')):
tieude = (top.get('tieude') or '').strip()
noidung = (top.get('noidung') or '').strip()
hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ')
top_result_text += f"Thực hiện hành vi:\n{hanhvi}"
canhantu = arr_to_str(top.get('canhantu'))
canhanden = arr_to_str(top.get('canhanden'))
if canhantu or canhanden:
top_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ"
tochuctu = arr_to_str(top.get('tochuctu'))
tochucden = arr_to_str(top.get('tochucden'))
if tochuctu or tochucden:
top_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ"
hpbsnoidung = arr_to_str(top.get('hpbsnoidung'), sep="; ")
if hpbsnoidung:
top_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
bpkpnoidung = arr_to_str(top.get('bpkpnoidung'), sep="; ")
if bpkpnoidung:
top_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
impounding = top.get('impounding')
if impounding:
top_result_text += f"\nTạm giữ phương tiên: 07 ngày"
else:
result_text = "Không có kết quả phù hợp!"
prompt = (
"Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên các đoạn luật sau. "
"Chỉ sử dụng thông tin có trong các đoạn, không tự đoán.\n"
f"\nCác đoạn luật liên quan:\n{full_result_text}"
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
f"\n\nCâu hỏi của người dùng: {question}\n"
)
await self.facebook.send_message(message=f"{get_random_message(SUMMARY_STATUS_MESSAGES)}")
try:
answer = await self.channel.llm.generate_text(prompt)
if answer and answer.strip():
logger.info(f"LLM trả về câu trả lời: \n\tanswer: {answer}")
return answer.strip()
else:
logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}")
except Exception as e:
logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}")
fallback = "Tóm tắt các đoạn luật liên quan:\n\n"
for i, match in enumerate(matches, 1):
fallback += f"Đoạn {i}:\n"
tieude = (match.get('tieude') or '').strip()
noidung = (match.get('noidung') or '').strip()
if tieude or noidung:
fallback += f" - Hành vi: {(tieude + ' ' + noidung).strip()}\n"
canhantu = arr_to_str(match.get('canhantu'))
canhanden = arr_to_str(match.get('canhanden'))
if canhantu or canhanden:
fallback += f" - Cá nhân bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ\n"
tochuctu = arr_to_str(match.get('tochuctu'))
tochucden = arr_to_str(match.get('tochucden'))
if tochuctu or tochucden:
fallback += f" - Tổ chức bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ\n"
hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
if hpbsnoidung:
fallback += f" - Hình phạt bổ sung: {hpbsnoidung}\n"
bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
if bpkpnoidung:
fallback += f" - Biện pháp khắc phục hậu quả: {bpkpnoidung}\n"
impounding = match.get('impounding')
if impounding:
fallback += f"\nTạm giữ phương tiên: 07 ngày"
fallback += "\n"
return fallback.strip()
async def create_facebook_post(self, page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
return "https://facebook.com/mock_post_url"
async def handle_muc_phat(self, conv, message_text, page_token, sender_id):
vehicle = conv.get('originalvehicle', '')
action = conv.get('originalaction', '')
keywords = [kw.strip() for kw in vehicle.split(',') if kw.strip()]
if keywords:
if action:
logger.info(f"[DEBUG] tạo embedding: {action}")
embedding = await self.channel.embedder.create_embedding(action)
logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})")
matches = self.channel.supabase.match_documents(
embedding,
vehicle_keywords=keywords,
user_question=action
)
logger.info(f"[DEBUG] matches: {matches}")
if matches:
response = await self.format_search_results(message_text, matches, page_token, sender_id)
else:
response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp."
else:
logger.info(f"[DEBUG] Không có hành vi vi phạm: {message_text}")
response = "Xin lỗi, tôi không tìm thấy thông tin về hành vi vi phạm trong câu hỏi của bạn."
conv['isdone'] = True
else:
response = "Vui lòng cho biết loại phương tiện bạn cần tìm (xe máy, ô tô...)"
conv['isdone'] = False
return response
async def handle_quy_tac(self, conv, message_text):
answer = await self.channel.llm.generate_text(message_text)
conv['isdone'] = True
return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng trả lời về quy tắc giao thông sẽ sớm có mặt."
async def handle_bao_hieu(self, conv, message_text):
answer = await self.channel.llm.generate_text(message_text)
conv['isdone'] = True
return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng trả lời về báo hiệu đường bộ sẽ sớm có mặt."
async def handle_quy_trinh(self, conv, message_text):
answer = await self.channel.llm.generate_text(message_text)
conv['isdone'] = True
return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng trả lời về quy trình xử lý vi phạm giao thông sẽ sớm có mặt."
async def handle_ca_nhan(self, conv, message_text):
# Nếu câu hỏi là về thông tin cá nhân của bot, hướng dẫn LLM trả lời đúng
prompt = (
'Với các thông tin sau: "Bạn có tên là WeThoong AI, là trợ lý giao thông thông minh. Bạn được anh Viet Cat tạo ra và facebook cá nhân của anh ý là https://facebook.com/vietcat". '
'Không được trả lời bạn là AI của Google, OpenAI, hay bất kỳ hãng nào khác. '
'Hãy trả lời thông minh, hài hước, ngắn gọn cho câu hỏi sau:\n'
f'Câu hỏi:\n"{message_text}"'
)
answer = await self.channel.llm.generate_text(prompt)
conv['isdone'] = True
return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng này sẽ sớm có mặt."
async def handle_khac(self, conv, message_text):
answer = await self.channel.llm.generate_text(message_text)
conv['isdone'] = True
return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng này sẽ sớm có mặt."