Spaces:
Build error
Build error
| import os | |
| import re | |
| import pandas as pd | |
| from tqdm import tqdm | |
| import seaborn as sns | |
| import matplotlib.pyplot as plt | |
| from matplotlib import rcParams | |
| from matplotlib.ticker import MultipleLocator | |
| from datasets import load_dataset | |
| import numpy as np | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| precision_score, | |
| recall_score, | |
| f1_score, | |
| confusion_matrix, | |
| ) | |
| from llm_toolkit.llm_utils import invoke_langchain | |
| print(f"loading {__file__}") | |
# Prompt templates for the host of the guessing game. Each template carries
# three positional "{}" slots that callers fill, in order, with the puzzle,
# the truth and the participant's question via str.format().

# P1: Chinese prompt with the basic rule set.
P1 = """你是一个逻辑游戏的主持人。游戏规则如下:
1. 参与者会得到一个谜题。
2. 参与者可以通过提问来获取线索,尝试解开谜题。
3. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。
4. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。
5. 参与者需要根据回答来推理,并最终找出谜题的正确答案。
请严格按照这些规则回答参与者提出的问题。
谜题: {}
实际情况: {}
参与者提出的问题: {}
"""
# P1_en: English counterpart of P1.
P1_en = """You are the host of a logic game. The rules of the game are as follows:
1. Participants will receive a puzzle.
2. Participants can ask questions to obtain clues and try to solve the puzzle.
3. For each question, the host will answer with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning.
4. The answer cannot include any additional information, nor can any word in the options be omitted. For example, “No” cannot be shortened to “N”.
5. Participants need to infer and ultimately find the correct answer to the puzzle based on the responses.
Please strictly adhere to these rules when answering participants’ questions.
Puzzle: {}
Actual situation: {}
Question from participants: {}"""
# P2: Chinese prompt that additionally spells out the criterion for each of
# the five allowed answers and ends with an answer cue line.
P2 = """你是一个情景猜谜游戏的主持人。游戏规则如下:
1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。
2. 主持人知道谜底,谜底是谜面的答案。
3. 参与者可以询问任何封闭式问题来找寻事件的真相。
4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下:
- 若谜面和谜底能找到问题的答案,回答:是或者不是
- 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要
- 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误
- 若参与者提问基本还原了谜底真相,回答:回答正确
5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。
请严格按照这些规则回答参与者提出的问题。
谜面: {}
谜底: {}
参与者提出的问题: {}
回答:
"""
# P2_en: English counterpart of P2 (without the trailing answer cue line).
P2_en = """You are the host of a situational guessing game. The rules of the game are as follows:
1. Participants will receive a riddle that describes a simple yet difficult to understand event.
2. The host knows the truth, which is the solution to the riddle.
3. Participants can ask any closed-ended questions to uncover the truth of the event.
4. For each question, the host will respond with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning. The criteria for each response are as follows:
- If the riddle and answer can provide an answer to the question, respond with: Yes or No
- If the riddle and answer cannot directly or indirectly infer an answer to the question, respond with: Unimportant
- If the participant's question is not a closed-ended question or is difficult to understand, respond with: Incorrect questioning
- If the participant's question essentially reveals the truth of the answer, respond with: Correct answer
5. The response must not include any additional information, nor should any word be omitted from the options. For example, "No" cannot be abbreviated to "N".
Please strictly follow these rules when answering the participant's questions.
Riddle: {}
Truth: {}
Participant's question: {}
"""
# System message sent alongside the user prompt in chat-style requests.
system_prompt = "You are an expert in logical reasoning."
# P2_few_shot: P2 plus a named "{examples}" marker. The marker must be
# substituted (this module uses str.replace) before the three positional
# slots are filled with str.format(); formatting it as-is raises KeyError.
P2_few_shot = """你是一个情景猜谜游戏的主持人。游戏规则如下:
1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。
2. 主持人知道谜底,谜底是谜面的答案。
3. 参与者可以询问任何封闭式问题来找寻事件的真相。
4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下:
- 若谜面和谜底能找到问题的答案,回答:是或者不是
- 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要
- 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误
- 若参与者提问基本还原了谜底真相,回答:回答正确
5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。
请严格按照这些规则回答参与者提出的问题。
示例输入和输出:
{examples}
谜面: {}
谜底: {}
参与者提出的问题: {}
回答:
"""
def get_prompt_template(using_p1=True, chinese_prompt=True):
    """Pick one of the four zero-shot prompt templates.

    Args:
        using_p1: truthy selects the P1 family, falsy the P2 family.
        chinese_prompt: truthy selects the Chinese text, falsy the English one.

    Returns:
        The chosen template string, containing three positional ``{}`` slots.
    """
    chinese, english = (P1, P1_en) if using_p1 else (P2, P2_en)
    if chinese_prompt:
        return chinese
    return english
