# jina-embeddings-v4 / custom_lora_module.py
# feat: implement stateless adapter switching [wip] (commit 85f64e2)
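"""Custom LoRA linear module for jina-embeddings-v4 (stateless adapter switching, WIP).

A variant of PEFT's ``lora.Linear`` in which each adapter name maps to a nested
``nn.ModuleDict`` holding two sub-adapters, "default" and "second_adapter", so
that both weight sets can live under a single adapter name. In this WIP state,
``forward`` applies only the "default" sub-adapter, and merge/unmerge operate
on it as well.
"""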
from __future__ import annotations

import math
import warnings
from typing import Any, Optional, Union

import torch
import torch.nn as nn

from peft.tuners.lora import LoraLayer
from peft.tuners.tuners_utils import check_adapters_to_merge
from peft.utils.other import transpose


class Linear(nn.Module, LoraLayer):
# Lora implemented in a dense layer
def __init__(
self,
base_layer,
adapter_name: str,
r: int = 0,
lora_alpha: int = 1,
lora_dropout: float = 0.0,
fan_in_fan_out: bool = False, # Set this to True if the layer to replace stores weight like (fan_in, fan_out)
is_target_conv_1d_layer: bool = False,
init_lora_weights: Union[bool, str] = True,
use_rslora: bool = False,
use_dora: bool = False,
lora_bias: bool = False,
**kwargs,
) -> None:
super().__init__()
LoraLayer.__init__(self, base_layer, **kwargs)
self.fan_in_fan_out = fan_in_fan_out
self._active_adapter = adapter_name
self.update_layer(
adapter_name,
r,
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
init_lora_weights=init_lora_weights,
use_rslora=use_rslora,
use_dora=use_dora,
lora_bias=lora_bias,
)
self.is_target_conv_1d_layer = is_target_conv_1d_layer

    def merge(self, safe_merge: bool = False, adapter_names: Optional[list[str]] = None) -> None:
"""
Merge the active adapter weights into the base weights
Args:
safe_merge (`bool`, *optional*):
If True, the merge operation will be performed in a copy of the original weights and check for NaNs
before merging the weights. This is useful if you want to check if the merge operation will produce
NaNs. Defaults to `False`.
adapter_names (`list[str]`, *optional*):
The list of adapter names that should be merged. If None, all active adapters will be merged. Defaults
to `None`.
"""
adapter_names = check_adapters_to_merge(self, adapter_names)
if not adapter_names:
# no adapter to merge
return
for active_adapter in adapter_names:
if active_adapter in self.lora_A.keys():
base_layer = self.get_base_layer()
if safe_merge:
# Note that safe_merge will be slower than the normal merge
# because of the copy operation.
orig_weights = base_layer.weight.data.clone()
delta_weight = self.get_delta_weight(active_adapter)
if not self.use_dora[active_adapter]:
orig_weights += delta_weight
else:
# handle dora
# since delta_weight already includes scaling, set it to 1 here
weight_norm = (
self.lora_magnitude_vector[active_adapter]
.get_weight_norm(orig_weights, transpose(delta_weight, self.fan_in_fan_out), scaling=1)
.detach()
)
                        # We need to cache weight_norm because it has to be based on the original weights; we
                        # cannot calculate it on the fly based on the merged weights when unmerging, because it's
                        # a different value.
self._cache_store(f"{active_adapter}-weight_norm", weight_norm)
dora_factor = self.lora_magnitude_vector[active_adapter].weight / weight_norm
dora_factor = transpose(dora_factor.view(-1, 1), self.fan_in_fan_out)
orig_weights = dora_factor * (orig_weights + delta_weight)
if not torch.isfinite(orig_weights).all():
raise ValueError(
f"NaNs detected in the merged weights. The adapter {active_adapter} seems to be broken"
)
base_layer.weight.data = orig_weights
                    if self.lora_bias[active_adapter]:
                        # lora_B[adapter] is a nested ModuleDict; merge the bias of the
                        # "default" sub-adapter, matching what forward() applies.
                        new_bias = base_layer.bias + self.lora_B[active_adapter]["default"].bias
if not torch.isfinite(new_bias).all():
raise ValueError(
f"NaNs detected in the merged weights. The adapter {active_adapter} seems to be broken"
)
base_layer.bias.data = new_bias
else:
delta_weight = self.get_delta_weight(active_adapter)
if not self.use_dora[active_adapter]:
base_layer.weight.data += delta_weight
else:
# handle dora
# since delta_weight already includes scaling, set it to 1 here
weight_norm = (
self.lora_magnitude_vector[active_adapter]
.get_weight_norm(
base_layer.weight, transpose(delta_weight, self.fan_in_fan_out), scaling=1
)
.detach()
)
                        # We need to cache weight_norm because it has to be based on the original weights; we
                        # cannot calculate it on the fly based on the merged weights when unmerging, because it's
                        # a different value.
self._cache_store(f"{active_adapter}-weight_norm", weight_norm)
dora_factor = self.lora_magnitude_vector[active_adapter].weight / weight_norm
dora_factor = transpose(dora_factor.view(-1, 1), self.fan_in_fan_out)
new_weight = dora_factor * (base_layer.weight.data + delta_weight)
base_layer.weight.data = new_weight
                    if self.lora_bias[active_adapter]:
                        base_layer.bias.data += self.lora_B[active_adapter]["default"].bias
self.merged_adapters.append(active_adapter)

    def unmerge(self) -> None:
"""
This method unmerges all merged adapter layers from the base weights.
"""
if not self.merged:
warnings.warn("Already unmerged. Nothing to do.")
return
while len(self.merged_adapters) > 0:
active_adapter = self.merged_adapters.pop()
if active_adapter in self.lora_A.keys():
weight = self.get_base_layer().weight
delta_weight = self.get_delta_weight(active_adapter)
if not self.use_dora[active_adapter]:
weight.data -= delta_weight
else:
weight_norm = self._cache_pop(f"{active_adapter}-weight_norm")
dora_factor = self.lora_magnitude_vector[active_adapter].weight / weight_norm
weight_orig = weight.data / dora_factor.view(-1, 1) - delta_weight
weight.data = weight_orig
                if self.lora_bias[active_adapter]:
                    self.get_base_layer().bias.data -= self.lora_B[active_adapter]["default"].bias

    def get_delta_weight(self, adapter) -> torch.Tensor:
"""
Compute the delta weight for the given adapter.
Args:
adapter (str):
The name of the adapter for which the delta weight should be computed.
"""
        # lora_A / lora_B hold a nested ModuleDict per adapter name; the
        # "default" sub-adapter is the one applied in forward(), so its delta
        # is the one computed here.
        lora_A = self.lora_A[adapter]["default"]
        lora_B = self.lora_B[adapter]["default"]
        device = lora_B.weight.device
        dtype = lora_B.weight.dtype
        # If users want to merge adapter weights that are in (b)float16 while on
        # CPU, we need to cast the weights to float32, perform the merge, and
        # cast back to (b)float16, because some CPUs have slow bf16/fp16 matmuls.
        cast_to_fp32 = device.type == "cpu" and (dtype == torch.float16 or dtype == torch.bfloat16)
        weight_A = lora_A.weight
        weight_B = lora_B.weight
        if cast_to_fp32:
            weight_A = weight_A.float()
            weight_B = weight_B.float()
        output_tensor = transpose(weight_B @ weight_A, self.fan_in_fan_out) * self.scaling[adapter]
        if cast_to_fp32:
            output_tensor = output_tensor.to(dtype=dtype)
            # cast the weights back to their original dtype
            lora_A.weight.data = weight_A.to(dtype)
            lora_B.weight.data = weight_B.to(dtype)
        return output_tensor

    def forward(self, x: torch.Tensor, *args: Any, **kwargs: Any) -> torch.Tensor:
self._check_forward_args(x, *args, **kwargs)
adapter_names = kwargs.pop("adapter_names", None)
if self.disable_adapters:
if self.merged:
self.unmerge()
result = self.base_layer(x, *args, **kwargs)
elif adapter_names is not None:
result = self._mixed_batch_forward(x, *args, adapter_names=adapter_names, **kwargs)
elif self.merged:
result = self.base_layer(x, *args, **kwargs)
else:
result = self.base_layer(x, *args, **kwargs)
torch_result_dtype = result.dtype
lora_A_keys = self.lora_A.keys()
for active_adapter in self.active_adapters:
if active_adapter not in lora_A_keys:
continue
                # [wip] Only the "default" sub-adapter is applied here; the
                # "second_adapter" weights created in update_layer are not used yet.
                lora_A = self.lora_A[active_adapter]["default"]
                lora_B = self.lora_B[active_adapter]["default"]
dropout = self.lora_dropout[active_adapter]
scaling = self.scaling[active_adapter]
x = self._cast_input_dtype(x, lora_A.weight.dtype)
if not self.use_dora[active_adapter]:
result = result + lora_B(lora_A(dropout(x))) * scaling
else:
if isinstance(dropout, nn.Identity) or not self.training:
base_result = result
else:
x = dropout(x)
base_result = None
result = result + self.lora_magnitude_vector[active_adapter](
x,
lora_A=lora_A,
lora_B=lora_B,
scaling=scaling,
base_layer=self.get_base_layer(),
base_result=base_result,
)
result = result.to(torch_result_dtype)
return result

    def __repr__(self) -> str:
rep = super().__repr__()
return "lora." + rep

    def update_layer(
self,
adapter_name,
r,
lora_alpha,
lora_dropout,
init_lora_weights,
use_rslora,
use_dora: bool = False,
lora_bias: bool = False,
):
# This code works for linear layers, override for other layer types
if r <= 0:
raise ValueError(f"`r` should be a positive integer value but the value passed is {r}")
self.r[adapter_name] = r
self.lora_alpha[adapter_name] = lora_alpha
if lora_dropout > 0.0:
lora_dropout_layer = nn.Dropout(p=lora_dropout)
else:
lora_dropout_layer = nn.Identity()
self.lora_dropout.update(nn.ModuleDict({adapter_name: lora_dropout_layer}))
# Actual trainable parameters
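        # Each adapter name owns a nested ModuleDict with two sub-adapters
        # ("default" and "second_adapter"), so both weight sets travel under a
        # single adapter name; this is the [wip] groundwork for stateless
        # adapter switching.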
self.lora_A[adapter_name] = nn.ModuleDict({
"default": nn.Linear(self.in_features, r, bias=False),
"second_adapter": nn.Linear(self.in_features, r, bias=False)
})
self.lora_B[adapter_name] = nn.ModuleDict({
"default": nn.Linear(r, self.out_features, bias=lora_bias),
"second_adapter": nn.Linear(r, self.out_features, bias=lora_bias)
})
self.lora_bias[adapter_name] = lora_bias
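        # rsLoRA scales the update by alpha / sqrt(r) instead of alpha / r,
        # which keeps its magnitude stable as the rank grows.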
if use_rslora:
self.scaling[adapter_name] = lora_alpha / math.sqrt(r)
else:
self.scaling[adapter_name] = lora_alpha / r
self.reset_lora_parameters(adapter_name, init_lora_weights)
self._move_adapter_to_device_of_base_layer(adapter_name)
        # NOTE: the `use_dora` argument is accepted but currently ignored;
        # DoRA is forced off for every adapter.
        self.use_dora[adapter_name] = False
self.set_adapter(self.active_adapters)

    def reset_lora_parameters(self, adapter_name, init_lora_weights):
if init_lora_weights is False:
return
if init_lora_weights is True:
# initialize A the same way as the default for nn.Linear and B to zero
# https://github.com/microsoft/LoRA/blob/a0a92e0f26c067cf94747bdbf1ce73793fa44d19/loralib/layers.py#L124
nn.init.kaiming_uniform_(self.lora_A[adapter_name]['default'].weight, a=math.sqrt(5))
nn.init.kaiming_uniform_(self.lora_A[adapter_name]['second_adapter'].weight, a=math.sqrt(5))
elif init_lora_weights.lower() == "gaussian":
nn.init.normal_(self.lora_A[adapter_name]['default'].weight, std=1 / self.r[adapter_name])
nn.init.normal_(self.lora_A[adapter_name]['second_adapter'].weight, std=1 / self.r[adapter_name])
else:
raise ValueError(f"Unknown initialization {init_lora_weights=}")
nn.init.zeros_(self.lora_B[adapter_name]['default'].weight)
nn.init.zeros_(self.lora_B[adapter_name]['second_adapter'].weight)
if self.lora_bias[adapter_name]:
nn.init.zeros_(self.lora_B[adapter_name]['default'].bias)
nn.init.zeros_(self.lora_B[adapter_name]['second_adapter'].bias)
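

if __name__ == "__main__":
    # Minimal smoke test; a sketch assuming a recent `peft` release that
    # provides the `LoraLayer` helpers used above (`_cast_input_dtype`,
    # `_move_adapter_to_device_of_base_layer`, the `lora_bias` dict).
    # With init_lora_weights=True, lora_B is zero-initialized, so the wrapped
    # layer initially reproduces the base layer's output exactly.
    base = nn.Linear(16, 32)
    lora_linear = Linear(base, adapter_name="default", r=8, lora_alpha=16)

    x = torch.randn(4, 16)
    with torch.no_grad():
        out = lora_linear(x)
        ref = base(x)
    print(out.shape)                 # torch.Size([4, 32])
    print(torch.allclose(out, ref))  # True: the LoRA delta starts at zero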