Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| from datasets import load_dataset | |
| from logger import logger | |
| from typing import Dict, List | |
# Configuration names passed, one at a time, to load_dataset("rungalileo/ragbench", ...).
DATASET_CONFIGS = [
    'covidqa',
    'cuad',
    'delucionqa',
    'emanual',
    'expertqa',
    'finqa',
    'hagrid',
    'hotpotqa',
    'msmarco',
    'pubmedqa',
    'tatqa',
    'techqa',
]
# Load the RAGBench dataset for each of the given configurations.
# Args: configs: List of dataset configurations to load.
# Returns: A dictionary where keys are config names and values are the loaded datasets.
def load_rag_bench_dataset(configs: List[str]) -> Dict[str, dict]:
    """Load the ``rungalileo/ragbench`` dataset once per requested configuration.

    Loading is best-effort: a configuration that fails to load is logged and
    left out of the result, and the remaining configurations are still tried.

    Args:
        configs: Names of the dataset configurations to load.

    Returns:
        A dictionary mapping each successfully loaded configuration name to
        the object ``load_dataset`` returned for it.
    """
    loaded = {}
    for config in configs:
        try:
            dataset = load_dataset("rungalileo/ragbench", config)
            loaded[config] = dataset
            logger.info(f"Successfully loaded dataset for config: {config}")
        except Exception as e:
            # Deliberately broad: one bad configuration must not stop the rest.
            logger.error(f"Failed to load dataset for config {config}: {e}")
    return loaded
# Extract data from the RAGBench dataset and store it in a Pandas DataFrame.
# Args: ragbench: Dictionary containing the loaded datasets.
#       split: Dataset split to extract (e.g., "train", "test", "validation").
# Returns: A Pandas DataFrame containing the extracted data.
def ExtractData(ragbench: Dict[str, dict], split: str = "train") -> pd.DataFrame:
    """Collect one split of every loaded RAGBench dataset into a single DataFrame.

    A dataset is skipped (with a logged warning) when it lacks the requested
    split or any of the extracted columns. Any other error raised while reading
    a dataset is logged and that dataset is skipped as well. A skipped dataset
    contributes no rows at all.

    Args:
        ragbench: Mapping of dataset name to a loaded dataset, indexable by
            split name. Each split must expose ``column_names`` and support
            ``split[column]`` lookups.
        split: Name of the split to extract (e.g. "train", "test",
            "validation").

    Returns:
        A DataFrame with one column per extracted field, holding the rows of
        every dataset that was extracted successfully, in iteration order.
    """
    # Single source of truth: every column that is read is also validated.
    columns = [
        "question",
        "documents",
        "gpt3_context_relevance",
        "gpt35_utilization",
        "gpt3_adherence",
        "id",
        "dataset_name",
        "relevance_score",
        "utilization_score",
        "completeness_score",
        "adherence_score",
    ]
    data = {column: [] for column in columns}

    for datasetname, dataset in ragbench.items():
        try:
            # Ensure the split exists in the dataset
            if split not in dataset:
                logger.warning(f"Split '{split}' not found in dataset {datasetname}. Skipping.")
                continue

            split_data = dataset[split]

            # Check every column that will be read, including the score columns.
            available = split_data.column_names
            missing_columns = [col for col in columns if col not in available]
            if missing_columns:
                logger.warning(f"Missing columns {missing_columns} in dataset {datasetname}. Skipping.")
                continue

            # Read all columns before appending anything. If a read fails, the
            # accumulated lists stay the same length and the final DataFrame
            # can still be built from the other datasets.
            extracted = {column: list(split_data[column]) for column in columns}
            for column, values in extracted.items():
                data[column].extend(values)

            logger.info(f"Successfully extracted data from {datasetname} ({split} split).")
        except Exception as e:
            # Deliberately broad: one unreadable dataset must not stop the rest.
            logger.error(f"Error extracting data from {datasetname} ({split} split): {e}")

    # Convert the accumulated columns to a Pandas DataFrame
    return pd.DataFrame(data)
def ExtractRagBenchData() -> pd.DataFrame:
    """Load every configured RAGBench dataset and return its train split as one DataFrame.

    Returns:
        The DataFrame produced by ``ExtractData`` for the "train" split of all
        configurations in ``DATASET_CONFIGS``, with missing ``dataset_name``
        values replaced by "covidqa".
    """
    ragbench = load_rag_bench_dataset(DATASET_CONFIGS)
    rag_extracted_data = ExtractData(ragbench, split="train")
    # Assign the filled column back instead of calling fillna(inplace=True) on
    # the selected column. The chained in-place form acts on a temporary object,
    # triggers a FutureWarning on pandas 2.x, and leaves the DataFrame unchanged
    # under Copy-on-Write.
    rag_extracted_data["dataset_name"] = rag_extracted_data["dataset_name"].fillna("covidqa")
    return rag_extracted_data