# geoai / app.py — self-organizing transformer trainer + Gradio UI
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import requests
import re
import json
import os
from collections import Counter
from typing import List, Tuple, Dict
import random
import math
from datasets import load_dataset
from transformers import AutoTokenizer
import gradio as gr
class SelfOrganizingTokenizer:
    """Word-level tokenizer whose vocabulary is built from corpus frequencies.

    Ids 0-3 are reserved for <PAD>, <UNK>, <BOS>, <EOS>; the remaining slots
    are handed out to the most frequent tokens seen by build_vocab(), in
    descending frequency order.
    """

    # Words (\w+) or single non-space punctuation characters.
    _TOKEN_PATTERN = re.compile(r'\w+|[^\w\s]')

    def __init__(self, vocab_size=30000):
        self.vocab_size = vocab_size
        specials = ['<PAD>', '<UNK>', '<BOS>', '<EOS>']
        self.token_to_id = {tok: idx for idx, tok in enumerate(specials)}
        self.id_to_token = {idx: tok for idx, tok in enumerate(specials)}
        self.word_freq = Counter()

    def build_vocab(self, texts):
        """Count lower-cased tokens in *texts* and keep the most common ones."""
        for text in texts:
            self.word_freq.update(self._TOKEN_PATTERN.findall(text.lower()))
        # Reserve the first four ids for the special tokens above.
        for rank, (word, _count) in enumerate(self.word_freq.most_common(self.vocab_size - 4)):
            token_id = rank + 4
            self.token_to_id[word] = token_id
            self.id_to_token[token_id] = word

    def encode(self, text):
        """Map *text* to a list of ids; unknown tokens map to <UNK> (1)."""
        unk = self.token_to_id['<UNK>']
        return [self.token_to_id.get(tok, unk)
                for tok in self._TOKEN_PATTERN.findall(text.lower())]

    def decode(self, ids):
        """Map ids back to token strings, joined by single spaces."""
        return ' '.join(self.id_to_token.get(token_id, '<UNK>') for token_id in ids)
