import os import pandas as pd import numpy as np from pathlib import Path # import #logging from datetime import datetime # Resolve DATA_DIR from config (container-safe) with fallback try: from src.config import DATA_DIR as CFG_DATA_DIR # when run as module except Exception: try: from config import DATA_DIR as CFG_DATA_DIR # when run as script from src/ except Exception: CFG_DATA_DIR = "/data" class FixedTimestampHandler: def __init__(self, base_path: str | os.PathLike | None = None): # Prefer explicit argument, then DATA_DIR env, then config fallback resolved_base = base_path or os.getenv("DATA_DIR") or CFG_DATA_DIR self.base_path = Path(resolved_base) self.finviz_path = self.base_path / "finviz" / "sentiment" self.crypto_features_path = self.base_path / "merged" / "features" / "crypto_features.parquet" self.stocks_features_path = self.base_path / "merged" / "features" / "stocks_features.parquet" self.output_path = self.base_path / "merged" / "features" self.output_path.mkdir(parents=True, exist_ok=True) # Configure #logging #logging.basicConfig(level=#logging.INFO, # format='%(asctime)s - %(levelname)s - %(message)s') # Define tickers and mappings self.stock_tickers = ["AAPL", "TSLA", "GOOGL", "NVDA", "MSFT", "COIN"] self.crypto_ticker_mapping = { "BTC": "bitcoin", "ETH": "ethereum", "SOL": "solana", "XRP": "ripple", "ADA": "cardano" } # Reverse mapping: crypto name to ticker (all lowercase keys) self.crypto_name_to_ticker = {v.lower(): k for k, v in self.crypto_ticker_mapping.items()} def crypto_name_to_symbol(self, name): """Transform crypto name (e.g., 'bitcoin', 'Bitcoin', 'BITCOIN') to ticker symbol (e.g., 'BTC')""" if not isinstance(name, str): return None name_lower = name.strip().lower() # Try exact match if name_lower in self.crypto_name_to_ticker: return self.crypto_name_to_ticker[name_lower] # Try to match ignoring spaces and underscores for key in self.crypto_name_to_ticker: if name_lower.replace(' ', '').replace('_', '') == key.replace(' ', '').replace('_', ''): return self.crypto_name_to_ticker[key] return None def is_timestamp_column(self, df, col_name): """Determine if a column is likely a timestamp column""" if pd.api.types.is_datetime64_any_dtype(df[col_name]): return True if pd.api.types.is_numeric_dtype(df[col_name]): sample_vals = df[col_name].dropna() if len(sample_vals) == 0: return False sample_val = sample_vals.iloc[0] current_time = pd.Timestamp.now().timestamp() units = [ ('s', 1), ('ms', 1000), ('us', 1000000), ('ns', 1000000000) ] for unit, divisor in units: try: if unit == 's': ts_value = sample_val else: ts_value = sample_val / divisor if abs(ts_value - current_time) < (10 * 365 * 24 * 3600): return True except: continue if df[col_name].dtype == 'object': sample_val = df[col_name].dropna().iloc[0] if not df[col_name].empty else None if sample_val and isinstance(sample_val, str): try: pd.to_datetime(sample_val) return True except (ValueError, TypeError): pass return False def get_timestamp_columns(self, df): """Identify all timestamp columns in a dataframe""" timestamp_cols = [] potential_names = ['time', 'date', 'interval', 'timestamp', 'dt'] for col in df.columns: if any(keyword in col.lower() for keyword in potential_names): if self.is_timestamp_column(df, col): timestamp_cols.append(col) return timestamp_cols def convert_timestamp_column(self, df, col_name, unit='auto'): """Convert a timestamp column to datetime format with improved validation""" if pd.api.types.is_datetime64_any_dtype(df[col_name]): if df[col_name].dt.tz is not None: df[col_name] = df[col_name].dt.tz_localize(None) return df[col_name] if pd.api.types.is_numeric_dtype(df[col_name]): sample_vals = df[col_name].dropna() if len(sample_vals) == 0: print(f"[ERROR] No valid values in timestamp column {col_name}") return None # Convert nullable Int64 to regular numeric if needed if hasattr(sample_vals, 'dtype') and str(sample_vals.dtype).startswith('Int'): sample_vals = sample_vals.astype('int64') if unit == 'auto': current_time = pd.Timestamp.now().timestamp() best_unit = None best_distance = float('inf') for test_unit in ['s', 'ms', 'us', 'ns']: try: # Additional safety check if len(sample_vals) == 0: continue first_val = sample_vals.iloc[0] if pd.isna(first_val): continue if test_unit == 's': test_ts = pd.to_datetime(first_val, unit='s') else: divisor = {'ms': 1000, 'us': 1000000, 'ns': 1000000000}[test_unit] test_ts = pd.to_datetime(first_val / divisor, unit='s') distance = abs((pd.Timestamp.now() - test_ts).total_seconds()) if distance < best_distance: best_distance = distance best_unit = test_unit except Exception as e: #logging.debug(f"Failed to test unit {test_unit} for column {col_name}: {e}") continue if best_unit is None: #logging.error(f"Could not determine unit for column {col_name}") return None unit = best_unit #logging.info(f"Auto-detected unit for {col_name}: {unit}") try: # Convert nullable Int64 to regular numeric if needed for the whole column values_to_convert = df[col_name] if hasattr(values_to_convert, 'dtype') and str(values_to_convert.dtype).startswith('Int'): values_to_convert = values_to_convert.astype('int64') if unit == 's': converted = pd.to_datetime(values_to_convert, unit='s') else: divisor = {'ms': 1000, 'us': 1000000, 'ns': 1000000000}[unit] converted = pd.to_datetime(values_to_convert / divisor, unit='s') if converted.dt.tz is not None: converted = converted.dt.tz_localize(None) if converted.min().year < 2000: #logging.warning(f"Converted timestamps for {col_name} seem too old. Checking alternative units.") for alt_unit in ['s', 'ms', 'us', 'ns']: if alt_unit == unit: continue try: if alt_unit == 's': alt_converted = pd.to_datetime(df[col_name], unit='s') else: alt_divisor = {'ms': 1000, 'us': 1000000, 'ns': 1000000000}[alt_unit] alt_converted = pd.to_datetime(df[col_name] / alt_divisor, unit='s') if alt_converted.min().year > 2000: #logging.info(f"Alternative unit {alt_unit} gives better results for {col_name}") converted = alt_converted break except Exception as e: #logging.debug(f"Failed to try alternative unit {alt_unit} for column {col_name}: {e}") continue #logging.info(f"Successfully converted {col_name} using unit '{unit}'") #logging.info(f"Date range: {converted.min()} to {converted.max()}") return converted except Exception as e: #logging.error(f"Failed to convert {col_name} using unit '{unit}': {e}") return None elif df[col_name].dtype == 'object': try: converted = pd.to_datetime(df[col_name]) if converted.dt.tz is not None: converted = converted.dt.tz_localize(None) #logging.info(f"Successfully converted string timestamps in {col_name}") return converted except Exception as e: #logging.error(f"Failed to convert string timestamps in {col_name}: {e}") return None else: #logging.error(f"Unknown timestamp format in column {col_name}") return None def select_best_timestamp_column(self, df, timestamp_columns): """Select the best timestamp column from a list of potential columns""" best_col = None best_score = -1 for col in timestamp_columns: try: if col not in df.columns: print(f"[WARN] Column {col} not found in dataframe") continue if df[col].isnull().all(): print(f"[WARN] Column {col} contains only null values") continue converted = self.convert_timestamp_column(df, col) if converted is None: print(f"[WARN] Could not convert column {col} to timestamp") continue non_null_count = converted.notna().sum() recent_count = converted[converted > pd.Timestamp('2020-01-01')].count() score = non_null_count + recent_count * 2 print(f"[DEBUG] Column {col}: score={score}, non_null={non_null_count}, recent={recent_count}") if score > best_score: best_score = score best_col = col except Exception as e: print(f"[WARN] Error evaluating timestamp column {col}: {e}") continue print(f"[INFO] Best timestamp column: {best_col} (score: {best_score})") return best_col def load_sentiment_data(self, symbol): """Load sentiment data with proper timestamp handling""" sentiment_file = self.finviz_path / f"{symbol.upper()}_sentiment.parquet" if not sentiment_file.exists(): print(f"[WARN] Sentiment file not found: {sentiment_file}") return None try: df = pd.read_parquet(sentiment_file) print(f"[INFO] Loaded sentiment data for {symbol}: {len(df)} rows") timestamp_cols = self.get_timestamp_columns(df) if not timestamp_cols: print(f"[ERROR] No timestamp columns found in {symbol} sentiment data") return None timestamp_col = timestamp_cols[0] converted = self.convert_timestamp_column(df, timestamp_col) if converted is None: print(f"[ERROR] Could not convert timestamp column {timestamp_col} in {symbol}") return None df['sentiment_timestamp'] = converted df['symbol'] = symbol.upper() return df except Exception as e: print(f"[ERROR] Error loading sentiment data for {symbol}: {e}") return None def load_features_data(self, data_type='stocks'): """Load features data with improved timestamp handling""" file_path = self.stocks_features_path if data_type == 'stocks' else self.crypto_features_path if not file_path.exists(): print(f"[ERROR] Features file not found: {file_path}") return None try: df = pd.read_parquet(file_path) print(f"[INFO] Loaded {data_type} features: {len(df)} rows") potential_timestamp_cols = [col for col in df.columns if any(keyword in col.lower() for keyword in ['time', 'date', 'interval', 'timestamp', 'dt'])] print(f"[INFO] Potential timestamp columns: {potential_timestamp_cols}") # Safer timestamp detection timestamp_cols = [] for col in potential_timestamp_cols: try: is_ts = self.is_timestamp_column(df, col) if is_ts: timestamp_cols.append(col) print(f"[DEBUG] {col} confirmed as timestamp column") else: print(f"[DEBUG] {col} rejected as timestamp column") except Exception as e: print(f"[WARN] Error checking {col}: {e}") continue print(f"[INFO] Confirmed timestamp columns: {timestamp_cols}") if not timestamp_cols: print(f"[ERROR] No valid timestamp columns found in {data_type} features") return None best_col = self.select_best_timestamp_column(df, timestamp_cols) if best_col is None: print(f"[ERROR] Could not select a valid timestamp column from {timestamp_cols}") return None converted = self.convert_timestamp_column(df, best_col) if converted is None: print(f"[ERROR] Failed to convert selected timestamp column {best_col}") return None df['feature_timestamp'] = converted print(f"[INFO] Selected timestamp column: {best_col}") print(f"[INFO] Date range: {converted.min()} to {converted.max()}") return df except Exception as e: import traceback print(f"[ERROR] Error loading {data_type} features: {e}") print(f"[ERROR] Traceback: {traceback.format_exc()}") return None def merge_sentiment_to_features(self, features_df, sentiment_df, tolerance_minutes=60*12): """Merge sentiment data INTO features data based on closest timestamp, with tolerance window""" features_sorted = features_df.sort_values(by='feature_timestamp') sentiment_sorted = sentiment_df.sort_values(by='sentiment_timestamp') # Use a tolerance window for timestamp matching tolerance = pd.Timedelta(minutes=tolerance_minutes) merged_df = pd.merge_asof( features_sorted, sentiment_sorted, left_on='feature_timestamp', right_on='sentiment_timestamp', direction='nearest', tolerance=tolerance ) # If no sentiment match within tolerance, sentiment_score will be NaN if 'sentiment_score' in merged_df.columns: unmatched = merged_df['sentiment_score'].isna().sum() print(f"[INFO] Rows with no sentiment match (NaN sentiment_score): {unmatched}") print(f"[INFO] Merged {len(features_df)} feature rows with {len(sentiment_df)} sentiment rows using tolerance {tolerance_minutes} min") print(f"[INFO] Result: {len(merged_df)} rows") return merged_df def process_stocks_data(self): """Process all stocks data by merging finviz sentiment into stock features""" print("[INFO] Processing stocks data...") # Load stocks features first (this is the base dataset) stocks_df = self.load_features_data('stocks') if stocks_df is None: print("[ERROR] Failed to load stocks features data") return None # Check what columns are available and what symbols are in the data if 'symbol' in stocks_df.columns: unique_symbols = stocks_df['symbol'].unique() elif 'ticker' in stocks_df.columns: unique_symbols = stocks_df['ticker'].unique() print(f"[INFO] Available symbols in stocks features: {unique_symbols}") # Check if any sentiment files exist if not self.finviz_path.exists(): print(f"[WARN] Finviz sentiment directory does not exist: {self.finviz_path}") print(f"[WARN] Proceeding without sentiment data merge for stocks") # Save features as-is without sentiment merge output_file = self.output_path / "stocks_features.parquet" stocks_df.to_parquet(output_file) print(f"[INFO] Stocks features saved without sentiment to: {output_file}") return stocks_df # Check if any sentiment files exist for our tickers sentiment_files_exist = any( (self.finviz_path / f"{ticker.upper()}_sentiment.parquet").exists() for ticker in self.stock_tickers ) if not sentiment_files_exist: print(f"[WARN] No sentiment files found for any stock tickers: {self.stock_tickers}") print(f"[WARN] Proceeding without sentiment data merge for stocks") # Save features as-is without sentiment merge output_file = self.output_path / "stocks_features.parquet" stocks_df.to_parquet(output_file) print(f"[INFO] Stocks features saved without sentiment to: {output_file}") return stocks_df merged_stocks_list = [] for ticker in self.stock_tickers: print(f"[INFO] Processing stock ticker: {ticker}") # Load sentiment data for this ticker sentiment_df = self.load_sentiment_data(ticker) if sentiment_df is None: print(f"[WARN] No sentiment data for {ticker}, skipping...") continue # Filter stocks features for this ticker ticker_stocks = None if 'symbol' in stocks_df.columns: ticker_stocks = stocks_df[stocks_df['symbol'] == ticker].copy() elif 'ticker' in stocks_df.columns: ticker_stocks = stocks_df[stocks_df['ticker'] == ticker].copy() if ticker_stocks is None or len(ticker_stocks) == 0: print(f"[WARN] No feature data found for ticker {ticker} - skipping this ticker") continue print(f"[INFO] Found {len(ticker_stocks)} feature rows for {ticker}") # Merge sentiment INTO features merged_ticker = self.merge_sentiment_to_features(ticker_stocks, sentiment_df) # Remove symbol_y and replace symbol_x with symbol if 'symbol_y' in merged_ticker.columns: merged_ticker = merged_ticker.drop(columns=['symbol_y']) if 'symbol_x' in merged_ticker.columns: merged_ticker = merged_ticker.rename(columns={'symbol_x': 'symbol'}) # Re-order columns: symbol first, interval_timestamp second (if present) cols = list(merged_ticker.columns) if 'symbol' in cols: cols.remove('symbol') new_order = ['symbol'] if 'interval_timestamp' in cols: cols.remove('interval_timestamp') new_order.append('interval_timestamp') new_order += cols merged_ticker = merged_ticker[new_order] merged_stocks_list.append(merged_ticker) if not merged_stocks_list: print("[WARN] No stocks data was successfully merged with sentiment") print("[WARN] Saving original stocks features without sentiment") output_file = self.output_path / "stocks_features.parquet" stocks_df.to_parquet(output_file) print(f"[INFO] Stocks features saved without sentiment to: {output_file}") return stocks_df # Combine all merged stock data final_stocks_df = pd.concat(merged_stocks_list, ignore_index=True) # Save the result output_file = self.output_path / "stocks_features.parquet" final_stocks_df.to_parquet(output_file) print(f"[INFO] Stocks data with sentiment saved to: {output_file}") return final_stocks_df def process_crypto_data(self): """Process all crypto data by merging finviz sentiment into crypto features""" print("[INFO] Processing crypto data...") # Load crypto features first (this is the base dataset) crypto_df = self.load_features_data('crypto') if crypto_df is None: print("[ERROR] Failed to load crypto features data") return None # Check for various possible symbol/ticker columns symbol_columns = [col for col in crypto_df.columns if any(keyword in col.lower() for keyword in ['symbol', 'ticker', 'name', 'id', 'coin'])] print(f"[INFO] Available symbol columns in crypto: {symbol_columns}") # Try to identify unique values in potential symbol columns for col in symbol_columns: if crypto_df[col].dtype == 'object': unique_values = crypto_df[col].unique()[:10] # Show first 10 unique values print(f"[INFO] Sample values in {col}: {unique_values}") # Check if any sentiment files exist if not self.finviz_path.exists(): print(f"[WARN] Finviz sentiment directory does not exist: {self.finviz_path}") print(f"[WARN] Proceeding without sentiment data merge for crypto") # Save features as-is without sentiment merge output_file = self.output_path / "crypto_features.parquet" crypto_df.to_parquet(output_file) print(f"[INFO] Crypto features saved without sentiment to: {output_file}") return crypto_df # Check if any sentiment files exist for our crypto tickers sentiment_files_exist = any( (self.finviz_path / f"{ticker.upper()}_sentiment.parquet").exists() for ticker in self.crypto_ticker_mapping.keys() ) if not sentiment_files_exist: print(f"[WARN] No sentiment files found for any crypto tickers: {list(self.crypto_ticker_mapping.keys())}") print(f"[WARN] Proceeding without sentiment data merge for crypto") # Save features as-is without sentiment merge output_file = self.output_path / "crypto_features.parquet" crypto_df.to_parquet(output_file) print(f"[INFO] Crypto features saved without sentiment to: {output_file}") return crypto_df merged_crypto_list = [] for crypto_ticker, crypto_name in self.crypto_ticker_mapping.items(): print(f"[INFO] Processing crypto ticker: {crypto_ticker} (name: {crypto_name})") # Load sentiment data for this crypto ticker sentiment_df = self.load_sentiment_data(crypto_ticker) if sentiment_df is None: print(f"[WARN] No sentiment data for {crypto_ticker}, skipping...") continue # Try different approaches to filter crypto features ticker_crypto = None # Approach 1: Try exact ticker match for col in ['symbol', 'ticker', 'coin_id', 'id']: if col in crypto_df.columns: matches = crypto_df[crypto_df[col].str.upper() == crypto_ticker].copy() if len(matches) > 0: ticker_crypto = matches print(f"[INFO] Found {len(matches)} rows matching {crypto_ticker} in column '{col}'") break # Approach 2: Try crypto name match if ticker_crypto is None or len(ticker_crypto) == 0: for col in ['name', 'coin_name']: if col in crypto_df.columns: matches = crypto_df[crypto_df[col].str.lower() == crypto_name.lower()].copy() if len(matches) > 0: ticker_crypto = matches print(f"[INFO] Found {len(matches)} rows matching {crypto_name} in column '{col}'") break # Approach 3: Try partial matching (in case of different formats) if ticker_crypto is None or len(ticker_crypto) == 0: for col in symbol_columns: if crypto_df[col].dtype == 'object': # Try case-insensitive contains match matches = crypto_df[crypto_df[col].str.contains(crypto_ticker, case=False, na=False)].copy() if len(matches) > 0: ticker_crypto = matches print(f"[INFO] Found {len(matches)} rows with partial match for {crypto_ticker} in column '{col}'") break # Try crypto name partial match matches = crypto_df[crypto_df[col].str.contains(crypto_name, case=False, na=False)].copy() if len(matches) > 0: ticker_crypto = matches print(f"[INFO] Found {len(matches)} rows with partial match for {crypto_name} in column '{col}'") break if ticker_crypto is None or len(ticker_crypto) == 0: print(f"[WARN] No feature data found for crypto {crypto_ticker} ({crypto_name}) - skipping this crypto") continue # Merge sentiment INTO features merged_ticker = self.merge_sentiment_to_features(ticker_crypto, sentiment_df) # Remove symbol_x and replace symbol_y with symbol if 'symbol_x' in merged_ticker.columns: merged_ticker = merged_ticker.drop(columns=['symbol_x']) if 'symbol_y' in merged_ticker.columns: merged_ticker = merged_ticker.rename(columns={'symbol_y': 'symbol'}) # Remove duplicate 'symbol' columns if any symbol_cols = [col for col in merged_ticker.columns if col == 'symbol'] if len(symbol_cols) > 1: # Keep only the first 'symbol' column # This will drop all but the first occurrence merged_ticker = merged_ticker.loc[:, ~merged_ticker.columns.duplicated()] # Re-order columns: symbol first, interval_timestamp second (if present) cols = list(merged_ticker.columns) if 'symbol' in cols: cols.remove('symbol') new_order = ['symbol'] if 'interval_timestamp' in cols: cols.remove('interval_timestamp') new_order.append('interval_timestamp') new_order += cols merged_ticker = merged_ticker[new_order] merged_crypto_list.append(merged_ticker) if not merged_crypto_list: print("[WARN] No crypto data was successfully merged with sentiment") print("[WARN] Saving original crypto features without sentiment") output_file = self.output_path / "crypto_features.parquet" crypto_df.to_parquet(output_file) print(f"[INFO] Crypto features saved without sentiment to: {output_file}") return crypto_df # Combine all merged crypto data final_crypto_df = pd.concat(merged_crypto_list, ignore_index=True) # Save the result output_file = self.output_path / "crypto_features.parquet" final_crypto_df.to_parquet(output_file) print(f"[INFO] Crypto data with sentiment saved to: {output_file}") return final_crypto_df def process_all_data(self): """Process both stocks and crypto data""" #logging.info("Starting data processing for all assets...") stocks_result = self.process_stocks_data() crypto_result = self.process_crypto_data() if stocks_result is not None: print(f"[OK] Stocks processing completed: {len(stocks_result)} rows") else: print("[ERROR] Stocks processing failed") if crypto_result is not None: print(f"[OK] Crypto processing completed: {len(crypto_result)} rows") else: print("[ERROR] Crypto processing failed") return stocks_result, crypto_result # Example usage if __name__ == "__main__": handler = FixedTimestampHandler() # Test individual components #logging.info("Testing sentiment data loading...") sentiment_df = handler.load_sentiment_data("AAPL") stocks_df = handler.load_features_data('stocks') # Test merge process # handler.test_merge() # Process all data handler.process_all_data()