# Source: training-scripts/scripts/eval_n8n_model.py
# Uploaded by stmasson with huggingface_hub (commit a779a89, verified).
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "transformers>=4.45.0",
# "datasets>=3.0.0",
# "accelerate>=1.0.0",
# "huggingface_hub>=0.26.0",
# "torch>=2.4.0",
# "tqdm>=4.66.0",
# "pandas>=2.0.0",
# ]
# [tool.uv]
# extra-index-url = ["https://download.pytorch.org/whl/cu124"]
# ///
"""
Script d'évaluation pour le modèle n8n Expert.
Métriques:
1. JSON Validity - Le output est-il du JSON valide?
2. Schema Compliance - Le workflow suit-il le schéma n8n?
3. Node Accuracy - Les types de nodes sont-ils corrects?
4. Connection Logic - Les connexions sont-elles cohérentes?
5. Thinking Quality - Le raisonnement est-il présent et structuré?
Usage:
python eval_n8n_model.py --model stmasson/n8n-expert-14b --samples 100
"""
import os
import json
import argparse
import re
from typing import Dict, List, Any, Tuple
from dataclasses import dataclass
from tqdm import tqdm
import pandas as pd
import torch
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from huggingface_hub import login
# ============================================================================
# CONFIGURATION
# ============================================================================
# Types de nodes n8n valides (liste partielle)
# Recognised n8n node type identifiers (partial list), grouped by category.
_TRIGGER_NODE_TYPES = (
    "n8n-nodes-base.webhookTrigger",
    "n8n-nodes-base.scheduleTrigger",
    "n8n-nodes-base.manualTrigger",
    "n8n-nodes-base.emailTrigger",
)
_ACTION_NODE_TYPES = (
    "n8n-nodes-base.httpRequest",
    "n8n-nodes-base.set",
    "n8n-nodes-base.if",
    "n8n-nodes-base.switch",
    "n8n-nodes-base.merge",
    "n8n-nodes-base.splitInBatches",
    "n8n-nodes-base.function",
    "n8n-nodes-base.code",
    "n8n-nodes-base.noOp",
)
_INTEGRATION_NODE_TYPES = (
    "n8n-nodes-base.slack",
    "n8n-nodes-base.gmail",
    "n8n-nodes-base.googleSheets",
    "n8n-nodes-base.airtable",
    "n8n-nodes-base.notion",
    "n8n-nodes-base.discord",
    "n8n-nodes-base.telegram",
    "n8n-nodes-base.openAi",
    "n8n-nodes-base.postgres",
    "n8n-nodes-base.mysql",
    "n8n-nodes-base.mongodb",
)
_AI_NODE_TYPES = (
    "@n8n/n8n-nodes-langchain.agent",
    "@n8n/n8n-nodes-langchain.chainLlm",
)

# Flat set of every known type, used for membership tests.
VALID_NODE_TYPES = {
    *_TRIGGER_NODE_TYPES,
    *_ACTION_NODE_TYPES,
    *_INTEGRATION_NODE_TYPES,
    *_AI_NODE_TYPES,
}
# ============================================================================
# MÉTRIQUES
# ============================================================================
@dataclass
class EvalResult:
    """Outcome of evaluating a single generated example.

    Every boolean field records one pass/fail check; ``score`` is the
    fraction of those checks that passed.
    """

    # Task label attached to the example by the caller.
    task_type: str
    # Workflow checks.
    valid_json: bool
    has_nodes: bool
    has_connections: bool
    nodes_valid: bool
    # Reasoning checks.
    has_thinking: bool
    thinking_structured: bool
    # Optional error message; empty when nothing was reported.
    error: str = ""

    @property
    def score(self) -> float:
        """Overall score between 0 and 1 (unweighted mean of the six checks)."""
        checks = (
            self.valid_json,
            self.has_nodes,
            self.has_connections,
            self.nodes_valid,
            self.has_thinking,
            self.thinking_structured,
        )
        passed = 0
        for check in checks:
            passed += check
        return passed / len(checks)
