Maaroufabousaleh
f
c49b21b
import os
from pathlib import Path
import pandas as pd
import glob
# Resolve DATA_DIR similar to other modules: start from the "/data" default,
# then let the packaged config (or, failing that, a top-level ``config``
# module) override it when one can be imported.
CFG_DATA_DIR = "/data"
try:
    from src.config import DATA_DIR as CFG_DATA_DIR
except Exception:
    # ``src.config`` is missing or failed while importing; try the flat layout.
    try:
        from config import DATA_DIR as CFG_DATA_DIR
    except Exception:
        # Neither config module is usable; keep the default assigned above.
        pass
def _resolve_under_data(path_like: str | os.PathLike) -> Path:
    """Map a project-style path onto the configured data directory.

    Absolute paths are returned unchanged. For a relative path, a leading
    ``data`` component (compared case-insensitively) is dropped and whatever
    remains is joined onto ``CFG_DATA_DIR``.

    Args:
        path_like: Path to resolve, absolute or relative.

    Returns:
        The input as a ``Path`` when it is absolute, otherwise
        ``CFG_DATA_DIR`` joined with the relative remainder.
    """
    candidate = Path(path_like)
    if candidate.is_absolute():
        return candidate

    segments = candidate.parts
    if not segments or segments[0].lower() != "data":
        # No "data" prefix to strip: place the path under the data root as-is.
        return Path(CFG_DATA_DIR) / candidate

    # ``Path()`` built from zero segments is ".", so a bare "data" resolves
    # to the data root itself.
    return Path(CFG_DATA_DIR) / Path(*segments[1:])
def add_latest_ratings_to_features(
    features_path: str | os.PathLike,
    ratings_dir: str | os.PathLike,
    output_path: str | os.PathLike,
) -> None:
    """Merge the most recent ratings row of every symbol into the features table.

    Every ``*_recommendation_trends.parquet`` file in ``ratings_dir`` is read,
    its row with the greatest ``period`` is kept, and the collected rows are
    left-joined onto the features table on ``symbol``. Columns that exist on
    both sides keep the features name; the ratings copy gets a ``_ratings``
    suffix. The result is written to ``output_path`` as snappy-compressed
    parquet. When no usable ratings rows are found, a warning is printed and
    nothing is written.

    Args:
        features_path: Parquet file holding the features table.
        ratings_dir: Directory containing the per-file ratings parquet files.
        output_path: Destination parquet file (may equal ``features_path``).

    All three paths are resolved through ``_resolve_under_data``.

    Raises:
        KeyError: If ratings are available but the features table has no
            ``symbol`` column (raised by the merge).
    """
    # Resolve paths under DATA_DIR
    features_path = _resolve_under_data(features_path)
    ratings_dir = _resolve_under_data(ratings_dir)
    output_path = _resolve_under_data(output_path)

    # Load features
    features_df = pd.read_parquet(features_path)

    # Find all ratings files. glob returns matches in arbitrary order, so sort
    # them to keep the result reproducible from run to run.
    ratings_files = sorted(
        glob.glob(os.path.join(str(ratings_dir), '*_recommendation_trends.parquet'))
    )

    latest_rows = []
    for file in ratings_files:
        df = pd.read_parquet(file)
        # A file can only contribute a "latest" row if it has a 'period'
        # column and at least one row; ``.iloc[[0]]`` raises IndexError on an
        # empty frame, so empty files are skipped rather than crashing the run.
        if 'period' not in df.columns or df.empty:
            continue
        # Get latest row by period (assuming period is YYYY-MM-DD)
        df['period'] = pd.to_datetime(df['period'])
        latest_rows.append(df.sort_values('period', ascending=False).iloc[[0]])

    if latest_rows:
        all_latest_ratings = pd.concat(latest_rows, ignore_index=True)
    else:
        all_latest_ratings = pd.DataFrame()

    # Merge only if ratings data is available and has 'symbol' column
    if all_latest_ratings.empty or 'symbol' not in all_latest_ratings.columns:
        print("[WARN] No valid ratings data found to merge. Output not updated.")
        return

    # If several files carry the same symbol, a left merge would duplicate
    # that symbol's feature rows. Keep only the newest row per symbol.
    if all_latest_ratings['symbol'].duplicated().any():
        all_latest_ratings = (
            all_latest_ratings
            .sort_values('period', ascending=False, kind='stable')
            .drop_duplicates(subset='symbol', keep='first')
        )

    # NOTE(review): when output_path equals features_path, a second run finds
    # the previously merged ratings columns in the features table and adds
    # '_ratings'-suffixed copies next to them -- confirm whether stale ratings
    # columns should be dropped before merging.
    merged_df = features_df.merge(
        all_latest_ratings, on='symbol', how='left', suffixes=('', '_ratings')
    )

    # Make sure the destination directory exists before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    merged_df.to_parquet(output_path, compression='snappy')
    print(f"[INFO] Added latest ratings data for all available symbols and saved to: {output_path}")
def main():
    """Update the stocks features parquet in place with the latest ratings."""
    # The same file is both input and output, so the features table is
    # overwritten with the merged result.
    features_file = "data/merged/features/stocks_features.parquet"
    add_latest_ratings_to_features(
        features_path=features_file,
        ratings_dir="data/finnhub/ratings",
        output_path=features_file,
    )


if __name__ == "__main__":
    main()