Spaces:
Runtime error
Runtime error
| import json | |
| from collections import defaultdict | |
| import os | |
| from tabulate import tabulate | |
| from datasets import load_dataset | |
# Ground-truth solutions keyed by puzzle id; filled by load_private_solutions().
private_solutions = {}


def load_private_solutions():
    """Fill the module-level ``private_solutions`` cache.

    Loads the ``test`` split of the ``grid_mode`` configuration of
    ``allenai/ZebraLogicBench-private`` and stores every item's ``solution``
    under its ``id``. Entries already cached under the same id are
    overwritten. Returns ``None``.
    """
    dataset = load_dataset("allenai/ZebraLogicBench-private", "grid_mode", split="test")
    private_solutions.update((record["id"], record["solution"]) for record in dataset)
def load_model_results(run_name_folders):
    """Collect the per-model result files of every run.

    Args:
        run_name_folders: Mapping of run name (e.g. ``"greedy"``) to the
            folder holding that run's ``<model>.json`` files. Only the
            folder itself is scanned, not its sub-folders.

    Returns:
        Dict mapping ``"<model>%<run_name>"`` to the path of the result
        file. Within a run, entries are inserted in sorted filename order.

    Raises:
        OSError: If a folder cannot be listed (propagated from
            ``os.listdir``).
    """
    suffix = ".json"
    model_results = {}
    for run_name, folder in run_name_folders.items():
        # Sorted so the result does not depend on the OS listing order.
        for filename in sorted(os.listdir(folder)):
            if not filename.endswith(suffix):
                continue
            filepath = os.path.join(folder, filename)
            # A directory that happens to be named "*.json" is not a result file.
            if not os.path.isfile(filepath):
                continue
            # Strip only the trailing extension; a model name may itself
            # contain ".json" somewhere in the middle.
            model_name = filename[: -len(suffix)]
            model_results[f"{model_name}%{run_name}"] = filepath
    return model_results
def extract_last_complete_json(s):
    """Parse the last balanced top-level ``{...}`` span found in *s*.

    The text is scanned once while counting brace depth. Every time the
    depth returns to zero, the span from the first unmatched ``{`` to the
    current ``}`` becomes the new candidate. Braces are counted wherever
    they occur, including inside JSON string values, and a ``}`` with no
    open ``{`` is ignored.

    Only the final candidate is decoded, after all newline characters have
    been removed from it.

    Returns:
        The decoded object, or ``None`` when no balanced span exists or the
        final candidate is not valid JSON.
    """
    depth = 0
    span_start = None
    candidate = None
    for pos, ch in enumerate(s):
        if ch == "{":
            if span_start is None:
                span_start = pos
            depth += 1
        elif ch == "}" and depth > 0:
            depth -= 1
            if depth == 0:
                # Outermost brace closed: remember this span, start afresh.
                candidate = s[span_start:pos + 1]
                span_start = None

    if not candidate:
        return None
    try:
        return json.loads(candidate.replace("\n", ""))
    except json.JSONDecodeError:
        return None
def eval_each_puzzle(id, prediction_table):
    """Score one predicted grid against the cached solution of puzzle *id*.

    The solutions cache is loaded on first use. A predicted cell is read
    from ``prediction_table["House <k>"][<column>]`` and is compared to the
    truth case-insensitively after stripping surrounding whitespace. If the
    predicted value is a list, its first element is used.

    Cells that are missing, ``None``, empty, or of an unsupported type (for
    example a number or a dict) count as incorrect instead of raising. The
    same applies when *prediction_table* or one of its house rows is not a
    dict.

    Args:
        id: Key of the puzzle in ``private_solutions``.
        prediction_table: Predicted grid, ``{"House 1": {column: value}}``.

    Returns:
        Tuple ``(total_cells, correct_cells, solution)`` where ``solution``
        is the cached entry for *id*.

    Raises:
        KeyError: If *id* is not in the solutions cache.
    """
    global private_solutions
    if not private_solutions:
        load_private_solutions()
    solution = private_solutions[id]
    columns = solution["header"]
    assert columns[0] == "House"

    rows = solution["rows"]
    this_total_cells = len(rows) * (len(columns) - 1)
    this_correct_cells = 0

    # A prediction that is not a mapping contains no usable cells.
    if not isinstance(prediction_table, dict):
        prediction_table = {}

    for house_index, row in enumerate(rows, start=1):
        predicted_row = prediction_table.get(f"House {house_index}")
        if not isinstance(predicted_row, dict):
            continue
        truth_row = {columns[j]: row[j] for j in range(1, len(columns))}
        for column, truth in truth_row.items():
            value = predicted_row.get(column)
            if isinstance(value, list):
                if not value:
                    continue
                value = value[0]
            elif not value:
                # Missing key, None or empty value: nothing to compare.
                continue
            if not isinstance(value, str):
                # Unsupported type. Never fall back to the previous cell's
                # prediction; the cell simply counts as incorrect.
                continue
            if truth.lower().strip() == value.lower().strip():
                this_correct_cells += 1

    return this_total_cells, this_correct_cells, solution
