Spaces:
Sleeping
Sleeping
import datetime | |
# Imports required for Google Sheets integration | |
import gspread | |
import random | |
import time | |
import functools | |
from gspread.exceptions import SpreadsheetNotFound, APIError | |
from oauth2client.service_account import ServiceAccountCredentials | |
import pandas as pd | |
import json | |
import gradio as gr | |
import os | |
# Service-account credentials for the Google Sheets client. Every field except the
# private key is fixed here; the key is filled in from the environment further down.
GSERVICE_ACCOUNT_INFO = dict(
    type="service_account",
    project_id="txagent",
    private_key_id="cc1a12e427917244a93faf6f19e72b589a685e65",
    private_key=None,  # injected from the GSheets_Shanghua_PrivateKey environment variable
    client_email="[email protected]",
    client_id="108950722202634464257",
    auth_uri="https://accounts.google.com/o/oauth2/auth",
    token_uri="https://oauth2.googleapis.com/token",
    auth_provider_x509_cert_url="https://www.googleapis.com/oauth2/v1/certs",
    client_x509_cert_url="https://www.googleapis.com/robot/v1/metadata/x509/shanghua%40txagent.iam.gserviceaccount.com",
    universe_domain="googleapis.com",
)

# Default spreadsheet name used when a caller does not supply one.
GSHEET_NAME = "TxAgent_data_collection"

# The PEM key may be stored in the environment with its line breaks escaped as the
# two characters "\n"; convert them back to real newlines before using it.
GSheet_API_KEY = os.environ.get("GSheets_Shanghua_PrivateKey")
if GSheet_API_KEY is not None:
    GSheet_API_KEY = GSheet_API_KEY.replace("\\n", "\n")
    GSERVICE_ACCOUNT_INFO["private_key"] = GSheet_API_KEY
else:
    # Only a warning is printed; private_key stays None.
    print("GSheet_API_KEY not found in environment variables. Please set it.")
# Exponential backoff retry decorator
def exponential_backoff_gspread(max_retries=30, max_backoff_sec=64, base_delay_sec=1, target_exception=APIError):
    """
    Decorator that retries gspread API calls with exponential backoff on rate-limit errors.

    A call is retried only when it raises ``target_exception`` (default: gspread's APIError)
    and that error is an HTTP 429 rate-limit error. Any other ``target_exception`` error, and
    any other exception, is printed and re-raised immediately.

    Args:
        max_retries (int): Maximum number of retries (the function is called at most
            ``max_retries + 1`` times).
        max_backoff_sec (int): Upper bound, in seconds, on the delay between retries.
        base_delay_sec (int): Delay in seconds before the first retry. It doubles on every
            subsequent retry, gets 0-1 s of random jitter added, and is capped at
            ``max_backoff_sec``.
        target_exception (type[Exception]): Exception type inspected for rate-limit errors.

    Returns:
        Callable: a decorator. The wrapped function keeps its ``__name__`` and ``__doc__``.
    """
    def _is_rate_limit_error(exc):
        # Prefer the HTTP status when the exception carries a response object
        # (gspread's APIError keeps the originating response on ``exc.response``).
        status = getattr(getattr(exc, "response", None), "status_code", None)
        if status == 429:
            return True
        # Fall back to the string form, which for gspread's APIError includes "[429]".
        message = str(exc)
        return "[429]" in message and (
            "Quota exceeded" in message or "Too Many Requests" in message
        )

    def decorator(func):
        @functools.wraps(func)  # keep func's name/docstring on the wrapper
        def wrapper(*args, **kwargs):
            retries = 0
            while True:  # until success, a non-retryable error, or retries are exhausted
                try:
                    return func(*args, **kwargs)
                except target_exception as e:
                    if not _is_rate_limit_error(e):
                        # e.g. 403 Forbidden / 404 Not Found: retrying will not help.
                        print(f"Non-rate-limit APIError encountered in {func.__name__}: {e}")
                        raise
                    retries += 1
                    if retries > max_retries:
                        print(f"Max retries ({max_retries}) exceeded for {func.__name__}. Last error: {e}")
                        raise  # re-raise the last rate-limit error with its original traceback
                    # Exponential backoff plus 0-1 s of random jitter, capped at max_backoff_sec.
                    backoff_delay = min(max_backoff_sec, base_delay_sec * (2 ** (retries - 1)) + random.uniform(0, 1))
                    print(
                        f"Rate limit hit for {func.__name__} (Attempt {retries}/{max_retries}). "
                        f"Retrying in {backoff_delay:.2f} seconds. Error: {e}"
                    )
                    time.sleep(backoff_delay)
                except Exception as e:
                    # Anything else is logged and propagated unchanged.
                    print(f"An unexpected error occurred in {func.__name__}: {e}")
                    raise
        return wrapper
    return decorator
