"""
Merge your features JSON with coin-metadata JSON, or merge a crypto-bubbles
Parquet into your merged-features Parquet.
Usage:
# JSON mode (default):
python merge_2.py json \
--features data/merged/features/merged_features.json \
--coininfo data/coininfo/coin_metadata.json \
--out merged_with_coininfo.ndjson
# Parquet mode:
python merge_2.py parquet \
--base data/merged/features/merged_features.parquet \
--bubbles data/crypto-bubbles/crypto_bubbles_2025-07-15.parquet \
--out data/merged/features/merged_features.parquet
"""
import json
import pandas as pd
from datetime import datetime
from pathlib import Path
import argparse
def merge_parquet_features(base_fp: Path, bubbles_fp: Path, out_fp: Path):
    """
    Merge a crypto-bubbles Parquet file into the merged-features Parquet.

    For overlapping columns, non-null bubbles values overwrite base values;
    columns present only in bubbles are added.  Only the top-50 assets by
    rank (plus a fixed must-have set) are taken from the bubbles file.

    Args:
        base_fp: Path to the base merged-features Parquet file.
        bubbles_fp: Path to the crypto-bubbles Parquet file.
        out_fp: Destination path for the merged Parquet file.

    Raises:
        ValueError: If no common join key ('symbol' / 'interval_timestamp')
            exists in both inputs.
    """
    import time
    base = pd.read_parquet(base_fp)
    bubbles = pd.read_parquet(bubbles_fp)

    # Timestamp used to backfill missing interval_timestamp values (UTC ms).
    now_ms = int(time.time() * 1000)

    def to_millis(val):
        """Coerce a timestamp-like value to integer epoch milliseconds, or pd.NA."""
        if pd.isna(val):
            return pd.NA
        if isinstance(val, (pd.Timestamp, datetime)):
            return val.value // 1_000_000  # Timestamp.value is ns since epoch
        try:
            return int(float(val))
        except (ValueError, TypeError):
            try:
                return int(pd.to_datetime(val).value // 1_000_000)
            except Exception:
                return pd.NA

    for df in (base, bubbles):
        if 'interval_timestamp' in df.columns:
            df['interval_timestamp'] = df['interval_timestamp'].fillna(now_ms)
            df['interval_timestamp'] = df['interval_timestamp'].map(to_millis).astype('Int64')

    # Bubbles uses 'slug' where the base uses 'symbol'; align the column name.
    bubbles_renamed = bubbles.rename(columns={"slug": "symbol"}) if "slug" in bubbles.columns else bubbles
    # Drop duplicate column labels, keeping the first occurrence.
    bubbles_renamed = bubbles_renamed.loc[:, ~bubbles_renamed.columns.duplicated()]

    # Join on whichever of the preferred keys exist in BOTH frames.
    keys = [k for k in ["symbol", "interval_timestamp"]
            if k in base.columns and k in bubbles_renamed.columns]
    # BUG FIX: the previous guard re-tested the already-filtered `keys` list,
    # so it could never fire; an empty key list then crashed later inside
    # set_index() with an unrelated error.  Raise the intended error here.
    if not keys:
        raise ValueError("No common key columns found for merge (need 'symbol' and 'interval_timestamp').")

    def normalize_symbol_col(df):
        """Lowercase symbols and canonicalize XRP aliases to 'ripple' (mutates df)."""
        df['symbol'] = df['symbol'].astype(str).str.lower()
        # The original two-step replace ('ripple'->'xrp' then 'xrp'->'ripple')
        # cancelled out; its net effect is simply: every XRP alias -> 'ripple'.
        df['symbol'] = df['symbol'].replace({'xrp': 'ripple', 'xrp/ripple': 'ripple'})
        return df

    bubbles_renamed = normalize_symbol_col(bubbles_renamed)
    base = normalize_symbol_col(base)

    # Keep the top 50 assets by rank if available, else the first 50 unique.
    if 'rank' in bubbles_renamed.columns:
        sorted_bubbles = bubbles_renamed.sort_values('rank')
    else:
        sorted_bubbles = bubbles_renamed
    top_50 = sorted_bubbles.drop_duplicates(subset='symbol').head(50)

    # Always include these assets even if they fall outside the top 50.
    must_have = {'xrp', 'ripple', 'solana', 'eth', 'btc', 'bitcoin', 'ethereum', 'sol', 'ada', 'cardano'}
    extra = bubbles_renamed[bubbles_renamed['symbol'].isin(must_have)]

    # Combine top-50 and must-haves, de-duplicating on the available keys.
    combined = pd.concat([top_50, extra])
    dedup_cols = ['symbol']
    if 'interval_timestamp' in combined.columns:
        dedup_cols.append('interval_timestamp')
    bubbles_renamed = combined.drop_duplicates(subset=dedup_cols)

    base = base.set_index(keys)
    bubbles_renamed = bubbles_renamed.set_index(keys)

    # Union of columns, bubbles first so its values win in combine_first.
    all_cols = list(dict.fromkeys(bubbles_renamed.columns.tolist() + base.columns.tolist()))
    base = base.reindex(columns=all_cols)
    bubbles_renamed = bubbles_renamed.reindex(columns=all_cols)
    merged = bubbles_renamed.combine_first(base).reset_index()

    if 'symbol' in merged.columns:
        merged['symbol'] = merged['symbol'].astype(str)
        # Always emit 'ripple' rather than 'xrp' in the output.
        merged['symbol'] = merged['symbol'].replace({'xrp': 'ripple'})

    # Guarantee interval_timestamp is non-null integer milliseconds on output.
    if 'interval_timestamp' in merged.columns:
        merged['interval_timestamp'] = merged['interval_timestamp'].fillna(now_ms)
        merged['interval_timestamp'] = merged['interval_timestamp'].map(to_millis).astype('Int64')

    # Default is_crypto to 1, and force it for 'solana'.
    if 'is_crypto' in merged.columns:
        merged['is_crypto'] = merged['is_crypto'].fillna(1)
        if 'symbol' in merged.columns:
            merged.loc[merged['symbol'].str.lower() == 'solana', 'is_crypto'] = 1

    # Drop presentation-only columns not needed downstream.
    merged = merged.drop(columns=[c for c in ('id', 'name', 'image') if c in merged.columns])

    merged.to_parquet(out_fp, index=False)
    print(f"OK Merged top 50 from {bubbles_fp} into {base_fp} -> {out_fp} "
          f"({merged.shape[0]} rows x {merged.shape[1]} cols)")
def load_json_records(path: Path):
    """
    Read JSON records from *path*, accepting any of three layouts:

    - a single JSON object,
    - a JSON array of objects,
    - NDJSON (one JSON object per line).

    Returns:
        List[dict]: the parsed records.
    """
    raw = path.read_text(encoding="utf8")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Not a single JSON document -- treat as NDJSON, skipping blank lines.
        parsed = [json.loads(line) for line in raw.splitlines() if line.strip()]
    # Normalize a lone object into a one-element list.
    return [parsed] if isinstance(parsed, dict) else parsed
def main_json_merge(features_fp: Path, coininfo_fp: Path, out_fp: Path):
    """
    Left-join features records with coin metadata and write the result as NDJSON.

    Args:
        features_fp: Path to the merged-features JSON/NDJSON file.
        coininfo_fp: Path to the coin-metadata JSON/NDJSON file.
        out_fp: Destination path for the merged NDJSON output.
    """
    # Load and flatten both record sets.
    df_feats = pd.json_normalize(load_json_records(features_fp))
    df_coins = pd.json_normalize(load_json_records(coininfo_fp))

    # Join features.symbol against the lowercased metadata slug.
    df_feats["join_key"] = df_feats["symbol"]
    df_coins["join_key"] = df_coins["slug"].str.lower()

    # Left join keeps every feature row; metadata collisions get a _meta suffix.
    df_merged = df_feats.merge(df_coins, on="join_key", how="left", suffixes=("", "_meta"))

    # Drop the helper key and any duplicated symbol column from metadata.
    drop_cols = ["join_key"]
    if "symbol_meta" in df_merged.columns:
        drop_cols.append("symbol_meta")
    df_merged = df_merged.drop(columns=drop_cols)

    # Emit one JSON object per line (NDJSON).
    out_fp.parent.mkdir(parents=True, exist_ok=True)
    with open(out_fp, "w", encoding="utf8") as fh:
        fh.writelines(json.dumps(rec) + "\n" for rec in df_merged.to_dict(orient="records"))
    print(f"✅ Wrote {len(df_merged)} merged records to {out_fp}")
def cli():
    """Command-line entry point: dispatch to the JSON or Parquet merge mode."""
    # BUG FIX: __doc__ was previously passed positionally, which argparse
    # interprets as `prog` (the program name), mangling --help output.
    # Pass it as the description and keep its formatting intact.
    p = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = p.add_subparsers(dest="mode", required=False)

    # JSON merge mode
    js = sub.add_parser("json", help="Merge features JSON with coininfo JSON")
    js.add_argument("--features", type=Path,
                    default=Path("data/merged/features/merged_features.json"),
                    help="Path to merged_features JSON/NDJSON")
    js.add_argument("--coininfo", type=Path,
                    default=Path("data/coininfo/coin_metadata.json"),
                    help="Path to coin-metadata JSON/NDJSON")
    js.add_argument("--out", type=Path,
                    default=Path("merged_with_coininfo.ndjson"),
                    help="Where to write the merged NDJSON")

    # Parquet merge mode (the default when no subcommand is given)
    pq = sub.add_parser("parquet", help="Merge crypto bubbles Parquet into merged features Parquet")
    pq.add_argument("--base", type=Path,
                    default=Path("data/merged/features/merged_features.parquet"),
                    help="Path to base merged-features Parquet")
    pq.add_argument("--bubbles", type=Path,
                    default=None,
                    help="Path to crypto bubbles Parquet (if not set, will use latest in data/crypto-bubbles/)")
    pq.add_argument("--out", type=Path,
                    default=Path("data/merged/features/merged_features.parquet"),
                    help="Where to write the merged Parquet")

    args = p.parse_args()

    # No subcommand given: default to 'parquet' by injecting it and reparsing.
    if args.mode is None:
        import sys
        sys.argv.insert(1, "parquet")
        args = p.parse_args()

    if args.mode == "parquet":
        # Fall back to the newest crypto_bubbles_*.parquet when --bubbles is
        # missing or points at a non-existent file.
        if args.bubbles is None or not args.bubbles.exists():
            import glob
            import os
            bubble_files = glob.glob(os.path.join("data", "crypto-bubbles", "crypto_bubbles_*.parquet"))
            if not bubble_files:
                raise FileNotFoundError("No crypto_bubbles_*.parquet files found in data/crypto-bubbles/")
            latest_bubble = max(bubble_files, key=os.path.getmtime)
            print(f"[INFO] Using latest bubbles file: {latest_bubble}")
            args.bubbles = Path(latest_bubble)
        merge_parquet_features(args.base, args.bubbles, args.out)
    else:
        main_json_merge(args.features, args.coininfo, args.out)
# Script entry point: only run the CLI when executed directly, not on import.
if __name__ == "__main__":
    cli()