Spaces:
Runtime error
Runtime error
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| import numpy as np | |
| from sklearn.preprocessing import MinMaxScaler | |
class Autoencoder(nn.Module):
    """Fully connected autoencoder applied independently to every feature vector.

    The encoder narrows ``input_size -> 256 -> 128 -> 64 -> 32`` and the decoder
    mirrors it back to ``input_size``, with ReLU between the linear layers and
    no activation on the bottleneck or the output.
    """

    def __init__(self, input_size):
        """Build the encoder/decoder stacks for vectors of length *input_size*."""
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
        )
        self.decoder = nn.Sequential(
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, input_size),
        )

    def forward(self, x):
        """Reconstruct *x* and return a tensor with the same shape.

        Args:
            x: Tensor whose last dimension has ``input_size`` entries, typically
                ``(batch, seq_len, input_size)``. Any number of leading
                dimensions is accepted.

        Returns:
            The reconstruction, shaped like ``x``.
        """
        leading_shape = x.shape[:-1]
        # reshape (not view) so non-contiguous inputs such as transposed or
        # sliced tensors are accepted instead of raising a RuntimeError.
        flat = x.reshape(-1, x.shape[-1])
        decoded = self.decoder(self.encoder(flat))
        # Spell out the feature size rather than using -1 so the shape stays
        # unambiguous even when a leading dimension is empty.
        return decoded.reshape(*leading_shape, decoded.shape[-1])
def _as_sequence_batch(values, device):
    """Return *values* as a float32 tensor shaped (batch, seq_len, features) on *device*."""
    if not torch.is_tensor(values):
        # Going through NumPy first also copes with lists of per-step arrays.
        values = np.asarray(values, dtype=np.float32)
    batch = torch.as_tensor(values, dtype=torch.float32, device=device)
    if batch.dim() == 2:
        # A single (seq_len, features) sequence becomes a batch of one.
        batch = batch.unsqueeze(0)
    if batch.dim() != 3:
        raise ValueError(
            "expected a 2-D (seq_len, features) or 3-D (batch, seq_len, features) "
            f"input, got {batch.dim()} dimension(s)"
        )
    return batch


def _fit_autoencoder(model, data, epochs, patience):
    """Train *model* to reconstruct *data*, stopping early once the loss stalls."""
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters())
    use_early_stopping = patience is not None and patience > 0
    best_loss = float("inf")
    stale_epochs = 0

    model.train()
    for _ in range(epochs):
        optimizer.zero_grad()
        loss = criterion(model(data), data)
        loss.backward()
        optimizer.step()

        if not use_early_stopping:
            continue
        current_loss = loss.item()
        if current_loss < best_loss:
            best_loss = current_loss
            stale_epochs = 0
        else:
            # A NaN loss never compares as an improvement, so a diverged run
            # also ends after `patience` epochs.
            stale_epochs += 1
            if stale_epochs >= patience:
                break


def _reconstruction_mse(model, data):
    """Return the per-step mean squared reconstruction error as a NumPy array."""
    model.eval()
    with torch.no_grad():
        reconstructed = model(data).cpu().numpy()
    squared_error = np.power(data.cpu().numpy() - reconstructed, 2)
    # Average over the feature axis, then drop singleton axes (e.g. batch of one).
    return np.mean(squared_error, axis=2).squeeze()


def anomaly_detection(X_embeddings, X_posture, epochs=200, patience=5):
    """Score every step by how poorly an autoencoder reconstructs it.

    Two independent ``Autoencoder`` models are trained, one on the embeddings
    and one on the min-max scaled posture values. The reconstruction error of
    each model on its own training data is returned.

    Args:
        X_embeddings: Array-like of shape ``(seq_len, features)`` or
            ``(batch, seq_len, features)``.
        X_posture: Array-like of posture values. It is flattened to a single
            column, i.e. treated as one scalar per step.
        epochs: Maximum number of full-batch training epochs per model.
        patience: Stop training a model once its loss has failed to improve
            for this many consecutive epochs. ``None`` or a value ``<= 0``
            disables early stopping, so all ``epochs`` are run.

    Returns:
        A tuple ``(mse_embeddings, mse_posture)`` of NumPy arrays holding the
        mean squared reconstruction error per step, with singleton dimensions
        squeezed out.

    Raises:
        ValueError: If an input is neither 2-D nor 3-D after conversion.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Normalize posture to MinMaxScaler's default range as a single feature column.
    posture_column = np.asarray(X_posture).reshape(-1, 1)
    posture_scaled = MinMaxScaler().fit_transform(posture_column)

    embeddings = _as_sequence_batch(X_embeddings, device)
    posture = _as_sequence_batch(posture_scaled, device)

    # Build both models before any training so parameter initialisation
    # happens in a fixed order (embeddings first, then posture).
    model_embeddings = Autoencoder(input_size=embeddings.shape[2]).to(device)
    model_posture = Autoencoder(input_size=posture.shape[2]).to(device)

    # The two models share no parameters or optimizer state, so training them
    # one after the other is equivalent to alternating between them per epoch.
    _fit_autoencoder(model_embeddings, embeddings, epochs, patience)
    _fit_autoencoder(model_posture, posture, epochs, patience)

    mse_embeddings = _reconstruction_mse(model_embeddings, embeddings)
    mse_posture = _reconstruction_mse(model_posture, posture)
    return mse_embeddings, mse_posture
def determine_anomalies(mse_values, threshold):
    """Flag values lying more than *threshold* standard deviations above the mean.

    Args:
        mse_values: Array of reconstruction errors.
        threshold: Multiplier applied to the standard deviation of
            ``mse_values`` when computing the cutoff.

    Returns:
        Boolean mask, ``True`` where a value exceeds ``mean + threshold * std``.
    """
    cutoff = np.mean(mse_values) + threshold * np.std(mse_values)
    return mse_values > cutoff