VietCat commited on
Commit
f6cdf9d
·
1 Parent(s): e68c212

refactor chat channel structure

Browse files
app/channel_manager.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from app.chat_channel import ChatChannel
2
+
3
+ class ChannelManager:
4
+ def __init__(self):
5
+ self.channels = {} # {(channel_type, page_id): ChatChannel}
6
+
7
+ def get_or_create_channel(self, channel_type: str, page_id: str) -> ChatChannel:
8
+ key = (channel_type, page_id)
9
+ if key not in self.channels:
10
+ self.channels[key] = ChatChannel(page_id, channel_type)
11
+ return self.channels[key]
12
+
13
+ channel_manager = ChannelManager()
app/chat_channel.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from .supabase_db import SupabaseClient
2
+ from .llm import create_llm_client
3
+ from .reranker import Reranker
4
+ from .embedding import EmbeddingClient
5
+ from .facebook import FacebookClient
6
+ from .config import get_settings
7
+ from app.message_processor import MessageProcessor
8
+
9
+ class ChatChannel:
10
+ def __init__(self, page_id: str, channel_type: str = "facebook"):
11
+ self.channel_type = channel_type
12
+ self.page_id = page_id
13
+ self.page_token = None # cache token
14
+ settings = get_settings()
15
+ self.supabase = SupabaseClient(settings.supabase_url, settings.supabase_key)
16
+ self.llm = create_llm_client(
17
+ provider="gemini",
18
+ base_url=settings.gemini_base_url,
19
+ model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
20
+ )
21
+ self.reranker = Reranker()
22
+ self.embedder = EmbeddingClient()
23
+ self.facebook = FacebookClient(settings.facebook_app_secret)
24
+ self.message_processor = MessageProcessor(self)
25
+
26
+ def get_page_token(self, force_refresh=False):
27
+ if self.page_token is None or force_refresh:
28
+ self.page_token = self.supabase.get_page_token(self.page_id)
29
+ return self.page_token
30
+
31
+ def invalidate_page_token(self):
32
+ self.page_token = None
app/message_processor.py ADDED
@@ -0,0 +1,355 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, Any, List
2
+ import asyncio
3
+ import traceback
4
+ from loguru import logger
5
+
6
+ class MessageProcessor:
7
+ def __init__(self, channel):
8
+ self.channel = channel
9
+
10
+ async def process_message(self, message_data: Dict[str, Any]):
11
+ # Refactor logic từ main.py vào đây
12
+ # Lưu ý: self.channel.supabase, self.channel.llm, ...
13
+ if not message_data or not isinstance(message_data, dict):
14
+ logger.error(f"[ERROR] Invalid message_data: {message_data}")
15
+ return
16
+ required_fields = ["sender_id", "page_id", "text", "timestamp"]
17
+ for field in required_fields:
18
+ if field not in message_data:
19
+ logger.error(f"[ERROR] Missing field {field} in message_data: {message_data}")
20
+ return
21
+ sender_id = message_data["sender_id"]
22
+ page_id = message_data["page_id"]
23
+ message_text = message_data["text"]
24
+ timestamp = message_data["timestamp"]
25
+ attachments = message_data.get('attachments', [])
26
+ logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message")
27
+
28
+ # Nếu không có message_text và attachments, không xử lý
29
+ if not message_text and not attachments:
30
+ logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...")
31
+ return
32
+
33
+ # Get conversation history (run in thread pool)
34
+ loop = asyncio.get_event_loop()
35
+ sheets_client = self.channel.supabase # Nếu bạn dùng Google Sheets, thay bằng self.channel.sheets nếu có
36
+ history = []
37
+ if hasattr(self.channel, 'sheets'):
38
+ history = await loop.run_in_executor(
39
+ None, lambda: self.channel.sheets.get_conversation_history(sender_id, page_id)
40
+ )
41
+ logger.info(f"[DEBUG] history: {history}")
42
+
43
+ log_kwargs = {
44
+ 'conversation_id': None,
45
+ 'recipient_id': sender_id,
46
+ 'page_id': page_id,
47
+ 'originaltext': message_text,
48
+ 'originalcommand': '',
49
+ 'originalcontent': '',
50
+ 'originalattachments': attachments,
51
+ 'originalvehicle': '',
52
+ 'originalaction': '',
53
+ 'originalpurpose': '',
54
+ 'timestamp': [timestamp],
55
+ 'isdone': False
56
+ }
57
+
58
+ logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}")
59
+ conv = None
60
+
61
+ if history:
62
+ # 1. Chặn duplicate message (trùng sender_id, page_id, timestamp)
63
+ for row in history:
64
+ row_timestamps = self.flatten_timestamp(row.get('timestamp', []))
65
+ if isinstance(row_timestamps, list) and len(row_timestamps) == 1 and isinstance(row_timestamps[0], list):
66
+ row_timestamps = row_timestamps[0]
67
+ if (
68
+ str(timestamp) in [str(ts) for ts in row_timestamps]
69
+ and str(row.get('recipient_id')) == str(sender_id)
70
+ and str(row.get('page_id')) == str(page_id)
71
+ ):
72
+ logger.info("[DUPLICATE] Message duplicate, skipping log.")
73
+ return
74
+ conv = {
75
+ 'conversation_id': row.get('conversation_id'),
76
+ 'recipient_id': row.get('recipient_id'),
77
+ 'page_id': row.get('page_id'),
78
+ 'originaltext': row.get('originaltext'),
79
+ 'originalcommand': row.get('originalcommand'),
80
+ 'originalcontent': row.get('originalcontent'),
81
+ 'originalattachments': row.get('originalattachments'),
82
+ 'originalvehicle': row.get('originalvehicle'),
83
+ 'originalaction': row.get('originalaction'),
84
+ 'originalpurpose': row.get('originalpurpose'),
85
+ 'timestamp': row_timestamps,
86
+ 'isdone': row.get('isdone')
87
+ }
88
+ else:
89
+ # 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản
90
+ if hasattr(self.channel, 'sheets'):
91
+ conv = await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**log_kwargs))
92
+ else:
93
+ conv = log_kwargs.copy()
94
+ if not conv:
95
+ logger.error("Không thể tạo conversation mới!")
96
+ return
97
+ else:
98
+ logger.info(f"[DEBUG] Message history: {conv}")
99
+ for key, value in log_kwargs.items():
100
+ if value not in (None, "", []) and conv.get(key) in (None, "", []):
101
+ conv[key] = value
102
+ # Thêm timestamp mới nếu chưa có
103
+ conv['timestamp'] = self.flatten_timestamp(conv['timestamp'])
104
+ if timestamp not in conv['timestamp']:
105
+ conv['timestamp'].append(timestamp)
106
+ logger.info(f"[DEBUG] Message history sau update: {conv}")
107
+ if hasattr(self.channel, 'sheets'):
108
+ await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
109
+
110
+ # Get page access token (cache)
111
+ page_token = self.channel.get_page_token()
112
+ if page_token:
113
+ logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}")
114
+ else:
115
+ logger.info(f"[DEBUG] page_token: None")
116
+ logger.error(f"No access token found for page {message_data['page_id']}")
117
+ return
118
+ # Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần
119
+ try:
120
+ await self.channel.facebook.send_message(page_token, message_data['sender_id'], "Ok, để mình check. Bạn chờ mình chút xíu nhé!")
121
+ except Exception as e:
122
+ if "expired" in str(e).lower():
123
+ logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
124
+ self.channel.invalidate_page_token()
125
+ page_token = self.channel.get_page_token(force_refresh=True)
126
+ await self.channel.facebook.send_message(page_token, message_data['sender_id'], "Ok, để mình check. Bạn chờ mình chút xíu nhé!")
127
+ else:
128
+ raise
129
+
130
+ # Extract command and keywords
131
+ from app.utils import extract_command, extract_keywords
132
+ from app.constants import VEHICLE_KEYWORDS
133
+ command, remaining_text = extract_command(message_text)
134
+ # Sử dụng LLM để phân tích message_text và extract keywords, mục đích, hành vi vi phạm
135
+ llm_analysis = await self.channel.llm.analyze(message_text)
136
+ logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}")
137
+ muc_dich = None
138
+ hanh_vi_vi_pham = None
139
+ if isinstance(llm_analysis, dict):
140
+ keywords = [self.normalize_vehicle_keyword(llm_analysis.get('phuong_tien', ''))]
141
+ muc_dich = llm_analysis.get('muc_dich')
142
+ hanh_vi_vi_pham = llm_analysis.get('hanh_vi_vi_pham')
143
+ elif isinstance(llm_analysis, list) and len(llm_analysis) > 0:
144
+ keywords = [self.normalize_vehicle_keyword(llm_analysis[0].get('phuong_tien', ''))]
145
+ muc_dich = llm_analysis[0].get('muc_dich')
146
+ hanh_vi_vi_pham = llm_analysis[0].get('hanh_vi_vi_pham')
147
+ else:
148
+ keywords = extract_keywords(message_text, VEHICLE_KEYWORDS)
149
+ hanh_vi_vi_pham = message_text
150
+ for kw in keywords:
151
+ hanh_vi_vi_pham = hanh_vi_vi_pham.replace(kw, "")
152
+ hanh_vi_vi_pham = hanh_vi_vi_pham.strip()
153
+ logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi_vi_pham} - Mục đích: {muc_dich}")
154
+ await self.channel.facebook.send_message(page_token, sender_id, f"... đang tìm kiếm hành vi {hanh_vi_vi_pham} .....")
155
+ # 4. Update lại conversation với thông tin đầy đủ
156
+ update_kwargs = {
157
+ 'conversation_id': conv['conversation_id'],
158
+ 'recipient_id': sender_id,
159
+ 'page_id': page_id,
160
+ 'originaltext': message_text,
161
+ 'originalcommand': command,
162
+ 'originalcontent': remaining_text,
163
+ 'originalattachments': attachments,
164
+ 'originalvehicle': ','.join(keywords),
165
+ 'originalaction': hanh_vi_vi_pham,
166
+ 'originalpurpose': muc_dich,
167
+ 'timestamp': self.flatten_timestamp(conv['timestamp']),
168
+ 'isdone': False
169
+ }
170
+ for key, value in update_kwargs.items():
171
+ if value not in (None, "", []) and conv.get(key) in (None, "", []):
172
+ conv[key] = value
173
+ logger.info(f"[DEBUG] Message history update cuối cùng: {conv}")
174
+ # 5. Xử lý logic nghiệp vụ
175
+ response = await self.process_business_logic(conv, page_token)
176
+ logger.info(f"[DEBUG] Message history sau khi process: {conv}")
177
+ # 6. Gửi response và cập nhật final state
178
+ await self.channel.facebook.send_message(page_token, sender_id, response)
179
+ if hasattr(self.channel, 'sheets'):
180
+ await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
181
+ return
182
+
183
+ def flatten_timestamp(self, ts):
184
+ flat = []
185
+ for t in ts:
186
+ if isinstance(t, list):
187
+ flat.extend(self.flatten_timestamp(t))
188
+ else:
189
+ flat.append(t)
190
+ return flat
191
+
192
+ def normalize_vehicle_keyword(self, keyword: str) -> str:
193
+ from app.constants import VEHICLE_KEYWORDS
194
+ import difflib
195
+ if not keyword:
196
+ return ""
197
+ matches = difflib.get_close_matches(keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6)
198
+ if matches:
199
+ for k in VEHICLE_KEYWORDS:
200
+ if k.lower() == matches[0]:
201
+ return k
202
+ return keyword
203
+
204
+ async def process_business_logic(self, log_kwargs: Dict[str, Any], page_token: str) -> str:
205
+ command = log_kwargs.get('originalcommand', '')
206
+ vehicle = log_kwargs.get('originalvehicle', '')
207
+ action = log_kwargs.get('originalaction', '')
208
+ message = log_kwargs.get('originaltext', '')
209
+ # Tách vehicle thành list keywords
210
+ keywords = [kw.strip() for kw in vehicle.split(',') if kw.strip()]
211
+ if not command:
212
+ if keywords:
213
+ # Có thông tin phương tiện
214
+ if action:
215
+ logger.info(f"[DEBUG] tạo embedding: {action}")
216
+ embedding = await self.channel.embedder.create_embedding(action)
217
+ logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})")
218
+ matches = self.channel.supabase.match_documents(
219
+ embedding,
220
+ vehicle_keywords=keywords,
221
+ user_question=action
222
+ )
223
+ logger.info(f"[DEBUG] matches: {matches}")
224
+ if matches:
225
+ response = await self.format_search_results(message, matches, page_token, log_kwargs['recipient_id'])
226
+ else:
227
+ response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp."
228
+ else:
229
+ logger.info(f"[DEBUG] Không có hành vi vi phạm: {message}")
230
+ response = "Xin lỗi, tôi không tìm thấy thông tin về hành vi vi phạm trong câu hỏi của bạn."
231
+ log_kwargs['isdone'] = True
232
+ else:
233
+ # Không có thông tin phương tiện
234
+ response = "Vui lòng cho biết loại phương tiện bạn cần tìm (xe máy, ô tô...)"
235
+ log_kwargs['isdone'] = False
236
+ else:
237
+ # Có command
238
+ if command == "xong":
239
+ post_url = await self.create_facebook_post(page_token, log_kwargs['recipient_id'], [log_kwargs])
240
+ if post_url:
241
+ response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}"
242
+ else:
243
+ response = "Đã xảy ra lỗi khi tạo bài viết. Vui lòng thử lại sau."
244
+ log_kwargs['isdone'] = True
245
+ else:
246
+ response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất."
247
+ log_kwargs['isdone'] = False
248
+ return response
249
+
250
+ async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
251
+ if not matches:
252
+ return "Không tìm thấy kết quả phù hợp."
253
+ await self.channel.facebook.send_message(page_token, sender_id, f"Tôi tìm thấy một số quy định rồi .....")
254
+ try:
255
+ reranked = await self.channel.reranker.rerank(question, matches, top_k=5)
256
+ if reranked:
257
+ matches = reranked
258
+ except Exception as e:
259
+ logger.error(f"[RERANK] Lỗi khi rerank: {e}")
260
+ top = None
261
+ top_result_text = ""
262
+ full_result_text = ""
263
+ def arr_to_str(arr, sep=", "):
264
+ if not arr:
265
+ return ""
266
+ if isinstance(arr, list):
267
+ return sep.join([str(x) for x in arr if x not in (None, "")])
268
+ return str(arr)
269
+ for i, match in enumerate(matches, 1):
270
+ if not top or (match.get('similarity', 0) > top.get('similarity', 0)):
271
+ top = match
272
+ full_result_text += f"\n{match.get('structure', '').strip()}:\n"
273
+ tieude = (match.get('tieude') or '').strip()
274
+ noidung = (match.get('noidung') or '').strip()
275
+ hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ')
276
+ full_result_text += f"Thực hiện hành vi:\n{hanhvi}"
277
+ canhantu = arr_to_str(match.get('canhantu'))
278
+ canhanden = arr_to_str(match.get('canhanden'))
279
+ if canhantu or canhanden:
280
+ full_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ"
281
+ tochuctu = arr_to_str(match.get('tochuctu'))
282
+ tochucden = arr_to_str(match.get('tochucden'))
283
+ if tochuctu or tochucden:
284
+ full_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ"
285
+ hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
286
+ if hpbsnoidung:
287
+ full_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
288
+ bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
289
+ if bpkpnoidung:
290
+ full_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
291
+ if top and (top.get('tieude') or top.get('noidung')):
292
+ tieude = (top.get('tieude') or '').strip()
293
+ noidung = (top.get('noidung') or '').strip()
294
+ hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ')
295
+ top_result_text += f"Thực hiện hành vi:\n{hanhvi}"
296
+ canhantu = arr_to_str(top.get('canhantu'))
297
+ canhanden = arr_to_str(top.get('canhanden'))
298
+ if canhantu or canhanden:
299
+ top_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ"
300
+ tochuctu = arr_to_str(top.get('tochuctu'))
301
+ tochucden = arr_to_str(top.get('tochucden'))
302
+ if tochuctu or tochucden:
303
+ top_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ"
304
+ hpbsnoidung = arr_to_str(top.get('hpbsnoidung'), sep="; ")
305
+ if hpbsnoidung:
306
+ top_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
307
+ bpkpnoidung = arr_to_str(top.get('bpkpnoidung'), sep="; ")
308
+ if bpkpnoidung:
309
+ top_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
310
+ else:
311
+ result_text = "Không có kết quả phù hợp!"
312
+ prompt = (
313
+ "Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên các đoạn luật sau. "
314
+ "Chỉ sử dụng thông tin có trong các đoạn, không tự đoán.\n"
315
+ f"\nCác đoạn luật liên quan:\n{full_result_text}"
316
+ "\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
317
+ f"\n\nCâu hỏi của người dùng: {question}\n"
318
+ )
319
+ await self.channel.facebook.send_message(page_token, sender_id, f"Được rồi, để tôi tóm tắt lại nhé .....")
320
+ try:
321
+ answer = await self.channel.llm.generate_text(prompt)
322
+ if answer and answer.strip():
323
+ logger.error(f"LLM trả về câu trả lời: \n\tanswer: {answer}")
324
+ return answer.strip()
325
+ else:
326
+ logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}")
327
+ except Exception as e:
328
+ logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}")
329
+ fallback = "Tóm tắt các đoạn luật liên quan:\n\n"
330
+ for i, match in enumerate(matches, 1):
331
+ fallback += f"Đoạn {i}:\n"
332
+ tieude = (match.get('tieude') or '').strip()
333
+ noidung = (match.get('noidung') or '').strip()
334
+ if tieude or noidung:
335
+ fallback += f" - Hành vi: {(tieude + ' ' + noidung).strip()}\n"
336
+ canhantu = arr_to_str(match.get('canhantu'))
337
+ canhanden = arr_to_str(match.get('canhanden'))
338
+ if canhantu or canhanden:
339
+ fallback += f" - Cá nhân bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ\n"
340
+ tochuctu = arr_to_str(match.get('tochuctu'))
341
+ tochucden = arr_to_str(match.get('tochucden'))
342
+ if tochuctu or tochucden:
343
+ fallback += f" - Tổ chức bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ\n"
344
+ hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
345
+ if hpbsnoidung:
346
+ fallback += f" - Hình phạt bổ sung: {hpbsnoidung}\n"
347
+ bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
348
+ if bpkpnoidung:
349
+ fallback += f" - Biện pháp khắc phục hậu quả: {bpkpnoidung}\n"
350
+ fallback += "\n"
351
+ return fallback.strip()
352
+
353
+ async def create_facebook_post(self, page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
354
+ logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
355
+ return "https://facebook.com/mock_post_url"