"""
fetch_crypto_bubbles.py – Fetches CryptoBubbles data, converts it to a Parquet file and a JSON report,
then uploads both directly to Filebase S3 instead of local storage.
✱ 2025-07-11 – switched backend from local filesystem to Filebase S3
• Uses boto3 against FILEBASE_ENDPOINT
• No local disk writes; everything streams directly to S3
Requirements:
• FILEBASE_ENDPOINT env var, e.g. https://s3.filebase.com
• FILEBASE_ACCESS_KEY and FILEBASE_SECRET_KEY env vars
• FILEBASE_BUCKET env var with your bucket name
• dotenv for loading env vars from .env (optional)
"""
import os
import sys
import json
import datetime as _dt
import argparse
from io import BytesIO
from collections import defaultdict
import numpy as np
import pandas as pd
import requests
# Ensure src is in sys.path for direct script execution
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from data_cloud.cloud_utils import StorageHandler
from dotenv import load_dotenv
# ─── Configuration ────────────────────────────────────────────────────────────
load_dotenv()
URL = os.getenv("CRYPTOBUBBLES_URL", "https://cryptobubbles.net/backend/data/bubbles1000.usd.json")
# Filebase S3 credentials
FILEBASE_ENDPOINT = os.getenv("FILEBASE_ENDPOINT")
FILEBASE_ACCESS_KEY = os.getenv("FILEBASE_ACCESS_KEY")
FILEBASE_SECRET_KEY = os.getenv("FILEBASE_SECRET_KEY")
FILEBASE_BUCKET = os.getenv("FILEBASE_BUCKET")
if not all([FILEBASE_ENDPOINT, FILEBASE_ACCESS_KEY, FILEBASE_SECRET_KEY, FILEBASE_BUCKET]):
print("[ERROR] FILEBASE_ENDPOINT, FILEBASE_ACCESS_KEY, FILEBASE_SECRET_KEY, and FILEBASE_BUCKET must be set")
sys.exit(1)
# boto3 S3 client config
from botocore.config import Config
CFG = Config(
    signature_version="s3v4",
    s3={"addressing_style": "path"},
)
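# CFG is not referenced elsewhere in this module; presumably StorageHandler
# constructs the boto3 client internally. For reference only, a direct client
# against Filebase would look roughly like this sketch (standard boto3 call,
# not executed below):
#
#   import boto3
#   s3 = boto3.client(
#       "s3",
#       endpoint_url=FILEBASE_ENDPOINT,
#       aws_access_key_id=FILEBASE_ACCESS_KEY,
#       aws_secret_access_key=FILEBASE_SECRET_KEY,
#       config=CFG,
#   )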
# ─── Data fetch & processing ─────────────────────────────────────────────────
def fetch_json(url: str = URL, timeout: int = 15):
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    payload = resp.json()
    return payload.get("data", payload) if isinstance(payload, dict) else payload
def to_dataframe(raw):
    return pd.json_normalize(raw)
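# json_normalize flattens nested objects into dot-separated columns, e.g. a record
# like {"name": "Bitcoin", "performance": {"day": 1.2}} yields columns "name" and
# "performance.day" (field names illustrative); categorize_columns below relies on
# that prefix convention.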
def categorize_columns(df: pd.DataFrame):
    groups = defaultdict(list)
    for col in df.columns:
        if "." in col:
            prefix, _ = col.split('.', 1)
            groups[prefix].append(col)
        else:
            groups['base'].append(col)
    nice = {
        'base': 'Base Features',
        'symbols': 'Symbols',
        'performance': 'Performance',
        'rankDiffs': 'Rank Differences',
        'exchangePrices': 'Exchange Prices',
        'links': 'Links',
    }
    fc = {}
    for key, cols in groups.items():
        name = nice.get(key, key.capitalize())
        fc[name] = {'count': len(cols), 'features': cols}
    return fc
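# Rough shape of the mapping returned above (column names are hypothetical):
#   {"Base Features": {"count": 3, "features": ["id", "name", "rank"]},
#    "Performance": {"count": 2, "features": ["performance.day", "performance.week"]}}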
def generate_report(df, configuration):
    now = _dt.datetime.utcnow().isoformat()
    mem_mb = df.memory_usage(deep=True).sum() / 1024**2
    dataset_info = {
        'shape': [df.shape[0], df.shape[1]],
        'memory_usage_mb': mem_mb,
        'time_range': {'start': None, 'end': None},
    }
    fc = categorize_columns(df)
    missing = df.isna().sum().to_dict()
    total_cells = df.shape[0] * df.shape[1]
    non_missing = df.count().sum()
    completeness = non_missing / total_cells * 100
    col_quals = [(df.shape[0] - m) / df.shape[0] for m in missing.values()]
    avg_quality = float(np.mean(col_quals))
    data_quality = {
        'completeness': completeness,
        'missing_values_by_column': missing,
        'avg_quality_score': avg_quality,
    }
    report = {
        'timestamp': now,
        'dataset_info': dataset_info,
        'feature_categories': fc,
        'data_quality': data_quality,
        'feature_importance': {},
        'configuration': configuration,
    }
    return report
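# Rough shape of the report produced above (values illustrative):
#   {"timestamp": "...", "dataset_info": {"shape": [1000, 40], ...},
#    "feature_categories": {...}, "data_quality": {"completeness": 99.2, ...},
#    "feature_importance": {}, "configuration": {...}}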
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
    parser = argparse.ArgumentParser(description='Fetch CryptoBubbles, upload to Filebase')
    parser.add_argument('--prefix', default='crypto-bubbles', help='S3 key prefix')
    args = parser.parse_args()
    prefix = args.prefix.rstrip('/')
    today = _dt.date.today().isoformat()
    raw = fetch_json()
    df = to_dataframe(raw)
    # configuration placeholder
    configuration = {
        'enable_advanced_indicators': True,
        'enable_feature_selection': True,
        'enable_anomaly_detection': True,
        'max_correlation_threshold': 0.95,
        'min_feature_importance': 0.001,
        'outlier_detection_method': 'iqr',
        'feature_scaling': True,
    }
    report = generate_report(df, configuration)
    # prepare Parquet bytes
    buf = BytesIO()
    df.to_parquet(buf, index=False)
    parquet_data = buf.getvalue()
    # prepare JSON report bytes
    report_json = json.dumps(report, indent=2).encode()
    # Use StorageHandler for unified cloud/local upload
    # Pass the Filebase credentials validated at import time; local_base is the
    # fallback directory used when cloud upload is unavailable
    storage = StorageHandler(
        endpoint_url=FILEBASE_ENDPOINT,
        access_key=FILEBASE_ACCESS_KEY,
        secret_key=FILEBASE_SECRET_KEY,
        bucket_name=FILEBASE_BUCKET,
        local_base="data",
    )
    key_parquet = f"{prefix}/crypto_bubbles_{today}.parquet"
    key_report = f"{prefix}/crypto_bubbles_report_{today}.json"
    # Upload Parquet
    try:
        storage.upload(key_parquet, parquet_data, content_type='application/octet-stream')
        print(f"[OK] Uploaded Parquet -> {storage.get_last_mode()}:{key_parquet}")
    except Exception as e:
        print(f"[ERROR] Failed uploading Parquet: {e}", file=sys.stderr)
    # Upload JSON report
    try:
        storage.upload(key_report, report_json, content_type='application/json')
        print(f"[OK] Uploaded report -> {storage.get_last_mode()}:{key_report}")
    except Exception as e:
        print(f"[ERROR] Failed uploading report: {e}", file=sys.stderr)
if __name__ == '__main__':
    main()