def extract_workflow_json(text: str) -> Tuple[str, str]:
    """Split a model response into its reasoning and its workflow JSON.

    The reasoning is the stripped content of the first
    ``<thinking>...</thinking>`` pair. The workflow is taken from the first
    ```` ```json ```` fenced block found anywhere in ``text``; when there is
    no such block, it is the span from the first ``{`` to the last ``}`` in
    the text that follows the thinking section (or in the whole text when
    there is no thinking section).

    Returns:
        ``(thinking, workflow_json)``; either element is ``""`` when the
        corresponding part was not found.
    """
    thinking_match = re.search(r'<thinking>(.*?)</thinking>', text, re.DOTALL)
    thinking = thinking_match.group(1).strip() if thinking_match else ""

    # Preferred source: a fenced JSON code block.
    fenced = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        return thinking, fenced.group(1).strip()

    # Fallback: raw JSON object located after the thinking section.
    remainder = text[thinking_match.end():] if thinking_match else text
    raw_object = re.search(r'\{[\s\S]*\}', remainder)
    if raw_object:
        return thinking, raw_object.group(0).strip()
    return thinking, ""
def validate_workflow(workflow_json: str) -> Dict[str, Any]:
    """Run the structural checks on a workflow given as a JSON string.

    Malformed input never raises: text that is not valid JSON, a top-level
    value that is not an object, or wrongly-typed ``nodes`` / ``connections``
    entries are reported as failed checks instead.

    Args:
        workflow_json: Raw workflow text, expected to be a JSON object.

    Returns:
        A dict with the keys ``valid_json``, ``has_nodes``,
        ``has_connections``, ``nodes_valid``, ``node_count``,
        ``connection_count`` and ``invalid_nodes``. An ``error`` key holding
        a message is added when the text cannot be parsed or the document
        does not have the expected shape.
    """
    result: Dict[str, Any] = {
        "valid_json": False,
        "has_nodes": False,
        "has_connections": False,
        "nodes_valid": False,
        "node_count": 0,
        "connection_count": 0,
        "invalid_nodes": [],
    }

    # Check 1: the text parses as JSON.
    try:
        wf = json.loads(workflow_json)
    except json.JSONDecodeError as e:
        result["error"] = str(e)
        return result
    result["valid_json"] = True

    # "[]", "42" or '"text"' are valid JSON but cannot hold a workflow;
    # stop here rather than calling .get() on a non-dict.
    if not isinstance(wf, dict):
        result["error"] = (
            f"Workflow must be a JSON object, got {type(wf).__name__}"
        )
        return result

    # Check 2: nodes are present (a non-list "nodes" counts as absent).
    raw_nodes = wf.get("nodes", [])
    nodes_is_list = isinstance(raw_nodes, list)
    nodes = raw_nodes if nodes_is_list else []
    if not nodes_is_list:
        result["error"] = "'nodes' must be a list"
    result["has_nodes"] = len(nodes) > 0
    result["node_count"] = len(nodes)

    # Check 3: connections are present (a non-dict value counts as absent).
    connections = wf.get("connections", {})
    if not isinstance(connections, dict):
        result["error"] = "'connections' must be an object"
        connections = {}
    result["has_connections"] = len(connections) > 0
    result["connection_count"] = sum(
        len(v) for v in connections.values() if isinstance(v, (dict, list))
    )

    # Check 4: every node type is acceptable.
    invalid_nodes = []
    for node in nodes:
        if not isinstance(node, dict):
            invalid_nodes.append(f"<non-object node: {type(node).__name__}>")
            continue
        node_type = node.get("type", "")
        if not node_type:
            # Missing or empty types are not counted as invalid.
            continue
        if not isinstance(node_type, str):
            invalid_nodes.append(str(node_type))
            continue
        # Anything that looks like an n8n node is accepted, as is any type
        # listed explicitly in VALID_NODE_TYPES.
        if node_type.startswith(("n8n-nodes-base.", "@n8n/")):
            continue
        if node_type not in VALID_NODE_TYPES:
            invalid_nodes.append(node_type)
    result["invalid_nodes"] = invalid_nodes
    result["nodes_valid"] = nodes_is_list and not invalid_nodes
    return result
