"""
Merge Alpaca bars + quotes + trades into a single feature table.
• data/alpaca/*_bars.parquet    ← master timeline (daily)
• data/alpaca/*_quotes.parquet  ← L1 quotes (intraday ticks)
• data/alpaca/*_trades.parquet  ← raw trades (intraday ticks)
The script logs shapes / null counts so you can eyeball data quality.
"""
from __future__ import annotations
import os
import sys
from glob import glob
import pandas as pd
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
# --------------------------------------------------------------------------- #
# CONFIG
# --------------------------------------------------------------------------- #
# Resolve writable base using central config (fallback to /data)
try:
    from src import config as app_config
    BASE_DATA_DIR = app_config.DATA_DIR
except Exception:
    BASE_DATA_DIR = os.environ.get("DATA_DIR", "/data")
DATA_DIR = os.path.join(BASE_DATA_DIR, "alpaca")
os.makedirs(DATA_DIR, exist_ok=True)
OUT_FILE = "alpaca_features.parquet"
TOLERANCE = 86_400_000 # 1 day in ms for integer timestamps
MERGE_DIR = "nearest"  # match the closest tick in either direction (pandas default is "backward")
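# Illustrative sketch of why "nearest" matters (uses only the pandas import
# above; not called by the pipeline, safe to delete). With "backward" a row
# only matches ticks at or before its timestamp; "nearest" also accepts a
# later tick when it is closer, as long as it falls within the tolerance.
def _demo_merge_direction() -> None:
    left = pd.DataFrame({"timestamp": [100]})
    right = pd.DataFrame({"timestamp": [70, 110], "bid": [1.0, 2.0]})
    back = pd.merge_asof(left, right, on="timestamp", direction="backward")
    near = pd.merge_asof(left, right, on="timestamp", direction="nearest")
    print(back["bid"].iloc[0])  # 1.0: only the earlier tick (t=70) qualifies
    print(near["bid"].iloc[0])  # 2.0: the later tick (t=110) is closer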
# --------------------------------------------------------------------------- #
# HELPERS
# --------------------------------------------------------------------------- #
def log(title: str, char: str = "=", width: int = 60) -> None:
    print(f"\n{title.center(width, char)}")
def load_parquets(suffix: str) -> pd.DataFrame:
    """Read every *{suffix}.parquet in DATA_DIR and concat."""
    paths = glob(os.path.join(DATA_DIR, f"*{suffix}.parquet"))
    if not paths:
        return pd.DataFrame()

    def normalize(df: pd.DataFrame) -> pd.DataFrame:
        # Normalize symbol: "XRP/USD" -> "XRP" (also handles "_" separators)
        df["symbol"] = df["symbol"].astype(str).str.replace(
            r"([A-Z]+)[/_][A-Z]+", r"\1", regex=True
        )
        # Convert timestamp to integer ms since epoch (ns // 10**6)
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        df["timestamp"] = df["timestamp"].astype("int64") // 10**6
        return df

    dfs: list[pd.DataFrame] = []
    for p in paths:
        df = pd.read_parquet(p)
        df = normalize(df)
        dfs.append(df)
    return pd.concat(dfs, ignore_index=True)
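# The symbol regex in normalize() in action (illustrative, not executed):
#   pd.Series(["BTC/USD", "ETH_USD", "AAPL"]).str.replace(
#       r"([A-Z]+)[/_][A-Z]+", r"\1", regex=True)
#   -> ["BTC", "ETH", "AAPL"]   (plain equity symbols pass through unchanged)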
# --------------------------------------------------------------------------- #
# MAIN LOGIC
# --------------------------------------------------------------------------- #
def build_features() -> pd.DataFrame:
    bars = load_parquets("_bars")
    quotes = load_parquets("_quotes")
    trades = load_parquets("_trades")
    if bars.empty:
        raise RuntimeError(f"No '*_bars.parquet' files found in {DATA_DIR}")
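    # The module docstring promises shape / null-count logging; a minimal
    # version of that check, so data quality is visible in the run output:
    for name, df in (("bars", bars), ("quotes", quotes), ("trades", trades)):
        log(f" {name}: shape={df.shape} ", "-")
        if not df.empty:
            nulls = df.isna().sum()
            if nulls.any():
                print(nulls[nulls > 0].to_string())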
    # Merge symbol-by-symbol so each group is already sorted
    features = []
    symbols = sorted(bars["symbol"].unique())
    for sym in symbols:
        bar_df = bars[bars["symbol"] == sym].sort_values("timestamp").reset_index(drop=True)
        # nearest quote merge
        if not quotes.empty:
            q = quotes[quotes["symbol"] == sym].sort_values("timestamp")
            if not q.empty:
                bar_df = pd.merge_asof(
                    bar_df,
                    q,
                    on="timestamp",
                    suffixes=("", "_quote"),
                    tolerance=TOLERANCE,
                    direction=MERGE_DIR,  # nearest tick within one day
                )
        # nearest trade merge
        if not trades.empty:
            t = trades[trades["symbol"] == sym].sort_values("timestamp")
            if not t.empty:
                bar_df = pd.merge_asof(
                    bar_df,
                    t,
                    on="timestamp",
                    suffixes=("", "_trade"),
                    tolerance=TOLERANCE,
                    direction=MERGE_DIR,  # nearest tick within one day
                )
        features.append(bar_df)
    feat = pd.concat(features, ignore_index=True)
    # --------------------------------------------------------------------- #
    # Fill remaining holes within each symbol
    # --------------------------------------------------------------------- #
    feat = (
        feat
        .groupby("symbol", group_keys=False)
        .apply(lambda df: df.ffill().bfill())
        .reset_index(drop=True)
    )
    return feat
def save(df: pd.DataFrame) -> None:
    out_path = os.path.join(DATA_DIR, OUT_FILE)
    df.to_parquet(out_path, index=False)
    print(f"\n-> wrote merged features to {out_path}")
# --------------------------------------------------------------------------- #
def main() -> None:
    merged = build_features()
    save(merged)


if __name__ == "__main__":
    log("Merging Alpaca Features")
    main()