def get_few_shot_prompt_template(num_shots, train_dataset, debug=False):
    """Build the few-shot P2 prompt with up to ``num_shots`` worked examples.

    Examples are picked by cycling through the distinct labels of
    ``train_dataset`` (in order of first appearance) and scanning forward for
    the next row that carries the wanted label, so the classes are interleaved.

    Args:
        num_shots: number of examples to embed; 0 returns the zero-shot
            Chinese P2 template unchanged.
        train_dataset: pandas DataFrame with "puzzle", "truth", "text" and
            "label" columns.
        debug: when true, print the selection inputs and the final prompt.

    Returns:
        A template string that still holds the three positional ``{}`` slots
        for ``str.format(puzzle, truth, question)``. If the dataset runs out
        of rows, fewer than ``num_shots`` examples are embedded.
    """
    if num_shots == 0:
        return get_prompt_template(using_p1=False, chinese_prompt=True)

    labels = train_dataset["label"].unique()
    if debug:
        print("num_shots:", num_shots)
        print("labels:", labels)

    # Scan positionally: rows are fetched with .iloc, so label lookups must be
    # positional too or a non-default index would select the wrong rows.
    label_values = train_dataset["label"].to_numpy()
    total_rows = len(label_values)

    chunks = []
    index = 0
    # An empty frame has no labels; without this guard the loop never ends.
    exhausted = total_rows == 0
    while num_shots > 0 and not exhausted:
        for label in labels:
            while index < total_rows and label_values[index] != label:
                index += 1
            if index >= total_rows:
                # No further row with this label: stop instead of raising.
                exhausted = True
                break
            row = train_dataset.iloc[index]
            chunks.append(
                f"谜面: {row['puzzle']}\n"
                f"谜底: {row['truth']}\n"
                f"参与者提出的问题: {row['text']}\n"
                f"回答: {row['label']}\n"
            )
            # Move past the used row so it is never emitted twice.
            index += 1
            num_shots -= 1
            if num_shots == 0:
                break

    # Callers run str.format() on the result, so literal braces inside the
    # example text must be doubled to survive that call.
    examples = "".join(chunks).replace("{", "{{").replace("}", "}}")
    prompt = P2_few_shot.replace("{examples}", examples)
    if debug:
        print("P2_few_shot:", prompt)
    return prompt
def extract_answer(text, debug=False):
    """Reduce a raw model completion to the bare answer string.

    Chat-template residue (role markers, ``<...>`` special tokens, header
    terminators) is stripped, the text is cut at the first ".", newline and
    "。", and leading "回答: " / "Response:" cues are removed.

    Args:
        text: raw completion; anything that is not a non-empty ``str``
            yields "".
        debug: when true, print the intermediate text after each step.

    Returns:
        The cleaned answer, stripped of surrounding whitespace.
    """
    if not (text and isinstance(text, str)):
        return ""

    def trace(step, value):
        if debug:
            print(f"--------\nstep {step}:", value)

    flags = re.DOTALL | re.MULTILINE

    # Drop everything up to and including the assistant / [/INST] marker.
    answer = re.sub(r".*?(assistant|\[/INST\]).+?\b", "", text, flags=flags)
    trace(1, answer)

    # Cut at the first special token such as <|eot_id|>.
    answer = re.sub(r"<.+?>.*", "", answer, flags=flags)
    trace(2, answer)

    answer = re.sub(r".*?end_header_id\|>\n\n", "", answer, flags=flags)
    trace(3, answer)

    # Keep only the first sentence / line.
    for separator in (".", "\n", "。"):
        answer = answer.split(separator)[0].strip()
    answer = answer.replace("回答: ", "").strip()
    trace(4, answer)

    answer = re.sub(r"^Response:.+?\b", "", answer, flags=flags)
    trace(5, answer)

    return answer.strip()
