File size: 2,549 Bytes
c49b21b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
from pathlib import Path
import pandas as pd
import glob

# Resolve DATA_DIR similar to other modules
try:
    from src.config import DATA_DIR as CFG_DATA_DIR  # when run as module
except Exception:
    try:
        from config import DATA_DIR as CFG_DATA_DIR  # when run as script
    except Exception:
        CFG_DATA_DIR = "/data"


def _resolve_under_data(path_like: str | os.PathLike) -> Path:
    """Map a repo-style path like 'data/...' to <DATA_DIR>/...; keep absolute paths as-is."""
    p = Path(path_like)
    if p.is_absolute():
        return p
    parts = p.parts
    if parts and parts[0].lower() == "data":
        rel = Path(*parts[1:]) if len(parts) > 1 else Path()
    else:
        rel = p
    return Path(CFG_DATA_DIR) / rel

def load_company_profiles(profiles_dir):
    """
    Load all company profile parquet files from the directory into a DataFrame.
    Returns a DataFrame indexed by symbol.
    """
    profile_files = glob.glob(os.path.join(profiles_dir, '*_company_profile.parquet'))
    profiles = []
    for file in profile_files:
        df = pd.read_parquet(file)
        # Extract symbol from filename
        symbol = os.path.basename(file).split('_')[0]
        df['symbol'] = symbol
        profiles.append(df)
    if profiles:
        profiles_df = pd.concat(profiles, ignore_index=True)
        profiles_df.set_index('symbol', inplace=True)
        return profiles_df
    else:
        return pd.DataFrame()

def merge_company_info_to_features(features_path, profiles_dir, output_path):
    """
    Merge company profile info into stocks features DataFrame by symbol.
    """
    # Resolve all paths under DATA_DIR
    features_path = _resolve_under_data(features_path)
    profiles_dir = _resolve_under_data(profiles_dir)
    output_path = _resolve_under_data(output_path)
    # Load features
    features_df = pd.read_parquet(features_path)
    # Load company profiles
    profiles_df = load_company_profiles(profiles_dir)
    # Merge on symbol
    merged_df = features_df.join(profiles_df, on='symbol', rsuffix='_company')
    # Save result
    merged_df.to_parquet(output_path, compression='snappy')
    return merged_df

# Example usage
def main():
    features_path = "data/merged/features/stocks_features.parquet"
    profiles_dir = "data/finnhub/company_info"
    output_path = features_path
    merge_company_info_to_features(features_path, profiles_dir, output_path)
    print(f"[INFO] Merged company info into features and saved to: {output_path}")

if __name__ == "__main__":
    main()