from typing import Dict, Any, List import asyncio import traceback from loguru import logger from .constants import SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES from .utils import get_random_message from .facebook import FacebookClient from app.config import get_settings import re class MessageProcessor: def __init__(self, channel, sender_id): self.channel = channel self.sender_id = sender_id # FacebookClient riêng cho từng conversation self.facebook = FacebookClient( app_secret=get_settings().facebook_app_secret, page_id=channel.page_id, page_token=channel.get_page_token(), sender_id=sender_id ) async def process_message(self, message_data: Dict[str, Any]): # Refactor logic từ main.py vào đây # Lưu ý: self.channel.supabase, self.channel.llm, ... if not message_data or not isinstance(message_data, dict): logger.error(f"[ERROR] Invalid message_data: {message_data}") return required_fields = ["sender_id", "page_id", "text", "timestamp"] for field in required_fields: if field not in message_data: logger.error(f"[ERROR] Missing field {field} in message_data: {message_data}") return sender_id = message_data["sender_id"] page_id = message_data["page_id"] message_text = message_data["text"] timestamp = message_data["timestamp"] attachments = message_data.get('attachments', []) logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message") # Nếu không có message_text và attachments, không xử lý if not message_text and not attachments: logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...") return # Get conversation history (run in thread pool) loop = asyncio.get_event_loop() sheets_client = self.channel.get_sheets_client() history = [] history = await loop.run_in_executor( None, lambda: sheets_client.get_conversation_history(sender_id, page_id) ) logger.info(f"[DEBUG] history: {history}")# log_kwargs = { 'conversation_id': None, 'recipient_id': sender_id, 'page_id': page_id, 'originaltext': message_text, 'originalcommand': '', 'originalcontent': '', 'originalattachments': attachments, 'originalvehicle': '', 'originalaction': '', 'originalpurpose': '', 'timestamp': [timestamp], 'isdone': False } logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}") conv = None if history: # 1. Chặn duplicate message (trùng sender_id, page_id, timestamp) for row in history: row_timestamps = self.flatten_timestamp(row.get('timestamp', [])) if isinstance(row_timestamps, list) and len(row_timestamps) == 1 and isinstance(row_timestamps[0], list): row_timestamps = row_timestamps[0] if ( str(timestamp) in [str(ts) for ts in row_timestamps] and str(row.get('recipient_id')) == str(sender_id) and str(row.get('page_id')) == str(page_id) ): logger.info("[DUPLICATE] Message duplicate, skipping log.") return conv = { 'conversation_id': row.get('conversation_id'), 'recipient_id': row.get('recipient_id'), 'page_id': row.get('page_id'), 'originaltext': row.get('originaltext'), 'originalcommand': row.get('originalcommand'), 'originalcontent': row.get('originalcontent'), 'originalattachments': row.get('originalattachments'), 'originalvehicle': row.get('originalvehicle'), 'originalaction': row.get('originalaction'), 'originalpurpose': row.get('originalpurpose'), 'timestamp': row_timestamps, 'isdone': row.get('isdone') } else: # 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản conv = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**log_kwargs)) if not conv: logger.error("Không thể tạo conversation mới!") return else: logger.info(f"[DEBUG] Message history: {conv}") for key, value in log_kwargs.items(): if value not in (None, "", []) and conv.get(key) in (None, "", []): conv[key] = value # Thêm timestamp mới nếu chưa có conv['timestamp'] = self.flatten_timestamp(conv['timestamp']) if timestamp not in conv['timestamp']: conv['timestamp'].append(timestamp) logger.info(f"[DEBUG] Message history sau update: {conv}") await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv)) # Get page access token (cache) page_token = self.channel.get_page_token() # Không cần update context FacebookClient nữa if page_token: logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}") else: logger.info(f"[DEBUG] page_token: None") logger.error(f"No access token found for page {message_data['page_id']}") return # Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần try: await self.facebook.send_message(message=get_random_message(PROCESSING_STATUS_MESSAGES)) except Exception as e: if "expired" in str(e).lower(): logger.warning("[FACEBOOK] Token expired, invalidate and refresh") self.channel.invalidate_page_token() page_token = self.channel.get_page_token(force_refresh=True) # Có thể update lại page_token cho self.facebook nếu cần self.facebook.page_token = page_token # await self.facebook.send_message(message="Ok, để mình check. Bạn chờ mình chút xíu nhé!") else: raise # Extract command and keywords from app.utils import extract_command, extract_keywords from app.constants import VEHICLE_KEYWORDS command, remaining_text = extract_command(message_text) # Sử dụng LLM để phân tích message_text và extract keywords, mục đích, hành vi vi phạm llm_analysis = await self.channel.llm.analyze(message_text) logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}") muc_dich = None hanh_vi_vi_pham = None if isinstance(llm_analysis, dict): keywords = [self.normalize_vehicle_keyword(llm_analysis.get('phuong_tien', ''))] muc_dich = llm_analysis.get('muc_dich') hanh_vi_vi_pham = llm_analysis.get('hanh_vi_vi_pham') elif isinstance(llm_analysis, list) and len(llm_analysis) > 0: keywords = [self.normalize_vehicle_keyword(llm_analysis[0].get('phuong_tien', ''))] muc_dich = llm_analysis[0].get('muc_dich') hanh_vi_vi_pham = llm_analysis[0].get('hanh_vi_vi_pham') else: keywords = extract_keywords(message_text, VEHICLE_KEYWORDS) hanh_vi_vi_pham = message_text for kw in keywords: hanh_vi_vi_pham = hanh_vi_vi_pham.replace(kw, "") hanh_vi_vi_pham = hanh_vi_vi_pham.strip() logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi_vi_pham} - Mục đích: {muc_dich}") # await self.channel.facebook.send_message(message=f"... đang tìm kiếm quy định liên quan đến {hanh_vi_vi_pham} .....") # 4. Update lại conversation với thông tin đầy đủ update_kwargs = { 'conversation_id': conv['conversation_id'], 'recipient_id': sender_id, 'page_id': page_id, 'originaltext': message_text, 'originalcommand': command, 'originalcontent': remaining_text, 'originalattachments': attachments, 'originalvehicle': ','.join(keywords), 'originalaction': hanh_vi_vi_pham, 'originalpurpose': muc_dich, 'timestamp': self.flatten_timestamp(conv['timestamp']), 'isdone': False } for key, value in update_kwargs.items(): if value not in (None, "", []) and conv.get(key) in (None, "", []): conv[key] = value logger.info(f"[DEBUG] Message history update cuối cùng: {conv}") # 5. Rẽ nhánh xử lý theo mục đích (muc_dich) # Lấy muc_dich từ history nếu có, hoặc từ message mới phân tích muc_dich_to_use = None if history and conv.get('originalpurpose'): muc_dich_to_use = conv.get('originalpurpose') else: muc_dich_to_use = muc_dich logger.info(f"[DEBUG] Định hướng mục đích xử lý: {muc_dich_to_use}") response = None if not command: if muc_dich_to_use == "hỏi về mức phạt": response = await self.handle_muc_phat(conv, message_text, page_token, sender_id) elif muc_dich_to_use == "hỏi về quy tắc giao thông": response = await self.handle_quy_tac(conv, message_text) elif muc_dich_to_use == "hỏi về báo hiệu đường bộ": response = await self.handle_bao_hieu(conv, message_text) elif muc_dich_to_use == "hỏi về quy trình xử lý vi phạm giao thông": response = await self.handle_quy_trinh(conv, message_text) elif muc_dich_to_use == "thông tin cá nhân của AI": response = await self.handle_ca_nhan(conv, message_text) else: response = await self.handle_khac(conv, message_text) else: # Có command if command == "xong": post_url = await self.create_facebook_post(page_token, conv['recipient_id'], [conv]) if post_url: response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}" else: response = "Đã xảy ra lỗi khi tạo bài viết. Vui lòng thử lại sau." conv['isdone'] = True else: response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất." conv['isdone'] = False # 6. Gửi response và cập nhật final state await self.facebook.send_message(message=response) if hasattr(self.channel, 'sheets'): await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv)) return def flatten_timestamp(self, ts): flat = [] for t in ts: if isinstance(t, list): flat.extend(self.flatten_timestamp(t)) else: flat.append(t) return flat def normalize_vehicle_keyword(self, keyword: str) -> str: from app.constants import VEHICLE_KEYWORDS import difflib if not keyword: return "" matches = difflib.get_close_matches(keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6) if matches: for k in VEHICLE_KEYWORDS: if k.lower() == matches[0]: return k return keyword async def process_business_logic(self, log_kwargs: Dict[str, Any], page_token: str) -> str: command = log_kwargs.get('originalcommand', '') vehicle = log_kwargs.get('originalvehicle', '') action = log_kwargs.get('originalaction', '') message = log_kwargs.get('originaltext', '') # Tách vehicle thành list keywords keywords = [kw.strip() for kw in vehicle.split(',') if kw.strip()] if not command: if keywords: # Có thông tin phương tiện if action: logger.info(f"[DEBUG] tạo embedding: {action}") embedding = await self.channel.embedder.create_embedding(action) logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})") matches = self.channel.supabase.match_documents( embedding, vehicle_keywords=keywords, user_question=action ) logger.info(f"[DEBUG] matches: {matches}") if matches: response = await self.format_search_results(message, matches, page_token, log_kwargs['recipient_id']) else: response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp." else: logger.info(f"[DEBUG] Không có hành vi vi phạm: {message}") response = "Xin lỗi, tôi không tìm thấy thông tin về hành vi vi phạm trong câu hỏi của bạn." log_kwargs['isdone'] = True else: # Không có thông tin phương tiện response = "Vui lòng cho biết loại phương tiện bạn cần tìm (xe máy, ô tô...)" log_kwargs['isdone'] = False else: # Có command if command == "xong": post_url = await self.create_facebook_post(page_token, log_kwargs['recipient_id'], [log_kwargs]) if post_url: response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}" else: response = "Đã xảy ra lỗi khi tạo bài viết. Vui lòng thử lại sau." log_kwargs['isdone'] = True else: response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất." log_kwargs['isdone'] = False return response async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str: if not matches: return "Không tìm thấy kết quả phù hợp." await self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES)) try: reranked = await self.channel.reranker.rerank(question, matches, top_k=5) if reranked: # Gửi Facebook message sau khi hoàn thành # Tạm comment đi để test # if self.facebook: # try: # message = get_random_message(BATCH_STATUS_MESSAGES) # await self.facebook.send_message(message=f"... {message} ...") # except Exception as e: # logger.error(f"[RERANK][FACEBOOK] Error sending batch message: {e}") matches = reranked except Exception as e: logger.error(f"[RERANK] Lỗi khi rerank: {e}") top = None top_result_text = "" full_result_text = "" def arr_to_str(arr, sep=", "): if not arr: return "" if isinstance(arr, list): return sep.join([str(x) for x in arr if x not in (None, "")]) return str(arr) for i, match in enumerate(matches, 1): if not top or (match.get('similarity', 0) > top.get('similarity', 0)): top = match full_result_text += f"\n{(match.get('structure') or '').strip()}:\n" tieude = (match.get('tieude') or '').strip() noidung = (match.get('noidung') or '').strip() hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ') full_result_text += f"Thực hiện hành vi:\n{hanhvi}" canhantu = arr_to_str(match.get('canhantu')) canhanden = arr_to_str(match.get('canhanden')) if canhantu or canhanden: full_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ" tochuctu = arr_to_str(match.get('tochuctu')) tochucden = arr_to_str(match.get('tochucden')) if tochuctu or tochucden: full_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ" hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ") if hpbsnoidung: full_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}" bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ") if bpkpnoidung: full_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}" impounding = match.get('impounding') if impounding: full_result_text += f"\nTạm giữ phương tiên: 07 ngày" if top and (top.get('tieude') or top.get('noidung')): tieude = (top.get('tieude') or '').strip() noidung = (top.get('noidung') or '').strip() hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ') top_result_text += f"Thực hiện hành vi:\n{hanhvi}" canhantu = arr_to_str(top.get('canhantu')) canhanden = arr_to_str(top.get('canhanden')) if canhantu or canhanden: top_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ" tochuctu = arr_to_str(top.get('tochuctu')) tochucden = arr_to_str(top.get('tochucden')) if tochuctu or tochucden: top_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ" hpbsnoidung = arr_to_str(top.get('hpbsnoidung'), sep="; ") if hpbsnoidung: top_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}" bpkpnoidung = arr_to_str(top.get('bpkpnoidung'), sep="; ") if bpkpnoidung: top_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}" impounding = top.get('impounding') if impounding: top_result_text += f"\nTạm giữ phương tiên: 07 ngày" else: result_text = "Không có kết quả phù hợp!" prompt = ( "Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên các đoạn luật sau. " "Chỉ sử dụng thông tin có trong các đoạn, không tự đoán.\n" f"\nCác đoạn luật liên quan:\n{full_result_text}" "\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần." f"\n\nCâu hỏi của người dùng: {question}\n" ) await self.facebook.send_message(message=f"{get_random_message(SUMMARY_STATUS_MESSAGES)}") try: answer = await self.channel.llm.generate_text(prompt) if answer and answer.strip(): logger.info(f"LLM trả về câu trả lời: \n\tanswer: {answer}") return answer.strip() else: logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}") except Exception as e: logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}") fallback = "Tóm tắt các đoạn luật liên quan:\n\n" for i, match in enumerate(matches, 1): fallback += f"Đoạn {i}:\n" tieude = (match.get('tieude') or '').strip() noidung = (match.get('noidung') or '').strip() if tieude or noidung: fallback += f" - Hành vi: {(tieude + ' ' + noidung).strip()}\n" canhantu = arr_to_str(match.get('canhantu')) canhanden = arr_to_str(match.get('canhanden')) if canhantu or canhanden: fallback += f" - Cá nhân bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ\n" tochuctu = arr_to_str(match.get('tochuctu')) tochucden = arr_to_str(match.get('tochucden')) if tochuctu or tochucden: fallback += f" - Tổ chức bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ\n" hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ") if hpbsnoidung: fallback += f" - Hình phạt bổ sung: {hpbsnoidung}\n" bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ") if bpkpnoidung: fallback += f" - Biện pháp khắc phục hậu quả: {bpkpnoidung}\n" impounding = match.get('impounding') if impounding: fallback += f"\nTạm giữ phương tiên: 07 ngày" fallback += "\n" return fallback.strip() async def create_facebook_post(self, page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str: logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}") return "https://facebook.com/mock_post_url" async def handle_muc_phat(self, conv, message_text, page_token, sender_id): vehicle = conv.get('originalvehicle', '') action = conv.get('originalaction', '') keywords = [kw.strip() for kw in vehicle.split(',') if kw.strip()] if keywords: if action: logger.info(f"[DEBUG] tạo embedding: {action}") embedding = await self.channel.embedder.create_embedding(action) logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})") matches = self.channel.supabase.match_documents( embedding, vehicle_keywords=keywords, user_question=action ) logger.info(f"[DEBUG] matches: {matches}") if matches: response = await self.format_search_results(message_text, matches, page_token, sender_id) else: response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp." else: logger.info(f"[DEBUG] Không có hành vi vi phạm: {message_text}") response = "Xin lỗi, tôi không tìm thấy thông tin về hành vi vi phạm trong câu hỏi của bạn." conv['isdone'] = True else: response = "Vui lòng cho biết loại phương tiện bạn cần tìm (xe máy, ô tô...)" conv['isdone'] = False return response async def handle_quy_tac(self, conv, message_text): answer = await self.channel.llm.generate_text(message_text) conv['isdone'] = True return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng trả lời về quy tắc giao thông sẽ sớm có mặt." async def handle_bao_hieu(self, conv, message_text): answer = await self.channel.llm.generate_text(message_text) conv['isdone'] = True return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng trả lời về báo hiệu đường bộ sẽ sớm có mặt." async def handle_quy_trinh(self, conv, message_text): answer = await self.channel.llm.generate_text(message_text) conv['isdone'] = True return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng trả lời về quy trình xử lý vi phạm giao thông sẽ sớm có mặt." async def handle_ca_nhan(self, conv, message_text): # Nếu câu hỏi là về thông tin cá nhân của bot, hướng dẫn LLM trả lời đúng prompt = ( 'Với các thông tin sau: "Bạn có tên là WeThoong AI, là trợ lý giao thông thông minh. Bạn được anh Viet Cat tạo ra và facebook cá nhân của anh ý là https://facebook.com/vietcat". ' 'Không được trả lời bạn là AI của Google, OpenAI, hay bất kỳ hãng nào khác. ' 'Hãy trả lời thông minh, hài hước, ngắn gọn cho câu hỏi sau:\n' f'Câu hỏi:\n"{message_text}"' ) answer = await self.channel.llm.generate_text(prompt) conv['isdone'] = True return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng này sẽ sớm có mặt." async def handle_khac(self, conv, message_text): answer = await self.channel.llm.generate_text(message_text) conv['isdone'] = True return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng này sẽ sớm có mặt."