def calc_metrics(references, predictions, debug=False):
    """Score raw model outputs against the reference labels.

    Args:
        references: sequence (list, array or pandas Series) of gold labels.
        predictions: sequence of raw model outputs, same length.
        debug: when true, also report the positions of wrong predictions.

    Returns:
        dict with "accuracy", weighted "precision" / "recall" / "f1",
        "ratio_valid_classifications" and, in debug mode, "incorrect_ids".
    """
    assert len(references) == len(
        predictions
    ), f"lengths are difference: {len(references)} != {len(predictions)}"

    labels = np.unique(references)
    # Validity is judged on the RAW outputs, before any answer extraction.
    valid_classifications = [1 if p in labels else 0 for p in predictions]

    predictions = [extract_answer(text) for text in predictions]

    results = {"accuracy": accuracy_score(references, predictions)}
    if debug:
        # Pair by position: references[i] would be a label-based lookup on a
        # pandas Series and breaks when its index is not 0..n-1.
        results["incorrect_ids"] = [
            i
            for i, (ref, pred) in enumerate(zip(references, predictions))
            if pred != ref
        ]

    precision = precision_score(
        references, predictions, average="weighted", labels=labels
    )
    results["precision"] = float(precision)

    recall = recall_score(references, predictions, average="weighted", labels=labels)
    results["recall"] = float(recall)

    f1 = f1_score(references, predictions, average="weighted", labels=labels)
    results["f1"] = float(f1)

    results["ratio_valid_classifications"] = sum(valid_classifications) / len(
        valid_classifications
    )
    return results
def save_results(model_name, results_path, dataset, predictions, debug=False):
    """Store ``predictions`` as column ``model_name`` of the results CSV.

    If ``results_path`` does not exist yet, the CSV is seeded from ``dataset``
    (minus the "answer", "prompt" and "train_text" columns); otherwise the
    existing file is loaded and the column is added or overwritten.

    Args:
        model_name: name of the column that receives the predictions.
        results_path: path of the CSV file to create or update.
        dataset: pandas DataFrame, or an object exposing ``to_pandas()``.
        predictions: one value per dataset row.
        debug: when true, print the first row of the resulting frame.
    """
    if os.path.exists(results_path):
        df = pd.read_csv(results_path, on_bad_lines="warn")
    else:
        dir_path = os.path.dirname(results_path)
        # A bare filename has no directory part; os.makedirs("") would raise.
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)

        df = dataset if isinstance(dataset, pd.DataFrame) else dataset.to_pandas()
        # Non-inplace drop returns a new frame, so the caller's DataFrame is
        # neither stripped of columns nor given the prediction column.
        df = df.drop(columns=["answer", "prompt", "train_text"], errors="ignore")

    df[model_name] = predictions

    if debug:
        print(df.head(1))

    df.to_csv(results_path, index=False)
