import glob
import os
from pathlib import Path

import numpy as np
import pandas as pd

# Resolve DATA_DIR similar to other modules
try:
from src.config import DATA_DIR as CFG_DATA_DIR
except Exception:
try:
from config import DATA_DIR as CFG_DATA_DIR
except Exception:
CFG_DATA_DIR = "/data"
def _resolve_under_data(path_like: str | os.PathLike) -> Path:
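    """Resolve a path under DATA_DIR, stripping any leading 'data/' prefix.

    Absolute paths are returned unchanged; relative paths are joined under
    CFG_DATA_DIR, e.g. "data/merged/x.parquet" -> <DATA_DIR>/merged/x.parquet.
    """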
p = Path(path_like)
if p.is_absolute():
return p
parts = p.parts
if parts and parts[0].lower() == "data":
rel = Path(*parts[1:]) if len(parts) > 1 else Path()
else:
rel = p
return Path(CFG_DATA_DIR) / rel
def add_latest_quotes_to_features(features_path, quotes_dir, output_path):
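    """Overlay the latest per-symbol quote values onto a features parquet.

    Reads every *_current_quote.parquet file in quotes_dir, matches each
    quote to feature rows by symbol, and writes the merged frame to
    output_path.
    """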
# Resolve paths under DATA_DIR
features_path = _resolve_under_data(features_path)
quotes_dir = _resolve_under_data(quotes_dir)
output_path = _resolve_under_data(output_path)
# Load features
features_df = pd.read_parquet(features_path)
    # Load the latest quote parquet for each symbol
quote_rows = []
for file in glob.glob(os.path.join(str(quotes_dir), '*_current_quote.parquet')):
try:
df = pd.read_parquet(file)
            # Some quote files wrap the payload in a single 'data' column;
            # expand it into a flat DataFrame of records.
            if 'data' in df.columns:
                data_list = df['data'].tolist()
                if data_list and isinstance(data_list[0], np.ndarray):
                    # An ndarray of mapping-like records: flatten the first one.
                    flat_list = [dict(item) for item in data_list[0]]
                    df = pd.DataFrame.from_records(flat_list)
                elif data_list and isinstance(data_list[0], dict):
                    df = pd.DataFrame.from_records(data_list)
                elif data_list and isinstance(data_list[0], list):
                    # Positional values in Finnhub quote-field order.
                    expected_cols = ["c", "d", "dp", "h", "l", "o", "pc", "t"]
                    df = pd.DataFrame(data_list, columns=expected_cols[:len(data_list[0])])
                else:
                    df = pd.DataFrame()
            # Take the first row as the latest quote and tag it with the
            # symbol parsed from the filename (e.g. AAPL_current_quote.parquet).
            if not df.empty:
                record = df.iloc[0].to_dict()
                record['symbol'] = os.path.basename(file).split('_')[0]
                quote_rows.append(record)
except Exception as e:
print(f"[WARN] Skipping {file}: {e}")
if not quote_rows:
print("[WARN] No valid quote data found to merge. Output not updated.")
return
    quotes_df = pd.DataFrame(quote_rows).set_index('symbol')
    # Keep a single quote per symbol so .loc[symbol] always returns a Series.
    quotes_df = quotes_df[~quotes_df.index.duplicated(keep='last')]
def merge_quote_into_row(row):
symbol = row['symbol']
if symbol not in quotes_df.index:
return row
quote = quotes_df.loc[symbol]
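        # Map Finnhub quote keys onto feature columns:
        # o/h/l = open/high/low, c = current price (used as close),
        # d = absolute change, dp = percent change.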
field_map = {
'o': 'open',
'h': 'high',
'l': 'low',
'c': 'close',
'd': 'change',
'dp': 'price_change_1',
}
for q_key, f_key in field_map.items():
val = quote.get(q_key)
if pd.notnull(val):
if f_key in features_df.columns:
row[f_key] = val
else:
                    row[f'{f_key}_quote'] = val  # if the feature doesn't exist, add it as a new column
        # Carry over the previous close; convert the Unix-second timestamp
        # to epoch milliseconds plus a pandas datetime.
if pd.notnull(quote.get('pc')):
row['prev_close'] = quote['pc']
if pd.notnull(quote.get('t')):
row['timestamp'] = quote['t'] * 1000
row['datetime'] = pd.to_datetime(quote['t'], unit='s')
return row
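    # Row-wise overlay: slow on very large frames but keeps the merge logic simple.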
features_df = features_df.apply(merge_quote_into_row, axis=1)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    features_df.to_parquet(output_path, index=False, compression='snappy')
print(f"[INFO] Added latest quote data for all available symbols and saved to: {output_path}")
def main():
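    """Default entry point: update the merged stocks features file in place."""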
features_path = "data/merged/features/stocks_features.parquet"
quotes_dir = "data/finnhub/stock_data"
output_path = features_path
add_latest_quotes_to_features(features_path, quotes_dir, output_path)
if __name__ == "__main__":
main()