import pandas as pd
import os
import numpy as np
from datetime import datetime, timedelta

DAYS_OLD = 7  # retained for the (currently disabled) age filter in filter_new()
MERGED_DIR = "data/merged/features"
TEMP_DIR = "data/merged/temp"


# Helper: safely cast a value to match a target column's dtype (e.g., drop tz on datetimes)
def _cast_value_for_column(target_series: pd.Series, value):
    try:
        # If the target is datetime64[ns], make sure the assigned value is tz-naive
        if pd.api.types.is_datetime64_any_dtype(target_series.dtype):
            v = pd.to_datetime(value, errors='coerce', utc=True)
            if isinstance(v, pd.Timestamp):
                return v.tz_localize(None)
            return v
        return value
    except Exception:
        return value


def fill_nulls_from_temp(df_merged, df_temp):
    """
    Fill null values in df_merged using non-null values from df_temp for the
    same symbol + interval_timestamp combination.

    Returns the number of null values filled.
    """
    nulls_filled = 0

    if df_merged.empty or df_temp.empty:
        return nulls_filled

    # Lookup key used to match rows between the two frames
    key_cols = ["symbol", "interval_timestamp"]

    # Both dataframes must carry the key columns
    if not all(col in df_merged.columns for col in key_cols):
        print("[WARN] Key columns missing in merged data, skipping null filling")
        return nulls_filled
    if not all(col in df_temp.columns for col in key_cols):
        print("[WARN] Key columns missing in temp data, skipping null filling")
        return nulls_filled

    # Build a lookup dictionary from the temp data:
    # {(symbol, interval_timestamp): {column: value, ...}}
    temp_lookup = {}
    for _, row in df_temp.iterrows():
        key = (row['symbol'], row['interval_timestamp'])
        temp_lookup[key] = row.to_dict()

    # Columns present in both frames, excluding the keys
    common_cols = [col for col in df_merged.columns
                   if col in df_temp.columns and col not in key_cols]
    if not common_cols:
        print("[WARN] No common columns found for null filling")
        return nulls_filled

    # Columns that actually contain nulls before processing
    null_cols_before = [col for col in common_cols if df_merged[col].isnull().any()]
    if not null_cols_before:
        print("[INFO] No null values found in common columns")
        return nulls_filled

    print(f"[INFO] Attempting to fill nulls in {len(null_cols_before)} columns: {null_cols_before}")

    # Fill null values row by row
    for idx, row in df_merged.iterrows():
        key = (row['symbol'], row['interval_timestamp'])
        if key not in temp_lookup:
            continue
        temp_row = temp_lookup[key]

        for col in null_cols_before:
            try:
                row_val = row[col]
                temp_val = temp_row.get(col)

                # Null checks that tolerate both scalars and array-valued cells
                row_null = pd.isnull(row_val)
                if hasattr(row_null, '__len__') and len(row_null) > 1:
                    row_is_null = row_null.any()  # arrays: treat as null if any element is null
                else:
                    row_is_null = bool(row_null)

                temp_null = pd.isnull(temp_val)
                if hasattr(temp_null, '__len__') and len(temp_null) > 1:
                    temp_is_not_null = not temp_null.any()  # arrays: usable only if fully non-null
                else:
                    temp_is_not_null = not bool(temp_null)

                if row_is_null and temp_is_not_null:
                    # Fill the null value, casting it to the target column's dtype
                    df_merged.at[idx, col] = _cast_value_for_column(df_merged[col], temp_val)
                    nulls_filled += 1
            except Exception as e:
                # Skip problematic columns with a warning
                print(f"[WARN] Could not process column '{col}' for null filling: {e}")
                continue

    if nulls_filled > 0:
        print(f"[INFO] Successfully filled {nulls_filled} null values from temp data")
        # Report how many nulls remain in each affected column
        for col in null_cols_before:
            nulls_remaining = df_merged[col].isnull().sum()
            print(f"[INFO] Column '{col}': {nulls_remaining} nulls remaining")

    return nulls_filled
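
# A minimal vectorized sketch of the same per-key null filling, assuming
# (symbol, interval_timestamp) uniquely identifies rows in both frames and that
# cells hold scalars (no array-valued columns). It is not wired into the pipeline;
# the row-by-row version above is kept because it tolerates messier data.
def _fill_nulls_vectorized_sketch(df_merged: pd.DataFrame, df_temp: pd.DataFrame) -> pd.DataFrame:
    key_cols = ["symbol", "interval_timestamp"]
    left = df_merged.set_index(key_cols)
    right = df_temp.drop_duplicates(subset=key_cols, keep="last").set_index(key_cols)
    # Keep only keys and columns already present in the merged frame so that
    # combine_first fills nulls without adding new rows or columns.
    right = right[right.index.isin(left.index)].reindex(columns=left.columns)
    filled = left.combine_first(right)
    # Restore the original column order; row order may differ from the input.
    return filled.reset_index().reindex(columns=df_merged.columns)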

# Helper to filter new records (DISABLED - now keeps ALL data for accumulative merging)
def filter_new(df):
    # IMPORTANT: Return ALL data instead of filtering by DAYS_OLD days.
    # This ensures accumulative merging from day one.
    return df.copy()


def merge_temp_to_merged(temp_name, merged_name):
    temp_path = os.path.join(TEMP_DIR, temp_name)
    merged_path = os.path.join(MERGED_DIR, merged_name)

    if not os.path.exists(temp_path):
        print(f"[WARN] Temp file missing: {temp_path}")
        return
    if not os.path.exists(merged_path):
        print(f"[WARN] Merged file missing: {merged_path}")
        return

    df_temp = pd.read_parquet(temp_path)
    df_merged = pd.read_parquet(merged_path)

    # Both files must carry the merge keys
    required_cols = ["symbol", "interval_timestamp"]
    missing_cols_temp = [col for col in required_cols if col not in df_temp.columns]
    missing_cols_merged = [col for col in required_cols if col not in df_merged.columns]

    if missing_cols_temp:
        print(f"[ERROR] Missing columns in temp file {temp_name}: {missing_cols_temp}")
        print(f"[INFO] Available columns in temp: {list(df_temp.columns)}")
        return
    if missing_cols_merged:
        print(f"[ERROR] Missing columns in merged file {merged_name}: {missing_cols_merged}")
        print(f"[INFO] Available columns in merged: {list(df_merged.columns)}")
        return

    new_temp = filter_new(df_temp)

    # Step 1: fill nulls in the merged data using temp data for the same symbol+timestamp
    nulls_filled = fill_nulls_from_temp(df_merged, df_temp)

    # Step 2: append only rows whose key is not already present in the merged file
    key_cols = ["symbol", "interval_timestamp"]
    merged_keys = set(tuple(row) for row in df_merged[key_cols].values)
    new_rows = new_temp[~new_temp[key_cols].apply(tuple, axis=1).isin(merged_keys)]

    if new_rows.empty and nulls_filled == 0:
        print(f"[INFO] No new records to add from {temp_name} and no nulls filled")
        return

    df_final = pd.concat([df_merged, new_rows], ignore_index=True)
    df_final.to_parquet(merged_path, index=False)
    print(f"[OK] Added {len(new_rows)} new records from {temp_name} to {merged_name}, "
          f"filled {nulls_filled} null values")
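
# A sketch of the "only truly new rows" step from merge_temp_to_merged, written as a
# merge anti-join instead of a Python set of key tuples. Assumes both frames carry the
# symbol / interval_timestamp keys; it is not used by the pipeline above.
def _anti_join_new_rows_sketch(df_merged: pd.DataFrame, df_temp: pd.DataFrame) -> pd.DataFrame:
    key_cols = ["symbol", "interval_timestamp"]
    marked = df_temp.merge(df_merged[key_cols].drop_duplicates(),
                           on=key_cols, how="left", indicator=True)
    # Rows marked "left_only" exist in temp but not yet in the merged file
    return marked[marked["_merge"] == "left_only"].drop(columns="_merge")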

def merge_all_to_train(archive_name, features_name, temp_name, train_name):
    """
    Merge archive, features, and temp files into a deduplicated train file under
    data/merged/train/. Uniqueness is enforced on (symbol, interval_timestamp).
    Also performs null filling between the different sources.
    """
    ARCHIVE_DIR = os.path.join(MERGED_DIR, "archive")
    TRAIN_DIR = os.path.join("data", "merged", "train")
    os.makedirs(TRAIN_DIR, exist_ok=True)

    features_path = os.path.join(MERGED_DIR, features_name)
    temp_path = os.path.join(TEMP_DIR, temp_name)
    train_path = os.path.join(TRAIN_DIR, train_name)

    dfs = []
    df_sources = {}  # Track which dataframe came from which source

    # 1. Read all relevant archive files (recursively)
    archive_dfs = []
    if os.path.isdir(ARCHIVE_DIR):
        for root, dirs, files in os.walk(ARCHIVE_DIR):
            for fname in files:
                # Only include files matching the asset
                # (e.g., crypto_features_archived_*.parquet)
                if fname.startswith(archive_name.replace('.parquet', '_archived_')) and fname.endswith('.parquet'):
                    fpath = os.path.join(root, fname)
                    try:
                        archive_dfs.append(pd.read_parquet(fpath))
                    except Exception as e:
                        print(f"[WARN] Could not read archive file {fpath}: {e}")
    if archive_dfs:
        df_archive = pd.concat(archive_dfs, ignore_index=True)
        dfs.append(df_archive)
        df_sources['archive'] = df_archive
    else:
        print(f"[WARN] No archive files found for {archive_name}")

    # 2. Read features and temp
    if os.path.exists(features_path):
        df_features = pd.read_parquet(features_path)
        dfs.append(df_features)
        df_sources['features'] = df_features
    else:
        print(f"[WARN] Missing: {features_path}")

    if os.path.exists(temp_path):
        df_temp = pd.read_parquet(temp_path)
        dfs.append(df_temp)
        df_sources['temp'] = df_temp
    else:
        print(f"[WARN] Missing: {temp_path}")

    if not dfs:
        print("[ERROR] No input files found.")
        return

    # 3. Merge all data
    df_all = pd.concat(dfs, ignore_index=True)

    # 4. Before deduplication, try to fill nulls using data from the other sources
    total_nulls_filled = 0
    if len(df_sources) > 1:
        print(f"[INFO] Attempting cross-source null filling for {train_name}")

        # Build a comprehensive lookup across all sources:
        # {(symbol, interval_timestamp): {column: non-null value, ...}}
        all_data_lookup = {}
        for source_name, df_source in df_sources.items():
            for _, row in df_source.iterrows():
                key = (row['symbol'], row['interval_timestamp'])
                if key not in all_data_lookup:
                    all_data_lookup[key] = {}
                # Record the non-null values contributed by this source
                for col in df_source.columns:
                    try:
                        col_val = row[col]
                        # Null check that tolerates both scalars and array-valued cells
                        col_null = pd.isnull(col_val)
                        if hasattr(col_null, '__len__') and len(col_null) > 1:
                            is_not_null = not col_null.any()  # arrays: keep only if fully non-null
                        else:
                            is_not_null = not bool(col_null)
                        if is_not_null:
                            all_data_lookup[key][col] = col_val
                    except Exception as e:
                        print(f"[WARN] Could not process column '{col}' for train lookup: {e}")
                        continue

        # Fill nulls in the combined dataframe
        for idx, row in df_all.iterrows():
            key = (row['symbol'], row['interval_timestamp'])
            if key not in all_data_lookup:
                continue
            lookup_row = all_data_lookup[key]
            for col in df_all.columns:
                try:
                    row_val = row[col]
                    # Null check that tolerates both scalars and array-valued cells
                    row_null = pd.isnull(row_val)
                    if hasattr(row_null, '__len__') and len(row_null) > 1:
                        row_is_null = row_null.any()  # arrays: treat as null if any element is null
                    else:
                        row_is_null = bool(row_null)
                    if row_is_null and col in lookup_row:
                        df_all.at[idx, col] = _cast_value_for_column(df_all[col], lookup_row[col])
                        total_nulls_filled += 1
                except Exception as e:
                    print(f"[WARN] Could not process column '{col}' for train null filling: {e}")
                    continue

    # 5. Deduplicate by symbol+interval_timestamp, keeping the last occurrence
    # (sources are concatenated archive -> features -> temp, so temp wins)
    df_all = df_all.drop_duplicates(subset=["symbol", "interval_timestamp"], keep="last")

    # 6. Drop columns that cannot be serialized to parquet
    problematic_cols = []
    for col in df_all.columns:
        try:
            # Inspect a sample value: array-like (non-string) cells with more than
            # one element tend to break parquet serialization
            sample = df_all[col].iloc[0] if len(df_all) > 0 else None
            if sample is not None and hasattr(sample, '__len__') and not isinstance(sample, str):
                if len(sample) > 1:  # Multi-element array
                    problematic_cols.append(col)
        except Exception:
            # If we cannot even inspect the sample, treat the column as problematic
            problematic_cols.append(col)

    if problematic_cols:
        print(f"[WARN] Dropping problematic columns that can't be serialized: {problematic_cols}")
        df_all = df_all.drop(columns=problematic_cols)

    # Save to parquet
    df_all.to_parquet(train_path, index=False)
    if total_nulls_filled > 0:
        print(f"[OK] Created train file: {train_path} with {len(df_all)} records, filled {total_nulls_filled} nulls")
    else:
        print(f"[OK] Created train file: {train_path} with {len(df_all)} records.")
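
# A tiny self-contained illustration of the precedence that step 5 above relies on:
# because sources are concatenated as archive -> features -> temp and duplicates are
# dropped with keep="last", the temp value wins for a key present in several sources.
# The toy column and values below are made up for the demonstration only.
def _dedup_precedence_demo():
    archive = pd.DataFrame({"symbol": ["BTC"], "interval_timestamp": [1], "close": [100.0]})
    features = pd.DataFrame({"symbol": ["BTC"], "interval_timestamp": [1], "close": [101.0]})
    temp = pd.DataFrame({"symbol": ["BTC"], "interval_timestamp": [1], "close": [102.0]})
    combined = pd.concat([archive, features, temp], ignore_index=True)
    deduped = combined.drop_duplicates(subset=["symbol", "interval_timestamp"], keep="last")
    assert deduped["close"].iloc[0] == 102.0  # the temp row survives
    return deduped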

def create_merged_features():
    """
    Create the main merged_features.parquet file by combining crypto and stock
    features, with cross-dataset null filling between the two sources.
    """
    crypto_path = os.path.join(MERGED_DIR, "crypto_features.parquet")
    stocks_path = os.path.join(MERGED_DIR, "stocks_features.parquet")
    merged_path = os.path.join(MERGED_DIR, "merged_features.parquet")

    dfs_to_merge = []

    # Read crypto features
    if os.path.exists(crypto_path):
        df_crypto = pd.read_parquet(crypto_path)
        dfs_to_merge.append(('crypto', df_crypto))
        print(f"[INFO] Loaded crypto features: {len(df_crypto)} rows, {len(df_crypto.columns)} columns")
    else:
        print(f"[WARN] Crypto features not found: {crypto_path}")

    # Read stock features
    if os.path.exists(stocks_path):
        df_stocks = pd.read_parquet(stocks_path)
        dfs_to_merge.append(('stocks', df_stocks))
        print(f"[INFO] Loaded stock features: {len(df_stocks)} rows, {len(df_stocks.columns)} columns")
    else:
        print(f"[WARN] Stock features not found: {stocks_path}")

    if not dfs_to_merge:
        print("[ERROR] No feature files found to merge")
        return

    if len(dfs_to_merge) == 1:
        # Only one dataset available, just copy it
        df_merged = dfs_to_merge[0][1].copy()
        print(f"[INFO] Only {dfs_to_merge[0][0]} features available")
    else:
        # Multiple datasets - merge with cross-dataset null filling
        print("[INFO] Merging crypto and stock features with cross-dataset null filling")

        # Combine all dataframes
        all_dfs = [df for _, df in dfs_to_merge]
        df_merged = pd.concat(all_dfs, ignore_index=True, sort=False)

        # Build a lookup of non-null values per (symbol, interval_timestamp)
        lookup_data = {}
        for dataset_name, df in dfs_to_merge:
            for _, row in df.iterrows():
                key = (row['symbol'], row['interval_timestamp'])
                if key not in lookup_data:
                    lookup_data[key] = {}
                # Record the non-null values contributed by this dataset
                for col in df.columns:
                    try:
                        col_val = row[col]
                        # Null check that tolerates both scalars and array-valued cells
                        col_null = pd.isnull(col_val)
                        if hasattr(col_null, '__len__') and len(col_null) > 1:
                            is_not_null = not col_null.any()  # arrays: keep only if fully non-null
                        else:
                            is_not_null = not bool(col_null)
                        if is_not_null:
                            lookup_data[key][col] = col_val
                    except Exception as e:
                        print(f"[WARN] Could not process column '{col}' for lookup: {e}")
                        continue

        # Fill nulls using the comprehensive lookup
        nulls_filled = 0
        for idx, row in df_merged.iterrows():
            key = (row['symbol'], row['interval_timestamp'])
            if key not in lookup_data:
                continue
            lookup_row = lookup_data[key]
            for col in df_merged.columns:
                try:
                    row_val = row[col]
                    # Null check that tolerates both scalars and array-valued cells
                    row_null = pd.isnull(row_val)
                    if hasattr(row_null, '__len__') and len(row_null) > 1:
                        row_is_null = row_null.any()  # arrays: treat as null if any element is null
                    else:
                        row_is_null = bool(row_null)
                    if row_is_null and col in lookup_row:
                        df_merged.at[idx, col] = _cast_value_for_column(df_merged[col], lookup_row[col])
                        nulls_filled += 1
                except Exception as e:
                    print(f"[WARN] Could not process column '{col}' for null filling: {e}")
                    continue

        if nulls_filled > 0:
            print(f"[INFO] Cross-dataset null filling: {nulls_filled} values filled")

    # Remove duplicates if any, keeping the last occurrence
    initial_len = len(df_merged)
    df_merged = df_merged.drop_duplicates(subset=["symbol", "interval_timestamp"], keep="last")
    final_len = len(df_merged)
    if initial_len != final_len:
        print(f"[INFO] Removed {initial_len - final_len} duplicate records")

    # Save merged features
    df_merged.to_parquet(merged_path, index=False)
    print(f"[OK] Created merged features: {merged_path} with {len(df_merged)} rows, {len(df_merged.columns)} columns")

    # Report statistics
    nulls_remaining = df_merged.isnull().sum().sum()
    print(f"[INFO] Merged features null count: {nulls_remaining}")

    # Report symbol breakdown
    if 'symbol' in df_merged.columns:
        symbol_counts = df_merged['symbol'].value_counts()
        print(f"[INFO] Top symbols: {dict(symbol_counts.head(10))}")
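
# A minimal sanity check one might run after create_merged_features(), assuming the
# merged_features.parquet path defined above. It verifies that (symbol,
# interval_timestamp) is unique and reports the most null-heavy columns.
def _check_merged_features_sketch(path=os.path.join(MERGED_DIR, "merged_features.parquet")):
    if not os.path.exists(path):
        print(f"[WARN] Nothing to check, missing: {path}")
        return
    df = pd.read_parquet(path)
    dupes = df.duplicated(subset=["symbol", "interval_timestamp"]).sum()
    print(f"[CHECK] {len(df)} rows, {dupes} duplicate keys")
    null_counts = df.isnull().sum().sort_values(ascending=False)
    print(f"[CHECK] Columns with most nulls:\n{null_counts.head(10)}")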

def main():
    import sys

    # Optional self-test entry point
    if len(sys.argv) > 1 and sys.argv[1] == '--test-null-filling':
        from test_null_filling_merge import main as test_main
        sys.exit(test_main())

    merge_temp_to_merged("crypto_features.parquet", "crypto_features.parquet")
    merge_temp_to_merged("stocks_features.parquet", "stocks_features.parquet")

    # Create the main merged features file
    create_merged_features()

    merge_all_to_train("crypto_features.parquet", "crypto_features.parquet",
                       "crypto_features.parquet", "crypto_features_train.parquet")
    merge_all_to_train("stocks_features.parquet", "stocks_features.parquet",
                       "stocks_features.parquet", "stocks_features_train.parquet")


if __name__ == "__main__":
    main()
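
# Usage sketch (assuming the directory layout implied by MERGED_DIR / TEMP_DIR above;
# the script file name is an assumption, not given by the source):
#
#   python merge_features.py                      # regular accumulative merge run
#   python merge_features.py --test-null-filling  # delegate to test_null_filling_merge
#
# Inputs are read from data/merged/temp/ and data/merged/features/; train outputs are
# written to data/merged/train/.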