Spaces:
Running
Running
| from copy import deepcopy | |
| from functools import partial | |
| from typing import * | |
| import pandas as pd | |
| from fire import Fire | |
| """ | |
| This code assumes that it deals with only one instruction. | |
| """ | |
| # from varco_arena.tournament | |
def log2_power_of_two(n):
    """Return the exponent ``k`` such that ``2 ** k == n``.

    Args:
        n: an integer that must be an exact, positive power of two.

    Returns:
        The base-2 logarithm of ``n`` as an int.

    Raises:
        ValueError: if ``n`` is zero or is not an exact power of two
            (negative values fail the same bit test).
    """
    # A power of two has a single set bit, so clearing the lowest set bit
    # with n & (n - 1) must leave nothing behind.
    is_power_of_two = n != 0 and n & (n - 1) == 0
    if not is_power_of_two:
        raise ValueError("n must be a positive power of 2")

    # Count how many right shifts are needed to bring the value down to 1.
    shifts = 0
    while n >> shifts > 1:
        shifts += 1
    return shifts
def get_1st(df: pd.DataFrame, alpha2names: dict) -> Optional[str]:
    """Return the resolved winner of the final match.

    Args:
        df: match table with ``round`` and ``winner_resolved`` columns.
        alpha2names: accepted for interface compatibility; not used here.

    Returns:
        ``winner_resolved`` of the single row whose ``round`` is ``"final"``,
        or ``None`` when there is no such row or more than one (a malformed
        result file).
    """
    final_rows = df.loc[df["round"] == "final"]
    if len(final_rows) != 1:
        # error case: no final match, or several of them
        return None
    return final_rows.iloc[0].winner_resolved
def get_unique_participants(df: pd.DataFrame) -> list:
    """Return the sorted, de-duplicated participant names of a match table.

    Names are collected from both the ``model_a`` and ``model_b`` columns.
    Placeholders for an absent opponent are skipped: missing values
    (``None`` as well as NaN) and empty strings.

    Args:
        df: match table with ``model_a`` and ``model_b`` columns.

    Returns:
        A sorted list of the distinct, non-empty participant names.
    """
    # dropna() removes both None and NaN. NaN is truthy, so a plain
    # truthiness filter would let it through and sorted() would then fail
    # comparing a float against strings.
    names = pd.concat([df.model_a, df.model_b]).dropna().unique().tolist()
    return sorted(name for name in names if name)
def _impute_byes(df):
    """Pad every round below the final so that depth ``d`` holds ``2 ** d`` rows.

    Two kinds of placeholder matches are appended, walking from the deepest
    round up to depth 1 (depth 0 is left untouched):

    * ``<model> vs ""`` for each participant that appears at ``depth - 1``
      but not at ``depth`` (it advanced without playing, i.e. a bye). The
      comparison uses the ``depth - 1`` rows as they are at that moment,
      which is before that shallower round has itself been padded.
    * ``"" vs ""`` rows until the round reaches ``2 ** depth`` matches.

    Placeholder rows always declare ``model_a`` the winner.

    Args:
        df: match table with ``depth``, ``model_a`` and ``model_b`` columns.

    Returns:
        A new frame with all rounds concatenated, sorted by ``depth`` and
        re-indexed from zero.
    """
    deepest = df.depth.max()

    # Split the matches per depth so each round can be padded on its own.
    rounds = {level: df[df.depth == level].copy() for level in range(deepest + 1)}

    # Reverse order; depth 0 always has its one proper match.
    for level in range(deepest, 0, -1):
        empty_match = {
            "model_a": "",
            "model_b": "",
            "winner": "model_a",
            "match_order_in_round": "-",
            "depth": level,
        }

        # Participants present one round closer to the final but absent
        # here advanced on a bye: give each a "<model> vs nobody" row.
        played_here = get_unique_participants(rounds[level])
        advanced = get_unique_participants(rounds[level - 1])
        bye_rows = [
            {**empty_match, "model_a": name}
            for name in advanced
            if name not in played_here
        ]
        rounds[level] = pd.concat(
            [rounds[level], pd.DataFrame(bye_rows)],
            axis="index",
        )

        # Top the round up with "nobody vs nobody" rows.
        n_missing = 2**level - len(rounds[level])
        if n_missing > 0:
            padding = pd.DataFrame([empty_match] * n_missing)
            rounds[level] = pd.concat([rounds[level], padding], axis="index")

    combined = pd.concat(rounds.values(), axis="index")
    return combined.sort_values(by="depth").reset_index(drop=True)