def _percent(numerator, denominator):
    """Format ``numerator / denominator`` as a two-decimal percentage string ("0.00" if the denominator is 0)."""
    if not denominator:
        return "0.00"
    return f"{numerator / denominator * 100:.2f}"


def _build_solution_table(solution):
    """Convert a solution's header and rows into ``{"House k": {column: value}}``."""
    columns = solution["header"]
    assert columns[0] == "House"
    return {
        f"House {i + 1}": {columns[j]: row[j] for j in range(1, len(columns))}
        for i, row in enumerate(solution["rows"])
    }


def _count_correct_cells(solution_table, prediction_table):
    """Count predicted cells that match the solution (case-insensitive, whitespace-stripped)."""
    if not isinstance(prediction_table, dict):
        return 0
    correct = 0
    for house, truth_row in solution_table.items():
        predicted_row = prediction_table.get(house)
        if not isinstance(predicted_row, dict):
            continue
        for column, truth in truth_row.items():
            if column not in predicted_row:
                continue
            value = predicted_row[column]
            # None and empty containers/strings mean "no answer for this cell".
            if value is None or (isinstance(value, (str, list, dict)) and not value):
                continue
            if isinstance(value, list):
                # An answer wrapped in a list: only the first entry counts.
                value = value[0]
            if not isinstance(value, str):
                raise ValueError(f"Unknown type: {type(value)}")
            if truth.lower().strip() == value.lower().strip():
                correct += 1
    return correct


