import hashlib
import json
import os
import pickle
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials as UserCredentials
from google.oauth2.service_account import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from loguru import logger

from .utils import timing_decorator_sync
from .constants import SHEET_RANGE

SCOPES = ['https://www.googleapis.com/auth/spreadsheets']

# Names of the conversation columns, in the order they are written to a sheet row.
_COLUMNS = [
    'conversation_id',
    'originalcommand',
    'originalcontent',
    'originalattachments',
    'recipient_id',
    'page_id',
    'originaltext',
    'originalvehicle',
    'originalaction',
    'originalpurpose',
    'timestamp',
    'isdone',
]
_NUM_COLUMNS = len(_COLUMNS)


def generate_conversation_id(user_id: str, page_id: str, timestamp: str) -> str:
    """
    Build a conversation_id from user_id, page_id and timestamp.

    Returns the first 32 hex characters of the SHA-256 digest of
    "user_id:page_id:timestamp", so equal inputs give equal ids.
    """
    hash_input = f"{user_id}:{page_id}:{timestamp}"
    return hashlib.sha256(hash_input.encode()).hexdigest()[:32]


def _pad_row(row: List[Any]) -> List[Any]:
    """Return a copy of ``row`` right-padded with "" to at least _NUM_COLUMNS cells.

    The Sheets API omits trailing empty cells, so rows can come back short.
    """
    return list(row) + [""] * (_NUM_COLUMNS - len(row))


def _parse_json_list(raw: Any) -> List[Any]:
    """Decode a JSON-encoded cell into a list.

    Empty cells and undecodable content yield []; a decoded non-list value is
    wrapped in a one-element list.
    """
    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        return []
    return parsed if isinstance(parsed, list) else [parsed]


def _row_to_conversation(row: List[Any]) -> Dict[str, Any]:
    """Convert a raw sheet row into the conversation dict returned by SheetsClient."""
    row = _pad_row(row)
    return {
        'conversation_id': row[0],
        'originalcommand': row[1],
        'originalcontent': row[2],
        'originalattachments': _parse_json_list(row[3]),
        'recipient_id': row[4],
        'page_id': row[5],
        'originaltext': row[6],
        'originalvehicle': row[7],
        'originalaction': row[8],
        'originalpurpose': row[9],
        'timestamp': _parse_json_list(row[10]),
        'isdone': str(row[11]).lower() == 'true',
    }


def _find_duplicate_row(
    values: List[List[Any]],
    recipient_id: str,
    page_id: str,
    timestamps: List[Any],
) -> Optional[List[Any]]:
    """Return the first row for the same recipient/page that already stores one of ``timestamps``.

    Comparison is done on the string form of every value. Returns None when
    ``timestamps`` is empty or no such row exists.
    """
    wanted = {str(t) for t in timestamps}
    if not wanted:
        return None
    for row in values:
        # Rows that do not reach the timestamp column cannot be duplicates.
        if len(row) < 11:
            continue
        if str(row[4]) != str(recipient_id) or str(row[5]) != str(page_id):
            continue
        if any(str(t) in wanted for t in _parse_json_list(row[10])):
            return row
    return None