def index_test_scenario(df) -> pd.DataFrame:
    """Add scenario-identifying columns (and ``depth`` if absent) to ``df``.

    The frame is modified in place and also returned.

    Columns written:
        * ``inst_src``: ``"inst: <instruction>\\n\\nsrc: <source>"``.
        * ``idx_inst_src``: ``"<tournament_idx>:\\n<inst_src>"``.
        * ``depth`` (only when the column does not already exist): derived
          from ``round``. ``final`` -> 0, ``semi-final`` -> 1,
          ``quarter-final`` -> 2, and ``round-N`` -> ``log2(N) - 1`` where
          ``N`` must be a power of two. A missing round yields ``None``.

    Args:
        df: result table with ``instruction``, ``source``, ``tournament_idx``
            and ``round`` columns.

    Returns:
        The same ``df`` object with the extra columns.

    Raises:
        ValueError: if a round label is not one of the recognised forms, or
            ``N`` in ``round-N`` is not a power of two.
    """
    df["inst_src"] = "inst: " + df.instruction + "\n\nsrc: " + df.source
    df["idx_inst_src"] = df.apply(
        lambda row: f"{row.tournament_idx}:\n{row.inst_src}", axis=1
    )

    # later used for tournament bracket backtracking
    if "depth" not in df.columns:
        named_rounds = {
            "final": 0,
            "semi-final": 1,
            "quarter-final": 2,
        }

        def _convert_round_to_depth(rnd):
            # Missing values may arrive as None or NaN depending on how
            # pandas stored the column.
            if rnd is None or pd.isna(rnd):
                return None
            if rnd in named_rounds:
                return named_rounds[rnd]
            if rnd.startswith("round-"):  # assume perfect power of two
                num = int(rnd.replace("round-", "").strip())
                return log2_power_of_two(num) - 1
            # Fail loudly instead of falling through with no result.
            raise ValueError(f"unrecognized round label: {rnd!r}")

        df["depth"] = df["round"].apply(_convert_round_to_depth)
    return df
