# evaluate.py
import os
import json
import torch
import logging
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.metrics import mean_squared_error, r2_score
from transformer_model.scripts.config_transformer import RESULTS_DIR, CHECKPOINT_DIR, DATA_PATH, FORECAST_HORIZON, SEQ_LEN
from transformer_model.scripts.training.load_basis_model import load_moment_model
from transformer_model.scripts.utils.informer_dataset_class import InformerDataset
from momentfm.utils.utils import control_randomness
from transformer_model.scripts.utils.check_device import check_device

# Setup logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


def evaluate():
    control_randomness(seed=13)

    # Set device
    device, backend, scaler = check_device()
    logging.info(f"Evaluation is running on: {backend} ({device})")

    # Load final model
    model = load_moment_model()
    checkpoint_path = os.path.join(CHECKPOINT_DIR, "model_final.pth")
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.to(device)
    model.eval()
    logging.info(f"Loaded final model from: {checkpoint_path}")

    # Recreate the training dataset to get the fitted scaler
    train_dataset = InformerDataset(
        data_split="train",
        random_seed=13,
        forecast_horizon=FORECAST_HORIZON
    )

    # Use its scaler in the test dataset
    test_dataset = InformerDataset(
        data_split="test",
        random_seed=13,
        forecast_horizon=FORECAST_HORIZON
    )
    test_dataset.scaler = train_dataset.scaler
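    # NOTE: this assumes InformerDataset exposes its fitted, sklearn-style
    # scaler as a public `.scaler` attribute. Reusing the training scaler keeps
    # test-set statistics out of the normalization and avoids data leakage.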

    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)
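    # shuffle=False preserves temporal order, so the flattened forecasts can
    # later be aligned with the reconstructed timestamps.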

    trues, preds = [], []
    with torch.no_grad():
        for timeseries, forecast, input_mask in tqdm(test_loader, desc="Evaluating on test set"):
            timeseries = timeseries.float().to(device)
            forecast = forecast.float().to(device)
            input_mask = input_mask.to(device)  # important: the mask must live on the same device as the model
            output = model(x_enc=timeseries, input_mask=input_mask)
            trues.append(forecast.cpu().numpy())
            preds.append(output.forecast.cpu().numpy())

    trues = np.concatenate(trues, axis=0)
    preds = np.concatenate(preds, axis=0)
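    # Both arrays now have shape (n_windows, n_channels, forecast_horizon),
    # as implied by the channel indexing below.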

    # Extract only the first feature (consumption)
    true_values = trues[:, 0, :]
    pred_values = preds[:, 0, :]

    # Inverse normalization: the scaler was fitted on all n_features columns,
    # so pad the single consumption column with zeros, inverse-transform, and
    # keep only column 0
    n_features = test_dataset.n_channels
    true_reshaped = np.column_stack([true_values.flatten()] + [np.zeros_like(true_values.flatten())] * (n_features - 1))
    pred_reshaped = np.column_stack([pred_values.flatten()] + [np.zeros_like(pred_values.flatten())] * (n_features - 1))
    true_original = test_dataset.scaler.inverse_transform(true_reshaped)[:, 0]
    pred_original = test_dataset.scaler.inverse_transform(pred_reshaped)[:, 0]
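    # NOTE: the zero-padding trick is only exact for column-wise scalers such
    # as StandardScaler or MinMaxScaler, where each feature is inverse-
    # transformed independently of the others.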

    # Build the timestamp index: the date column is dropped inside
    # InformerDataset, so reload the original CSV and use the index of the
    # first test sample to recover the starting date
    raw_df = pd.read_csv(DATA_PATH, parse_dates=["date"])
    train_len = len(train_dataset)
    test_start_idx = train_len + SEQ_LEN
    start_timestamp = raw_df["date"].iloc[test_start_idx]
    logging.info(f"[DEBUG] Test data starts at: {start_timestamp}")
    timestamps = [start_timestamp + pd.Timedelta(hours=i) for i in range(len(true_original))]
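    # NOTE: this assumes the series is sampled hourly and that successive
    # forecast windows line up end to end; adjust the Timedelta step if the
    # data uses a different resolution.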

    df = pd.DataFrame({
        "Timestamp": timestamps,
        "True Consumption (MW)": true_original,
        "Predicted Consumption (MW)": pred_original
    })

    # Save results to CSV
    os.makedirs(RESULTS_DIR, exist_ok=True)
    results_path = os.path.join(RESULTS_DIR, "test_results.csv")
    df.to_csv(results_path, index=False)
    logging.info(f"Saved prediction results to: {results_path}")

    # Evaluation metrics
    mse = mean_squared_error(df["True Consumption (MW)"], df["Predicted Consumption (MW)"])
    rmse = np.sqrt(mse)
    mape = np.mean(np.abs((df["True Consumption (MW)"] - df["Predicted Consumption (MW)"]) / df["True Consumption (MW)"])) * 100
    r2 = r2_score(df["True Consumption (MW)"], df["Predicted Consumption (MW)"])
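    # NOTE: MAPE divides by the true values and is undefined wherever actual
    # consumption is zero; grid load is normally strictly positive, but add an
    # epsilon guard if zeros can occur in your data.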

    # Save metrics to JSON
    metrics = {"RMSE": float(rmse), "MAPE": float(mape), "R2": float(r2)}
    metrics_path = os.path.join(RESULTS_DIR, "evaluation_metrics.json")
    with open(metrics_path, "w") as f:
        json.dump(metrics, f)
    logging.info(f"Saved evaluation metrics to: {metrics_path}")
    logging.info(f"RMSE: {rmse:.3f} | MAPE: {mape:.2f}% | R²: {r2:.3f}")

if __name__ == "__main__":
    evaluate()