import pandas as pd
import os
import numpy as np
from datetime import datetime, timedelta
DAYS_OLD = 7  # Retained for reference; filter_new() currently ignores it and keeps all rows
MERGED_DIR = "data/merged/features"
TEMP_DIR = "data/merged/temp"
# Helper: safely cast a value to match a target column's dtype (e.g., drop tz on datetimes)
def _cast_value_for_column(target_series: pd.Series, value):
try:
# If target is datetime64[ns], ensure assigned value is tz-naive
if pd.api.types.is_datetime64_any_dtype(target_series.dtype):
v = pd.to_datetime(value, errors='coerce', utc=True)
if isinstance(v, pd.Timestamp):
return v.tz_localize(None)
return v
return value
except Exception:
return value
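# Illustrative sketch (not called anywhere in this module; names and values are
# made up): how _cast_value_for_column is expected to behave when the target
# column is datetime64[ns] -- a tz-aware input comes back tz-naive, so assigning
# it via DataFrame.at does not trip over a dtype/timezone mismatch.
def _example_cast_value_for_column():
    target = pd.Series(pd.to_datetime(["2024-01-01 00:00:00"]))  # tz-naive datetime64[ns] column
    aware = pd.Timestamp("2024-01-01 12:00:00", tz="UTC")        # tz-aware value from another source
    casted = _cast_value_for_column(target, aware)
    # casted is a tz-naive Timestamp; non-datetime columns pass values through unchanged
    return casted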
def fill_nulls_from_temp(df_merged, df_temp):
"""
Fill null values in df_merged using non-null values from df_temp
for the same symbol + interval_timestamp combination.
Returns the number of null values filled.
"""
nulls_filled = 0
if df_merged.empty or df_temp.empty:
return nulls_filled
# Create lookup key for efficient matching
key_cols = ["symbol", "interval_timestamp"]
# Check if key columns exist in both dataframes
if not all(col in df_merged.columns for col in key_cols):
print("[WARN] Key columns missing in merged data, skipping null filling")
return nulls_filled
if not all(col in df_temp.columns for col in key_cols):
print("[WARN] Key columns missing in temp data, skipping null filling")
return nulls_filled
# Create a lookup dictionary from temp data
# Format: {(symbol, timestamp): {column: value, ...}}
temp_lookup = {}
for _, row in df_temp.iterrows():
key = (row['symbol'], row['interval_timestamp'])
temp_lookup[key] = row.to_dict()
# Find common columns between merged and temp (excluding keys)
common_cols = [col for col in df_merged.columns
if col in df_temp.columns and col not in key_cols]
if not common_cols:
print("[WARN] No common columns found for null filling")
return nulls_filled
# Track columns with null values before processing
null_cols_before = []
for col in common_cols:
if df_merged[col].isnull().any():
null_cols_before.append(col)
if not null_cols_before:
print("[INFO] No null values found in common columns")
return nulls_filled
print(f"[INFO] Attempting to fill nulls in {len(null_cols_before)} columns: {null_cols_before}")
# Fill null values row by row
for idx, row in df_merged.iterrows():
key = (row['symbol'], row['interval_timestamp'])
# Check if we have corresponding temp data for this key
if key in temp_lookup:
temp_row = temp_lookup[key]
# Fill null values for each column
for col in null_cols_before:
try:
# Use more robust null checking to handle arrays/scalars
row_val = row[col]
temp_val = temp_row.get(col)
# Check if row value is null (handle both scalar and array cases)
row_is_null = pd.isnull(row_val)
if hasattr(row_is_null, '__len__') and len(row_is_null) > 1:
row_is_null = row_is_null.any() # For arrays, check if any are null
                    # Check that the temp value is not null (handle both scalar and array cases);
                    # evaluate pd.isnull before negating, otherwise `not` on an array value
                    # raises an ambiguous-truth-value error and the column gets skipped.
                    temp_null = pd.isnull(temp_val)
                    if hasattr(temp_null, '__len__') and len(temp_null) > 1:
                        temp_is_not_null = not temp_null.any()  # For arrays, require every element non-null
                    else:
                        temp_is_not_null = not temp_null
if row_is_null and temp_is_not_null:
# Fill the null value with dtype-compatible casting
df_merged.at[idx, col] = _cast_value_for_column(df_merged[col], temp_val)
nulls_filled += 1
except Exception as e:
# Skip problematic columns with a warning
print(f"[WARN] Could not process column '{col}' for null filling: {e}")
continue
if nulls_filled > 0:
print(f"[INFO] Successfully filled {nulls_filled} null values from temp data")
# Report which columns were improved
for col in null_cols_before:
nulls_remaining = df_merged[col].isnull().sum()
print(f"[INFO] Column '{col}': {nulls_remaining} nulls remaining")
return nulls_filled
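# Minimal usage sketch for fill_nulls_from_temp on hypothetical in-memory frames
# (the symbol, timestamp and price below are made up): the merged frame is
# patched in place from the temp frame for the same (symbol, interval_timestamp)
# key, and the number of filled cells is returned.
def _example_fill_nulls_from_temp():
    df_merged = pd.DataFrame({
        "symbol": ["BTC"],
        "interval_timestamp": [1700000000],
        "close": [np.nan],
    })
    df_temp = pd.DataFrame({
        "symbol": ["BTC"],
        "interval_timestamp": [1700000000],
        "close": [42000.0],
    })
    filled = fill_nulls_from_temp(df_merged, df_temp)
    # filled == 1 and df_merged.loc[0, "close"] == 42000.0 after the call
    return filled, df_merged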
# Helper to filter new records (DISABLED - now keeps ALL data for accumulative merging)
def filter_new(df):
# IMPORTANT: Return ALL data instead of filtering by days
# This ensures accumulative merging from day one
return df.copy()
def merge_temp_to_merged(temp_name, merged_name):
temp_path = os.path.join(TEMP_DIR, temp_name)
merged_path = os.path.join(MERGED_DIR, merged_name)
if not os.path.exists(temp_path):
print(f"[WARN] Temp file missing: {temp_path}")
return
if not os.path.exists(merged_path):
print(f"[WARN] Merged file missing: {merged_path}")
return
df_temp = pd.read_parquet(temp_path)
df_merged = pd.read_parquet(merged_path)
# Check if required columns exist
required_cols = ["symbol", "interval_timestamp"]
missing_cols_temp = [col for col in required_cols if col not in df_temp.columns]
missing_cols_merged = [col for col in required_cols if col not in df_merged.columns]
if missing_cols_temp:
print(f"[ERROR] Missing columns in temp file {temp_name}: {missing_cols_temp}")
print(f"[INFO] Available columns in temp: {list(df_temp.columns)}")
return
if missing_cols_merged:
print(f"[ERROR] Missing columns in merged file {merged_name}: {missing_cols_merged}")
print(f"[INFO] Available columns in merged: {list(df_merged.columns)}")
return
new_temp = filter_new(df_temp)
# Step 1: Fill null values in merged data using temp data for same symbol+timestamp
nulls_filled = fill_nulls_from_temp(df_merged, df_temp)
# Step 2: Only add truly new rows (not already in merged)
key_cols = ["symbol", "interval_timestamp"]
merged_keys = set(tuple(row) for row in df_merged[key_cols].values)
new_rows = new_temp[~new_temp[key_cols].apply(tuple, axis=1).isin(merged_keys)]
if new_rows.empty and nulls_filled == 0:
print(f"[INFO] No new records to add from {temp_name} and no nulls filled")
return
df_final = pd.concat([df_merged, new_rows], ignore_index=True)
df_final.to_parquet(merged_path, index=False)
print(f"[OK] Added {len(new_rows)} new records from {temp_name} to {merged_name}, filled {nulls_filled} null values")
def merge_all_to_train(archive_name, features_name, temp_name, train_name):
"""
    Merge archive, features, and temp files into a deduplicated train file under data/merged/train/.
Uniqueness is enforced on (symbol, interval_timestamp).
Also performs null filling between different sources.
"""
ARCHIVE_DIR = os.path.join(MERGED_DIR, "archive")
TRAIN_DIR = os.path.join("data", "merged", "train")
os.makedirs(TRAIN_DIR, exist_ok=True)
features_path = os.path.join(MERGED_DIR, features_name)
temp_path = os.path.join(TEMP_DIR, temp_name)
train_path = os.path.join(TRAIN_DIR, train_name)
dfs = []
df_sources = {} # Track which dataframe came from which source
# 1. Read all relevant archive files (recursively)
archive_dfs = []
if os.path.isdir(ARCHIVE_DIR):
for root, dirs, files in os.walk(ARCHIVE_DIR):
for fname in files:
# Only include files matching the asset (e.g., crypto_features_archived_*.parquet)
if fname.startswith(archive_name.replace('.parquet', '_archived_')) and fname.endswith('.parquet'):
fpath = os.path.join(root, fname)
try:
archive_dfs.append(pd.read_parquet(fpath))
except Exception as e:
print(f"[WARN] Could not read archive file {fpath}: {e}")
if archive_dfs:
df_archive = pd.concat(archive_dfs, ignore_index=True)
dfs.append(df_archive)
df_sources['archive'] = df_archive
else:
print(f"[WARN] No archive files found for {archive_name}")
# 2. Read features and temp
if os.path.exists(features_path):
df_features = pd.read_parquet(features_path)
dfs.append(df_features)
df_sources['features'] = df_features
else:
print(f"[WARN] Missing: {features_path}")
if os.path.exists(temp_path):
df_temp = pd.read_parquet(temp_path)
dfs.append(df_temp)
df_sources['temp'] = df_temp
else:
print(f"[WARN] Missing: {temp_path}")
if not dfs:
print("[ERROR] No input files found.")
return
# 3. Merge all data
df_all = pd.concat(dfs, ignore_index=True)
# 4. Before deduplication, try to fill nulls using data from different sources
total_nulls_filled = 0
if len(df_sources) > 1:
print(f"[INFO] Attempting cross-source null filling for {train_name}")
# Create a comprehensive lookup from all sources
all_data_lookup = {}
for source_name, df_source in df_sources.items():
for _, row in df_source.iterrows():
key = (row['symbol'], row['interval_timestamp'])
if key not in all_data_lookup:
all_data_lookup[key] = {}
# Add non-null values from this source
for col in df_source.columns:
try:
# Use more robust null checking to handle arrays/scalars
col_val = row[col]
                        # Check if value is not null (handle both scalar and array cases);
                        # evaluate pd.isnull before negating so array values don't raise
                        col_null = pd.isnull(col_val)
                        if hasattr(col_null, '__len__') and len(col_null) > 1:
                            is_not_null = not col_null.any()  # For arrays, require every element non-null
                        else:
                            is_not_null = not col_null
if is_not_null:
all_data_lookup[key][col] = col_val
except Exception as e:
# Skip problematic columns with a warning
print(f"[WARN] Could not process column '{col}' for train lookup: {e}")
continue
# Fill nulls in the combined dataframe
for idx, row in df_all.iterrows():
key = (row['symbol'], row['interval_timestamp'])
if key in all_data_lookup:
lookup_row = all_data_lookup[key]
for col in df_all.columns:
try:
# Use more robust null checking
row_val = row[col]
# Check if row value is null (handle both scalar and array cases)
row_is_null = pd.isnull(row_val)
if hasattr(row_is_null, '__len__') and len(row_is_null) > 1:
row_is_null = row_is_null.any() # For arrays, check if any are null
if row_is_null and col in lookup_row:
df_all.at[idx, col] = _cast_value_for_column(df_all[col], lookup_row[col])
total_nulls_filled += 1
except Exception as e:
# Skip problematic columns with a warning
print(f"[WARN] Could not process column '{col}' for train null filling: {e}")
continue
# 5. Deduplicate by symbol+interval_timestamp, keeping the last occurrence
df_all = df_all.drop_duplicates(subset=["symbol", "interval_timestamp"], keep="last")
# 6. Handle problematic columns that can't be serialized to parquet
problematic_cols = []
for col in df_all.columns:
try:
# Test if column can be converted to parquet-compatible format
sample = df_all[col].iloc[0] if len(df_all) > 0 else None
if sample is not None and hasattr(sample, '__len__') and not isinstance(sample, str):
# If it's an array-like object (but not string), it might cause issues
                if len(sample) > 1:  # Array-like with more than one element
problematic_cols.append(col)
        except Exception:
# If we can't even check the sample, it's definitely problematic
problematic_cols.append(col)
if problematic_cols:
print(f"[WARN] Dropping problematic columns that can't be serialized: {problematic_cols}")
df_all = df_all.drop(columns=problematic_cols)
# Save to parquet
df_all.to_parquet(train_path, index=False)
if total_nulls_filled > 0:
print(f"[OK] Created train file: {train_path} with {len(df_all)} records, filled {total_nulls_filled} nulls")
else:
print(f"[OK] Created train file: {train_path} with {len(df_all)} records.")
def create_merged_features():
"""
Create the main merged_features.parquet file by combining crypto and stock features
with intelligent null filling between the two datasets.
"""
crypto_path = os.path.join(MERGED_DIR, "crypto_features.parquet")
stocks_path = os.path.join(MERGED_DIR, "stocks_features.parquet")
merged_path = os.path.join(MERGED_DIR, "merged_features.parquet")
dfs_to_merge = []
# Read crypto features
if os.path.exists(crypto_path):
df_crypto = pd.read_parquet(crypto_path)
dfs_to_merge.append(('crypto', df_crypto))
print(f"[INFO] Loaded crypto features: {len(df_crypto)} rows, {len(df_crypto.columns)} columns")
else:
print(f"[WARN] Crypto features not found: {crypto_path}")
# Read stock features
if os.path.exists(stocks_path):
df_stocks = pd.read_parquet(stocks_path)
dfs_to_merge.append(('stocks', df_stocks))
print(f"[INFO] Loaded stock features: {len(df_stocks)} rows, {len(df_stocks.columns)} columns")
else:
print(f"[WARN] Stock features not found: {stocks_path}")
if not dfs_to_merge:
print("[ERROR] No feature files found to merge")
return
if len(dfs_to_merge) == 1:
# Only one dataset available, just copy it
df_merged = dfs_to_merge[0][1].copy()
print(f"[INFO] Only {dfs_to_merge[0][0]} features available")
else:
# Multiple datasets - merge with null filling
print("[INFO] Merging crypto and stock features with cross-dataset null filling")
# Combine all dataframes
all_dfs = [df for _, df in dfs_to_merge]
df_merged = pd.concat(all_dfs, ignore_index=True, sort=False)
# Perform cross-dataset null filling
# Create lookup from all datasets for same symbol+timestamp
lookup_data = {}
for dataset_name, df in dfs_to_merge:
for _, row in df.iterrows():
key = (row['symbol'], row['interval_timestamp'])
if key not in lookup_data:
lookup_data[key] = {}
# Add non-null values from this dataset
for col in df.columns:
try:
# Use more robust null checking to handle arrays/scalars
col_val = row[col]
                        # Check if value is not null (handle both scalar and array cases);
                        # evaluate pd.isnull before negating so array values don't raise
                        col_null = pd.isnull(col_val)
                        if hasattr(col_null, '__len__') and len(col_null) > 1:
                            is_not_null = not col_null.any()  # For arrays, require every element non-null
                        else:
                            is_not_null = not col_null
if is_not_null:
lookup_data[key][col] = col_val
except Exception as e:
# Skip problematic columns with a warning
print(f"[WARN] Could not process column '{col}' for lookup: {e}")
continue
# Fill nulls using the comprehensive lookup
nulls_filled = 0
for idx, row in df_merged.iterrows():
key = (row['symbol'], row['interval_timestamp'])
if key in lookup_data:
lookup_row = lookup_data[key]
for col in df_merged.columns:
try:
# Use more robust null checking
row_val = row[col]
# Check if row value is null (handle both scalar and array cases)
row_is_null = pd.isnull(row_val)
if hasattr(row_is_null, '__len__') and len(row_is_null) > 1:
row_is_null = row_is_null.any() # For arrays, check if any are null
if row_is_null and col in lookup_row:
df_merged.at[idx, col] = _cast_value_for_column(df_merged[col], lookup_row[col])
nulls_filled += 1
except Exception as e:
# Skip problematic columns with a warning
print(f"[WARN] Could not process column '{col}' for null filling: {e}")
continue
if nulls_filled > 0:
print(f"[INFO] Cross-dataset null filling: {nulls_filled} values filled")
# Remove duplicates if any (keeping last occurrence)
initial_len = len(df_merged)
df_merged = df_merged.drop_duplicates(subset=["symbol", "interval_timestamp"], keep="last")
final_len = len(df_merged)
if initial_len != final_len:
print(f"[INFO] Removed {initial_len - final_len} duplicate records")
# Save merged features
df_merged.to_parquet(merged_path, index=False)
print(f"[OK] Created merged features: {merged_path} with {len(df_merged)} rows, {len(df_merged.columns)} columns")
# Report statistics
nulls_remaining = df_merged.isnull().sum().sum()
print(f"[INFO] Merged features null count: {nulls_remaining}")
# Report symbol breakdown
if 'symbol' in df_merged.columns:
symbol_counts = df_merged['symbol'].value_counts()
print(f"[INFO] Top symbols: {dict(symbol_counts.head(10))}")
def main():
import sys
# Check if this is being run as a test
if len(sys.argv) > 1 and sys.argv[1] == '--test-null-filling':
from test_null_filling_merge import main as test_main
sys.exit(test_main())
merge_temp_to_merged("crypto_features.parquet", "crypto_features.parquet")
merge_temp_to_merged("stocks_features.parquet", "stocks_features.parquet")
# Create the main merged features file
create_merged_features()
merge_all_to_train("crypto_features.parquet", "crypto_features.parquet", "crypto_features.parquet", "crypto_features_train.parquet")
merge_all_to_train("stocks_features.parquet", "stocks_features.parquet", "stocks_features.parquet", "stocks_features_train.parquet")
if __name__ == "__main__":
main()