def init_tournament_dataframe(
    df, alpha2names: dict = None
) -> Tuple[pd.DataFrame, dict]:
    """Prepare one tournament's match table for bracket drawing.

    Steps, in order: sort by ``depth``; rewrite ``winner`` as
    ``model_<lowercased winner>``; build the symbol table if none is given;
    pad byes with ``_impute_byes``; add the readable/node columns with
    ``_make_readables``; finally check that exactly one final exists.

    Args:
        df: match table with ``depth``, ``winner``, ``round``, ``model_a``
            and ``model_b`` columns.
        alpha2names: optional mapping from a one-character symbol to a model
            name. When ``None`` it is built by pairing the sorted participant
            names with a fixed 168-symbol alphabet.

    Returns:
        A ``(processed_df, alpha2names)`` tuple.

    Raises:
        ValueError: if there are more participants than available symbols,
            or the table does not contain exactly one ``final`` match.
    """
    df = df.sort_values(by="depth").reset_index(drop=True)
    # make winner interpretable (A -> model_a, B -> model_b)
    df["winner"] = df["winner"].apply(lambda txt: f"model_{txt.lower()}")

    # define alpha2names if not given (covers up to 168 participants)
    if alpha2names is None:
        # Adjacent literals rather than backslash continuations: with a
        # continuation, any indentation on the following line would become
        # part of the string and be handed out as a participant symbol.
        alphabets = (
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "abcdefghijklmnopqrstuvwxyz"
            "ⓐⓑⓒⓓⓔⓕⓖⓗⓘⓙⓚⓛⓜⓝⓞⓟⓠⓡⓢⓣⓤⓥⓦⓧⓨⓩ"
            "㉠㉡㉢㉣㉤㉥㉦㉧㉨㉩㉪㉫㉬㉭"
            "㉮㉯㉰㉱㉲㉳㉴㉵㉶㉷㉸㉹㉺㉻"
            "ㄱㄴㄷㄹㅁㅂㅅㅇㅈㅊㅋㅌㅍㅎ"
            "ΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ"
            "αβγδεζηθικλμνξοπρστυφχψω"
        )
        model_full_names = get_unique_participants(df)
        alpha2names = dict(zip(alphabets, model_full_names))
        # zip() stops at the shorter input, so a shortfall means we ran out
        # of symbols.
        if len(alpha2names) < len(model_full_names):
            raise ValueError(
                f"Tournament viewer cannot visualize more than {len(alphabets)=} participants. ({len(model_full_names)=} is given)\n\nOther features will not be affected but the tournament visualizer."
            )

    names2alpha = dict(zip(alpha2names.values(), alpha2names.keys()))
    df = _impute_byes(df)
    # preserve readables for later
    df = _make_readables(df, names2alpha)

    if len(df[df["round"] == "final"]) != 1:
        raise ValueError("final match need to be one and only.")
    return df, alpha2names
