Ne-En-Trn / xtransformer.py
bishaltwr's picture
init
5b4b058
raw
history blame
15.5 kB
# import numpy as np # Unused import
import torch
import math
from torch import nn
import torch.nn.functional as F
from nepalitokenizers import SentencePiece
from torch.amp import autocast # Mixed precision
from torch.utils.checkpoint import checkpoint # Gradient checkpointing
# Device setup
def get_device():
return torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Efficient Scaled Dot-Product Attention
def scaled_dot_product(q, k, v, mask=None):
d_k = q.size()[-1]
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k) # Simplified attention computation
if mask is not None:
scores += mask
attention = F.softmax(scores, dim=-1)
values = torch.matmul(attention, v)
return values, attention
# Precompute Positional Encoding
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_sequence_length):
super().__init__()
self.max_sequence_length = max_sequence_length
self.d_model = d_model
self.pe = self._create_positional_encoding() # Precompute during initialization
def _create_positional_encoding(self):
position = torch.arange(self.max_sequence_length).unsqueeze(1)
div_term = torch.exp(torch.arange(0, self.d_model, 2).float() * (-math.log(10000.0) / self.d_model))
pe = torch.zeros(self.max_sequence_length, self.d_model)
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
return pe
def forward(self, x):
seq_length = x.size(1) # Handle variable sequence lengths
return self.pe[:seq_length, :].to(x.device)
# Efficient Sentence Embedding with Caching
class SentenceEmbedding(nn.Module):
def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
super().__init__()
self.vocab_size = len(language_to_index)
self.max_sequence_length = max_sequence_length
self.embedding = nn.Embedding(self.vocab_size, d_model)
self.language_to_index = language_to_index
self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
self.dropout = nn.Dropout(p=0.1)
self.START_TOKEN = START_TOKEN
self.END_TOKEN = END_TOKEN
self.PADDING_TOKEN = PADDING_TOKEN
self.tokenizer = SentencePiece()
class SentenceEmbedding(nn.Module):
def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
super().__init__()
self.vocab_size = len(language_to_index)
self.max_sequence_length = max_sequence_length
self.embedding = nn.Embedding(self.vocab_size, d_model)
self.language_to_index = language_to_index
self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
self.dropout = nn.Dropout(p=0.1)
self.START_TOKEN = START_TOKEN
self.END_TOKEN = END_TOKEN
self.PADDING_TOKEN = PADDING_TOKEN
self.tokenizer = SentencePiece()
def batch_tokenize(self, batch, start_token, end_token):
"""
Tokenizes a batch of sentences or processes pre-tokenized tensors.
Args:
batch: A list of sentences (str) or a tensor of token IDs.
start_token: Whether to add a start token.
end_token: Whether to add an end token.
Returns:
A tensor of token IDs with shape (batch_size, seq_len).
"""
# If input is already a tensor, return it directly
if isinstance(batch, torch.Tensor):
return batch.to(get_device())
# Process raw text inputs
token_ids = []
for sentence in batch:
if not isinstance(sentence, str):
sentence = str(sentence).strip()
if not sentence:
sentence = self.PADDING_TOKEN
try:
tokens = self.tokenizer.encode(sentence)
token_ids.append(tokens.ids)
except Exception:
print(f"Error tokenizing: {sentence}")
token_ids.append([self.language_to_index.get(self.PADDING_TOKEN, 0)])
# Add start and end tokens if required
if start_token:
token_ids = [[self.language_to_index.get(self.START_TOKEN, self.PADDING_TOKEN)] + ids for ids in token_ids]
if end_token:
token_ids = [ids + [self.language_to_index.get(self.END_TOKEN, self.PADDING_TOKEN)] for ids in token_ids]
# Truncate sequences to max_sequence_length
token_ids = [ids[:self.max_sequence_length] for ids in token_ids]
# Pad sequences to max_sequence_length
token_ids = torch.nn.utils.rnn.pad_sequence(
[torch.tensor(ids, dtype=torch.long) for ids in token_ids],
batch_first=True,
padding_value=self.language_to_index.get(self.PADDING_TOKEN, 0)
).to(get_device())
return token_ids
def forward(self, x, start_token, end_token):
"""
Forward pass for the SentenceEmbedding module.
Args:
x: Input batch (list of sentences or tensor of token IDs).
start_token: Whether to add a start token.
end_token: Whether to add an end token.
Returns:
Embedded and positional-encoded output tensor.
"""
# Tokenize input if it's raw text
if not isinstance(x, torch.Tensor):
x = self.batch_tokenize(x, start_token, end_token)
# Embed tokens and add positional encoding
x = self.embedding(x)
pos = self.position_encoder(x)
x = self.dropout(x + pos)
return x
def forward(self, x, start_token, end_token):
# If x is already a tensor, skip tokenization
if not isinstance(x, torch.Tensor):
x = self.batch_tokenize(x, start_token, end_token)
x = self.embedding(x)
pos = self.position_encoder(x)
x = self.dropout(x + pos)
return x
# Multi-Head Attention with Efficient Matrix Operations
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super().__init__()
self.d_model = d_model
self.num_heads = num_heads
self.head_dim = d_model // num_heads
self.qkv_layer = nn.Linear(d_model, 3 * d_model)
self.linear_layer = nn.Linear(d_model, d_model)
def forward(self, x, mask):
batch_size, seq_length, d_model = x.size()
qkv = self.qkv_layer(x)
qkv = qkv.view(batch_size, seq_length, self.num_heads, 3 * self.head_dim)
qkv = qkv.permute(0, 2, 1, 3) # (batch_size, num_heads, seq_length, 3 * head_dim)
q, k, v = qkv.chunk(3, dim=-1)
values, _ = scaled_dot_product(q, k, v, mask) # Ignore unused variable 'attention'
values = values.permute(0, 2, 1, 3).contiguous().view(batch_size, seq_length, d_model)
out = self.linear_layer(values)
return out
# Multi-Head Cross Attention
class MultiHeadCrossAttention(nn.Module):
def __init__(self, d_model, num_heads):
super().__init__()
self.d_model = d_model
self.num_heads = num_heads
self.head_dim = d_model // num_heads
self.kv_layer = nn.Linear(d_model, 2 * d_model)
self.q_layer = nn.Linear(d_model, d_model)
self.linear_layer = nn.Linear(d_model, d_model)
def forward(self, x, y, mask):
batch_size, x_seq_length, _ = x.size() # Encoder sequence length
batch_size, y_seq_length, _ = y.size() # Decoder sequence length
# Process encoder output (x) for Key/Value
kv = self.kv_layer(x)
kv = kv.view(batch_size, x_seq_length, self.num_heads, 2 * self.head_dim)
kv = kv.permute(0, 2, 1, 3) # [batch, heads, x_seq, 2*head_dim]
k, v = kv.chunk(2, dim=-1) # Each [batch, heads, x_seq, head_dim]
# Process decoder input (y) for Query
q = self.q_layer(y)
q = q.view(batch_size, y_seq_length, self.num_heads, self.head_dim)
q = q.permute(0, 2, 1, 3) # [batch, heads, y_seq, head_dim]
# Compute attention
values, _ = scaled_dot_product(q, k, v, mask)
# Reshape back to original dimensions
values = values.permute(0, 2, 1, 3).contiguous()
values = values.view(batch_size, y_seq_length, self.d_model)
return self.linear_layer(values)
# Layer Normalization
class LayerNormalization(nn.Module):
def __init__(self, parameters_shape, eps=1e-5):
super().__init__()
self.layer_norm = nn.LayerNorm(parameters_shape, eps=eps)
def forward(self, inputs):
return self.layer_norm(inputs)
# Position-wise Feed-Forward Network
class PositionwiseFeedForward(nn.Module):
def __init__(self, d_model, hidden, drop_prob=0.1):
super().__init__()
self.linear1 = nn.Linear(d_model, hidden)
self.linear2 = nn.Linear(hidden, d_model)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(p=drop_prob)
def forward(self, x):
x = self.linear1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.linear2(x)
return x
# Encoder Layer with Gradient Checkpointing
class EncoderLayer(nn.Module):
def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
super().__init__()
self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
self.norm1 = LayerNormalization(parameters_shape=[d_model])
self.dropout1 = nn.Dropout(p=drop_prob)
self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
self.norm2 = LayerNormalization(parameters_shape=[d_model])
self.dropout2 = nn.Dropout(p=drop_prob)
def forward(self, x, self_attention_mask):
residual_x = x.clone()
x = checkpoint(self.attention, x, self_attention_mask, preserve_rng_state=True, use_reentrant=False) # Gradient checkpointing
x = self.dropout1(x)
x = self.norm1(x + residual_x)
residual_x = x.clone()
x = checkpoint(self.ffn, x, preserve_rng_state=True, use_reentrant=False) # Gradient checkpointing
x = self.dropout2(x)
x = self.norm2(x + residual_x)
return x
# Sequential Encoder
class SequentialEncoder(nn.Sequential):
def forward(self, *inputs):
x, self_attention_mask = inputs
for module in self._modules.values():
x = module(x, self_attention_mask)
return x
# Encoder with Mixed Precision
class Encoder(nn.Module):
def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, encoder_layer, max_sequence_length, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
super().__init__()
self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
self.layers = SequentialEncoder(*[EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(encoder_layer)])
def forward(self, x, self_attention_mask, start_token, end_token):
with autocast(device_type='cuda' if torch.cuda.is_available() else 'cpu'): # Mixed precision
x = self.sentence_embedding(x, start_token, end_token)
x = self.layers(x, self_attention_mask)
return x
# Decoder Layer with Gradient Checkpointing
class DecoderLayer(nn.Module):
def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
super().__init__()
self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
self.layer_norm1 = LayerNormalization(parameters_shape=[d_model])
self.dropout1 = nn.Dropout(p=drop_prob)
self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
self.layer_norm2 = LayerNormalization(parameters_shape=[d_model])
self.dropout2 = nn.Dropout(p=drop_prob)
self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
self.layer_norm3 = LayerNormalization(parameters_shape=[d_model])
self.dropout3 = nn.Dropout(p=drop_prob)
def forward(self, x, y, self_attention_mask, cross_attention_mask):
_y = y.clone()
y = checkpoint(self.self_attention, y, self_attention_mask, preserve_rng_state=True, use_reentrant=False) # Gradient checkpointing
y = self.dropout1(y)
y = self.layer_norm1(y + _y)
_y = y.clone()
y = checkpoint(self.encoder_decoder_attention, x, y, cross_attention_mask, preserve_rng_state=True, use_reentrant=False) # Gradient checkpointing
y = self.dropout2(y)
y = self.layer_norm2(y + _y)
_y = y.clone()
y = checkpoint(self.ffn, y, preserve_rng_state=True, use_reentrant=False) # Gradient checkpointing
y = self.dropout3(y)
y = self.layer_norm3(y + _y)
return y
# Sequential Decoder
class SequentialDecoder(nn.Sequential):
def forward(self, *inputs):
x, y, self_attention_mask, cross_attention_mask = inputs
for module in self._modules.values():
y = module(x, y, self_attention_mask, cross_attention_mask)
return y
# Decoder with Mixed Precision
class Decoder(nn.Module):
def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, decoder_layer, max_sequence_length, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
super().__init__()
self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(decoder_layer)])
def forward(self, x, y, self_attention_mask, cross_attention_mask, start_token, end_token):
with autocast(device_type='cuda' if torch.cuda.is_available() else 'cpu'): # Mixed precision
y = self.sentence_embedding(y, start_token, end_token)
y = self.layers(x, y, self_attention_mask, cross_attention_mask)
return y
# Transformer with Mixed Precision and Gradient Checkpointing
class Transformer(nn.Module):
def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, encoder_layer, decoder_layer, max_sequence_length, ne_vocab_size, english_to_index, nepali_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
super().__init__()
self.encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, encoder_layer, max_sequence_length, english_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
self.decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, decoder_layer, max_sequence_length, nepali_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
self.linear = nn.Linear(d_model, ne_vocab_size)
self.device = get_device()
def forward(self, x, y, encoder_self_attention_mask=None, decoder_self_attention_mask=None, decoder_cross_attention_mask=None, enc_start_token=False, enc_end_token=False, dec_start_token=False, dec_end_token=False):
with autocast(device_type='cuda' if torch.cuda.is_available() else 'cpu'): # Mixed precision
x = self.encoder(x, encoder_self_attention_mask, enc_start_token, enc_end_token)
out = self.decoder(x, y, decoder_self_attention_mask, decoder_cross_attention_mask, dec_start_token, dec_end_token)
out = self.linear(out)
return out