class SelfOrganizingAttention(nn.Module):
    """Multi-head self-attention with a learned "self-organizing" gate.

    Fix: the original applied *full* (bidirectional) attention, which leaks
    future tokens in this autoregressive LM — training then optimizes a task
    that generation cannot reproduce. A causal (upper-triangular) mask is now
    applied before the softmax so position t only attends to positions <= t.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.proj = nn.Linear(embed_dim, embed_dim)
        self.adaptation_layer = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        B, T, C = x.shape
        qkv = self.qkv(x).reshape(B, T, 3, self.num_heads, self.head_dim)
        q, k, v = qkv.permute(2, 0, 3, 1, 4)
        # Scaled dot-product scores: (B, heads, T, T).
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        # Causal mask: True above the diagonal = future positions, excluded.
        future = torch.triu(torch.ones(T, T, dtype=torch.bool, device=x.device), diagonal=1)
        att = att.masked_fill(future, float('-inf'))
        att = torch.softmax(att, dim=-1)
        y = att @ v
        y = y.transpose(1, 2).reshape(B, T, C)
        y = self.proj(y)
        # Self-organization: per-position multiplicative gate derived from the
        # *input*, nudging the attention output by at most +/-10%.
        adaptation = torch.tanh(self.adaptation_layer(x))
        y = y * (1 + 0.1 * adaptation)
        return y
class SelfOrganizingTransformer(nn.Module):
    """Pre-norm transformer LM whose attention residual is modulated per layer.

    Each layer rescales its attention branch by a tanh gate ('adaptation')
    weighted by a learnable per-layer 'plasticity' scalar. The ModuleDict key
    names are kept identical to the original so saved state dicts still load.
    """

    def __init__(self, vocab_size, embed_dim=512, num_heads=8, num_layers=6, max_len=1024):
        super().__init__()
        self.embed_dim = embed_dim
        self.tok_embed = nn.Embedding(vocab_size, embed_dim)
        self.pos_embed = nn.Embedding(max_len, embed_dim)

        def make_layer():
            # One block: pre-norm attention + pre-norm MLP + adaptation head.
            return nn.ModuleDict({
                'attn': SelfOrganizingAttention(embed_dim, num_heads),
                'norm1': nn.LayerNorm(embed_dim),
                'mlp': nn.Sequential(
                    nn.Linear(embed_dim, 4 * embed_dim),
                    nn.GELU(),
                    nn.Linear(4 * embed_dim, embed_dim),
                ),
                'norm2': nn.LayerNorm(embed_dim),
                'adaptation': nn.Linear(embed_dim, embed_dim),
            })

        self.layers = nn.ModuleList([make_layer() for _ in range(num_layers)])
        self.ln_f = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, vocab_size)
        # Learnable strength of the adaptive residual scaling, one per layer.
        self.plasticity = nn.Parameter(torch.ones(num_layers) * 0.01)

    def forward(self, x):
        _batch, seq_len = x.shape
        positions = torch.arange(0, seq_len, dtype=torch.long, device=x.device)
        hidden = self.tok_embed(x) + self.pos_embed(positions)
        for depth, block in enumerate(self.layers):
            # Attention branch, scaled by the plasticity-weighted gate.
            attn_out = block['attn'](block['norm1'](hidden))
            gate = torch.tanh(block['adaptation'](attn_out))
            hidden = hidden + attn_out * (1 + self.plasticity[depth] * gate)
            # Feed-forward branch with a plain residual add.
            hidden = hidden + block['mlp'](block['norm2'](hidden))
        return self.head(self.ln_f(hidden))
class TextDataset(Dataset):
    """Next-token-prediction dataset: each text becomes a fixed-length pair.

    __getitem__ yields (input, target) where target is the input shifted one
    position left, so the model at position t predicts token t+1. Sequences
    are padded with <PAD> (0) or truncated to exactly max_len ids first.
    """

    def __init__(self, texts, tokenizer, max_len=512):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        token_ids = self.tokenizer.encode(self.texts[idx])
        # Right-pad with zeros then cut: one expression covers both cases.
        fixed = (token_ids + [0] * self.max_len)[:self.max_len]
        sequence = torch.tensor(fixed)
        return sequence[:-1], sequence[1:]
class AITrainer:
    """Orchestrates corpus loading, tokenizer/model setup, training,
    checkpointing and text generation for the self-organizing transformer.

    Fixes vs. the original:
    - bare ``except:`` clauses narrowed to ``except Exception:`` (still
      best-effort, but no longer swallows KeyboardInterrupt/SystemExit);
    - ``torch.load`` now passes ``weights_only=False`` explicitly — the
      checkpoint embeds the pickled tokenizer object, which torch >= 2.6
      refuses to load under its new ``weights_only=True`` default;
    - generation crops the context to the model's position-embedding window,
      so long prompts/outputs no longer index past ``pos_embed``;
    - average-loss computation guarded against an empty dataloader.
    """

    def __init__(self):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = None
        self.model = None
        self.datasets = []
        # Context window; must match the max_len passed to the model.
        self.max_len = 512

    def load_public_datasets(self):
        """Load public corpora that need no API key (best effort per source)."""
        corpus = []
        try:
            # Italian Wikipedia
            wiki = load_dataset("wikipedia", "20220301.it", split="train[:10000]")
            for item in wiki:
                if len(item['text']) > 100:
                    corpus.append(item['text'])
        except Exception:
            pass  # deliberate best-effort: fall through to the next source
        try:
            # Common Crawl
            cc = load_dataset("cc100", lang="it", split="train[:5000]")
            for item in cc:
                if len(item['text']) > 100:
                    corpus.append(item['text'])
        except Exception:
            pass
        try:
            # OSCAR
            oscar = load_dataset("oscar-corpus/OSCAR-2201", "it", split="train[:5000]")
            for item in oscar:
                if len(item['text']) > 100:
                    corpus.append(item['text'])
        except Exception:
            pass
        # Plain-text books from public URLs, split into 2000-char chunks.
        urls = [
            "https://www.gutenberg.org/files/2000/2000-0.txt",  # Divina Commedia
            "https://www.gutenberg.org/files/1065/1065-0.txt"   # I Promessi Sposi
        ]
        for url in urls:
            try:
                response = requests.get(url, timeout=30)
                if response.status_code == 200:
                    text = response.text
                    chunks = [text[i:i + 2000] for i in range(0, len(text), 2000)]
                    corpus.extend(chunks[:500])
            except Exception:
                continue
        # Fall back to synthetic sentences when the downloads yielded too little.
        if len(corpus) < 1000:
            corpus.extend(self.generate_synthetic_data(5000))
        self.datasets = corpus[:10000]  # cap at 10k examples
        print(f"Caricati {len(self.datasets)} esempi di training")

    def generate_synthetic_data(self, num_samples):
        """Generate *num_samples* template-based Italian sentences."""
        templates = [
            "Il {sostantivo} {verbo} nel {luogo} durante {tempo}.",
            "La {sostantivo} è molto {aggettivo} e {verbo} sempre.",
            "Quando {verbo}, il {sostantivo} diventa {aggettivo}.",
            "Nel {luogo}, la {sostantivo} {verbo} con {sostantivo}.",
            "Il {aggettivo} {sostantivo} {verbo} ogni {tempo}."
        ]
        sostantivi = ["gatto", "cane", "casa", "albero", "fiume", "montagna", "libro", "sole"]
        verbi = ["corre", "salta", "vola", "nuota", "dorme", "mangia", "gioca", "legge"]
        aggettivi = ["bello", "grande", "piccolo", "veloce", "lento", "intelligente", "forte"]
        luoghi = ["parco", "giardino", "bosco", "città", "mare", "cielo", "campo"]
        tempi = ["giorno", "notte", "mattina", "sera", "inverno", "estate", "primavera"]
        texts = []
        for _ in range(num_samples):
            template = random.choice(templates)
            texts.append(template.format(
                sostantivo=random.choice(sostantivi),
                verbo=random.choice(verbi),
                aggettivo=random.choice(aggettivi),
                luogo=random.choice(luoghi),
                tempo=random.choice(tempi)
            ))
        return texts

    def setup_model(self, vocab_size=30000):
        """Build the self-organizing transformer and move it to the device."""
        self.model = SelfOrganizingTransformer(
            vocab_size=vocab_size,
            embed_dim=512,
            num_heads=8,
            num_layers=6,
            max_len=self.max_len
        ).to(self.device)
        total_params = sum(p.numel() for p in self.model.parameters())
        print(f"Modello creato con {total_params:,} parametri")

    def train(self, epochs=5, batch_size=16, lr=3e-4):
        """Train on self.datasets: builds tokenizer + model, then runs SGD."""
        print("Inizializzazione tokenizer...")
        self.tokenizer = SelfOrganizingTokenizer()
        self.tokenizer.build_vocab(self.datasets)
        print("Configurazione modello...")
        self.setup_model(len(self.tokenizer.token_to_id))
        print("Preparazione dataset...")
        dataset = TextDataset(self.datasets, self.tokenizer, max_len=self.max_len)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        optimizer = optim.AdamW(self.model.parameters(), lr=lr, weight_decay=0.01)
        # ignore_index=0: padding positions contribute no loss.
        criterion = nn.CrossEntropyLoss(ignore_index=0)
        print("Inizio training...")
        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            num_batches = 0
            for batch_idx, (input_ids, target_ids) in enumerate(dataloader):
                input_ids = input_ids.to(self.device)
                target_ids = target_ids.to(self.device)
                optimizer.zero_grad()
                logits = self.model(input_ids)
                loss = criterion(logits.reshape(-1, logits.size(-1)), target_ids.reshape(-1))
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()
                total_loss += loss.item()
                num_batches += 1
                if batch_idx % 50 == 0:
                    print(f"Epoch {epoch+1}/{epochs}, Batch {batch_idx}, Loss: {loss.item():.4f}")
            # Guard against an empty dataloader (no division by zero).
            avg_loss = total_loss / max(num_batches, 1)
            print(f"Epoch {epoch+1}/{epochs} completata. Loss media: {avg_loss:.4f}")
            # Periodic sanity-check generation.
            if epoch % 2 == 0:
                self.test_generation("Il gatto")
        print("Training completato!")
        self.save_model()

    def test_generation(self, prompt, max_length=50):
        """Greedy-decode from *prompt* (debug helper used during training)."""
        self.model.eval()
        with torch.no_grad():
            tokens = self.tokenizer.encode(prompt)
            input_ids = torch.tensor([tokens]).to(self.device)
            for _ in range(max_length):
                # Crop to the context window so pos_embed never overflows.
                logits = self.model(input_ids[:, -self.max_len:])
                next_token = torch.argmax(logits[0, -1, :], dim=-1)
                input_ids = torch.cat([input_ids, next_token.unsqueeze(0).unsqueeze(0)], dim=1)
                if next_token.item() == self.tokenizer.token_to_id.get('<EOS>', 3):
                    break
            generated = self.tokenizer.decode(input_ids[0].cpu().numpy())
            print(f"Generazione: {generated}")
        self.model.train()
        return generated

    def save_model(self):
        """Persist model weights, tokenizer and vocab size to ai_model.pth."""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'tokenizer': self.tokenizer,
            'vocab_size': len(self.tokenizer.token_to_id)
        }, 'ai_model.pth')
        print("Modello salvato in ai_model.pth")

    def load_model(self):
        """Load ai_model.pth if present; return True on success."""
        if not os.path.exists('ai_model.pth'):
            return False
        # weights_only=False because the checkpoint embeds the pickled
        # tokenizer object. Acceptable only because the file is produced
        # locally by save_model() — never load untrusted checkpoints this way.
        checkpoint = torch.load('ai_model.pth', map_location=self.device,
                                weights_only=False)
        self.tokenizer = checkpoint['tokenizer']
        self.setup_model(checkpoint['vocab_size'])
        self.model.load_state_dict(checkpoint['model_state_dict'])
        print("Modello caricato da ai_model.pth")
        return True

    def generate_text(self, prompt, max_length=100, temperature=0.8):
        """Sample up to *max_length* tokens from *prompt* at *temperature*."""
        if not self.model or not self.tokenizer:
            return "Modello non caricato. Esegui prima il training."
        self.model.eval()
        with torch.no_grad():
            tokens = self.tokenizer.encode(prompt)
            input_ids = torch.tensor([tokens]).to(self.device)
            for _ in range(max_length):
                # Crop to the context window so pos_embed never overflows.
                logits = self.model(input_ids[:, -self.max_len:])
                logits = logits[0, -1, :] / temperature
                probs = torch.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, 1)
                input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)
                if next_token.item() == self.tokenizer.token_to_id.get('<EOS>', 3):
                    break
            generated = self.tokenizer.decode(input_ids[0].cpu().numpy())
        return generated
def create_interface():
    """Build the Gradio UI: one tab to run training, one to generate text."""
    trainer = AITrainer()

    def run_training():
        # Button callback: download data and train, reporting status as text.
        try:
            trainer.load_public_datasets()
            trainer.train(epochs=3)
            return "Training completato con successo!"
        except Exception as e:
            return f"Errore durante il training: {str(e)}"

    def run_generation(prompt, max_len, temp):
        # Button callback: lazily load the checkpoint, then sample text.
        try:
            if not trainer.load_model():
                return "Modello non trovato. Esegui prima il training."
            return trainer.generate_text(prompt, max_len, temp)
        except Exception as e:
            return f"Errore nella generazione: {str(e)}"

    with gr.Blocks(title="AI Token Trainer") as demo:
        gr.Markdown("# AI Training System - Predizione Token")
        with gr.Tab("Training"):
            train_button = gr.Button("Avvia Training", variant="primary")
            train_status = gr.Textbox(label="Stato Training", lines=5)
            train_button.click(run_training, outputs=train_status)
        with gr.Tab("Generazione"):
            prompt_box = gr.Textbox(label="Prompt", placeholder="Inserisci il testo di partenza...")
            length_slider = gr.Slider(10, 200, value=50, label="Lunghezza massima")
            temperature_slider = gr.Slider(0.1, 2.0, value=0.8, label="Temperatura")
            generate_button = gr.Button("Genera Testo", variant="primary")
            generated_box = gr.Textbox(label="Testo Generato", lines=10)
            generate_button.click(
                run_generation,
                inputs=[prompt_box, length_slider, temperature_slider],
                outputs=generated_box,
            )
    return demo
if __name__ == "__main__":
    # sys.argv, not os.sys.argv: `os.sys` is an undocumented accident of the
    # os module importing sys, not a public API.
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "train":
        # Headless training: `python app.py train`
        trainer = AITrainer()
        trainer.load_public_datasets()
        trainer.train()
    else:
        # Default: launch the Gradio web interface.
        demo = create_interface()
        demo.launch(share=True)