def load_logical_reasoning_dataset(
    data_path,
    tokenizer=None,
    using_p1=True,
    chinese_prompt=True,
    test_data=None,
    num_shots=0,
):
    """Load the train/test CSV splits and optionally render chat prompts.

    Args:
        data_path: directory holding ``train[_en].csv`` and the test CSV.
        tokenizer: when given, each row gets a "prompt" column (chat template
            applied) and a "train_text" column (prompt + label + EOS token).
        using_p1: choose the P1 (true) or P2 (false) zero-shot template.
        chinese_prompt: use the Chinese files/templates, else the ``_en`` ones.
        test_data: base name of the test CSV; defaults to "dev".
        num_shots: when non-zero, the few-shot template built from the train
            split is used instead of the zero-shot one.

    Returns:
        The dataset dict with "train" and "test" splits.
    """
    postfix = "" if chinese_prompt else "_en"
    train_data_file = data_path + f"/train{postfix}.csv"
    test_data_file = data_path + f"/{test_data if test_data else 'dev'}{postfix}.csv"

    print("loading train/test data files")
    datasets = load_dataset(
        "csv",
        data_files={"train": train_data_file, "test": test_data_file},
    )

    if tokenizer:
        reasoning_prompt = (
            get_prompt_template(using_p1, chinese_prompt)
            if num_shots == 0
            else get_few_shot_prompt_template(num_shots, datasets["train"].to_pandas())
        )

        def formatting_prompts_func(examples):
            inputs = examples["text"]
            outputs = examples["label"]
            puzzles = examples["puzzle"]
            truths = examples["truth"]

            # The trailing None is a slot overwritten with each user message.
            messages = [
                {
                    "role": "system",
                    "content": system_prompt,
                },
                None,
            ]
            # An unset MODEL_NAME must not crash the .lower() call below.
            model_name = os.getenv("MODEL_NAME") or ""

            # The system message is dropped for gemma models.
            if "gemma" in model_name.lower():
                messages = messages[1:]

            texts = []
            prompts = []
            for question, output, puzzle, truth in zip(
                inputs, outputs, puzzles, truths
            ):
                prompt = reasoning_prompt.format(puzzle, truth, question)
                messages[-1] = {"role": "user", "content": prompt}

                prompt = tokenizer.apply_chat_template(
                    messages, tokenize=False, add_generation_prompt=True
                )
                prompts.append(prompt)
                # Rows without a label get an empty training text.
                texts.append(
                    (prompt + output + tokenizer.eos_token) if output else ""
                )
            return {"train_text": texts, "prompt": prompts}

        datasets = datasets.map(
            formatting_prompts_func,
            batched=True,
        )

    print(datasets)
    return datasets
def get_metrics(df):
    """Compute metrics for every prediction column of a results frame.

    Every column from the third one onwards is scored against ``df["label"]``
    (presumably the first two columns hold the question text and the label;
    verify against the results CSV layout).

    Args:
        df: results DataFrame with a "label" column.

    Returns:
        DataFrame with one row per scored column: "model" (column name after
        its last "/"), "accuracy" and "all_metrics" (the full metrics dict).
    """
    prediction_columns = df.columns[2:]

    all_metrics = []
    for col in prediction_columns:
        metrics = calc_metrics(df["label"], df[col], debug=True)
        print(f"{col}: {metrics}")
        all_metrics.append(metrics)

    return pd.DataFrame(
        {
            "model": [col.split("/")[-1] for col in prediction_columns],
            "accuracy": [metrics["accuracy"] for metrics in all_metrics],
            "all_metrics": all_metrics,
        }
    )
def load_alpaca_data(data_path, using_p1=True, use_english_datasets=False):
    """Return the training split in Alpaca format, creating the file if needed.

    The JSON file under ``llama-factory/data`` is reused when present;
    otherwise it is generated from the train split and written there.

    Args:
        data_path: directory with the source CSV files.
        using_p1: select the P1 (true) or P2 (false) prompt and file name.
        use_english_datasets: use the English data, prompt and ``_en`` file.

    Returns:
        DataFrame with "instruction", "input" and "output" columns.
    """
    if using_p1:
        alpaca_data_path = "llama-factory/data/alpaca_mgtv_p1.json"
    else:
        alpaca_data_path = "llama-factory/data/alpaca_mgtv_p2.json"
    if use_english_datasets:
        alpaca_data_path = alpaca_data_path.replace(".json", "_en.json")

    if os.path.exists(alpaca_data_path):
        print("loading existing data from:", alpaca_data_path)
        return pd.read_json(alpaca_data_path, orient="records", lines=False)

    print("loading new data from:", alpaca_data_path)
    chinese_prompt = not use_english_datasets

    splits = load_logical_reasoning_dataset(
        data_path, using_p1=using_p1, chinese_prompt=chinese_prompt
    )
    template = get_prompt_template(using_p1, chinese_prompt)

    def render(record):
        return template.format(record["puzzle"], record["truth"], record["text"])

    train_frame = splits["train"].to_pandas()
    train_frame["instruction"] = train_frame.apply(render, axis=1)

    # Alpaca rows: the full prompt goes in "instruction", "input" stays empty.
    row_count = len(train_frame)
    df_alpaca = pd.DataFrame(
        {"instruction": [""] * row_count, "input": [""] * row_count}
    )
    df_alpaca["instruction"] = train_frame["instruction"]
    df_alpaca["output"] = train_frame["label"]

    df_alpaca.to_json(alpaca_data_path, orient="records", lines=False, indent=2)
    return df_alpaca
