"""Memory-optimized merge pipeline.

Runs the merge_* scripts in sequence, snapshots the per-asset feature files,
archives records older than DAYS_OLD days, and uploads the merged outputs to
Filebase object storage.
"""

import gc
import os
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
import psutil
from dotenv import load_dotenv


class MergeMemoryOptimizer:
    """Memory optimizer for merge operations."""

    def __init__(self, max_memory_mb=350):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process()

    def get_memory_usage(self):
        """Return this process's resident set size in MB."""
        return self.process.memory_info().rss / 1024 / 1024

    def cleanup_after_script(self, script_name):
        """Garbage-collect after a pipeline script and report memory usage."""
        collected = gc.collect()
        memory_after = self.get_memory_usage()
        print(f"[MemOpt] After {script_name}: {memory_after:.1f}MB (freed {collected} objects)")

        if memory_after > self.max_memory_mb:
            print(f"[MemOpt] WARNING: High memory after {script_name}")
            # Memory is above the configured ceiling; force one more collection pass.
            gc.collect()

        return memory_after


memory_optimizer = MergeMemoryOptimizer()

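
# Pipeline-wide settings: how many days of rows stay in the live feature files
# before archiving, and where the merged output and archives live on disk.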
DAYS_OLD = 7
MERGED_PATH = Path("data/merged/features/merged_features.parquet")
ARCHIVE_DIR = Path("data/merged/archive")
ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

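
# Run a sibling script in a subprocess, logging process memory before the run
# and triggering a garbage-collection pass afterwards.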
def run_script(script, args=None):
    cmd = [sys.executable, str(Path(__file__).parent / script)]
    if args:
        cmd += args
    print(f"Running: {' '.join(cmd)}")

    memory_before = memory_optimizer.get_memory_usage()
    print(f"[MemOpt] Before {script}: {memory_before:.1f}MB")

    result = subprocess.run(cmd, check=True)

    memory_optimizer.cleanup_after_script(script)
    return result

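
# Move rows older than DAYS_OLD days out of the per-asset feature files into
# daily archive files under ARCHIVE_DIR/<YYYYMMDD>/, then rewrite the live
# files with only the recent rows.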
def archive_old_records():
    feature_files = [
        Path("data/merged/features/crypto_features.parquet"),
        Path("data/merged/features/stocks_features.parquet"),
    ]
    now = datetime.utcnow()
    cutoff = int((now - timedelta(days=DAYS_OLD)).timestamp() * 1000)

    for feature_path in feature_files:
        if not feature_path.exists():
            print(f"[WARN] {feature_path} does not exist.")
            continue

        df = pd.read_parquet(feature_path)
        old = df.loc[df['interval_timestamp'] < cutoff].copy()
        keep = df.loc[df['interval_timestamp'] >= cutoff].copy()

        if old.empty:
            print(f"[INFO] No records to archive in {feature_path}.")
            continue

        old['archive_date'] = pd.to_datetime(old['interval_timestamp'], unit='ms').dt.strftime('%Y%m%d')
        for day, group in old.groupby('archive_date'):
            day_dir = ARCHIVE_DIR / day
            day_dir.mkdir(parents=True, exist_ok=True)
            out_path = day_dir / f"{feature_path.stem}_archived_{day}.parquet"
            if out_path.exists():
                existing = pd.read_parquet(out_path)
                group = pd.concat([existing, group.drop(columns=['archive_date'])], ignore_index=True)
            else:
                group = group.drop(columns=['archive_date'])

            group.to_parquet(out_path, index=False)
            print(f"[ARCHIVE] {len(group)} records -> {out_path}")

        keep.to_parquet(feature_path, index=False)
        print(f"[INFO] Archived {len(old)} records from {feature_path}. {len(keep)} remain.")

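
# Upload everything under data/merged/ (except the archive tree) to the
# configured Filebase bucket, then upload any archive files modified within
# the last DAYS_OLD days.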
def store_in_cloud():
    # Make src/ importable so data_cloud.cloud_utils can be found.
    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'src')))
    from data_cloud.cloud_utils import StorageHandler

    load_dotenv()
    endpoint_url = os.getenv("FILEBASE_ENDPOINT")
    access_key = os.getenv("FILEBASE_ACCESS_KEY")
    secret_key = os.getenv("FILEBASE_SECRET_KEY")
    bucket_name = os.getenv("FILEBASE_BUCKET")
    if not all([endpoint_url, access_key, secret_key, bucket_name]):
        print("[ERROR] Filebase credentials not set in environment.")
        return

    storage = StorageHandler(endpoint_url, access_key, secret_key, bucket_name)

    merged_dir = os.path.join("data", "merged")
    archive_dir = os.path.join(merged_dir, "archive")

    # Upload the current merged outputs; the archive tree is handled separately below.
    for root, dirs, files in os.walk(merged_dir):
        if os.path.abspath(root) == os.path.abspath(archive_dir):
            dirs[:] = []  # do not descend into the per-day archive subdirectories
            continue
        for fname in files:
            local_path = os.path.join(root, fname)
            rel_path = os.path.relpath(local_path, "data")
            key = rel_path.replace(os.sep, "/")
            with open(local_path, "rb") as f:
                data = f.read()
            storage.upload(key, data)

    # Upload archive files modified within the last DAYS_OLD days, including
    # the per-day subdirectories created by archive_old_records().
    import time
    cutoff = time.time() - DAYS_OLD * 86400
    if os.path.exists(archive_dir):
        for root, _dirs, files in os.walk(archive_dir):
            for fname in files:
                local_path = os.path.join(root, fname)
                if os.path.getmtime(local_path) < cutoff:
                    continue
                rel_path = os.path.relpath(local_path, "data")
                key = rel_path.replace(os.sep, "/")
                with open(local_path, "rb") as f:
                    data = f.read()
                storage.upload(key, data)

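
# Copy the current stocks and crypto feature files into data/merged/raw/ so a
# raw snapshot is kept alongside the processed outputs.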
def save_raw_features():
    import shutil

    raw_dir = Path('data/merged/raw')
    raw_dir.mkdir(parents=True, exist_ok=True)
    sources = {
        'stocks': Path('data/merged/features/stocks_features.parquet'),
        'crypto': Path('data/merged/features/crypto_features.parquet'),
    }
    for name, src in sources.items():
        dst = raw_dir / src.name
        if src.exists():
            shutil.copy2(src, dst)
            print(f"[RAW] Saved {name} features to {dst}")
        else:
            print(f"[RAW] Source {name} features not found: {src}")

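
# Orchestrate the full pipeline: core merge scripts, raw snapshot, optional
# post-processing scripts, archiving, reporting, and cloud upload.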
def main():
    print("[MergeOpt] Starting memory-optimized merge pipeline...")
    initial_memory = memory_optimizer.get_memory_usage()
    print(f"[MergeOpt] Initial memory: {initial_memory:.1f}MB")

    # Core merge sequence; any failure here aborts the pipeline.
    run_script('merge_0.py')
    run_script('merge_1.py', [
        '--latest', 'data/advisorai-data/features/latest_features.parquet',
        '--finnhub', 'data/advisorai-data/features/latest_features.parquet',
        '--out', 'data/merged/features/merged_features.parquet'
    ])
    run_script('merge_2.py')
    run_script('merge_3.py')
    run_script('merge_4.py')
    run_script('separator.py')
    run_script('merge_5.py')
    run_script('merge_6.py')
    run_script('merge_7.py')

    save_raw_features()

    # Optional post-processing; failures are logged but do not abort the run.
    try:
        run_script('extract_symbols.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Symbol extraction failed: {e}")

    try:
        run_script('remove_null_symbols.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Null symbol removal failed: {e}")

    try:
        run_script('merge_temp.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Merge temp failed: {e}")

    try:
        run_script('merge_sant.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Santiment merge failed: {e}")

    try:
        run_script('merge_santiment_with_crypto.py')
    except subprocess.CalledProcessError as e:
        print(f"[WARNING] Santiment-crypto merge failed: {e}")

    archive_old_records()
    run_script('full_report.py')
    store_in_cloud()

    final_memory = memory_optimizer.get_memory_usage()
    print(f"[MergeOpt] Final memory usage: {final_memory:.1f}MB")

    if final_memory > 400:
        print("[MergeOpt] WARNING: High final memory usage")
        memory_optimizer.cleanup_after_script("final cleanup")

    print("[OK] All merge steps, null handling, normalization, and reporting completed successfully.")


if __name__ == "__main__":
    main()