def validate_thinking(thinking: str) -> Dict[str, bool]:
    """Assess the reasoning text extracted from a response.

    Returns:
        A dict with two flags:
        ``has_thinking`` is true when the text is longer than 50 characters;
        ``thinking_structured`` is true when the text contains a digit
        followed by a dot, a line starting with ``-`` or ``*`` and
        whitespace, or the word "étape" / "step" (case-insensitive).
    """
    report = {
        "has_thinking": len(thinking) > 50,
        "thinking_structured": False,
    }
    if not thinking:
        return report

    lowered = thinking.lower()
    report["thinking_structured"] = bool(
        re.search(r'\d+\.', thinking)                     # numbered points
        or re.search(r'^-\s', thinking, re.MULTILINE)     # dash bullets
        or re.search(r'^\*\s', thinking, re.MULTILINE)    # star bullets
        or "étape" in lowered
        or "step" in lowered
    )
    return report
def evaluate_example(
    model_output: str,
    task_type: str,
) -> EvalResult:
    """Score one model response.

    The response is split into reasoning and workflow JSON, each part is
    validated, and the individual flags are collected into an ``EvalResult``
    tagged with ``task_type``.
    """
    reasoning, workflow_text = extract_workflow_json(model_output)
    workflow_report = validate_workflow(workflow_text)
    reasoning_report = validate_thinking(reasoning)

    workflow_flags = {
        key: workflow_report[key]
        for key in ("valid_json", "has_nodes", "has_connections", "nodes_valid")
    }
    reasoning_flags = {
        key: reasoning_report[key]
        for key in ("has_thinking", "thinking_structured")
    }
    return EvalResult(
        task_type=task_type,
        error=workflow_report.get("error", ""),
        **workflow_flags,
        **reasoning_flags,
    )
# ============================================================================
# ÉVALUATION
# ============================================================================
# Ordered (task label, keywords) table. The first entry with a keyword found
# in the lower-cased system prompt wins, so the order is significant.
_TASK_KEYWORDS = (
    ("generate", ("génère", "generate")),
    ("edit", ("édite", "edit")),
    ("fix", ("corrige", "fix")),
    ("improve", ("améliore", "improve")),
    ("explain", ("explique", "explain")),
    ("debug", ("débogue", "debug")),
)


def _detect_task_type(system_msg: str) -> str:
    """Return the task label matching the system prompt, or "unknown"."""
    lowered = system_msg.lower()
    for task_type, keywords in _TASK_KEYWORDS:
        if any(keyword in lowered for keyword in keywords):
            return task_type
    return "unknown"


def _mean(values) -> float:
    """Return the arithmetic mean of ``values``, or 0.0 when it is empty."""
    values = list(values)
    return sum(values) / len(values) if values else 0.0


