#!/usr/bin/env python3
"""
Extract symbols from symbols.* columns and populate the symbol field for crypto data.

This script runs after merge steps but before data_filler phases to ensure
the symbol column is properly populated from existing exchange symbol data.

Example: symbols.gateio:"BTC_USDT" -> symbol:"BTC"
"""

import re
import sys
from pathlib import Path

import pandas as pd

# Maps cg_id-style values found in the symbol column to ticker symbols.
_CG_ID_TO_SYMBOL = {
    'bitcoin': 'BTC',
    'ethereum': 'ETH',
    'solana': 'SOL',
    'cardano': 'ADA',
    'ripple': 'XRP',
    'binancecoin': 'BNB',
    'dogecoin': 'DOGE',
    'polkadot': 'DOT',
    'chainlink': 'LINK',
    'litecoin': 'LTC',
    'uniswap': 'UNI',
    'avalanche-2': 'AVAX',
    'polygon': 'MATIC',
    'stellar': 'XLM',
    'bitcoin-cash': 'BCH',
    'filecoin': 'FIL',
    'tron': 'TRX',
    'ethereum-classic': 'ETC',
    'monero': 'XMR',
    'cosmos': 'ATOM',
    'algorand': 'ALGO',
    'vechain': 'VET',
    'hedera-hashgraph': 'HBAR',
    'internet-computer': 'ICP',
    'theta-token': 'THETA',
    'eos': 'EOS',
    'aave': 'AAVE',
    'maker': 'MKR',
    'curve-dao-token': 'CRV',
    'pancakeswap-token': 'CAKE',
    'the-sandbox': 'SAND',
    'decentraland': 'MANA',
    'axie-infinity': 'AXS',
    'shiba-inu': 'SHIB',
    'terra-luna': 'LUNA',
    'near': 'NEAR',
    'flow': 'FLOW',
    'fantom': 'FTM',
    'harmony': 'ONE',
    'basic-attention-token': 'BAT',
    'enjincoin': 'ENJ',
    'sushi': 'SUSHI',
    'compound': 'COMP',
    'yearn-finance': 'YFI',
    'synthetix': 'SNX',
    'uma': 'UMA',
    '0x': 'ZRX',
    'loopring': 'LRC',
    'balancer': 'BAL',
}

# Tickers used to disambiguate concatenated pairs such as "BTCBUSD".
_KNOWN_TICKERS = frozenset(_CG_ID_TO_SYMBOL.values())

# Characters that separate base and quote in exchange symbols.
_DELIMITER_RE = re.compile(r'[_/-]')

# Base accepted in front of a BTC quote: ASCII letters only.
_ALPHA_BASE_RE = re.compile(r'[A-Z]{2,10}')

# Quote currencies recognised at the end of an undelimited symbol, in
# priority order ("USD" deliberately ahead of "BUSD", see extract_base_symbol).
_QUOTE_SUFFIXES = ('USDT', 'USDC', 'USD', 'BUSD', 'EUR', 'BTC')

# Bases that are never reported for an undelimited "<base>BTC" symbol.
_FIAT_AND_STABLE = frozenset({'USDT', 'USDC', 'USD', 'EUR', 'BUSD'})

# Directories searched, in order, for the merged feature files.
_FEATURES_DIRS = (
    Path('data/merged/features'),
    Path('../../data/merged/features'),
    Path('../../../data/merged/features'),
)


def _has_valid_length(base):
    """Return True when *base* is between 2 and 10 characters long."""
    return 2 <= len(base) <= 10


def _first_base_symbol(values):
    """Return the first base symbol extractable from *values*, else None."""
    for value in values:
        if isinstance(value, str):
            symbol = extract_base_symbol(value)
            if symbol:
                return symbol
    return None


def _count_null_symbols(df):
    """Count null symbols; every row counts when the column is absent."""
    if 'symbol' not in df.columns:
        return len(df)
    return int(df['symbol'].isnull().sum())


def _percent(count, total):
    """Return count/total as a percentage, or 0.0 for an empty total."""
    return count / total * 100 if total else 0.0


