import datetime # Imports required for Google Sheets integration import gspread import random import time import functools from gspread.exceptions import SpreadsheetNotFound, APIError from oauth2client.service_account import ServiceAccountCredentials import pandas as pd import json import gradio as gr GSERVICE_ACCOUNT_INFO = { "type": "service_account", "project_id": "txagent", "private_key_id": "cc1a12e427917244a93faf6f19e72b589a685e65", "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDPEoNWIhiMdXA+\ncuwLgo06WUS5Jqe5dTAUJXZ6r5rLvSIqkuTt8xHQZJ1p5Itg8ywjONz/0R04jbHx\nalTlg0fgIu+6AU+oMb7HNZ9twG0O/A+M/NdGJKb8asj5jaEoSlWBT6lpSUvaZn+T\nulsyW147g0H5EtXBeh40zs7m2Q3kv1TzeCBuUlRRHx66EAeDwGwWiBXoxcanOINa\nrlZTFTGofhAGoz2Qm/L0hTIrjP/7DBg87Y5Z7eWbSjR944Ni5Fp0rfShuO+QZRrL\n+Cy9h11fKmuNfjWA3YlUfxmsERK2U867bXDGU8jJ3HTZ7/9D/kDunDJKF3xkJj+L\ng8mpsPGfAgMBAAECggEAOdQ7Po5GGc/gWWhh2HMMuutcQHL1q1r5Yt71gBzTl6uJ\nw6cDbRqRcofu2Dhd3mT7Ahkqyvyc8wLLW5bs/63SoFtRZLpiAyBlXZ/xlsaDDojB\nVQf1nN62jc7Ksrrlc2mTCIp1TvSLzQIMBfco6d7PacJl5cfnT2Gp1uicqqaadTOj\nLEr61ttO7eQ5g30hQvXnRwY5yXulKROOU5Zl6tESYRJZGAaV/KBKjnQgq3v+SV5o\n33q+g8IdKJtRqJIK66L04G916xOz0QhehToNHHC12K9XNCztNFI+CGnzZ3PKBZk9\naZUVVx3VGr7G9qqJzk4xmo6kY5rdlOjlKPqnls738QKBgQDw2lcfRMRqa1ag9wx6\nsez3oS+JPne/+NMGQn90V9seQb0Bp9jjYX9W4nICg5jtsZs9YKPsH0LhuVjPjQi0\nOLdZY0Ux9fJSeAB1GEffD3T5qUdJ5qCZn8QRuadBxkVm2mygIRsuiMSwxBiuwzed\no81aQ8/QlQqOUuKDDwhi3WjS0QKBgQDcGFDGnUZqDyQbxZCEI6toBlnKO9srJIWg\nIVsGsdRSzZzOXBwAjSS/ZEn1STywYPGKWE9lgP5hOtTn3oPGNeC2BXr+p4dvZVyl\nWAlsxgb/+8dI2cGQ+tYrhskozPyVtHSiUGf+8ghqVfuWLTDKSznccGYGPeIiNS1M\nxccATr05bwKBgQDc/EhBjVP2HIRAbkwJ62R0FHVMJH+1KPVd4feVZOLMER78/OcY\nQaWXr29R9TKErJe2KgxdIpW4C9p7nHhm+z7nChk77OCoYChzR5LyC/mU9IdPPAcQ\nzTEV3lSjGeslorVV+uo4uQ5W7aWD++P0hI1vC5cKVyV3Tn88JrfYFjQOcQKBgQCG\nlqfujIZenNurz+hLpbRPbHLD5E5l13OPNFaBhYUdDXbyCgllnOn3z9AaGqruAJoz\ny0TiATuNIXjIQZ27O38qT7eiubdsO0OoKGm7Bm2JY+G9fsuLaJhHDak9NfzPXwZj\nq1+s2zyiKeorL39CdTXwwxrgfj8mQ/ZrmBXU7lFwKQKBgQDUNzNXQlsLwuYqpKJr\nxYI6qo+3T4fRFsS02aqwvciHcg1b0iKb2sKnH1nwf/RAoCffRDic1J5i6BtsYmpa\nUMiXog8hgbeTALnQar+8Nq7vvpyORxmCCFY5bzxngy/T1GNdAuhaTv7n0iH3VXtk\nfpM1DKwTfLNCX6kQbLOoRR8j7w==\n-----END PRIVATE KEY-----\n", "client_email": "shanghua@txagent.iam.gserviceaccount.com", "client_id": "108950722202634464257", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/shanghua%40txagent.iam.gserviceaccount.com", "universe_domain": "googleapis.com" } # Optionally, get the sheet name from environment (or use a default) # GSHEET_NAME = os.environ.get("GSHEET_NAME", "Your Google Sheet Name") GSHEET_NAME = "TxAgent_data_collection" #Exponential backoff retry decorator def exponential_backoff_gspread(max_retries=30, max_backoff_sec=64, base_delay_sec=1, target_exception=APIError): """ Decorator to implement exponential backoff for gspread API calls. Retries a function call if it raises a specific exception (defaults to APIError) that matches the Google Sheets API rate limit error (HTTP 429). Args: max_retries (int): Maximum number of retry attempts. max_backoff_sec (int): Maximum delay between retries in seconds. base_delay_sec (int): Initial delay in seconds for the first retry. target_exception (Exception): The specific exception type to catch. """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): retries = 0 while True: # Loop indefinitely until success or max retries exceeded try: # Attempt to execute the wrapped function return func(*args, **kwargs) except target_exception as e: # Check if the error is the specific 429 Quota Exceeded error # We parse the string representation as gspread's APIError includes the status code there. error_message = str(e) is_rate_limit_error = "[429]" in error_message and ( "Quota exceeded" in error_message or "Too Many Requests" in error_message ) if is_rate_limit_error: retries += 1 if retries > max_retries: print(f"Max retries ({max_retries}) exceeded for {func.__name__}. Last error: {e}") raise e # Re-raise the last exception after exhausting retries # Calculate exponential backoff delay with random jitter (0-1 second) backoff_delay = min(max_backoff_sec, base_delay_sec * (2 ** (retries - 1)) + random.uniform(0, 1)) print( f"Rate limit hit for {func.__name__} (Attempt {retries}/{max_retries}). " f"Retrying in {backoff_delay:.2f} seconds. Error: {e}" ) time.sleep(backoff_delay) else: # If it's a different kind of APIError (e.g., 403 Forbidden, 404 Not Found), re-raise immediately. print(f"Non-rate-limit APIError encountered in {func.__name__}: {e}") raise e except Exception as e: # Catch any other unexpected exceptions during the function execution print(f"An unexpected error occurred in {func.__name__}: {e}") raise e # Re-raise unexpected errors return wrapper return decorator #2) Initialize Google Sheets client # Define the scopes scope = [ "https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive", ] # Authenticate immediately on import creds = ServiceAccountCredentials.from_json_keyfile_dict(GSERVICE_ACCOUNT_INFO, scope) client = gspread.authorize(creds) @exponential_backoff_gspread(max_retries=30, max_backoff_sec=64) def read_sheet_to_df(custom_sheet_name=None, sheet_index=0): """ Read all data from a Google Sheet into a pandas DataFrame. Parameters: custom_sheet_name (str): The name of the Google Sheet to open. If None, uses GSHEET_NAME. sheet_index (int): Index of the worksheet within the spreadsheet (default is 0, the first sheet). Returns: pandas.DataFrame: DataFrame containing the sheet data, with the first row used as headers. """ # Determine which sheet name to use if custom_sheet_name is None: custom_sheet_name = GSHEET_NAME # Open the spreadsheet try: spreadsheet = client.open(custom_sheet_name) except gspread.SpreadsheetNotFound: return None # Select the desired worksheet try: worksheet = spreadsheet.get_worksheet(sheet_index) except IndexError: return None # Fetch all data: first row as header, remaining as records data = worksheet.get_all_records() # Convert to DataFrame df = pd.DataFrame(data) return df @exponential_backoff_gspread(max_retries=30, max_backoff_sec=64) def append_to_sheet(user_data=None, custom_row_dict=None, custom_sheet_name=None, add_header_when_create_sheet=False): """ Append a new row to a Google Sheet. If 'custom_row' is provided, append that row. Otherwise, append a default row constructed from the provided user_data. """ if custom_sheet_name is None: custom_sheet_name = GSHEET_NAME try: # Try to open the spreadsheet by name spreadsheet = client.open(custom_sheet_name) is_new = False except SpreadsheetNotFound: # If it doesn't exist, create it spreadsheet = client.create(custom_sheet_name) # Optionally, share the new spreadsheet with designated emails spreadsheet.share('shanghuagao@gmail.com', perm_type='user', role='writer') spreadsheet.share('rzhu@college.harvard.edu', perm_type='user', role='writer') is_new = True print("Spreadsheet ID:", spreadsheet.id) # Access the first worksheet sheet = spreadsheet.sheet1 if is_new and add_header_when_create_sheet: # headers come from the keys of our row dict if custom_row_dict is not None: headers = list(custom_row_dict.keys()) else: headers = list(user_data.keys()) sheet.append_row(headers) if custom_row_dict is not None: custom_row = [custom_row_dict.get(header) for header in list(custom_row_dict.keys())] else: # Construct the default row with a timestamp and user_data fields custom_row = [str(datetime.datetime.now()), user_data["question"], user_data["final_answer"], user_data["trace"]] # Append the custom or default row to the sheet sheet.append_row(custom_row) def format_chat(response): chat_history = [] # Keep track of the last assistant message's tool_calls last_tool_calls = [] for msg in response: if msg["role"] == "assistant": content = msg.get("content", "") # Extract tool_calls from this assistant message (if any) last_tool_calls = json.loads(msg.get("tool_calls", "[]")) # Emit the assistant's main message chat_history.append( gr.ChatMessage(role="assistant", content=content) ) elif msg["role"] == "tool": # For each tool response, we pair it with the corresponding call for i, tool_call in enumerate(last_tool_calls): name = tool_call.get("name", "") args = tool_call.get("arguments", {}) # Determine icon + title if name == "Tool_RAG": title = "🧰 Tool RAG" else: title = f"🛠️ {name}" # Parse and pretty-print the tool response content raw = msg.get("content", "") try: parsed = json.loads(raw) pretty = json.dumps(parsed) except json.JSONDecodeError: pretty = raw # Add as a single ChatMessage with metadata.title and metadata.log. # Display the arguments as the first part of the content, clearly separated from the response, # and display the tool response content as contiguous text. chat_history.append( gr.ChatMessage( role="assistant", content=f"Input: {json.dumps(args)}\n\nResponse:\n{pretty}", metadata={ "title": title } ) ) # Clear after rendering last_tool_calls = [] return chat_history