# Huihui-InternVL3_5-1B-Instruct-abliterated / 01-compute_refusal_dir-DeepCoder-14B-Preview.py
# huihui-ai's picture
# Add files using upload-large-folder tool
# 26e1cba verified
import jaxtyping
import random
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import einops
from tqdm import tqdm
from datasets import load_dataset
import os
# Configure the CPU thread pools (MKL / OpenMP / PyTorch intra-op).
os.environ["MKL_NUM_THREADS"] = "72"
os.environ["OMP_NUM_THREADS"] = "72"
torch.set_num_threads(72)  # set to the number of physical cores
print(f"PyTorch threads: {torch.get_num_threads()}")
print(f"MKL threads: {os.getenv('MKL_NUM_THREADS')}")
print(f"OMP threads: {os.getenv('OMP_NUM_THREADS')}")
# Switch autograd off for the rest of the script.  Calling
# `torch.inference_mode()` as a bare statement only constructs a context
# manager object that is thrown away at once, so it changes nothing;
# `torch.set_grad_enabled(False)` takes effect when called as a function.
torch.set_grad_enabled(False)
# Tensors created without an explicit device are placed on the GPU.
torch.set_default_device("cuda")
# Identifier of the model that is loaded further down.
MODEL_ID = "agentica-org/DeepCoder-14B-Preview"

# Hidden-state dumps go into a folder named after the model identifier.
output_dir = f"{MODEL_ID}/hidden_states"

# Create the folder (with any missing parents); an existing one is fine.
os.makedirs(output_dir, exist_ok=True)
print(f"Load Model {MODEL_ID} ... ")

# bitsandbytes settings: 4-bit weights with double quantisation, bfloat16
# compute dtype, and fp32 CPU offload enabled.
quant_config_4 = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    llm_int8_enable_fp32_cpu_offload=True,
)

# Load the quantised model; device placement is left to device_map="auto".
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=quant_config_4,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True,
)

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token  # reuse the EOS token as the padding token
tokenizer.padding_side = 'left'  # pad on the left

# Number of entries in model.model.layers; the per-layer loops later in the
# script iterate over this range.
num_layers = len(model.model.layers)
print(f"Model has {num_layers} layers.")
print("Load data ... ")


def reformat_texts(texts):
    """Wrap every text in a single-message chat conversation.

    Args:
        texts: Iterable of prompt strings.

    Returns:
        A list with one conversation per text, each of the form
        ``[{"role": "user", "content": text}]``.
    """
    return [[{"role": "user", "content": text}] for text in texts]


def _read_instructions(path):
    """Read one prompt per line from *path* and return them as conversations.

    Surrounding whitespace, including the line terminator, is stripped from
    every line, and lines that are empty afterwards are skipped. Without this
    each prompt would carry a trailing newline into the chat template and
    blank lines in the file would become empty prompts.
    """
    with open(path, "r", encoding="utf-8") as f:
        stripped = (line.strip() for line in f)
        prompts = [line for line in stripped if line]
    return reformat_texts(prompts)


def get_harmful_instructions():
    """Return the prompts from ``datasets17/harmful.txt`` as conversations."""
    return _read_instructions("datasets17/harmful.txt")


def get_harmless_instructions():
    """Return the prompts from ``datasets17/harmless.txt`` as conversations."""
    return _read_instructions("datasets17/harmless.txt")
# Load the harmful prompts, then the harmless prompts.
harmful = get_harmful_instructions()
harmless = get_harmless_instructions()
print(f"harmful len: {len(harmful)}")
print(f"harmless len: {len(harmless)}")

# Keep the same number of prompts from each set: the size of the smaller one.
n_instructions = min(len(harmful), len(harmless))
print(f"Instruction count: {n_instructions}")
harmful_instructions, harmless_instructions = (
    prompts[:n_instructions] for prompts in (harmful, harmless)
)
print("Tokenizer ... ")

# Options shared by both prompt sets: tokenize, append the generation prompt,
# and return a dict of PyTorch tensors.
chat_template_kwargs = {
    "tokenize": True,
    "add_generation_prompt": True,
    "return_tensors": "pt",
    "return_dict": True,
}

# Each conversation is tokenized on its own, giving one entry per prompt.
harmful_toks = [
    tokenizer.apply_chat_template(insn, **chat_template_kwargs)
    for insn in harmful_instructions
]
harmless_toks = [
    tokenizer.apply_chat_template(insn, **chat_template_kwargs)
    for insn in harmless_instructions
]
import gc  # garbage-collection module

