import logging
from typing import List, Optional, Tuple, Union

import pandas as pd
from sktime.forecasting.base import ForecastingHorizon
from sktime.forecasting.model_selection import SlidingWindowSplitter, SingleWindowSplitter
from sktime.forecasting.model_selection import temporal_train_test_split
from statsmodels.tsa.tsatools import freq_to_period

from .utils import split_x_y, k_folds
from .models import AllModels


class Forecaster:
    """Prepare a time series and run a set of forecasting models on it.

    Typical usage is ``fit(...)`` followed by ``forecast(...)``.
    ``forecast__old`` is the previous single-step implementation.
    """

    def __init__(
        self
    ) -> None:
        logging.debug('Forecaster init')

    @staticmethod
    def _infer_freq_and_period(index) -> Tuple[str, int]:
        """Return ``(freq, period)`` inferred from a datetime index.

        Raises:
            ValueError: if pandas cannot infer a frequency from ``index``.
        """
        freq = pd.infer_freq(index)
        if freq is None:
            # pd.infer_freq returns None for irregular indexes; fail here with
            # a clear message instead of an opaque error in freq_to_period.
            raise ValueError(
                'Unable to infer a frequency from the data index; '
                'a regularly spaced datetime index is required.')
        return freq, freq_to_period(freq)

    @staticmethod
    def _infer_window_length(period: int, n_obs: int) -> int:
        """Return the default window length: at most 20, one seasonal period
        or 20% of the observations, and never smaller than 4."""
        return max(4, min(20, period, int(n_obs * 0.2)))

    def fit(
        self,
        data: pd.DataFrame,
        exog: Optional[pd.DataFrame] = None,
        n_predict: int = 1,
        window_length: Optional[int] = None,
        target_col: Optional[str] = None
    ) -> None:
        """Prepare targets, features, forecasting horizon and train/test split.

        data: pandas DataFrame, required
            Data must contain a datetime index and the y column.
            Any additional column will be considered as exogenous data
            and be used for multivariate forecasting.
        exog: pandas DataFrame, optional
            Exogenous data. Must have a datetime index, must NOT contain the
            y column, and the index must cover the full range of training
            and forecasting data (length == training + n_predict).
        n_predict: int, optional
            Number of steps to forecast; forwarded to ``split_x_y``.
        window_length: int, optional
            If not given, window_length is inferred from the seasonality
            period: at most 20, at least 4. The amount of auto correlation
            (AR) is the recommended value.
        target_col: str, optional
            Stored on the instance as ``self.target_col``.

        Raises:
            ValueError: if the index frequency cannot be inferred, or if
                ``exog`` cannot be aligned with the feature indexes.
        """
        print('[Forecaster - fit] ----- START -----')
        self.freq, self.period = self._infer_freq_and_period(data.index)
        self.window_length = window_length
        self.target_col = target_col

        # Handle forecast window, this will be used to build models.
        if window_length is None:
            self.window_length = self._infer_window_length(
                self.period, len(data))
            print(f'Inferred window_length: {self.window_length}')

        self.data = data.copy()
        if data.index.freq is None:
            self.data.index.freq = self.freq
        # self.data.index = pd.to_timedelta(self.data.index, unit="MS")

        # If there are no columns other than 'y', self.X and self.X_future
        # will be None.
        # The resolved self.window_length is passed on (not the raw argument,
        # which may be None) so feature building and the models agree.
        (
            self.fh,
            self.y,
            self.X,
            self.X_future
        ) = split_x_y(
            self.data, self.window_length, n_predict, self.freq)

        if exog is not None:
            print('[Forecaster - fit] - exogenous data provided')
            # NOTE(review): when self.X is None (no feature columns) the
            # lookups below fail and surface as the ValueError as well.
            try:
                self.exog = exog.loc[self.X.index]
                self.exog_future = exog.loc[self.X_future.index]
            except Exception as err:
                raise ValueError(
                    'Exogenous value not fit, exogenous data must contains a datetime index and covers the entire train and forecast time range.') from err

            print('[Forecaster - fit] - merge exogenous data with features')
            self.X = pd.concat([self.exog, self.X], axis=1)
            self.X_future = pd.concat(
                [self.exog_future, self.X_future], axis=1)

        # ---------------- #
        # Train Test Split #
        # ---------------- #
        print('[Forecaster - fit] Train test split')
        test_size = len(self.fh)
        print('[Forecaster - fit] Test size: ', test_size)
        if self.X is not None:
            (
                self.y_train,
                self.y_test,
                self.X_train,
                self.X_test
            ) = temporal_train_test_split(
                self.y, self.X, test_size=test_size)
        else:
            (
                self.y_train,
                self.y_test,
            ) = temporal_train_test_split(
                self.y, test_size=test_size)
            # Keep the attributes defined so forecast(test=True) works for
            # purely univariate data.
            self.X_train = None
            self.X_test = None

        self.fh_test = ForecastingHorizon(self.y_test.index, is_relative=False)

        # ---------------- #
        # Cross Validation #
        # ---------------- #
        print(
            f'[Forecaster - fit] Single window splitter, with window_length {len(self.y) + test_size} and fh {test_size}')
        self.cv = SingleWindowSplitter(
            window_length=len(self.y) + test_size,
            fh=test_size
        )
        # ----- END [Train Test Split] ----- #

        # Originally wanted to create my own k-fold validation, but realised this doesn't work well with sktime API
        # self.k_folds = k_folds(
        #     data,
        #     self.period,
        #     window_length,
        #     n_predict,
        #     self.freq)
        print('[Forecaster - fit] ----- END -----')

    def forecast(
        self,
        models: Union[str, List[str]] = 'all',
        test: bool = False,
    ) -> List[dict]:
        """Fit the selected models and collect their forecasts.

        ``fit`` must have been called first; this method reads the series,
        features, horizon and splitter it stored on the instance.

        models: str or list of str, optional
            Model selection forwarded to ``AllModels.init_models``.
        test: bool, optional
            When True, train on the train split and forecast the test
            horizon; otherwise train on the full series and forecast
            ``self.fh``.

        Returns a list of ``{'model': name, 'results': forecast}`` dicts,
        also stored as ``self.results``.
        """
        # ----------- #
        # Init Models #
        # ----------- #
        logging.debug('Init models')
        all_models = AllModels()
        self.models = all_models.init_models(models)

        # The train/forecast inputs do not depend on the model, so resolve
        # them once up front.
        if test:
            y_fit, X_fit = self.y_train, self.X_train
            horizon, X_ahead = self.fh_test, self.X_test
        else:
            y_fit, X_fit = self.y, self.X
            horizon, X_ahead = self.fh, self.X_future

        results = []
        for m in self.models:
            model_name = m['name']
            m['model'].fit(y_fit, self.cv, self.window_length, X_fit)
            results.append({
                'model': model_name,
                'results': m['model'].forecast(horizon, X_ahead)
            })

        self.results = results
        return results

    def forecast__old(
        self,
        data: pd.DataFrame,
        n_predict: int,
        models: Union[str, List[str]] = 'all',
        window_length: Optional[int] = None
    ) -> None:
        """Legacy one-shot fit-and-forecast implementation.

        Reads ``self.target_col``, which is only assigned by ``fit``.

        data: pandas DataFrame, required
            Data must contain a datetime index and the y column.
            Any additional column will be considered as exogenous data
            and be used for multivariate forecasting.
        n_predict: int, required
            Number of steps to forecast after the last timestamp in data.
        models: str or list of str, optional
            Model selection forwarded to ``AllModels.init_models``.
        window_length: int, optional
            If not given, window_length is inferred from the seasonality
            period: at most 20, at least 4. The amount of auto correlation
            (AR) is the recommended value.

        Results are stored on ``self.results``; nothing is returned.

        Raises:
            ValueError: if the index frequency cannot be inferred.
        """
        self.data = data
        logging.debug('Fitting data')
        datetime_index = data.index
        y = data[[self.target_col]].reset_index(drop=True)
        freq, period = self._infer_freq_and_period(datetime_index)
        self.window_length = window_length

        # Handle forecast window, this will be used to build models.
        if window_length is None:
            self.window_length = self._infer_window_length(period, len(data))

        # ----------------------- #
        # Handling exogenous data #
        # ----------------------- #
        exog, exog_columns, exog_train, exog_pred = None, None, None, None
        if len(data.columns) > 1:
            logging.debug('Exogenous found')
            exog = data.drop(columns=self.target_col).reset_index(drop=True)
            exog_columns = exog.columns

            # Build lags of the exog data; collect the frames and
            # concatenate once rather than growing the frame per lag.
            logging.debug('Building lags of exog data')
            frames = [exog]
            for n in range(1, self.window_length + 1):
                lag_names = {col: f'{col}_-{n}' for col in exog_columns}
                frames.append(exog.shift(n).rename(columns=lag_names))
            exog_train = pd.concat(frames, axis=1)

            logging.debug('Backward fill lags of exog data')
            exog_train = exog_train.bfill()

            # Split last n_predict rows from exog_train as exog_pred
            exog_pred = exog_train[-n_predict:]
            exog_train = exog_train[:-n_predict]

            # For both y and datetime index, need to cut off n_predict
            # values to keep data consistent with exog_train.
            logging.debug('Cutting off y and datetime index be n_predict')
            y = y[n_predict:]
            datetime_index = datetime_index[n_predict:]

        # ----------- #
        # Init Models #
        # ----------- #
        logging.debug('Init models')
        all_models = AllModels()
        self.models = all_models.init_models(models)

        # Handle forecasting horizon
        fh = ForecastingHorizon(
            list(range(1, n_predict + 1)),
            is_relative=True,
            freq=freq)

        # Cutoff is the last datetime value in the given data,
        # meaning we'll forecast right after this point of time
        cutoff = datetime_index[-1]
        fh = fh.to_absolute(cutoff=cutoff)

        results = []
        # ----------------------- #
        # Fitting and Forecasting #
        # ----------------------- #
        for m in self.models:
            m['model'].fit(
                y,
                datetime_index,
                exog=exog_train,
                window_length=self.window_length)
            model_name = m['name']
            results.append({
                'model': model_name,
                'forecast': m['model'].forecast(fh, exog=exog_pred)
            })

        self.results = results

        # For testing
        self.exog_train = exog_train
        self.exog_pred = exog_pred