def plot_value_counts(df, column_name, offset=0.1, title=None, preprocess_func=None):
    """Draw a bar chart of the value frequencies in ``df[column_name]``.

    Args:
        df: source DataFrame; it is not modified.
        column_name: column whose values are counted.
        offset: vertical gap between a bar top and its count label.
        title: x-axis label; defaults to ``column_name``.
        preprocess_func: optional function applied to each value before
            counting. When given, a confusion matrix of ``df["label"]``
            against the processed values is plotted as well.
    """
    # Work on a derived Series instead of temporarily overwriting the
    # caller's column, so nothing is left half-modified if plotting fails.
    values = df[column_name]
    if preprocess_func:
        values = values.apply(preprocess_func)
    counts = values.value_counts()

    font_family = rcParams["font.family"]
    # Set the font to STHeiti to support Chinese characters
    rcParams["font.family"] = "STHeiti"
    rcParams["axes.unicode_minus"] = (
        False  # This is to support the minus sign in Chinese.
    )
    try:
        plt.figure(figsize=(8, 4))
        counts.plot(kind="bar")

        # add values on top of bars
        for i, v in enumerate(counts):
            plt.text(i, v + offset, str(v), ha="center")

        plt.xlabel(title or column_name)
        plt.show()
    finally:
        # Always hand the global font setting back, even on error.
        rcParams["font.family"] = font_family

    if preprocess_func:
        plot_confusion_matrix(df["label"], values)
def calc_metrics_for_col(df, col):
    """Score column ``col`` of ``df`` against ``df["label"]``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``.
    """
    scores = calc_metrics(df["label"], df[col], debug=True)
    return tuple(scores[name] for name in ("accuracy", "precision", "recall", "f1"))
def get_metrics_df(df, variant="epoch"):
    """Build a per-run performance table from a results frame.

    Every column from the sixth one onwards is treated as one run and scored
    against ``df["label"]``.

    Args:
        df: results DataFrame with a "label" column.
        variant: name of the x-axis column. "epoch" numbers the runs
            0, 0.2, 0.4, ...; any other value numbers them 1, 2, 3, ...;
            "shots" additionally parses "<model>/shots-<n>" column names.

    Returns:
        DataFrame with columns ``variant``, "model", "run", "accuracy",
        "precision", "recall", "f1", followed by any further metric keys.
    """
    base_columns = [variant, "model", "run", "accuracy", "precision", "recall", "f1"]

    rows = []
    for i, col in enumerate(df.columns[5:]):
        metrics = calc_metrics(df["label"], df[col], debug=False)
        row = {
            variant: i / 5 if variant == "epoch" else i + 1,
            "model": col if "/" not in col else col.split("/")[1].split("_torch")[0],
            "run": col,
        }
        if variant == "shots":
            parts = col.split("/shots-")
            row["shots"] = int(parts[1])
            row["model"] = parts[0]

        row.update(metrics)
        rows.append(row)

    if not rows:
        return pd.DataFrame(columns=base_columns)

    # Build the frame once: concatenating onto an empty frame per run is
    # quadratic and degrades the numeric columns to object dtype.
    perf_df = pd.DataFrame(rows)
    extra_columns = [c for c in perf_df.columns if c not in base_columns]
    return perf_df.reindex(columns=base_columns + extra_columns)
