FBChatBot / app /sheets.py
VietCat's picture
refactor llm/embedding flow
87b2780
from typing import Any, Dict, List, Optional
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
import os
import pickle
from datetime import datetime
from loguru import logger
import json
import hashlib
from google.oauth2.service_account import Credentials
from .utils import timing_decorator_sync
from .constants import SHEET_RANGE
SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
def generate_conversation_id(user_id: str, page_id: str, timestamp: str) -> str:
"""
Tạo conversation_id duy nhất dựa trên user_id, page_id và timestamp.
"""
hash_input = f"{user_id}:{page_id}:{timestamp}"
return hashlib.sha256(hash_input.encode()).hexdigest()[:32]
class SheetsClient:
def __init__(self, credentials_file: str, token_file: str, sheet_id: str):
"""
Khởi tạo SheetsClient với thông tin xác thực và sheet_id.
Input: credentials_file (str), token_file (str), sheet_id (str)
Output: SheetsClient instance.
"""
self.credentials_file = credentials_file
self.token_file = token_file
self.sheet_id = sheet_id
self.creds = None
self.service = None
@timing_decorator_sync
def authenticate(self) -> None:
"""
Xác thực với Google Sheets API, tạo self.service.
Đọc credentials từ biến môi trường GOOGLE_SHEETS_CREDENTIALS_JSON nếu có, nếu không thì dùng file.
Input: None
Output: None (raise exception nếu lỗi)
"""
credentials_json = os.getenv("GOOGLE_SHEETS_CREDENTIALS_JSON")
if credentials_json:
info = json.loads(credentials_json)
creds = Credentials.from_service_account_info(info, scopes=SCOPES)
self.creds = creds
else:
if os.path.exists(self.token_file):
with open(self.token_file, 'rb') as token:
self.creds = pickle.load(token)
if not self.creds or not self.creds.valid:
if self.creds and self.creds.expired:
self.creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
self.credentials_file, SCOPES)
self.creds = flow.run_local_server(port=0)
with open(self.token_file, 'wb') as token:
pickle.dump(self.creds, token)
self.service = build('sheets', 'v4', credentials=self.creds)
@timing_decorator_sync
def get_conversation_history(self, user_id: str, page_id: str) -> List[Dict[str, Any]]:
"""
Lấy lịch sử hội thoại chưa hoàn thành của user từ Google Sheets.
Input: user_id (str), page_id (str)
Output: list[dict] các dòng hội thoại chưa hoàn thành.
"""
try:
if not self.service:
self.authenticate()
if not self.service:
raise RuntimeError("Google Sheets service not initialized")
range_name = SHEET_RANGE
result = self.service.spreadsheets().values().get(
spreadsheetId=self.sheet_id,
range=range_name
).execute()
values = result.get('values', [])
history = []
for row in values:
row = row + [""] * (12 - len(row))
try:
timestamps = json.loads(row[10]) if row[10] else []
except Exception:
timestamps = []
if not isinstance(timestamps, list):
timestamps = [timestamps]
if row[4] == user_id and row[5] == page_id and row[11].lower() == 'false':
history.append({
'conversation_id': row[0],
'originalcommand': row[1],
'originalcontent': row[2],
'originalattachments': json.loads(row[3]) if row[3] else [],
'recipient_id': row[4],
'page_id': row[5],
'originaltext': row[6],
'originalvehicle': row[7],
'originalaction': row[8],
'originalpurpose': row[9],
'timestamp': timestamps,
'isdone': row[11].lower() == 'true'
})
return history
except Exception as e:
logger.error(f"Error getting conversation history: {e}")
return []
@timing_decorator_sync
def log_conversation(
self,
conversation_id: str,
recipient_id: str,
page_id: str,
originaltext: str = "",
originalcommand: str = "",
originalcontent: str = "",
originalattachments: Optional[List[str]] = None,
originalvehicle: str = "",
originalaction: str = "",
originalpurpose: str = "",
timestamp: Any = None,
isdone: bool = False
) -> Optional[Dict[str, Any]]:
"""
Ghi log hội thoại vào Google Sheets.
Dùng các trường original* cho các cột tương ứng trong sheet và các logic liên quan.
"""
try:
if not self.service:
self.authenticate()
if not self.service:
raise RuntimeError("Google Sheets service not initialized")
# Get existing data to check for duplicates
result = self.service.spreadsheets().values().get(
spreadsheetId=self.sheet_id,
range=SHEET_RANGE
).execute()
values = result.get('values', [])
# logger.info(f"[DEBUG] Gsheet values {values}")
ts = datetime.now().isoformat()
# Đảm bảo timestamp luôn là list
if timestamp is None:
timestamp = []
elif not isinstance(timestamp, list):
timestamp = [timestamp]
if not conversation_id:
# Check for duplicates before creating new conversation
for row in values:
if len(row) >= 11:
try:
row_timestamps = json.loads(row[10]) if row[10] else []
except Exception:
row_timestamps = []
if not isinstance(row_timestamps, list):
row_timestamps = [row_timestamps]
row_recipient_id = row[4]
row_page_id = row[5]
if (str(timestamp) in [str(ts) for ts in row_timestamps] and str(row_recipient_id) == str(recipient_id) and str(row_page_id) == str(page_id)):
# Found duplicate, return existing conversation
logger.info(f"Found duplicate conversation for user {recipient_id}, page {page_id}, timestamp {timestamp}")
return {
'conversation_id': row[0],
'originalcommand': row[1],
'originalcontent': row[2],
'originalattachments': json.loads(row[3]) if row[3] else [],
'recipient_id': row[4],
'page_id': row[5],
'originaltext': row[6],
'originalvehicle': row[7],
'originalaction': row[8],
'originalpurpose': row[9],
'timestamp': row_timestamps,
'isdone': row[11].lower() == 'true' if len(row) > 11 else False
}
# No duplicate found, create new conversation
conversation_id = generate_conversation_id(recipient_id, page_id, ts)
new_row = [
conversation_id,
originalcommand,
originalcontent,
json.dumps(originalattachments or []),
recipient_id,
page_id,
originaltext,
originalvehicle,
originalaction,
originalpurpose,
json.dumps(timestamp),
str(isdone).lower()
]
body = {
'values': [new_row]
}
range_name = SHEET_RANGE
self.service.spreadsheets().values().append(
spreadsheetId=self.sheet_id,
range=range_name,
valueInputOption='RAW',
body=body
).execute()
logger.info(f"Thêm mới conversation: {conversation_id} | Giá trị: {dict(zip(['conversation_id','originalcommand','originalcontent','originalattachments','recipient_id','page_id','originaltext','originalvehicle','originalaction','originalpurpose','timestamp','isdone'], new_row))}")
# Return the conversation data directly
return {
'conversation_id': conversation_id,
'originalcommand': originalcommand,
'originalcontent': originalcontent,
'originalattachments': originalattachments or [],
'recipient_id': recipient_id,
'page_id': page_id,
'originaltext': originaltext,
'originalvehicle': originalvehicle,
'originalaction': originalaction,
'originalpurpose': originalpurpose,
'timestamp': timestamp,
'isdone': isdone
}
else:
# Update existing conversation
if not values:
logger.error("No data in sheet, cannot update conversation.")
return None
row_index = None
for i, row in enumerate(values):
if row[0] == conversation_id:
row_index = i
break
logger.info(f"[DEBUG] Gsheet row index {row_index}")
if row_index is not None:
sheet_row_number = row_index + 2 # +2 vì values[0] là dòng 2 trên sheet
current_row = values[row_index]
logger.info(f"[DEBUG] Gsheet current row {current_row}")
while len(current_row) < 13:
current_row.append("")
try:
current_timestamps = json.loads(current_row[10]) if current_row[10] else []
except Exception:
current_timestamps = []
if not isinstance(current_timestamps, list):
current_timestamps = [current_timestamps]
# Chỉ append nếu chưa có
for ts in timestamp:
if ts not in current_timestamps:
current_timestamps.append(ts)
new_row = [
conversation_id,
originalcommand if originalcommand else current_row[1],
originalcontent if originalcontent else current_row[2],
json.dumps(originalattachments) if originalattachments is not None else current_row[3],
recipient_id if recipient_id else current_row[4],
page_id if page_id else current_row[5],
originaltext if originaltext else current_row[6],
originalvehicle if originalvehicle else current_row[7],
originalaction if originalaction else current_row[8],
originalpurpose if originalpurpose else current_row[9],
json.dumps(current_timestamps),
str(isdone).lower() if isdone is not None else current_row[11]
]
update_range = f"{SHEET_RANGE.split('!')[0]}!A{sheet_row_number}"
logger.info(f"[DEBUG] Gsheet update range {update_range}")
body = {
'values': [new_row]
}
self.service.spreadsheets().values().update(
spreadsheetId=self.sheet_id,
range=update_range,
valueInputOption='RAW',
body=body
).execute()
changed_cols = ['conversation_id','originalcommand','originalcontent','originalattachments','recipient_id','page_id','originaltext','originalvehicle','originalaction','originalpurpose','timestamp','isdone']
for idx, (old, new) in enumerate(zip(current_row, new_row)):
if old != new:
changed_cols.append(changed_cols[idx])
logger.info(f"Cập nhật conversation: {conversation_id} tại dòng {sheet_row_number} | Cột cập nhật: {changed_cols} | Giá trị mới: {dict(zip(changed_cols, new_row))}")
# Return the updated conversation data
return {
'conversation_id': conversation_id,
'originalcommand': new_row[1],
'originalcontent': new_row[2],
'originalattachments': json.loads(new_row[3]) if new_row[3] else [],
'recipient_id': new_row[4],
'page_id': new_row[5],
'originaltext': new_row[6],
'originalvehicle': new_row[7],
'originalaction': new_row[8],
'originalpurpose': new_row[9],
'timestamp': current_timestamps,
'isdone': new_row[11].lower() == 'true'
}
return None
except Exception as e:
logger.error(f"Error logging conversation: {e}")
return None