advisorai-data-enhanced / src /merge /extract_symbols.py
Maaroufabousaleh
f
c49b21b
raw
history blame
11.5 kB
#!/usr/bin/env python3
"""
Extract symbols from symbols.* columns and populate the symbol field for crypto data.
This script runs after merge steps but before data_filler phases to ensure
the symbol column is properly populated from existing exchange symbol data.
Example: symbols.gateio:"BTC_USDT" -> symbol:"BTC"
"""
import pandas as pd
import sys
from pathlib import Path
import re
def extract_symbol_from_exchange_symbols(df):
"""Extract base symbol from exchange symbol columns"""
if 'symbol' not in df.columns:
df['symbol'] = None
# Find all symbols.* columns
symbol_columns = [col for col in df.columns if col.startswith('symbols.')]
if not symbol_columns:
return df
# Extract symbols from exchange symbol data
symbols_extracted = 0
symbols_normalized = 0
# First pass: extract symbols from exchange data for null symbols
for idx, row in df.iterrows():
# Skip if symbol is already populated
if pd.notna(row.get('symbol')):
continue
# Try to extract symbol from any exchange symbol column
extracted_symbol = None
for col in symbol_columns:
exchange_symbol = row.get(col)
if pd.notna(exchange_symbol) and isinstance(exchange_symbol, str):
# Extract base symbol from various exchange formats
symbol = extract_base_symbol(exchange_symbol)
if symbol:
extracted_symbol = symbol
break
if extracted_symbol:
df.at[idx, 'symbol'] = extracted_symbol
symbols_extracted += 1
# Second pass: normalize cg_id values to proper ticker symbols
cg_id_to_symbol_mapping = {
'bitcoin': 'BTC',
'ethereum': 'ETH',
'solana': 'SOL',
'cardano': 'ADA',
'ripple': 'XRP',
'binancecoin': 'BNB',
'dogecoin': 'DOGE',
'polkadot': 'DOT',
'chainlink': 'LINK',
'litecoin': 'LTC',
'uniswap': 'UNI',
'avalanche-2': 'AVAX',
'polygon': 'MATIC',
'stellar': 'XLM',
'bitcoin-cash': 'BCH',
'filecoin': 'FIL',
'tron': 'TRX',
'ethereum-classic': 'ETC',
'monero': 'XMR',
'cosmos': 'ATOM',
'algorand': 'ALGO',
'vechain': 'VET',
'hedera-hashgraph': 'HBAR',
'internet-computer': 'ICP',
'theta-token': 'THETA',
'eos': 'EOS',
'aave': 'AAVE',
'maker': 'MKR',
'curve-dao-token': 'CRV',
'pancakeswap-token': 'CAKE',
'the-sandbox': 'SAND',
'decentraland': 'MANA',
'axie-infinity': 'AXS',
'shiba-inu': 'SHIB',
'terra-luna': 'LUNA',
'near': 'NEAR',
'flow': 'FLOW',
'fantom': 'FTM',
'harmony': 'ONE',
'basic-attention-token': 'BAT',
'enjincoin': 'ENJ',
'sushi': 'SUSHI',
'compound': 'COMP',
'yearn-finance': 'YFI',
'synthetix': 'SNX',
'uma': 'UMA',
'0x': 'ZRX',
'loopring': 'LRC',
'balancer': 'BAL'
}
for idx, row in df.iterrows():
current_symbol = row.get('symbol')
# If symbol matches a cg_id pattern, normalize it to ticker symbol
if pd.notna(current_symbol) and isinstance(current_symbol, str):
normalized_symbol = cg_id_to_symbol_mapping.get(current_symbol.lower())
if normalized_symbol and normalized_symbol != current_symbol:
df.at[idx, 'symbol'] = normalized_symbol
symbols_normalized += 1
# Final stats for debugging if needed
# print(f"Extracted symbols for {symbols_extracted} rows")
# print(f"Normalized symbols for {symbols_normalized} rows")
# Show results
null_symbols_remaining = df['symbol'].isnull().sum()
# print(f"Remaining null symbols: {null_symbols_remaining}")
if null_symbols_remaining > 0:
# print("Rows with remaining null symbols:")
sample_nulls = df[df['symbol'].isnull()][['symbol', 'cg_id'] + symbol_columns[:3]].head(5)
# print(sample_nulls)
return df
def extract_base_symbol(exchange_symbol):
"""Extract base symbol from exchange symbol formats"""
if not isinstance(exchange_symbol, str):
return None
exchange_symbol = exchange_symbol.strip().upper()
# Common patterns for crypto exchange symbols
patterns = [
r'^([A-Z]{2,10})USDT?$', # BTCUSDT -> BTC
r'^([A-Z]{2,10})_USDT?$', # BTC_USDT -> BTC
r'^([A-Z]{2,10})/USDT?$', # BTC/USDT -> BTC
r'^([A-Z]{2,10})-USDT?$', # BTC-USDT -> BTC
r'^([A-Z]{2,10})USD$', # BTCUSD -> BTC
r'^([A-Z]{2,10})_USD$', # BTC_USD -> BTC
r'^([A-Z]{2,10})/USD$', # BTC/USD -> BTC
r'^([A-Z]{2,10})-USD$', # BTC-USD -> BTC
r'^([A-Z]{2,10})BUSD$', # BTCBUSD -> BTC
r'^([A-Z]{2,10})_BUSD$', # BTC_BUSD -> BTC
r'^([A-Z]{2,10})EUR$', # BTCEUR -> BTC
r'^([A-Z]{2,10})_EUR$', # BTC_EUR -> BTC
r'^([A-Z]{2,10})BTC$', # ETHBTC -> ETH
r'^([A-Z]{2,10})_BTC$', # ETH_BTC -> ETH
]
for pattern in patterns:
match = re.match(pattern, exchange_symbol)
if match:
base_symbol = match.group(1)
# Filter out obvious non-crypto symbols and ensure reasonable length
if len(base_symbol) >= 2 and len(base_symbol) <= 10:
# Skip if it looks like a quote currency
if base_symbol not in ['USDT', 'USDC', 'USD', 'EUR', 'BTC', 'ETH', 'BNB', 'BUSD']:
return base_symbol
elif base_symbol in ['BTC', 'ETH', 'BNB']: # These are valid base symbols
return base_symbol
# If no pattern matches, try simple heuristics
# Remove common suffixes
for suffix in ['USDT', 'USDC', 'USD', 'EUR', 'BUSD']:
if exchange_symbol.endswith(suffix):
base = exchange_symbol[:-len(suffix)]
if len(base) >= 2 and len(base) <= 10:
return base
# Split on common delimiters and take first part
for delimiter in ['_', '/', '-']:
if delimiter in exchange_symbol:
parts = exchange_symbol.split(delimiter)
if len(parts) >= 2:
base = parts[0]
if len(base) >= 2 and len(base) <= 10:
return base
return None
def process_crypto_features():
"""Process crypto features to extract symbols"""
# Try different possible paths
possible_paths = [
Path('data/merged/features/crypto_features.parquet'),
Path('../../data/merged/features/crypto_features.parquet'),
Path('../../../data/merged/features/crypto_features.parquet')
]
crypto_file = None
for path in possible_paths:
if path.exists():
crypto_file = path
break
if crypto_file is None:
print(f"Crypto features file not found in any of these locations:")
for path in possible_paths:
print(f" {path.absolute()}")
return False
print(f"Loading crypto features from: {crypto_file}")
df = pd.read_parquet(crypto_file)
print(f"Loaded {len(df)} rows with {len(df.columns)} columns")
# Check current state
null_symbols_before = df['symbol'].isnull().sum() if 'symbol' in df.columns else len(df)
print(f"Null symbols before: {null_symbols_before} ({null_symbols_before/len(df)*100:.1f}%)")
# Extract symbols
df_fixed = extract_symbol_from_exchange_symbols(df)
# Check results - note that extract_symbol_from_exchange_symbols tracks its own changes
null_symbols_after = df_fixed['symbol'].isnull().sum() if 'symbol' in df_fixed.columns else len(df_fixed)
# Calculate total improvement
total_improvement = null_symbols_before - null_symbols_after
print("Successfully extracted crypto symbols!")
# Save if there's been any improvement or if nulls are very low
if total_improvement > 0 or null_symbols_after <= 2:
# Save the fixed file
df_fixed.to_parquet(crypto_file)
return True
else:
return True # Success even if no changes needed
def process_stocks_features():
"""Process stocks features to extract symbols (if needed)"""
# Try different possible paths
possible_paths = [
Path('data/merged/features/stocks_features.parquet'),
Path('../../data/merged/features/stocks_features.parquet'),
Path('../../../data/merged/features/stocks_features.parquet')
]
stocks_file = None
for path in possible_paths:
if path.exists():
stocks_file = path
break
if stocks_file is None:
return False
df = pd.read_parquet(stocks_file)
# Check if stocks need symbol extraction too
null_symbols_before = df['symbol'].isnull().sum() if 'symbol' in df.columns else len(df)
print(f"Null symbols before: {null_symbols_before} ({null_symbols_before/len(df)*100:.1f}%)")
if null_symbols_before == 0:
print("Stocks symbols are already populated, skipping")
return True
# For stocks, we might have different symbol patterns
# Extract symbols if needed
df_fixed = extract_symbol_from_exchange_symbols(df)
# Check results
null_symbols_after = df_fixed['symbol'].isnull().sum() if 'symbol' in df_fixed.columns else len(df_fixed)
symbols_fixed = null_symbols_before - null_symbols_after
print(f"\nResults:")
print(f"- Symbols fixed: {symbols_fixed}")
print(f"- Null symbols after: {null_symbols_after} ({null_symbols_after/len(df_fixed)*100:.1f}%)")
if symbols_fixed > 0:
# Save the fixed file
print(f"\nSaving fixed stocks features to: {stocks_file}")
df_fixed.to_parquet(stocks_file)
print("File saved successfully!")
return True
else:
print("No symbols were extracted/fixed for stocks")
return True # Not an error for stocks
def main():
"""Main function to extract symbols from exchange symbol data"""
print("=== EXTRACTING SYMBOLS FROM EXCHANGE DATA ===")
print("This script extracts base symbols from symbols.* columns")
print("Example: symbols.gateio:'BTC_USDT' -> symbol:'BTC'")
print()
# Process crypto features
print("Processing crypto features...")
crypto_success = process_crypto_features()
print("\n" + "="*50 + "\n")
# Process stocks features
print("Processing stocks features...")
stocks_success = process_stocks_features()
print("\n" + "="*50)
if crypto_success:
print("Successfully extracted crypto symbols!")
else:
print("Failed to extract crypto symbols!")
if stocks_success:
print("Stocks symbols processing completed!")
else:
print("Failed to process stocks symbols!")
if crypto_success and stocks_success:
print("\nSymbol extraction completed successfully!")
return True
else:
print("\nSome issues occurred during symbol extraction")
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)