Spaces:
Runtime error
Runtime error
import pandas as pd | |
import numpy as np | |
import math | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
from statsmodels.graphics.tsaplots import plot_pacf | |
from statsmodels.tsa.seasonal import MSTL, seasonal_decompose | |
from statsmodels.tsa.tsatools import freq_to_period | |
import statsmodels.api as sm | |
from scipy import stats | |
from sklearn.preprocessing import MinMaxScaler | |
class Ts_Analytics():
    '''
    Exploratory analysis helper for a time series dataframe whose target
    column is named 'y'.

    Typical flow: `analyse` (load the frame and infer frequency / seasonal
    period) -> optional `set_ar` / `set_period` overrides ->
    `create_target_lag_columns` -> `train_multiple_regression` -> `plot_*`.
    '''

    def __init__(self):
        self.log_transformed = False
        self.scaler = MinMaxScaler()
        # Declared up front so that `set_ar` and the frequency check never hit
        # a missing attribute when they run before (or during) `analyse`.
        self.freq = None
        self.ar = {}

    def analyse(
            self,
            ts_df: pd.DataFrame,
            auto_correlations=None):
        '''
        Load a timeseries dataframe and infer its frequency and seasonal period.

        ts_df: timeseries dataframe. Either it has a 'datetime' column (which
            is parsed with pd.to_datetime and becomes the index) or it is
            already indexed by a DatetimeIndex. The input frame is copied and
            never modified.
        auto_correlations: optional dictionary {column name: number of lags}
            to input customised auto correlations. Defaults to no lags.

        Raises KeyError when no datetime information is available and
        ValueError when the frequency cannot be inferred.
        '''
        self.ts_df = ts_df.copy()
        if 'datetime' in self.ts_df.columns:
            self.ts_df['datetime'] = pd.to_datetime(self.ts_df['datetime'])
            self.ts_df.set_index('datetime', inplace=True)
        elif not isinstance(self.ts_df.index, pd.DatetimeIndex):
            raise KeyError(
                "ts_df needs a 'datetime' column or a DatetimeIndex")

        # A fresh dict per call: a shared `{}` default (or the caller's own
        # dict) would otherwise be mutated by `set_ar` and leak lag settings
        # between instances.
        self.ar = {} if auto_correlations is None else dict(auto_correlations)

        self.__infer_frequency()
        # Annual maps to 1, quarterly maps to 4, monthly to 12, weekly to 52.
        # Using statsmodel's freq_to_period function
        self.period = freq_to_period(self.freq)

    def set_ar(self, col, ar):
        '''
        Set the auto correlation (number of lag columns) for column `col`.
        '''
        self.ar[col] = ar

    def set_period(self, period):
        '''
        Override the seasonal period. A list/tuple of periods makes
        `plot_target_seasonality` use MSTL instead of seasonal_decompose.
        '''
        self.period = period

    def create_target_lag_columns(self):
        '''
        Add `<col>_t-<k>` columns (k = 1..n) for every `col: n` entry in
        `self.ar`, holding the value of `col` k rows earlier.

        Gaps are forward filled; the leading rows that have no history for
        the longest lag are dropped, so the frame ends up free of nulls.
        '''
        print('create_target_lag_columns')
        for col, n in self.ar.items():
            print('create_lag_dfs', col, n)
            source = self.ts_df[col]
            for lag in range(1, n + 1):
                # Positive shift = look back in time. A negative shift would
                # copy *future* values into a column named t-k (leakage).
                self.ts_df[f'{col}_t-{lag}'] = source.shift(lag)

        print('drop all null values')
        self.ts_df.ffill(inplace=True)
        # ffill cannot fill the leading rows created by the lag shift.
        self.ts_df.dropna(inplace=True)

    def log_transform(self):
        '''
        Replace the frame by its element-wise base-2 logarithm.
        '''
        self.log_transformed = True
        self.ts_df = np.log2(self.ts_df)

    def exp_transform(self):
        '''
        Undo `log_transform` (element-wise 2 ** x).
        '''
        self.log_transformed = False
        # Must mirror the base used in `log_transform`; np.exp (base e) would
        # not restore the original values.
        self.ts_df = np.exp2(self.ts_df)

    def train_multiple_regression(self):
        '''
        Fit an OLS regression of 'y' on every other column of the frame.

        Stores:
            multiple_regression: fitted OLS results on the raw data.
            multiple_regression_formula: text form of the fitted equation.
            std_multiple_regression: fitted OLS results on data rescaled
                with `self.scaler` (a MinMaxScaler).
            multiple_regression_beta: dataframe of the squared coefficients
                of the rescaled model, rounded to 3 decimals.

        Returns the summary of the raw-data model.
        Raises ValueError when the frame has no 'y' column.
        '''
        print('train_multiple_regression')
        if 'y' not in self.ts_df.columns:
            raise ValueError("ts_df has no target column 'y'")
        x_cols = self.ts_df.columns.tolist()
        x_cols.remove('y')

        y = self.ts_df['y']
        X = sm.add_constant(self.ts_df[x_cols])

        # ----------------------------------------------------------------------- #
        # Train an additional model with rescaled data, to get the Beta value      #
        # ----------------------------------------------------------------------- #
        std_ts_df = pd.DataFrame(
            self.scaler.fit_transform(self.ts_df),
            columns=self.ts_df.columns)
        std_X = sm.add_constant(std_ts_df[x_cols])
        std_y = std_ts_df['y']

        self.multiple_regression = sm.OLS(y, X).fit()
        coef = self.multiple_regression.params
        # `.iloc` keeps the positional meaning (intercept first, then the
        # features) without relying on deprecated integer-key lookups on a
        # label-indexed Series.
        terms = " + ".join(
            f"{c} * {round(n, 3)}" for c, n in zip(x_cols, coef.iloc[1:]))
        self.multiple_regression_formula = f'{coef.iloc[0]} + {terms}'

        self.std_multiple_regression = sm.OLS(std_y, std_X).fit()
        beta = self.std_multiple_regression.params
        beta_label = 'Beta (influence on "y")'
        self.multiple_regression_beta = pd.DataFrame(
            np.array(beta.iloc[1:]) ** 2, index=x_cols, columns=[beta_label])
        self.multiple_regression_beta[beta_label] = \
            self.multiple_regression_beta[beta_label].round(3)

        return self.multiple_regression.summary()

    # ===== #
    # Plots #
    # ===== #
    def plot_correlation(self):
        '''
        Return a figure with the lower-triangle correlation heatmap of the
        numeric columns.
        '''
        corr = self.ts_df.corr(numeric_only=True)
        # Generate a mask for the upper triangle
        mask = np.triu(np.ones_like(corr, dtype=bool))

        fig, ax = plt.subplots(figsize=(8, 8))
        sns.heatmap(
            corr,
            mask=mask,
            square=True,
            annot=True,
            cmap='coolwarm',
            linewidths=.5,
            cbar_kws={"shrink": .5},
            ax=ax)
        return fig

    def plot_target_pacf(self):
        '''
        Return a figure with the partial autocorrelation plot of 'y'.
        '''
        fig, ax = plt.subplots(figsize=(12, 4))
        plot_pacf(self.ts_df['y'], ax=ax)
        fig.tight_layout()
        return fig

    def plot_distributions(self):
        '''
        Return a figure with one histogram (plus KDE) per column, laid out
        on a grid that is at most 5 plots wide.
        '''
        n_cols = self.ts_df.shape[1]
        # max(1, ...) keeps the grid valid (and avoids a division by zero)
        # for a frame without columns.
        plot_col = max(1, min(math.ceil(math.sqrt(n_cols)), 5))
        plot_row = max(1, math.ceil(n_cols / plot_col))

        # squeeze=False: always get a 2-D array of axes. By default a 1xN or
        # 1x1 grid is squeezed and `axs[row, col]` fails.
        fig, axs = plt.subplots(plot_row, plot_col, squeeze=False)
        for idx, col in enumerate(self.ts_df.columns):
            axs_x, axs_y = divmod(idx, plot_col)
            sns.histplot(self.ts_df[col], ax=axs[axs_x, axs_y], kde=True)
        fig.tight_layout()
        return fig

    def plot_target_seasonality(self):
        '''
        Decompose 'y' into its seasonal components and return the
        decomposition result produced by statsmodels.

        MSTL is used when `self.period` holds several periods (list or
        tuple), seasonal_decompose when it is a single period.
        '''
        if isinstance(self.period, (list, tuple)):
            return MSTL(self.ts_df['y'], periods=self.period).fit()
        return seasonal_decompose(self.ts_df['y'], period=self.period)

    def plot_beta(self):
        '''
        Return a bar chart figure of `multiple_regression_beta`; requires
        `train_multiple_regression` to have been called first.
        '''
        fig, ax = plt.subplots(figsize=(6, 4))
        beta_plot = sns.barplot(
            self.multiple_regression_beta['Beta (influence on "y")'], gap=2, ax=ax)
        beta_plot.set_xticklabels(beta_plot.get_xticklabels(), rotation=45)
        ax.bar_label(ax.containers[-1], fmt='%.2f', label_type='center')
        return fig

    def __infer_frequency(self):
        '''
        Store the frequency inferred from the datetime index in `self.freq`.

        A frequency that was already set on the instance is kept when
        inference fails. Raises ValueError when no frequency is available.
        '''
        # Attempt to get the frequency from the provided datetime column
        freq = pd.infer_freq(self.ts_df.index)
        if freq is not None:
            self.freq = freq

        # Always make sure the frequency is not None. getattr guards against
        # instances on which `freq` was never assigned.
        if getattr(self, 'freq', None) is None:
            raise ValueError(
                'Unable inference freq from datetime column, please make timeseries interval consistent or provide customized frequency.')