|
import os |
|
from datetime import datetime |
|
import json |
|
from huggingface_hub import HfApi |
|
import gradio as gr |
|
import csv |
|
import pandas as pd |
|
import io |
|
from typing import TypedDict, List |
|
from climateqa.constants import DOCUMENT_METADATA_DEFAULT_VALUES |
|
from langchain_core.documents import Document |
|
|
|
def serialize_docs(docs: "list[Document]", default_values: "dict | None" = None) -> list:
    """Convert document objects to a simplified format compatible with Hugging Face datasets.

    Each document's metadata is normalized against a mapping of default values:
    every expected field is present in the output, coerced to the type of its
    default, and falls back to the default when missing or not coercible.

    Args:
        docs (list): List of document objects, each with page_content and metadata attributes
        default_values (dict, optional): Mapping of metadata field -> default value.
            Defaults to DOCUMENT_METADATA_DEFAULT_VALUES when not provided.

    Returns:
        list: List of dictionaries with standardized "page_content" and "metadata"
            fields. When docs is empty, a single placeholder entry is returned so
            downstream consumers always receive at least one row.
    """
    if default_values is None:
        default_values = DOCUMENT_METADATA_DEFAULT_VALUES

    new_docs = []
    for doc in docs:
        new_doc = {
            "page_content": doc.page_content,
            "metadata": {},
        }
        for field, default_value in default_values.items():
            new_value = doc.metadata.get(field, default_value)
            try:
                # Coerce to the default's type so the dataset schema stays consistent.
                new_doc["metadata"][field] = type(default_value)(new_value)
            except (TypeError, ValueError):
                # Value not convertible (e.g. int("abc")) -> fall back to the default.
                new_doc["metadata"][field] = default_value

        new_docs.append(new_doc)

    if not new_docs:
        new_docs = [{"page_content": "No documents found", "metadata": default_values}]
    return new_docs
|
|
|
|
|
|
|
def log_on_azure(file, logs, share_client):
    """Serialize *logs* to JSON and upload them as *file* via the Azure share client.

    Args:
        file (str): Name of the file to store logs
        logs (dict): Log data to store
        share_client: Azure share client instance
    """
    payload = json.dumps(logs)
    share_client.get_file_client(file).upload_file(payload)
|
|
|
|
|
def log_interaction_to_azure(history, output_query, sources, docs, share_client, user_id):
    """Log a chat interaction to Azure Blob Storage.

    Skipped entirely when GRADIO_ENV is "local". On failure, prints the
    underlying exception and surfaces a user-facing gradio error.

    Args:
        history (list): Chat message history
        output_query (str): Processed query
        sources (list): Knowledge base sources used
        docs (list): Retrieved documents
        share_client: Azure share client instance
        user_id (str): User identifier
    """
    try:
        if os.getenv("GRADIO_ENV") == "local":
            return
        timestamp = str(datetime.now().timestamp())
        # NOTE(review): history[1] is read as a dict but history[-1] as an object
        # attribute -- presumably mixed message types; confirm against the caller.
        prompt = history[1]["content"]
        logs = {
            "user_id": str(user_id),
            "prompt": prompt,
            "query": prompt,
            "question": output_query,
            "sources": sources,
            "docs": serialize_docs(docs),
            "answer": history[-1].content,
            "time": timestamp,
        }
        log_on_azure(f"{timestamp}.json", logs, share_client)
    except Exception as e:
        print(f"Error logging on Azure Blob Storage: {e}")
        error_msg = f"ClimateQ&A Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
        raise gr.Error(error_msg)
|
|
|
def log_drias_interaction_to_azure(query, sql_query, data, share_client, user_id):
    """Log a Drias data interaction to Azure Blob Storage.

    Does nothing when GRADIO_ENV is "local". On failure, prints the error and
    raises a user-facing gradio error.

    Args:
        query (str): User query
        sql_query (str): SQL query used
        data: Retrieved data (currently unused -- kept for interface stability)
        share_client: Azure share client instance
        user_id (str): User identifier
    """
    try:
        if os.getenv("GRADIO_ENV") != "local":
            timestamp = str(datetime.now().timestamp())
            logs = {
                "user_id": str(user_id),
                "query": query,
                "sql_query": sql_query,
                "time": timestamp,
            }
            log_on_azure(f"drias_{timestamp}.json", logs, share_client)
            print(f"Logged Drias interaction to Azure Blob Storage: {logs}")
        else:
            # Fixed: the previous message wrongly blamed share_client/user_id
            # being None, but only GRADIO_ENV is actually checked here.
            print("Skipping Azure logging because GRADIO_ENV is local")
    except Exception as e:
        print(f"Error logging Drias interaction on Azure Blob Storage: {e}")
        error_msg = f"Drias Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
        raise gr.Error(error_msg)
