r"""Merge feature data with coin metadata or with a crypto-bubbles snapshot.

Two modes are available:

* ``parquet`` (used when no subcommand is given): merge a crypto-bubbles
  Parquet file into the merged-features Parquet file.
* ``json``: left-join the features JSON/NDJSON with the coin-metadata
  JSON/NDJSON and write the result as NDJSON.

Usage:
    # Parquet mode (default):
    python merge_2.py parquet \
        --base data/merged/features/merged_features.parquet \
        --bubbles data/crypto-bubbles/crypto_bubbles_2025-07-15.parquet \
        --out data/merged/features/merged_features.parquet

    # JSON mode:
    python merge_2.py json \
        --features data/merged/features/merged_features.json \
        --coininfo data/coininfo/coin_metadata.json \
        --out merged_with_coininfo.ndjson
"""

import argparse
import glob
import json
import math
import numbers
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Sequence

import pandas as pd

# Columns used to align base rows with bubbles rows, in index order.
KEY_COLUMNS = ("symbol", "interval_timestamp")

# Every spelling of XRP is collapsed to 'ripple', the name used in the output.
SYMBOL_ALIASES = {"xrp": "ripple", "xrp/ripple": "ripple"}

# Assets that are always taken from the bubbles file, whatever their rank.
MUST_HAVE_SYMBOLS = frozenset(
    {
        "xrp", "ripple", "solana", "eth", "btc",
        "bitcoin", "ethereum", "sol", "ada", "cardano",
    }
)

# Columns removed from the merged Parquet output.
DROP_COLUMNS = ("id", "name", "image")

# First CLI tokens that must NOT trigger the implicit 'parquet' subcommand.
_TOP_LEVEL_TOKENS = ("json", "parquet", "-h", "--help")


