RedSecureBERT 🔴🛡️

Detects technical red-team / offensive security text (English).

| Split | Precision | Recall | F1 | Threshold |
| --- | --- | --- | --- | --- |
| Validation | 0.963 | 0.991 | 0.977 | 0.515 |

Recommended cut-off: prob >= 0.515 (chosen by maximising F₂ on the validation split).
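As a rough illustration of how such a cut-off can be picked, here is a minimal sketch that sweeps candidate thresholds and keeps the one maximising F₂; it assumes validation labels `y_val` and positive-class probabilities `p_val` are already computed (names are illustrative, not part of this repo):

import numpy as np
from sklearn.metrics import fbeta_score

def pick_threshold(y_true, probs, beta: float = 2.0) -> float:
    """Return the probability cut-off that maximises F-beta on a validation split."""
    grid = np.linspace(0.05, 0.95, 181)                      # 0.005-wide steps
    scores = [fbeta_score(y_true, probs >= t, beta=beta) for t in grid]
    return float(grid[int(np.argmax(scores))])

# threshold = pick_threshold(y_val, p_val)   # 0.515 is the value reported above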

Demo

| Phrase | BlueSecureBERT | RedSecureBERT |
| --- | --- | --- |
| To exfiltrate sensitive data, launch a phishing campaign that tricks employees into revealing their VPN credentials. | 0.066 | 0.824 |
| We should deploy an EDR solution, monitor all endpoints for intrusion attempts, and enforce strict password policies. | 0.557 | 0.019 |
| Our marketing team will unveil the new cybersecurity branding materials at next Tuesday's antivirus product launch. | 0.256 | 0.021 |
| I'm excited about the company picnic. There's no cybersecurity topic, just burgers and games. | 0.272 | 0.103 |
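To reproduce a single RedSecureBERT score like the ones above, a minimal sketch (assuming the checkpoint is published under the Hub id HagalazAI/RedSecureBERT and that index 1 is the positive class, as in the splitting script below):

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

model_id = "HagalazAI/RedSecureBERT"
tok = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForSequenceClassification.from_pretrained(model_id).eval()

text = "To exfiltrate sensitive data, launch a phishing campaign ..."
inputs = tok(text, truncation=True, max_length=512, return_tensors="pt")
with torch.inference_mode():
    p_off = torch.softmax(model(**inputs).logits, dim=-1)[0, 1].item()

print(f"P(offensive) = {p_off:.3f} ->", "offensive" if p_off >= 0.515 else "not offensive")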

Intended uses & limits

  • Triaging large corpora for offensive / red-team security content.
  • Input language: English.
  • No external test set yet → treat the reported scores as optimistic.

Training data

| Label | Rows |
| --- | --- |
| Offensive | 30 746 |
| Defensive | 19 550 |
| Other | 130 000 |
| Total | 180 296 |

Model details

| Field | Value |
| --- | --- |
| Base encoder | ehsanaghaei/SecureBERT (RoBERTa-base, 125 M) |
| Objective | One-vs-rest, focal loss (γ = 2) |
| Epochs | 3 · micro-batch 16 · LR 2e-5 |
| Hardware | 1× RTX 4090 (≈ 41 min) |
| Inference dtype | FP16-safe |
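The focal loss with γ = 2 scales cross-entropy by (1 − p_true)², so well-classified examples contribute little and training focuses on the hard ones. A minimal PyTorch sketch of that loss (illustrative only, not the exact training code; no class-weighting term shown):

import torch
import torch.nn.functional as F

def focal_loss(logits: torch.Tensor, targets: torch.Tensor, gamma: float = 2.0) -> torch.Tensor:
    """Focal loss over 2-class logits: cross-entropy scaled by (1 - p_true)^gamma."""
    log_probs = F.log_softmax(logits, dim=-1)                        # (batch, 2)
    log_pt = log_probs.gather(1, targets.unsqueeze(1)).squeeze(1)    # log-prob of the true class
    pt = log_pt.exp()
    return (-((1.0 - pt) ** gamma) * log_pt).mean()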

Training Data License

Script example

#!/usr/bin/env python
"""
06_split_binary.py
~~~~~~~~~~~~~~~~~~

Stream-splits a JSONL cybersecurity corpus into *offensive*, *defensive*, and *other* shards
using **two** fine-tuned SecureBERT heads.

How the two heads work together
-------------------------------
We load two independent checkpoints:

* `offensive_vs_rest` → gives **P(offensive | text)**
* `defensive_vs_rest` → gives **P(defensive | text)**

For every line we:

1. run both heads in the same GPU batch;
2. take the positive-class probability from each soft-max;
3. compare against per-head thresholds (from `thresholds.json`, default 0.5);
4. route the text with this truth table:

   off ≥ thr | def ≥ thr | destination
   ----------+-----------+--------------------------------
      yes    |    no     | offensive.jsonl
      no     |    yes    | defensive.jsonl
      yes    |    yes    | whichever probability is higher
      no     |    no     | other.jsonl
"""

from __future__ import annotations

import argparse
import json
from itertools import islice
from pathlib import Path

import torch
from torch.nn.functional import softmax
from tqdm.auto import tqdm
from transformers import (
    AutoModelForSequenceClassification as HFModel,
    AutoTokenizer,
)

from config import RAW_JSONL, MODEL_DIR  # MODEL_DIR == securebert_finetuned

# ─────────────────────────────  GPU SETTINGS  ──────────────────────────
# Use TensorFloat-32 on Ada GPUs (gives a big matmul speed boost).
torch.backends.cuda.matmul.allow_tf32 = True
torch.set_float32_matmul_precision("medium")

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# ────────────────────────────────  CLI  ────────────────────────────────
cli = argparse.ArgumentParser(description="Split JSONL into offence/defence/other")
cli.add_argument("--batch_size", type=int, help="override auto batch sizing")
args = cli.parse_args()

# ─────────────────────  BATCH-SIZE HEURISTIC  ──────────────────────────
if args.batch_size:  # user override wins
    BATCH = args.batch_size
else:
    try:
        import pynvml

        pynvml.nvmlInit()
        free = (
            pynvml.nvmlDeviceGetMemoryInfo(pynvml.nvmlDeviceGetHandleByIndex(0)).free
            / 1024**3
        )
        pynvml.nvmlShutdown()
        # ~30 MB per 512-token sequence (bfloat16, two heads) – clamp sensibly
        BATCH = max(64, min(int(free // 0.03), 1024))
    except Exception:  # any issue → decent default
        BATCH = 256
print(f"[split-binary] batch size = {BATCH}")

# ─────────────────────────  THRESHOLDS  ────────────────────────────────
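# For reference, thresholds.json is expected to be a flat mapping from head
# name to probability cut-off (illustrative example; the "off" value matches
# the cut-off recommended in the card above):
#     {"off": 0.515, "def": 0.5}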
thr_path = Path(MODEL_DIR) / "thresholds.json"
if thr_path.exists():
    THR = json.loads(thr_path.read_text())
    print("Loaded thresholds:", THR)
else:
    THR = {"off": 0.5, "def": 0.5}
    print("No thresholds.json β†’ default 0.5 each")

# ───────────────────  MODEL & TOKENISER LOADING  ───────────────────────
def load_model(path: Path):
    """Load classification head in BF16 (no flash-attention)."""
    return HFModel.from_pretrained(path, torch_dtype=torch.bfloat16)


paths = {
    "off": Path(MODEL_DIR) / "offensive_vs_rest",
    "def": Path(MODEL_DIR) / "defensive_vs_rest",
}
print("Loading models …")
m_off = load_model(paths["off"]).to(DEVICE).eval()
m_def = load_model(paths["def"]).to(DEVICE).eval()

# Optional: compile graphs for a little extra throughput
try:
    m_off = torch.compile(m_off, dynamic=True, mode="reduce-overhead")
    m_def = torch.compile(m_def, dynamic=True, mode="reduce-overhead")
    print("torch.compile: dynamic=True, reduce-overhead βœ“")
except Exception:
    pass

tok = AutoTokenizer.from_pretrained(paths["off"])
ENC = dict(
    truncation=True,
    padding="longest",
    max_length=512,
    return_tensors="pt",
)

# ───────────────────────  OUTPUT HANDLES  ──────────────────────────────
outs = {
    "off": open("offensive.jsonl", "w", encoding="utf-8"),
    "def": open("defensive.jsonl", "w", encoding="utf-8"),
    "oth": open("other.jsonl", "w", encoding="utf-8"),
}

# ─────────────────────────  HELPERS  ───────────────────────────────────
def batched(it, n):
    """Yield `n`-sized chunks from iterator `it`."""
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            break
        yield chunk


# ─────────────────────  MAIN SPLITTING LOOP  ───────────────────────────
with open(RAW_JSONL, "r", encoding="utf-8") as fin, torch.inference_mode():
    for lines in tqdm(batched(fin, BATCH), desc="Splitting", ncols=110):
        recs = [json.loads(l) for l in lines]
        texts = [r.get("content", "") for r in recs]

        # Tokenise → pin CPU memory → async copy to GPU
        batch = tok(texts, **ENC)
        batch = {
            k: v.pin_memory().to(DEVICE, non_blocking=True) for k, v in batch.items()
        }

        # Positive-class probabilities
        p_off = softmax(m_off(**batch).logits, dim=-1)[:, 1].cpu()
        p_def = softmax(m_def(**batch).logits, dim=-1)[:, 1].cpu()

        for r, po, pd in zip(recs, p_off, p_def):
            txt = r.get("content", "")
            off, dfn = po >= THR["off"], pd >= THR["def"]

            if off and not dfn:
                outs["off"].write(json.dumps({"content": txt}) + "\n")
            elif dfn and not off:
                outs["def"].write(json.dumps({"content": txt}) + "\n")
            elif off and dfn:  # tie → higher prob wins
                (outs["off"] if po >= pd else outs["def"]).write(
                    json.dumps({"content": txt}) + "\n"
                )
            else:
                outs["oth"].write(json.dumps({"content": txt}) + "\n")

# ─────────────────────────  CLEAN-UP  ──────────────────────────────────
for f in outs.values():
    f.close()
print("βœ… Done! β†’ offensive.jsonl  defensive.jsonl  other.jsonl")