rpeaks-to-hrv-pipeline / rpeaks_2_hrv_pipeline.py
Georg Willer
Extend calculated features to support time, frequency and non_linear domain
800538f
from transformers import Pipeline
from .rpeaks2hrv import RPeak2HRV, FeatureDomain
class RPeak2HRVPipeline(Pipeline):
    """Pipeline that hands its inputs to ``RPeak2HRV.get_hrv_features``.

    All of the work happens in :meth:`preprocess`; :meth:`_forward` and
    :meth:`postprocess` return their argument unchanged.
    """

    # One extractor, created when the class body is executed and shared by
    # every instance of this pipeline.
    rpeak2HRVExtractor = RPeak2HRV()

    # Call-time keyword arguments that are routed to ``preprocess``. The order
    # matches the order in which the keys are inserted into the result dict.
    _PREPROCESS_PARAMETER_NAMES = (
        "sampling_rate",
        "windowing_method",
        "time_header",
        "rri_header",
        "window_size",
        "feature_domains",
    )

    def _sanitize_parameters(self, **kwargs):
        """Split call-time keyword arguments into per-stage parameter dicts.

        Only the names listed in ``_PREPROCESS_PARAMETER_NAMES`` are kept, and
        only when the caller actually supplied them; any other keyword
        argument is dropped.

        Returns:
            A 3-tuple ``(preprocess_kwargs, {}, {})``. The second and third
            dicts are always empty.
        """
        preprocess_kwargs = {
            name: kwargs[name]
            for name in self._PREPROCESS_PARAMETER_NAMES
            if name in kwargs
        }
        return preprocess_kwargs, {}, {}

    def preprocess(
        self,
        inputs,
        windowing_method: str = None,
        time_header="SystemTime",
        rri_header="interbeat_interval",
        window_size="60s",
        feature_domains=None,
        sampling_rate=1000,
    ):
        """Compute HRV features by delegating to the shared extractor.

        Every argument is forwarded unchanged to
        ``rpeak2HRVExtractor.get_hrv_features``; see that method for the
        accepted input types and the meaning of each option.

        Args:
            inputs: Data passed as the first positional argument of
                ``get_hrv_features``.
            windowing_method: Forwarded as ``windowing_method``.
            time_header: Forwarded as ``time_header``.
            rri_header: Forwarded as ``rri_header``.
            window_size: Forwarded as ``window_size``.
            feature_domains: Feature domains to compute. ``None`` (the
                default) selects ``TIME``, ``FREQUENCY`` and ``NON_LINEAR``.
            sampling_rate: Forwarded as ``sampling_rate``.

        Returns:
            Whatever ``get_hrv_features`` returns.
        """
        # Build the default list per call: a list literal in the signature
        # would be one shared object that any in-place change downstream
        # would silently alter for all later calls.
        if feature_domains is None:
            feature_domains = [
                FeatureDomain.TIME,
                FeatureDomain.FREQUENCY,
                FeatureDomain.NON_LINEAR,
            ]
        return self.rpeak2HRVExtractor.get_hrv_features(
            inputs,
            windowing_method=windowing_method,
            time_header=time_header,
            rri_header=rri_header,
            window_size=window_size,
            feature_domains=feature_domains,
            sampling_rate=sampling_rate,
        )

    def _forward(self, model_inputs):
        """Return ``model_inputs`` unchanged."""
        # Currently a pass-through, as all processing steps are performed by
        # ``preprocess``. Can be extended in future to facilitate end-to-end
        # ML pipelines.
        return model_inputs

    def postprocess(self, model_outputs):
        """Return ``model_outputs`` unchanged."""
        return model_outputs