""" Unified report generator for merged features - generates all 3 reports with automatic column discovery. Supports merged, crypto, and stocks feature files with dynamic schema detection. Usage: # Generate all 3 reports python unified_report_generator.py --generate-all # Generate specific reports python unified_report_generator.py --merged-input data/merged/features/merged_features.parquet python unified_report_generator.py --crypto-input data/merged/features/crypto_features.parquet python unified_report_generator.py --stocks-input data/merged/features/stocks_features.parquet # Custom paths python unified_report_generator.py \ --merged-input path/to/merged.parquet \ --crypto-input path/to/crypto.parquet \ --stocks-input path/to/stocks.parquet \ --output-dir reports/ \ --baseline-schema schemas/baseline.json """ import argparse import pandas as pd import json import os from datetime import datetime from typing import Dict, List, Set, Optional from pathlib import Path def categorize_column_by_name(col_name: str) -> str: """Automatically categorize columns based on naming patterns.""" col_lower = col_name.lower() # Exchange-related if col_name.startswith(('symbols.', 'exchangePrices.')): return "Exchange Data" # Performance metrics if col_name.startswith(('performance.', 'rankDiffs.')): return "Performance Metrics" # Technical indicators if col_lower in ['rsi', 'macd', 'macd_signal', 'macd_histogram', 'atr', 'bb_width', 'bb_position', 'stoch_k', 'stoch_d', 'cci', 'mfi'] or col_name.startswith('roc_'): return "Technical Indicators" # Price-related if any(word in col_lower for word in ['price', 'open', 'volume', 'marketcap', 'volatility']): return "Price & Volume" # On-chain/blockchain if any(word in col_lower for word in ['transaction', 'gas', 'fees', 'tx_', 'blockchain']): return "On-chain Features" # Sentiment if any(word in col_lower for word in ['sentiment', 'social', 'confidence']): return "Sentiment Features" # Temporal if any(word in col_lower for word in ['timestamp', 'hour', 'day', 'weekend', 'trading_hours']): return "Temporal Features" # Completeness metrics if 'completeness' in col_lower or 'data_quality' in col_lower: return "Data Quality Metrics" # Market/Exchange info if col_lower in ['dominance', 'rank', 'stable', 'cg_id']: return "Market Metrics" # Flags if col_name.startswith('is_') or col_lower in ['stable']: return "Asset Flags" # Metadata if col_name.startswith('_') or col_lower in ['backup_id', 'stock_market', 'blockchain_network']: return "Metadata" # Links if col_name.startswith('links.'): return "External Links" # Interaction features if any(word in col_lower for word in ['correlation', 'convergence', 'alignment', 'trend']): return "Interaction Features" # Default for unknown return "Other Features" def load_baseline_schema(baseline_path: str) -> Set[str]: """Load baseline schema if it exists.""" if os.path.exists(baseline_path): try: with open(baseline_path, 'r') as f: baseline = json.load(f) return set(baseline.get('columns', [])) except (json.JSONDecodeError, KeyError): print(f"Warning: Could not load baseline schema from {baseline_path}") return set() def save_baseline_schema(columns: List[str], baseline_path: str): """Save current columns as baseline schema.""" os.makedirs(os.path.dirname(baseline_path), exist_ok=True) schema = { "generated_at": datetime.utcnow().isoformat() + "Z", "total_columns": len(columns), "columns": sorted(columns) } with open(baseline_path, 'w') as f: json.dump(schema, f, indent=2) def detect_asset_type(df: pd.DataFrame, all_columns: 
def detect_asset_type(df: pd.DataFrame, all_columns: List[str]) -> str:
    """Detect the asset type based on column patterns."""
    if any(col.startswith('symbols.') for col in all_columns):
        return "crypto"
    elif "stock_market" in all_columns:
        return "stocks"
    elif "is_crypto" in all_columns and "is_stock" in all_columns:
        return "mixed"
    else:
        return "unknown"


def get_asset_specific_stats(df: pd.DataFrame, asset_type: str, all_columns: List[str]) -> Dict:
    """Get asset-specific statistics."""
    stats = {"asset_type": asset_type}

    if asset_type == "crypto":
        # Crypto-specific stats
        if "stable" in df.columns:
            stats["stable_coins_count"] = int(df["stable"].sum())
        if "cg_id" in df.columns or "symbol" in df.columns:
            symbol_col = "symbol" if "symbol" in df.columns else "cg_id"
            stats["unique_crypto_assets"] = int(df[symbol_col].nunique())

        # Exchange coverage
        exchange_columns = [col for col in all_columns
                            if col.startswith(("symbols.", "exchangePrices."))]
        if exchange_columns:
            exchange_coverage = {}
            for col in exchange_columns[:10]:  # Limit to avoid huge reports
                coverage = (df[col].notna().sum() / len(df)) * 100
                # Cast to a built-in float so json.dump can serialize the value
                exchange_coverage[col] = round(float(coverage), 2)
            stats["exchange_coverage"] = exchange_coverage

    elif asset_type == "stocks":
        # Stock-specific stats
        if "symbol" in df.columns:
            stats["unique_stock_symbols"] = int(df["symbol"].nunique())
        if "stock_market" in df.columns:
            # Convert numpy integers to built-in ints for JSON serialization
            stats["stock_market_distribution"] = {
                market: int(count)
                for market, count in df["stock_market"].value_counts().items()
            }
        if "is_trading_hours" in df.columns:
            trading_hours_pct = (df["is_trading_hours"].sum() / len(df)) * 100
            stats["trading_hours_coverage_pct"] = round(float(trading_hours_pct), 2)

    elif asset_type == "mixed":
        # Mixed dataset stats
        if "is_crypto" in df.columns:
            stats["crypto_records"] = int(df["is_crypto"].sum())
        if "is_stock" in df.columns:
            stats["stock_records"] = int(df["is_stock"].sum())
        if "symbol" in df.columns:
            stats["total_unique_symbols"] = int(df["symbol"].nunique())

    return stats
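# As a rough illustration (values are made up), get_asset_specific_stats() returns a
# flat dict that is later merged into the report's "dataset_info" section, e.g. for a
# crypto frame:
#
#   {
#     "asset_type": "crypto",
#     "stable_coins_count": 12,
#     "unique_crypto_assets": 250,
#     "exchange_coverage": {"exchangePrices.binance": 97.5, "symbols.kraken": 84.2}
#   }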
def generate_report(input_path: str, output_path: str,
                    baseline_schema_path: Optional[str] = None,
                    report_type: str = "auto") -> bool:
    """Generate a feature report for any dataset type."""
    # Check that the input file exists
    if not os.path.exists(input_path):
        print(f"Warning: Input file not found: {input_path}")
        return False

    try:
        # Load the dataset
        df = pd.read_parquet(input_path)
        all_columns = list(df.columns)

        print(f"Processing {input_path}...")
        print(f"  - Shape: {df.shape}")
        print(f"  - Columns: {len(all_columns)}")

        # Load baseline schema for comparison
        baseline_columns = set()
        if baseline_schema_path:
            baseline_columns = load_baseline_schema(baseline_schema_path)

        # Identify new columns
        current_columns = set(all_columns)
        new_columns = current_columns - baseline_columns if baseline_columns else set()

        # Auto-categorize all columns
        categories = {}
        new_features_by_category = {}
        for col in all_columns:
            category = categorize_column_by_name(col)
            if category not in categories:
                categories[category] = {"count": 0, "features": []}
                new_features_by_category[category] = []
            categories[category]["features"].append(col)
            categories[category]["count"] += 1

            # Track whether it's a new feature
            if col in new_columns:
                new_features_by_category[category].append(col)

        # Clean up empty new-feature lists
        new_features_by_category = {k: v for k, v in new_features_by_category.items() if v}

        # Basic dataset stats
        ts_col = df["interval_timestamp"] if "interval_timestamp" in df.columns else df.iloc[:, 0]
        if pd.api.types.is_datetime64_any_dtype(ts_col):
            start_ts = int(ts_col.min().timestamp() * 1000)
            end_ts = int(ts_col.max().timestamp() * 1000)
        else:
            start_ts = int(ts_col.min())
            end_ts = int(ts_col.max())

        memory_mb = df.memory_usage(deep=True).sum() / 1024**2

        # Data quality (cast numpy scalars to built-in types so json.dump can serialize them)
        missing = {col: int(n) for col, n in df.isna().sum().items()}
        total_cells = df.size
        non_missing = int(df.notna().sum().sum())
        completeness_pct = (non_missing / total_cells) * 100
        avg_dq_score = df.get("data_quality_score", pd.Series(dtype=float)).mean()

        # Detect asset type and get specific stats
        asset_type = detect_asset_type(df, all_columns)
        asset_stats = get_asset_specific_stats(df, asset_type, all_columns)

        # Build the report
        report = {
            "generated_at_utc": datetime.utcnow().isoformat() + "Z",
            "report_type": report_type,
            "schema_version": "unified_v1.0",
            "source_file": os.path.basename(input_path),
            "dataset_info": {
                "shape": list(df.shape),
                "memory_usage_mb": round(memory_mb, 2),
                "time_range": {"start": start_ts, "end": end_ts},
                "total_columns": len(all_columns),
                "total_categories": len(categories),
                "new_columns_count": len(new_columns),
                **asset_stats
            },
            "feature_categories": categories,
            "data_quality": {
                "overall_completeness_pct": round(completeness_pct, 2),
                "missing_values_by_column": missing,
                "average_data_quality_score": None if pd.isna(avg_dq_score) else round(float(avg_dq_score), 4)
            }
        }

        # Add new-features section if any exist
        if new_columns:
            report["new_features"] = {
                "total_new_features": len(new_columns),
                "new_features_by_category": new_features_by_category,
                "all_new_features": sorted(new_columns)
            }

        # Add baseline comparison if available
        if baseline_columns:
            removed_columns = baseline_columns - current_columns
            if removed_columns:
                report["removed_features"] = sorted(removed_columns)

        # Ensure the output directory exists
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)

        # Write the report
        with open(output_path, "w") as f:
            json.dump(report, f, indent=2)

        print(f"  Report generated: {output_path}")
        print(f"  - {len(categories)} categories")
        if new_columns:
            print(f"  - {len(new_columns)} new features detected")
        return True

    except Exception as e:
        print(f"  Error processing {input_path}: {str(e)}")
        return False
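# A heavily trimmed sketch of the report JSON produced above (field names match the
# code; values and the "..." elisions are illustrative):
#
#   {
#     "generated_at_utc": "2024-01-01T00:00:00Z",
#     "report_type": "crypto",
#     "schema_version": "unified_v1.0",
#     "source_file": "crypto_features.parquet",
#     "dataset_info": {"shape": [1000, 120], "asset_type": "crypto", ...},
#     "feature_categories": {"Technical Indicators": {"count": 11, "features": [...]}, ...},
#     "data_quality": {"overall_completeness_pct": 98.7, ...},
#     "new_features": {...}    <- only present when a baseline schema reports new columns
#   }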
def main():
    parser = argparse.ArgumentParser(description=__doc__)

    # Input files
    parser.add_argument("--merged-input", default="data/merged/features/merged_features.parquet",
                        help="Path to merged_features.parquet")
    parser.add_argument("--crypto-input", default="data/merged/features/crypto_features.parquet",
                        help="Path to crypto_features.parquet")
    parser.add_argument("--stocks-input", default="data/merged/features/stocks_features.parquet",
                        help="Path to stocks_features.parquet")

    # Output settings
    parser.add_argument("--output-dir", default="data/merged/features/",
                        help="Output directory for reports")
    parser.add_argument("--baseline-schema", default="schemas/baseline.json",
                        help="Path to baseline schema JSON")

    # Convenience flags
    parser.add_argument("--generate-all", action="store_true",
                        help="Generate all reports using default paths")

    args = parser.parse_args()

    # Default paths for --generate-all
    if args.generate_all:
        default_paths = {
            "merged": "data/merged/features/merged_features.parquet",
            "crypto": "data/merged/features/crypto_features.parquet",
            "stocks": "data/merged/features/stocks_features.parquet"
        }

        print("Generating all feature reports...")
        success_count = 0

        for report_type, input_path in default_paths.items():
            output_dir = args.output_dir if args.output_dir else "data/merged/features/"
            output_path = os.path.join(output_dir, f"{report_type}_report.json")
            baseline_path = args.baseline_schema if args.baseline_schema else f"schemas/{report_type}_baseline.json"

            if generate_report(input_path, output_path, baseline_path, report_type):
                success_count += 1

        print(f"\nGenerated {success_count}/3 reports successfully!")

        # Update the baseline schema from the merged features file if it exists
        if args.baseline_schema and os.path.exists(default_paths["merged"]):
            df = pd.read_parquet(default_paths["merged"])
            save_baseline_schema(list(df.columns), args.baseline_schema)
            print(f"Updated baseline schema: {args.baseline_schema}")
        return

    # Individual file processing
    reports_generated = 0

    if args.merged_input:
        output_dir = args.output_dir if args.output_dir else "data/merged/features/"
        output_path = os.path.join(output_dir, "merged_report.json")
        if generate_report(args.merged_input, output_path, args.baseline_schema, "merged"):
            reports_generated += 1

    if args.crypto_input:
        output_dir = args.output_dir if args.output_dir else "data/merged/features/"
        output_path = os.path.join(output_dir, "crypto_report.json")
        if generate_report(args.crypto_input, output_path, args.baseline_schema, "crypto"):
            reports_generated += 1
            # Print crypto count and data quality
            try:
                with open(output_path, "r") as f:
                    report = json.load(f)
                count = report.get("dataset_info", {}).get("shape", [None])[0]
                dq = report.get("data_quality", {}).get("overall_completeness_pct", None)
                print(f"[CRYPTO] Count: {count}, Data Quality: {dq}%")
            except Exception as e:
                print(f"[CRYPTO] Error reading report for stats: {e}")

    if args.stocks_input:
        output_dir = args.output_dir if args.output_dir else "data/merged/features/"
        output_path = os.path.join(output_dir, "stocks_report.json")
        if generate_report(args.stocks_input, output_path, args.baseline_schema, "stocks"):
            reports_generated += 1
            # Print stocks count and data quality
            try:
                with open(output_path, "r") as f:
                    report = json.load(f)
                count = report.get("dataset_info", {}).get("shape", [None])[0]
                dq = report.get("data_quality", {}).get("overall_completeness_pct", None)
                print(f"[STOCKS] Count: {count}, Data Quality: {dq}%")
            except Exception as e:
                print(f"[STOCKS] Error reading report for stats: {e}")

    if reports_generated == 0:
        print("No reports were generated. Check the input paths or use --generate-all.")
        parser.print_help()
    else:
        print(f"\nGenerated {reports_generated} report(s) successfully!")


if __name__ == "__main__":
    main()
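# The module can also be used programmatically; a minimal sketch, assuming the script
# is importable as unified_report_generator and the paths below exist (they mirror the
# CLI defaults but are placeholders here):
#
#   from unified_report_generator import generate_report
#
#   generate_report(
#       input_path="data/merged/features/crypto_features.parquet",
#       output_path="reports/crypto_report.json",
#       baseline_schema_path="schemas/baseline.json",
#       report_type="crypto",
#   )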