def run_evaluation(
    model_path: str,
    dataset_repo: str = "stmasson/n8n-agentic-multitask",
    data_file: str = "data/multitask_large/val.jsonl",
    num_samples: int = 100,
    output_file: str = "eval_results.json",
):
    """Run the full evaluation of a model and write a JSON report.

    Loads the model and the validation split, generates one answer per
    example (greedy decoding, up to 4096 new tokens), scores each answer with
    ``evaluate_example`` and prints global and per-task statistics.

    Args:
        model_path: Model identifier or path passed to ``from_pretrained``.
        dataset_repo: Dataset repository passed to ``load_dataset``.
        data_file: File inside the repository used as the validation split.
        num_samples: Maximum number of examples; when the split is larger, a
            seeded shuffle (seed 42) selects this many.
        output_file: Path of the JSON report to write.

    Returns:
        Dict mapping each metric name to its mean over the evaluated
        examples. Every metric is 0.0 when no example was evaluated.

    Note:
        If the ``HF_TOKEN`` environment variable is set, it is used to log in
        to the Hugging Face Hub before loading anything.
    """
    print("=" * 60)
    print("ÉVALUATION DU MODÈLE N8N EXPERT")
    print("=" * 60)

    # Auth
    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        login(token=hf_token)

    # Load the model
    print(f"\nChargement du modèle: {model_path}")
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        trust_remote_code=True,
    )
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        device_map="auto",
    )

    # Load the dataset
    print(f"\nChargement du dataset: {dataset_repo}")
    dataset = load_dataset(
        dataset_repo,
        data_files={"validation": data_file},
        split="validation"
    )

    # Subsample
    if num_samples < len(dataset):
        dataset = dataset.shuffle(seed=42).select(range(num_samples))
    print(f"Évaluation sur {len(dataset)} exemples")

    # Evaluate; results are also grouped by task as they are produced so the
    # per-task statistics do not need to rescan the full list.
    results = []
    results_by_task = {}
    for example in tqdm(dataset, desc="Évaluation"):
        messages = example["messages"]

        # The task type is inferred from the first message's content.
        system_msg = messages[0]["content"] if messages else ""
        task_type = _detect_task_type(system_msg)

        # Build the prompt without the last message (the expected answer).
        prompt = tokenizer.apply_chat_template(
            messages[:-1],
            tokenize=False,
            add_generation_prompt=True,
        )

        # Generate. A failure is kept best-effort (the example scores as a
        # failed generation) but is reported so it does not go unnoticed.
        try:
            generation = pipe(
                prompt,
                max_new_tokens=4096,
                do_sample=False,
                temperature=None,
                top_p=None,
                return_full_text=False,
            )
            generated = generation[0]["generated_text"]
        except Exception as e:
            tqdm.write(f"Erreur de génération: {e}")
            generated = f"ERROR: {str(e)}"

        eval_result = evaluate_example(generated, task_type)
        results.append(eval_result)
        results_by_task.setdefault(task_type, []).append(eval_result)

    # Statistics
    print("\n" + "=" * 60)
    print("RÉSULTATS")
    print("=" * 60)
    total = len(results)
    if total == 0:
        # Avoids a ZeroDivisionError below; all metrics are reported as 0.
        print("\nAucun exemple évalué.")

    # Global metrics
    metrics = {
        "valid_json": _mean(r.valid_json for r in results),
        "has_nodes": _mean(r.has_nodes for r in results),
        "has_connections": _mean(r.has_connections for r in results),
        "nodes_valid": _mean(r.nodes_valid for r in results),
        "has_thinking": _mean(r.has_thinking for r in results),
        "thinking_structured": _mean(r.thinking_structured for r in results),
        "overall_score": _mean(r.score for r in results),
    }
    print("\nMétriques globales:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.1%}")

    # Per-task metrics
    print("\nMétriques par tâche:")
    for task_type in sorted(results_by_task):
        task_results = results_by_task[task_type]
        task_score = _mean(r.score for r in task_results)
        task_json = _mean(r.valid_json for r in task_results)
        print(f"  {task_type}: score={task_score:.1%}, json={task_json:.1%} (n={len(task_results)})")

    # Save the report
    report = {
        "model": model_path,
        "num_samples": total,
        "metrics": metrics,
        "by_task": {
            task: {
                "count": len(task_results),
                "score": _mean(r.score for r in task_results),
            }
            for task, task_results in results_by_task.items()
        },
    }
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)
    print(f"\nRésultats sauvegardés dans: {output_file}")
    return metrics
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
    # Command-line entry point: declare the options as a table, parse them,
    # then hand everything to run_evaluation.
    cli = argparse.ArgumentParser(description="Évaluation du modèle n8n Expert")
    option_specs = (
        ("--model", {"type": str, "required": True, "help": "Chemin du modèle à évaluer"}),
        ("--samples", {"type": int, "default": 100, "help": "Nombre d'exemples à évaluer"}),
        ("--output", {"type": str, "default": "eval_results.json", "help": "Fichier de sortie"}),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    cli_args = cli.parse_args()

    run_evaluation(
        model_path=cli_args.model,
        num_samples=cli_args.samples,
        output_file=cli_args.output,
    )