import json import os import re def safe_float(value): """Convert a value to float safely. Returns None if conversion fails.""" try: return float(value) except ValueError: return None def calculate_task_metrics(task_info): """Calculate average accuracy, best prompt, and CPS for a task.""" accuracies = [prompt['value'] for prompt in task_info['prompts'] if prompt['value'] is not None] if not accuracies: return None task_info['average_accuracy'] = sum(accuracies) / len(accuracies) best_prompt_data = max(task_info['prompts'], key=lambda x: x['value']) task_info['best_prompt'] = best_prompt_data['value'] task_info['prompt_id'] = best_prompt_data['prompt'] # Calculate CPS avg_acc = task_info['average_accuracy'] best_acc = task_info['best_prompt'] task_info['CPS'] = (1 - (best_acc - avg_acc) / 100) * best_acc def extract_data_from_file(file_path): """Extract task and prompt data from the given file.""" with open(file_path, 'r') as file: lines = file.readlines() tasks_data = {} current_task = None for line in lines: line = line.strip() # Skip irrelevant lines if not line: continue if line.startswith("| Tasks"): continue if line.startswith("hf (pretrained="): # Estrai la parte dopo "pretrained=" start = line.find("pretrained=") + len("pretrained=") end = line.find(",", start) # Trova la virgola successiva # Estrai la stringa desiderata pretrained_model = line[start:end] # Estrarre num_fewshot num_fewshot_match = re.search(r"num_fewshot:\s*([\w\d]+)", line) num_fewshot = num_fewshot_match.group(1) if num_fewshot_match else None # Estrarre batch_size batch_size_match = re.search(r"batch_size:\s*(\d+)", line) batch_size = int(batch_size_match.group(1)) if batch_size_match else None continue columns = line.split('|') if len(columns) != 11: continue task_name = columns[1] metric = columns[5].strip() value = safe_float(columns[7]) stderr = safe_float(columns[9]) if metric == "acc_norm": continue # Identify task and prompts if task_name.startswith(" - "): task_name = task_name[3:].strip() current_task = task_name tasks_data.setdefault(current_task, {'prompts': [], 'average_accuracy': 0, 'best_prompt': None, 'prompt_id': None, 'CPS': None}) elif task_name.startswith(" - ") and current_task: prompt_name = task_name[4:].strip() prompt_data = {'prompt': prompt_name, 'metric': metric, 'value': value * 100, 'stderr': stderr} tasks_data[current_task]['prompts'].append(prompt_data) # Special handling for evalita NER if "evalita NER" in tasks_data: task_info = tasks_data["evalita NER"] weight_map = {"ADG prompt-1": 521, "ADG prompt-2": 521, "FIC prompt-1": 1517, "FIC prompt-2": 1517, "WN prompt-1": 2088, "WN prompt-2": 2088} weighted_values = {"prompt-1": 0, "prompt-2": 0} total_weights = sum(weight_map.values()) for prompt in task_info['prompts']: if prompt['prompt'] in weight_map: if "prompt-1" in prompt['prompt']: weighted_values["prompt-1"] += weight_map[prompt['prompt']] * prompt['value'] elif "prompt-2" in prompt['prompt']: weighted_values["prompt-2"] += weight_map[prompt['prompt']] * prompt['value'] task_info['prompts'] = [ {"prompt": "prompt-1", "metric": "acc", "value": weighted_values["prompt-1"] / total_weights, 'stderr': None}, {"prompt": "prompt-2", "metric": "acc", "value": weighted_values["prompt-2"] / total_weights, 'stderr': None}] # Calculate metrics for each task for task_info in tasks_data.values(): calculate_task_metrics(task_info) # Calculate average CPS tasks_with_cps = [task['CPS'] for task in tasks_data.values() if task['CPS'] is not None] average_CPS = sum(tasks_with_cps) / len(tasks_with_cps) if tasks_with_cps else 0 config = { "model_name": pretrained_model, "num_fewshot": num_fewshot, "batch_size": batch_size } return {'average_CPS': average_CPS, 'config': config, 'tasks': tasks_data} # Example usage #file_path = '../evalita_llm_results/models_output/slurm-7769.out' #json_output = extract_data_from_file(file_path) #print(json_output) # Directory da cui leggere i file .out directory_in_path = '../evalita_llm_models_output/' directory_out_results_path = '../evalita_llm_results/' directory_out_requests_path = '../evalita_llm_requests/' # Itera sui file nella directory for filename in os.listdir(directory_in_path): if filename.endswith('.out'): # Costruisci il percorso completo del file file_path = os.path.join(directory_in_path, filename) # Esegui la funzione extract_data_from_file json_output = extract_data_from_file(file_path) # Estrai model_org_name e model_name da model_name model_org_name, model_name = json_output['config']['model_name'].split('/') # Percorso del file JSON di configurazione in ../evalita_llm_requests2/ config_file_path = os.path.join(directory_out_requests_path, model_org_name, f"{model_name}.json") # Se il file esiste, caricalo e aggiorna il dizionario config if os.path.exists(config_file_path): with open(config_file_path, 'r', encoding='utf-8') as config_file: additional_config = json.load(config_file) # Aggiorna la configurazione con i nuovi dati json_output['config'].update(additional_config) # Crea il percorso della cartella per model_org_name org_folder_path = os.path.join(directory_out_results_path, model_org_name) os.makedirs(org_folder_path, exist_ok=True) # Crea la cartella se non esiste # Crea il percorso completo del file JSON file_suffix = f"{json_output['config']['num_fewshot']}" output_file_path = os.path.join(org_folder_path, f"{model_name}_{file_suffix}.json") # Salva il JSON in un file con ritorni a capo compatibili con Linux with open(output_file_path, 'w', newline="\n") as outfile: json.dump(json_output, outfile, indent=4) # Stampa il risultato print(f"File {filename} elaborato e salvato in {output_file_path}")