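"""
Merge Alpaca market-data features into the existing merged feature store.

Builds the latest Alpaca feature frame, normalizes symbols so crypto tickers
line up with the full names used by the merged data, outer-joins the two
sources on (symbol, timestamp), and writes the result back to
merged_features.parquet.
"""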
import sys
import os
import numpy as np
import pandas as pd
from datetime import datetime
# Ensure src/merge is in the path for import
sys.path.append(os.path.dirname(__file__))
from alpaca_features import build_features, save
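
# Equity tickers this pipeline knows about; any other symbol is treated as crypto.
# (Single source of truth for the stock/crypto checks used when flagging rows below.)
STOCK_SYMBOLS = {"NVDA", "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META"}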
def create_symbol_mapping():
"""
Create mapping between crypto full names and ticker symbols.
"""
# Common crypto symbol mappings
crypto_mapping = {
# Major cryptocurrencies
'bitcoin': 'BTC',
'ethereum': 'ETH',
'binancecoin': 'BNB',
'ripple': 'XRP',
'cardano': 'ADA',
'solana': 'SOL',
'dogecoin': 'DOGE',
'polkadot': 'DOT',
'matic-network': 'MATIC',
'polygon': 'MATIC',
'avalanche-2': 'AVAX',
'avalanche': 'AVAX',
'chainlink': 'LINK',
'litecoin': 'LTC',
'bitcoin-cash': 'BCH',
'stellar': 'XLM',
'vechain': 'VET',
'ethereum-classic': 'ETC',
'filecoin': 'FIL',
'tron': 'TRX',
'monero': 'XMR',
'eos': 'EOS',
'aave': 'AAVE',
'maker': 'MKR',
'compound': 'COMP',
'uniswap': 'UNI',
'yearn-finance': 'YFI',
'sushi': 'SUSHI',
'curve-dao-token': 'CRV',
'pancakeswap-token': 'CAKE',
'terra-luna': 'LUNA',
'fantom': 'FTM',
'harmony': 'ONE',
'near': 'NEAR',
'algorand': 'ALGO',
'cosmos': 'ATOM',
'internet-computer': 'ICP',
'helium': 'HNT',
'theta-token': 'THETA',
'chiliz': 'CHZ',
'decentraland': 'MANA',
'the-sandbox': 'SAND',
'axie-infinity': 'AXS',
'shiba-inu': 'SHIB',
'apecoin': 'APE',
'gala': 'GALA',
'enjincoin': 'ENJ',
'flow': 'FLOW',
'basic-attention-token': 'BAT',
'omg': 'OMG',
'loopring': 'LRC',
'immutable-x': 'IMX',
'render-token': 'RNDR',
'quant-network': 'QNT',
'injective-protocol': 'INJ',
'sei-network': 'SEI',
'arbitrum': 'ARB',
'optimism': 'OP',
'blur': 'BLUR',
'pepe': 'PEPE',
'bonk': 'BONK',
'wormhole': 'W',
'jupiter-exchange-solana': 'JUP',
'worldcoin-wld': 'WLD',
'pyth-network': 'PYTH',
'jito': 'JTO',
'tensor': 'TNSR',
'meme': 'MEME',
'cat-in-a-dogs-world': 'MEW',
'book-of-meme': 'BOME',
'dogwifhat': 'WIF',
'popcat': 'POPCAT',
'goatseus-maximus': 'GOAT',
'peanut-the-squirrel': 'PNUT',
'act-i-the-ai-prophecy': 'ACT',
'fartcoin': 'FARTCOIN',
'ai16z': 'AI16Z',
'virtual-protocol': 'VIRTUAL',
'zerebro': 'ZEREBRO',
'griffain': 'GRIFFAIN',
'aixbt-by-virtuals': 'AIXBT',
'marc-and-ethan-are-based': 'BASED',
'pudgy-penguins': 'PENGU',
'hyperliquid': 'HYPE',
'move-movement': 'MOVE',
'usual': 'USUAL',
'reserve-rights': 'RSR',
'ondo-finance': 'ONDO',
'ethena': 'ENA',
'eigenlayer': 'EIGEN',
'grass': 'GRASS',
'io': 'IO',
'notcoin': 'NOT',
'turbo': 'TURBO',
'jasmy': 'JASMY',
'neo': 'NEO',
'iota': 'IOTA',
'dash': 'DASH',
'zcash': 'ZEC',
'waves': 'WAVES',
}
# Create reverse mapping (ticker -> full name)
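    # Note: where two full names share a ticker (e.g. 'matic-network' and
    # 'polygon' both map to 'MATIC'), the later entry wins in the reverse map.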
reverse_mapping = {v.lower(): k for k, v in crypto_mapping.items()}
# Also add the forward mapping (full name -> ticker)
forward_mapping = {k: v.lower() for k, v in crypto_mapping.items()}
return crypto_mapping, reverse_mapping, forward_mapping
def normalize_symbols(df, symbol_col, is_alpaca=False):
"""
Normalize symbols to handle crypto name/ticker differences and stock symbols.
"""
df = df.copy()
crypto_mapping, reverse_mapping, forward_mapping = create_symbol_mapping()
# Convert to lowercase for consistency
df[symbol_col] = df[symbol_col].str.lower()
if is_alpaca:
# Alpaca uses tickers (BTC, ETH, etc. for crypto, NVDA, AAPL, etc. for stocks)
# For crypto: Map tickers to full names to match merged data
# For stocks: Keep the ticker symbol as-is (in lowercase)
def map_alpaca_symbol(symbol):
symbol_lower = symbol.lower()
# Check if it's a crypto ticker that needs mapping
if symbol_lower in reverse_mapping:
return reverse_mapping[symbol_lower]
else:
# It's likely a stock symbol, keep as-is (lowercase)
return symbol_lower
df[symbol_col] = df[symbol_col].apply(map_alpaca_symbol)
    else:
        # Merged data already uses full names for crypto (bitcoin, ethereum, ...)
        # and lowercase tickers for stocks (nvda, aapl, ...); the lowercase
        # conversion above is all that is needed here.
        pass
return df
def merge_alpaca_features():
"""
Merge Alpaca features with existing merged features.
Handles timestamp alignment, column conflicts, and symbol mapping.
"""
# Step 1: Create Alpaca features
alpaca_df = build_features()
save(alpaca_df)
# Step 2: Load merged features
try:
from src import config as app_config
base_dir = app_config.DATA_DIR
except Exception:
base_dir = os.environ.get("DATA_DIR", "/data")
merged_path = os.path.join(base_dir, "merged", "features", "merged_features.parquet")
merged_df = pd.read_parquet(merged_path)
    # Step 3: Normalize symbols so both sources share one naming convention
alpaca_df_normalized = normalize_symbols(alpaca_df, "symbol", is_alpaca=True)
merged_df_normalized = normalize_symbols(merged_df, "symbol", is_alpaca=False)
    # Step 4: Find overlapping symbols
alpaca_normalized = set(alpaca_df_normalized["symbol"].unique())
merged_normalized = set(merged_df_normalized["symbol"].unique())
overlapping_symbols = alpaca_normalized.intersection(merged_normalized)
missing_in_merged = alpaca_normalized - merged_normalized
    # Step 5: Handle symbols that exist only in the Alpaca data
if missing_in_merged:
new_symbol_rows = []
for missing_symbol in missing_in_merged:
# Get actual data for this symbol from Alpaca
symbol_data = alpaca_df_normalized[alpaca_df_normalized["symbol"] == missing_symbol]
if len(symbol_data) == 0:
continue
# Create rows based on Alpaca timestamps, not merged timestamps
for _, alpaca_row in symbol_data.iterrows():
new_row = {
"symbol": missing_symbol,
"interval_timestamp": alpaca_row["timestamp"], # Use Alpaca timestamp
"is_stock": True if missing_symbol.upper() in ["NVDA", "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META"] else False,
"is_crypto": False if missing_symbol.upper() in ["NVDA", "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META"] else True,
"stock_market": "NASDAQ" if missing_symbol.upper() in ["NVDA", "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META"] else None,
"feature_timestamp": pd.Timestamp.now().value // 1000000, # Convert to milliseconds
}
# Copy all Alpaca feature columns into the new row
for col in alpaca_row.index:
if col not in new_row:
new_row[col] = alpaca_row[col]
# Add all other columns from merged_df with NaN values (except the ones we set above)
for col in merged_df_normalized.columns:
if col not in new_row:
new_row[col] = np.nan
new_symbol_rows.append(new_row)
if new_symbol_rows:
new_symbols_df = pd.DataFrame(new_symbol_rows)
merged_df_normalized = pd.concat([merged_df_normalized, new_symbols_df], ignore_index=True)
    # Step 6: Check for overlapping columns and handle them
join_keys = ["symbol", "timestamp", "interval_timestamp"]
alpaca_cols = set(alpaca_df_normalized.columns) - set(join_keys)
merged_cols = set(merged_df_normalized.columns) - set(join_keys)
overlapping_cols = alpaca_cols.intersection(merged_cols)
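    # Overlapping non-key columns are kept from both frames by the merge below;
    # the Alpaca copies arrive with an "_alpaca" suffix.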
    # Step 7: Add datetime views of the epoch-ms timestamps (pd.concat avoids
    # the frame fragmentation that repeated single-column inserts can cause)
    if "timestamp" in alpaca_df_normalized.columns:
        alpaca_df_normalized = pd.concat(
            [alpaca_df_normalized,
             pd.to_datetime(alpaca_df_normalized["timestamp"], unit="ms").rename("timestamp_dt")],
            axis=1,
        )
    if "interval_timestamp" in merged_df_normalized.columns:
        merged_df_normalized = pd.concat(
            [merged_df_normalized,
             pd.to_datetime(merged_df_normalized["interval_timestamp"], unit="ms").rename("interval_timestamp_dt")],
            axis=1,
        )
    # Step 8: Outer merge to capture all rows from both sources; a left join
    # would drop Alpaca rows with no counterpart in the merged data
    final_merge = pd.merge(
        merged_df_normalized,
        alpaca_df_normalized,
        left_on=["symbol", "interval_timestamp"],
        right_on=["symbol", "timestamp"],
        how="outer",
        suffixes=("", "_alpaca")
    )
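    # After the merge: matched rows have interval_timestamp == timestamp; rows
    # only in the merged data have NaN Alpaca columns; rows only in the Alpaca
    # data have NaN interval_timestamp (filled in below).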
# For rows that came only from Alpaca (new symbols), copy the timestamp to interval_timestamp
alpaca_only_mask = final_merge["interval_timestamp"].isna() & final_merge["timestamp"].notna()
if alpaca_only_mask.any():
final_merge.loc[alpaca_only_mask, "interval_timestamp"] = final_merge.loc[alpaca_only_mask, "timestamp"]
# Set basic metadata for these new rows
final_merge.loc[alpaca_only_mask, "feature_timestamp"] = pd.Timestamp.now().value // 1000000
# Set stock/crypto flags based on symbol
for symbol in final_merge.loc[alpaca_only_mask, "symbol"].unique():
symbol_mask = alpaca_only_mask & (final_merge["symbol"] == symbol)
            is_stock = symbol.upper() in STOCK_SYMBOLS
final_merge.loc[symbol_mask, "is_stock"] = is_stock
final_merge.loc[symbol_mask, "is_crypto"] = not is_stock
if is_stock:
final_merge.loc[symbol_mask, "stock_market"] = "NASDAQ"
# Copy _alpaca columns into base columns for Alpaca-only rows
feature_cols = [
"open", "high", "low", "close", "volume", "trade_count", "vwap",
"symbol_quote", "bid_price", "bid_size", "bid_exchange", "ask_price", "ask_size", "ask_exchange",
"conditions", "tape", "symbol_trade", "exchange", "price", "size", "id", "conditions_trade", "tape_trade"
]
for col in feature_cols:
alpaca_col = f"{col}_alpaca"
if alpaca_col in final_merge.columns and col in final_merge.columns:
final_merge.loc[alpaca_only_mask, col] = final_merge.loc[alpaca_only_mask, alpaca_col]
    # Step 9: Calculate merge statistics
total_merged_rows = len(merged_df_normalized)
total_alpaca_rows = len(alpaca_df_normalized)
total_final_rows = len(final_merge)
    # Matched rows always have interval_timestamp == timestamp (they are the join
    # keys), so timestamp equality cannot separate original matches from
    # Alpaca-only symbols; use symbol membership instead.
    has_alpaca = final_merge["timestamp"].notna()
    new_symbol_mask = final_merge["symbol"].isin(missing_in_merged)
    # Count matches for symbols that already existed in the merged data
    original_matched_rows = int((has_alpaca & ~new_symbol_mask).sum())
    # Count rows contributed by Alpaca-only symbols
    alpaca_only_rows = int((has_alpaca & new_symbol_mask).sum())
    # Total rows with Alpaca data
    total_alpaca_matched = int(has_alpaca.sum())
    original_match_rate = original_matched_rows / total_merged_rows if total_merged_rows > 0 else 0
    overall_match_rate = total_alpaca_matched / total_final_rows if total_final_rows > 0 else 0
    # Step 10: Print a small sample of successfully matched rows for debugging
    if total_alpaca_matched > 0:
        successful_matches = final_merge[has_alpaca]
        sample_cols = ["symbol", "interval_timestamp", "timestamp", "open", "high", "low", "close", "volume"]
        available_cols = [col for col in sample_cols if col in successful_matches.columns]
        print(successful_matches[available_cols].head())
    # Step 11: Add merge metadata
final_merge["alpaca_merge_timestamp"] = pd.Timestamp.now().value // 1000000 # Convert to milliseconds
final_merge["alpaca_data_available"] = final_merge["timestamp"].notna()
final_merge["alpaca_match_rate"] = overall_match_rate
final_merge["is_new_symbol"] = final_merge["interval_timestamp"] == final_merge["timestamp"]
    # Step 12: Drop duplicate columns before saving
duplicate_cols = final_merge.columns[final_merge.columns.duplicated()].tolist()
if duplicate_cols:
final_merge = final_merge.loc[:, ~final_merge.columns.duplicated()]
    # Save the merged features, overwriting the input merged_features.parquet in place
out_path = os.path.join(base_dir, "merged", "features", "merged_features.parquet")
final_merge.to_parquet(out_path, index=False)
# Generate detailed summary report
print(f"Total final rows: {len(final_merge)}")
print(f"Rows with Alpaca data: {total_alpaca_matched}")
print(f"New symbols added: {alpaca_only_rows}")
print(f"Overall match rate: {overall_match_rate:.2%}")
print(f"Total columns: {len(final_merge.columns)}")
# Show symbols with and without Alpaca data
symbol_summary = final_merge.groupby("symbol").agg({
"alpaca_data_available": ["count", "sum"],
"is_new_symbol": "sum"
}).round(2)
symbol_summary.columns = ["total_rows", "alpaca_matches", "new_symbol_rows"]
symbol_summary["match_rate"] = symbol_summary["alpaca_matches"] / symbol_summary["total_rows"]
symbol_summary["is_new_symbol"] = symbol_summary["new_symbol_rows"] > 0
    # Show symbols where more than half of the rows have Alpaca data
complete_symbols = symbol_summary[symbol_summary["match_rate"] > 0.5]
if len(complete_symbols) > 0:
print(complete_symbols[["total_rows", "alpaca_matches", "match_rate"]])
    # Show a sample of the final merged data
    sample_cols = ["symbol", "interval_timestamp", "alpaca_data_available", "is_new_symbol", "open", "high", "low", "close", "volume"]
    available_sample_cols = [col for col in sample_cols if col in final_merge.columns]
    print(final_merge[available_sample_cols].head())
return final_merge
if __name__ == "__main__":
    try:
        merged_df = merge_alpaca_features()
    except Exception:
        import traceback
        traceback.print_exc()
        sys.exit(1)