VietCat commited on
Commit
96f79c9
·
1 Parent(s): f6cdf9d

update messenger instance

Browse files
app/chat_channel.py CHANGED
@@ -5,6 +5,7 @@ from .embedding import EmbeddingClient
5
  from .facebook import FacebookClient
6
  from .config import get_settings
7
  from app.message_processor import MessageProcessor
 
8
 
9
  class ChatChannel:
10
  def __init__(self, page_id: str, channel_type: str = "facebook"):
@@ -18,9 +19,9 @@ class ChatChannel:
18
  base_url=settings.gemini_base_url,
19
  model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
20
  )
21
- self.reranker = Reranker()
 
22
  self.embedder = EmbeddingClient()
23
- self.facebook = FacebookClient(settings.facebook_app_secret)
24
  self.message_processor = MessageProcessor(self)
25
 
26
  def get_page_token(self, force_refresh=False):
 
5
  from .facebook import FacebookClient
6
  from .config import get_settings
7
  from app.message_processor import MessageProcessor
8
+ from typing import Optional
9
 
10
  class ChatChannel:
11
  def __init__(self, page_id: str, channel_type: str = "facebook"):
 
19
  base_url=settings.gemini_base_url,
20
  model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
21
  )
22
+ self.facebook = FacebookClient(settings.facebook_app_secret, page_id=None, page_token=None, sender_id=None)
23
+ self.reranker = Reranker(facebook_client=self.facebook)
24
  self.embedder = EmbeddingClient()
 
25
  self.message_processor = MessageProcessor(self)
26
 
27
  def get_page_token(self, force_refresh=False):
app/facebook.py CHANGED
@@ -9,7 +9,7 @@ from loguru import logger
9
  from .utils import timing_decorator_async, timing_decorator_sync
10
 
11
  class FacebookClient:
12
- def __init__(self, app_secret: str):
13
  """
14
  Khởi tạo FacebookClient với app_secret.
15
  Input: app_secret (str) - Facebook App Secret.
@@ -17,6 +17,22 @@ class FacebookClient:
17
  """
18
  self.app_secret = app_secret
19
  self._client = httpx.AsyncClient()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
 
21
  @timing_decorator_async
22
  async def verify_webhook(self, token: str, challenge: str, verify_token: str) -> int:
@@ -48,21 +64,17 @@ class FacebookClient:
48
  return hmac.compare_digest(signature[7:], expected)
49
 
50
  @timing_decorator_async
51
- async def send_message(self, page_access_token: str, recipient_id: str, message: str) -> Dict[str, Any]:
52
- """
53
- Gửi message tới user qua Facebook Messenger API.
54
- Input: page_access_token (str), recipient_id (str), message (str)
55
- Output: dict (response từ Facebook API)
56
- """
57
-
58
  logger.debug(f"Đang gửi tin nhắn đến Facebook Messenger....\n\t{message}")
59
  url = f"https://graph.facebook.com/v18.0/me/messages?access_token={page_access_token}"
60
-
61
  payload = {
62
  "recipient": {"id": recipient_id},
63
  "message": {"text": message}
64
  }
65
-
66
  try:
67
  response = await self._client.post(url, json=payload)
68
  response.raise_for_status()
 
9
  from .utils import timing_decorator_async, timing_decorator_sync
10
 
11
  class FacebookClient:
12
+ def __init__(self, app_secret: str, page_id: Optional[str] = None, page_token: Optional[str] = None, sender_id: Optional[str] = None):
13
  """
14
  Khởi tạo FacebookClient với app_secret.
15
  Input: app_secret (str) - Facebook App Secret.
 
17
  """
18
  self.app_secret = app_secret
19
  self._client = httpx.AsyncClient()
20
+ self.page_id = page_id
21
+ self.page_token = page_token
22
+ self.sender_id = sender_id
23
+
24
+ def update_context(self, page_id: Optional[str] = None, page_token: Optional[str] = None, sender_id: Optional[str] = None):
25
+ """
26
+ Cập nhật các thông tin context (page_id, page_token, sender_id) của client.
27
+ Input: page_id (str), page_token (str), sender_id (str)
28
+ Output: None
29
+ """
30
+ if page_id is not None:
31
+ self.page_id = page_id
32
+ if page_token is not None:
33
+ self.page_token = page_token
34
+ if sender_id is not None:
35
+ self.sender_id = sender_id
36
 