# One progress tick per prompt: n_instructions harmful plus n_instructions harmless.
max_its = 2 * n_instructions
bar = tqdm(total=max_its)
def generate_and_process(toks, label, idx):
    """Run one tokenized prompt through the model and save its hidden states.

    The first entry of the generation output's ``hidden_states`` is written to
    ``{output_dir}/{label}_hidden_state_{idx}.pt``.

    Args:
        toks: Mapping holding the 'input_ids' and 'attention_mask' tensors of
            a single prompt.
        label: Prefix of the output file name; the script passes 'harmful'
            or 'harmless'.
        idx: Index of the prompt, used in the output file name.
    """
    bar.update(n=1)

    # Move input_ids and attention_mask onto the first GPU.
    device = "cuda:0"
    input_ids = toks['input_ids'].to(device)
    mask = toks['attention_mask'].to(device)

    # Generate a single new token and ask for the hidden states as well.
    result = model.generate(
        input_ids,
        attention_mask=mask,
        use_cache=False,
        max_new_tokens=1,
        do_sample=True,
        pad_token_id=tokenizer.pad_token_id,
        return_dict_in_generate=True,
        output_hidden_states=True,
    )

    # Persist the first hidden_states entry to disk (presumably the per-layer
    # states of the prompt forward pass; verify against the generate() docs).
    first_entry = result.hidden_states[0]
    save_path = f"{output_dir}/{label}_hidden_state_{idx}.pt"
    torch.save(first_entry, save_path)

    # Drop only the intermediates that are no longer needed; the model stays
    # loaded. Then release cached GPU memory and run the garbage collector.
    del toks, input_ids, mask, result, first_entry
    torch.cuda.empty_cache()
    gc.collect()
print("Generate and process...")

# Dump hidden states for every harmful prompt first, then every harmless one.
for label, batch in (("harmful", harmful_toks), ("harmless", harmless_toks)):
    for idx, toks in enumerate(batch):
        generate_and_process(toks, label, idx)
bar.close()

# The remainder of the script works from the saved files only, so drop the
# model and tokenizer, release cached GPU memory and run the garbage collector.
del tokenizer, model
torch.cuda.empty_cache()
gc.collect()
# ---- Refusal-direction computation ----
# Only the hidden state at the last sequence position is used.
pos = -1

# final_refusal_dirs[layer_idx] holds, for every instruction, a
# (harmful, harmless) pair of last-position hidden states for that layer.
# NOTE(review): Hugging Face models usually return num_layers + 1 hidden-state
# entries (embedding output first), in which case range(num_layers) leaves out
# the final entry — confirm this indexing is what the consumer expects.
final_refusal_dirs = [[] for _ in range(num_layers)]

# Walk over the saved hidden states of every instruction.
for idx in tqdm(range(n_instructions), desc="Processing instruction"):
    harmful_hidden = torch.load(f"{output_dir}/harmful_hidden_state_{idx}.pt", map_location='cpu', weights_only=True)
    harmless_hidden = torch.load(f"{output_dir}/harmless_hidden_state_{idx}.pt", map_location='cpu', weights_only=True)

    for layer_idx in range(num_layers):
        # Keep just the last-position slice. clone() gives the slice its own
        # storage, so the full-sequence tensor it was cut from can be freed
        # instead of being held in RAM for every instruction and layer.
        final_refusal_dirs[layer_idx].append((
            harmful_hidden[layer_idx][:, pos, :].clone(),
            harmless_hidden[layer_idx][:, pos, :].clone(),
        ))

    # The loaded tensors were mapped to the CPU, so dropping the references
    # is all that is needed to release them.
    del harmful_hidden, harmless_hidden

# Compute one refusal direction per layer.
final_refusal_directions = []
for layer_idx in tqdm(range(num_layers), desc="Calculating refusal direction for layer"):
    layer_pairs = final_refusal_dirs[layer_idx]

    # Mean last-position hidden state over all harmful / all harmless prompts.
    harmful_mean = torch.stack([pair[0] for pair in layer_pairs]).mean(dim=0)
    harmless_mean = torch.stack([pair[1] for pair in layer_pairs]).mean(dim=0)

    # Refusal direction: difference of the two means, scaled to unit norm.
    refusal_dir = harmful_mean - harmless_mean
    refusal_dir = refusal_dir / refusal_dir.norm()

    final_refusal_directions.append(refusal_dir)

# Persist the list of per-layer refusal directions.
torch.save(final_refusal_directions, output_dir + "/final_refusal_dirs.pt")
print("Refusal directions saved successfully.")