Georg Willer
Change default header names to start with an uppercase letter & add checks for alternative time_header names
b9f7415
import pandas as pd | |
import numpy as np | |
from .constants import TIME_ALIASES, LEFT_X_ALIASES, LEFT_Y_ALIASES, RIGHT_X_ALIASES, RIGHT_Y_ALIASES, X_ALIASES, \ | |
Y_ALIASES | |
from .detectors import saccade_detection | |
from typing import Union | |
class Eye2SacExtractor:
    """Extract saccade events from eye-tracking samples.

    The extractor accepts either a DataFrame or a path to a ``.csv``/``.txt``
    file, drops incomplete rows, resolves the time/x/y columns (falling back
    to known alias names when the requested header is absent) and passes the
    resulting arrays to ``saccade_detection``.
    """

    # Cleaned input table; assigned by extract_features().
    data: Union[pd.DataFrame, None] = None
    # Horizontal gaze samples (mean of both eyes when two headers are given).
    x: Union[np.ndarray, None] = None
    # Vertical gaze samples (mean of both eyes when two headers are given).
    y: Union[np.ndarray, None] = None
    # Sample timestamps.
    time: Union[np.ndarray, None] = None

    def _load_data(self, file_path: str) -> pd.DataFrame:
        """Read a ``.csv`` or tab-separated ``.txt`` file into a DataFrame.

        Args:
            file_path: Path to the input file. The extension is matched
                case-insensitively.

        Returns:
            The parsed table.

        Raises:
            ValueError: If the extension is neither ``.csv`` nor ``.txt``, or
                if the CSV separator cannot be determined.
        """
        lowered = file_path.lower()
        if lowered.endswith('.csv'):
            sep = self.__derive_separator(file_path)
            # `sep` must be passed by keyword: pandas >= 2.0 rejects it
            # as a positional argument.
            return pd.read_csv(file_path, sep=sep)
        if lowered.endswith('.txt'):
            return pd.read_csv(file_path, sep='\t')
        raise ValueError('File format not supported. Please provide a csv or txt file.')

    def _clean_data(self):
        """Drop every row that contains a missing value.

        A new DataFrame is assigned to ``self.data`` so that a DataFrame
        handed in by the caller is never modified.
        """
        self.data = self.data.dropna()

    def _map_relevant_data(self, time_header: str, x_headers: Union[str, list], y_headers: Union[str, list]):
        """Populate ``self.time``, ``self.x`` and ``self.y`` from ``self.data``.

        Args:
            time_header: Name of the timestamp column.
            x_headers: One column name, or a sequence of one (single signal)
                or two (left eye, right eye) column names.
            y_headers: Same convention as ``x_headers`` for the vertical axis.

        Raises:
            ValueError: If a header specification has an unsupported
                type/size or a required column cannot be found.
        """
        try:
            self.time = self._get_value_array(time_header, TIME_ALIASES)
            self.x = self._resolve_axis(
                x_headers, 'x_headers', X_ALIASES, LEFT_X_ALIASES, RIGHT_X_ALIASES)
            self.y = self._resolve_axis(
                y_headers, 'y_headers', Y_ALIASES, LEFT_Y_ALIASES, RIGHT_Y_ALIASES)
        except KeyError as err:
            raise ValueError(
                'Required data columns are missing or not in the correct naming format.'
            ) from err

    def _resolve_axis(self, headers, label: str, single_aliases, left_aliases, right_aliases) -> np.ndarray:
        """Return the samples of one axis for a single or two-eye header spec.

        Args:
            headers: A column name, or a list/tuple holding one or two names.
            label: Parameter name used in the error message.
            single_aliases: Fallback names for a single-signal column.
            left_aliases: Fallback names for the left-eye column.
            right_aliases: Fallback names for the right-eye column.

        Raises:
            ValueError: If ``headers`` is not a string or a sequence of
                length one or two.
        """
        if isinstance(headers, str):
            return self._get_value_array(headers, single_aliases)
        if isinstance(headers, (list, tuple)):
            # A one-element sequence is treated like a plain string header.
            if len(headers) == 1:
                return self._get_value_array(headers[0], single_aliases)
            if len(headers) == 2:
                left = self._get_value_array(headers[0], left_aliases)
                right = self._get_value_array(headers[1], right_aliases)
                # Two-eye input is reduced to the element-wise mean.
                return np.mean([left, right], axis=0)
        raise ValueError(f'invalid size of {label}')

    def _get_value_array(self, header_name: str, known_names: list) -> np.ndarray:
        """Return one column of ``self.data`` as a flat array.

        The column called ``header_name`` is used when it exists. Otherwise
        the first entry of ``known_names`` that matches a column name
        case-insensitively is used instead.

        Args:
            header_name: Requested column name (exact match).
            known_names: Alternative column names, in order of preference.

        Returns:
            The values of the selected column.

        Raises:
            ValueError: If neither ``header_name`` nor any alias matches.
        """
        if header_name in self.data.columns:
            return self.data[header_name].to_numpy().flatten()

        # Map lower-cased labels back to the real column labels so the lookup
        # below indexes the DataFrame with a name that actually exists.
        original_by_lowercase = {}
        for column in self.data.columns:
            original_by_lowercase.setdefault(str(column).lower(), column)

        for alias in known_names:
            column = original_by_lowercase.get(str(alias).lower())
            if column is not None:
                print(f"Using alternative columns: {column}")
                return self.data[column].to_numpy().flatten()

        raise ValueError(f'Invalid data format: header {header_name} not found.')

    def extract_features(self, data: Union[pd.DataFrame, str], time_header: str, x_headers: Union[str, list],
                         y_headers: Union[str, list], missing: float = 0.0, minlen: int = 5, maxvel: int = 40,
                         maxacc: int = 340):
        """Load, clean and map the input, then run saccade detection.

        Args:
            data: A DataFrame, or a path to a ``.csv``/``.txt`` file.
            time_header: Name of the timestamp column.
            x_headers: Column name(s) of the horizontal gaze signal.
            y_headers: Column name(s) of the vertical gaze signal.
            missing: Forwarded to ``saccade_detection``.
            minlen: Forwarded to ``saccade_detection``.
            maxvel: Forwarded to ``saccade_detection``.
            maxacc: Forwarded to ``saccade_detection``.

        Returns:
            DataFrame with one row per detected event (see
            ``_extract_features`` for the columns).

        Raises:
            ValueError: If ``data`` has an unsupported type, the file cannot
                be parsed, or required columns are missing.
        """
        if isinstance(data, pd.DataFrame):
            self.data = data
        elif isinstance(data, str):
            self.data = self._load_data(data)
        else:
            raise ValueError('Data must be a pandas DataFrame or a file path to a csv or txt file.')
        self._clean_data()
        self._map_relevant_data(time_header, x_headers, y_headers)
        return self._extract_features(missing, minlen, maxvel, maxacc)

    def _extract_features(self, missing: float = 0.0, minlen: int = 5, maxvel: int = 40,
                          maxacc: int = 340) -> pd.DataFrame:
        """Run ``saccade_detection`` on the mapped arrays.

        Only the second element returned by ``saccade_detection`` is used; it
        is wrapped in a DataFrame with the columns ``starttime``, ``endtime``,
        ``duration``, ``startx``, ``starty``, ``endx`` and ``endy``.
        """
        _, esac = saccade_detection(self.x, self.y, self.time, missing=missing, minlen=minlen,
                                    maxvel=maxvel, maxacc=maxacc)
        return pd.DataFrame(esac, columns=['starttime', 'endtime', 'duration', 'startx', 'starty', 'endx', 'endy'])

    def __derive_separator(self, file_path):
        """Guess whether a CSV file is separated by ``,`` or ``;``.

        The header is parsed once per candidate and the separator that yields
        more columns wins. Only the header row is read (``nrows=0``), so the
        file body is not parsed just to count columns.

        Raises:
            ValueError: If both candidates yield the same number of columns
                (including the case where neither can be parsed).
        """
        # Try reading the header with a comma as separator.
        try:
            comma_columns = len(pd.read_csv(file_path, sep=',', nrows=0).columns)
        except Exception:
            # Best effort: an unreadable candidate simply counts as no columns.
            comma_columns = 0
        # Try reading the header with a semicolon as separator.
        try:
            semicolon_columns = len(pd.read_csv(file_path, sep=';', nrows=0).columns)
        except Exception:
            semicolon_columns = 0
        # Compare the column counts and pick the separator.
        if comma_columns > semicolon_columns:
            return ','
        if semicolon_columns > comma_columns:
            return ';'
        raise ValueError('Columns separator in CSV not supported. Make sure to use either , or ; as separator')