37
  @timing_decorator_async
38
  async def verify_webhook(self, token: str, challenge: str, verify_token: str) -> int:
 
64
  return hmac.compare_digest(signature[7:], expected)
65
 
66
  @timing_decorator_async
67
+ async def send_message(self, page_access_token: Optional[str] = None, recipient_id: Optional[str] = None, message: str = "") -> Dict[str, Any]:
68
+ page_access_token = page_access_token or self.page_token
69
+ recipient_id = recipient_id or self.sender_id
70
+ if not page_access_token or not recipient_id:
71
+ raise ValueError("FacebookClient: page_access_token and recipient_id must not be None when sending a message.")
 
 
72
  logger.debug(f"Đang gửi tin nhắn đến Facebook Messenger....\n\t{message}")
73
  url = f"https://graph.facebook.com/v18.0/me/messages?access_token={page_access_token}"
 
74
  payload = {
75
  "recipient": {"id": recipient_id},
76
  "message": {"text": message}
77
  }
 
78
  try:
79
  response = await self._client.post(url, json=payload)
80
  response.raise_for_status()
app/message_processor.py CHANGED
@@ -109,6 +109,7 @@ class MessageProcessor:
109
 
110
  # Get page access token (cache)
111
  page_token = self.channel.get_page_token()
 
112
  if page_token:
113
  logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}")
114
  else:
@@ -117,13 +118,14 @@ class MessageProcessor:
117
  return
118
  # Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần
119
  try:
120
- await self.channel.facebook.send_message(page_token, message_data['sender_id'], "Ok, để mình check. Bạn chờ mình chút xíu nhé!")
121
  except Exception as e:
122
  if "expired" in str(e).lower():
123
  logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
124
  self.channel.invalidate_page_token()
125
  page_token = self.channel.get_page_token(force_refresh=True)
126
- await self.channel.facebook.send_message(page_token, message_data['sender_id'], "Ok, để mình check. Bạn chờ mình chút xíu nhé!")
 
127
  else:
128
  raise
129
 
@@ -151,7 +153,7 @@ class MessageProcessor:
151
  hanh_vi_vi_pham = hanh_vi_vi_pham.replace(kw, "")
152
  hanh_vi_vi_pham = hanh_vi_vi_pham.strip()
153
  logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi_vi_pham} - Mục đích: {muc_dich}")
154
- await self.channel.facebook.send_message(page_token, sender_id, f"... đang tìm kiếm hành vi {hanh_vi_vi_pham} .....")
155
  # 4. Update lại conversation với thông tin đầy đủ
156
  update_kwargs = {
157
  'conversation_id': conv['conversation_id'],
@@ -175,7 +177,7 @@ class MessageProcessor:
175
  response = await self.process_business_logic(conv, page_token)
176
  logger.info(f"[DEBUG] Message history sau khi process: {conv}")
177
  # 6. Gửi response và cập nhật final state
178
- await self.channel.facebook.send_message(page_token, sender_id, response)
179
  if hasattr(self.channel, 'sheets'):
180
  await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
181
  return
@@ -250,7 +252,7 @@ class MessageProcessor:
250
  async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
251
  if not matches:
252
  return "Không tìm thấy kết quả phù hợp."
253
- await self.channel.facebook.send_message(page_token, sender_id, f"Tôi tìm thấy một số quy định rồi .....")
254
  try:
255
  reranked = await self.channel.reranker.rerank(question, matches, top_k=5)
256
  if reranked:
@@ -316,7 +318,7 @@ class MessageProcessor:
316
  "\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
317
  f"\n\nCâu hỏi của người dùng: {question}\n"
318
  )
319
- await self.channel.facebook.send_message(page_token, sender_id, f"Được rồi, để tôi tóm tắt lại nhé .....")
320
  try:
321
  answer = await self.channel.llm.generate_text(prompt)
322
  if answer and answer.strip():
 
109
 
110
  # Get page access token (cache)
111
  page_token = self.channel.get_page_token()
112
+ self.channel.facebook.update_context(page_id=page_id, page_token=page_token, sender_id=sender_id)
113
  if page_token:
114
  logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}")
115
  else:
 
118
  return
119
  # Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần
120
  try:
121
+ await self.channel.facebook.send_message(message="Ok, để mình check. Bạn chờ mình chút xíu nhé!")
122
  except Exception as e:
