import sys
import os

import numpy as np
import pandas as pd
from datetime import datetime

# Ensure src/merge is in the path for import
sys.path.append(os.path.dirname(__file__))

from alpaca_features import build_features, save

# Tickers this module treats as NASDAQ-listed stocks; everything else is crypto.
KNOWN_STOCKS = {"NVDA", "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META"}


def create_symbol_mapping():
    """
    Create mappings between crypto full names and ticker symbols.
    """
    # Common crypto symbol mappings (full name -> ticker)
    crypto_mapping = {
        # Major cryptocurrencies
        'bitcoin': 'BTC', 'ethereum': 'ETH', 'binancecoin': 'BNB', 'ripple': 'XRP',
        'cardano': 'ADA', 'solana': 'SOL', 'dogecoin': 'DOGE', 'polkadot': 'DOT',
        'matic-network': 'MATIC', 'polygon': 'MATIC', 'avalanche-2': 'AVAX', 'avalanche': 'AVAX',
        'chainlink': 'LINK', 'litecoin': 'LTC', 'bitcoin-cash': 'BCH', 'stellar': 'XLM',
        'vechain': 'VET', 'ethereum-classic': 'ETC', 'filecoin': 'FIL', 'tron': 'TRX',
        'monero': 'XMR', 'eos': 'EOS', 'aave': 'AAVE', 'maker': 'MKR',
        'compound': 'COMP', 'uniswap': 'UNI', 'yearn-finance': 'YFI', 'sushi': 'SUSHI',
        'curve-dao-token': 'CRV', 'pancakeswap-token': 'CAKE', 'terra-luna': 'LUNA', 'fantom': 'FTM',
        'harmony': 'ONE', 'near': 'NEAR', 'algorand': 'ALGO', 'cosmos': 'ATOM',
        'internet-computer': 'ICP', 'helium': 'HNT', 'theta-token': 'THETA', 'chiliz': 'CHZ',
        'decentraland': 'MANA', 'the-sandbox': 'SAND', 'axie-infinity': 'AXS', 'shiba-inu': 'SHIB',
        'apecoin': 'APE', 'gala': 'GALA', 'enjincoin': 'ENJ', 'flow': 'FLOW',
        'basic-attention-token': 'BAT', 'omg': 'OMG', 'loopring': 'LRC', 'immutable-x': 'IMX',
        'render-token': 'RNDR', 'quant-network': 'QNT', 'injective-protocol': 'INJ', 'sei-network': 'SEI',
        'arbitrum': 'ARB', 'optimism': 'OP', 'blur': 'BLUR', 'pepe': 'PEPE',
        'bonk': 'BONK', 'wormhole': 'W', 'jupiter-exchange-solana': 'JUP', 'worldcoin-wld': 'WLD',
        'pyth-network': 'PYTH', 'jito': 'JTO', 'tensor': 'TNSR', 'meme': 'MEME',
        'cat-in-a-dogs-world': 'MEW', 'book-of-meme': 'BOME', 'dogwifhat': 'WIF', 'popcat': 'POPCAT',
        'goatseus-maximus': 'GOAT', 'peanut-the-squirrel': 'PNUT', 'act-i-the-ai-prophecy': 'ACT',
        'fartcoin': 'FARTCOIN', 'ai16z': 'AI16Z', 'virtual-protocol': 'VIRTUAL', 'zerebro': 'ZEREBRO',
        'griffain': 'GRIFFAIN', 'aixbt-by-virtuals': 'AIXBT', 'marc-and-ethan-are-based': 'BASED',
        'pudgy-penguins': 'PENGU', 'hyperliquid': 'HYPE', 'move-movement': 'MOVE', 'usual': 'USUAL',
        'reserve-rights': 'RSR', 'ondo-finance': 'ONDO', 'ethena': 'ENA', 'eigenlayer': 'EIGEN',
        'grass': 'GRASS', 'io': 'IO', 'notcoin': 'NOT', 'turbo': 'TURBO',
        'jasmy': 'JASMY', 'neo': 'NEO', 'iota': 'IOTA', 'dash': 'DASH',
        'zcash': 'ZEC', 'waves': 'WAVES',
    }

    # Reverse mapping (lowercase ticker -> full name); where two full names share
    # a ticker (e.g. 'matic-network' and 'polygon'), the later dict key wins.
    reverse_mapping = {v.lower(): k for k, v in crypto_mapping.items()}
    # Forward mapping (full name -> lowercase ticker)
    forward_mapping = {k: v.lower() for k, v in crypto_mapping.items()}

    return crypto_mapping, reverse_mapping, forward_mapping
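
# Minimal usage sketch (illustrative only; this helper is an addition and is not
# called anywhere in the pipeline). It shows how the three mappings returned
# above relate, including the collision behaviour when two full names share one
# ticker.
def _mapping_example():
    crypto_mapping, reverse_mapping, forward_mapping = create_symbol_mapping()
    assert crypto_mapping["bitcoin"] == "BTC"      # full name -> ticker
    assert reverse_mapping["btc"] == "bitcoin"     # lowercase ticker -> full name
    assert forward_mapping["bitcoin"] == "btc"     # full name -> lowercase ticker
    # 'matic-network' and 'polygon' both map to MATIC; in the reverse mapping
    # the later dict key wins, so 'matic' resolves to 'polygon'.
    assert reverse_mapping["matic"] == "polygon"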

def normalize_symbols(df, symbol_col, is_alpaca=False):
    """
    Normalize symbols to handle crypto name/ticker differences and stock symbols.
    """
    df = df.copy()
    crypto_mapping, reverse_mapping, forward_mapping = create_symbol_mapping()

    # Convert to lowercase for consistency
    df[symbol_col] = df[symbol_col].str.lower()

    if is_alpaca:
        # Alpaca uses tickers (BTC, ETH, etc. for crypto; NVDA, AAPL, etc. for stocks).
        # For crypto: map tickers to full names so they match the merged data.
        # For stocks: keep the ticker symbol as-is (in lowercase).
        def map_alpaca_symbol(symbol):
            symbol_lower = symbol.lower()
            if symbol_lower in reverse_mapping:
                # A crypto ticker that needs mapping to its full name
                return reverse_mapping[symbol_lower]
            # Likely a stock symbol; keep as-is (lowercase)
            return symbol_lower

        df[symbol_col] = df[symbol_col].apply(map_alpaca_symbol)
    else:
        # Merged data already uses full names for crypto (bitcoin, ethereum, etc.)
        # and lowercase tickers for stocks (nvda, aapl, etc.), so the lowercasing
        # above is the only normalization needed.
        pass

    return df


def merge_alpaca_features():
    """
    Merge Alpaca features with the existing merged features.
    Handles timestamp alignment, column conflicts, and symbol mapping.
    """
    # Step 1: Create Alpaca features
    alpaca_df = build_features()
    save(alpaca_df)

    # Step 2: Load merged features
    try:
        from src import config as app_config
        base_dir = app_config.DATA_DIR
    except Exception:
        base_dir = os.environ.get("DATA_DIR", "/data")
    merged_path = os.path.join(base_dir, "merged", "features", "merged_features.parquet")
    merged_df = pd.read_parquet(merged_path)

    # Normalize symbols on both sides
    alpaca_df_normalized = normalize_symbols(alpaca_df, "symbol", is_alpaca=True)
    merged_df_normalized = normalize_symbols(merged_df, "symbol", is_alpaca=False)

    # Find overlapping symbols and symbols missing from the merged data
    alpaca_symbols = set(alpaca_df_normalized["symbol"].unique())
    merged_symbols = set(merged_df_normalized["symbol"].unique())
    overlapping_symbols = alpaca_symbols.intersection(merged_symbols)
    missing_in_merged = alpaca_symbols - merged_symbols

    # Step 6: Handle symbols that exist only in Alpaca data
    if missing_in_merged:
        new_symbol_rows = []
        for missing_symbol in missing_in_merged:
            # Get actual data for this symbol from Alpaca
            symbol_data = alpaca_df_normalized[alpaca_df_normalized["symbol"] == missing_symbol]
            if len(symbol_data) == 0:
                continue
            is_stock = missing_symbol.upper() in KNOWN_STOCKS
            # Create rows based on Alpaca timestamps, not merged timestamps
            for _, alpaca_row in symbol_data.iterrows():
                new_row = {
                    "symbol": missing_symbol,
                    "interval_timestamp": alpaca_row["timestamp"],  # Use the Alpaca timestamp
                    "is_stock": is_stock,
                    "is_crypto": not is_stock,
                    "stock_market": "NASDAQ" if is_stock else None,
                    "feature_timestamp": pd.Timestamp.now().value // 1000000,  # ns -> ms
                }
                # Copy all Alpaca feature columns into the new row.
                # NB: skip "timestamp" itself -- interval_timestamp already carries
                # it, and if the merged frame gained a "timestamp" column, the
                # outer merge below would suffix the Alpaca timestamp to
                # "timestamp_alpaca" and break every notna() check on
                # final_merge["timestamp"].
                for col in alpaca_row.index:
                    if col not in new_row and col != "timestamp":
                        new_row[col] = alpaca_row[col]
                # Fill all remaining merged_df columns with NaN
                for col in merged_df_normalized.columns:
                    if col not in new_row:
                        new_row[col] = np.nan
                new_symbol_rows.append(new_row)
        if new_symbol_rows:
            new_symbols_df = pd.DataFrame(new_symbol_rows)
            merged_df_normalized = pd.concat([merged_df_normalized, new_symbols_df], ignore_index=True)

    # Step 7: Check for overlapping columns between the two frames
    join_keys = ["symbol", "timestamp", "interval_timestamp"]
    alpaca_cols = set(alpaca_df_normalized.columns) - set(join_keys)
    merged_cols = set(merged_df_normalized.columns) - set(join_keys)
    overlapping_cols = alpaca_cols.intersection(merged_cols)
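
    # Illustrative debug aid (an addition, not part of the original pipeline):
    # the outer merge below renames every overlapping non-key column on the
    # Alpaca side with an "_alpaca" suffix, so surfacing the overlap here makes
    # those renames easy to trace.
    if overlapping_cols:
        print(f"{len(overlapping_cols)} columns present in both frames will be "
              f"suffixed '_alpaca': {sorted(overlapping_cols)}")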

    # Convert timestamps to datetime for processing (use pd.concat to avoid fragmentation)
    timestamp_columns = {}
    if "timestamp" in alpaca_df_normalized.columns:
        timestamp_columns["timestamp_dt"] = pd.to_datetime(alpaca_df_normalized["timestamp"], unit="ms")
    if "interval_timestamp" in merged_df_normalized.columns:
        timestamp_columns["interval_timestamp_dt"] = pd.to_datetime(merged_df_normalized["interval_timestamp"], unit="ms")

    # Add the datetime columns via pd.concat rather than item assignment
    for col_name, col_data in timestamp_columns.items():
        if col_name == "timestamp_dt":
            alpaca_df_normalized = pd.concat([alpaca_df_normalized, col_data.to_frame(col_name)], axis=1)
        else:  # interval_timestamp_dt
            merged_df_normalized = pd.concat([merged_df_normalized, col_data.to_frame(col_name)], axis=1)

    # Perform an OUTER merge to capture all data from both sources
    final_merge = pd.merge(
        merged_df_normalized,
        alpaca_df_normalized,
        left_on=["symbol", "interval_timestamp"],
        right_on=["symbol", "timestamp"],
        how="outer",  # outer (not left) so unmatched Alpaca rows survive
        suffixes=("", "_alpaca"),
    )

    # For rows that came only from Alpaca (new symbols), copy the timestamp to interval_timestamp
    alpaca_only_mask = final_merge["interval_timestamp"].isna() & final_merge["timestamp"].notna()
    if alpaca_only_mask.any():
        final_merge.loc[alpaca_only_mask, "interval_timestamp"] = final_merge.loc[alpaca_only_mask, "timestamp"]
        # Set basic metadata for these new rows
        final_merge.loc[alpaca_only_mask, "feature_timestamp"] = pd.Timestamp.now().value // 1000000
        # Set stock/crypto flags based on the symbol
        for symbol in final_merge.loc[alpaca_only_mask, "symbol"].unique():
            symbol_mask = alpaca_only_mask & (final_merge["symbol"] == symbol)
            is_stock = symbol.upper() in KNOWN_STOCKS
            final_merge.loc[symbol_mask, "is_stock"] = is_stock
            final_merge.loc[symbol_mask, "is_crypto"] = not is_stock
            if is_stock:
                final_merge.loc[symbol_mask, "stock_market"] = "NASDAQ"

    # Copy "_alpaca"-suffixed columns into the base columns for Alpaca-only rows
    feature_cols = [
        "open", "high", "low", "close", "volume", "trade_count", "vwap",
        "symbol_quote", "bid_price", "bid_size", "bid_exchange",
        "ask_price", "ask_size", "ask_exchange", "conditions", "tape",
        "symbol_trade", "exchange", "price", "size", "id",
        "conditions_trade", "tape_trade",
    ]
    for col in feature_cols:
        alpaca_col = f"{col}_alpaca"
        if alpaca_col in final_merge.columns and col in final_merge.columns:
            final_merge.loc[alpaca_only_mask, col] = final_merge.loc[alpaca_only_mask, alpaca_col]

    # Step 11: Calculate merge statistics.
    # NOTE: the join condition forces interval_timestamp == timestamp on every
    # matched row, so a timestamp-inequality test can never count a match;
    # classify rows by symbol membership instead.
    total_merged_rows = len(merged_df_normalized)
    total_alpaca_rows = len(alpaca_df_normalized)
    total_final_rows = len(final_merge)

    # Count matches for symbols that already existed in the merged data
    original_matched_rows = final_merge[
        final_merge["timestamp"].notna() & ~final_merge["symbol"].isin(missing_in_merged)
    ].shape[0]

    # Count rows belonging to symbols that exist only in the Alpaca data
    alpaca_only_rows = final_merge[
        final_merge["timestamp"].notna() & final_merge["symbol"].isin(missing_in_merged)
    ].shape[0]

    # Total rows with Alpaca data
    total_alpaca_matched = final_merge[final_merge["timestamp"].notna()].shape[0]

    original_match_rate = original_matched_rows / total_merged_rows if total_merged_rows > 0 else 0
    overall_match_rate = total_alpaca_matched / total_final_rows if total_final_rows > 0 else 0

    # Step 12: Debug successful matches and new symbols
    if total_alpaca_matched > 0:
        successful_matches = final_merge[final_merge["timestamp"].notna()]
        sample_cols = ["symbol", "interval_timestamp", "timestamp", "open", "high", "low", "close", "volume"]
        available_cols = [col for col in sample_cols if col in successful_matches.columns]
        print(successful_matches[available_cols].head())
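
    # Illustrative debug aid (an addition, not in the original): surface the
    # symbols that arrived only via the Alpaca feed, i.e. the rows synthesized
    # in Step 6 above.
    if missing_in_merged:
        print(f"Symbols only in Alpaca data: {sorted(missing_in_merged)}")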

    # Step 13: Add merge metadata
    final_merge["alpaca_merge_timestamp"] = pd.Timestamp.now().value // 1000000  # ns -> ms
    final_merge["alpaca_data_available"] = final_merge["timestamp"].notna()
    final_merge["alpaca_match_rate"] = overall_match_rate
    # Flag rows whose symbol was absent from the original merged data.
    # (An interval_timestamp == timestamp test is true for every matched row,
    # so it cannot distinguish new symbols.)
    final_merge["is_new_symbol"] = final_merge["symbol"].isin(missing_in_merged)

    # Step 14: Drop duplicate columns before saving
    duplicate_cols = final_merge.columns[final_merge.columns.duplicated()].tolist()
    if duplicate_cols:
        final_merge = final_merge.loc[:, ~final_merge.columns.duplicated()]

    # Save the merged features
    out_path = os.path.join(base_dir, "merged", "features", "merged_features.parquet")
    final_merge.to_parquet(out_path, index=False)

    # Generate a detailed summary report
    print(f"Total final rows: {len(final_merge)}")
    print(f"Rows with Alpaca data: {total_alpaca_matched}")
    print(f"New symbol rows added: {alpaca_only_rows}")
    print(f"Match rate for existing symbols: {original_match_rate:.2%}")
    print(f"Overall match rate: {overall_match_rate:.2%}")
    print(f"Total columns: {len(final_merge.columns)}")

    # Per-symbol summary: rows with and without Alpaca data
    symbol_summary = final_merge.groupby("symbol").agg({
        "alpaca_data_available": ["count", "sum"],
        "is_new_symbol": "sum",
    }).round(2)
    symbol_summary.columns = ["total_rows", "alpaca_matches", "new_symbol_rows"]
    symbol_summary["match_rate"] = symbol_summary["alpaca_matches"] / symbol_summary["total_rows"]
    symbol_summary["is_new_symbol"] = symbol_summary["new_symbol_rows"] > 0

    # Show which symbols have reasonably complete Alpaca coverage (> 50%)
    complete_symbols = symbol_summary[symbol_summary["match_rate"] > 0.5]
    if len(complete_symbols) > 0:
        print(complete_symbols[["total_rows", "alpaca_matches", "match_rate"]])

    # Show a sample of the final merged data
    sample_cols = ["symbol", "interval_timestamp", "alpaca_data_available",
                   "is_new_symbol", "open", "high", "low", "close", "volume"]
    print(final_merge[[col for col in sample_cols if col in final_merge.columns]].head())

    return final_merge


if __name__ == "__main__":
    try:
        merge_alpaca_features()
    except Exception:
        import traceback
        traceback.print_exc()
        sys.exit(1)
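
# Usage sketch (the module filename below is an assumption): running this file
# rebuilds the Alpaca features via build_features()/save(), merges them into
# <DATA_DIR>/merged/features/merged_features.parquet in place, and prints the
# summary report. DATA_DIR comes from src.config when importable, otherwise from
# the DATA_DIR environment variable (default "/data"):
#
#   DATA_DIR=/data python merge_alpaca_features.py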