import math
from collections import Counter

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

from datasets import load_dataset

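# Load a small slice of WikiText-2 for quick experiments.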
raw_dataset = load_dataset("wikitext", "wikitext-2-raw-v1")

train_data = raw_dataset["train"][:3000]
val_data = raw_dataset["validation"][:500]

# Slicing a Hugging Face split returns a dict of columns, so "text" is already a list of strings.
train_texts = train_data["text"]
val_texts = val_data["text"]

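# A minimal whitespace tokenizer: builds a fixed-size vocabulary from the training
# split, maps out-of-vocabulary tokens to [UNK], and truncates or pads every
# sequence to max_length.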
class MyTokenizer:
    def __init__(self, vocab_size=15000, max_length=64):
        self.vocab_size = vocab_size
        self.max_length = max_length

        self.PAD = "[PAD]"
        self.UNK = "[UNK]"
        self.pad_id = 0
        self.unk_id = 1
        self.word2id = {self.PAD: self.pad_id, self.UNK: self.unk_id}
        self.id2word = {self.pad_id: self.PAD, self.unk_id: self.UNK}

    def build_vocab(self, all_texts):
        # Count whitespace tokens and keep the most frequent ones, leaving room
        # for the special tokens already in the vocabulary.
        freq = Counter()
        for line in all_texts:
            tokens = line.strip().split()
            freq.update(tokens)
        most_common = freq.most_common(self.vocab_size - len(self.word2id))
        idx = len(self.word2id)
        for word, count in most_common:
            if word not in self.word2id:
                self.word2id[word] = idx
                self.id2word[idx] = word
                idx += 1

    def encode(self, text):
        # Map tokens to ids, truncate to max_length, then right-pad with [PAD].
        tokens = text.strip().split()
        token_ids = [self.word2id.get(t, self.unk_id) for t in tokens]
        token_ids = token_ids[:self.max_length]
        token_ids += [self.pad_id] * (self.max_length - len(token_ids))
        return token_ids

    def decode(self, token_ids):
        # Map ids back to tokens, skipping padding.
        words = []
        for tid in token_ids:
            if tid in self.id2word and self.id2word[tid] != self.PAD:
                words.append(self.id2word[tid])
        return " ".join(words)

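# Build the vocabulary from the training split and pre-encode both splits.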
my_tokenizer = MyTokenizer(vocab_size=15000, max_length=64)
my_tokenizer.build_vocab(train_texts)
print(f"Vocab size: {len(my_tokenizer.word2id)}")


def tokenize_function(text):
    return {"input_ids": my_tokenizer.encode(text)}


train_encodings = list(map(tokenize_function, train_texts))
val_encodings = list(map(tokenize_function, val_texts))

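# Wrap the pre-encoded examples in a torch Dataset; the attention mask simply
# marks non-padding positions.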
class WikiTextDataset(Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __len__(self):
        return len(self.encodings)

    def __getitem__(self, idx):
        item = self.encodings[idx]
        input_ids = torch.tensor(item["input_ids"], dtype=torch.long)
        attn_mask = (input_ids != my_tokenizer.pad_id).long()
        return {"input_ids": input_ids, "attention_mask": attn_mask}


train_dataset = WikiTextDataset(train_encodings)
val_dataset = WikiTextDataset(val_encodings)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

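# Causal (upper-triangular) attention mask: position i may attend only to positions <= i.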
def generate_square_subsequent_mask(sz):
    mask = torch.triu(torch.ones(sz, sz), diagonal=1)
    mask = mask.masked_fill(mask == 1, float('-inf'))
    return mask

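# Fixed sinusoidal positional encoding, added to the token embeddings.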
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # shape: (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model)
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len, :]

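# Decoder-style language model: embedding + positional encoding feeding a stack of
# nn.TransformerEncoder layers restricted by the causal mask, with a linear head
# over the vocabulary.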
class TransformerLM(nn.Module):
    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, dim_feedforward=1024, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout, activation='relu')
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, input_ids, attention_mask=None):
        # attention_mask is accepted for interface symmetry but not used by this model;
        # padding is handled on the loss side instead.
        embedded = self.embedding(input_ids)
        embedded = self.pos_encoder(embedded)
        embedded = embedded.permute(1, 0, 2)  # (seq_len, batch, d_model) for the encoder
        seq_len = embedded.size(0)
        mask = generate_square_subsequent_mask(seq_len).to(embedded.device)
        encoded = self.transformer_encoder(embedded, mask=mask)
        encoded = encoded.permute(1, 0, 2)  # back to (batch, seq_len, d_model)
        logits = self.fc_out(encoded)
        return logits

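# Instantiate the model and move it to the GPU when one is available.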
vocab_size = len(my_tokenizer.word2id)
model = TransformerLM(
    vocab_size=vocab_size,
    d_model=256,
    nhead=8,
    num_layers=4,
    dim_feedforward=1024,
    dropout=0.1
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

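# Training setup: Adam, cross-entropy over next-token predictions, and mixed
# precision when running on CUDA.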
optimizer = optim.Adam(model.parameters(), lr=1e-4)
# Padding positions are excluded from the loss so the model is not trained to predict [PAD].
loss_fn = nn.CrossEntropyLoss(ignore_index=my_tokenizer.pad_id)

num_epochs = 1
scaler = torch.cuda.amp.GradScaler() if device.type == 'cuda' else None

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        optimizer.zero_grad()
        if scaler:
            with torch.cuda.amp.autocast():
                logits = model(input_ids)
                # Shift so that position t predicts token t + 1.
                shift_logits = logits[:, :-1, :].contiguous()
                shift_labels = input_ids[:, 1:].contiguous()
                loss = loss_fn(shift_logits.view(-1, vocab_size), shift_labels.view(-1))
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            logits = model(input_ids)
            shift_logits = logits[:, :-1, :].contiguous()
            shift_labels = input_ids[:, 1:].contiguous()
            loss = loss_fn(shift_logits.view(-1, vocab_size), shift_labels.view(-1))
            loss.backward()
            optimizer.step()
        total_loss += loss.item()
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

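# Sampling-based generation: temperature scaling followed by top-k filtering of
# the next-token distribution.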
def generate_text(prompt: str, max_new_tokens=30, temperature=1.5, top_k=200):
    model.eval()

    # encode() pads to max_length, so strip the [PAD] ids again; otherwise generation
    # would continue after a run of padding tokens instead of right after the prompt.
    prompt_ids = [tid for tid in my_tokenizer.encode(prompt) if tid != my_tokenizer.pad_id]
    input_ids = torch.tensor([prompt_ids], dtype=torch.long).to(device)
    original_length = input_ids.shape[1]
    with torch.no_grad():
        for _ in range(max_new_tokens):
            logits = model(input_ids)
            next_token_logits = logits[:, -1, :] / temperature

            # Keep only the top-k logits; everything else is set to -inf before sampling.
            k = min(top_k, next_token_logits.size(-1))
            values, indices = torch.topk(next_token_logits, k)
            filtered_logits = torch.full_like(next_token_logits, float('-inf'))
            filtered_logits.scatter_(1, indices, values)
            probs = F.softmax(filtered_logits, dim=-1)

            next_token = torch.multinomial(probs, num_samples=1)
            input_ids = torch.cat([input_ids, next_token], dim=1)
    full_output = my_tokenizer.decode(input_ids[0].cpu().tolist())

    new_tokens = my_tokenizer.decode(input_ids[0][original_length:].cpu().tolist())
    return full_output, new_tokens

question_prompt = "What is the capital of France?"
full_generated_text, new_generated_text = generate_text(question_prompt, max_new_tokens=20, temperature=1.5, top_k=200)

print("\nPrompt:", question_prompt)
print("Full Output (Prompt + Generated):", full_generated_text)
print("Generated Tokens Only:", new_generated_text)

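# Evaluate next-token cross-entropy on the validation split.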
model.eval()
total_val_loss = 0
with torch.no_grad():
    for batch in val_loader:
        input_ids = batch["input_ids"].to(device)
        logits = model(input_ids)
        shift_logits = logits[:, :-1, :].contiguous()
        shift_labels = input_ids[:, 1:].contiguous()
        loss = loss_fn(shift_logits.view(-1, vocab_size), shift_labels.view(-1))
        total_val_loss += loss.item()
avg_val_loss = total_val_loss / len(val_loader)
print(f"Validation Loss: {avg_val_loss:.4f}")

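# Convenience metric: perplexity is exp(mean cross-entropy). Approximate here,
# since the mean is taken over per-batch averages rather than over individual tokens.
val_perplexity = math.exp(avg_val_loss)
print(f"Validation Perplexity (approx.): {val_perplexity:.2f}")
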
# Saves the full model object (architecture + weights); loading it again requires
# the class definitions above to be importable.
torch.save(model, "brt-1 mmlu.pth")