123
  if "expired" in str(e).lower():
124
  logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
125
  self.channel.invalidate_page_token()
126
  page_token = self.channel.get_page_token(force_refresh=True)
127
+ self.channel.facebook.update_context(page_id=page_id, page_token=page_token, sender_id=sender_id)
128
+ await self.channel.facebook.send_message(message="Ok, để mình check. Bạn chờ mình chút xíu nhé!")
129
  else:
130
  raise
131
 
 
153
  hanh_vi_vi_pham = hanh_vi_vi_pham.replace(kw, "")
154
  hanh_vi_vi_pham = hanh_vi_vi_pham.strip()
155
  logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi_vi_pham} - Mục đích: {muc_dich}")
156
+ await self.channel.facebook.send_message(message=f"... đang tìm kiếm hành vi {hanh_vi_vi_pham} .....")
157
  # 4. Update lại conversation với thông tin đầy đủ
158
  update_kwargs = {
159
  'conversation_id': conv['conversation_id'],
 
177
  response = await self.process_business_logic(conv, page_token)
178
  logger.info(f"[DEBUG] Message history sau khi process: {conv}")
179
  # 6. Gửi response và cập nhật final state
180
+ await self.channel.facebook.send_message(message=response)
181
  if hasattr(self.channel, 'sheets'):
182
  await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
183
  return
 
252
  async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
253
  if not matches:
254
  return "Không tìm thấy kết quả phù hợp."
255
+ await self.channel.facebook.send_message(message=f"... đã tìm thấy một số quy định .....")
256
  try:
257
  reranked = await self.channel.reranker.rerank(question, matches, top_k=5)
258
  if reranked:
 
318
  "\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
319
  f"\n\nCâu hỏi của người dùng: {question}\n"
320
  )
321
+ await self.channel.facebook.send_message(message=f"... Được rồi, để tôi tóm tắt lại nhé .....")
322
  try:
323
  answer = await self.channel.llm.generate_text(prompt)
324
  if answer and answer.strip():
app/reranker.py CHANGED
@@ -5,7 +5,7 @@ from loguru import logger
5
  import asyncio
6
 
7
  class Reranker:
8
- def __init__(self):
9
  settings = get_settings()
10
  self.provider = getattr(settings, 'rerank_provider', settings.llm_provider)
11
  self.model = getattr(settings, 'rerank_model', settings.llm_model)
@@ -17,6 +17,7 @@ class Reranker:
17
  # self.client = CohereClient(settings.cohere_api_key, model=self.model)
18
  else:
19
  raise NotImplementedError(f"Rerank provider {self.provider} not supported yet.")
 
20
 
21
  async def _score_doc(self, query: str, doc: Dict) -> Dict:
22
  """
@@ -84,6 +85,12 @@ class Reranker:
84
  scored.append(result)
85
 
86
  logger.info(f"[RERANK] Completed batch {i//batch_size + 1}, processed {len(scored)} docs so far")
 
 
 
 
 
 
87
 
88
  # Sort theo score và trả về top_k
89
  scored = sorted(scored, key=lambda x: x['rerank_score'], reverse=True)
 
5
  import asyncio
6
 
7
  class Reranker:
8
+ def __init__(self, facebook_client=None):
9
  settings = get_settings()
10
  self.provider = getattr(settings, 'rerank_provider', settings.llm_provider)
11
  self.model = getattr(settings, 'rerank_model', settings.llm_model)
 
17
  # self.client = CohereClient(settings.cohere_api_key, model=self.model)
18
  else:
19
  raise NotImplementedError(f"Rerank provider {self.provider} not supported yet.")
20
+ self.facebook_client = facebook_client
21
 
22
  async def _score_doc(self, query: str, doc: Dict) -> Dict:
23
  """
 
85
  scored.append(result)
86
 
87
  logger.info(f"[RERANK] Completed batch {i//batch_size + 1}, processed {len(scored)} docs so far")
88
+ # Send Facebook message after each batch
89
+ if self.facebook_client:
90
+ try:
91
+ await self.facebook_client.send_message(message=f"... tôi đang sắp xếp lại chút ...")
92
+ except Exception as e:
93
+ logger.error(f"[RERANK][FACEBOOK] Error sending batch message: {e}")
94
 
95
  # Sort theo score và trả về top_k
96
  scored = sorted(scored, key=lambda x: x['rerank_score'], reverse=True)