class SheetsClient:
    """Client that reads and writes conversation rows in a Google Sheet."""

    def __init__(self, credentials_file: str, token_file: str, sheet_id: str):
        """
        Create a SheetsClient. Only stores its arguments; authentication
        happens later, in authenticate().

        Input: credentials_file (str, OAuth client-secrets file used when
        GOOGLE_SHEETS_CREDENTIALS_JSON is not set), token_file (str, path of the
        pickled OAuth token cache), sheet_id (str, spreadsheet id).
        Output: SheetsClient instance.
        """
        self.credentials_file = credentials_file
        self.token_file = token_file
        self.sheet_id = sheet_id
        self.creds: Optional[Union[UserCredentials, Credentials]] = None
        self.service: Optional[Any] = None

    @timing_decorator_sync
    def authenticate(self) -> None:
        """
        Authenticate against the Google Sheets API and build self.service.

        If the GOOGLE_SHEETS_CREDENTIALS_JSON environment variable is set, its
        content is parsed as a service-account key. Otherwise the pickled token
        in token_file is used, refreshed when expired, or obtained through the
        local-server OAuth flow, and then written back to token_file.

        Input: None
        Output: None (exceptions propagate to the caller)
        """
        credentials_json = os.getenv("GOOGLE_SHEETS_CREDENTIALS_JSON")
        if credentials_json:
            info = json.loads(credentials_json)
            self.creds = Credentials.from_service_account_info(info, scopes=SCOPES)
        else:
            if os.path.exists(self.token_file):
                with open(self.token_file, 'rb') as token:
                    # NOTE(review): pickle.load can execute arbitrary code; token_file
                    # must only ever be written by this client, never by untrusted parties.
                    self.creds = pickle.load(token)
            if not self.creds or not self.creds.valid:
                # A refresh is only possible with a refresh token; otherwise re-run the flow.
                if (
                    self.creds
                    and self.creds.expired
                    and getattr(self.creds, 'refresh_token', None)
                ):
                    self.creds.refresh(Request())
                else:
                    flow = InstalledAppFlow.from_client_secrets_file(
                        self.credentials_file, SCOPES)
                    self.creds = flow.run_local_server(port=0)
                with open(self.token_file, 'wb') as token:
                    pickle.dump(self.creds, token)
        self.service = build('sheets', 'v4', credentials=self.creds)

    def _ensure_service(self) -> Any:
        """Return self.service, calling authenticate() first if it is not set.

        Raises RuntimeError if authentication still left self.service unset.
        """
        if not self.service:
            self.authenticate()
        if not self.service:
            raise RuntimeError("Google Sheets service not initialized")
        return self.service

    def _read_rows(self) -> List[List[Any]]:
        """Fetch all rows in SHEET_RANGE; returns [] when the response has no values."""
        result = self._ensure_service().spreadsheets().values().get(
            spreadsheetId=self.sheet_id,
            range=SHEET_RANGE
        ).execute()
        return result.get('values', [])

    @timing_decorator_sync
    def get_conversation_history(self, user_id: str, page_id: str) -> List[Dict[str, Any]]:
        """
        Return the user's unfinished conversations stored in the sheet.

        A row matches when its recipient_id equals user_id, its page_id equals
        page_id and its isdone cell is "false" (case-insensitive); rows whose
        isdone cell is empty are not returned.

        Input: user_id (str), page_id (str)
        Output: list of conversation dicts; [] on any error (the error is logged).
        """
        try:
            rows = [_pad_row(raw_row) for raw_row in self._read_rows()]
            return [
                _row_to_conversation(row)
                for row in rows
                if row[4] == user_id and row[5] == page_id and str(row[11]).lower() == 'false'
            ]
        except Exception as e:
            logger.exception(f"Error getting conversation history: {e}")
            return []

    @timing_decorator_sync
    def log_conversation(
        self,
        conversation_id: str,
        recipient_id: str,
        page_id: str,
        originaltext: str = "",
        originalcommand: str = "",
        originalcontent: str = "",
        originalattachments: Optional[List[str]] = None,
        originalvehicle: str = "",
        originalaction: str = "",
        originalpurpose: str = "",
        timestamp: Any = None,
        isdone: bool = False
    ) -> Optional[Dict[str, Any]]:
        """
        Create or update a conversation row in the sheet.

        The original* arguments map to the sheet columns of the same name.

        With an empty conversation_id, a new row is appended with a generated id.
        Exception: if a row with the same recipient_id/page_id already stores one
        of the given timestamps, no row is written and that conversation is returned.

        With a conversation_id, the matching row is updated:
        - empty-string arguments keep the stored value;
        - originalattachments=None keeps the stored attachments;
        - new timestamps are appended unless already present;
        - isdone overwrites the stored flag unless it is None.

        Input: the column values; timestamp may be None, a single value or a list.
        Output: the resulting conversation dict, or None when the conversation to
        update is not found or an error occurs (errors are logged).
        """
        try:
            values = self._read_rows()

            # Always work with a list of timestamps.
            if timestamp is None:
                timestamps: List[Any] = []
            elif isinstance(timestamp, list):
                timestamps = timestamp
            else:
                timestamps = [timestamp]

            if not conversation_id:
                duplicate = _find_duplicate_row(values, recipient_id, page_id, timestamps)
                if duplicate is not None:
                    logger.info(f"Found duplicate conversation for user {recipient_id}, page {page_id}, timestamp {timestamps}")
                    return _row_to_conversation(duplicate)

                # No duplicate found: create a new conversation.
                conversation_id = generate_conversation_id(
                    recipient_id, page_id, datetime.now().isoformat())
                new_row = [
                    conversation_id,
                    originalcommand,
                    originalcontent,
                    json.dumps(originalattachments or []),
                    recipient_id,
                    page_id,
                    originaltext,
                    originalvehicle,
                    originalaction,
                    originalpurpose,
                    json.dumps(timestamps),
                    str(isdone).lower(),
                ]
                self._ensure_service().spreadsheets().values().append(
                    spreadsheetId=self.sheet_id,
                    range=SHEET_RANGE,
                    valueInputOption='RAW',
                    body={'values': [new_row]}
                ).execute()
                logger.info(f"Thêm mới conversation: {conversation_id} | Giá trị: {dict(zip(_COLUMNS, new_row))}")
                return {
                    'conversation_id': conversation_id,
                    'originalcommand': originalcommand,
                    'originalcontent': originalcontent,
                    'originalattachments': originalattachments or [],
                    'recipient_id': recipient_id,
                    'page_id': page_id,
                    'originaltext': originaltext,
                    'originalvehicle': originalvehicle,
                    'originalaction': originalaction,
                    'originalpurpose': originalpurpose,
                    'timestamp': timestamps,
                    'isdone': isdone,
                }

            # Update an existing conversation.
            if not values:
                logger.error("No data in sheet, cannot update conversation.")
                return None

            # Blank rows come back from the API as [], so guard before indexing.
            row_index = next(
                (i for i, row in enumerate(values) if row and row[0] == conversation_id),
                None,
            )
            logger.info(f"[DEBUG] Gsheet row index {row_index}")
            if row_index is None:
                return None

            # values[0] is taken to be sheet row 2.
            # NOTE(review): only correct if SHEET_RANGE starts at row 2 — confirm.
            sheet_row_number = row_index + 2
            logger.info(f"[DEBUG] Gsheet current row {values[row_index]}")
            current_row = _pad_row(values[row_index])

            # Merge timestamps, appending only values not already stored.
            current_timestamps = _parse_json_list(current_row[10])
            for ts in timestamps:
                if ts not in current_timestamps:
                    current_timestamps.append(ts)

            new_row = [
                conversation_id,
                originalcommand or current_row[1],
                originalcontent or current_row[2],
                json.dumps(originalattachments) if originalattachments is not None else current_row[3],
                recipient_id or current_row[4],
                page_id or current_row[5],
                originaltext or current_row[6],
                originalvehicle or current_row[7],
                originalaction or current_row[8],
                originalpurpose or current_row[9],
                json.dumps(current_timestamps),
                str(isdone).lower() if isdone is not None else current_row[11],
            ]
            update_range = f"{SHEET_RANGE.split('!')[0]}!A{sheet_row_number}"
            logger.info(f"[DEBUG] Gsheet update range {update_range}")
            self._ensure_service().spreadsheets().values().update(
                spreadsheetId=self.sheet_id,
                range=update_range,
                valueInputOption='RAW',
                body={'values': [new_row]}
            ).execute()

            # Only the columns whose value actually changed, mapped to their new value.
            changes = {
                name: new
                for name, old, new in zip(_COLUMNS, current_row, new_row)
                if old != new
            }
            logger.info(f"Cập nhật conversation: {conversation_id} tại dòng {sheet_row_number} | Cột cập nhật: {list(changes)} | Giá trị mới: {changes}")
            return _row_to_conversation(new_row)
        except Exception as e:
            logger.exception(f"Error logging conversation: {e}")
            return None