def _candidate_paths(filename):
    """Return every location where *filename* is looked up, in order."""
    return [directory / filename for directory in _FEATURES_DIRS]


def _first_existing(paths):
    """Return the first path in *paths* that exists, else None."""
    return next((path for path in paths if path.exists()), None)


def extract_symbol_from_exchange_symbols(df):
    """Fill and normalize the ``symbol`` column of *df* in place.

    A ``symbol`` column is created when missing. Rows whose symbol is null
    get the first base symbol that can be extracted from their ``symbols.*``
    columns (in column order). Afterwards, symbols that equal a known cg_id
    (case-insensitively) are replaced by the matching ticker.

    Args:
        df: DataFrame to update. It is modified in place.

    Returns:
        The same DataFrame object.
    """
    if 'symbol' not in df.columns:
        df['symbol'] = None

    symbol_columns = [
        col for col in df.columns
        if isinstance(col, str) and col.startswith('symbols.')
    ]
    if not symbol_columns:
        # Without exchange columns the frame is returned as is; no cg_id
        # normalization is attempted in that case either.
        return df

    # An all-null column can arrive as a numeric dtype; make it object so
    # string tickers can be written without relying on silent upcasting.
    if pd.api.types.is_numeric_dtype(df['symbol']):
        df['symbol'] = df['symbol'].astype(object)

    # First pass: only rows that were null on entry are candidates.
    missing_mask = df['symbol'].isna()
    for idx, row in df.loc[missing_mask, symbol_columns].iterrows():
        symbol = _first_base_symbol(row)
        if symbol:
            df.at[idx, 'symbol'] = symbol

    # Second pass: turn cg_id values into tickers. Iterate a snapshot so
    # writes do not interfere with the iteration.
    for idx, value in list(df['symbol'].items()):
        if not isinstance(value, str):
            continue
        ticker = _CG_ID_TO_SYMBOL.get(value.lower())
        if ticker and ticker != value:
            df.at[idx, 'symbol'] = ticker

    return df


def extract_base_symbol(exchange_symbol):
    """Extract the base symbol from an exchange trading-pair symbol.

    The input is stripped and upper-cased. Two shapes are handled:

    * Delimited pairs (``BTC_USDT``, ``BTC/USD``, ``BTC-EUR``, ``ETH/BTC``,
      ``BTC-USDT-SWAP``): the base is the text before the first ``_``, ``/``
      or ``-``.
    * Concatenated pairs (``BTCUSDT``, ``BTCEUR``, ``ETHBTC``): a known
      quote suffix (USDT, USDC, USD, BUSD, EUR, BTC) is removed. A symbol
      ending in ``BUSD`` also ends in ``USD``; the split whose base is a
      known ticker wins (``BTCBUSD`` -> ``BTC``, ``BNBUSD`` -> ``BNB``),
      otherwise the ``USD`` split is used.

    A base must be 2-10 characters long. For a concatenated BTC quote the
    base must additionally be ASCII letters only and not a fiat/stablecoin.

    Args:
        exchange_symbol: Raw exchange symbol; non-strings are rejected.

    Returns:
        The base symbol as an upper-case string, or None when no base can
        be determined.
    """
    if not isinstance(exchange_symbol, str):
        return None

    pair = exchange_symbol.strip().upper()

    # Delimited pairs are split first so the delimiter can never end up in
    # the returned base (e.g. "BTC_USDC" must not yield "BTC_").
    delimiter = _DELIMITER_RE.search(pair)
    if delimiter:
        base = pair[:delimiter.start()]
        return base if _has_valid_length(base) else None

    candidates = []
    for quote in _QUOTE_SUFFIXES:
        if not pair.endswith(quote):
            continue
        base = pair[:-len(quote)]
        if not _has_valid_length(base):
            continue
        if quote == 'BTC' and (
            base in _FIAT_AND_STABLE or not _ALPHA_BASE_RE.fullmatch(base)
        ):
            continue
        candidates.append(base)

    # Prefer a split that produces a known ticker; this resolves the
    # USD/BUSD ambiguity without changing results for unknown tokens.
    for base in candidates:
        if base in _KNOWN_TICKERS:
            return base
    return candidates[0] if candidates else None


