import torch import torch.nn as nn import math # --- Submodule: FusedQKVAttention --- class FusedQKVAttention(nn.Module): def __init__(self, d_model, num_heads): super().__init__() self.d_model = d_model self.num_heads = num_heads self.head_dim = d_model // num_heads # Fused QKV projection self.qkv_proj = nn.Linear(d_model, 3 * d_model) self.wo = nn.Linear(d_model, d_model) # Initialize weights for better training stability nn.init.xavier_uniform_(self.qkv_proj.weight) nn.init.xavier_uniform_(self.wo.weight) nn.init.zeros_(self.qkv_proj.bias) nn.init.zeros_(self.wo.bias) def forward(self, x, attention_mask=None): batch_size, seq_len, _ = x.shape # Fused projection and reshape qkv = self.qkv_proj(x).reshape(batch_size, seq_len, 3, self.num_heads, self.head_dim) qkv = qkv.permute(2, 0, 3, 1, 4) # [3, batch, heads, seq_len, head_dim] q, k, v = qkv[0], qkv[1], qkv[2] # Compute attention with memory efficiency attention_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim) if attention_mask is not None: attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) attention_scores = attention_scores.masked_fill(attention_mask == 0, float('-inf')) attention_weights = torch.softmax(attention_scores, dim=-1) # Apply attention and reshape context = torch.matmul(attention_weights, v) context = context.transpose(1, 2).reshape(batch_size, seq_len, self.d_model) return self.wo(context) # --- Submodule: EnhancedFeedForward --- class EnhancedFeedForward(nn.Module): def __init__(self, d_model, ff_dim, dropout=0.1): super().__init__() self.linear1 = nn.Linear(d_model, ff_dim) self.dropout1 = nn.Dropout(dropout) self.linear2 = nn.Linear(ff_dim, d_model) self.dropout2 = nn.Dropout(dropout) self.activation = nn.GELU() # Initialize weights for better training nn.init.xavier_uniform_(self.linear1.weight) nn.init.xavier_uniform_(self.linear2.weight) nn.init.zeros_(self.linear1.bias) nn.init.zeros_(self.linear2.bias) def forward(self, x): return self.dropout2(self.linear2(self.dropout1(self.activation(self.linear1(x))))) # --- Submodule: EnhancedTransformerBlock --- class EnhancedTransformerBlock(nn.Module): def __init__(self, d_model, num_heads, ff_dim, dropout=0.1): super().__init__() self.attention = FusedQKVAttention(d_model, num_heads) self.norm1 = nn.LayerNorm(d_model, eps=1e-6) self.dropout1 = nn.Dropout(dropout) self.feed_forward = EnhancedFeedForward(d_model, ff_dim, dropout) self.norm2 = nn.LayerNorm(d_model, eps=1e-6) self.dropout2 = nn.Dropout(dropout) def forward(self, x, attention_mask=None): # Pre-norm architecture attn_input = self.norm1(x) attn_output = self.attention(attn_input, attention_mask) x = x + self.dropout1(attn_output) ff_input = self.norm2(x) ff_output = self.feed_forward(ff_input) x = x + self.dropout2(ff_output) return x # --- Main Model Class: Snowflake4CausalLM --- class Snowflake4CausalLM(nn.Module): def __init__(self, vocab_size, max_seq_length, d_model, num_heads, num_layers, ff_dim, dropout=0.1): super().__init__() self.embedding = nn.Embedding(vocab_size, d_model) # Initialize positional encodings without in-place modification self.pos_encoding = nn.Parameter(torch.zeros(1, max_seq_length, d_model)) position = torch.arange(max_seq_length).unsqueeze(1).float() div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) pos_enc = torch.zeros(1, max_seq_length, d_model) pos_enc[0, :, 0::2] = torch.sin(position * div_term) pos_enc[0, :, 1::2] = torch.cos(position * div_term) self.pos_encoding.data = pos_enc.data self.layers = nn.ModuleList([ EnhancedTransformerBlock(d_model, num_heads, ff_dim, dropout) for _ in range(num_layers) ]) self.final_norm = nn.LayerNorm(d_model, eps=1e-6) self.dropout = nn.Dropout(dropout) self.fc_out = nn.Linear(d_model, vocab_size) # Tie embedding and output weights for memory efficiency and better generalization self.fc_out.weight = self.embedding.weight # Initialize embedding weights nn.init.normal_(self.embedding.weight, mean=0, std=0.02) def forward(self, input_ids, attention_mask=None): seq_length = input_ids.size(1) x = self.embedding(input_ids) + self.pos_encoding[:, :seq_length, :] x = self.dropout(x) for layer in self.layers: x = layer(x, attention_mask) x = self.final_norm(x) return self.fc_out(x)