| def _make_readables(df, names2alpha): | |
| df["human_readable_model_a"] = df.model_a.copy() | |
| df["human_readable_model_b"] = df.model_b.copy() | |
| df.model_a = df.model_a.apply( | |
| lambda modelname: names2alpha[modelname] if modelname else "x" | |
| ) | |
| df.model_b = df.model_b.apply( | |
| lambda modelname: names2alpha[modelname] if modelname else "x" | |
| ) | |
| df["human_readable_idx"] = df.apply( | |
| lambda row: f"{row.name}: {row.human_readable_model_a} ({row.model_a}) vs. {row.human_readable_model_b} ({row.model_b if row.model_b else 'x'})", | |
| axis=1, | |
| ) | |
| df["winner_resolved"] = df.apply(lambda row: row[row.winner], axis=1) | |
| df["winner_nodes"] = df.apply( | |
| lambda row: f"{row.winner_resolved}:{row.name}".ljust(4, " "), axis=1 | |
| ) # later for figure representation of winner as a "node" | |
| return df | |
| # draw | |
def draw(df: pd.DataFrame, alpha2names: dict = None) -> str:
    """Render a tournament as a text bracket with the final at the top.

    Args:
        df: match table providing the ``depth``, ``model_a``, ``model_b``,
            ``winner_resolved`` and ``winner_nodes`` columns read below.
            Presumably the frame returned by ``init_tournament_dataframe``
            -- confirm against the caller.
        alpha2names: mapping from participant symbol to readable name. It is
            indexed with the champion's symbol, so the ``None`` default is
            not actually usable.

    Returns:
        A multi-line string: a header naming the champion, then for every
        depth one line of winner nodes and one line of vertical connectors,
        and finally the entries of the deepest round joined by three spaces.
    """

    def _draw_round(
        df: pd.DataFrame,
        depth: int = None,
        winners_in_order: list = None,
    ) -> Tuple:
        """Draw one round and work out the left-to-right order of the next.

        Returns a tuple ``(round_drawing, descending_lines,
        descending_round_winners)``: the line of winner nodes for this depth,
        the line of vertical connectors below it, and the ordered entries to
        draw at ``depth + 1``.
        """
        df_now = df[df.depth == depth]
        max_depth = df.depth.max()
        # Horizontal space per branch halves with every step away from the final.
        width = 2 ** ((max_depth - depth) + 2)
        # Horizontal bar following the node: ``width - 4`` dashes.
        connect_left = "─" * (width)
        connect_left = connect_left[4:]
        # Corner down to the second child, padded with spaces to ``width``.
        connect_right = " " * (width)
        connect_right = "┐" + connect_right[1:]
        if winners_in_order is None:
            assert (
                depth == 0
            ), f"{winners_in_order=} is only allowed when drawing the top (=final match)"
            winners_in_order = df_now.winner_nodes
        round_drawing_parts = []
        descending_round_winners = []
        for node in winners_in_order:
            round_drawing_parts.append("".join([node, connect_left, connect_right]))
            # next round winners in sync with winner order
            # (.item() requires the query to match exactly one row)
            row_now = df_now.query(f"winner_nodes=='{node}'")
            descending_round_winners.append(row_now.model_a.item())
            descending_round_winners.append(row_now.model_b.item())
        # find descending_round_winners within winner_nodes format (num:alpha)
        if depth == max_depth:
            pass  # keep the descending_round_winners intact
        else:
            # Replace each participant symbol by the node label of the match
            # it won one level deeper.
            df_descend = df[df.depth == depth + 1]
            for i, winner_alpha in enumerate(descending_round_winners):
                node_intr = df_descend.query(
                    f"winner_resolved=='{winner_alpha}'"
                ).winner_nodes.item()
                descending_round_winners[i] = node_intr
        round_drawing = "".join(round_drawing_parts)
        # One vertical bar per child: two for every match of this round.
        descending_unit = " " * width
        descending_unit = "│" + descending_unit[1:]
        descending_lines_parts = [descending_unit] * len(df_now) * 2
        descending_lines = "".join(descending_lines_parts)
        return round_drawing, descending_lines, descending_round_winners

    drawings = []
    winners_in_order = None
    max_depth = df.depth.max()
    # Draw from the final (depth 0) downwards; each round hands the ordered
    # entries for the next round to the following iteration.
    for depth in range(max_depth + 1):
        max_depth = df.depth.max()
        winner_drw, lines_desc, winners_in_order = _draw_round(
            df,
            depth=depth,
            winners_in_order=winners_in_order,
        )
        drawings.append((winner_drw, lines_desc))
    # prepare bracket top
    # The first token of the top line is the final's node label; the part
    # before ":" is taken as the champion's symbol.
    champion_alphabet = drawings[0][0].split()[0].split(":")[0]
    champion_readable = alpha2names[champion_alphabet]
    bracket_top = [f"🥇winner: {champion_readable}", "│"]
    # prepare mid
    bracket_mid = "\n".join(["\n".join(tup) for tup in drawings])
    # prepare bot
    # After the last iteration this holds the deepest round's model_a/model_b
    # values, left unconverted by _draw_round.
    initial_participants = winners_in_order
    bracket_bot = (" " * 3).join(initial_participants)
    full_figure = "\n".join(bracket_top + [bracket_mid, bracket_bot])
    return full_figure
def number_breakdown_from_df(result_df: pd.DataFrame) -> Tuple[str, int, int]:
    """Break the number of matches down into models and test-set size.

    The computation assumes ``len(result_df) == (n_models - 1) * size_testset``.

    Args:
        result_df: match table with ``model_a`` and ``model_b`` columns.

    Returns:
        A tuple ``(interpretation, n_models, size_testset)`` where
        ``interpretation`` is a one-line human-readable summary. With fewer
        than two participants the test-set size cannot be derived and is
        reported as 0.
    """
    n_models = len(get_unique_participants(result_df))
    n_matches = len(result_df)
    matches_per_test = n_models - 1
    # Guard the division: a single participant would otherwise divide by zero.
    size_testset = n_matches // matches_per_test if matches_per_test > 0 else 0
    interpretation = f"total {n_matches} matches = (n_models-1) * size_testset = ({n_models}-1) * {size_testset}"
    return interpretation, n_models, size_testset
