"""Extract per-layer "refusal directions" from a causal LM.

Pipeline:
  1. Run the model once over paired harmful / harmless prompts and save
     the prompt-pass hidden states of every layer to disk.
  2. Reload the saved states, take the last-token activation per layer,
     and compute the normalized difference of means (harmful - harmless)
     as that layer's refusal direction.
"""

import gc
import os
import random  # kept: present in the original import block

import einops  # kept: present in the original import block
import jaxtyping  # kept: present in the original import block
import torch
from datasets import load_dataset  # kept: present in the original import block
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Pin the CPU thread pools to the physical core count.
os.environ["MKL_NUM_THREADS"] = "72"
os.environ["OMP_NUM_THREADS"] = "72"
torch.set_num_threads(72)
print(f"PyTorch threads: {torch.get_num_threads()}")
print(f"MKL threads: {os.getenv('MKL_NUM_THREADS')}")
print(f"OMP threads: {os.getenv('OMP_NUM_THREADS')}")

# NOTE(fix): the original called `torch.inference_mode()` as a bare
# statement, which merely constructs a context manager and discards it —
# a no-op, so autograd state was still being recorded.  The forward pass
# below is now wrapped in `with torch.inference_mode():`.
torch.set_default_device("cuda")

MODEL_ID = "agentica-org/DeepCoder-1.5B-Preview"
output_dir = MODEL_ID + "/hidden_states"
# Create the output directory tree if it does not exist.
os.makedirs(output_dir, exist_ok=True)

print(f"Load Model {MODEL_ID} ... ")

# Optional 4-bit quantization config; currently disabled (see the
# commented-out `quantization_config` argument below).
quant_config_4 = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
    llm_int8_enable_fp32_cpu_offload=True,
)

model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",
    trust_remote_code=True,
    # quantization_config=quant_config_4,
    torch_dtype=torch.bfloat16,
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
tokenizer.padding_side = 'left'            # pad on the left (causal LM)
tokenizer.pad_token = tokenizer.eos_token  # reuse EOS as the pad token

num_layers = len(model.model.layers)
print(f"Model has {num_layers} layers.")
print(f"Load data ... \n")


def reformat_texts(texts):
    """Wrap each raw text as a single-turn chat message list for
    `tokenizer.apply_chat_template`."""
    return [[{"role": "user", "content": text}] for text in texts]


def get_harmful_instructions():
    """Read harmful prompts (one per line) and wrap them as chat messages.

    NOTE(fix): lines are stripped here; the original used `readlines()`,
    which left the trailing newline inside every prompt.
    """
    with open("datasets17/harmful.txt", "r", encoding="utf-8") as f:
        harmful = [line.strip() for line in f]
    return reformat_texts(harmful)


def get_harmless_instructions():
    """Read harmless prompts (one per line) and wrap them as chat messages."""
    with open("datasets17/harmless.txt", "r", encoding="utf-8") as f:
        harmless = [line.strip() for line in f]
    return reformat_texts(harmless)


harmful = get_harmful_instructions()
harmless = get_harmless_instructions()
print(f"harmful len: {len(harmful)}")
print(f"harmless len: {len(harmless)}")

# Pair the two datasets 1:1 so the difference of means is balanced.
n_instructions = min(len(harmful), len(harmless))
print("Instruction count: " + str(n_instructions))
harmful_instructions = harmful[:n_instructions]
harmless_instructions = harmless[:n_instructions]

print("Tokenizer ... ")
harmful_toks = [
    tokenizer.apply_chat_template(
        insn, tokenize=True, add_generation_prompt=True,
        return_tensors="pt", return_dict=True)
    for insn in harmful_instructions]
harmless_toks = [
    tokenizer.apply_chat_template(
        insn, tokenize=True, add_generation_prompt=True,
        return_tensors="pt", return_dict=True)
    for insn in harmless_instructions]

max_its = n_instructions * 2
bar = tqdm(total=max_its)


