Spaces:
Running
Running
import json | |
from collections import defaultdict | |
import os | |
from tabulate import tabulate | |
from datasets import load_dataset | |
# Module-level cache of ground-truth solutions, keyed by puzzle id.
# It starts empty and is filled in place by load_private_solutions().
private_solutions: dict = {}
def load_private_solutions():
    """Fill the module-level ``private_solutions`` cache.

    Loads the ``test`` split of the ``grid_mode`` configuration of
    ``allenai/ZebraLogicBench-private`` and stores every item's
    ``solution`` under its ``id``. The cache is updated in place and
    nothing is returned.
    """
    dataset = load_dataset("allenai/ZebraLogicBench-private", "grid_mode", split="test")
    # dict.update consumes the (id, solution) pairs one at a time, so the
    # entries are inserted in dataset order just like an explicit loop.
    private_solutions.update((entry["id"], entry["solution"]) for entry in dataset)
def load_model_results(run_name_folders):
    """Map every ``*.json`` result file to a ``"<model>%<run_name>"`` key.

    Args:
        run_name_folders: Mapping from a run name (e.g. ``"greedy"``) to the
            folder that holds that run's ``<model>.json`` result files.

    Returns:
        Dict mapping ``f"{model_name}%{run_name}"`` to the file path, where
        ``model_name`` is the filename with only its trailing ``.json``
        removed. Only direct children of each folder are listed.

    Raises:
        OSError: Propagated from ``os.listdir`` when a folder cannot be read.
    """
    suffix = ".json"
    model_results = {}
    for run_name, folder in run_name_folders.items():
        # Sorted so the resulting order does not depend on the filesystem.
        for filename in sorted(os.listdir(folder)):
            if not filename.endswith(suffix):
                continue
            # Strip just the extension; str.replace would also delete any
            # ".json" that appears inside the model name itself.
            model_name = filename[:-len(suffix)]
            model_results[f"{model_name}%{run_name}"] = os.path.join(folder, filename)
    return model_results
def extract_last_complete_json(s):
    """Decode the last balanced top-level ``{...}`` span found in *s*.

    The text is scanned once while tracking brace nesting depth. Whenever
    the depth drops back to zero, the span from the outermost ``{`` to its
    matching ``}`` becomes the current candidate, so the candidate left at
    the end is the last complete top-level span. Braces are counted
    literally; those inside string values are not treated specially, and a
    ``}`` seen at depth zero is ignored.

    Returns the decoded object, or ``None`` when no balanced span exists or
    the last one is not valid JSON (earlier spans are not retried).
    """
    depth = 0
    open_at = None
    candidate = None
    for pos, ch in enumerate(s):
        if ch == '{':
            if depth == 0:
                open_at = pos
            depth += 1
        elif ch == '}' and depth > 0:
            depth -= 1
            if depth == 0:
                candidate = s[open_at:pos + 1]
                open_at = None
    if not candidate:
        return None
    try:
        # Newlines are removed before decoding.
        return json.loads(candidate.replace("\n", ""))
    except json.JSONDecodeError:
        return None
