from typing import Union

# active_models / idsc_models list the available models and are used to
# verify user input
from .active_models import active_models, idsc_models
from .forecast.Prophet import ProphetWrapper
from .idsc.IDSC import IDSC
import pandas as pd
import math
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
from .functions.mase import MASE
from .functions.order_qty_rmse import order_qty_rmse
from .functions.itmtt_scores import interm_scores


class DemandForecasting:
    '''
    DemandForecasting assumes a single SKU at a time.
    Forecasting is a two-step process: model selection, then actual
    forecasting. The step is identified by the model parameter.
    The API's behavior is driven by the information the user provides, and
    the API itself decides what to do. Instead of forcing the user to
    explicitly perform "model selection" or "actual forecasting", the API
    only checks which models the user attempted to run and whether any test
    result is wanted. This way, multiple requirements are covered without a
    lot of different endpoints.
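
    Example (a sketch of the two-step flow; the data and chosen values are
    made up):

        dc = DemandForecasting()
        # Step 1: model selection - test every suitable model
        ranked = dc.forecast(ts, n_predict=0, model='all', run_test=True)
        best = ranked['forecast'][0]['model']
        # Step 2: actual forecasting with the selected model
        res = dc.forecast(ts, n_predict=4, model=best)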
    '''

    def __init__(self) -> None:
        self.idsc = IDSC()

    def forecast(
            self,
            ts,
            n_predict: int,
            model: Union[str, list],
            freq=None,
            run_test: bool = False,
            characteristic=None,
            m=None):
        '''
        ts: timeseries object, use pd.DataFrame().to_json() to generate
            example:
                {
                    "datetime":
                        {"0":"2018-05-06","1":"2018-05-13"},
                    "y":
                        {"0":2,"1":12}}
        n_predict: number of future values to predict
        freq: optional, timeseries data frequency; if not provided, it will
            be inferred via pandas
        model: a single model name, a list of model names, or 'all'.
            'all' runs every active model suitable for the data's
            characteristic (the model selection process); a specific model
            performs the actual forecasting.
        run_test: optional; when True, hold out the last 20% of the data,
            rerun each model on the remainder and report the test metrics
            used to rank the models.
        characteristic: optional
            Provides information about the data characteristic: for now,
            either 'continuous' or anything else (intermittent).
            If not provided, profiling is performed first (relying on the
            IDSC API); users are required to track the data's characteristics
            for future forecasting purposes.
        m: seasonal period value, most likely used for internal testing.
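
        Example (sketch) of building the ts argument from a DataFrame;
        the column names must be 'datetime' and 'y':

            import json
            df = pd.DataFrame({
                'datetime': ['2018-05-06', '2018-05-13'],
                'y': [2, 12]})
            ts = json.loads(df.to_json())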
        '''
        self.idsc_profile = None
        self.characteristic = characteristic
        # Filled in by __profiling(); default to None so the response is
        # well formed even when profiling is skipped
        self.predictability = None
        self.ts_df = pd.DataFrame(ts)
        self.ts_df['datetime'] = pd.to_datetime(self.ts_df['datetime'])
        self.freq = freq
        self.n_predict = n_predict
        self.run_test = run_test
        if self.n_predict <= 0:
            print('n_predict <= 0, forcing run_test to True')
            self.run_test = True
        # Try to infer the timeseries frequency from the data;
        # only used if the user did not provide the freq param
        self.__get_frequency()
        self.m = m
        # Convert n_predict into future timestamps based on the frequency.
        # periods=n_predict + 1 together with the [1:] slice skips the last
        # observed timestamp, which pd.date_range would otherwise include.
        self.forecast_horizon = pd.date_range(
            self.ts_df['datetime'].iloc[-1],
            periods=n_predict + 1, freq=self.freq)[1:]
        '''
        Split 80% of the data for training and the rest for testing.
        This is only used if the run_test param is set to True.
        '''
        # max(1, ...) guards against an empty test set on very short series
        self.n_test = max(1, round(self.ts_df.shape[0] * 0.2))
        self.ts_train = self.ts_df[:-self.n_test]
        self.test_truth = self.ts_df[-self.n_test:]['y'].tolist()
        self.test_horizon = self.ts_df[-self.n_test:]['datetime'].tolist()
        self.__prep_idsc_ts()  # prep idsc_ts; both profiling and idsc models require this
        # ============== #
        # IDSC profiling #
        # ============== #
        # Default idsc characteristic, continuous or intermittent
        self.idsc_characteristic = None
        if self.characteristic is None:
            print('characteristic not provided, running profiling')
            self.__profiling()
            print('profiling completed, data characteristic is',
                  self.characteristic)
        # ======= #
        # TESTING #
        # ======= #
        # For testing purposes only: return just the data's characteristic
        # return self.characteristic
        # ------------- #
        # Assign models #
        # ------------- #
        '''
        For the model parameter, the user can pass either the string name of
        a particular model or a list of available models.
        If the user passes "all", all suitable models are called.
        '''
        if isinstance(model, str):
            if model == 'all':
                if self.characteristic == 'continuous':
                    self.model = active_models['continuous']
                else:
                    self.model = active_models['intermittent']
            else:
                # Only one model name was provided
                self.model = [model]
        else:
            self.model = model
        '''
        For idsc models, the profiling process is required.
        The input data is also formatted specifically for idsc.
        '''
        # Check whether any of the requested models is an IDSC model
        self.has_idsc_model = any(m in idsc_models for m in self.model)
        print(f'Has idsc model: {self.has_idsc_model}')
        if self.has_idsc_model and self.idsc_profile is None:
            '''
            Run profiling if idsc_profile is None; this is because some
            idsc models require the idsc profile as input.
            '''
            self.__profiling()
        self.__check_model()
        # =================== #
        # Perform forecasting #
        # =================== #
        '''
        Each model below should always return the forecasted result based on
        the n_predict value:
        res : {
            'model': model name,
            'forecast': the forecasted values,
            'test': test result,
            'RMSE': RMSE value to evaluate the best performing model,
            'raw': a copy of the original model response, without any filtering
        }
        '''
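        # Illustrative shape of one fcst_res item when run_test=True
        # (values are made up):
        # {
        #     'model': 'prophet',
        #     'forecast': {'datetime': DatetimeIndex([...]), 'y': [3.1, 2.7]},
        #     'test': <DataFrame with 'truth' and 'test' columns>,
        #     'RMSE': 1.23,
        #     'raw': <original model response>
        # }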
        self.fcst_res = []  # Array storing all results
        # -------------------------- #
        # Calling forecasting models #
        # -------------------------- #
        # Todo: track the time spent per model here
        for model_name in self.model:
            print(f'calling model: {model_name}')
            getattr(self, model_name)()
        # =========================== #
        # Rank the models by response #
        # =========================== #
        # For continuous data use RMSE; for intermittent data use the
        # average of the intermittent scores
        if self.run_test and self.characteristic == 'continuous':
            # Sort forecast results by smallest RMSE
            self.fcst_res.sort(key=lambda x: x['RMSE'])
        if self.run_test and self.characteristic != 'continuous':
            # Sort forecast results by highest avg_interm_scores
            self.fcst_res.sort(
                key=lambda x: x['avg_interm_scores'], reverse=True)
        # The best-ranked result (e.g. lowest RMSE) is the 1st item
        self.res = {'characteristic': self.characteristic,
                    'predictability': self.predictability,
                    'forecast': self.fcst_res}
        return self.res

    def __get_frequency(self):
        # Infer the frequency from the provided datetime column, but only
        # when the user did not supply the freq parameter
        if self.freq is None:
            self.freq = pd.infer_freq(self.ts_df['datetime'])
        # Always make sure the frequency is not None
        if self.freq is None:
            raise ValueError(
                'Unable to infer freq from the datetime column; please make the '
                'timeseries interval consistent or provide a custom frequency.')
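
    # Illustrative example of the pandas inference used above (weekly data
    # anchored on Sundays):
    #   pd.infer_freq(pd.to_datetime(['2018-05-06', '2018-05-13', '2018-05-20']))
    #   -> 'W-SUN'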

    def __check_model(self):
        all_active_models = active_models['continuous'] + \
            active_models['intermittent']
        unknown_models = set(self.model) - set(all_active_models)
        if len(unknown_models) > 0:
            raise ValueError(
                f'Unknown model: {unknown_models}, please use active models: {active_models}')
        if self.characteristic == 'continuous':
            unsuitable_models = set(self.model) - \
                set(active_models['continuous'])
            if len(unsuitable_models) > 0:
                raise ValueError(
                    f'Unsuitable model for continuous data: {unsuitable_models}. '
                    f'Please use continuous models: {active_models["continuous"]}')
        else:
            unsuitable_models = set(self.model) - \
                set(active_models['intermittent'])
            if len(unsuitable_models) > 0:
                raise ValueError(
                    f'Unsuitable model for intermittent data: {unsuitable_models}. '
                    f'Please use intermittent models: {active_models["intermittent"]}')

    def __prep_idsc_ts(self):
        # Time series configured for the IDSC APIs, all converted to JSON strings
        print('[__prep_idsc_ts]')
        self.idsc_ts = self.ts_df.rename(
            columns={'datetime': 'date', 'y': 'target'})
        self.idsc_ts['date'] = self.idsc_ts['date'].dt.strftime('%Y-%m-%d')
        self.idsc_ts = self.idsc_ts.to_json()
        self.idsc_ts_train = self.ts_train.rename(
            columns={'datetime': 'date', 'y': 'target'})
        self.idsc_ts_train['date'] = self.idsc_ts_train['date'].dt.strftime(
            '%Y-%m-%d')
        self.idsc_ts_train = self.idsc_ts_train.to_json()

    def __profiling(self):
        self.idsc_profile = self.idsc.profiling(self.idsc_ts)
        characteristic = self.idsc_profile['classification_res'][
            'time_series_class']['overall_characteristic']
        print('predictability temporarily using order_quantity predictability')
        # print(self.idsc_profile)
        predictability = self.idsc_profile['predictability_res'][
            'predictability_result']['order_quantity'][-1]['predictability']
        predictability = predictability if isinstance(
            predictability, str) else round(predictability, 2)
        if self.characteristic is not None and self.characteristic != characteristic:
            raise ValueError(
                f"The provided characteristic - {self.characteristic} - differs "
                f"from the data's characteristic - {characteristic}. Please use "
                f"the correct data characteristic.")
        self.characteristic = characteristic
        self.predictability = predictability
        if self.run_test:
            self.idsc_profile_train = self.idsc.profiling(
                self.idsc_ts_train)
        else:
            self.idsc_profile_train = None

    # =========== #
    # Core method #
    # =========== #
    '''
    This method takes a model as input, runs it, tests it (to evaluate RMSE)
    and returns the processed result from within this method itself. This way
    the model can be treated as a black box: as long as it takes
    (ts, n_predict, **kwargs) and returns an object, this method can process
    and format the response correctly.
    Because the actual forecasting model and the test model may take different
    arguments, both args and test_args can be used to pass arguments around.
    '''
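    # A conforming model is any callable shaped like this hypothetical sketch
    # (my_model and its response shape are illustrative, not a real model in
    # this repo):
    #
    #   def my_model(ts, n_predict, **kwargs):
    #       # fit on ts, predict n_predict steps ahead
    #       return {'predicted_value': [...]}
    #
    # paired with get_value=lambda x: x['predicted_value'] to extract the list.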
    def __use_model(self, model, model_name, get_value, args=None, test_args=None):
        '''
        model: the model callable to run
        model_name: name reported in the result; also decides the idsc input
        get_value: lambda to extract the value list from the model response
        args: optional keyword arguments for the forecasting call
        test_args: optional keyword arguments for the test call; defaults to args
        '''
        ts = self.ts_df
        train = self.ts_train
        res = {'model': model_name}
        # IDSC models use a different input configuration
        if model_name in idsc_models:
            print('has_idsc_model')
            ts = self.idsc_ts
            train = self.idsc_ts_train
        # Pass keyword arguments to the model
        if self.n_predict > 0:
            if args is not None:
                pred = model(ts, self.n_predict, **args)
            else:
                pred = model(ts, self.n_predict)
            pred_val: list = get_value(pred)
            # len() guard: some models return fewer values than n_predict,
            # so trim the horizon to match (same for the 'test' frame below)
            res['forecast'] = {
                'datetime': self.forecast_horizon[:len(pred_val)],
                'y': pred_val}
            res['raw'] = pred
        # Run the test set and evaluate model performance
        if self.run_test:
            # If the train and test arguments are exactly the same,
            # expect the user to provide only one args dictionary
            test_args = args if test_args is None else test_args
            if test_args is not None:
                test = model(train, self.n_test, **test_args)
            else:
                test = model(train, self.n_test)
            test_val: list = get_value(test)
            # Make sure test truth is the same size as test_val
            test_truth = self.test_truth[:len(test_val)]
            res['test'] = pd.DataFrame(
                {
                    'truth': test_truth,
                    'test': test_val
                },
                index=self.test_horizon[:len(test_val)])
            res['RMSE'] = math.sqrt(
                mean_squared_error(
                    test_truth, list(test_val)))
            # res['MASE'] = MASE(test_truth, list(test_val))
            res['order_quantity_RMSE'] = order_qty_rmse(
                test_truth, list(test_val))
            # RMSE on the binarized (order / no order) series; math.sqrt added
            # so the value matches the RMSE in the key name
            res['inter_order_RMSE'] = math.sqrt(mean_squared_error(
                [0 if i == 0 else 1 for i in test_truth],
                [0 if i == 0 else 1 for i in list(test_val)]))
            res['interm_scores'] = interm_scores(
                test_truth, list(test_val))
            # Average intermittent-data score, used to sort the forecast response
            res['avg_interm_scores'] = np.mean(res['interm_scores'])
            res['test_raw'] = test
        self.fcst_res.append(res)

    # ---------- #
    # All Models #
    # ---------- #
    def prophet_i(self):
        model = self.idsc.prophet
        model_name = 'prophet_i'
        args = {'profile': self.idsc_profile}
        test_args = {'profile': self.idsc_profile_train}
        self.__use_model(
            model,
            model_name,
            lambda x: x['prediction_result']['predicted_value'].values(),
            args=args,
            test_args=test_args
        )

    def prophet(self):
        model = ProphetWrapper()
        model_name = 'prophet'
        args = {'freq': self.freq}
        self.__use_model(
            model.forecast,
            model_name,
            lambda x: x['yhat'].to_list(),
            args=args)

    def ceif(self):
        model_name = 'ceif'
        self.__use_model(
            self.idsc.ceif,
            model_name,
            lambda x: x['prediction_result']['predicted_value'])

    def fft_i(self):
        model_name = 'fft_i'
        self.__use_model(
            self.idsc.fft,
            model_name,
            lambda x: x['prediction_result']['predicted_value'])

    def holt_winters_i(self):
        model_name = 'holt_winters_i'
        def get_value(x): return x['prediction_result']['predicted_value']
        # Pass the seasonal cycle through only when it was provided
        args = {'seasonal_cycle': self.m} if self.m is not None else None
        self.__use_model(
            self.idsc.holt_winters,
            model_name,
            get_value,
            args=args)

    def auto_arima_i(self):
        model_name = 'auto_arima_i'
        model = self.idsc.auto_arima
        def get_value(x): return x['prediction_result']['predicted_value']
        self.__use_model(model, model_name, get_value)
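
    # Adding a new model follows the pattern above: define a method named
    # after the model and delegate to self.__use_model. A hypothetical sketch
    # (my_model is illustrative and would also need to be listed in
    # active_models):
    #
    #   def my_model(self):
    #       self.__use_model(
    #           self.idsc.my_model,
    #           'my_model',
    #           lambda x: x['prediction_result']['predicted_value'])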