def _to_millis(val):
    """Convert one timestamp-like value to integer epoch milliseconds.

    Timestamps/datetimes are converted from their nanosecond epoch value,
    numbers and numeric strings are truncated to ``int``, and any other
    string is parsed with ``pd.to_datetime``. Returns ``pd.NA`` for null
    or unparseable input.
    """
    if pd.isna(val):
        return pd.NA
    if isinstance(val, (pd.Timestamp, datetime)):
        # Plain datetime objects have no ``.value``; go through pd.Timestamp.
        return pd.Timestamp(val).value // 1_000_000
    if isinstance(val, numbers.Integral):
        # Avoid the float round-trip, which loses precision above 2**53.
        return int(val)
    try:
        return int(float(val))
    except (ValueError, TypeError, OverflowError):
        pass
    try:
        parsed = pd.to_datetime(val)
    except Exception:  # best effort: anything unparseable becomes NA
        return pd.NA
    if pd.isna(parsed):
        # Strings such as "nan"/"NaT" parse to NaT, whose .value is a sentinel.
        return pd.NA
    return int(parsed.value // 1_000_000)


def _normalize_interval_timestamp(df: pd.DataFrame, now_ms: int) -> None:
    """Rewrite ``interval_timestamp`` in place as non-null Int64 milliseconds.

    Values are converted first and nulls filled afterwards, so an integer is
    never pushed into a datetime-typed column and unparseable values also end
    up as ``now_ms``. Does nothing if the column is absent.
    """
    if "interval_timestamp" not in df.columns:
        return
    millis = df["interval_timestamp"].map(_to_millis).astype("Int64")
    df["interval_timestamp"] = millis.fillna(now_ms)


def _normalize_symbols(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with ``symbol`` lower-cased and XRP aliases unified."""
    symbols = df["symbol"].astype(str).str.lower().replace(SYMBOL_ALIASES)
    return df.assign(symbol=symbols)


def merge_parquet_features(
    base_fp: Path, bubbles_fp: Path, out_fp: Path, top_n: int = 50
) -> None:
    """Merge a crypto-bubbles Parquet file into the merged-features Parquet file.

    Only a subset of bubbles rows is merged: the first ``top_n`` distinct
    symbols (ordered by ``rank`` when that column exists) plus every row whose
    symbol is in ``MUST_HAVE_SYMBOLS``. For overlapping columns, non-null
    bubbles values overwrite base values; columns that exist only in bubbles
    are added. The ``id``, ``name`` and ``image`` columns are dropped.

    Args:
        base_fp: Path of the base merged-features Parquet file.
        bubbles_fp: Path of the crypto-bubbles Parquet file.
        out_fp: Path the merged Parquet file is written to.
        top_n: Number of distinct symbols taken from the bubbles file.

    Raises:
        ValueError: If the two files share no ``symbol`` key column (a
            ``slug`` column in the bubbles file counts as ``symbol``).
    """
    base = pd.read_parquet(base_fp)
    bubbles = pd.read_parquet(bubbles_fp)

    # Missing timestamps are stamped with the time of this run (UTC ms).
    now_ms = int(time.time() * 1000)
    for df in (base, bubbles):
        _normalize_interval_timestamp(df, now_ms)

    # The bubbles file identifies assets by 'slug'; join on it as 'symbol'.
    if "slug" in bubbles.columns:
        bubbles = bubbles.rename(columns={"slug": "symbol"})
    # The rename can collide with an existing 'symbol'; keep the first one.
    bubbles = bubbles.loc[:, ~bubbles.columns.duplicated()].copy()

    keys = [k for k in KEY_COLUMNS if k in base.columns and k in bubbles.columns]
    if "symbol" not in keys:
        raise ValueError(
            "No common key columns found for merge "
            "(need 'symbol' and 'interval_timestamp')."
        )

    bubbles = _normalize_symbols(bubbles)
    base = _normalize_symbols(base)

    ranked = bubbles.sort_values("rank") if "rank" in bubbles.columns else bubbles
    top = ranked.drop_duplicates(subset="symbol").head(top_n)
    extra = bubbles[bubbles["symbol"].isin(MUST_HAVE_SYMBOLS)]
    # De-duplicate on exactly the join keys so the bubbles index is unique.
    selected = pd.concat([top, extra]).drop_duplicates(subset=keys)

    base = base.set_index(keys)
    selected = selected.set_index(keys)

    # Union of columns, bubbles first so its column order takes precedence.
    all_cols = list(dict.fromkeys([*selected.columns, *base.columns]))
    merged = (
        selected.reindex(columns=all_cols)
        .combine_first(base.reindex(columns=all_cols))
        .reset_index()
    )

    # Always output 'ripple' instead of 'xrp'.
    merged["symbol"] = merged["symbol"].astype(str).replace(SYMBOL_ALIASES)
    _normalize_interval_timestamp(merged, now_ms)

    # Rows without an is_crypto flag, and 'solana' rows, are marked as crypto.
    if "is_crypto" in merged.columns:
        merged["is_crypto"] = merged["is_crypto"].fillna(1)
        merged.loc[merged["symbol"].str.lower() == "solana", "is_crypto"] = 1

    merged = merged.drop(columns=list(DROP_COLUMNS), errors="ignore")

    out_fp = Path(out_fp)
    out_fp.parent.mkdir(parents=True, exist_ok=True)
    merged.to_parquet(out_fp, index=False)
    print(f"OK Merged top {top_n} from {bubbles_fp} into {base_fp} -> {out_fp} "
          f"({merged.shape[0]} rows x {merged.shape[1]} cols)")


def load_json_records(path: Path) -> List[dict]:
    """Load records from a JSON or NDJSON file.

    The file may contain a single JSON object, a list of objects, or NDJSON
    (one JSON object per line; blank lines are skipped). A single object is
    wrapped in a one-element list.

    Raises:
        json.JSONDecodeError: If the text is neither valid JSON nor valid NDJSON.
    """
    text = path.read_text(encoding="utf8")
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Not a single JSON document: treat every non-blank line as a record.
        data = [json.loads(line) for line in text.splitlines() if line.strip()]
    if isinstance(data, dict):
        data = [data]
    return data


def _lower_key(series: pd.Series) -> pd.Series:
    """Lower-case string values of a join-key column; leave other values as is."""
    return series.map(lambda v: v.lower() if isinstance(v, str) else v)


def _json_safe(value):
    """Map float NaN to None so ``json.dumps`` writes ``null``, not ``NaN``."""
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def main_json_merge(features_fp: Path, coininfo_fp: Path, out_fp: Path) -> None:
    """Left-join feature records with coin metadata and write NDJSON.

    Feature ``symbol`` and metadata ``slug`` are compared case-insensitively.
    Every feature record is kept; metadata columns whose name clashes with a
    feature column get a ``_meta`` suffix (``symbol_meta`` is dropped).
    Missing values are written as JSON ``null``.

    Args:
        features_fp: Path of the features JSON/NDJSON file.
        coininfo_fp: Path of the coin-metadata JSON/NDJSON file.
        out_fp: Path of the NDJSON file to write; parent dirs are created.

    Raises:
        KeyError: If the features lack ``symbol`` or the metadata lacks ``slug``.
    """
    df_feats = pd.json_normalize(load_json_records(features_fp))
    df_coins = pd.json_normalize(load_json_records(coininfo_fp))

    if "symbol" not in df_feats.columns:
        raise KeyError("features records must contain a 'symbol' field")
    if "slug" not in df_coins.columns:
        raise KeyError("coin-metadata records must contain a 'slug' field")

    # Normalize both sides the same way so 'Bitcoin' matches 'bitcoin'.
    df_feats["join_key"] = _lower_key(df_feats["symbol"])
    df_coins["join_key"] = _lower_key(df_coins["slug"])

    df_merged = df_feats.merge(
        df_coins, on="join_key", how="left", suffixes=("", "_meta")
    )
    df_merged = df_merged.drop(columns=["join_key", "symbol_meta"], errors="ignore")

    out_fp = Path(out_fp)
    out_fp.parent.mkdir(parents=True, exist_ok=True)
    with open(out_fp, "w", encoding="utf8") as f:
        for rec in df_merged.to_dict(orient="records"):
            clean = {key: _json_safe(value) for key, value in rec.items()}
            f.write(json.dumps(clean) + "\n")

    print(f"✅ Wrote {len(df_merged)} merged records to {out_fp}")


def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with the ``json`` and ``parquet`` subcommands."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = parser.add_subparsers(dest="mode", required=False)

    # JSON merge mode
    js = sub.add_parser("json", help="Merge features JSON with coininfo JSON")
    js.add_argument("--features", type=Path,
                    default=Path("data/merged/features/merged_features.json"),
                    help="Path to merged_features JSON/NDJSON")
    js.add_argument("--coininfo", type=Path,
                    default=Path("data/coininfo/coin_metadata.json"),
                    help="Path to coin-metadata JSON/NDJSON")
    js.add_argument("--out", type=Path,
                    default=Path("merged_with_coininfo.ndjson"),
                    help="Where to write the merged NDJSON")

    # Parquet merge mode (used when no subcommand is given)
    pq = sub.add_parser(
        "parquet", help="Merge crypto bubbles Parquet into merged features Parquet"
    )
    pq.add_argument("--base", type=Path,
                    default=Path("data/merged/features/merged_features.parquet"),
                    help="Path to base merged-features Parquet")
    pq.add_argument("--bubbles", type=Path, default=None,
                    help="Path to crypto bubbles Parquet (if not set, will use "
                         "latest in data/crypto-bubbles/)")
    pq.add_argument("--out", type=Path,
                    default=Path("data/merged/features/merged_features.parquet"),
                    help="Where to write the merged Parquet")
    return parser


def cli(argv: Optional[Sequence[str]] = None) -> None:
    """Parse command-line arguments and run the selected merge.

    Args:
        argv: Argument list without the program name; defaults to
            ``sys.argv[1:]``.

    Raises:
        FileNotFoundError: In parquet mode, when ``--bubbles`` is missing or
            does not exist and no ``crypto_bubbles_*.parquet`` file is found
            in ``data/crypto-bubbles/``.
    """
    args_list = list(sys.argv[1:] if argv is None else argv)
    # Default to 'parquet' before parsing, so option-only invocations such as
    # `--base x` work and sys.argv is left untouched.
    if not args_list or args_list[0] not in _TOP_LEVEL_TOKENS:
        args_list.insert(0, "parquet")
    args = build_parser().parse_args(args_list)

    if args.mode == "parquet":
        # A missing or non-existent --bubbles path falls back to the most
        # recently modified snapshot.
        if args.bubbles is None or not args.bubbles.exists():
            bubble_files = glob.glob(
                os.path.join("data", "crypto-bubbles", "crypto_bubbles_*.parquet")
            )
            if not bubble_files:
                raise FileNotFoundError(
                    "No crypto_bubbles_*.parquet files found in data/crypto-bubbles/"
                )
            latest_bubble = max(bubble_files, key=os.path.getmtime)
            print(f"[INFO] Using latest bubbles file: {latest_bubble}")
            args.bubbles = Path(latest_bubble)
        merge_parquet_features(args.base, args.bubbles, args.out)
    else:
        main_json_merge(args.features, args.coininfo, args.out)


if __name__ == "__main__":
    cli()