|
--- |
|
language: |
|
- hi |
|
tags: |
|
- hindi |
|
- text-generation |
|
- causal-lm |
|
- lm |
|
- rope |
|
license: mit |
|
datasets: |
|
- custom_hindi_corpus |
|
--- |
|
|
|
# Hindi-CausalLM |
|
|
|
A Hindi language generation model with the following specifications: |
|
|
|
## Model Architecture |
|
- **Type**: Causal Language Model with Transformer architecture |
|
- **Hidden size**: 768 |
|
- **Layers**: 12 |
|
- **Attention heads**: 16 |
|
- **Key-value heads**: 4 (using grouped-query attention) |
|
- **Position encoding**: Rotary Position Embeddings (RoPE) |
|
- **Vocabulary size**: 16000 |
|
- **Parameters**: ~100M (see the estimate sketch after this list)
|
- **Context window**: 512 tokens |
|
- **Trained on**: Large corpus of Hindi text |
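
The ~100M parameter figure follows from the hyperparameters above. A quick back-of-the-envelope check (ignoring biases and LayerNorm weights, and assuming the untied LM head used in the reference code below):

```python
hidden, layers, vocab, inter = 768, 12, 16000, 3072
heads, kv_heads = 16, 4
head_dim = hidden // heads                  # 48

embed   = vocab * hidden                    # token embedding matrix
lm_head = vocab * hidden                    # output projection (not weight-tied)
attn    = 2 * hidden * hidden + 2 * hidden * (kv_heads * head_dim)  # q/o full width, k/v shrunk by GQA
mlp     = 2 * hidden * inter                # up- and down-projections
total   = embed + lm_head + layers * (attn + mlp)

print(f"~{total / 1e6:.0f}M parameters")    # ≈ 99M, i.e. roughly 100M
```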
|
|
|
## Training |
|
|
|
The model was trained on a large corpus of Hindi text using a cosine learning rate schedule with warmup. Training used mixed precision and distributed data parallelism across multiple GPUs.
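
The exact training hyperparameters are not published here; the snippet below is only a minimal sketch of a linear-warmup plus cosine-decay schedule using PyTorch's `LambdaLR`, with placeholder values for `warmup_steps`, `total_steps`, and the peak learning rate.

```python
import math
import torch

def cosine_with_warmup(step, warmup_steps=1_000, total_steps=100_000):
    """LR multiplier: linear warmup to 1.0, then cosine decay towards 0."""
    if step < warmup_steps:
        return step / max(1, warmup_steps)
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return 0.5 * (1.0 + math.cos(math.pi * min(1.0, progress)))

model = torch.nn.Linear(768, 768)  # stand-in module for the actual model
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)  # placeholder peak LR
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=cosine_with_warmup)
# each training step: loss.backward(); optimizer.step(); scheduler.step()
```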
|
## Usage |
|
|
|
You can use this model with the following code, which defines the custom architecture and a simple generation loop:
|
|
|
```python |
|
import torch |
|
import math |
|
import os |
|
from hindi_embeddings import SentencePieceTokenizerWrapper |
|
from safetensors.torch import load_file |
|
from torch import nn |
|
from transformers import PreTrainedModel, PretrainedConfig |
|
|
|
|
|
class ConvaiCausalLMConfig(PretrainedConfig): |
|
model_type = "convaicausallm" |
|
|
|
def __init__( |
|
self, |
|
vocab_size=16000, |
|
hidden_size=768, |
|
num_hidden_layers=12, |
|
num_attention_heads=16, |
|
num_key_value_heads=4, |
|
intermediate_size=3072, |
|
hidden_act="silu", |
|
max_position_embeddings=512, |
|
rope_theta=10000.0, # Base parameter for RoPE |
|
**kwargs |
|
): |
|
super().__init__(**kwargs) |
|
self.vocab_size = vocab_size |
|
self.hidden_size = hidden_size |
|
self.num_hidden_layers = num_hidden_layers |
|
self.num_attention_heads = num_attention_heads |
|
self.num_key_value_heads = num_key_value_heads |
|
self.intermediate_size = intermediate_size |
|
self.hidden_act = hidden_act |
|
self.max_position_embeddings = max_position_embeddings |
|
self.rope_theta = rope_theta |
|
|
|
|
|
def precompute_freqs_cis(dim, end, theta=10000.0): |
|
"""Precompute the frequency tensor for complex exponentials (cos, sin)""" |
|
# Ensure dim is even for complex numbers |
|
assert dim % 2 == 0, "Dimension must be even" |
|
|
|
    # Inverse frequencies for each dimension pair, and position indices up to `end`
|
freqs = 1.0 / (theta ** (torch.arange(0, dim, 2).float() / dim)) |
|
t = torch.arange(end).float() |
|
freqs = torch.outer(t, freqs) # [end, dim/2] |
|
|
|
# Create complex exponentials (cos, sin pairs) |
|
cos, sin = torch.cos(freqs), torch.sin(freqs) |
|
return cos, sin |
|
|
|
|
|
def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None): |
|
"""Apply rotary position embeddings to q and k tensors""" |
|
# Extract shapes |
|
batch, seq_len, n_heads, head_dim = q.shape |
|
_, kv_seq_len, n_kv_heads, _ = k.shape |
|
|
|
# Handle position IDs or use sequential positions |
|
if position_ids is None: |
|
# Default: Just use sequential positions |
|
position_ids = torch.arange(seq_len, device=q.device) |
|
position_ids = position_ids.unsqueeze(0).expand(batch, -1) |
|
|
|
# Get the cosine and sine for the positions we're using |
|
cos = cos[position_ids].unsqueeze(-2) # [batch, seq, 1, dim/2] |
|
sin = sin[position_ids].unsqueeze(-2) # [batch, seq, 1, dim/2] |
|
|
|
# q and k must be arranged in pairs for rotation |
|
q_embed_dim = q.shape[-1] |
|
q_half_dim = q_embed_dim // 2 |
|
|
|
# Split the embedding dimensions into pairs |
|
q_half1, q_half2 = q[..., :q_half_dim], q[..., q_half_dim:] |
|
k_half1, k_half2 = k[..., :q_half_dim], k[..., q_half_dim:] |
|
|
|
# Apply rotary embeddings to each pair of dimensions |
|
# For each pair (a, b), we compute (a*cos - b*sin, a*sin + b*cos) |
|
q_out_half1 = q_half1 * cos - q_half2 * sin |
|
q_out_half2 = q_half1 * sin + q_half2 * cos |
|
k_out_half1 = k_half1 * cos - k_half2 * sin |
|
k_out_half2 = k_half1 * sin + k_half2 * cos |
|
|
|
# Concatenate back to original shape |
|
q_out = torch.cat([q_out_half1, q_out_half2], dim=-1) |
|
k_out = torch.cat([k_out_half1, k_out_half2], dim=-1) |
|
|
|
return q_out, k_out |
|
|
|
|
|
class GroupedQueryAttention(nn.Module): |
|
def __init__(self, config): |
|
super().__init__() |
|
self.hidden_size = config.hidden_size |
|
self.num_heads = config.num_attention_heads |
|
self.num_kv_heads = config.num_key_value_heads |
|
self.head_dim = config.hidden_size // config.num_attention_heads |
|
|
|
# For MQA/GQA support |
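        # e.g. 16 query heads with 4 key/value heads -> each k/v head is shared by 4 query heads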
|
self.num_key_value_groups = self.num_heads // self.num_kv_heads |
|
|
|
self.q_proj = nn.Linear(config.hidden_size, self.num_heads * self.head_dim) |
|
self.k_proj = nn.Linear(config.hidden_size, self.num_kv_heads * self.head_dim) |
|
self.v_proj = nn.Linear(config.hidden_size, self.num_kv_heads * self.head_dim) |
|
self.o_proj = nn.Linear(config.hidden_size, config.hidden_size) |
|
|
|
# Precompute rotary position encoding frequencies |
|
max_seq_len = config.max_position_embeddings |
|
self.max_seq_len = max_seq_len |
|
|
|
# Register frequencies as buffers |
|
cos, sin = precompute_freqs_cis(self.head_dim, max_seq_len, config.rope_theta) |
|
self.register_buffer("cos", cos) # [max_seq_len, dim/2] |
|
self.register_buffer("sin", sin) # [max_seq_len, dim/2] |
|
|
|
# Create causal mask for attention |
|
self.register_buffer( |
|
"causal_mask", |
|
torch.triu(torch.ones(max_seq_len, max_seq_len) * -1e9, diagonal=1) |
|
) |
|
|
|
def forward(self, hidden_states, attention_mask=None): |
|
batch_size, seq_len, _ = hidden_states.size() |
|
|
|
# Project queries, keys, values |
|
q = self.q_proj(hidden_states) |
|
k = self.k_proj(hidden_states) |
|
v = self.v_proj(hidden_states) |
|
|
|
# Reshape for attention computation |
|
q = q.view(batch_size, seq_len, self.num_heads, self.head_dim) |
|
k = k.view(batch_size, seq_len, self.num_kv_heads, self.head_dim) |
|
v = v.view(batch_size, seq_len, self.num_kv_heads, self.head_dim) |
|
|
|
# Apply rotary position embeddings |
|
q_rotary, k_rotary = apply_rotary_pos_emb(q, k, self.cos, self.sin) |
|
|
|
# Reshape for attention computation |
|
q_rotary = q_rotary.transpose(1, 2) # [batch, heads, seq, dim] |
|
k_rotary = k_rotary.transpose(1, 2) # [batch, kv_heads, seq, dim] |
|
v = v.transpose(1, 2) # [batch, kv_heads, seq, dim] |
|
|
|
# Handle Multi-Query Attention / Grouped-Query Attention |
|
if self.num_key_value_groups > 1: |
|
# Repeat k, v for each query in the group |
|
k_rotary = k_rotary.repeat_interleave(self.num_key_value_groups, dim=1) |
|
v = v.repeat_interleave(self.num_key_value_groups, dim=1) |
|
|
|
# Compute attention scores |
|
attn_scores = torch.matmul(q_rotary, k_rotary.transpose(-1, -2)) / (self.head_dim ** 0.5) |
|
|
|
# Apply causal mask - only attend to previous tokens |
|
causal_mask = self.causal_mask[:seq_len, :seq_len] |
|
attn_scores = attn_scores + causal_mask |
|
|
|
# Apply attention mask if provided |
|
if attention_mask is not None: |
|
attn_scores = attn_scores + attention_mask |
|
|
|
# Normalize the attention scores to probabilities |
|
attn_probs = torch.softmax(attn_scores, dim=-1) |
|
|
|
# Apply attention to values |
|
context = torch.matmul(attn_probs, v) # [b, n_heads, seq, head_dim] |
|
|
|
# Reshape back to [batch_size, seq_length, hidden_size] |
|
context = context.transpose(1, 2).contiguous() |
|
context = context.view(batch_size, seq_len, -1) |
|
|
|
# Final projection |
|
output = self.o_proj(context) |
|
|
|
return output |
|
|
|
|
|
class ConvaiCausalLM(PreTrainedModel): |
|
config_class = ConvaiCausalLMConfig |
|
|
|
def __init__(self, config): |
|
super().__init__(config) |
|
self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size) |
|
self.layers = nn.ModuleList([ |
|
nn.ModuleDict({ |
|
"self_attn": GroupedQueryAttention(config), |
|
"mlp": nn.Sequential( |
|
nn.Linear(config.hidden_size, config.intermediate_size), |
|
nn.SiLU(), |
|
nn.Linear(config.intermediate_size, config.hidden_size) |
|
), |
|
"input_layernorm": nn.LayerNorm(config.hidden_size), |
|
"post_attention_layernorm": nn.LayerNorm(config.hidden_size) |
|
}) for _ in range(config.num_hidden_layers) |
|
]) |
|
self.norm = nn.LayerNorm(config.hidden_size) |
|
self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) |
|
|
|
# Initialize weights |
|
self.apply(self._init_weights) |
|
|
|
def _init_weights(self, module): |
|
if isinstance(module, nn.Linear): |
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) |
|
if module.bias is not None: |
|
torch.nn.init.zeros_(module.bias) |
|
elif isinstance(module, nn.Embedding): |
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) |
|
|
|
def _prepare_attention_mask(self, attention_mask, input_shape, device): |
|
# Prepare masks for attention |
|
if attention_mask is None: |
|
attention_mask = torch.ones(input_shape, device=device) |
|
|
|
# Make broadcastable shape: [batch, 1, 1, seq_len] |
|
extended_mask = attention_mask.unsqueeze(1).unsqueeze(2) |
|
|
|
# Convert to additive mask (0 for valid, -10000 for masked) |
|
extended_mask = (1.0 - extended_mask) * -10000.0 |
|
|
|
return extended_mask |
|
|
|
def forward(self, input_ids, attention_mask=None): |
|
batch_size, seq_len = input_ids.shape |
|
device = input_ids.device |
|
|
|
# Prepare attention mask |
|
if attention_mask is not None: |
|
attention_mask = self._prepare_attention_mask( |
|
attention_mask, (batch_size, seq_len), device |
|
) |
|
|
|
# Get embeddings |
|
hidden_states = self.embed_tokens(input_ids) |
|
|
|
# Apply each layer |
|
for layer in self.layers: |
|
residual = hidden_states |
|
|
|
# First norm and attention |
|
hidden_states = layer["input_layernorm"](hidden_states) |
|
hidden_states = layer["self_attn"](hidden_states, attention_mask) |
|
hidden_states = residual + hidden_states |
|
|
|
# Second norm and MLP |
|
residual = hidden_states |
|
hidden_states = layer["post_attention_layernorm"](hidden_states) |
|
hidden_states = layer["mlp"](hidden_states) |
|
hidden_states = residual + hidden_states |
|
|
|
# Final norm |
|
hidden_states = self.norm(hidden_states) |
|
|
|
# Compute logits |
|
logits = self.lm_head(hidden_states) |
|
|
|
return logits |
|
|
|
|
|
class HindiLLMGenerator: |
|
def __init__(self, model_path, device=None): |
|
# Set device |
|
if device is None: |
|
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
else: |
|
self.device = torch.device(device) |
|
|
|
print(f"Using device: {self.device}") |
|
|
|
# Load tokenizer |
|
tokenizer_path = os.path.join(model_path, "tokenizer.model") |
|
self.tokenizer = SentencePieceTokenizerWrapper(tokenizer_path) |
|
|
|
# Load model config |
|
config_path = os.path.join(model_path, "config.json") |
|
import json |
|
with open(config_path, 'r') as f: |
|
config_dict = json.load(f) |
|
|
|
self.config = ConvaiCausalLMConfig(**config_dict) |
|
|
|
# Load model - try safetensors first, fall back to PyTorch bin if needed |
|
safetensors_path = os.path.join(model_path, "model.safetensors") |
|
pytorch_path = os.path.join(model_path, "pytorch_model.bin") |
|
|
|
self.model = ConvaiCausalLM(self.config) |
|
|
|
        # Check which format is available and load accordingly

        if os.path.exists(safetensors_path):

            print("Loading model weights from SafeTensors")

            state_dict = load_file(safetensors_path, device="cpu")

            self.model.load_state_dict(state_dict)

        elif os.path.exists(pytorch_path):

            print("Loading model weights from PyTorch bin")

            self.model.load_state_dict(torch.load(pytorch_path, map_location="cpu"))

        else:

            raise FileNotFoundError(f"No model weights found in {model_path}")
|
|
|
# Move model to device and set to evaluation mode |
|
self.model.to(self.device) |
|
self.model.eval() |
|
|
|
def generate(self, prompt, max_length=100, temperature=0.8, top_k=50, top_p=0.9, |
|
repetition_penalty=1.1, do_sample=True): |
|
# Tokenize the prompt |
|
input_ids = self.tokenizer.sp_model.EncodeAsIds(prompt) |
|
input_tensor = torch.tensor([input_ids], dtype=torch.long).to(self.device) |
|
|
|
# Start with the input tensor |
|
output_sequence = input_tensor.clone() |
|
|
|
# Generate tokens one by one |
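        # Note: the sequence is never truncated to max_position_embeddings (512); keep
        # max_length (prompt + generated tokens) within that limit, since the RoPE and
        # causal-mask buffers only cover 512 positions.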
|
for _ in range(max_length - len(input_ids)): |
|
with torch.no_grad(): |
|
# Get the model's output for the current sequence |
|
outputs = self.model(output_sequence) |
|
next_token_logits = outputs[0, -1, :] |
|
|
|
# Apply temperature |
|
if temperature > 0: |
|
next_token_logits = next_token_logits / temperature |
|
|
|
                # Apply repetition penalty (dampen tokens already present in the output)

                if repetition_penalty > 1.0:

                    for token_id in set(output_sequence[0].tolist()):

                        # Divide positive logits and multiply negative ones, so repeated
                        # tokens always become less likely
                        if next_token_logits[token_id] > 0:
                            next_token_logits[token_id] /= repetition_penalty
                        else:
                            next_token_logits[token_id] *= repetition_penalty
|
|
|
# Filter with top-k sampling |
|
if top_k > 0: |
|
top_k_values, top_k_indices = torch.topk(next_token_logits, top_k) |
|
next_token_logits = torch.full_like(next_token_logits, float('-inf')) |
|
next_token_logits.scatter_(0, top_k_indices, top_k_values) |
|
|
|
# Filter with top-p/nucleus sampling |
|
if top_p < 1.0 and do_sample: |
|
sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True) |
|
cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1) |
|
|
|
# Remove tokens with cumulative probability above the threshold |
|
sorted_indices_to_remove = cumulative_probs > top_p |
|
# Shift the indices to the right to keep the first token above the threshold |
|
sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone() |
|
sorted_indices_to_remove[..., 0] = 0 |
|
|
|
indices_to_remove = sorted_indices[sorted_indices_to_remove] |
|
next_token_logits[indices_to_remove] = float('-inf') |
|
|
|
# Sample or choose the next token |
|
if do_sample: |
|
probs = torch.softmax(next_token_logits, dim=-1) |
|
next_token = torch.multinomial(probs, num_samples=1) |
|
else: |
|
next_token = torch.argmax(next_token_logits, dim=-1).unsqueeze(0) |
|
|
|
# Add the next token to the sequence |
|
output_sequence = torch.cat([output_sequence, next_token.unsqueeze(0)], dim=1) |
|
|
|
# Check if we've generated an end token |
|
if next_token.item() == self.tokenizer.eos_token_id: |
|
break |
|
|
|
# Decode the generated sequence |
|
generated_ids = output_sequence[0].tolist() |
|
generated_text = self.tokenizer.sp_model.DecodeIds(generated_ids) |
|
|
|
return generated_text |
|
|
|
# Example usage |
|
if __name__ == "__main__": |
|
generator = HindiLLMGenerator("path/to/model") |
|
result = generator.generate("भारत एक विशाल देश है") |
|
print(result) |
|
``` |
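
Generation can be tuned through the sampling parameters exposed by `generate` above, for example:

```python
# Assumes the HindiLLMGenerator class defined above; the model path is a placeholder.
generator = HindiLLMGenerator("path/to/model")
result = generator.generate(
    "हिंदी साहित्य की प्रमुख विशेषताएं",  # "Key features of Hindi literature"
    max_length=200,
    temperature=0.7,         # lower temperature -> more conservative continuations
    top_p=0.85,              # tighter nucleus sampling
    repetition_penalty=1.2,  # stronger penalty against repeated tokens
)
print(result)
```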
|
|
|
## Example Prompts |
|
|
|
Try the model with these example prompts (in English: "India is a vast country", "Tell me a story in Hindi", "Today's weather is very nice", "Key features of Hindi literature"):
|
|
|
``` |
|
भारत एक विशाल देश है |
|
मुझे हिंदी में एक कहानी सुनाओ |
|
आज का मौसम बहुत अच्छा है |
|
हिंदी साहित्य की प्रमुख विशेषताएं |
|
``` |
|
|
|
## Capabilities |
|
|
|
This model can: |
|
- Generate coherent Hindi text |
|
- Continue text from a given prompt |
|
- Create stories, explanations, and other content in Hindi |
|
|
|
## Limitations |
|
|
|
- Performance varies based on the similarity of the input to the training data |
|
- May occasionally generate repetitive content for longer texts |
|
- May produce grammatically incorrect Hindi in some contexts |
|
- Has no knowledge of events beyond its training corpus |
|
|
|
## Intended Use |
|
|
|
This model is intended for Hindi language generation tasks, creative writing assistance, and as a foundation for fine-tuning on specific tasks. |
|
|
|
## Ethical Considerations |
|
|
|
Users should be aware that like all language models, this model may reproduce biases or generate problematic content in certain contexts. |
|
|