""" Merge Alpaca bars + quotes + trades into a single feature table. • data/alpaca/*_bars.parquet ← master timeline (daily) • data/alpaca/*_quotes.parquet ← L1 quotes (intraday ticks) • data/alpaca/*_trades.parquet ← raw trades (intraday ticks) The script logs shapes / null counts so you can eyeball data quality. """ from __future__ import annotations import os import sys from glob import glob import pandas as pd import warnings warnings.filterwarnings("ignore", category=FutureWarning) # --------------------------------------------------------------------------- # # CONFIG # --------------------------------------------------------------------------- # # Resolve writable base using central config (fallback to /data) try: from src import config as app_config BASE_DATA_DIR = app_config.DATA_DIR except Exception: BASE_DATA_DIR = os.environ.get("DATA_DIR", "/data") DATA_DIR = os.path.join(BASE_DATA_DIR, "alpaca") os.makedirs(DATA_DIR, exist_ok=True) OUT_FILE = "alpaca_features.parquet" TOLERANCE = 86_400_000 # 1 day in ms for integer timestamps MERGE_DIR = "nearest" # ← **important change** # --------------------------------------------------------------------------- # # HELPERS # --------------------------------------------------------------------------- # def log(title: str, char: str = "=", width: int = 60) -> None: print(f"\n{title.center(width, char)}") def load_parquets(suffix: str) -> pd.DataFrame: """Read every *{suffix}.parquet in DATA_DIR and concat.""" paths = glob(os.path.join(DATA_DIR, f"*{suffix}.parquet")) if not paths: return pd.DataFrame() def normalize(df: pd.DataFrame) -> pd.DataFrame: # Normalize symbol: "XRP/USD" -> "XRP" df["symbol"] = df["symbol"].astype(str).str.replace(r"([A-Z]+)[/_][A-Z]+", r"\1", regex=True) # Convert timestamp to ms since epoch df["timestamp"] = pd.to_datetime(df["timestamp"]) df["timestamp"] = df["timestamp"].astype("int64") // 10**6 return df dfs: list[pd.DataFrame] = [] for p in paths: df = pd.read_parquet(p) df = normalize(df) dfs.append(df) out = pd.concat(dfs, ignore_index=True) return out # --------------------------------------------------------------------------- # # MAIN LOGIC # --------------------------------------------------------------------------- # def build_features() -> pd.DataFrame: bars = load_parquets("_bars") quotes = load_parquets("_quotes") trades = load_parquets("_trades") if bars.empty: raise RuntimeError(f"No '*_bars.parquet' files found in {DATA_DIR}") # Merge symbol-by-symbol so each group is already sorted features = [] symbols = sorted(bars["symbol"].unique()) for sym in symbols: bar_df = bars[bars["symbol"] == sym].sort_values("timestamp").reset_index(drop=True) # nearest quote merge if not quotes.empty: q = quotes[quotes["symbol"] == sym].sort_values("timestamp") if not q.empty: bar_df = pd.merge_asof( bar_df, q, on="timestamp", suffixes=("", "_quote"), tolerance=TOLERANCE, direction=MERGE_DIR, # ← nearest! ) # nearest trade merge if not trades.empty: t = trades[trades["symbol"] == sym].sort_values("timestamp") if not t.empty: bar_df = pd.merge_asof( bar_df, t, on="timestamp", suffixes=("", "_trade"), tolerance=TOLERANCE, direction=MERGE_DIR, # ← nearest! ) features.append(bar_df) feat = pd.concat(features, ignore_index=True) # --------------------------------------------------------------------- # # Fill remaining holes within each symbol # --------------------------------------------------------------------- # feat = ( feat .groupby("symbol", group_keys=False) .apply(lambda df: df.ffill().bfill()) .reset_index(drop=True) ) return feat def save(df: pd.DataFrame) -> None: out_path = os.path.join(DATA_DIR, OUT_FILE) df.to_parquet(out_path, index=False) print(f"\n-> wrote merged features to {out_path}") # --------------------------------------------------------------------------- # def main() -> None: merged = build_features() save(merged) if __name__ == "__main__": log("Merging Alpaca Features") main()