import argparse
import json
import os
import random
import re
from collections import defaultdict


def list_directories(path):
    # Collect the entries under the given path
    items = os.listdir(path)
    # Keep only the entries that are directories (folders)
    directories = [item for item in items if os.path.isdir(os.path.join(path, item))]
    return directories


def parse_by_regex(string):
    # Prompt templates in the VARCO (Korean Alpaca-style) format, with and
    # without an additional input ("source") block.
    varco_template_w_src = r"아래는 작업을 설명하는 명령어와 추가적 맥락을 제공하는 입력이 짝을 이루는 예제입니다.\n주어진 입력에 대해 명령어를 적절히 수행하는 응답을 작성하세요.\n\n### 입력:\n(?P<source>.*?)\n\n### 명령어:\n(?P<instruction>.*?)\n\n### 응답:\n"
    varco_template_wo_src = r"아래는 작업을 설명하는 명령어입니다.\n명령어에 따른 요청을 적절히 완료하는 응답을 작성하세요.\n\n### 명령어:\n(?P<instruction>.*?)\n\n### 응답:\n"
    flags = re.MULTILINE | re.DOTALL
    if match := re.compile(varco_template_w_src, flags=flags).match(string):
        source = match.group("source")
        instruction = match.group("instruction")
    elif match := re.compile(varco_template_wo_src, flags=flags).match(string):
        source = ""
        instruction = match.group("instruction")
    else:
        source = None
        instruction = None
    return source, instruction
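
# Illustrative usage (hypothetical instruction text): a prompt rendered with the
# no-source template parses back into ("", instruction).
#
#     prompt = (
#         "아래는 작업을 설명하는 명령어입니다.\n"
#         "명령어에 따른 요청을 적절히 완료하는 응답을 작성하세요.\n\n"
#         "### 명령어:\n다음 문장을 요약하세요.\n\n### 응답:\n"
#     )
#     parse_by_regex(prompt)  # -> ("", "다음 문장을 요약하세요.")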


# Read the result.json file at `path` and preprocess it into instances.
def result_file_process(model, task, path):
    with open(path, encoding="utf8") as f:
        instances = json.loads(f.read())
    processed_instances = []
    for instance in instances:
        raw = instance.get("input", False)
        if raw:
            # The instance already carries the parsed fields.
            source = instance["source"]
            instruction = instance["instruction"]
        else:
            # Otherwise recover them from the rendered prompt in "source".
            raw = instance.get("source", "")
            source, instruction = parse_by_regex(raw)
        if source is None or instruction is None:
            print(f"PARSING ERROR IN MODEL {model} TASK {task} PATH {path} SRC {raw}")
        else:
            processed_instances.append(
                {
                    "model_id": model,
                    "task": task,
                    "instruction": instruction.strip(),
                    "source": source.strip(),
                    "generated": instance["generated_result"],
                }
            )
    return processed_instances
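
# Sketch of the data shapes handled above (inferred from this code, not a spec):
# a raw instance in result.json either already carries the parsed fields,
#     {"input": ..., "source": "...", "instruction": "...", "generated_result": "..."}
# or only the rendered VARCO-style prompt, which parse_by_regex splits apart,
#     {"source": "<rendered prompt>", "generated_result": "..."}
# Each processed instance then comes out as
#     {"model_id": ..., "task": ..., "instruction": ..., "source": ..., "generated": ...}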


# Transform the raw result values under the model-results directory.
def transform_results_folder(input_path, output_path, model_name_pattern, num_instance):
    regex_pattern = re.compile(model_name_pattern)
    tasks = list_directories(input_path)
    models = list_directories(os.path.join(input_path, tasks[0]))
    models = [model for model in models if regex_pattern.match(model)]
    model_results = {}
    print(f"TASKS: {tasks}")
    print(f"MODELS: {models}")
    for task in tasks:
        models = [
            model
            for model in list_directories(os.path.join(input_path, task))
            if regex_pattern.match(model)
        ]
        for model in models:
            result_path = os.path.join(input_path, task, model, "result.json")
            # Directory names may embed the task name ("<model>-<task>-..."); strip it.
            model_name = model
            if task in model:
                model_name = model.split(f"-{task}-")[0]
            instances = result_file_process(model_name, task, result_path)
            if model_name in model_results:
                model_results[model_name] += instances
            else:
                model_results[model_name] = instances
        print(f"{task} results processing is done.")
    for k, v in model_results.items():
        print(f"# of instances in {k} is {len(v)}")
    # Group instances by task, then by prompt (instruction + source), so the
    # same prompt can be aligned across models.
    dataset_by_task = defaultdict(lambda: defaultdict(list))
    for data in (
        all_datasets := [obj for obj_list in model_results.values() for obj in obj_list]
    ):
        dataset_by_task[data["task"]][
            f"{data['instruction']}\n\n{data['source']}"
        ].append(data)
    new_results = {model: [] for model in {data["model_id"] for data in all_datasets}}
    num_model = len(new_results)
    for task in dataset_by_task.keys():
        candidates = []
        for data in dataset_by_task[task].values():
            # Keep only prompts that every model answered.
            if len(data) != num_model:
                continue
            candidates.append(data)
        # Sample up to num_instance prompts per task (all of them when
        # num_instance is None).
        random.shuffle(candidates)
        selected = candidates[:num_instance]
        for data_list in selected:
            for data in data_list:
                new_results[data["model_id"]].append(data)
    # Write one JSON Lines file per model.
    os.makedirs(output_path, exist_ok=True)
    for model in new_results.keys():
        path = os.path.join(output_path, f"{model}.jsonl")
        with open(path, "w", encoding="utf8") as f_out:
            for instance in new_results[model]:
                json.dump(instance, f_out, ensure_ascii=False)
                f_out.write("\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-i", "--input_path", type=str, help="path of the generated-results directory"
    )
    parser.add_argument(
        "-o", "--output_path", type=str, help="path of the processed-results directory"
    )
    parser.add_argument(
        "-m",
        "--model_name_pattern",
        type=str,
        help="regex pattern that model names must match",
        default="",
    )
    parser.add_argument(
        "-n",
        "--num_instance",
        type=int,
        help="number of instances to sample per task",
    )
    args = parser.parse_args()
    transform_results_folder(
        args.input_path, args.output_path, args.model_name_pattern, args.num_instance
    )
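

# Example invocation (hypothetical paths, pattern, and script name). The input
# directory is expected to be laid out as <input_path>/<task>/<model>/result.json,
# and one <model>.jsonl file is written into <output_path> per model:
#
#     python transform_results.py -i ./results -o ./processed -m "^varco" -n 100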