def process_crypto_features():
    """Populate symbols in the crypto features parquet file.

    The file is looked up in the known feature directories, loaded, passed
    through ``extract_symbol_from_exchange_symbols`` and written back when
    the symbol column changed (or when at most two nulls remain).

    Returns:
        False when the file cannot be found, True otherwise.
    """
    possible_paths = _candidate_paths('crypto_features.parquet')
    crypto_file = _first_existing(possible_paths)
    if crypto_file is None:
        print("Crypto features file not found in any of these locations:")
        for path in possible_paths:
            print(f" {path.absolute()}")
        return False

    print(f"Loading crypto features from: {crypto_file}")
    df = pd.read_parquet(crypto_file)
    print(f"Loaded {len(df)} rows with {len(df.columns)} columns")

    null_symbols_before = _count_null_symbols(df)
    print(
        f"Null symbols before: {null_symbols_before} "
        f"({_percent(null_symbols_before, len(df)):.1f}%)"
    )

    # Snapshot the column: extraction mutates df in place, and a change can
    # happen (cg_id normalization) without the null count moving.
    original_symbols = df['symbol'].copy() if 'symbol' in df.columns else None

    df_fixed = extract_symbol_from_exchange_symbols(df)
    null_symbols_after = _count_null_symbols(df_fixed)

    if original_symbols is None:
        symbols_changed = bool(df_fixed['symbol'].notna().any())
    else:
        symbols_changed = not original_symbols.equals(df_fixed['symbol'])

    print("Successfully extracted crypto symbols!")

    # The "<= 2" rewrite is kept from the previous behavior.
    if symbols_changed or null_symbols_after <= 2:
        df_fixed.to_parquet(crypto_file)

    return True  # Success even if no changes were needed


def process_stocks_features():
    """Populate symbols in the stocks features parquet file (if needed).

    Returns:
        False when the file cannot be found, True otherwise (including when
        nothing had to be fixed).
    """
    stocks_file = _first_existing(_candidate_paths('stocks_features.parquet'))
    if stocks_file is None:
        return False

    df = pd.read_parquet(stocks_file)

    null_symbols_before = _count_null_symbols(df)
    print(
        f"Null symbols before: {null_symbols_before} "
        f"({_percent(null_symbols_before, len(df)):.1f}%)"
    )

    if null_symbols_before == 0:
        print("Stocks symbols are already populated, skipping")
        return True

    df_fixed = extract_symbol_from_exchange_symbols(df)

    null_symbols_after = _count_null_symbols(df_fixed)
    symbols_fixed = null_symbols_before - null_symbols_after

    print("\nResults:")
    print(f"- Symbols fixed: {symbols_fixed}")
    print(
        f"- Null symbols after: {null_symbols_after} "
        f"({_percent(null_symbols_after, len(df_fixed)):.1f}%)"
    )

    if symbols_fixed > 0:
        print(f"\nSaving fixed stocks features to: {stocks_file}")
        df_fixed.to_parquet(stocks_file)
        print("File saved successfully!")
    else:
        # Not an error for stocks.
        print("No symbols were extracted/fixed for stocks")

    return True


def main():
    """Run symbol extraction for crypto and stocks feature files.

    Returns:
        True when both processing steps reported success, False otherwise.
    """
    print("=== EXTRACTING SYMBOLS FROM EXCHANGE DATA ===")
    print("This script extracts base symbols from symbols.* columns")
    print("Example: symbols.gateio:'BTC_USDT' -> symbol:'BTC'")
    print()

    print("Processing crypto features...")
    crypto_success = process_crypto_features()

    print("\n" + "="*50 + "\n")

    print("Processing stocks features...")
    stocks_success = process_stocks_features()

    print("\n" + "="*50)

    if crypto_success:
        print("Successfully extracted crypto symbols!")
    else:
        print("Failed to extract crypto symbols!")

    if stocks_success:
        print("Stocks symbols processing completed!")
    else:
        print("Failed to process stocks symbols!")

    if crypto_success and stocks_success:
        print("\nSymbol extraction completed successfully!")
        return True

    print("\nSome issues occurred during symbol extraction")
    return False


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)