 # Transformer Encoder for Email Spam Classification using PyTorch. 
## Steps
1. Reading a CSV file
2. Preprocessing text (Tokenization, Vocabulary creation)
3. Splitting the dataset into train & test
4. Transformer Encoder Model (with a Sigmoid activation for binary classification)
5. Training & Evaluation on CPU

## Install the Pacakges

In [None]:
pip install torch torchvision torchaudio pandas scikit-learn nltk tqdm transformers


[31mERROR: Could not find a version that satisfies the requirement pickle (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for pickle[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.


## Import Packages

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
import nltk
from nltk.tokenize import word_tokenize
from collections import Counter
from tqdm import tqdm
import re

nltk.download('punkt')

[nltk_data] Downloading package punkt to
[nltk_data] /teamspace/studios/this_studio/nltk_data...
[nltk_data] Package punkt is already up-to-date!


True

## Read the Dataset using Pandas

In [3]:
# Load Data
df = pd.read_csv("spam.csv")
df.columns = ["labels", "text"]


# Split Data
train_texts, test_texts, train_labels, test_labels = train_test_split(df["text"], df["labels"], test_size=0.2, random_state=42)

## Preprocessing text

In [4]:
def simple_tokenize(text):
 return re.findall(r"\b\w+\b", text.lower())


class SpamDataset(Dataset):
 def __init__(self, texts, labels, vocab, max_len=100):
 self.texts = [self.tokenize_and_pad(text, vocab, max_len) for text in texts]
 self.labels = torch.tensor(labels, dtype=torch.float32)

 def tokenize_and_pad(self, text, vocab, max_len):
 tokens = simple_tokenize(text.lower()) # Tokenize text
 token_ids = [vocab.get(word, vocab['']) for word in tokens] # Convert to IDs
 if len(token_ids) < max_len:
 token_ids += [vocab['']] * (max_len - len(token_ids)) # Padding
 return torch.tensor(token_ids[:max_len], dtype=torch.long) # Trim if too long

 def __len__(self):
 return len(self.labels)

 def __getitem__(self, idx):
 return self.texts[idx], self.labels[idx]



# Create Vocabulary
all_words = [word for text in train_texts for word in simple_tokenize(text)]
word_freq = Counter(all_words)
print(word_freq)
vocab = {word: i+2 for i, (word, _) in enumerate(word_freq.most_common(10000))} # Top 10k words
vocab[''] = 0
vocab[''] = 1
vocab_size = len(vocab)
print(vocab_size)

# Create Datasets
train_dataset = SpamDataset(train_texts.tolist(), train_labels.tolist(), vocab)
test_dataset = SpamDataset(test_texts.tolist(), test_labels.tolist(), vocab)

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

7742


## Transformer Encoder Model

In [5]:
class TransformerEncoder(nn.Module):
 def __init__(self, d_model=256, num_heads=1, d_ff=512, num_layers=1, vocab_size=10000, max_seq_len=100, dropout=0.1):
 super(TransformerEncoder, self).__init__()
 
 # Embedding & Positional Encoding
 self.embedding = nn.Embedding(vocab_size, d_model)
 self.positional_encoding = nn.Parameter(torch.zeros(1, max_seq_len, d_model))

 # Transformer Encoder Layers
 encoder_layer = nn.TransformerEncoderLayer(
 d_model=d_model,
 nhead=num_heads,
 dim_feedforward=d_ff,
 dropout=dropout,
 activation='relu',
 batch_first=True
 )
 
 self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

 # Classification Head
 self.fc = nn.Linear(d_model, 1)
 self.sigmoid = nn.Sigmoid()

 def forward(self, x):
 x = self.embedding(x) + self.positional_encoding[:, :x.size(1), :]
 x = self.encoder(x) # Pass through transformer
 x = x[:, 0, :] # Take first token's output (CLS token equivalent)
 x = self.fc(x)
 return self.sigmoid(x) # Binary classification (spam or not)

## Train & Evaluate Model

In [6]:
device = torch.device("cpu") # Run on CPU
model = TransformerEncoder(d_model=256, num_heads=1, num_layers=1, vocab_size=vocab_size, max_seq_len=100).to(device)

criterion = nn.BCELoss() # Binary Cross Entropy for classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train_model(model, train_loader, criterion, optimizer, epochs=10):
 model.train()
 for epoch in range(epochs):
 total_loss = 0
 for texts, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
 texts, labels = texts.to(device), labels.to(device)
 optimizer.zero_grad()
 outputs = model(texts).squeeze()
 loss = criterion(outputs, labels)
 loss.backward()
 optimizer.step()
 total_loss += loss.item()
 print(f"Epoch {epoch+1}: Loss = {total_loss / len(train_loader):.4f}")

def evaluate_model(model, test_loader):
 model.eval()
 correct, total = 0, 0
 with torch.no_grad():
 for texts, labels in test_loader:
 texts, labels = texts.to(device), labels.to(device)
 outputs = model(texts).squeeze()
 preds = (outputs > 0.5).float()
 correct += (preds == labels).sum().item()
 total += labels.size(0)
 print(f"Test Accuracy: {100 * correct / total:.2f}%")

# Run Training & Evaluation
train_model(model, train_loader, criterion, optimizer, epochs=10)
evaluate_model(model, test_loader)

Epoch 1/10: 100%|██████████| 140/140 [00:09<00:00, 14.09it/s]


Epoch 1: Loss = 0.2599


Epoch 2/10: 100%|██████████| 140/140 [00:09<00:00, 14.76it/s]


Epoch 2: Loss = 0.1236


Epoch 3/10: 100%|██████████| 140/140 [00:09<00:00, 14.92it/s]


Epoch 3: Loss = 0.0734


Epoch 4/10: 100%|██████████| 140/140 [00:09<00:00, 14.53it/s]


Epoch 4: Loss = 0.0531


Epoch 5/10: 100%|██████████| 140/140 [00:09<00:00, 14.19it/s]


Epoch 5: Loss = 0.0716


Epoch 6/10: 100%|██████████| 140/140 [00:09<00:00, 14.52it/s]


Epoch 6: Loss = 0.0612


Epoch 7/10: 100%|██████████| 140/140 [00:09<00:00, 14.30it/s]


Epoch 7: Loss = 0.0527


Epoch 8/10: 100%|██████████| 140/140 [00:10<00:00, 13.52it/s]


Epoch 8: Loss = 0.0391


Epoch 9/10: 100%|██████████| 140/140 [00:11<00:00, 11.78it/s]


Epoch 9: Loss = 0.0342


Epoch 10/10: 100%|██████████| 140/140 [00:10<00:00, 12.76it/s]


Epoch 10: Loss = 0.0380
Test Accuracy: 95.52%


## Save & Test the Model

In [7]:
import torch
import pickle
import json

# Define paths
MODEL_PATH = "spam_model.pth"
VOCAB_PATH = "vocab.pkl"

# Save model
torch.save(model.state_dict(), MODEL_PATH)

# Save vocabulary
with open(VOCAB_PATH, "wb") as f:
 pickle.dump(vocab, f)

print("✅ Model and vocabulary saved successfully!")





✅ Model and vocabulary saved successfully!


## Load the Model & Vocabulary

In [8]:
# Load vocabulary
with open(VOCAB_PATH, "rb") as f:
 vocab = pickle.load(f)

# Load model
device = torch.device("cpu") # Change to "cuda" if using GPU
model = TransformerEncoder(d_model=256, num_heads=1, num_layers=1, vocab_size=len(vocab), max_seq_len=100).to(device)
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
model.eval() # Set model to evaluation mode

print("✅ Model and vocabulary loaded successfully!")


✅ Model and vocabulary loaded successfully!




## Test the Model with a Sample Input

In [10]:
def simple_tokenize(text):
 return re.findall(r"\b\w+\b", text.lower())
def predict(text, model, vocab, max_len=100):
 model.eval()
 tokens = simple_tokenize(text.lower())
 token_ids = [vocab.get(word, vocab['']) for word in tokens]
 token_ids += [vocab['']] * (max_len - len(token_ids)) # Pad if needed
 input_tensor = torch.tensor([token_ids], dtype=torch.long).to(device)

 with torch.no_grad():
 output = model(input_tensor).squeeze().item()
 
 return "Spam" if output > 0.5 else "Ham"

# Test prediction
sample_text = "FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv"
print(f"Prediction: {predict(sample_text, model, vocab)}")


Prediction: Spam