# 2) Initialize the Google Sheets client.
# OAuth scopes requested for the service account: the Sheets feed plus Google Drive
# (this module opens spreadsheets by name and also creates and shares new ones).
scope = [
    "https://spreadsheets.google.com/feeds",  # Sheets
    "https://www.googleapis.com/auth/drive",  # Drive
]

# Authentication runs eagerly, as soon as the module is imported; the sheet helpers in
# this module all go through the resulting `client`.
creds = ServiceAccountCredentials.from_json_keyfile_dict(GSERVICE_ACCOUNT_INFO, scope)
client = gspread.authorize(creds)
def read_sheet_to_df(custom_sheet_name=None, sheet_index=0):
    """
    Read all data from one worksheet of a Google Sheet into a pandas DataFrame.

    Parameters:
        custom_sheet_name (str): Name of the spreadsheet to open. If None, uses GSHEET_NAME.
        sheet_index (int): Zero-based index of the worksheet within the spreadsheet
            (default 0, the first worksheet).

    Returns:
        pandas.DataFrame | None: One row per record, with the worksheet's first row used as
        column headers; None if the spreadsheet or the requested worksheet does not exist.
    """
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME

    try:
        spreadsheet = client.open(custom_sheet_name)
    except gspread.SpreadsheetNotFound:
        return None

    # An out-of-range index does not raise IndexError from get_worksheet(): older gspread
    # releases return None, newer ones raise WorksheetNotFound. Handle both (IndexError is
    # kept for safety).
    try:
        worksheet = spreadsheet.get_worksheet(sheet_index)
    except (IndexError, gspread.WorksheetNotFound):
        return None
    if worksheet is None:
        return None

    # get_all_records(): first row as header keys, one dict per remaining row.
    return pd.DataFrame(worksheet.get_all_records())
def append_to_sheet(user_data=None, custom_row_dict=None, custom_sheet_name=None, add_header_when_create_sheet=False):
    """
    Append one row to the first worksheet of a Google Sheet, creating the spreadsheet if needed.

    If ``custom_row_dict`` is given, its values are written in the order of the sheet's header
    row, and headers missing from the dict are filled with "". If the sheet has no header row
    to align against, the values are written in dict order. Otherwise a default row
    ``[timestamp, question, final_answer, trace]`` is built from ``user_data``.

    Parameters:
        user_data (dict): Must contain "question", "final_answer" and "trace". Used only
            when ``custom_row_dict`` is None.
        custom_row_dict (dict): Mapping of column header -> cell value.
        custom_sheet_name (str): Spreadsheet name. Defaults to GSHEET_NAME.
        add_header_when_create_sheet (bool): When the spreadsheet was just created or its
            first worksheet is empty, first write a header row made of the keys of
            ``custom_row_dict`` (or of ``user_data``).

    Raises:
        TypeError: If neither ``user_data`` nor ``custom_row_dict`` is provided.
    """
    # Validate before touching the API so a bad call cannot create a stray spreadsheet.
    if custom_row_dict is None and user_data is None:
        raise TypeError("append_to_sheet() requires either user_data or custom_row_dict")
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME

    try:
        spreadsheet = client.open(custom_sheet_name)
        is_new = False
        print("Spreadsheet opened successfully:", custom_sheet_name)
    except SpreadsheetNotFound:
        # It doesn't exist yet: create it and share it with the designated writers.
        spreadsheet = client.create(custom_sheet_name)
        spreadsheet.share('[email protected]', perm_type='user', role='writer')
        spreadsheet.share('[email protected]', perm_type='user', role='writer')
        is_new = True
        print("Spreadsheet created successfully:", custom_sheet_name)
        print("Spreadsheet ID:", spreadsheet.id)

    sheet = spreadsheet.sheet1

    # A blank worksheet can come back as [] or [[]] (e.g. one that was cleared earlier);
    # treat any worksheet without a single non-empty cell as empty.
    existing_values = sheet.get_all_values()
    is_empty = not any(cell for row in existing_values for cell in row)

    if (is_new or is_empty) and add_header_when_create_sheet:
        # Header row comes from the keys of whichever dict describes the row.
        header_source = custom_row_dict if custom_row_dict is not None else user_data
        headers = list(header_source.keys())
        sheet.append_row(headers)
        # NOTE(review): without custom_row_dict the header is user_data's keys, while the
        # default row below starts with a timestamp column, so the two likely do not line
        # up. Confirm the intended header for that path.
    else:
        # Read the existing header row; an empty sheet has none (skip the extra API call).
        headers = sheet.row_values(1) if not is_empty else []

    if custom_row_dict is not None:
        if headers:
            # Align each value with its column header; missing headers become "".
            row = [custom_row_dict.get(header, "") for header in headers]
        else:
            # No header row to align against: keep the values (in dict order) instead of
            # appending an empty row and silently dropping the data.
            row = list(custom_row_dict.values())
    else:
        row = [str(datetime.datetime.now()), user_data["question"], user_data["final_answer"], user_data["trace"]]

    sheet.append_row(row)