def make_legend_str(df, alpha2names) -> str:
    """Build the legend that maps each bracket symbol to its model name.

    The champion's name (as reported by ``get_1st``) is prefixed with a
    medal. When no champion can be determined, or its symbol is not in
    ``alpha2names``, the legend is produced without a medal.

    Args:
        df: processed match table, passed on to ``get_1st``.
        alpha2names: mapping from symbol to model name; it is not modified.

    Returns:
        A string starting with ``"\\n\\nlegend:"`` followed by one
        ``"\\n<symbol>\\t<name>"`` entry per symbol, in sorted symbol order.
    """
    first = get_1st(df, alpha2names)
    # Strip a medal that an earlier call may have left in the names so that
    # it is never doubled.
    names = {
        alpha: name.replace("🥇 ", "") for alpha, name in alpha2names.items()
    }
    # .get() keeps the "no unique final" case (first is None) from raising.
    champion_name = names.get(first)

    lines = ["\n\nlegend:"]
    for alpha in sorted(names):
        name = names[alpha]
        if champion_name is not None and name == champion_name:
            name = f"🥇 {name}"
        lines.append(f"{alpha}\t{name}")
    return "\n".join(lines)
def main(
    jslname: str = "result.json",
):
    """Test driver: draw a text bracket for every tournament in a result file.

    For the i-th distinct ``idx_inst_src`` value the bracket and legend are
    printed to stdout and written to ``{i}.txt``, followed by the scenario
    text. If anything fails for that tournament, the error and the scenario
    are written to ``{i}_err.txt`` and, when the processed table is
    available, its key columns are dumped to ``{i}_err.jsonl``.

    Args:
        jslname: path of the JSON (records-oriented) result file to read.
    """
    df = pd.read_json(jslname, orient="records")
    df = df.drop(columns=["tstamp", "logs"])
    df = index_test_scenario(df)
    # The intermediate visualization(df) step is skipped here. If everything
    # else is followed and a problem still shows up, the visualization is at
    # fault -- though that seems unlikely.
    selections = df.idx_inst_src.unique()

    # The symbol table built for the first tournament is reused for the rest.
    alpha2names = None
    for i, sel in enumerate(selections):
        # Reset per iteration so the error branch never dumps the table of a
        # previous tournament (or hits an unbound name on the first one).
        df_now_processed = None
        try:
            df_now = df[df.idx_inst_src == sel]
            df_now_processed, _alpha2names = init_tournament_dataframe(
                df_now, alpha2names=alpha2names
            )
            if alpha2names is None:
                alpha2names = _alpha2names

            bracket_drawing = draw(df_now_processed, alpha2names=alpha2names)
            legend = make_legend_str(df_now_processed, alpha2names)
            print(bracket_drawing + legend)
            # UTF-8 is explicit: the figure contains box-drawing characters
            # and emoji that a platform default encoding may not represent.
            with open(f"{i}.txt", "w", encoding="utf-8") as out_file:
                print(bracket_drawing + legend, file=out_file)
                print(f"\n\n{sel}", file=out_file)
        except Exception as e:
            # Best-effort: record the failure and move on to the next
            # tournament.
            with open(f"{i}_err.txt", "w", encoding="utf-8") as err_file:
                print(e, file=err_file)
                print("", file=err_file)
                print(sel, file=err_file)
            if df_now_processed is not None:
                df_now_processed[
                    [
                        "depth",
                        "round",
                        "winner_nodes",
                        "winner_resolved",
                        "winner",
                        "model_a",
                        "model_b",
                    ]
                ].to_json(f"{i}_err.jsonl", lines=True, orient="records")
# Script entry point: hand ``main`` to Fire (imported from the ``fire``
# package) when the module is executed directly.
if __name__ == "__main__":
    Fire(main)