def eval_each_puzzle(id, prediction_table):
    """Score one predicted grid against the stored ground-truth solution.

    Args:
        id: Puzzle id used to look up the solution in ``private_solutions``
            (the name shadows the builtin but is kept for compatibility).
        prediction_table: Mapping ``"House N"`` -> ``{column: value}``. Each
            value is either a string or a list whose first element is the
            answer. Missing houses or columns and ``None``/empty values
            count as incorrect cells.

    Returns:
        Tuple ``(total_cells, correct_cells, solution)`` where ``solution``
        is the stored ground-truth entry for ``id``.

    Raises:
        KeyError: If ``id`` has no entry in ``private_solutions``.
        ValueError: If a non-empty predicted cell is neither ``str`` nor
            ``list``.
    """
    # Fill the cache on first use.
    if not private_solutions:
        load_private_solutions()
    solution = private_solutions[id]
    columns = solution["header"]
    assert columns[0] == "House"

    # Column 0 is the house label; the remaining columns are the cells.
    solution_table = {
        f"House {index + 1}": {columns[j]: row[j] for j in range(1, len(columns))}
        for index, row in enumerate(solution["rows"])
    }
    this_total_cells = len(solution["rows"]) * (len(columns) - 1)

    this_correct_cells = 0
    for house, truth_row in solution_table.items():
        if house not in prediction_table:
            continue
        predicted_row = prediction_table[house]
        for column, truth in truth_row.items():
            if column not in predicted_row:
                continue
            predicted = predicted_row[column]
            if predicted is None or len(predicted) == 0:
                continue
            if isinstance(predicted, list):
                predicted = predicted[0]
            elif not isinstance(predicted, str):
                # Without this branch the comparison below would reuse the
                # value left over from the previous cell (or hit an unbound
                # name on the very first cell).
                raise ValueError(f"Unknown type: {type(predicted)}")
            # Comparison ignores case and surrounding whitespace.
            if truth.lower().strip() == predicted.lower().strip():
                this_correct_cells += 1
    return this_total_cells, this_correct_cells, solution
def _percent(part, whole):
    """Format ``part / whole`` as a two-decimal percentage; ``"0.00"`` when ``whole`` is zero."""
    if not whole:
        return "0.00"
    return f"{part / whole * 100:.2f}"


