import math
from typing import Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
from scipy import stats
from sklearn.preprocessing import MinMaxScaler
from statsmodels.graphics.tsaplots import plot_pacf
from statsmodels.tsa.seasonal import MSTL, seasonal_decompose
from statsmodels.tsa.tsatools import freq_to_period


class Ts_Analytics:
    '''
    Exploratory analysis helper for a single time-series dataframe.

    Typical flow: ``analyse`` -> (optional) ``set_ar`` / ``set_period`` ->
    ``create_target_lag_columns`` -> ``train_multiple_regression`` -> plots.
    The regression, PACF and seasonality methods read the target from a
    column named ``'y'``.
    '''

    # Column label used for the squared min-max-scaled coefficients.
    _BETA_LABEL = 'Beta (influence on "y")'

    def __init__(self):
        self.log_transformed = False
        self.scaler = MinMaxScaler()
        # State filled in by analyse(); initialised here so that set_ar() and
        # the frequency check never hit a missing attribute.
        self.ts_df = None
        self.ar = {}
        self.freq = None
        self.period = None

    def analyse(
            self,
            ts_df: pd.DataFrame,
            auto_correlations: Optional[dict] = None):
        '''
        Load a dataframe, index it by time and derive its frequency/period.

        ts_df: timeseries dataframe; must contain a 'datetime' column, which
            is parsed with pd.to_datetime and becomes the index of the
            internal copy (the caller's dataframe is not modified)
        auto_correlations: optional dictionary {column name: number of lags}
            consumed by create_target_lag_columns; copied, so later set_ar
            calls do not touch the caller's dictionary

        Raises ValueError when no frequency can be determined.
        '''
        frame = ts_df.copy()
        frame['datetime'] = pd.to_datetime(frame['datetime'])
        frame.set_index('datetime', inplace=True)
        self.ts_df = frame

        # A fresh dict per call: a shared mutable default would leak lag
        # settings between calls and between instances.
        self.ar = {} if auto_correlations is None else dict(auto_correlations)

        self.__infer_frequency()
        # Annual maps to 1, quarterly maps to 4, monthly to 12, weekly to 52.
        # Using statsmodel's freq_to_period function
        self.period = freq_to_period(self.freq)

    def set_ar(self, col, ar):
        ''' Set the number of auto-correlation lags to create for `col` '''
        self.ar[col] = ar

    def set_period(self, period):
        '''
        Override the seasonal period.

        A list/tuple of periods makes plot_target_seasonality use MSTL.
        '''
        self.period = period

    def create_target_lag_columns(self):
        '''
        Add `<col>_t-<k>` columns for every (col, n) in self.ar, k = 1..n.

        Each new column holds the value of `col` k rows earlier. Gaps are
        forward-filled, then the leading rows whose lags are undefined are
        dropped, so the frame is free of nulls afterwards.
        '''
        print('create_target_lag_columns')
        for col, n in self.ar.items():
            print('create_lag_dfs', col, n)
            for lag in range(1, n + 1):
                # Positive shift = past values; a negative shift would place
                # future observations in a column labelled as a lag.
                self.ts_df[f'{col}_t-{lag}'] = self.ts_df[col].shift(lag)

        print('drop all null values')
        self.ts_df.ffill(inplace=True)
        # ffill cannot fill the first rows of a lagged column.
        self.ts_df.dropna(inplace=True)

    def log_transform(self):
        ''' Replace the dataframe by its element-wise base-2 logarithm '''
        self.log_transformed = True
        self.ts_df = np.log2(self.ts_df)

    def exp_transform(self):
        ''' Undo log_transform (element-wise 2 ** x) '''
        self.log_transformed = False
        # log_transform uses log2, so the inverse is exp2 (not the natural exp).
        self.ts_df = np.exp2(self.ts_df)

    def train_multiple_regression(self):
        '''
        Fit OLS of 'y' on every other column (plus a constant).

        Stores:
            multiple_regression: fitted model on the raw data
            multiple_regression_formula: text form of the fitted equation
            std_multiple_regression: fitted model on min-max scaled data
            multiple_regression_beta: squared scaled coefficients, rounded
                to 3 decimals, one row per regressor

        Returns the statsmodels summary of the raw-data model.
        Raises ValueError when the dataframe has no 'y' column.
        '''
        print('train_multiple_regression')
        if 'y' not in self.ts_df.columns:
            raise ValueError("dataframe must contain a target column 'y'")

        x_cols = [name for name in self.ts_df.columns if name != 'y']
        target = self.ts_df['y']
        design = sm.add_constant(self.ts_df[x_cols])

        # ------------------------------------------------------------------- #
        # Train an additional model on scaled data, to get the Beta value      #
        # ------------------------------------------------------------------- #
        scaled = pd.DataFrame(
            self.scaler.fit_transform(self.ts_df),
            columns=self.ts_df.columns)
        scaled_design = sm.add_constant(scaled[x_cols])
        scaled_target = scaled['y']

        self.multiple_regression = sm.OLS(target, design).fit()
        coef = self.multiple_regression.params
        # .iloc: params is label-indexed, so plain integer keys are deprecated.
        terms = ' + '.join(
            f'{name} * {round(value, 3)}'
            for name, value in zip(x_cols, coef.iloc[1:]))
        self.multiple_regression_formula = f'{coef.iloc[0]} + {terms}'

        self.std_multiple_regression = sm.OLS(
            scaled_target, scaled_design).fit()
        beta = self.std_multiple_regression.params
        self.multiple_regression_beta = pd.DataFrame(
            beta.iloc[1:].to_numpy() ** 2,
            index=x_cols,
            columns=[self._BETA_LABEL])
        self.multiple_regression_beta[self._BETA_LABEL] = (
            self.multiple_regression_beta[self._BETA_LABEL].round(3))

        return self.multiple_regression.summary()

    # ===== #
    # Plots #
    # ===== #

    def plot_correlation(self):
        ''' Heatmap of the numeric-column correlation matrix; returns the figure '''
        corr = self.ts_df.corr(numeric_only=True)
        # Mask the upper triangle (diagonal included) to avoid duplicates.
        mask = np.triu(np.ones_like(corr, dtype=bool))

        fig, ax = plt.subplots(figsize=(8, 8))
        sns.heatmap(
            corr,
            mask=mask,
            square=True,
            annot=True,
            cmap='coolwarm',
            linewidths=.5,
            cbar_kws={"shrink": .5},
            ax=ax)
        return fig

    def plot_target_pacf(self):
        ''' Partial auto-correlation plot of 'y'; returns the figure '''
        fig, ax = plt.subplots(figsize=(12, 4))
        plot_pacf(self.ts_df['y'], ax=ax)
        fig.tight_layout()
        return fig

    def plot_distributions(self):
        '''
        Histogram + KDE of every column on a grid of at most 5 columns.

        Returns the figure.
        '''
        n_series = self.ts_df.shape[1]
        # max(1, ...) keeps the grid valid for a dataframe without columns.
        n_cols = max(1, min(math.ceil(math.sqrt(n_series)), 5))
        n_rows = max(1, math.ceil(n_series / n_cols))

        # squeeze=False: always a 2-D axes array, also for 1 or 2 columns.
        fig, axs = plt.subplots(n_rows, n_cols, squeeze=False)
        for idx, col in enumerate(self.ts_df.columns):
            row, column = divmod(idx, n_cols)
            sns.histplot(self.ts_df[col], ax=axs[row, column], kde=True)
        fig.tight_layout()
        return fig

    def plot_target_seasonality(self):
        '''
        Seasonal decomposition of 'y'.

        Uses MSTL when self.period is a list/tuple of periods, otherwise
        seasonal_decompose with the single period. Returns the decomposition
        result object (not a figure).
        '''
        if isinstance(self.period, (list, tuple)):
            return MSTL(self.ts_df['y'], periods=self.period).fit()
        return seasonal_decompose(self.ts_df['y'], period=self.period)

    def plot_beta(self):
        '''
        Bar chart of multiple_regression_beta; returns the figure.

        Requires train_multiple_regression to have been run first.
        '''
        fig, ax = plt.subplots(figsize=(6, 4))
        beta_plot = sns.barplot(
            self.multiple_regression_beta[self._BETA_LABEL], gap=2, ax=ax)
        beta_plot.set_xticklabels(beta_plot.get_xticklabels(), rotation=45)
        ax.bar_label(ax.containers[-1], fmt='%.2f', label_type='center')
        return fig

    def __infer_frequency(self):
        '''
        Set self.freq from the datetime index.

        When the index has no inferable frequency, a previously assigned
        self.freq is kept; if there is none, ValueError is raised.
        '''
        # Attempt to get the frequency from the provided datetime column
        inferred = pd.infer_freq(self.ts_df.index)
        if inferred is not None:
            self.freq = inferred

        # Always make sure the frequency is not None
        if getattr(self, 'freq', None) is None:
            raise ValueError(
                'Unable inference freq from datetime column, please make timeseries interval consistent or provide customized frequency.')