Fix race conditions when sending messages
Browse files
- app/chat_channel.py +8 -4
- app/main.py +4 -5
- app/message_processor.py +22 -6
- app/reranker.py +3 -14
app/chat_channel.py
CHANGED
@@ -25,10 +25,9 @@ class ChatChannel:
|
|
25 |
base_url=settings.gemini_base_url,
|
26 |
model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
|
27 |
)
|
28 |
-
self.
|
29 |
-
self.reranker = Reranker(facebook_client=self.facebook)
|
30 |
self.embedder = EmbeddingClient()
|
31 |
-
self.
|
32 |
|
33 |
def get_page_token(self, force_refresh=False):
|
34 |
if self.page_token is None or force_refresh:
|
@@ -36,4 +35,9 @@ class ChatChannel:
|
|
36 |
return self.page_token
|
37 |
|
38 |
def invalidate_page_token(self):
|
39 |
-
self.page_token = None
|
|
|
|
|
|
|
|
|
|
|
|
25 |
base_url=settings.gemini_base_url,
|
26 |
model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
|
27 |
)
|
28 |
+
self.reranker = Reranker()
|
|
|
29 |
self.embedder = EmbeddingClient()
|
30 |
+
self.conversations = {} # sender_id -> MessageProcessor
|
31 |
|
32 |
def get_page_token(self, force_refresh=False):
|
33 |
if self.page_token is None or force_refresh:
|
|
|
35 |
return self.page_token
|
36 |
|
37 |
def invalidate_page_token(self):
|
38 |
+
self.page_token = None
|
39 |
+
|
40 |
+
def get_or_create_conversation(self, sender_id: str):
|
41 |
+
if sender_id not in self.conversations:
|
42 |
+
self.conversations[sender_id] = MessageProcessor(self, sender_id)
|
43 |
+
return self.conversations[sender_id]
|
app/main.py
CHANGED
@@ -181,13 +181,12 @@ async def webhook(request: Request):
|
|
181 |
if not message_data:
|
182 |
return {"status": "ok"}
|
183 |
|
184 |
-
# --- Refactor: Lấy page_id,
|
185 |
page_id = message_data.get("page_id")
|
186 |
-
|
187 |
-
# Lấy hoặc tạo ChatChannel
|
188 |
channel = channel_manager.get_or_create_channel("facebook", page_id)
|
189 |
-
|
190 |
-
await
|
191 |
return {"status": "ok"}
|
192 |
except Exception as e:
|
193 |
logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
|
|
|
181 |
if not message_data:
|
182 |
return {"status": "ok"}
|
183 |
|
184 |
+
# --- Refactor: Lấy page_id, sender_id, channel, conversation, gọi message_processor ---
|
185 |
page_id = message_data.get("page_id")
|
186 |
+
sender_id = message_data.get("sender_id")
|
|
|
187 |
channel = channel_manager.get_or_create_channel("facebook", page_id)
|
188 |
+
conversation = channel.get_or_create_conversation(sender_id)
|
189 |
+
await conversation.process_message(message_data)
|
190 |
return {"status": "ok"}
|
191 |
except Exception as e:
|
192 |
logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
|
app/message_processor.py
CHANGED
@@ -2,12 +2,21 @@ from typing import Dict, Any, List
|
|
2 |
import asyncio
|
3 |
import traceback
|
4 |
from loguru import logger
|
5 |
-
from .constants import SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES
|
6 |
from .utils import get_random_message
|
|
|
7 |
|
8 |
class MessageProcessor:
|
9 |
-
def __init__(self, channel):
|
10 |
self.channel = channel
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
11 |
|
12 |
async def process_message(self, message_data: Dict[str, Any]):
|
13 |
# Refactor logic từ main.py vào đây
|
@@ -120,7 +129,7 @@ class MessageProcessor:
|
|
120 |
return
|
121 |
# Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần
|
122 |
try:
|
123 |
-
await self.
|
124 |
except Exception as e:
|
125 |
if "expired" in str(e).lower():
|
126 |
logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
|
@@ -181,7 +190,7 @@ class MessageProcessor:
|
|
181 |
# 6. Gửi response và cập nhật final state
|
182 |
# Replace all occurrences of '**' with '*' before sending
|
183 |
response_to_send = response.replace('**', '*') if isinstance(response, str) else response
|
184 |
-
await self.
|
185 |
if hasattr(self.channel, 'sheets'):
|
186 |
await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
|
187 |
return
|
@@ -256,10 +265,17 @@ class MessageProcessor:
|
|
256 |
async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
|
257 |
if not matches:
|
258 |
return "Không tìm thấy kết quả phù hợp."
|
259 |
-
await self.
|
260 |
try:
|
261 |
reranked = await self.channel.reranker.rerank(question, matches, top_k=5)
|
262 |
if reranked:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
263 |
matches = reranked
|
264 |
except Exception as e:
|
265 |
logger.error(f"[RERANK] Lỗi khi rerank: {e}")
|
@@ -328,7 +344,7 @@ class MessageProcessor:
|
|
328 |
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
|
329 |
f"\n\nCâu hỏi của người dùng: {question}\n"
|
330 |
)
|
331 |
-
await self.
|
332 |
try:
|
333 |
answer = await self.channel.llm.generate_text(prompt)
|
334 |
if answer and answer.strip():
|
|
|
2 |
import asyncio
|
3 |
import traceback
|
4 |
from loguru import logger
|
5 |
+
from .constants import SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES
|
6 |
from .utils import get_random_message
|
7 |
+
from .facebook import FacebookClient
|
8 |
|
9 |
class MessageProcessor:
|
10 |
+
def __init__(self, channel, sender_id):
|
11 |
self.channel = channel
|
12 |
+
self.sender_id = sender_id
|
13 |
+
# FacebookClient riêng cho từng conversation
|
14 |
+
self.facebook = FacebookClient(
|
15 |
+
app_secret=channel.llm.gemini_client.limit_manager.get_settings().facebook_app_secret,
|
16 |
+
page_id=channel.page_id,
|
17 |
+
page_token=channel.get_page_token(),
|
18 |
+
sender_id=sender_id
|
19 |
+
)
|
20 |
|
21 |
async def process_message(self, message_data: Dict[str, Any]):
|
22 |
# Refactor logic từ main.py vào đây
|
|
|
129 |
return
|
130 |
# Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần
|
131 |
try:
|
132 |
+
await self.facebook.send_message(message=get_random_message(PROCESSING_STATUS_MESSAGES))
|
133 |
except Exception as e:
|
134 |
if "expired" in str(e).lower():
|
135 |
logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
|
|
|
190 |
# 6. Gửi response và cập nhật final state
|
191 |
# Replace all occurrences of '**' with '*' before sending
|
192 |
response_to_send = response.replace('**', '*') if isinstance(response, str) else response
|
193 |
+
await self.facebook.send_message(message=response_to_send)
|
194 |
if hasattr(self.channel, 'sheets'):
|
195 |
await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
|
196 |
return
|
|
|
265 |
async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
|
266 |
if not matches:
|
267 |
return "Không tìm thấy kết quả phù hợp."
|
268 |
+
await self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES))
|
269 |
try:
|
270 |
reranked = await self.channel.reranker.rerank(question, matches, top_k=5)
|
271 |
if reranked:
|
272 |
+
# Gửi Facebook message sau khi hoàn thành
|
273 |
+
if self.facebook:
|
274 |
+
try:
|
275 |
+
message = get_random_message(BATCH_STATUS_MESSAGES)
|
276 |
+
await self.facebook.send_message(message=f"... {message} ...")
|
277 |
+
except Exception as e:
|
278 |
+
logger.error(f"[RERANK][FACEBOOK] Error sending batch message: {e}")
|
279 |
matches = reranked
|
280 |
except Exception as e:
|
281 |
logger.error(f"[RERANK] Lỗi khi rerank: {e}")
|
|
|
344 |
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
|
345 |
f"\n\nCâu hỏi của người dùng: {question}\n"
|
346 |
)
|
347 |
+
await self.facebook.send_message(message=f"... {get_random_message(SUMMARY_STATUS_MESSAGES)} ...")
|
348 |
try:
|
349 |
answer = await self.channel.llm.generate_text(prompt)
|
350 |
if answer and answer.strip():
|
app/reranker.py
CHANGED
@@ -5,11 +5,11 @@ from loguru import logger
|
|
5 |
import asyncio
|
6 |
import hashlib
|
7 |
import time
|
8 |
-
from .constants import BATCH_STATUS_MESSAGES
|
9 |
-
from .utils import get_random_message
|
10 |
|
11 |
class Reranker:
|
12 |
-
def __init__(self
|
13 |
settings = get_settings()
|
14 |
self.provider = getattr(settings, 'rerank_provider', settings.llm_provider)
|
15 |
self.model = getattr(settings, 'rerank_model', settings.llm_model)
|
@@ -21,14 +21,11 @@ class Reranker:
|
|
21 |
# self.client = CohereClient(settings.cohere_api_key, model=self.model)
|
22 |
else:
|
23 |
raise NotImplementedError(f"Rerank provider {self.provider} not supported yet.")
|
24 |
-
self.facebook_client = facebook_client
|
25 |
-
|
26 |
# Cải thiện cache với TTL và quản lý memory
|
27 |
self._rerank_cache = {}
|
28 |
self._cache_ttl = 3600 # 1 giờ
|
29 |
self._max_cache_size = 200 # Tăng cache size
|
30 |
self._cache_timestamps = {}
|
31 |
-
|
32 |
# Sử dụng max_docs_to_rerank từ config
|
33 |
self.max_docs_to_rerank = settings.max_docs_to_rerank
|
34 |
|
@@ -244,14 +241,6 @@ class Reranker:
|
|
244 |
doc['rerank_score'] = 0
|
245 |
scored.append(doc)
|
246 |
|
247 |
-
# Gửi Facebook message sau khi hoàn thành
|
248 |
-
if self.facebook_client:
|
249 |
-
try:
|
250 |
-
message = get_random_message(BATCH_STATUS_MESSAGES)
|
251 |
-
await self.facebook_client.send_message(message=f"... {message} ...")
|
252 |
-
except Exception as e:
|
253 |
-
logger.error(f"[RERANK][FACEBOOK] Error sending batch message: {e}")
|
254 |
-
|
255 |
# Sort theo score và trả về top_k
|
256 |
scored = sorted(scored, key=lambda x: x['rerank_score'], reverse=True)
|
257 |
result = scored[:top_k]
|
|
|
5 |
import asyncio
|
6 |
import hashlib
|
7 |
import time
|
8 |
+
# from .constants import BATCH_STATUS_MESSAGES
|
9 |
+
# from .utils import get_random_message
|
10 |
|
11 |
class Reranker:
|
12 |
+
def __init__(self):
|
13 |
settings = get_settings()
|
14 |
self.provider = getattr(settings, 'rerank_provider', settings.llm_provider)
|
15 |
self.model = getattr(settings, 'rerank_model', settings.llm_model)
|
|
|
21 |
# self.client = CohereClient(settings.cohere_api_key, model=self.model)
|
22 |
else:
|
23 |
raise NotImplementedError(f"Rerank provider {self.provider} not supported yet.")
|
|
|
|
|
24 |
# Cải thiện cache với TTL và quản lý memory
|
25 |
self._rerank_cache = {}
|
26 |
self._cache_ttl = 3600 # 1 giờ
|
27 |
self._max_cache_size = 200 # Tăng cache size
|
28 |
self._cache_timestamps = {}
|
|
|
29 |
# Sử dụng max_docs_to_rerank từ config
|
30 |
self.max_docs_to_rerank = settings.max_docs_to_rerank
|
31 |
|
|
|
241 |
doc['rerank_score'] = 0
|
242 |
scored.append(doc)
|
243 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
244 |
# Sort theo score và trả về top_k
|
245 |
scored = sorted(scored, key=lambda x: x['rerank_score'], reverse=True)
|
246 |
result = scored[:top_k]
|