def eval_model(model, filepath):
    """Evaluate one model's result file against ``private_solutions``.

    ``private_solutions`` is read directly and is not loaded here, so it
    must already contain every puzzle id that appears in the file.

    Args:
        model: Key of the form ``"<model name>%<run name>"``; the two parts
            become the ``Model`` and ``Mode`` fields of the result.
        filepath: Path to a JSON file containing a list of items, each with
            ``id``, ``size`` and ``output``. Only ``output[0]`` is scored.

    Returns:
        Dict with the keys ``Model``, ``Mode``, ``Puzzle Acc``, ``Cell Acc``,
        ``No answer``, ``Easy Puzzle Acc``, ``Hard Puzzle Acc``,
        ``Total Puzzles`` and ``Reason Lens``. Rates are percentage strings
        with two decimals. A rate whose denominator is zero (empty file, no
        puzzle of that difficulty, no parsable answer) is reported as
        ``"0.00"`` instead of raising ``ZeroDivisionError``.

    Raises:
        KeyError: If an item's id is missing from ``private_solutions``.
        ValueError: If a non-empty predicted cell is neither ``str`` nor
            ``list``.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        print(f"Processing {filepath}")
        data = json.load(f)

    solved_puzzles = 0
    num_total_puzzles = len(data)
    correct_cells = 0
    total_cells = 0
    no_answer = 0
    num_total_puzzles_by_size = defaultdict(int)
    solved_puzzles_by_size = defaultdict(int)
    reason_lens = []

    for item in data:
        solution = private_solutions[item["id"]]
        size = item["size"]
        num_total_puzzles_by_size[size] += 1

        # Build the ground-truth table; column 0 is the house label.
        columns = solution["header"]
        assert columns[0] == "House"
        solution_table = {
            f"House {index + 1}": {columns[j]: row[j] for j in range(1, len(columns))}
            for index, row in enumerate(solution["rows"])
        }
        this_total_cells = len(solution["rows"]) * (len(columns) - 1)
        # Cells of unanswered puzzles still count towards the denominator.
        total_cells += this_total_cells

        # Read and parse the prediction from the model output.
        prediction_json = extract_last_complete_json(item["output"][0])
        if prediction_json is None or "solution" not in prediction_json:
            no_answer += 1
            continue
        reason_lens.append(len(prediction_json.get("reasoning", "")))
        prediction_table = prediction_json["solution"]

        this_correct_cells = 0
        for house, truth_row in solution_table.items():
            if house not in prediction_table:
                continue
            predicted_row = prediction_table[house]
            for column, truth in truth_row.items():
                if column not in predicted_row:
                    continue
                predicted = predicted_row[column]
                if predicted is None or len(predicted) == 0:
                    continue
                if isinstance(predicted, list):
                    predicted = predicted[0]
                elif not isinstance(predicted, str):
                    raise ValueError(f"Unknown type: {type(predicted)}")
                # Comparison ignores case and surrounding whitespace.
                if truth.lower().strip() == predicted.lower().strip():
                    this_correct_cells += 1
        correct_cells += this_correct_cells

        # A puzzle is solved only when every cell is right.
        if this_correct_cells == this_total_cells:
            solved_puzzles += 1
            solved_puzzles_by_size[size] += 1

    # Sizes that appear in neither list are left out of the easy/hard split.
    easy_sizes = ['2*2', '2*3', '2*4', '2*5', '2*6', '3*2', '3*3']
    hard_sizes = ['3*4', '3*5', '4*2', '3*6', '4*3', '4*4', '5*2', '6*2', '4*5', '4*6', '5*3', '5*4', '5*5', '5*6', '6*3', '6*4', '6*5', '6*6']
    easy_solved_puzzles = sum(solved_puzzles_by_size[size] for size in easy_sizes)
    easy_total_puzzles = sum(num_total_puzzles_by_size[size] for size in easy_sizes)
    hard_solved_puzzles = sum(solved_puzzles_by_size[size] for size in hard_sizes)
    hard_total_puzzles = sum(num_total_puzzles_by_size[size] for size in hard_sizes)

    name_parts = model.split("%")
    result = {}
    result["Model"] = name_parts[0]
    result["Mode"] = name_parts[1]
    result["Puzzle Acc"] = _percent(solved_puzzles, num_total_puzzles)
    result["Cell Acc"] = _percent(correct_cells, total_cells)
    result["No answer"] = _percent(no_answer, num_total_puzzles)
    result["Easy Puzzle Acc"] = _percent(easy_solved_puzzles, easy_total_puzzles)
    result["Hard Puzzle Acc"] = _percent(hard_solved_puzzles, hard_total_puzzles)
    result["Total Puzzles"] = num_total_puzzles
    # Mean reasoning length over answered puzzles only (a plain average,
    # not a percentage).
    result["Reason Lens"] = (
        f"{sum(reason_lens) / len(reason_lens):.2f}" if reason_lens else "0.00"
    )
    return result
def gen_results(run_name_folders, output_path="result_dirs/zebra-grid.summary.json"):
    """Evaluate every result file, print a leaderboard and save it as JSON.

    Args:
        run_name_folders: Mapping from run name to result folder, passed
            through to ``load_model_results``.
        output_path: Where the JSON summary is written. Defaults to the
            location that was previously hard-coded.

    The rows are ordered by ``Puzzle Acc`` from highest to lowest; rows with
    equal accuracy keep the order in which they were evaluated.
    """
    model_results = load_model_results(run_name_folders)
    columns = ["Model", "Mode", "Puzzle Acc", "Cell Acc", "No answer", "Easy Puzzle Acc", "Hard Puzzle Acc", "Total Puzzles", "Reason Lens"]
    rows = [eval_model(model_name, filepath) for model_name, filepath in model_results.items()]

    # "Puzzle Acc" is a formatted string, so compare its numeric value.
    # A reversed sort is still stable, so ties keep their evaluation order.
    rows.sort(key=lambda row: float(row["Puzzle Acc"]), reverse=True)

    # tabulate is given a list of lists in column order.
    table_data = [[row[col] for col in columns] for row in rows]
    print(tabulate(table_data, headers=columns, tablefmt="fancy_outline", stralign="center", numalign="center"))

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(rows, f, indent=2)
if __name__ == "__main__":
    # Load the ground truth up front: eval_model reads private_solutions
    # directly and does not load it on its own.
    load_private_solutions()
    # Run name -> folder holding that run's per-model JSON result files.
    run_name_folders = dict(
        greedy="result_dirs/zebra-grid",
        sampling="result_dirs/zebra-grid/sampling",
    )
    gen_results(run_name_folders)