import os from pathlib import Path import pandas as pd try: from src.config import DATA_DIR as CFG_DATA_DIR except Exception: try: from config import DATA_DIR as CFG_DATA_DIR except Exception: CFG_DATA_DIR = "/data" def _resolve_under_data(path_like: str | os.PathLike) -> Path: p = Path(path_like) if p.is_absolute(): return p parts = p.parts if parts and parts[0].lower() == "data": rel = Path(*parts[1:]) if len(parts) > 1 else Path() else: rel = p return Path(CFG_DATA_DIR) / rel def add_sentiment_to_features(features_path, output_path, sentiment_data): # Resolve paths under DATA_DIR features_path = _resolve_under_data(features_path) output_path = _resolve_under_data(output_path) # Load features features_df = pd.read_parquet(features_path) # Load newest sentiment data for all symbols from ownership directory under DATA_DIR ownership_dir = Path(CFG_DATA_DIR) / 'finnhub' / 'ownership' import glob sentiment_files = glob.glob(os.path.join(str(ownership_dir), '*_insider_sentiment.parquet')) newest_rows = [] for file in sentiment_files: df = pd.read_parquet(file) # If file has a 'data' column, expand it if 'data' in df.columns: data_list = df['data'].tolist() # If first item is a numpy array, flatten to list of dicts import numpy as np if data_list and isinstance(data_list[0], np.ndarray): # Flatten array to list flat_list = [dict(item) for item in data_list[0]] df = pd.DataFrame.from_records(flat_list) elif data_list and isinstance(data_list[0], dict): df = pd.DataFrame.from_records(data_list) elif data_list and isinstance(data_list[0], list): expected_cols = ["change", "month", "mspr", "symbol", "year"] df = pd.DataFrame(data_list, columns=expected_cols[:len(data_list[0])]) else: df = pd.DataFrame() # Extract symbol from filename if not present if 'symbol' not in df.columns: symbol = os.path.basename(file).split('_')[0] df['symbol'] = symbol # Only process if both 'year' and 'month' columns exist if 'year' in df.columns and 'month' in df.columns: newest = df.sort_values(['year', 'month'], ascending=[False, False]).iloc[[0]] newest_rows.append(newest) else: print(f"[WARN] Skipping {file}: missing 'year' or 'month' column after expansion.") if newest_rows: all_newest_sentiment = pd.concat(newest_rows, ignore_index=True) else: all_newest_sentiment = pd.DataFrame() # Merge only if sentiment data is available and has 'symbol' column if not all_newest_sentiment.empty and 'symbol' in all_newest_sentiment.columns: merged_df = features_df.merge(all_newest_sentiment, on='symbol', how='left', suffixes=('', '_sentiment')) # Save result merged_df.to_parquet(output_path, compression='snappy') print(f"[INFO] Added newest sentiment data for all available symbols and saved to: {output_path}") else: print("[WARN] No valid sentiment data found to merge. Output not updated.") def main(): features_path = "data/merged/features/stocks_features.parquet" output_path = features_path add_sentiment_to_features(features_path, output_path, None) if __name__ == "__main__": main()