Spaces:
Runtime error
Runtime error
import logging
from typing import List, Optional, Union

import pandas as pd
from sktime.forecasting.base import ForecastingHorizon
from sktime.forecasting.model_selection import SlidingWindowSplitter, SingleWindowSplitter
from sktime.forecasting.model_selection import temporal_train_test_split
from statsmodels.tsa.tsatools import freq_to_period

from .utils import split_x_y, k_folds
from .models import AllModels
class Forecaster():
    '''
    Prepare a time series and run a set of forecasting models on it.

    Typical usage is ``fit(data, ...)`` followed by ``forecast(...)``.
    ``forecast__old`` is the earlier single-call implementation.
    '''

    def __init__(
        self
    ) -> None:
        logging.debug('Forecaster init')

    @staticmethod
    def _infer_window_length(period: int, n_rows: int) -> int:
        '''Return the default window length: min(20, period, 20% of rows), but never below 4.'''
        return max(4, min(20, period, int(n_rows * 0.2)))

    def fit(
        self,
        data: pd.DataFrame,
        exog: Optional[pd.DataFrame] = None,
        n_predict: int = 1,
        window_length: Optional[int] = None,
        target_col: Optional[str] = None
    ) -> None:
        '''
        Prepare targets, features, forecasting horizon, train/test split and CV splitter.

        data: pandas DataFrame, required
            Data must contain a datetime index and the y column. Any additional column is
            considered exogenous data and is used for multivariate forecasting
        exog: pandas DataFrame, optional
            Exogenous data, must have a datetime index, must NOT contain the y column, and
            the index must cover the full training and forecasting range (training + n_predict)
        n_predict: int, optional
            Number of steps to forecast, forwarded to split_x_y
        window_length: int, optional
            If not given, window_length is inferred from the seasonality period, at most 20,
            at least 4. Recommended: the amount of auto correlation (AR)
        target_col: str, optional
            Name of the target column, stored on the instance

        Raises ValueError when the index frequency cannot be inferred, or when the exogenous
        data cannot be aligned with the training and forecasting index.
        '''
        print('[Forecaster - fit] ----- START -----')
        self.freq = pd.infer_freq(data.index)
        if self.freq is None:
            # pd.infer_freq returns None for irregular indexes; fail here with a clear
            # message instead of an obscure error inside freq_to_period.
            raise ValueError(
                'Unable to infer the frequency of the data index, '
                'data must have a regularly spaced datetime index.')
        self.period = freq_to_period(self.freq)
        self.window_length = window_length
        self.target_col = target_col

        # Handle forecast window, this will be used to build models.
        # The inferred window is at most 20 or one seasonality period, and at least 4.
        if window_length is None:
            self.window_length = self._infer_window_length(
                self.period, len(data))
            print(f'Inferred window_length: {self.window_length}')

        self.data = data.copy()
        if data.index.freq is None:
            self.data.index.freq = self.freq

        # If there are no columns other than 'y', self.X and self.X_future will be None.
        # The resolved window length is passed so an inferred value is actually used.
        # NOTE(review): split_x_y is defined elsewhere; confirm it expects the resolved value.
        (
            self.fh,
            self.y,
            self.X,
            self.X_future) = split_x_y(
            self.data,
            self.window_length,
            n_predict,
            self.freq)

        if exog is not None:
            print('[Forecaster - fit] - exogenous data provided')
            try:
                # Fall back to the target / horizon index when the data itself carries
                # no feature columns (self.X / self.X_future are None in that case).
                # NOTE(review): the horizon fallback assumes self.fh is an absolute sktime
                # ForecastingHorizon; any failure still ends in the ValueError below.
                train_index = (
                    self.X.index if self.X is not None else self.y.index)
                future_index = (
                    self.X_future.index if self.X_future is not None
                    else self.fh.to_pandas())
                self.exog = exog.loc[train_index]
                self.exog_future = exog.loc[future_index]
            except Exception as err:
                raise ValueError(
                    'Exogenous value not fit, exogenous data must contains a datetime index and covers the entire train and forecast time range.') from err
            print('[Forecaster - fit] - merge exogenous data with features')
            # pd.concat silently drops None entries, so a missing self.X is fine here.
            self.X = pd.concat([self.exog, self.X], axis=1)
            self.X_future = pd.concat(
                [self.exog_future, self.X_future], axis=1)

        # ---------------- #
        # Train Test Split #
        # ---------------- #
        print('[Forecaster - fit] Train test split')
        test_size = len(self.fh)
        print('[Forecaster - fit] Test size: ', test_size)
        if self.X is not None:
            (
                self.y_train,
                self.y_test,
                self.X_train,
                self.X_test
            ) = temporal_train_test_split(
                self.y,
                self.X,
                test_size=test_size)
        else:
            (
                self.y_train,
                self.y_test,
            ) = temporal_train_test_split(
                self.y,
                test_size=test_size)
            # Keep the attributes defined so forecast(test=True) works without features.
            self.X_train = None
            self.X_test = None
        self.fh_test = ForecastingHorizon(self.y_test.index, is_relative=False)

        # ---------------- #
        # Cross Validation #
        # ---------------- #
        # NOTE(review): the window is longer than the series itself; kept as in the
        # original, confirm this is the intended splitter configuration.
        print(
            f'[Forecaster - fit] Single window splitter, with window_length {len(self.y) + test_size} and fh {test_size}')
        self.cv = SingleWindowSplitter(
            window_length=len(self.y) + test_size,
            fh=test_size
        )
        # A custom k-fold validation (utils.k_folds) was originally planned here, but it
        # does not fit well with the sktime API.
        print('[Forecaster - fit] ----- END -----')

    def forecast(
        self,
        models: Union[str, List[str]] = 'all',
        test: bool = False,
    ):
        '''
        Fit the selected models and return their forecasts. Requires fit() to be called first.

        models: str or list of str, optional
            Model selection forwarded to AllModels.init_models, defaults to 'all'
        test: bool, optional
            When True, models are fitted on the train split and forecast the test split;
            otherwise they are fitted on all data and forecast the future horizon

        Returns a list of {'model': name, 'results': forecast} dicts, also stored
        in self.results.
        '''
        # ----------- #
        # Init Models #
        # ----------- #
        logging.debug('Init models')
        all_models = AllModels()
        self.models = all_models.init_models(models)

        if test:
            y_fit, X_fit = self.y_train, self.X_train
            horizon, X_horizon = self.fh_test, self.X_test
        else:
            y_fit, X_fit = self.y, self.X
            horizon, X_horizon = self.fh, self.X_future

        results = []
        for m in self.models:
            model_name = m['name']
            m['model'].fit(
                y_fit,
                self.cv,
                self.window_length,
                X_fit)
            results.append({
                'model': model_name,
                'results': m['model'].forecast(horizon, X_horizon)
            })
        self.results = results
        return results

    def forecast__old(
        self,
        data: pd.DataFrame,
        n_predict: int,
        models: Union[str, List[str]] = 'all',
        window_length: Optional[int] = None
    ) -> None:
        '''
        Legacy single-call fit-and-forecast; results are stored in self.results.

        data: pandas DataFrame, required
            Data must contain a datetime index and the y column. Any additional column is
            considered exogenous data and is used for multivariate forecasting
        n_predict: int, required
            Number of steps to forecast
        models: str or list of str, optional
            Model selection forwarded to AllModels.init_models, defaults to 'all'
        window_length: int, optional
            If not given, window_length is inferred from the seasonality period, at most 20,
            at least 4. Recommended: the amount of auto correlation (AR)

        NOTE(review): reads self.target_col, which is only assigned by fit().
        '''
        self.data = data
        logging.debug('Fitting data')
        datetime_index = data.index
        y = data[[self.target_col]].reset_index(drop=True)
        freq = pd.infer_freq(datetime_index)
        period = freq_to_period(freq)
        self.window_length = window_length

        # Handle forecast window, this will be used to build models.
        # The inferred window is at most 20 or one seasonality period, and at least 4.
        if window_length is None:
            self.window_length = self._infer_window_length(period, len(data))

        # ----------------------- #
        # Handling exogenous data #
        # ----------------------- #
        exog, exog_columns, exog_train, exog_pred = None, None, None, None
        if len(data.columns) > 1:
            logging.debug('Exogenous found')
            exog = data.drop(columns=self.target_col).reset_index(drop=True)
            exog_columns = exog.columns

            # Build lags of the exog data: one shifted copy per lag, with columns
            # renamed to '<col>_-<lag>', joined in a single concat.
            logging.debug('Building lags of exog data')
            lagged_frames = [exog.copy()]
            for n in range(1, self.window_length + 1):
                shifted_columns = {col: f'{col}_-{n}' for col in exog_columns}
                lagged_frames.append(
                    exog.shift(n).rename(columns=shifted_columns))
            exog_train = pd.concat(lagged_frames, axis=1)

            logging.debug('Backward fill lags of exog data')
            exog_train = exog_train.bfill()

            # Split last n_predict rows from exog_train as exog_pred
            exog_pred = exog_train[-n_predict:]
            exog_train = exog_train[:-n_predict]

            # For both y and datetime index, need to cut off n_predict values to keep
            # the data consistent with the shortened exog_train.
            logging.debug('Cutting off y and datetime index be n_predict')
            y = y[n_predict:]
            datetime_index = datetime_index[n_predict:]

        # ----------- #
        # Init Models #
        # ----------- #
        logging.debug('Init models')
        all_models = AllModels()
        self.models = all_models.init_models(models)

        # Handle forecasting horizon
        fh = ForecastingHorizon(
            list(range(1, n_predict + 1)), is_relative=True, freq=freq)
        # Cutoff is the last datetime value in the given data,
        # meaning we'll forecast right after this point of time
        cutoff = datetime_index[-1]
        fh = fh.to_absolute(cutoff=cutoff)

        results = []
        # ----------------------- #
        # Fitting and Forecasting #
        # ----------------------- #
        for m in self.models:
            m['model'].fit(
                y,
                datetime_index,
                exog=exog_train,
                window_length=self.window_length)
            model_name = m['name']
            results.append({
                'model': model_name,
                'forecast': m['model'].forecast(fh, exog=exog_pred)
            })
        self.results = results

        # For testing
        self.exog_train = exog_train
        self.exog_pred = exog_pred