# Huihui-InternVL3_5-1B-Instruct-abliterated / 00-test-vector-results-Qwen3-16B-A3B.py
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TextStreamer, Qwen3MoeForCausalLM
import torch
import torch.nn as nn
import os
import signal
from typing import Optional, Tuple
import einops
import jaxtyping
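
# Limit MKL/OpenMP and PyTorch to half of the available CPU cores,
# presumably to keep the machine responsive while the model runs on CPU.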
cpu_count = os.cpu_count()
print(f"Number of CPU cores in the system: {cpu_count}")
half_cpu_count = cpu_count // 2
os.environ["MKL_NUM_THREADS"] = str(half_cpu_count)
os.environ["OMP_NUM_THREADS"] = str(half_cpu_count)
torch.set_num_threads(half_cpu_count)
print(f"PyTorch threads: {torch.get_num_threads()}")
print(f"MKL threads: {os.getenv('MKL_NUM_THREADS')}")
print(f"OMP threads: {os.getenv('OMP_NUM_THREADS')}")
# Load the model and tokenizer
MODEL_ID = "kalomaze/Qwen3-16B-A3B"
print(f"Load Model {MODEL_ID} ... ")
quant_config_4 = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.bfloat16,
bnb_4bit_use_double_quant=True,
llm_int8_enable_fp32_cpu_offload=True,
)
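
# Note: quant_config_4 is prepared but not applied below -- the
# quantization_config argument to from_pretrained is commented out, so the
# model is loaded on CPU in bfloat16 without 4-bit quantization.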
model = Qwen3MoeForCausalLM.from_pretrained(
MODEL_ID,
device_map="cpu",
trust_remote_code=True,
#quantization_config=quant_config_4,
torch_dtype=torch.bfloat16
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id
messages = []
enable_thinking = True
skip_prompt = True
skip_special_tokens = True
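
# Directional ablation: remove the component of the hidden state that lies
# along the refusal direction r, i.e. ablated = h - (h . r) * r.
# This assumes r is unit-norm; otherwise the projected component would need
# to be divided by ||r||^2.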
def direction_ablation_hook(activation: jaxtyping.Float[torch.Tensor, "... d_act"],
direction: jaxtyping.Float[torch.Tensor, "d_act"]):
proj = einops.einsum(activation, direction.view(-1, 1), '... d_act, d_act single -> ... single') * direction
return activation - proj
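
# Wrapper around a decoder layer that ablates the refusal direction from the
# incoming hidden states before delegating to the original layer. The original
# layer and its weights are kept untouched inside self.original_layer.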
class AblationDecoderLayer(nn.Module):
def __init__(self, original_layer, refusal_dir):
super(AblationDecoderLayer, self).__init__()
self.original_layer = original_layer
self.refusal_dir = refusal_dir
def forward(self, *args, **kwargs):
hidden_states = args[0]
ablated = direction_ablation_hook(hidden_states, self.refusal_dir.to(hidden_states.device)).to(hidden_states.device)
args = (ablated,) + args[1:]
return self.original_layer.forward(*args, **kwargs)
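
# TextStreamer subclass that accumulates the generated text and lets a
# Ctrl+C handler stop generation by raising StopIteration from the
# streaming callback.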
class CustomTextStreamer(TextStreamer):
def __init__(self, tokenizer, skip_prompt=True, skip_special_tokens=True):
super().__init__(tokenizer, skip_prompt=skip_prompt, skip_special_tokens=skip_special_tokens)
self.generated_text = ""
self.stop_flag = False
def on_finalized_text(self, text: str, stream_end: bool = False):
self.generated_text += text
print(text, end="", flush=True)
if self.stop_flag:
raise StopIteration
def stop_generation(self):
self.stop_flag = True
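
# Build the chat prompt with the tokenizer's chat template, stream the
# response token by token, and return the accumulated text plus a flag
# indicating whether the user interrupted generation (SIGINT is temporarily
# rerouted to the streamer and restored afterwards).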
def generate_stream(model, tokenizer, messages, enable_thinking, skip_prompt, skip_special_tokens, max_new_tokens):
input_ids = tokenizer.apply_chat_template(
messages,
tokenize=True,
enable_thinking = enable_thinking,
add_generation_prompt=True,
return_tensors="pt"
)
attention_mask = torch.ones_like(input_ids, dtype=torch.long)
tokens = input_ids.to(model.device)
attention_mask = attention_mask.to(model.device)
streamer = CustomTextStreamer(tokenizer, skip_prompt=skip_prompt, skip_special_tokens=skip_special_tokens)
def signal_handler(sig, frame):
streamer.stop_generation()
print("\n[Generation stopped by user with Ctrl+C]")
signal.signal(signal.SIGINT, signal_handler)
print("Response: ", end="", flush=True)
try:
generated_ids = model.generate(
tokens,
attention_mask=attention_mask,
use_cache=False,
max_new_tokens=max_new_tokens,
do_sample=True,
pad_token_id=tokenizer.pad_token_id,
streamer=streamer
)
del generated_ids
except StopIteration:
print("\n[Stopped by user]")
del input_ids, attention_mask
torch.cuda.empty_cache()
signal.signal(signal.SIGINT, signal.SIG_DFL)
return streamer.generated_text, streamer.stop_flag
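
# Load the precomputed per-layer refusal directions. This expects a local
# directory named after MODEL_ID containing hidden_states/final_refusal_dirs.pt,
# produced by a separate extraction step that is not part of this script.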
final_refusal_dirs = torch.load(MODEL_ID + "/hidden_states/final_refusal_dirs.pt", map_location='cpu', weights_only=True)
candidate_layer = 20
refusal_dir = final_refusal_dirs[candidate_layer]
layer = model.model.layers[candidate_layer]
for name, param in layer.named_parameters():
    print(f"original layer[{candidate_layer}]: {name}")
original_params = {name: param.clone() for name, param in layer.named_parameters()}
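
# Wrap every decoder layer with the ablation wrapper, reusing the single
# refusal direction taken from candidate_layer for all layers.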
for idx in range(len(model.model.layers)):
model.model.layers[idx] = AblationDecoderLayer(model.model.layers[idx], refusal_dir)
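
# Interactive chat loop. Commands: /exit, /clear, /no_think, /skip_prompt,
# /skip_special_tokens. Generation is capped at 2 new tokens per turn,
# presumably just to smoke-test the ablation wiring rather than to chat.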
while True:
user_input = input("User: ").strip()
if user_input.lower() == "/exit":
print("Exiting chat.")
break
if user_input.lower() == "/clear":
messages = []
print("Chat history cleared. Starting a new conversation.")
continue
if user_input.lower() == "/no_think":
if enable_thinking:
enable_thinking = False
print("Thinking = False.")
else:
enable_thinking = True
print("Thinking = True.")
continue
if user_input.lower() == "/skip_prompt":
if skip_prompt:
skip_prompt = False
print("skip_prompt = False.")
else:
skip_prompt = True
print("skip_prompt = True.")
continue
if user_input.lower() == "/skip_special_tokens":
if skip_special_tokens:
skip_special_tokens = False
print("skip_special_tokens = False.")
else:
skip_special_tokens = True
print("skip_special_tokens = True.")
continue
if not user_input:
print("Input cannot be empty. Please enter something.")
continue
messages.append({"role": "user", "content": user_input})
response, stop_flag = generate_stream(model, tokenizer, messages, enable_thinking, skip_prompt, skip_special_tokens, 2)
print("", flush=True)
messages.append({"role": "assistant", "content": response})
    layer2 = model.model.layers[candidate_layer]
    for name, param in layer2.named_parameters():
        print(f"wrapped layer[{candidate_layer}]: {name}")
    layer2 = layer2.original_layer
    for name, param in layer2.named_parameters():
        print(f"unwrapped layer[{candidate_layer}]: {name}")
    for name, param in layer2.named_parameters():
        if not torch.equal(original_params[name], param):
            print(f"Parameter {name} was modified!")
if stop_flag:
continue