fix racing issues when sending message
Browse files- app/chat_channel.py +8 -0
- app/message_processor.py +6 -11
app/chat_channel.py
CHANGED
@@ -37,6 +37,14 @@ class ChatChannel:
|
|
37 |
def invalidate_page_token(self):
|
38 |
self.page_token = None
|
39 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
40 |
def get_or_create_conversation(self, sender_id: str):
|
41 |
if sender_id not in self.conversations:
|
42 |
self.conversations[sender_id] = MessageProcessor(self, sender_id)
|
|
|
37 |
def invalidate_page_token(self):
|
38 |
self.page_token = None
|
39 |
|
40 |
+
def get_sheets_client(self):
|
41 |
+
settings = get_settings()
|
42 |
+
return SheetsClient(
|
43 |
+
settings.google_sheets_credentials_file,
|
44 |
+
settings.google_sheets_token_file,
|
45 |
+
settings.conversation_sheet_id
|
46 |
+
)
|
47 |
+
|
48 |
def get_or_create_conversation(self, sender_id: str):
|
49 |
if sender_id not in self.conversations:
|
50 |
self.conversations[sender_id] = MessageProcessor(self, sender_id)
|
app/message_processor.py
CHANGED
@@ -44,12 +44,11 @@ class MessageProcessor:
|
|
44 |
|
45 |
# Get conversation history (run in thread pool)
|
46 |
loop = asyncio.get_event_loop()
|
47 |
-
sheets_client = self.channel.
|
48 |
history = []
|
49 |
-
|
50 |
-
|
51 |
-
|
52 |
-
)
|
53 |
logger.info(f"[DEBUG] history: {history}")#
|
54 |
|
55 |
log_kwargs = {
|
@@ -99,10 +98,7 @@ class MessageProcessor:
|
|
99 |
}
|
100 |
else:
|
101 |
# 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản
|
102 |
-
|
103 |
-
conv = await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**log_kwargs))
|
104 |
-
else:
|
105 |
-
conv = log_kwargs.copy()
|
106 |
if not conv:
|
107 |
logger.error("Không thể tạo conversation mới!")
|
108 |
return
|
@@ -116,8 +112,7 @@ class MessageProcessor:
|
|
116 |
if timestamp not in conv['timestamp']:
|
117 |
conv['timestamp'].append(timestamp)
|
118 |
logger.info(f"[DEBUG] Message history sau update: {conv}")
|
119 |
-
|
120 |
-
await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
|
121 |
|
122 |
# Get page access token (cache)
|
123 |
page_token = self.channel.get_page_token()
|
|
|
44 |
|
45 |
# Get conversation history (run in thread pool)
|
46 |
loop = asyncio.get_event_loop()
|
47 |
+
sheets_client = self.channel.get_sheets_client()
|
48 |
history = []
|
49 |
+
history = await loop.run_in_executor(
|
50 |
+
None, lambda: sheets_client.get_conversation_history(sender_id, page_id)
|
51 |
+
)
|
|
|
52 |
logger.info(f"[DEBUG] history: {history}")#
|
53 |
|
54 |
log_kwargs = {
|
|
|
98 |
}
|
99 |
else:
|
100 |
# 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản
|
101 |
+
conv = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**log_kwargs))
|
|
|
|
|
|
|
102 |
if not conv:
|
103 |
logger.error("Không thể tạo conversation mới!")
|
104 |
return
|
|
|
112 |
if timestamp not in conv['timestamp']:
|
113 |
conv['timestamp'].append(timestamp)
|
114 |
logger.info(f"[DEBUG] Message history sau update: {conv}")
|
115 |
+
await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
|
|
|
116 |
|
117 |
# Get page access token (cache)
|
118 |
page_token = self.channel.get_page_token()
|