"""Merge the latest Finnhub quote snapshots into the stock features parquet."""

import glob
import os
from pathlib import Path

import numpy as np
import pandas as pd

# Resolve DATA_DIR the same way the other modules do
try:
    from src.config import DATA_DIR as CFG_DATA_DIR
except Exception:
    try:
        from config import DATA_DIR as CFG_DATA_DIR
    except Exception:
        CFG_DATA_DIR = "/data"


def _resolve_under_data(path_like: str | os.PathLike) -> Path:
    """Resolve a path under DATA_DIR, stripping a leading 'data/' segment.

    With DATA_DIR == "/data":
        "data/foo/bar.parquet" -> /data/foo/bar.parquet
        "foo/bar.parquet"      -> /data/foo/bar.parquet
        "/abs/bar.parquet"     -> /abs/bar.parquet  (absolute paths pass through)
    """
    p = Path(path_like)
    if p.is_absolute():
        return p
    parts = p.parts
    if parts and parts[0].lower() == "data":
        rel = Path(*parts[1:]) if len(parts) > 1 else Path()
    else:
        rel = p
    return Path(CFG_DATA_DIR) / rel


def add_latest_quotes_to_features(features_path, quotes_dir, output_path):
    # Resolve all paths under DATA_DIR
    features_path = _resolve_under_data(features_path)
    quotes_dir = _resolve_under_data(quotes_dir)
    output_path = _resolve_under_data(output_path)

    # Load the features table
    features_df = pd.read_parquet(features_path)

    # Load every per-symbol quote snapshot (one parquet file per symbol)
    quote_rows = []
    for file in glob.glob(os.path.join(str(quotes_dir), '*_current_quote.parquet')):
        try:
            df = pd.read_parquet(file)
            # If the file stores the quote nested in a 'data' column, expand it.
            # Finnhub quote fields: c=current, d=change, dp=percent change,
            # h=high, l=low, o=open, pc=previous close, t=epoch seconds.
            if 'data' in df.columns:
                data_list = df['data'].tolist()
                if data_list and isinstance(data_list[0], np.ndarray):
                    # Array of key/value records: expand the first array
                    flat_list = [dict(item) for item in data_list[0]]
                    df = pd.DataFrame.from_records(flat_list)
                elif data_list and isinstance(data_list[0], dict):
                    df = pd.DataFrame.from_records(data_list)
                elif data_list and isinstance(data_list[0], list):
                    expected_cols = ["c", "d", "dp", "h", "l", "o", "pc", "t"]
                    df = pd.DataFrame(data_list, columns=expected_cols[:len(data_list[0])])
                else:
                    df = pd.DataFrame()
            # Keep the first row as the latest quote for this symbol;
            # the symbol is encoded in the filename prefix.
            if not df.empty:
                record = df.iloc[0].to_dict()
                record['symbol'] = os.path.basename(file).split('_')[0]
                quote_rows.append(record)
        except Exception as e:
            print(f"[WARN] Skipping {file}: {e}")

    if not quote_rows:
        print("[WARN] No valid quote data found to merge. Output not updated.")
        return

    quotes_df = pd.DataFrame(quote_rows).set_index('symbol')

    def merge_quote_into_row(row):
        symbol = row['symbol']
        if symbol not in quotes_df.index:
            return row
        quote = quotes_df.loc[symbol]
        # Map Finnhub quote keys onto feature column names
        field_map = {
            'o': 'open',
            'h': 'high',
            'l': 'low',
            'c': 'close',
            'd': 'change',
            'dp': 'price_change_1',
        }
        for q_key, f_key in field_map.items():
            val = quote.get(q_key)
            if pd.notnull(val):
                if f_key in features_df.columns:
                    row[f_key] = val
                else:
                    # Feature column doesn't exist yet: add it with a _quote suffix
                    row[f'{f_key}_quote'] = val
        # Extra fields: previous close and the quote timestamp
        if pd.notnull(quote.get('pc')):
            row['prev_close'] = quote['pc']
        if pd.notnull(quote.get('t')):
            row['timestamp'] = quote['t'] * 1000  # epoch seconds -> milliseconds
            row['datetime'] = pd.to_datetime(quote['t'], unit='s')
        return row

    features_df = features_df.apply(merge_quote_into_row, axis=1)
    features_df.to_parquet(output_path, index=False, compression='snappy')
    print(f"[INFO] Added latest quote data for all available symbols and saved to: {output_path}")


def main():
    features_path = "data/merged/features/stocks_features.parquet"
    quotes_dir = "data/finnhub/stock_data"
    output_path = features_path  # overwrite the features file in place
    add_latest_quotes_to_features(features_path, quotes_dir, output_path)


if __name__ == "__main__":
    main()
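
# Usage sketch: main() overwrites the features file in place
# (output_path == features_path). To keep the original features intact,
# a separate output path can be passed instead; the filename below is
# illustrative, not an existing file in this repo:
#
#   add_latest_quotes_to_features(
#       features_path="data/merged/features/stocks_features.parquet",
#       quotes_dir="data/finnhub/stock_data",
#       output_path="data/merged/features/stocks_features_with_quotes.parquet",
#   )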