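"""Merge the most recent Finnhub insider-sentiment record per symbol into the
merged stock-features parquet file.

Sentiment is read from the per-symbol '*_insider_sentiment.parquet' files under
DATA_DIR/finnhub/ownership; only the newest (year, month) row for each symbol
is kept and left-joined onto the features on 'symbol'.
"""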
import os
from pathlib import Path

import numpy as np
import pandas as pd

# Resolve DATA_DIR from the project config; fall back to a default path.
try:
    from src.config import DATA_DIR as CFG_DATA_DIR
except Exception:
    try:
        from config import DATA_DIR as CFG_DATA_DIR
    except Exception:
        CFG_DATA_DIR = "/data"


def _resolve_under_data(path_like: str | os.PathLike) -> Path:
    """Resolve a path under DATA_DIR.

    Absolute paths are returned unchanged; a leading 'data' component is
    stripped so that 'data/foo/bar' maps to DATA_DIR/foo/bar.
    """
    p = Path(path_like)
    if p.is_absolute():
        return p
    parts = p.parts
    if parts and parts[0].lower() == "data":
        rel = Path(*parts[1:]) if len(parts) > 1 else Path()
    else:
        rel = p
    return Path(CFG_DATA_DIR) / rel


def add_sentiment_to_features(features_path, output_path, sentiment_data=None):
    """Merge the newest insider-sentiment row per symbol into the features file.

    `sentiment_data` is currently unused; sentiment is read from the per-symbol
    parquet files under DATA_DIR/finnhub/ownership instead.
    """
    features_path = _resolve_under_data(features_path)
    output_path = _resolve_under_data(output_path)

    features_df = pd.read_parquet(features_path)

    # Each per-symbol file may store the API response either as flat columns or
    # as a nested 'data' column that has to be expanded into rows.
    ownership_dir = Path(CFG_DATA_DIR) / 'finnhub' / 'ownership'
    sentiment_files = sorted(ownership_dir.glob('*_insider_sentiment.parquet'))

    newest_rows = []
    for file in sentiment_files:
        df = pd.read_parquet(file)

        if 'data' in df.columns:
            data_list = df['data'].tolist()

            if data_list and isinstance(data_list[0], np.ndarray):
                # Array of record-like items: expand the first entry into rows.
                flat_list = [dict(item) for item in data_list[0]]
                df = pd.DataFrame.from_records(flat_list)
            elif data_list and isinstance(data_list[0], dict):
                df = pd.DataFrame.from_records(data_list)
            elif data_list and isinstance(data_list[0], list):
                expected_cols = ["change", "month", "mspr", "symbol", "year"]
                df = pd.DataFrame(data_list, columns=expected_cols[:len(data_list[0])])
            else:
                df = pd.DataFrame()

        if 'symbol' not in df.columns:
            # Fall back to the ticker encoded in the file name.
            symbol = file.name.split('_')[0]
            df['symbol'] = symbol

        if 'year' in df.columns and 'month' in df.columns:
            # Keep only the most recent (year, month) row for this symbol.
            newest = df.sort_values(['year', 'month'], ascending=[False, False]).iloc[[0]]
            newest_rows.append(newest)
        else:
            print(f"[WARN] Skipping {file}: missing 'year' or 'month' column after expansion.")

    if newest_rows:
        all_newest_sentiment = pd.concat(newest_rows, ignore_index=True)
    else:
        all_newest_sentiment = pd.DataFrame()

    if not all_newest_sentiment.empty and 'symbol' in all_newest_sentiment.columns:
        merged_df = features_df.merge(all_newest_sentiment, on='symbol', how='left', suffixes=('', '_sentiment'))
        merged_df.to_parquet(output_path, compression='snappy')
        print(f"[INFO] Added newest sentiment data for all available symbols and saved to: {output_path}")
    else:
        print("[WARN] No valid sentiment data found to merge. Output not updated.")


def main():
    features_path = "data/merged/features/stocks_features.parquet"
    output_path = features_path
    add_sentiment_to_features(features_path, output_path, None)


if __name__ == "__main__":
    main()