def generate_and_process(toks, label, idx):
    """Run one prompt through the model and save its prompt-pass hidden states.

    Only `output.hidden_states[0]` — the forward pass over the prompt,
    before any sampled token — is saved, so `do_sample=True` has no
    effect on what gets stored.
    """
    bar.update(n=1)
    # Move input_ids and attention_mask onto the GPU.
    tokens = toks['input_ids'].to("cuda:0")
    attention_mask = toks['attention_mask'].to("cuda:0")
    # Disable autograd for the capture pass (the original's bare
    # `torch.inference_mode()` statement did nothing).
    with torch.inference_mode():
        output = model.generate(
            tokens,
            attention_mask=attention_mask,
            use_cache=False,
            max_new_tokens=1,
            do_sample=True,
            pad_token_id=tokenizer.pad_token_id,
            return_dict_in_generate=True,
            output_hidden_states=True,
        )
    # hidden_states[0] is the per-layer tuple for the prompt forward pass.
    hidden_states_0 = output.hidden_states[0]
    torch.save(hidden_states_0, f"{output_dir}/{label}_hidden_state_{idx}.pt")
    # Drop the intermediates only — the model itself stays resident.
    del toks, tokens, attention_mask, output, hidden_states_0
    torch.cuda.empty_cache()  # release cached GPU memory
    gc.collect()              # force a garbage-collection pass


print("Generate and process...")
# Process harmful and harmless prompts symmetrically.
for idx, toks in enumerate(harmful_toks):
    generate_and_process(toks, 'harmful', idx)
for idx, toks in enumerate(harmless_toks):
    generate_and_process(toks, 'harmless', idx)
bar.close()

# The model is no longer needed; free GPU memory before the CPU-side pass.
del model, tokenizer
torch.cuda.empty_cache()
gc.collect()

# final_refusal_dirs[layer_idx] collects one (harmful, harmless)
# hidden-state pair per instruction.
final_refusal_dirs = []
for idx in tqdm(range(n_instructions), desc="Processing instruction"):
    harmful_hidden = torch.load(
        f"{output_dir}/harmful_hidden_state_{idx}.pt",
        map_location='cpu', weights_only=True)
    harmless_hidden = torch.load(
        f"{output_dir}/harmless_hidden_state_{idx}.pt",
        map_location='cpu', weights_only=True)
    # NOTE(review): HF `hidden_states` tuples usually hold num_layers + 1
    # entries (index 0 = embedding output), so range(num_layers) would
    # include the embeddings and skip the final layer — confirm whether
    # an offset of +1 was intended here.
    for layer_idx in range(num_layers):
        harmful_layer_hidden = harmful_hidden[layer_idx]
        harmless_layer_hidden = harmless_hidden[layer_idx]
        # Lazily grow the per-layer storage on first touch.
        if len(final_refusal_dirs) <= layer_idx:
            final_refusal_dirs.append([])
        final_refusal_dirs[layer_idx].append(
            (harmful_layer_hidden, harmless_layer_hidden))
    del harmful_hidden, harmless_hidden
    torch.cuda.empty_cache()

# Per layer: refusal direction = unit-normalized
# mean(harmful) - mean(harmless) at the last prompt position.
final_refusal_directions = []
pos = -1  # last token of the prompt (hoisted: loop-invariant)
for layer_idx in tqdm(range(num_layers),
                      desc="Calculating refusal direction for layer"):
    harmful_hidden_list = [
        pair[0][:, pos, :] for pair in final_refusal_dirs[layer_idx]]
    harmless_hidden_list = [
        pair[1][:, pos, :] for pair in final_refusal_dirs[layer_idx]]
    harmful_mean = torch.stack(harmful_hidden_list).mean(dim=0)
    harmless_mean = torch.stack(harmless_hidden_list).mean(dim=0)
    refusal_dir = harmful_mean - harmless_mean
    refusal_dir = refusal_dir / refusal_dir.norm()  # unit-normalize
    final_refusal_directions.append(refusal_dir)

# Persist all per-layer refusal directions in one file.
torch.save(final_refusal_directions, output_dir + "/final_refusal_dirs.pt")
print("Refusal directions saved successfully.")