"""
Unified report generator for feature datasets - generates all 3 reports (merged, crypto, stocks)
with automatic column discovery and dynamic schema detection.
Usage:
# Generate all 3 reports
python unified_report_generator.py --generate-all
# Generate specific reports
python unified_report_generator.py --merged-input data/merged/features/merged_features.parquet
python unified_report_generator.py --crypto-input data/merged/features/crypto_features.parquet
python unified_report_generator.py --stocks-input data/merged/features/stocks_features.parquet
# Custom paths
python unified_report_generator.py \
--merged-input path/to/merged.parquet \
--crypto-input path/to/crypto.parquet \
--stocks-input path/to/stocks.parquet \
--output-dir reports/ \
--baseline-schema schemas/baseline.json
"""
import argparse
import pandas as pd
import json
import os
from datetime import datetime
from typing import Dict, List, Set, Optional
def categorize_column_by_name(col_name: str) -> str:
"""Automatically categorize columns based on naming patterns."""
col_lower = col_name.lower()
# Exchange-related
if col_name.startswith(('symbols.', 'exchangePrices.')):
return "Exchange Data"
# Performance metrics
if col_name.startswith(('performance.', 'rankDiffs.')):
return "Performance Metrics"
# Technical indicators
if col_lower in ['rsi', 'macd', 'macd_signal', 'macd_histogram', 'atr', 'bb_width',
'bb_position', 'stoch_k', 'stoch_d', 'cci', 'mfi'] or col_name.startswith('roc_'):
return "Technical Indicators"
# Price-related
if any(word in col_lower for word in ['price', 'open', 'volume', 'marketcap', 'volatility']):
return "Price & Volume"
# On-chain/blockchain
if any(word in col_lower for word in ['transaction', 'gas', 'fees', 'tx_', 'blockchain']):
return "On-chain Features"
# Sentiment
if any(word in col_lower for word in ['sentiment', 'social', 'confidence']):
return "Sentiment Features"
# Temporal
if any(word in col_lower for word in ['timestamp', 'hour', 'day', 'weekend', 'trading_hours']):
return "Temporal Features"
# Completeness metrics
if 'completeness' in col_lower or 'data_quality' in col_lower:
return "Data Quality Metrics"
# Market/Exchange info
if col_lower in ['dominance', 'rank', 'stable', 'cg_id']:
return "Market Metrics"
# Flags
if col_name.startswith('is_') or col_lower in ['stable']:
return "Asset Flags"
# Metadata
if col_name.startswith('_') or col_lower in ['backup_id', 'stock_market', 'blockchain_network']:
return "Metadata"
# Links
if col_name.startswith('links.'):
return "External Links"
# Interaction features
if any(word in col_lower for word in ['correlation', 'convergence', 'alignment', 'trend']):
return "Interaction Features"
# Default for unknown
return "Other Features"
def load_baseline_schema(baseline_path: str) -> Set[str]:
"""Load baseline schema if it exists."""
if os.path.exists(baseline_path):
try:
with open(baseline_path, 'r') as f:
baseline = json.load(f)
return set(baseline.get('columns', []))
except (json.JSONDecodeError, KeyError):
print(f"Warning: Could not load baseline schema from {baseline_path}")
return set()
def save_baseline_schema(columns: List[str], baseline_path: str):
"""Save current columns as baseline schema."""
    baseline_dir = os.path.dirname(baseline_path)
    if baseline_dir:
        os.makedirs(baseline_dir, exist_ok=True)
schema = {
"generated_at": datetime.utcnow().isoformat() + "Z",
"total_columns": len(columns),
"columns": sorted(columns)
}
with open(baseline_path, 'w') as f:
json.dump(schema, f, indent=2)
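# The baseline file written by save_baseline_schema has this shape (values are illustrative):
# {
#   "generated_at": "2025-01-01T00:00:00Z",
#   "total_columns": 3,
#   "columns": ["close_price", "interval_timestamp", "rsi"]
# }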
def detect_asset_type(df: pd.DataFrame, all_columns: List[str]) -> str:
"""Detect asset type based on column patterns."""
if any(col.startswith('symbols.') for col in all_columns):
return "crypto"
elif "stock_market" in all_columns:
return "stocks"
elif "is_crypto" in all_columns and "is_stock" in all_columns:
return "mixed"
else:
return "unknown"
def get_asset_specific_stats(df: pd.DataFrame, asset_type: str, all_columns: List[str]) -> Dict:
"""Get asset-specific statistics."""
stats = {"asset_type": asset_type}
if asset_type == "crypto":
# Crypto-specific stats
if "stable" in df.columns:
stats["stable_coins_count"] = int(df["stable"].sum())
if "cg_id" in df.columns or "symbol" in df.columns:
symbol_col = "symbol" if "symbol" in df.columns else "cg_id"
stats["unique_crypto_assets"] = df[symbol_col].nunique()
# Exchange coverage
exchange_columns = [col for col in all_columns if col.startswith(("symbols.", "exchangePrices."))]
if exchange_columns:
exchange_coverage = {}
for col in exchange_columns[:10]: # Limit to avoid huge reports
coverage = (df[col].notna().sum() / len(df)) * 100
exchange_coverage[col] = round(coverage, 2)
stats["exchange_coverage"] = exchange_coverage
elif asset_type == "stocks":
# Stock-specific stats
if "symbol" in df.columns:
stats["unique_stock_symbols"] = df["symbol"].nunique()
if "stock_market" in df.columns:
stats["stock_market_distribution"] = df["stock_market"].value_counts().to_dict()
if "is_trading_hours" in df.columns:
trading_hours_pct = (df["is_trading_hours"].sum() / len(df)) * 100
stats["trading_hours_coverage_pct"] = round(trading_hours_pct, 2)
elif asset_type == "mixed":
# Mixed dataset stats
if "is_crypto" in df.columns:
stats["crypto_records"] = int(df["is_crypto"].sum())
if "is_stock" in df.columns:
stats["stock_records"] = int(df["is_stock"].sum())
if "symbol" in df.columns:
stats["total_unique_symbols"] = df["symbol"].nunique()
return stats
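# Illustrative return value of get_asset_specific_stats for a crypto frame
# (which keys appear depends on the columns present; values and the exchange
# column name below are made up):
# {"asset_type": "crypto", "stable_coins_count": 4, "unique_crypto_assets": 120,
#  "exchange_coverage": {"symbols.binance.price": 97.5}}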
def generate_report(input_path: str, output_path: str, baseline_schema_path: Optional[str] = None, report_type: str = "auto") -> bool:
"""Generate a feature report for any dataset type."""
# Check if input file exists
if not os.path.exists(input_path):
print(f"Warning: Input file not found: {input_path}")
return False
try:
# Load the dataset
df = pd.read_parquet(input_path)
all_columns = list(df.columns)
print(f"Processing {input_path}...")
print(f" - Shape: {df.shape}")
print(f" - Columns: {len(all_columns)}")
# Load baseline schema for comparison
baseline_columns = set()
if baseline_schema_path:
baseline_columns = load_baseline_schema(baseline_schema_path)
# Identify new columns
current_columns = set(all_columns)
new_columns = current_columns - baseline_columns if baseline_columns else set()
# Auto-categorize all columns
categories = {}
new_features_by_category = {}
for col in all_columns:
category = categorize_column_by_name(col)
if category not in categories:
categories[category] = {"count": 0, "features": []}
new_features_by_category[category] = []
categories[category]["features"].append(col)
categories[category]["count"] += 1
# Track if it's a new feature
if col in new_columns:
new_features_by_category[category].append(col)
# Clean up empty new feature lists
new_features_by_category = {k: v for k, v in new_features_by_category.items() if v}
# Basic dataset stats
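        # Use interval_timestamp when present; otherwise fall back to the first column,
        # which is assumed to hold datetimes or epoch-millisecond integers.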
ts_col = df["interval_timestamp"] if "interval_timestamp" in df.columns else df.iloc[:, 0]
if pd.api.types.is_datetime64_any_dtype(ts_col):
start_ts = int(ts_col.min().timestamp() * 1000)
end_ts = int(ts_col.max().timestamp() * 1000)
else:
start_ts = int(ts_col.min())
end_ts = int(ts_col.max())
memory_mb = df.memory_usage(deep=True).sum() / 1024**2
# Data quality
        missing = {col: int(n) for col, n in df.isna().sum().items()}  # cast to plain int for JSON serialization
total_cells = df.size
non_missing = int(df.notna().sum().sum())
completeness_pct = (non_missing / total_cells) * 100
avg_dq_score = df.get("data_quality_score", pd.Series(dtype=float)).mean()
# Detect asset type and get specific stats
asset_type = detect_asset_type(df, all_columns)
asset_stats = get_asset_specific_stats(df, asset_type, all_columns)
# Build the report
report = {
"generated_at_utc": datetime.utcnow().isoformat() + "Z",
"report_type": report_type,
"schema_version": "unified_v1.0",
"source_file": os.path.basename(input_path),
"dataset_info": {
"shape": list(df.shape),
"memory_usage_mb": round(memory_mb, 2),
"time_range": {"start": start_ts, "end": end_ts},
"total_columns": len(all_columns),
"total_categories": len(categories),
"new_columns_count": len(new_columns),
**asset_stats
},
"feature_categories": categories,
"data_quality": {
"overall_completeness_pct": round(completeness_pct, 2),
"missing_values_by_column": missing,
"average_data_quality_score": None if pd.isna(avg_dq_score) else round(avg_dq_score, 4)
}
}
# Add new features section if any exist
if new_columns:
report["new_features"] = {
"total_new_features": len(new_columns),
"new_features_by_category": new_features_by_category,
"all_new_features": sorted(list(new_columns))
}
# Add baseline comparison if available
if baseline_columns:
removed_columns = baseline_columns - current_columns
if removed_columns:
report["removed_features"] = sorted(list(removed_columns))
# Ensure output directory exists
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
# Write report
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
print(f" Report generated: {output_path}")
print(f" - {len(categories)} categories")
if new_columns:
print(f" - {len(new_columns)} new features detected")
return True
except Exception as e:
print(f" Error processing {input_path}: {str(e)}")
return False
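# Programmatic usage sketch (paths are the script defaults; the function returns True on success):
# if generate_report(
#     "data/merged/features/crypto_features.parquet",
#     "data/merged/features/crypto_report.json",
#     baseline_schema_path="schemas/baseline.json",
#     report_type="crypto",
# ):
#     print("crypto report written")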
def main():
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
# Input files
parser.add_argument("--merged-input", default="data/merged/features/merged_features.parquet", help="Path to merged_features.parquet")
parser.add_argument("--crypto-input", default="data/merged/features/crypto_features.parquet", help="Path to crypto_features.parquet")
parser.add_argument("--stocks-input", default="data/merged/features/stocks_features.parquet", help="Path to stocks_features.parquet")
# Output settings
parser.add_argument("--output-dir", default="data/merged/features/", help="Output directory for reports")
parser.add_argument("--baseline-schema", default="schemas/baseline.json", help="Path to baseline schema JSON")
# Convenience flags
parser.add_argument("--generate-all", action="store_true", help="Generate all reports using default paths")
args = parser.parse_args()
    # Input paths for --generate-all (honors any overridden --*-input flags)
    if args.generate_all:
        default_paths = {
            "merged": args.merged_input,
            "crypto": args.crypto_input,
            "stocks": args.stocks_input
        }
print("Generating all feature reports...")
success_count = 0
for report_type, input_path in default_paths.items():
output_dir = args.output_dir if args.output_dir else "data/merged/features/"
output_path = os.path.join(output_dir, f"{report_type}_report.json")
baseline_path = args.baseline_schema if args.baseline_schema else f"schemas/{report_type}_baseline.json"
if generate_report(input_path, output_path, baseline_path, report_type):
success_count += 1
print(f"\nGenerated {success_count}/3 reports successfully!")
# Update baseline schema with merged features if it exists
if args.baseline_schema and os.path.exists(default_paths["merged"]):
df = pd.read_parquet(default_paths["merged"])
save_baseline_schema(list(df.columns), args.baseline_schema)
print(f"Updated baseline schema: {args.baseline_schema}")
return
# Individual file processing
reports_generated = 0
if args.merged_input:
output_dir = args.output_dir if args.output_dir else "data/merged/features/"
output_path = os.path.join(output_dir, "merged_report.json")
if generate_report(args.merged_input, output_path, args.baseline_schema, "merged"):
reports_generated += 1
if args.crypto_input:
output_dir = args.output_dir if args.output_dir else "data/merged/features/"
output_path = os.path.join(output_dir, "crypto_report.json")
if generate_report(args.crypto_input, output_path, args.baseline_schema, "crypto"):
reports_generated += 1
# Print crypto count and data quality
try:
with open(output_path, "r") as f:
report = json.load(f)
count = report.get("dataset_info", {}).get("shape", [None])[0]
dq = report.get("data_quality", {}).get("overall_completeness_pct", None)
print(f"[CRYPTO] Count: {count}, Data Quality: {dq}%")
except Exception as e:
print(f"[CRYPTO] Error reading report for stats: {e}")
if args.stocks_input:
output_dir = args.output_dir if args.output_dir else "data/merged/features/"
output_path = os.path.join(output_dir, "stocks_report.json")
if generate_report(args.stocks_input, output_path, args.baseline_schema, "stocks"):
reports_generated += 1
# Print stocks count and data quality
try:
with open(output_path, "r") as f:
report = json.load(f)
count = report.get("dataset_info", {}).get("shape", [None])[0]
dq = report.get("data_quality", {}).get("overall_completeness_pct", None)
print(f"[STOCKS] Count: {count}, Data Quality: {dq}%")
except Exception as e:
print(f"[STOCKS] Error reading report for stats: {e}")
if reports_generated == 0:
print("No input files specified. Use --generate-all or specify input files.")
parser.print_help()
else:
print(f"\nGenerated {reports_generated} report(s) successfully!")
if __name__ == "__main__":
main()