import pandas as pd

from enum import Enum


class WindowingMethod(Enum):
    """Strategies for splitting a recording into analysis windows."""
    ROLLING = 'rolling'
    FIRST_INTERVAL = 'first_interval'
    LAST_INTERVAL = 'last_interval'


class FeatureDomain(Enum):
    """HRV feature domains that can be computed via neurokit2."""
    TIME = 'time'
    FREQUENCY = 'freq'
    NON_LINEAR = 'non_lin'


class RPeak2HRV:
    """Compute heart-rate-variability (HRV) features from R-peak or
    inter-beat-interval (RRI) recordings using neurokit2.

    Input is either a file path (.csv / .txt) or a pandas DataFrame that
    contains either an 'ECG_R_Peaks' column or a timestamp + RRI column pair.
    """

    def get_hrv_features(self, input, windowing_method=None,
                         time_header="SystemTime",
                         rri_header="interbeat_interval",
                         window_size="60s",
                         feature_domains=None,
                         sampling_rate=1000):
        """Load, refine and (optionally) window the data, then compute HRV features.

        Parameters
        ----------
        input : str | pd.DataFrame
            Path to a csv/txt file, or an already-loaded DataFrame.
        windowing_method : str | WindowingMethod | None
            One of 'rolling', 'first_interval', 'last_interval'. When None,
            features are computed over the whole recording.
        time_header, rri_header : str
            Column names holding timestamps and inter-beat intervals.
        window_size : str
            Pandas offset string (e.g. "60s") giving the window length.
        feature_domains : list | None
            FeatureDomain members and/or their string values; None selects
            all three domains. (A None sentinel replaces the former mutable
            default list.)
        sampling_rate : int
            Sampling rate in Hz used to derive timestamps for R-peak input.

        Returns
        -------
        pd.DataFrame
            One row per window (with 'window_start'/'window_end' columns),
            or a single result for the whole recording.
        """
        # BUG FIX: the old default list held FeatureDomain members while the
        # validation compared against their .value strings, so calling with
        # defaults always raised KeyError. Normalizing accepts both forms.
        if feature_domains is None:
            feature_domains = [FeatureDomain.TIME, FeatureDomain.FREQUENCY,
                               FeatureDomain.NON_LINEAR]
        domains = self._normalize_feature_domains(feature_domains)

        data = self._load_data(input)
        refined_data = self._refine_dataframe(input=data,
                                              sampling_rate=sampling_rate,
                                              time_header=time_header,
                                              rri_header=rri_header)
        if windowing_method is None:
            return self._calculate_features(refined_data, domains, sampling_rate)

        windows = self._apply_windowing(data=refined_data,
                                        method=windowing_method,
                                        window_size=window_size)
        frames = []
        for window in windows:
            feature_values = self._calculate_features(window, domains, sampling_rate)
            feature_values['window_start'] = window.index[0]
            feature_values['window_end'] = window.index[-1]
            frames.append(feature_values)
        # Concatenate once at the end instead of growing a DataFrame per
        # iteration (the old loop was quadratic in the number of windows).
        return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    def _normalize_feature_domains(self, feature_domains):
        """Map FeatureDomain members / value strings to a list of value strings.

        Raises KeyError (matching the original validation) for unknown entries.
        """
        valid = {item.value for item in FeatureDomain}
        normalized = []
        for feature in feature_domains:
            value = feature.value if isinstance(feature, FeatureDomain) else feature
            if value not in valid:
                raise KeyError(f"'{feature}' is not a supported feature domain. "
                               "feature_domains may only include 'time', 'freq' and 'non_lin'.")
            normalized.append(value)
        return normalized

    def _load_data(self, input):
        """Return a DataFrame read from a file path, or pass one through."""
        if isinstance(input, str):
            return self.__load_data_from_str(input)
        if isinstance(input, pd.DataFrame):
            return input
        raise ValueError('Input format not supported. Provide Either a file Path or a DataFrame')

    def __load_data_from_str(self, file_path):
        """Read a .csv (separator auto-detected) or tab-separated .txt file."""
        if file_path.endswith('.csv'):
            sep = self.__derive_separator(file_path)
            return pd.read_csv(file_path, sep=sep)
        if file_path.endswith('.txt'):
            return pd.read_csv(file_path, sep='\t')
        raise ValueError('File format not supported. Please provide a csv or txt file.')

    def _refine_dataframe(self, input, time_header, rri_header, sampling_rate=1000):
        """Clean the input and return it indexed by a 'Timestamp' DatetimeIndex.

        Supported layouts:
        * an 'ECG_R_Peaks' column -> timestamps derived from the row index
          divided by ``sampling_rate``;
        * ``time_header`` + ``rri_header`` columns -> timestamps parsed from
          ``time_header``.
        """
        input = self.__clean_data(input)
        if 'ECG_R_Peaks' in input.columns:
            # Convert sample indices to timestamps via the sampling rate.
            timestamps = pd.to_datetime(input.index / sampling_rate, unit='s')
            data = input[['ECG_R_Peaks']].copy()
            data['Timestamp'] = timestamps
            data.set_index('Timestamp', inplace=True)
            return data
        if (time_header in input.columns) and (rri_header in input.columns):
            timestamps = pd.to_datetime(input[time_header].astype(str))
            data = pd.DataFrame(input[rri_header])
            data['Timestamp'] = timestamps
            data.set_index('Timestamp', inplace=True)
            return data
        # BUG FIX: the old message described a dict with "RRI"/"RRI_Time"
        # keys, which this method never checks for.
        raise ValueError('DataFrame Structure not supported. Make sure the input '
                         'contains either an "ECG_R_Peaks" column or both the '
                         f'"{time_header}" and "{rri_header}" columns.')

    def __clean_data(self, data):
        """Drop rows containing NaN values."""
        return data.dropna()

    def __derive_separator(self, file_path):
        """Guess whether a csv uses ',' or ';' by comparing column counts."""
        def _column_count(sep):
            try:
                # nrows keeps the probe cheap on large files; the header row
                # is enough to count columns.
                return len(pd.read_csv(file_path, sep=sep, nrows=5).columns)
            except Exception:
                return 0

        comma_columns = _column_count(',')
        semicolon_columns = _column_count(';')
        if comma_columns > semicolon_columns:
            return ','
        if semicolon_columns > comma_columns:
            return ';'
        raise ValueError('Columns separator in CSV not supported. '
                         'Make sure to use either , or ; as separator')

    def _apply_windowing(self, data, method, window_size):
        """Split the timestamp-indexed DataFrame into windows.

        Returns an iterable of DataFrames: a pandas Rolling object for the
        rolling method, otherwise a single-element list.

        Raises ValueError when the window exceeds the recording length or the
        method is unknown.
        """
        if isinstance(method, WindowingMethod):
            method = method.value  # accept enum members as well as raw strings
        window = pd.Timedelta(window_size)
        recording_length = data.index.max() - data.index.min()
        if recording_length < window:
            raise ValueError('Given Window size is larger than recording interval')

        if method == WindowingMethod.ROLLING.value:
            # TODO: suppress RuntimeWarnings raised for sparse windows
            return data.rolling(window=window)
        if method == WindowingMethod.FIRST_INTERVAL.value:
            # First window: everything from the first timestamp onward,
            # strictly before start + window.
            start = data.index[0]
            return [data[(data.index >= start) & (data.index < start + window)]]
        if method == WindowingMethod.LAST_INTERVAL.value:
            # Last window: everything within `window` of the final timestamp.
            end = data.index[-1]
            return [data[(data.index >= end - window) & (data.index <= end)]]
        # BUG FIX: the original fell through and returned None for an
        # unrecognized method, crashing later with an opaque TypeError.
        raise ValueError(f"'{method}' is not a supported windowing method.")

    def _convert_format(self, window):
        """Convert a window into the dict format neurokit2's hrv functions accept.

        DataFrames with an 'ECG_R_Peaks' column pass through unchanged;
        otherwise the single RRI column is converted to float and timestamps
        to seconds relative to the window start.
        """
        if 'ECG_R_Peaks' in window.columns:
            return window
        timestamps = window.index
        rri_time = (timestamps - timestamps.min()).total_seconds()
        # BUG FIX: use the window's actual data column instead of the
        # hard-coded 'interbeat_interval' header, so that a custom
        # rri_header passed to get_hrv_features works end to end.
        rri_series = window[window.columns[0]]
        try:
            # Decimal-comma exports (e.g. "812,5") need textual normalization.
            rri = rri_series.str.replace(',', '.', regex=False).astype(float).tolist()
        except AttributeError:
            rri = rri_series.astype(float).tolist()
        return {
            'RRI': rri,
            'RRI_Time': rri_time.tolist(),
        }

    def _calculate_features(self, data, feature_domains, sampling_rate):
        """Compute HRV features for the requested domains via neurokit2."""
        # Lazy import: keeps the data-preparation utilities usable (and the
        # module importable) in environments without neurokit2 installed.
        import neurokit2 as nk

        domains = self._normalize_feature_domains(feature_domains)
        data = self._convert_format(data)
        if len(set(domains)) == len(FeatureDomain):
            # All domains requested: one combined call covers everything.
            return nk.hrv(data, sampling_rate)
        parts = []
        if FeatureDomain.TIME.value in domains:
            parts.append(nk.hrv_time(data, sampling_rate))
        if FeatureDomain.FREQUENCY.value in domains:
            parts.append(nk.hrv_frequency(data, sampling_rate))
        if FeatureDomain.NON_LINEAR.value in domains:
            parts.append(nk.hrv_nonlinear(data, sampling_rate))
        return pd.concat(parts, axis=1) if parts else pd.DataFrame()