File size: 7,817 Bytes
c49b21b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import subprocess
from pathlib import Path
import sys
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv

# Age threshold in days: archive_old_records() moves feature rows older than
# this into the archive, and store_in_cloud() uses it as the upload cutoff for
# archive files.
DAYS_OLD = 7
# Location of the merged features parquet file. NOTE(review): not referenced
# by the code visible in this file; main() passes the same path to merge_1.py
# as a string literal.
MERGED_PATH = Path("data/merged/features/merged_features.parquet")
# Root directory under which archive_old_records() writes per-day archives.
ARCHIVE_DIR = Path("data/merged/archive")
# Import-time side effect: the archive directory is created (with parents) as
# soon as this module is loaded; exist_ok makes repeated imports harmless.
ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

def run_script(script, args=None):
    """Execute a Python script located next to this file.

    ``script`` is resolved relative to this file's directory and run with the
    current interpreter (``sys.executable``). ``args``, when truthy, is
    appended to the command line. The full command is echoed before it runs.

    Returns the ``subprocess.CompletedProcess`` of the run. Because
    ``check=True`` is used, a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    script_path = Path(__file__).parent / script
    command = [sys.executable, str(script_path)]
    if args:
        command.extend(args)
    print(f"Running: {' '.join(command)}")
    return subprocess.run(command, check=True)

def archive_old_records():
    """Move feature rows older than ``DAYS_OLD`` days into per-day archives.

    For each of the crypto and stocks feature parquet files that exists:

    * rows whose ``interval_timestamp`` (compared as epoch milliseconds) is
      strictly below the cutoff are grouped by calendar day and appended to
      ``ARCHIVE_DIR/<YYYYMMDD>/<stem>_archived_<YYYYMMDD>.parquet``;
    * every other row is written back to the feature file in place.

    Files that are missing, or that contain nothing to archive, are reported
    and left untouched. Returns ``None``.
    """
    # Local import: the file-level import only provides datetime/timedelta.
    from datetime import timezone

    feature_files = [
        Path("data/merged/features/crypto_features.parquet"),
        Path("data/merged/features/stocks_features.parquet")
    ]
    # Use an aware UTC "now". Calling .timestamp() on the naive value returned
    # by datetime.utcnow() interprets it as *local* time, which shifts the
    # cutoff by the machine's UTC offset.
    now = datetime.now(timezone.utc)
    cutoff = int((now - timedelta(days=DAYS_OLD)).timestamp() * 1000)

    for feature_path in feature_files:
        if not feature_path.exists():
            print(f"[WARN] {feature_path} does not exist.")
            continue

        df = pd.read_parquet(feature_path)
        # Build the mask once and split on its complement. A missing timestamp
        # compares as "not old", so such rows stay in the feature file instead
        # of falling through both `<` and `>=` and being silently dropped.
        is_old = (df['interval_timestamp'] < cutoff).fillna(False).astype(bool)
        old = df.loc[is_old].copy()
        keep = df.loc[~is_old].copy()

        if old.empty:
            print(f"[INFO] No records to archive in {feature_path}.")
            continue

        # Group by day (UTC) and write each group to a separate parquet file under archive/{day}/
        # 'archive_date' is a temporary grouping key; it is dropped before writing.
        old['archive_date'] = pd.to_datetime(old['interval_timestamp'], unit='ms').dt.strftime('%Y%m%d')
        for day, group in old.groupby('archive_date'):
            day_dir = ARCHIVE_DIR / day
            day_dir.mkdir(parents=True, exist_ok=True)
            out_path = day_dir / f"{feature_path.stem}_archived_{day}.parquet"
            group = group.drop(columns=['archive_date'])
            if out_path.exists():
                # Append to an archive produced by an earlier run.
                existing = pd.read_parquet(out_path)
                group = pd.concat([existing, group], ignore_index=True)

            group.to_parquet(out_path, index=False)
            print(f"[ARCHIVE] {len(group)} records -> {out_path}")

        # Save the remaining (unarchived) records back to the feature file
        keep.to_parquet(feature_path, index=False)
        print(f"[INFO] Archived {len(old)} records from {feature_path}. {len(keep)} remain.")

def store_in_cloud():
    """Upload the contents of ``data/merged`` to the configured Filebase bucket.

    Credentials are read from the environment (after ``load_dotenv()``):
    ``FILEBASE_ENDPOINT``, ``FILEBASE_ACCESS_KEY``, ``FILEBASE_SECRET_KEY`` and
    ``FILEBASE_BUCKET``. If any of them is unset an error is printed and the
    function returns without uploading anything.

    Two passes are made:

    1. every file under ``data/merged`` *outside* the ``archive`` subtree is
       uploaded;
    2. files anywhere under ``data/merged/archive`` are uploaded only when
       their modification time is within the last ``DAYS_OLD`` days.

    Object keys are the file paths relative to ``data``, using ``/`` as the
    separator. Returns ``None``.
    """
    # Import StorageHandler from cloud_utils, ensuring src is in sys.path
    import os
    import sys
    import time
    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'src')))
    from data_cloud.cloud_utils import StorageHandler

    # Filebase credentials from env
    load_dotenv()
    endpoint_url = os.getenv("FILEBASE_ENDPOINT")
    access_key = os.getenv("FILEBASE_ACCESS_KEY")
    secret_key = os.getenv("FILEBASE_SECRET_KEY")
    bucket_name = os.getenv("FILEBASE_BUCKET")
    if not all([endpoint_url, access_key, secret_key, bucket_name]):
        print("[ERROR] Filebase credentials not set in environment.")
        return

    storage = StorageHandler(endpoint_url, access_key, secret_key, bucket_name)

    def upload_file(local_path):
        # Key is the path relative to "data", normalised to forward slashes.
        key = os.path.relpath(local_path, "data").replace(os.sep, "/")
        with open(local_path, "rb") as f:
            data = f.read()
        storage.upload(key, data)

    merged_dir = os.path.join("data", "merged")
    archive_dir = os.path.join(merged_dir, "archive")
    archive_abs = os.path.abspath(archive_dir)

    # Upload all files in merged except archive. Pruning `dirs` in place stops
    # os.walk from descending into the archive tree at all; merely skipping
    # the archive root would still visit (and upload) archive/<day>/ folders.
    for root, dirs, files in os.walk(merged_dir):
        dirs[:] = [
            d for d in dirs
            if os.path.abspath(os.path.join(root, d)) != archive_abs
        ]
        for fname in files:
            upload_file(os.path.join(root, fname))

    # Only upload archive files newer than DAYS_OLD days. The walk is
    # recursive because archives are stored in per-day sub-directories;
    # os.walk yields nothing when the directory does not exist.
    cutoff = time.time() - DAYS_OLD * 86400
    for root, _dirs, files in os.walk(archive_dir):
        for fname in files:
            local_path = os.path.join(root, fname)
            if os.path.getmtime(local_path) >= cutoff:
                upload_file(local_path)

def save_raw_features():
    """Snapshot the stocks and crypto feature files into ``data/merged/raw``.

    The destination directory is created if needed. Each source file found
    under ``data/merged/features`` is copied (metadata included, via
    ``shutil.copy2``) under the same file name; a missing source is reported
    and skipped. Stocks are handled first, then crypto.
    """
    import shutil

    features_dir = Path('data/merged/features')
    raw_dir = Path('data/merged/raw')
    raw_dir.mkdir(parents=True, exist_ok=True)

    for kind in ('stocks', 'crypto'):
        file_name = f'{kind}_features.parquet'
        source = features_dir / file_name
        if not source.exists():
            print(f"[RAW] Source {kind} features not found: {source}")
            continue
        target = raw_dir / file_name
        shutil.copy2(source, target)
        print(f"[RAW] Saved {kind} features to {target}")

def main():
    """Run the full merge pipeline from start to finish.

    Order of operations:

    1. the mandatory merge scripts (any failure propagates as
       ``subprocess.CalledProcessError`` and aborts the run);
    2. a raw snapshot of the stocks/crypto feature files;
    3. best-effort post-processing scripts (a failure is logged as a warning
       and the pipeline continues);
    4. archiving of old records, report generation and cloud upload.
    """
    # --- Mandatory merge steps -------------------------------------------
    run_script('merge_0.py')
    # Both --latest and --finnhub are given the same input file.
    latest_features = 'data/advisorai-data/features/latest_features.parquet'
    run_script('merge_1.py', [
        '--latest', latest_features,
        '--finnhub', latest_features,
        '--out', 'data/merged/features/merged_features.parquet',
    ])
    mandatory_scripts = (
        'merge_2.py',
        'merge_3.py',
        'merge_4.py',
        'separator.py',
        'merge_5.py',
        'merge_6.py',
        'merge_7.py',
    )
    for script_name in mandatory_scripts:
        run_script(script_name)

    save_raw_features()

    # --- Best-effort steps -----------------------------------------------
    # Each entry is (script, label used in the warning message).
    best_effort_steps = (
        # Extract symbols from exchange symbol data before data fillers
        ('extract_symbols.py', 'Symbol extraction'),
        # Remove rows with null symbols after symbol extraction
        ('remove_null_symbols.py', 'Null symbol removal'),
        # Disabled here: stocks_data_filler.py and crypto_data_filler.py
        # (normalization / data-filler scripts).
        # Merge temp files into merged
        ('merge_temp.py', 'Merge temp'),
        ('merge_sant.py', 'Santiment merge'),
        ('merge_santiment_with_crypto.py', 'Santiment-crypto merge'),
    )
    for script_name, label in best_effort_steps:
        try:
            run_script(script_name)
        except subprocess.CalledProcessError as e:
            print(f"[WARNING] {label} failed: {e}")

    # Disabled here: run_final_null_handling.py (final null clean-up),
    # normalize.py, and norm/crypto.py / norm/stocks.py with --train.

    # --- Finalisation ----------------------------------------------------
    # Archive old records
    archive_old_records()

    # Generate and store full report
    run_script('full_report.py')

    # Store all merged data in cloud
    store_in_cloud()

    # NOTE(review): the message mentions null handling and normalization, but
    # those steps are currently disabled above, and best-effort failures do
    # not prevent it from being printed.
    print("[OK] All merge steps, null handling, normalization, and reporting completed successfully.")

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()