from transformers import Pipeline from .rpeaks2hrv import RPeak2HRV, FeatureDomain class RPeak2HRVPipeline(Pipeline): rpeak2HRVExtractor = RPeak2HRV() def _sanitize_parameters(self, **kwargs): preprocess_kwargs = {} if "sampling_rate" in kwargs: preprocess_kwargs["sampling_rate"] = kwargs["sampling_rate"] if "windowing_method" in kwargs: preprocess_kwargs["windowing_method"] = kwargs["windowing_method"] if "time_header" in kwargs: preprocess_kwargs["time_header"] = kwargs["time_header"] if "rri_header" in kwargs: preprocess_kwargs["rri_header"] = kwargs["rri_header"] if "window_size" in kwargs: preprocess_kwargs["window_size"] = kwargs["window_size"] if "feature_domains" in kwargs: preprocess_kwargs["feature_domains"] = kwargs["feature_domains"] return preprocess_kwargs, {}, {} def preprocess(self, inputs, windowing_method:str = None, time_header = "SystemTime", rri_header = "interbeat_interval", window_size = "60s", feature_domains = [FeatureDomain.TIME, FeatureDomain.FREQUENCY, FeatureDomain.NON_LINEAR], sampling_rate = 1000): return self.rpeak2HRVExtractor.get_hrv_features(inputs, windowing_method=windowing_method, time_header=time_header, rri_header=rri_header, window_size=window_size, feature_domains=feature_domains, sampling_rate=sampling_rate) def _forward(self, model_inputs): # currently empty as all preprocessing steps are performed by preprocess function # in future extendable to facilitate end-2-end ML pipelines return model_inputs def postprocess(self, model_outputs): return model_outputs