File size: 7,541 Bytes
e83178f 800538f e83178f 800538f e83178f 800538f e83178f 800538f e83178f 800538f e83178f 800538f 597061b 800538f e83178f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
import neurokit2 as nk
import pandas as pd
from enum import Enum
class WindowingMethod(Enum):
    """Supported strategies for splitting a recording into analysis windows."""
    ROLLING = 'rolling'                # sliding time-based window over the whole recording
    FIRST_INTERVAL = 'first_interval'  # single window at the start of the recording
    LAST_INTERVAL = 'last_interval'    # single window at the end of the recording
class FeatureDomain(Enum):
    """HRV feature families; each maps to the matching neurokit2 call."""
    TIME = 'time'           # computed via nk.hrv_time
    FREQUENCY = 'freq'      # computed via nk.hrv_frequency
    NON_LINEAR = 'non_lin'  # computed via nk.hrv_nonlinear
class RPeak2HRV():
    """Compute heart-rate-variability (HRV) features from R-peak / RR-interval data.

    Accepts either a file path (csv/txt) or a pandas DataFrame containing
    an 'ECG_R_Peaks' column, or a timestamp column plus an RR-interval
    column.  Feature computation itself is delegated to neurokit2.
    """

    def get_hrv_features(self, input, windowing_method: str = None,
                         time_header="SystemTime",
                         rri_header="interbeat_interval",
                         window_size="60s",
                         feature_domains=None,
                         sampling_rate=1000):
        """Load, refine and (optionally) window the data, then compute HRV features.

        :param input: file path (csv/txt) or pd.DataFrame with the raw data
        :param windowing_method: one of the WindowingMethod values
            ('rolling', 'first_interval', 'last_interval'); None computes
            features over the whole recording
        :param time_header: column holding the timestamps
        :param rri_header: column holding the RR intervals
        :param window_size: pandas offset string, e.g. '60s'
        :param feature_domains: subset of the FeatureDomain values
            ('time', 'freq', 'non_lin'); FeatureDomain members are accepted
            as well.  None (default) selects all three domains.
        :param sampling_rate: sampling rate in Hz, used to derive timestamps
            for R-peak input and forwarded to neurokit2
        :return: pd.DataFrame with one feature row per window (plus
            'window_start'/'window_end' columns), or a single row when no
            windowing is requested
        """
        # BUG FIX: the old default was a list of FeatureDomain *members*,
        # which _calculate_features rejected because it validates against the
        # string *values* -- every default call raised KeyError.  Use a None
        # sentinel (also avoids a mutable default) and normalize members.
        if feature_domains is None:
            feature_domains = [domain.value for domain in FeatureDomain]
        else:
            feature_domains = [d.value if isinstance(d, FeatureDomain) else d
                               for d in feature_domains]
        data = self._load_data(input)
        refined_data = self._refine_dataframe(input=data, sampling_rate=sampling_rate,
                                              time_header=time_header, rri_header=rri_header)
        if windowing_method is None:
            return self._calculate_features(refined_data, feature_domains, sampling_rate)
        windows = self._apply_windowing(data=refined_data, method=windowing_method,
                                        window_size=window_size)
        hrv_values = pd.DataFrame()
        for window in windows:
            hrv_feature_values = self._calculate_features(window, feature_domains, sampling_rate)
            # Record which time span each feature row belongs to.
            hrv_feature_values['window_start'] = window.index[0]
            hrv_feature_values['window_end'] = window.index[-1]
            hrv_values = pd.concat([hrv_values, hrv_feature_values], ignore_index=True)
        return hrv_values

    def _load_data(self, input):
        """Return *input* as a DataFrame, reading it from disk when a path is given."""
        if isinstance(input, str):
            return self.__load_data_from_str(input)
        if isinstance(input, pd.DataFrame):
            return input
        raise ValueError('Input format not supported. Provide Either a file Path or a DataFrame')

    def __load_data_from_str(self, file_path: str):
        """Read a .csv (separator auto-detected) or a tab-separated .txt file."""
        if file_path.endswith('.csv'):
            return pd.read_csv(file_path, sep=self.__derive_separator(file_path))
        if file_path.endswith('.txt'):
            return pd.read_csv(file_path, sep='\t')
        raise ValueError('File format not supported. Please provide a csv or txt file.')

    def _refine_dataframe(self, input: pd.DataFrame, time_header: str, rri_header: str, sampling_rate=1000):
        """Normalize raw data into a DataFrame indexed by a 'Timestamp' DatetimeIndex.

        Two layouts are supported: a column of R-peak markers ('ECG_R_Peaks',
        with timestamps derived from the row index and the sampling rate), or
        a timestamp column plus an RR-interval column.

        :raises ValueError: if neither layout is present
        """
        input = self.__clean_data(input)
        if 'ECG_R_Peaks' in input.columns:
            # Derive a timestamp for every sample from its row index.
            timestamps = pd.to_datetime(input.index / sampling_rate, unit='s')
            data = pd.DataFrame(input["ECG_R_Peaks"], columns=["ECG_R_Peaks"])
            data['Timestamp'] = timestamps
            data.set_index("Timestamp", inplace=True)
            return data
        if (time_header in input.columns) and (rri_header in input.columns):
            timestamps = pd.to_datetime(input[time_header].astype(str))
            data = pd.DataFrame(input[rri_header])
            data["Timestamp"] = timestamps
            data.set_index("Timestamp", inplace=True)
            return data
        raise ValueError('DataFrame Structure not supported. Make sure that input is either dict containing \"RRI\" and \"RRI_Time\" keys, or DataFrame containing \"ECG_R_Peaks\" column.')

    def __clean_data(self, data):
        """Drop rows with missing values before any further processing."""
        return data.dropna()

    def __derive_separator(self, file_path):
        """Guess the CSV separator (',' or ';') by parsing the file with both.

        The separator that yields more columns wins; a tie (including both
        attempts failing) is treated as unsupported.
        """
        # Try to read the file with comma as separator.
        try:
            comma_columns = len(pd.read_csv(file_path, sep=',').columns)
        except Exception:
            comma_columns = 0
        # Try to read the file with semicolon as separator.
        try:
            semicolon_columns = len(pd.read_csv(file_path, sep=';').columns)
        except Exception:
            semicolon_columns = 0
        # Compare the column counts to decide on the separator.
        if comma_columns > semicolon_columns:
            return ','
        elif semicolon_columns > comma_columns:
            return ';'
        else:
            raise ValueError('Columns separator in CSV not supported. Make sure to use either , or ; as separator')

    def _apply_windowing(self, data: pd.DataFrame, method: str, window_size: str):
        """Split *data* into time windows according to *method*.

        :param method: one of the WindowingMethod values
        :param window_size: pandas offset string, e.g. '60s'
        :return: an iterable of DataFrames (a pandas Rolling object for
            'rolling', otherwise a single-element list)
        :raises ValueError: if the window exceeds the recording length or the
            method is unknown
        """
        recording_length = data.index.max() - data.index.min()
        window = pd.Timedelta(window_size)
        if recording_length < window:
            raise ValueError('Given Window size is larger than recording interval')
        if method == WindowingMethod.ROLLING.value:
            # TODO: Suppress RuntimeWarnings
            return data.rolling(window=window)
        if method == WindowingMethod.FIRST_INTERVAL.value:
            # First window: the data inside [start, start + window).
            first_window_start = data.index[0]
            first_window_end = first_window_start + window
            return [data[(data.index >= first_window_start) & (data.index < first_window_end)]]
        if method == WindowingMethod.LAST_INTERVAL.value:
            # Last window: the data inside [end - window, end].
            last_window_end = data.index[-1]
            last_window_start = last_window_end - window
            return [data[(data.index >= last_window_start) & (data.index <= last_window_end)]]
        # BUG FIX: previously an unknown method fell through and returned
        # None, crashing the caller with an obscure TypeError.
        raise ValueError(f"'{method}' is not a supported windowing method.")

    def _convert_format(self, window):
        """Convert a refined window into the structure neurokit2 expects.

        R-peak frames pass through unchanged; RR-interval frames become a
        dict with 'RRI' (interval values as floats) and 'RRI_Time' (seconds
        relative to the window start).
        """
        if "ECG_R_Peaks" in window.columns:
            return window
        timestamps = window.index
        rri_timesteps = (timestamps - timestamps.min()).total_seconds()
        # BUG FIX: use the frame's single data column instead of the
        # hard-coded 'interbeat_interval' name, so that non-default
        # rri_header values survive the round trip through _refine_dataframe.
        rri_series = window[window.columns[0]]
        try:
            # Values with decimal commas (e.g. German locale exports) come in
            # as strings; normalize them to decimal points before casting.
            rri = rri_series.str.replace(",", ".", regex=False).astype(float).tolist()
        except AttributeError:
            # Already numeric -- no string handling needed.
            rri = rri_series.astype(float).tolist()
        return {
            "RRI": rri,
            "RRI_Time": rri_timesteps.tolist(),
        }

    def _calculate_features(self, data, feature_domains, sampling_rate):
        """Compute the requested HRV feature domains via neurokit2.

        :param feature_domains: list of FeatureDomain *values*
            ('time', 'freq', 'non_lin')
        :raises KeyError: if an unknown domain is requested
        """
        supported = {item.value for item in FeatureDomain}
        for feature in feature_domains:
            if feature not in supported:
                raise KeyError(f"'{feature}' is not a supported feature domain. feature_domains may only include 'time', 'freq' and 'non_lin'.")
        data = self._convert_format(data)
        if supported.issubset(feature_domains):
            # All three domains requested: nk.hrv computes them in one call.
            return nk.hrv(data, sampling_rate)
        result = pd.DataFrame()
        if FeatureDomain.TIME.value in feature_domains:
            result = nk.hrv_time(data, sampling_rate)
        if FeatureDomain.FREQUENCY.value in feature_domains:
            result = pd.concat([result, nk.hrv_frequency(data, sampling_rate)], axis=1)
        if FeatureDomain.NON_LINEAR.value in feature_domains:
            result = pd.concat([result, nk.hrv_nonlinear(data, sampling_rate)], axis=1)
        return result
|