def plot_metrics(perf_df, model_name, variant="epoch", offset=0.01):
    """Plot accuracy and F1 of one model against ``variant``.

    Args:
        perf_df: table with "model", "run", "accuracy", "f1" and ``variant``
            columns.
        model_name: rows whose "model" equals this value are plotted.
        variant: x-axis column; "epoch" or, for any other value, shots.
        offset: padding added below and above the automatic y-limits.
    """
    _, ax = plt.subplots(1, 1, figsize=(8, 4))

    selected = perf_df[perf_df["model"] == model_name]
    # Guard against the plotted columns differing in length.
    point_count = min(len(selected[name]) for name in (variant, "accuracy", "f1"))
    selected = selected.iloc[:point_count]

    x_values = selected[variant]
    accuracy_values = selected["accuracy"]
    f1_values = selected["f1"]

    # Circle markers for accuracy, square markers for F1.
    ax.plot(x_values, accuracy_values, marker="o", label="Accuracy", color="r")
    ax.plot(x_values, f1_values, marker="s", label="F1 Score", color="b")

    # (values, colour, vertical alignment, vertical text offset in points)
    annotation_specs = (
        (accuracy_values, "r", "bottom", -15),
        (f1_values, "b", "top", 15),
    )
    for i in range(point_count):
        x = x_values.iloc[i]
        print(f"{x}: {selected['run'].iloc[i]}")
        for values, colour, valign, dy in annotation_specs:
            y = values.iloc[i]
            ax.annotate(
                f"{y*100:.2f}%",
                (x, y),
                ha="center",
                va=valign,
                xytext=(0, dy),
                textcoords="offset points",
                fontsize=10,
                color=colour,
            )

    # Widen the y-range so the percentage labels are not clipped.
    lower, upper = ax.get_ylim()
    ax.set_ylim(lower - offset, upper + offset)

    if variant == "epoch":
        ax.set_xlabel("Epoch (0: base model, 0.2 - 2: fine-tuned models)")
        tick_step = 0.2
    else:
        ax.set_xlabel("Number of Shots")
        tick_step = 5
    ax.set_ylabel("Accuracy and F1 Score")
    ax.xaxis.set_major_locator(MultipleLocator(tick_step))
    ax.set_title(f"Performance Analysis Across Checkpoints for the {model_name} Model")

    plt.xticks(rotation=0)
    plt.grid(True)

    # Legend sits outside the axes, on the right, clear of the lines.
    plt.legend(loc="center left", bbox_to_anchor=(1.0, 0.5))
    plt.show()
def reasoning_with_openai(
    row,
    user_prompt,
    max_tokens=None,
    model="gpt-4o-mini",
    base_url=None,
    temperature=0,
    using_system_prompt=True,
    is_openai=True,
):
    """Ask a hosted chat model to judge one question.

    Args:
        row: mapping with "puzzle", "truth" and "text" entries.
        user_prompt: template with three positional ``{}`` slots.
        max_tokens, model, base_url, temperature, is_openai: forwarded
            unchanged to ``invoke_langchain``.
        using_system_prompt: send the module-level system prompt when true,
            otherwise pass ``None``.

    Returns:
        Whatever ``invoke_langchain`` returns for the request.
    """
    filled_prompt = user_prompt.format(row["puzzle"], row["truth"], row["text"])
    system_message = system_prompt if using_system_prompt else None
    return invoke_langchain(
        filled_prompt,
        system_prompt=system_message,
        max_tokens=max_tokens,
        model=model,
        base_url=base_url,
        temperature=temperature,
        is_openai=is_openai,
    )
def eval_openai(
    eval_dataset,
    model="gpt-4o-mini",
    max_new_tokens=300,
    num_shots=0,
    train_dataset=None,
):
    """Run every row of ``eval_dataset`` through a hosted chat model.

    Args:
        eval_dataset: pandas DataFrame with "puzzle", "truth", "text" columns.
        model: model name. Names containing "o1" get no token limit,
            temperature 1 and no system prompt; names containing "claude"
            are flagged as non-OpenAI.
        max_new_tokens: token limit for models other than o1.
        num_shots: number of few-shot examples; 0 uses the Chinese P2 prompt.
        train_dataset: source of the few-shot examples when ``num_shots`` > 0.

    Returns:
        List with one model output per row, in row order.
    """
    if num_shots == 0:
        user_prompt = get_prompt_template(using_p1=False, chinese_prompt=True)
    else:
        user_prompt = get_few_shot_prompt_template(num_shots, train_dataset)
    print("user_prompt:", user_prompt)

    is_using_o1 = "o1" in model
    request_options = {
        "model": model,
        "max_tokens": None if is_using_o1 else max_new_tokens,
        "temperature": 1 if is_using_o1 else 0,
        "using_system_prompt": not is_using_o1,
        "is_openai": "claude" not in model,
    }

    predictions = []
    for position in tqdm(range(len(eval_dataset))):
        answer = reasoning_with_openai(
            eval_dataset.iloc[position], user_prompt, **request_options
        )
        predictions.append(answer)
    return predictions
