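"""Merge Finviz sentiment into stock/crypto feature tables.

Loads per-ticker sentiment parquet files, auto-detects and normalizes
timestamp columns (epoch s/ms/us/ns or date strings), and merges sentiment
into the feature frames with pandas merge_asof. Results are written back
under <DATA_DIR>/merged/features/.
"""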
import os
from pathlib import Path

import pandas as pd

# Resolve DATA_DIR from config (container-safe) with fallback
try:
    from src.config import DATA_DIR as CFG_DATA_DIR  # when run as a module
except Exception:
    try:
        from config import DATA_DIR as CFG_DATA_DIR  # when run as a script from src/
    except Exception:
        CFG_DATA_DIR = "/data"
class FixedTimestampHandler:
    def __init__(self, base_path: str | os.PathLike | None = None):
        # Prefer explicit argument, then DATA_DIR env, then config fallback
        resolved_base = base_path or os.getenv("DATA_DIR") or CFG_DATA_DIR
        self.base_path = Path(resolved_base)
        self.finviz_path = self.base_path / "finviz" / "sentiment"
        self.crypto_features_path = self.base_path / "merged" / "features" / "crypto_features.parquet"
        self.stocks_features_path = self.base_path / "merged" / "features" / "stocks_features.parquet"
        self.output_path = self.base_path / "merged" / "features"
        self.output_path.mkdir(parents=True, exist_ok=True)

        # Define tickers and mappings
        self.stock_tickers = ["AAPL", "TSLA", "GOOGL", "NVDA", "MSFT", "COIN"]
        self.crypto_ticker_mapping = {
            "BTC": "bitcoin",
            "ETH": "ethereum",
            "SOL": "solana",
            "XRP": "ripple",
            "ADA": "cardano",
        }
        # Reverse mapping: crypto name to ticker (all lowercase keys)
        self.crypto_name_to_ticker = {v.lower(): k for k, v in self.crypto_ticker_mapping.items()}
    def crypto_name_to_symbol(self, name):
        """Transform a crypto name (e.g., 'bitcoin', 'Bitcoin', 'BITCOIN') to its ticker symbol (e.g., 'BTC')."""
        if not isinstance(name, str):
            return None
        name_lower = name.strip().lower()
        # Try exact match first
        if name_lower in self.crypto_name_to_ticker:
            return self.crypto_name_to_ticker[name_lower]
        # Fall back to matching with spaces and underscores stripped
        normalized = name_lower.replace(' ', '').replace('_', '')
        for key in self.crypto_name_to_ticker:
            if normalized == key.replace(' ', '').replace('_', ''):
                return self.crypto_name_to_ticker[key]
        return None
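    # Example (illustrative inputs, not tied to any dataset):
    #   crypto_name_to_symbol("Bitcoin")  -> "BTC"   (exact match, case-folded)
    #   crypto_name_to_symbol("bit_coin") -> "BTC"   (space/underscore-insensitive fallback)
    #   crypto_name_to_symbol("dogecoin") -> None    (not in the mapping)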
    def is_timestamp_column(self, df, col_name):
        """Determine whether a column is likely a timestamp column."""
        if pd.api.types.is_datetime64_any_dtype(df[col_name]):
            return True
        if pd.api.types.is_numeric_dtype(df[col_name]):
            sample_vals = df[col_name].dropna()
            if len(sample_vals) == 0:
                return False
            sample_val = sample_vals.iloc[0]
            current_time = pd.Timestamp.now().timestamp()
            # Interpret the sample as epoch seconds/ms/us/ns and accept the
            # column if any interpretation lands within ~10 years of now.
            units = [
                ('s', 1),
                ('ms', 1_000),
                ('us', 1_000_000),
                ('ns', 1_000_000_000),
            ]
            for unit, divisor in units:
                try:
                    ts_value = sample_val / divisor
                    if abs(ts_value - current_time) < (10 * 365 * 24 * 3600):
                        return True
                except Exception:
                    continue
        if df[col_name].dtype == 'object':
            # Guard against all-null object columns before taking iloc[0]
            sample_vals = df[col_name].dropna()
            sample_val = sample_vals.iloc[0] if len(sample_vals) > 0 else None
            if sample_val and isinstance(sample_val, str):
                try:
                    pd.to_datetime(sample_val)
                    return True
                except (ValueError, TypeError):
                    pass
        return False
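    # For reference (assumed typical magnitudes, not taken from the data):
    # an epoch near 2024 is ~1.7e9 in seconds, ~1.7e12 in ms, ~1.7e15 in us,
    # and ~1.7e18 in ns, so exactly one of the four probes above should land
    # inside the ten-year window for a well-formed epoch column.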
    def get_timestamp_columns(self, df):
        """Identify all timestamp columns in a dataframe."""
        timestamp_cols = []
        potential_names = ['time', 'date', 'interval', 'timestamp', 'dt']
        for col in df.columns:
            if any(keyword in col.lower() for keyword in potential_names):
                if self.is_timestamp_column(df, col):
                    timestamp_cols.append(col)
        return timestamp_cols
    def convert_timestamp_column(self, df, col_name, unit='auto'):
        """Convert a timestamp column to datetime format with improved validation."""
        # Already datetime: just strip any timezone info
        if pd.api.types.is_datetime64_any_dtype(df[col_name]):
            if df[col_name].dt.tz is not None:
                df[col_name] = df[col_name].dt.tz_localize(None)
            return df[col_name]

        if pd.api.types.is_numeric_dtype(df[col_name]):
            sample_vals = df[col_name].dropna()
            if len(sample_vals) == 0:
                print(f"[ERROR] No valid values in timestamp column {col_name}")
                return None
            # Convert nullable Int64 to regular int64 if needed
            if str(sample_vals.dtype).startswith('Int'):
                sample_vals = sample_vals.astype('int64')

            divisors = {'s': 1, 'ms': 1000, 'us': 1000000, 'ns': 1000000000}
            if unit == 'auto':
                # Probe each epoch unit on the first value and keep the one
                # whose interpretation lands closest to the current time
                best_unit = None
                best_distance = float('inf')
                for test_unit in ['s', 'ms', 'us', 'ns']:
                    try:
                        first_val = sample_vals.iloc[0]
                        if pd.isna(first_val):
                            continue
                        test_ts = pd.to_datetime(first_val / divisors[test_unit], unit='s')
                        distance = abs((pd.Timestamp.now() - test_ts).total_seconds())
                        if distance < best_distance:
                            best_distance = distance
                            best_unit = test_unit
                    except Exception as e:
                        print(f"[DEBUG] Failed to test unit {test_unit} for column {col_name}: {e}")
                        continue
                if best_unit is None:
                    print(f"[ERROR] Could not determine unit for column {col_name}")
                    return None
                unit = best_unit
                print(f"[INFO] Auto-detected unit for {col_name}: {unit}")

            try:
                # Convert nullable Int64 to regular int64 for the whole column
                values_to_convert = df[col_name]
                if str(values_to_convert.dtype).startswith('Int'):
                    values_to_convert = values_to_convert.astype('int64')
                converted = pd.to_datetime(values_to_convert / divisors[unit], unit='s')
                if converted.dt.tz is not None:
                    converted = converted.dt.tz_localize(None)
                # Sanity check: pre-2000 dates usually mean the unit was wrong
                if converted.min().year < 2000:
                    print(f"[WARN] Converted timestamps for {col_name} seem too old; checking alternative units")
                    for alt_unit in ['s', 'ms', 'us', 'ns']:
                        if alt_unit == unit:
                            continue
                        try:
                            alt_converted = pd.to_datetime(df[col_name] / divisors[alt_unit], unit='s')
                            if alt_converted.min().year > 2000:
                                print(f"[INFO] Alternative unit {alt_unit} gives better results for {col_name}")
                                converted = alt_converted
                                break
                        except Exception as e:
                            print(f"[DEBUG] Failed alternative unit {alt_unit} for column {col_name}: {e}")
                            continue
                print(f"[INFO] Successfully converted {col_name} using unit '{unit}'")
                print(f"[INFO] Date range: {converted.min()} to {converted.max()}")
                return converted
            except Exception as e:
                print(f"[ERROR] Failed to convert {col_name} using unit '{unit}': {e}")
                return None

        elif df[col_name].dtype == 'object':
            try:
                converted = pd.to_datetime(df[col_name])
                if converted.dt.tz is not None:
                    converted = converted.dt.tz_localize(None)
                print(f"[INFO] Successfully converted string timestamps in {col_name}")
                return converted
            except Exception as e:
                print(f"[ERROR] Failed to convert string timestamps in {col_name}: {e}")
                return None
        else:
            print(f"[ERROR] Unknown timestamp format in column {col_name}")
            return None
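    # Usage sketch (hypothetical frame and column names):
    #   df = pd.DataFrame({"open_time": [1700000000000, 1700003600000]})  # epoch ms
    #   handler.convert_timestamp_column(df, "open_time")  # auto-detects 'ms'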
    def select_best_timestamp_column(self, df, timestamp_columns):
        """Select the best timestamp column from a list of candidate columns."""
        best_col = None
        best_score = -1
        for col in timestamp_columns:
            try:
                if col not in df.columns:
                    print(f"[WARN] Column {col} not found in dataframe")
                    continue
                if df[col].isnull().all():
                    print(f"[WARN] Column {col} contains only null values")
                    continue
                converted = self.convert_timestamp_column(df, col)
                if converted is None:
                    print(f"[WARN] Could not convert column {col} to timestamp")
                    continue
                # Score: non-null count, with post-2020 values weighted double
                non_null_count = converted.notna().sum()
                recent_count = converted[converted > pd.Timestamp('2020-01-01')].count()
                score = non_null_count + recent_count * 2
                print(f"[DEBUG] Column {col}: score={score}, non_null={non_null_count}, recent={recent_count}")
                if score > best_score:
                    best_score = score
                    best_col = col
            except Exception as e:
                print(f"[WARN] Error evaluating timestamp column {col}: {e}")
                continue
        print(f"[INFO] Best timestamp column: {best_col} (score: {best_score})")
        return best_col
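    # Scoring example (made-up numbers): a column with 1,000 non-null values,
    # 900 of them after 2020-01-01, scores 1000 + 2*900 = 2800, beating a
    # fully populated but stale column that scores 1000 + 0.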
    def load_sentiment_data(self, symbol):
        """Load sentiment data with proper timestamp handling."""
        sentiment_file = self.finviz_path / f"{symbol.upper()}_sentiment.parquet"
        if not sentiment_file.exists():
            print(f"[WARN] Sentiment file not found: {sentiment_file}")
            return None
        try:
            df = pd.read_parquet(sentiment_file)
            print(f"[INFO] Loaded sentiment data for {symbol}: {len(df)} rows")
            timestamp_cols = self.get_timestamp_columns(df)
            if not timestamp_cols:
                print(f"[ERROR] No timestamp columns found in {symbol} sentiment data")
                return None
            # Use the first detected timestamp column
            timestamp_col = timestamp_cols[0]
            converted = self.convert_timestamp_column(df, timestamp_col)
            if converted is None:
                print(f"[ERROR] Could not convert timestamp column {timestamp_col} in {symbol}")
                return None
            df['sentiment_timestamp'] = converted
            df['symbol'] = symbol.upper()
            return df
        except Exception as e:
            print(f"[ERROR] Error loading sentiment data for {symbol}: {e}")
            return None
    def load_features_data(self, data_type='stocks'):
        """Load features data with improved timestamp handling."""
        file_path = self.stocks_features_path if data_type == 'stocks' else self.crypto_features_path
        if not file_path.exists():
            print(f"[ERROR] Features file not found: {file_path}")
            return None
        try:
            df = pd.read_parquet(file_path)
            print(f"[INFO] Loaded {data_type} features: {len(df)} rows")
            potential_timestamp_cols = [
                col for col in df.columns
                if any(keyword in col.lower() for keyword in ['time', 'date', 'interval', 'timestamp', 'dt'])
            ]
            print(f"[INFO] Potential timestamp columns: {potential_timestamp_cols}")
            # Safer timestamp detection
            timestamp_cols = []
            for col in potential_timestamp_cols:
                try:
                    if self.is_timestamp_column(df, col):
                        timestamp_cols.append(col)
                        print(f"[DEBUG] {col} confirmed as timestamp column")
                    else:
                        print(f"[DEBUG] {col} rejected as timestamp column")
                except Exception as e:
                    print(f"[WARN] Error checking {col}: {e}")
                    continue
            print(f"[INFO] Confirmed timestamp columns: {timestamp_cols}")
            if not timestamp_cols:
                print(f"[ERROR] No valid timestamp columns found in {data_type} features")
                return None
            best_col = self.select_best_timestamp_column(df, timestamp_cols)
            if best_col is None:
                print(f"[ERROR] Could not select a valid timestamp column from {timestamp_cols}")
                return None
            converted = self.convert_timestamp_column(df, best_col)
            if converted is None:
                print(f"[ERROR] Failed to convert selected timestamp column {best_col}")
                return None
            df['feature_timestamp'] = converted
            print(f"[INFO] Selected timestamp column: {best_col}")
            print(f"[INFO] Date range: {converted.min()} to {converted.max()}")
            return df
        except Exception as e:
            import traceback
            print(f"[ERROR] Error loading {data_type} features: {e}")
            print(f"[ERROR] Traceback: {traceback.format_exc()}")
            return None
    def merge_sentiment_to_features(self, features_df, sentiment_df, tolerance_minutes=60 * 12):
        """Merge sentiment data INTO features data on the closest timestamp, within a tolerance window (default 12 hours)."""
        features_sorted = features_df.sort_values(by='feature_timestamp')
        sentiment_sorted = sentiment_df.sort_values(by='sentiment_timestamp')
        # merge_asof requires both sides sorted on their keys; 'nearest'
        # picks the closest sentiment row within the tolerance window
        tolerance = pd.Timedelta(minutes=tolerance_minutes)
        merged_df = pd.merge_asof(
            features_sorted,
            sentiment_sorted,
            left_on='feature_timestamp',
            right_on='sentiment_timestamp',
            direction='nearest',
            tolerance=tolerance,
        )
        # If no sentiment match falls within tolerance, sentiment_score is NaN
        if 'sentiment_score' in merged_df.columns:
            unmatched = merged_df['sentiment_score'].isna().sum()
            print(f"[INFO] Rows with no sentiment match (NaN sentiment_score): {unmatched}")
        print(f"[INFO] Merged {len(features_df)} feature rows with {len(sentiment_df)} sentiment rows using tolerance {tolerance_minutes} min")
        print(f"[INFO] Result: {len(merged_df)} rows")
        return merged_df
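    # Worked example (illustrative timestamps, not from the data): with the
    # default 12h tolerance, a feature row at 2024-01-02 09:00 matches a
    # sentiment row at 2024-01-02 18:00 (9h away), but a row whose nearest
    # sentiment is 15h away gets NaN sentiment columns instead.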
    def process_stocks_data(self):
        """Process all stocks data by merging finviz sentiment into stock features."""
        print("[INFO] Processing stocks data...")
        # Load stocks features first (this is the base dataset)
        stocks_df = self.load_features_data('stocks')
        if stocks_df is None:
            print("[ERROR] Failed to load stocks features data")
            return None
        # Check which symbol column is available and what symbols are in the data
        if 'symbol' in stocks_df.columns:
            unique_symbols = stocks_df['symbol'].unique()
        elif 'ticker' in stocks_df.columns:
            unique_symbols = stocks_df['ticker'].unique()
        else:
            # Guard against frames with neither column (previously a NameError)
            unique_symbols = []
        print(f"[INFO] Available symbols in stocks features: {unique_symbols}")
        # Check if the sentiment directory exists
        if not self.finviz_path.exists():
            print(f"[WARN] Finviz sentiment directory does not exist: {self.finviz_path}")
            print("[WARN] Proceeding without sentiment data merge for stocks")
            # Save features as-is without sentiment merge
            output_file = self.output_path / "stocks_features.parquet"
            stocks_df.to_parquet(output_file)
            print(f"[INFO] Stocks features saved without sentiment to: {output_file}")
            return stocks_df
        # Check if any sentiment files exist for our tickers
        sentiment_files_exist = any(
            (self.finviz_path / f"{ticker.upper()}_sentiment.parquet").exists()
            for ticker in self.stock_tickers
        )
        if not sentiment_files_exist:
            print(f"[WARN] No sentiment files found for any stock tickers: {self.stock_tickers}")
            print("[WARN] Proceeding without sentiment data merge for stocks")
            output_file = self.output_path / "stocks_features.parquet"
            stocks_df.to_parquet(output_file)
            print(f"[INFO] Stocks features saved without sentiment to: {output_file}")
            return stocks_df
        merged_stocks_list = []
        for ticker in self.stock_tickers:
            print(f"[INFO] Processing stock ticker: {ticker}")
            # Load sentiment data for this ticker
            sentiment_df = self.load_sentiment_data(ticker)
            if sentiment_df is None:
                print(f"[WARN] No sentiment data for {ticker}, skipping...")
                continue
            # Filter stocks features for this ticker
            ticker_stocks = None
            if 'symbol' in stocks_df.columns:
                ticker_stocks = stocks_df[stocks_df['symbol'] == ticker].copy()
            elif 'ticker' in stocks_df.columns:
                ticker_stocks = stocks_df[stocks_df['ticker'] == ticker].copy()
            if ticker_stocks is None or len(ticker_stocks) == 0:
                print(f"[WARN] No feature data found for ticker {ticker} - skipping this ticker")
                continue
            print(f"[INFO] Found {len(ticker_stocks)} feature rows for {ticker}")
            # Merge sentiment INTO features
            merged_ticker = self.merge_sentiment_to_features(ticker_stocks, sentiment_df)
            # Drop symbol_y and rename symbol_x back to symbol (merge suffixes)
            if 'symbol_y' in merged_ticker.columns:
                merged_ticker = merged_ticker.drop(columns=['symbol_y'])
            if 'symbol_x' in merged_ticker.columns:
                merged_ticker = merged_ticker.rename(columns={'symbol_x': 'symbol'})
            # Re-order columns: symbol first, interval_timestamp second (if present)
            cols = list(merged_ticker.columns)
            if 'symbol' in cols:
                cols.remove('symbol')
                new_order = ['symbol']
                if 'interval_timestamp' in cols:
                    cols.remove('interval_timestamp')
                    new_order.append('interval_timestamp')
                new_order += cols
                merged_ticker = merged_ticker[new_order]
            merged_stocks_list.append(merged_ticker)
        if not merged_stocks_list:
            print("[WARN] No stocks data was successfully merged with sentiment")
            print("[WARN] Saving original stocks features without sentiment")
            output_file = self.output_path / "stocks_features.parquet"
            stocks_df.to_parquet(output_file)
            print(f"[INFO] Stocks features saved without sentiment to: {output_file}")
            return stocks_df
        # Combine all merged stock data and save
        final_stocks_df = pd.concat(merged_stocks_list, ignore_index=True)
        output_file = self.output_path / "stocks_features.parquet"
        final_stocks_df.to_parquet(output_file)
        print(f"[INFO] Stocks data with sentiment saved to: {output_file}")
        return final_stocks_df
    def process_crypto_data(self):
        """Process all crypto data by merging finviz sentiment into crypto features."""
        print("[INFO] Processing crypto data...")
        # Load crypto features first (this is the base dataset)
        crypto_df = self.load_features_data('crypto')
        if crypto_df is None:
            print("[ERROR] Failed to load crypto features data")
            return None
        # Check for various possible symbol/ticker columns
        symbol_columns = [
            col for col in crypto_df.columns
            if any(keyword in col.lower() for keyword in ['symbol', 'ticker', 'name', 'id', 'coin'])
        ]
        print(f"[INFO] Available symbol columns in crypto: {symbol_columns}")
        # Show sample values from potential symbol columns
        for col in symbol_columns:
            if crypto_df[col].dtype == 'object':
                unique_values = crypto_df[col].unique()[:10]  # first 10 unique values
                print(f"[INFO] Sample values in {col}: {unique_values}")
        # Check if the sentiment directory exists
        if not self.finviz_path.exists():
            print(f"[WARN] Finviz sentiment directory does not exist: {self.finviz_path}")
            print("[WARN] Proceeding without sentiment data merge for crypto")
            # Save features as-is without sentiment merge
            output_file = self.output_path / "crypto_features.parquet"
            crypto_df.to_parquet(output_file)
            print(f"[INFO] Crypto features saved without sentiment to: {output_file}")
            return crypto_df
        # Check if any sentiment files exist for our crypto tickers
        sentiment_files_exist = any(
            (self.finviz_path / f"{ticker.upper()}_sentiment.parquet").exists()
            for ticker in self.crypto_ticker_mapping
        )
        if not sentiment_files_exist:
            print(f"[WARN] No sentiment files found for any crypto tickers: {list(self.crypto_ticker_mapping.keys())}")
            print("[WARN] Proceeding without sentiment data merge for crypto")
            output_file = self.output_path / "crypto_features.parquet"
            crypto_df.to_parquet(output_file)
            print(f"[INFO] Crypto features saved without sentiment to: {output_file}")
            return crypto_df
        merged_crypto_list = []
        for crypto_ticker, crypto_name in self.crypto_ticker_mapping.items():
            print(f"[INFO] Processing crypto ticker: {crypto_ticker} (name: {crypto_name})")
            # Load sentiment data for this crypto ticker
            sentiment_df = self.load_sentiment_data(crypto_ticker)
            if sentiment_df is None:
                print(f"[WARN] No sentiment data for {crypto_ticker}, skipping...")
                continue
            # Try different approaches to filter crypto features
            ticker_crypto = None
            # Approach 1: exact ticker match (object columns only, since the
            # .str accessor raises on non-string dtypes)
            for col in ['symbol', 'ticker', 'coin_id', 'id']:
                if col in crypto_df.columns and crypto_df[col].dtype == 'object':
                    matches = crypto_df[crypto_df[col].str.upper() == crypto_ticker].copy()
                    if len(matches) > 0:
                        ticker_crypto = matches
                        print(f"[INFO] Found {len(matches)} rows matching {crypto_ticker} in column '{col}'")
                        break
            # Approach 2: exact crypto name match
            if ticker_crypto is None or len(ticker_crypto) == 0:
                for col in ['name', 'coin_name']:
                    if col in crypto_df.columns and crypto_df[col].dtype == 'object':
                        matches = crypto_df[crypto_df[col].str.lower() == crypto_name.lower()].copy()
                        if len(matches) > 0:
                            ticker_crypto = matches
                            print(f"[INFO] Found {len(matches)} rows matching {crypto_name} in column '{col}'")
                            break
            # Approach 3: partial matching (in case of different formats)
            if ticker_crypto is None or len(ticker_crypto) == 0:
                for col in symbol_columns:
                    if crypto_df[col].dtype == 'object':
                        # Case-insensitive contains match on the ticker
                        matches = crypto_df[crypto_df[col].str.contains(crypto_ticker, case=False, na=False)].copy()
                        if len(matches) > 0:
                            ticker_crypto = matches
                            print(f"[INFO] Found {len(matches)} rows with partial match for {crypto_ticker} in column '{col}'")
                            break
                        # Then a contains match on the crypto name
                        matches = crypto_df[crypto_df[col].str.contains(crypto_name, case=False, na=False)].copy()
                        if len(matches) > 0:
                            ticker_crypto = matches
                            print(f"[INFO] Found {len(matches)} rows with partial match for {crypto_name} in column '{col}'")
                            break
            if ticker_crypto is None or len(ticker_crypto) == 0:
                print(f"[WARN] No feature data found for crypto {crypto_ticker} ({crypto_name}) - skipping this crypto")
                continue
            # Merge sentiment INTO features
            merged_ticker = self.merge_sentiment_to_features(ticker_crypto, sentiment_df)
            # Drop symbol_x and rename symbol_y to symbol: for crypto, keep the
            # sentiment side's ticker-style symbol (e.g. 'BTC') rather than the
            # feature side's name/id
            if 'symbol_x' in merged_ticker.columns:
                merged_ticker = merged_ticker.drop(columns=['symbol_x'])
            if 'symbol_y' in merged_ticker.columns:
                merged_ticker = merged_ticker.rename(columns={'symbol_y': 'symbol'})
            # Drop duplicate columns, keeping the first occurrence of each
            if merged_ticker.columns.duplicated().any():
                merged_ticker = merged_ticker.loc[:, ~merged_ticker.columns.duplicated()]
            # Re-order columns: symbol first, interval_timestamp second (if present)
            cols = list(merged_ticker.columns)
            if 'symbol' in cols:
                cols.remove('symbol')
                new_order = ['symbol']
                if 'interval_timestamp' in cols:
                    cols.remove('interval_timestamp')
                    new_order.append('interval_timestamp')
                new_order += cols
                merged_ticker = merged_ticker[new_order]
            merged_crypto_list.append(merged_ticker)
        if not merged_crypto_list:
            print("[WARN] No crypto data was successfully merged with sentiment")
            print("[WARN] Saving original crypto features without sentiment")
            output_file = self.output_path / "crypto_features.parquet"
            crypto_df.to_parquet(output_file)
            print(f"[INFO] Crypto features saved without sentiment to: {output_file}")
            return crypto_df
        # Combine all merged crypto data and save
        final_crypto_df = pd.concat(merged_crypto_list, ignore_index=True)
        output_file = self.output_path / "crypto_features.parquet"
        final_crypto_df.to_parquet(output_file)
        print(f"[INFO] Crypto data with sentiment saved to: {output_file}")
        return final_crypto_df
    def process_all_data(self):
        """Process both stocks and crypto data."""
        print("[INFO] Starting data processing for all assets...")
        stocks_result = self.process_stocks_data()
        crypto_result = self.process_crypto_data()
        if stocks_result is not None:
            print(f"[OK] Stocks processing completed: {len(stocks_result)} rows")
        else:
            print("[ERROR] Stocks processing failed")
        if crypto_result is not None:
            print(f"[OK] Crypto processing completed: {len(crypto_result)} rows")
        else:
            print("[ERROR] Crypto processing failed")
        return stocks_result, crypto_result
# Example usage
if __name__ == "__main__":
    handler = FixedTimestampHandler()
    # Smoke-test individual components
    sentiment_df = handler.load_sentiment_data("AAPL")
    stocks_df = handler.load_features_data('stocks')
    # Process all data
    handler.process_all_data()
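# Run sketch (hypothetical module path and env, not confirmed by the source):
#   DATA_DIR=/data python -m src.fixed_timestamp_handler
# or point base_path elsewhere for local testing:
#   FixedTimestampHandler(base_path="./sample_data").process_all_data()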