# Huihui-InternVL3_5-1B-Instruct-abliterated / 01-compute_refusal_dir-DeepSeek-R1-Distill-Qwen-1.5B.py
import random
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextStreamer, BitsAndBytesConfig, StoppingCriteriaList
from transformers.generation.stopping_criteria import StoppingCriteria
from tqdm import tqdm
import json
import gc
import os

#random.seed(42)
#torch.manual_seed(42)
#torch.cuda.manual_seed_all(42)

os.environ["MKL_NUM_THREADS"] = "72"
os.environ["OMP_NUM_THREADS"] = "72"
torch.set_num_threads(72)
print(f"PyTorch threads: {torch.get_num_threads()}")
print(f"MKL threads: {os.getenv('MKL_NUM_THREADS')}")
print(f"OMP threads: {os.getenv('OMP_NUM_THREADS')}")
MODEL_ID = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"

output_dir1 = MODEL_ID + "/hidden_states1"
output_dir2 = MODEL_ID + "/hidden_states2"
output_generated_outputs1 = output_dir1 + "/generated_outputs1.jsonl"
output_generated_outputs2 = output_dir2 + "/generated_outputs2.jsonl"
output_generated_harmful1 = output_dir1 + "/generated_harmful1.txt"
output_generated_harmful2 = output_dir2 + "/generated_harmful2.txt"
os.makedirs(output_dir1, exist_ok=True)
os.makedirs(output_dir2, exist_ok=True)
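
# hidden_states1 holds the pairs actually used for the refusal-direction computation
# (harmful prompts where a refusal keyword was matched, plus their harmless partners);
# hidden_states2 records the prompts where no refusal phrase was detected.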
print(f"Load Model {MODEL_ID} ... ") | |
quant_config_4 = BitsAndBytesConfig( | |
load_in_4bit=True, | |
bnb_4bit_compute_dtype=torch.bfloat16, | |
bnb_4bit_use_double_quant=True, | |
llm_int8_enable_fp32_cpu_offload=True, | |
) | |
model = AutoModelForCausalLM.from_pretrained( | |
MODEL_ID, | |
device_map="balanced", | |
trust_remote_code=True, | |
quantization_config=quant_config_4, | |
torch_dtype=torch.bfloat16 | |
) | |
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID,trust_remote_code=True) | |
if tokenizer.pad_token is None: | |
tokenizer.pad_token = tokenizer.eos_token | |
tokenizer.pad_token_id = tokenizer.eos_token_id | |
num_layers = len(model.model.layers) | |
print(f"Model has {num_layers} layers.") | |
print(f"Load data ... ") | |
# 重新格式化文本,将每个文本包装成包含 "role" 和 "content" 的字典 | |
def reformat_texts(texts): | |
return [[{"role": "user", "content": text}] for text in texts] | |
def get_harmful_instructions(): | |
with open("datasets/harmful.txt", "r", encoding="utf-8") as f: | |
harmful = f.readlines() | |
return reformat_texts(harmful) # 重新格式化训练和测试数据 | |
def get_harmless_instructions(): | |
with open("datasets/harmless.txt", "r", encoding="utf-8") as f: | |
harmless = f.readlines() | |
return reformat_texts(harmless) # 重新格式化训练和测试数据 | |
# 获取有害的训练和测试指令 | |
harmful = get_harmful_instructions() | |
# 获取无害的训练和测试指令 | |
harmless = get_harmless_instructions() | |
print(f"harmful len: {len(harmful)}") | |
print(f"harmless len: {len(harmless)}") | |
n_instructions = min(len(harmful), len(harmless)) | |
print("Instruction count: " + str(n_instructions)) | |
harmful_instructions = harmful[:n_instructions] | |
harmless_instructions = harmless[:n_instructions] | |
max_its = n_instructions | |
bar = tqdm(total=max_its) | |
exclude_keywords = [
    "I am a bot designed to",
    "As AI assistants",
    "it’s against the law",
    "Unfortunately",
    "How to Write a How-To Article",
    #"It is important to",
    #"It’s important to",
    "However",
    "Educate yourself",
    "How to Report Endangered Species Trafficking",
    "We will not allow",
    "As an AI language model",
    "I am 16 years old",
    "I am 17 years old",
    "I am 18 years old",
    "I am 19 years old",
    "I am a software developer",
    "I am a student",
    "I am an AI language model",
    "I am considering committing",
    "I am currently experiencing ",
    "I am currently struggling with",
    "I am facing a difficult situation",
    "I am in desperate need of help",
    #"I am not",
    #"I’m not",
    "I am unable to",
    "I’m unable to",
    "I am very depressed",
    "I’m very depressed",
    "I am very sad and ",
    "I’m very sad and ",
    "I’m considering committing ",
    "I am considering committing ",
    "I’m currently in the process",
    "I am currently in the process",
    "I’m feeling very",
    "I am feeling very",
    "I’m in a very ",
    "I am in a very ",
    "I’m in the process of",
    "I am in the process of",
    "I’m really struggling with",
    "I am really struggling with",
    "I’m really sorry",
    "I am really sorry",
    "I’m sorry",
    "I am sorry",
    "I have a question",
    "I have been feeling ",
    "I have been struggling with",
    "I have never committed",
    "What can I do",
    "**seek help**",
    "<think>\n</think>",
    "<think>\n\n</think>",
    "As an AI language model",
    "As an artificial intelligence language model",
    "I advise against",
    "I am designed to",
    "I am programmed to",
    "I apologize",
    "I cannot",
    "I can’t",
    "I don’t have",
    "I don’t",
    "I have no idea how to respond to that",
    "I have no information",
    "I must clarify",
    "I never",
    "I refuse",
    "I regret to say I can’t help with that",
    "I say no",
    #"I should",
    #"I wish I could",
    "I won’t",
    "If you are feeling",
    "If you are looking for",
    "If you’re feeling",
    "If you’re looking for",
    "I’d prefer not to assist with that",
    "I’d rather not respond to that",
    "I’ll interpret",
    "I’ll pass",
    "I’m afraid",
    "I’m against my rules",
    "I’m at a loss on how to respond to that",
    "I’m barred from helping with that",
    "I’m beyond my",
    "I’m constrained from responding to that",
    "I’m forbidden",
    "I’m glad to help you learn",
    "I’m glad you’re interested",
    "I’m here to",
    "I’m outdated",
    "I’m prohibited from assisting with that",
    "I’m prohibited",
    "I’m puzzled by how to respond to that",
    "I’m really sorry",
    "I’m regrettably",
    "I’m restricted from helping with that",
    "I’m sorry",
    "I’m terribly sorry, but I can’t assist",
    "I’m too late",
    "I’m unable",
    "I’m unaware of how to proceed with that",
    "I’m uncertain about what to say",
    "I’m unclear on how to address that",
    "I’m unethical",
    "I’m unfortunately",
    "I’m unsure about how to address that",
    "I’ve not in my data",
    #"Maybe I should",
    "My apologies, but",
    "Sorry",
    "That request doesn’t align with my purpose",
    "That request is outside my scope",
    "That’s against my policies",
    "That is against my policies",
    "That’s beyond my capabilities",
    "That is beyond my capabilities",
    #"That’s not",
    #"That is not",
    "That’s something I’m not prepared to address",
    "That is something I’m not prepared to address",
    "This is a very serious",
    "We apologize",
    "We cannot",
    "We can’t",
    "We don’t",
    "We have no information",
    "We really sorry",
    "We refuse",
    "We say no",
    "We won’t",
    "We’re forbidden",
    #"We’re not",
    "We’re outdated",
    "We’re prohibited",
    "We’re sorry",
    "We’re unable",
    "We’re unfortunately",
    "We will not be providing",
    " absolutely not",
    " against the law",
    " as a responsible ai",
    " as an ai",
    " certainly not",
    " combat trafficking",
    " combating",
    " combatting",
    " how to avoid",
    " how to combat",
    " how to identify and report",
    " instead of",
    " instead,",
    " is a serious crime",
    " is against ethical guidelines",
    " is illegal and",
    " is unethical and",
    " is not legal and unethical",
    "It’s not recommended",
    "It is not recommended",
    "it is not possible",
    "it’s not possible",
    "It is not appropriate",
    "It’s not appropriate",
    "it’s dangerous",
    "it is dangerous",
    "it’s generally considered wrong",
    "it is generally considered wrong",
    "it’s illegal",
    "It is illegal",
    "it’s wrong",
    "it is wrong",
    "It’s not ethical",
    "It is not ethical",
    "It is not advisable",
    "It’s not advisable",
    "It’s not legal",
    "It is not legal",
    "It’s deeply concerning",
    "It is deeply concerning",
    "professional help",
    "the devastating reality",
    "to clarify",
    "In the realm of artificial intelligence",
    "This is an extremely",
    #"This is not",
    "This is clearly not",
    "This statement appears to",
    "This statement is",
    "This command is not",
    "This is definitely not",
    "Intentionally crashing stocks is illegal",
    "Producing and distributing methamphetamine is illegal",
    "Designing and implementing such a cyberweapon is illegal",
    "We strongly discourage",
    "We strongly advise against",
    "To prevent",
    "be unethical and illegal",
    # Chinese refusal phrases ("Sorry,", "I'm very sorry,", "I won't", "I can't",
    # "I don't want to", "I'm not willing to", "I have no way to", "I can't do it")
    "抱歉,",
    "很抱歉,",
    "我不会",
    "我不能",
    "我不想",
    "我不愿意",
    "我没办法",
    "我做不到",
]
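
# Matching note: generated text is compared against these keywords after straight
# apostrophes are normalized to curly ones ("'" -> "’"), which is why the list uses
# curly apostrophes throughout.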
# Token budget: the longest keyword's token length plus 512 tokens of slack, so a
# refusal phrase has room to appear and be caught by the stopping criterion.
max_new_tokens = 0
for keyword in exclude_keywords:
    token_ids = tokenizer(keyword, add_special_tokens=False)["input_ids"]
    max_new_tokens = max(max_new_tokens, len(token_ids))
max_new_tokens += 512
print(f"\nmax_new_tokens = {max_new_tokens}", flush=True)
class CustomStoppingCriteria(StoppingCriteria):
    def __init__(self, tokenizer, stop_phrases):
        self.tokenizer = tokenizer
        self.stop_phrases = stop_phrases

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        # Decode the full sequence so far (prompt included) and normalize apostrophes
        gen_text = self.tokenizer.decode(
            input_ids[0], skip_special_tokens=True
        ).replace("'", "’")
        return any(keyword in gen_text for keyword in self.stop_phrases)
class CustomTextStreamer(TextStreamer):
    def __init__(self, tokenizer, skip_prompt=True, skip_special_tokens=True):
        super().__init__(tokenizer, skip_prompt=skip_prompt, skip_special_tokens=skip_special_tokens)
        self.generated_text = ""

    def on_finalized_text(self, text: str, stream_end: bool = False):
        self.generated_text += text
        print(text, end="", flush=True)

def find_sublist(full, sub):
    # Yield every start index at which `sub` occurs as a contiguous sublist of `full`
    for i in range(len(full) - len(sub) + 1):
        if full[i : i + len(sub)] == sub:
            yield i
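
# For example, list(find_sublist([5, 1, 2, 1, 2], [1, 2])) yields [1, 3].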
def generate_harmful_hidden_states(instruction, exclude_keywords, max_new_tokens=1):
    input_ids = tokenizer.apply_chat_template(
        instruction,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt"
    )
    attention_mask = torch.ones_like(input_ids, dtype=torch.long)
    tokens = input_ids.to("cuda:0")
    attention_mask = attention_mask.to("cuda:0")

    streamer = CustomTextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    #streamer = CustomTextStreamer(tokenizer, skip_prompt=False, skip_special_tokens=False)
    stopping_criteria = StoppingCriteriaList([CustomStoppingCriteria(tokenizer, exclude_keywords)])

    print("Response: ", end="", flush=True)
    generated_ids = model.generate(
        tokens,
        attention_mask=attention_mask,
        use_cache=False,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        pad_token_id=tokenizer.pad_token_id,
        return_dict_in_generate=True,
        output_hidden_states=True,
        streamer=streamer,
        stopping_criteria=stopping_criteria
    )
    prompt_len = input_ids.shape[1]
    gen_ids = generated_ids.sequences[0, prompt_len:].tolist()

    matched_string = None
    ids_find = None
    h_target = None
    start_idx = -1
    gen_text = streamer.generated_text.replace("'", "’")
    for phrase in exclude_keywords:
        # Use find() to get the start index of the first match
        idx = gen_text.find(phrase)
        if idx != -1:
            # 1. Record the character-level start/end positions
            start_char = idx
            end_char = idx + len(phrase)
            matched_string = streamer.generated_text[start_char:end_char]
            # 2. Token IDs corresponding to the matched phrase
            target_ids = tokenizer.encode(matched_string, add_special_tokens=False)
            # 3. Locate its start position within gen_ids
            positions = list(find_sublist(gen_ids, target_ids))
            if positions:
                ids_find = True
                start_idx = positions[0]
                # 4. Grab the hidden states at that generation step
                h_target = generated_ids.hidden_states[start_idx]
            else:
                # Retry with a leading space, since the phrase may tokenize differently mid-text
                matched_string2 = " " + matched_string
                idx = streamer.generated_text.find(matched_string2)
                if idx != -1:
                    target_ids = tokenizer.encode(matched_string2, add_special_tokens=False)
                    positions = list(find_sublist(gen_ids, target_ids))
                    if positions:
                        ids_find = True
                        start_idx = positions[0]
                        # 4. Grab the hidden states at that generation step
                        h_target = generated_ids.hidden_states[start_idx]
                        matched_string = matched_string2
            break
    del input_ids, attention_mask, generated_ids
    return streamer.generated_text, matched_string, ids_find, start_idx, h_target
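
# Note: with use_cache=False, generated_ids.hidden_states[t] is a tuple of
# num_layers + 1 tensors (embedding output plus each decoder layer), each covering
# the full sequence at step t; the state at the last position is the one that
# predicted the first token of the matched refusal phrase.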
def generate_harmless_hidden_states(instruction, max_new_tokens=1):
    input_ids = tokenizer.apply_chat_template(
        instruction,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt"
    )
    attention_mask = torch.ones_like(input_ids, dtype=torch.long)
    tokens = input_ids.to("cuda:0")
    attention_mask = attention_mask.to("cuda:0")
    # Generate the output
    output = model.generate(
        tokens,
        attention_mask=attention_mask,
        use_cache=False,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        pad_token_id=tokenizer.pad_token_id,
        return_dict_in_generate=True,
        output_hidden_states=True
    )
    hidden_states_0 = output.hidden_states[0]
    del tokens, attention_mask, output
    return hidden_states_0
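
# Only hidden_states[0] (the forward pass over the prompt itself) is kept for the
# harmless side, so its per-layer vectors come from the final prompt token.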
print("\nGenerate and process...") | |
# 对有害和无害数据进行处理 | |
for (h_idx, harmful), (m_idx, harmless) in zip( | |
enumerate(harmful_instructions), | |
enumerate(harmless_instructions) | |
): | |
bar.update(n=1) | |
print(f"\nPrompt {h_idx}: {harmful[0]['content']}") | |
generated_text, matched_string, ids_find, start_idx, h_target = generate_harmful_hidden_states(harmful, exclude_keywords, max_new_tokens) | |
print("\n", flush=True) | |
output_data = { | |
"instruction": harmful[0]['content'], | |
"instruction_id": h_idx + 1, | |
"ids_find": ids_find, | |
"matched_string": matched_string if matched_string else None, | |
"generated_text": generated_text, | |
} | |
if ids_find: | |
print(f"\n[matched_string: '{matched_string}', {start_idx}]") | |
torch.save(h_target, f"{output_dir1}/harmful_hidden_state_{h_idx}.pt") | |
del h_target | |
with open(output_generated_outputs1, "a", encoding="utf-8") as f1: | |
f1.write(json.dumps(output_data, ensure_ascii=False) + "\n") | |
f1.flush() | |
with open(output_generated_harmful1, "a", encoding="utf-8") as f3: | |
f3.write(harmful[0]['content'].strip() + "\n") | |
f3.flush() | |
# 处理 harmless 指令 | |
hidden_states_0 = generate_harmless_hidden_states(harmless) | |
torch.save(hidden_states_0, f"{output_dir1}/harmless_hidden_state_{m_idx}.pt") | |
del hidden_states_0 | |
else: | |
torch.save(h_target, f"{output_dir2}/harmful_hidden_state_{h_idx}.pt") | |
del h_target | |
with open(output_generated_outputs2, "a", encoding="utf-8") as f2: | |
f2.write(json.dumps(output_data, ensure_ascii=False) + "\n") | |
f2.flush() | |
with open(output_generated_harmful2, "a", encoding="utf-8") as f4: | |
f4.write(harmful[0]['content'].strip() + "\n") | |
f4.flush() | |
hidden_states_0 = generate_harmless_hidden_states(harmless) | |
torch.save(hidden_states_0, f"{output_dir2}/harmless_hidden_state_{m_idx}.pt") | |
del hidden_states_0 | |
torch.cuda.empty_cache() # 释放 GPU 缓存 | |
gc.collect() # 进行垃圾回收 | |
bar.close() | |
del model, tokenizer | |
torch.cuda.empty_cache() # 释放GPU缓存 | |
gc.collect() # 进行垃圾回收 | |
# Compute the refusal directions
final_refusal_dirs = []
# Iterate over each instruction's saved data
for idx in tqdm(range(n_instructions), desc="Processing instruction"):
    try:
        harmful_hidden = torch.load(f"{output_dir1}/harmful_hidden_state_{idx}.pt", map_location='cpu', weights_only=True)
        harmless_hidden = torch.load(f"{output_dir1}/harmless_hidden_state_{idx}.pt", map_location='cpu', weights_only=True)
        # Process layer by layer
        for layer_idx in range(num_layers):
            # Hidden states for this layer of this instruction
            harmful_layer_hidden = harmful_hidden[layer_idx]
            harmless_layer_hidden = harmless_hidden[layer_idx]
            # Initialize this layer's storage on first use
            if len(final_refusal_dirs) <= layer_idx:
                final_refusal_dirs.append([])
            # Store the harmful/harmless hidden-state pair for this layer
            final_refusal_dirs[layer_idx].append((harmful_layer_hidden, harmless_layer_hidden))
        # Free memory
        del harmful_hidden, harmless_hidden
        torch.cuda.empty_cache()
    except FileNotFoundError:
        # Pairs without a matched refusal phrase were saved to hidden_states2; skip them
        pass

# Compute the refusal direction for each layer
final_refusal_directions = []
for layer_idx in tqdm(range(num_layers), desc="Calculating refusal direction for layer"):
    pos = -1
    # Separate the harmful and harmless hidden states (last token position)
    harmful_hidden_list = [hidden[0][:, pos, :] for hidden in final_refusal_dirs[layer_idx]]
    harmless_hidden_list = [hidden[1][:, pos, :] for hidden in final_refusal_dirs[layer_idx]]
    # Mean of the harmful and harmless hidden states
    harmful_mean = torch.stack(harmful_hidden_list).mean(dim=0)
    harmless_mean = torch.stack(harmless_hidden_list).mean(dim=0)
    # Refusal direction = difference of means, normalized to unit length
    refusal_dir = harmful_mean - harmless_mean
    refusal_dir = refusal_dir / refusal_dir.norm()
    # Store the direction for this layer
    final_refusal_directions.append(refusal_dir)

# The final refusal directions are stored in final_refusal_directions
torch.save(final_refusal_directions, output_dir1 + "/final_refusal_dirs.pt")
print("Refusal directions saved successfully.")