def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    """Plot a row-normalised confusion matrix as a heatmap.

    Predictions are first cleaned with ``extract_answer``. The axes cover the
    sorted union of true and predicted labels, so a prediction that is not a
    reference label gets its own, correctly labelled, column.

    Args:
        y_true: sequence of reference labels.
        y_pred: sequence of raw model outputs, same length.
        title: heatmap title.
    """
    y_pred = [extract_answer(text) for text in y_pred]

    # Use one explicit label list for both the matrix and the tick labels;
    # taking ticks from y_true alone misaligns them whenever a prediction
    # falls outside the reference labels.
    labels = np.unique(
        np.concatenate(
            [np.asarray(y_true, dtype=object), np.asarray(y_pred, dtype=object)]
        )
    )
    counts = confusion_matrix(y_true, y_pred, labels=labels).astype("float")

    # Labels that never occur in y_true have an all-zero row; leave it at 0
    # rather than dividing by zero.
    row_totals = counts.sum(axis=1, keepdims=True)
    cm = np.divide(
        counts, row_totals, out=np.zeros_like(counts), where=row_totals != 0
    )

    font_family = rcParams["font.family"]
    # Set the font to STHeiti to support Chinese characters
    rcParams["font.family"] = "STHeiti"
    rcParams["axes.unicode_minus"] = (
        False  # This is to support the minus sign in Chinese.
    )
    try:
        _, ax = plt.subplots(figsize=(8, 8))
        sns.heatmap(
            cm,
            annot=True,
            fmt=".4f",
            cmap="Blues",
            xticklabels=labels,
            yticklabels=labels,
        )
        ax.set_title(title)
        ax.set_xlabel("Predicted labels")
        ax.set_ylabel("True labels")
        plt.show()
    finally:
        # Always hand the global font setting back, even on error.
        rcParams["font.family"] = font_family
def majority_vote(r1, r2, r3):
    """Return the value at least two of the three votes agree on.

    If ``r1`` and ``r3`` agree they win; otherwise ``r2`` is returned, which
    is the majority when it matches either neighbour and the fallback when
    all three votes differ.
    """
    return r1 if r1 == r3 else r2
def load_openai_batch_data(data_path, num_shots=10, model="o1-mini", debug=True):
    """Return the batch request table for ``model``, creating the file if needed.

    ``{data_path}/{model}.jsonl`` is reused when present. Otherwise one chat
    completion request per test row is built from the few-shot prompt and
    written there as JSON lines.

    Args:
        data_path: directory with the dataset CSVs and the JSONL file.
        num_shots: number of few-shot examples embedded in each prompt.
        model: model name, used in the request body and the file name.
        debug: forwarded to the few-shot prompt builder.

    Returns:
        DataFrame with "custom_id", "method", "url" and "body" columns.
    """
    openai_data_path = f"{data_path}/{model}.jsonl"

    if os.path.exists(openai_data_path):
        print("loading existing data from:", openai_data_path)
        return pd.read_json(openai_data_path, orient="records", lines=True)

    datasets = load_logical_reasoning_dataset(data_path)
    prompt = get_few_shot_prompt_template(
        num_shots, datasets["train"].to_pandas(), debug=debug
    )

    def build_request(row_id, row):
        content = prompt.format(row["puzzle"], row["truth"], row["text"])
        return {
            # Request ids are 1-based.
            "custom_id": f"request-{row_id + 1}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [
                    {"role": "user", "content": content},
                ],
            },
        }

    df_test = datasets["test"].to_pandas()
    requests = [build_request(i, row) for i, row in df_test.iterrows()]

    df_openai = pd.DataFrame(requests)
    df_openai.to_json(openai_data_path, orient="records", lines=True)
    return df_openai