def eval_model(model, filepath):
    """Evaluate one model result file against the cached private solutions.

    Every item of the JSON list in *filepath* must provide ``id``, ``size``
    and ``output`` (only ``output[0]`` is scored). The last JSON object in
    the output is parsed with ``extract_last_complete_json``. Outputs with
    no parsable object or no ``"solution"`` key count as "no answer", but
    their cells still count towards the cell total.

    Args:
        model: Name of the form ``"<model>%<mode>"``; the part after the
            last ``%`` becomes the mode.
        filepath: Path of the JSON result file.

    Returns:
        Dict with the keys ``Model``, ``Mode``, ``Puzzle Acc``, ``Cell Acc``,
        ``No answer``, ``Easy Puzzle Acc``, ``Hard Puzzle Acc``,
        ``Total Puzzles`` and ``Reason Lens``. Rates are percentage strings
        with two decimals; a rate whose denominator is zero is reported as
        ``"0.00"``.

    Raises:
        KeyError: If an item's id is missing from ``private_solutions``.
        ValueError: If a predicted cell is neither a string nor a list
            whose first element is a string.
    """
    global private_solutions
    with open(filepath, "r", encoding="utf-8") as f:
        print(f"Processing {filepath}")
        data = json.load(f)

    easy_sizes = ['2*2', '2*3', '2*4', '2*5', '2*6', '3*2', '3*3',]
    hard_sizes = ['3*4', '3*5', '4*2', '3*6', '4*3', '4*4', '5*2', '6*2', '4*5', '4*6', '5*3', '5*4', '5*5', '5*6', '6*3', '6*4', '6*5', '6*6']

    num_total_puzzles = len(data)
    solved_puzzles = 0
    correct_cells = 0
    total_cells = 0
    no_answer = 0
    num_total_puzzles_by_size = defaultdict(int)
    solved_puzzles_by_size = defaultdict(int)
    reason_lens = []

    for item in data:
        solution = private_solutions[item["id"]]
        size = item["size"]
        num_total_puzzles_by_size[size] += 1

        solution_table = _build_solution_table(solution)
        this_total_cells = len(solution["rows"]) * (len(solution["header"]) - 1)
        total_cells += this_total_cells

        prediction_json = extract_last_complete_json(item["output"][0])
        if prediction_json is None or "solution" not in prediction_json:
            no_answer += 1
            continue

        reason = prediction_json.get("reasoning", "")
        # "reasoning" may be null or a number in malformed outputs.
        reason_lens.append(len(reason) if isinstance(reason, (str, list, dict)) else 0)

        this_correct_cells = _count_correct_cells(solution_table, prediction_json["solution"])
        correct_cells += this_correct_cells
        if this_correct_cells == this_total_cells:
            solved_puzzles += 1
            solved_puzzles_by_size[size] += 1

    easy_solved_puzzles = sum(solved_puzzles_by_size.get(size, 0) for size in easy_sizes)
    easy_total_puzzles = sum(num_total_puzzles_by_size.get(size, 0) for size in easy_sizes)
    hard_solved_puzzles = sum(solved_puzzles_by_size.get(size, 0) for size in hard_sizes)
    hard_total_puzzles = sum(num_total_puzzles_by_size.get(size, 0) for size in hard_sizes)

    # Split on the last "%" so a model name that itself contains "%" stays intact.
    model_name, separator, mode = model.rpartition("%")
    if not separator:
        model_name, mode = model, ""

    result = {}
    result["Model"] = model_name
    result["Mode"] = mode
    result["Puzzle Acc"] = _percent(solved_puzzles, num_total_puzzles)
    result["Cell Acc"] = _percent(correct_cells, total_cells)
    result["No answer"] = _percent(no_answer, num_total_puzzles)
    result["Easy Puzzle Acc"] = _percent(easy_solved_puzzles, easy_total_puzzles)
    result["Hard Puzzle Acc"] = _percent(hard_solved_puzzles, hard_total_puzzles)
    result["Total Puzzles"] = num_total_puzzles
    result["Reason Lens"] = f"{sum(reason_lens) / len(reason_lens):.2f}" if reason_lens else "0.00"
    return result
def gen_results(run_name_folders):
    """Evaluate every result file, print a leaderboard and save a summary.

    Each file found by ``load_model_results`` is scored with ``eval_model``.
    The rows are ordered by descending ``"Puzzle Acc"``, printed as a table
    and written to ``result_dirs/zebra-grid.summary.json``.

    Args:
        run_name_folders: Mapping of run name to result folder, passed
            through to ``load_model_results``.
    """
    columns = ["Model", "Mode", "Puzzle Acc", "Cell Acc", "No answer", "Easy Puzzle Acc", "Hard Puzzle Acc", "Total Puzzles", "Reason Lens"]

    rows = [
        eval_model(model_name, filepath)
        for model_name, filepath in load_model_results(run_name_folders).items()
    ]
    # Best puzzle accuracy first.
    rows.sort(key=lambda row: -float(row["Puzzle Acc"]))

    # tabulate receives plain lists laid out in column order.
    table_data = []
    for row in rows:
        table_data.append([row[name] for name in columns])
    print(tabulate(table_data, headers=columns, tablefmt="fancy_outline", stralign="center", numalign="center"))

    with open("result_dirs/zebra-grid.summary.json", "w") as f:
        json.dump(rows, f, indent=2)
if __name__ == "__main__":
    # eval_model reads the solutions cache directly, so fill it first.
    load_private_solutions()
    run_name_folders = dict(
        greedy="result_dirs/zebra-grid",
        sampling="result_dirs/zebra-grid/sampling",
    )
    gen_results(run_name_folders)