def format_chat(response, tool_database_labels):
    """
    Convert a raw agent transcript into Gradio chat messages.

    Parameters:
        response (list[dict]): Transcript messages, each with a "role" key.
            - "assistant" messages may carry "content" and "tool_calls". "tool_calls" is a
              JSON string or an already-decoded list of {"name", "arguments"} dicts, and a
              missing, None or empty value means no calls.
            - "tool" messages carry the tool output in "content". The output is rendered
              once for every tool call announced by the preceding assistant message.
            - Messages with any other role are ignored.
        tool_database_labels (dict): Maps a database label to the collection of tool names
            belonging to it. It is used to annotate tool-response titles.

    Returns:
        tuple: (final_answer_messages, reasoning_messages, chat_history)
            - final_answer_messages: a one-message list with the text after the
              "[FinalAnswer]" marker of the last rendered message, or that message's whole
              text when the marker is absent (non-string content yields ""). It is empty
              when nothing was rendered.
            - reasoning_messages: all rendered messages, or a single
              "No reasoning was conducted." placeholder when only one was rendered.
            - chat_history: all rendered messages. The last one has "[FinalAnswer]"
              replaced by an "**Answer:**" heading.
    """
    chat_history = []
    # Tool calls announced by the most recent assistant message, consumed by the next tool message.
    last_tool_calls = []

    for msg in response:
        if msg["role"] == "assistant":
            raw_calls = msg.get("tool_calls")
            if not raw_calls:
                last_tool_calls = []
            elif isinstance(raw_calls, (str, bytes, bytearray)):
                last_tool_calls = json.loads(raw_calls) or []
            else:
                last_tool_calls = list(raw_calls)
            chat_history.append(
                gr.ChatMessage(role="assistant", content=msg.get("content", ""))
            )

        elif msg["role"] == "tool":
            # Normalize JSON output to compact JSON; anything that is not valid JSON text
            # (including non-string content) is shown as-is. Computed once per tool message.
            raw = msg.get("content", "")
            try:
                pretty = json.dumps(json.loads(raw))
            except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
                pretty = raw

            # NOTE(review): every pending call is rendered with this same tool message's
            # content; confirm that is the expected transcript shape.
            for tool_call in last_tool_calls:
                name = tool_call.get("name", "")
                args = tool_call.get("arguments", {})

                # Pick the title and, for tools belonging to a known database, a label.
                database_label = ""
                if name == "Tool_RAG":
                    title = "🧰 Tool RAG"
                else:
                    title = f"🛠️ {name}"
                    for db_label, tool_list in tool_database_labels.items():
                        if name in tool_list:
                            title = f"🛠️ {name}\n(**Info** {db_label} [Click to view])"
                            database_label = " (" + db_label + ")"
                            break

                # One message per call: the response as content, the arguments as the log.
                chat_history.append(
                    gr.ChatMessage(
                        role="assistant",
                        content=f"Tool Response{database_label}:\n{pretty}",
                        metadata={
                            "title": title,
                            "log": json.dumps(args),
                            "status": 'done'
                        }
                    )
                )
            # The calls have been rendered; a later tool message needs a new assistant turn.
            last_tool_calls = []

    if not chat_history:
        # Nothing to render (empty transcript or no assistant/tool messages).
        return [], [], chat_history

    last_msg = chat_history[-1]
    final_text = last_msg.content if isinstance(last_msg.content, str) else ""
    if "[FinalAnswer]" in final_text:
        final_text = final_text.replace("[FinalAnswer]", "\n**Answer:**\n")
        last_msg.content = final_text
    final_answer_messages = [
        gr.ChatMessage(role="assistant", content=final_text.split("\n**Answer:**\n")[-1].strip())
    ]

    assistant_count = sum(1 for m in chat_history if m.role == "assistant")
    if assistant_count == 1:
        # Only the answer itself was produced.
        reasoning_messages = [gr.ChatMessage(role="assistant", content="No reasoning was conducted.")]
    else:
        # Include ALL messages in the reasoning view, including the last one.
        reasoning_messages = chat_history.copy()
    return final_answer_messages, reasoning_messages, chat_history