import os from pathlib import Path import pandas as pd import glob # Resolve DATA_DIR similar to other modules try: from src.config import DATA_DIR as CFG_DATA_DIR except Exception: try: from config import DATA_DIR as CFG_DATA_DIR except Exception: CFG_DATA_DIR = "/data" def _resolve_under_data(path_like: str | os.PathLike) -> Path: p = Path(path_like) if p.is_absolute(): return p parts = p.parts if parts and parts[0].lower() == "data": rel = Path(*parts[1:]) if len(parts) > 1 else Path() else: rel = p return Path(CFG_DATA_DIR) / rel def add_latest_ratings_to_features(features_path, ratings_dir, output_path): # Resolve paths under DATA_DIR features_path = _resolve_under_data(features_path) ratings_dir = _resolve_under_data(ratings_dir) output_path = _resolve_under_data(output_path) # Load features features_df = pd.read_parquet(features_path) # Find all ratings files ratings_files = glob.glob(os.path.join(str(ratings_dir), '*_recommendation_trends.parquet')) latest_rows = [] for file in ratings_files: # Read as Parquet file df = pd.read_parquet(file) # Get latest row by period (assuming period is YYYY-MM-DD) if 'period' in df.columns: df['period'] = pd.to_datetime(df['period']) latest = df.sort_values('period', ascending=False).iloc[[0]] latest_rows.append(latest) if latest_rows: all_latest_ratings = pd.concat(latest_rows, ignore_index=True) else: all_latest_ratings = pd.DataFrame() # Merge only if ratings data is available and has 'symbol' column if not all_latest_ratings.empty and 'symbol' in all_latest_ratings.columns: merged_df = features_df.merge(all_latest_ratings, on='symbol', how='left', suffixes=('', '_ratings')) merged_df.to_parquet(output_path, compression='snappy') print(f"[INFO] Added latest ratings data for all available symbols and saved to: {output_path}") else: print("[WARN] No valid ratings data found to merge. Output not updated.") def main(): features_path = "data/merged/features/stocks_features.parquet" ratings_dir = "data/finnhub/ratings" output_path = features_path add_latest_ratings_to_features(features_path, ratings_dir, output_path) if __name__ == "__main__": main()