|
|
|
|
|
|
|
def log_on_huggingface(log_filename, logs, log_type="chat"):
    """Log data to a Hugging Face dataset repository.

    Args:
        log_filename (str): Path of the file to create in the dataset repo
        logs (dict): Log data to store (a "timestamp" key is added in place)
        log_type (str): Type of log to store, either "chat" or "drias"

    Raises:
        ValueError: If log_type is not one of the supported values.
    """
    # Per-type configuration: (token env var, repo env var, default repo id).
    log_configs = {
        "chat": ("HF_LOGS_TOKEN", "HF_DATASET_REPO", "Ekimetrics/climateqa_logs"),
        "drias": ("HF_LOGS_DRIAS_TOKEN", "HF_DATASET_REPO_DRIAS", "Ekimetrics/climateqa_logs_talk_to_data"),
    }
    # Validate BEFORE the try block: the original raised ValueError inside its
    # own try and then swallowed it in `except Exception`, so invalid log
    # types were silently ignored. Now the error propagates to the caller.
    if log_type not in log_configs:
        raise ValueError(f"Invalid log type: {log_type}")

    token_var, repo_var, default_repo = log_configs[log_type]
    try:
        hf_token = os.getenv(token_var)
        if not hf_token:
            print(f"{token_var} not found in environment variables")
            return

        repo_id = os.getenv(repo_var, default_repo)

        api = HfApi(token=hf_token)

        # Tag the record so rows can be ordered even if filenames collide.
        logs["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        logs_json = json.dumps(logs)

        api.upload_file(
            path_or_fileobj=logs_json.encode('utf-8'),
            path_in_repo=log_filename,
            repo_id=repo_id,
            repo_type="dataset"
        )

    except Exception as e:
        print(f"Error logging to Hugging Face: {e}")
|
|
|
|
|
def log_interaction_to_huggingface(history, output_query, sources, docs, share_client, user_id):
    """Log a chat interaction to the Hugging Face logs dataset.

    Skipped when GRADIO_ENV is "local". Raises a user-facing gradio error if
    logging fails.

    Args:
        history (list): Chat message history
        output_query (str): Processed query
        sources (list): Knowledge base sources used
        docs (list): Retrieved documents
        share_client: Azure share client instance (unused in this function)
        user_id (str): User identifier
    """
    try:
        if os.getenv("GRADIO_ENV") == "local":
            print("Did not log to Hugging Face because GRADIO_ENV is local")
            return
        timestamp = str(datetime.now().timestamp())
        user_prompt = history[1]["content"]
        record = {
            "user_id": str(user_id),
            "prompt": user_prompt,
            "query": user_prompt,
            "question": output_query,
            "sources": sources,
            "docs": serialize_docs(docs),
            "answer": history[-1].content,
            "time": timestamp,
        }
        log_on_huggingface(f"chat/{timestamp}.json", record, log_type="chat")
        print("Logged interaction to Hugging Face")
    except Exception as e:
        print(f"Error logging to Hugging Face: {e}")
        error_msg = f"ClimateQ&A Error: {str(e)[:100]})"
        raise gr.Error(error_msg)
|
|
|
def log_drias_interaction_to_huggingface(query, sql_query, user_id):
    """Log a Drias data interaction to the Hugging Face logs dataset.

    Does nothing when GRADIO_ENV is "local". Raises a user-facing gradio
    error if logging fails.

    Args:
        query (str): User query
        sql_query (str): SQL query used
        user_id (str): User identifier
    """
    try:
        if os.getenv("GRADIO_ENV") != "local":
            timestamp = str(datetime.now().timestamp())
            logs = {
                "user_id": str(user_id),
                "query": query,
                "sql_query": sql_query,
                "time": timestamp,
            }
            log_on_huggingface(f"drias/drias_{timestamp}.json", logs, log_type="drias")
            print(f"Logged Drias interaction to Hugging Face: {logs}")
        else:
            # Fixed: the previous message mentioned share_client/user_id, but
            # share_client is not a parameter of this function and only
            # GRADIO_ENV is checked.
            print("Did not log to Hugging Face because GRADIO_ENV is local")
    except Exception as e:
        print(f"Error logging Drias interaction to Hugging Face: {e}")
        error_msg = f"Drias Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
        raise gr.Error(error_msg)
|
|
|
def log_interaction(history, output_query, sources, docs, share_client, user_id):
    """Log a chat interaction to Hugging Face, falling back to Azure on failure.

    Args:
        history (list): Chat message history
        output_query (str): Processed query
        sources (list): Knowledge base sources used
        docs (list): Retrieved documents
        share_client: Azure share client instance
        user_id (str): User identifier
    """
    try:
        log_interaction_to_huggingface(history, output_query, sources, docs, share_client, user_id)
        return
    except Exception as e:
        print(f"Failed to log to Hugging Face, falling back to Azure: {e}")

    # Azure fallback: mirror the payload that would have gone to Hugging Face.
    try:
        if os.getenv("GRADIO_ENV") == "local":
            return
        timestamp = str(datetime.now().timestamp())
        user_prompt = history[1]["content"]
        payload = {
            "user_id": str(user_id),
            "prompt": user_prompt,
            "query": user_prompt,
            "question": output_query,
            "sources": sources,
            "docs": serialize_docs(docs),
            "answer": history[-1].content,
            "time": timestamp,
        }
        log_on_azure(f"{timestamp}.json", payload, share_client)
        print("Successfully logged to Azure as fallback")
    except Exception as azure_error:
        print(f"Error in Azure fallback logging: {azure_error}")
        error_msg = f"ClimateQ&A Logging Error: {str(azure_error)[:100]})"
        # Deliberately swallow the fallback failure: logging must never break
        # the chat flow at this last line of defense.
        print(error_msg)
|
|
|
|
|
|