"""
Santiment Data Merger
=====================

This script merges all Santiment data files into a unified features dataset.
It reads all parquet files from data/santiment/, merges them by slug and datetime
with 1-hour interval tolerance, and creates merged_features.parquet.

Features:
- Reads all Santiment parquet files automatically
- Merges by slug and datetime with 1-hour tolerance
- Handles different data formats (financial, ohlcv, prices, etc.)
- Creates comprehensive feature dataset
- Robust error handling and logging

Author: AI Assistant
Date: August 2025
"""

import os
import sys
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
import logging
import glob
from typing import List, Dict, Optional, Tuple
import warnings


try:
    from src.config import DATA_DIR as CFG_DATA_DIR
except Exception:
    try:
        from config import DATA_DIR as CFG_DATA_DIR
    except Exception:
        CFG_DATA_DIR = "/data"


def _resolve_under_data(path_like: str | os.PathLike) -> Path:
    p = Path(path_like)
    if p.is_absolute():
        return p
    parts = p.parts
    if parts and parts[0].lower() == "data":
        rel = Path(*parts[1:]) if len(parts) > 1 else Path()
    else:
        rel = p
    return Path(CFG_DATA_DIR) / rel
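
# Path resolution sketch (assuming CFG_DATA_DIR = "/data"):
#   _resolve_under_data("data/santiment") -> Path("/data/santiment")   # leading "data/" is stripped
#   _resolve_under_data("santiment")      -> Path("/data/santiment")
#   _resolve_under_data("/tmp/x")         -> Path("/tmp/x")            # absolute paths pass through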


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class SantimentDataMerger:
    """
    Comprehensive Santiment Data Merger

    Merges all Santiment parquet files into a unified features dataset
    with proper handling of different data formats and time alignment.
    """

    def __init__(self,
                 source_dir: str = "data/santiment",
                 output_dir: str = "data/santiment",
                 time_tolerance_hours: int = 1):
        """
        Initialize the Santiment Data Merger

        Args:
            source_dir: Directory containing Santiment parquet files
            output_dir: Directory to save merged features
            time_tolerance_hours: Tolerance for datetime matching (hours)
        """
        self.source_dir = _resolve_under_data(source_dir)
        self.output_dir = _resolve_under_data(output_dir)
        self.time_tolerance = timedelta(hours=time_tolerance_hours)

        self.source_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.dataframes: Dict[str, pd.DataFrame] = {}
        self.merged_data: Optional[pd.DataFrame] = None
        self.processing_stats = {
            'files_found': 0,
            'files_processed': 0,
            'files_failed': 0,
            'total_records': 0,
            'unique_slugs': set(),
            'date_range': {},
            'categories': set()
        }

        self.placeholder_created = False

        self.symbol_normalizer = self._setup_symbol_normalizer()
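
    # Typical use (sketch, mirroring main() below):
    #   merger = SantimentDataMerger(source_dir="data/santiment", time_tolerance_hours=1)
    #   if merger.process_all_files() and not merger.placeholder_created:
    #       merger.save_merged_features("merged_features.parquet")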

    def _setup_symbol_normalizer(self):
        """
        Set up symbol normalization mapping for consistent asset identification

        Returns:
            Dictionary mapping various symbol formats to canonical uppercase tickers
        """
        symbol_mapping = {
            'bitcoin': 'BTC', 'btc': 'BTC', 'Bitcoin': 'BTC', 'BTC': 'BTC',
            'ethereum': 'ETH', 'eth': 'ETH', 'Ethereum': 'ETH', 'ETH': 'ETH',
            'ripple': 'XRP', 'xrp': 'XRP', 'Ripple': 'XRP', 'XRP': 'XRP',
            'solana': 'SOL', 'sol': 'SOL', 'Solana': 'SOL', 'SOL': 'SOL',
            'cardano': 'ADA', 'ada': 'ADA', 'Cardano': 'ADA', 'ADA': 'ADA',
            'polkadot': 'DOT', 'dot': 'DOT', 'Polkadot': 'DOT', 'DOT': 'DOT',
            'chainlink': 'LINK', 'link': 'LINK', 'Chainlink': 'LINK', 'LINK': 'LINK',
            'litecoin': 'LTC', 'ltc': 'LTC', 'Litecoin': 'LTC', 'LTC': 'LTC',
            'bitcoin-cash': 'BCH', 'bch': 'BCH', 'Bitcoin Cash': 'BCH', 'BCH': 'BCH',
            'stellar': 'XLM', 'xlm': 'XLM', 'Stellar': 'XLM', 'XLM': 'XLM',
            'ethereum-classic': 'ETC', 'etc': 'ETC', 'Ethereum Classic': 'ETC', 'ETC': 'ETC',
            'eos': 'EOS', 'EOS': 'EOS',
        }

        logger.info(f"Initialized symbol normalizer with {len(symbol_mapping)} mappings")
        return symbol_mapping

    def normalize_symbol(self, symbol: str) -> str:
        """
        Normalize a symbol to its canonical uppercase format

        Args:
            symbol: Symbol to normalize

        Returns:
            Canonical uppercase symbol (e.g., BTC, ETH, SOL)
        """
        if symbol in self.symbol_normalizer:
            canonical = self.symbol_normalizer[symbol]
            if symbol != canonical:
                logger.debug(f"Normalized '{symbol}' -> '{canonical}'")
            return canonical

        logger.warning(f"Unknown symbol '{symbol}' not found in normalization mapping, using uppercase")
        return symbol.upper()
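
    # Example behaviour (sketch):
    #   normalize_symbol('bitcoin') -> 'BTC'    (known mapping)
    #   normalize_symbol('doge')    -> 'DOGE'   (unknown: warning logged, uppercased as-is)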

    def find_parquet_files(self) -> List[Path]:
        """
        Find all parquet files in the source directory

        Returns:
            List of parquet file paths
        """
        parquet_files = list(self.source_dir.glob("*.parquet"))

        santiment_files = []
        for file_path in parquet_files:
            filename = file_path.name.lower()
            if ('santiment_' in filename or 'ohlcv' in filename or 'prices' in filename) and 'merged' not in filename:
                santiment_files.append(file_path)

        self.processing_stats['files_found'] = len(santiment_files)
        logger.info(f"Found {len(santiment_files)} Santiment parquet files")

        return santiment_files
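
    # Selection sketch: a file such as 'santiment_social_sentiment_20250801.parquet' (illustrative
    # name) is picked up because it contains 'santiment_', while 'merged_features.parquet' is
    # skipped because it contains 'merged'.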

    def parse_filename(self, file_path: Path) -> Dict[str, str]:
        """
        Parse filename to extract metadata

        Args:
            file_path: Path to the parquet file

        Returns:
            Dictionary with parsed metadata
        """
        filename = file_path.stem
        parts = filename.split('_')

        metadata = {
            'source': 'santiment',
            'category': 'unknown',
            'metric': 'unknown',
            'asset': 'unknown',
            'timestamp': 'unknown'
        }

        try:
            if filename.startswith('santiment_'):
                if len(parts) >= 4:
                    metadata['category'] = parts[1]
                    metadata['metric'] = parts[2]
                    metadata['timestamp'] = '_'.join(parts[3:])
            elif 'ohlcv' in filename:
                if len(parts) >= 4:
                    metadata['category'] = 'ohlcv'
                    metadata['metric'] = 'ohlcv'
                    metadata['asset'] = parts[2]
                    metadata['timestamp'] = '_'.join(parts[3:])
            elif 'prices' in filename:
                if len(parts) >= 4:
                    metadata['category'] = 'prices'
                    metadata['metric'] = 'prices_detailed'
                    metadata['asset'] = parts[2]
                    metadata['timestamp'] = '_'.join(parts[3:])

        except Exception as e:
            logger.warning(f"Failed to parse filename {filename}: {e}")

        return metadata
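
    # Parsing sketch for a hypothetical name 'santiment_social_sentiment_20250801_1200.parquet':
    #   category='social', metric='sentiment', timestamp='20250801_1200', asset stays 'unknown'
    #   (the asset is only parsed out of ohlcv/prices filenames, where it sits at parts[2]).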

    def load_and_standardize_dataframe(self, file_path: Path) -> Optional[pd.DataFrame]:
        """
        Load and standardize a parquet file

        Args:
            file_path: Path to the parquet file

        Returns:
            Standardized DataFrame or None if failed
        """
        try:
            df = pd.read_parquet(file_path)

            if df.empty:
                logger.warning(f"Empty dataframe: {file_path.name}")
                return None

            metadata = self.parse_filename(file_path)

            if 'datetime' in df.columns:
                df['datetime'] = pd.to_datetime(df['datetime'])
                df.set_index('datetime', inplace=True)
            elif df.index.name == 'datetime' or pd.api.types.is_datetime64_any_dtype(df.index):
                df.index = pd.to_datetime(df.index)
                df.index.name = 'datetime'
            else:
                datetime_cols = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()]
                if datetime_cols:
                    df[datetime_cols[0]] = pd.to_datetime(df[datetime_cols[0]])
                    df.set_index(datetime_cols[0], inplace=True)
                    df.index.name = 'datetime'
                else:
                    logger.warning(f"No datetime column found in {file_path.name}")
                    return None

            if 'slug' not in df.columns:
                if metadata['asset'] != 'unknown':
                    normalized_asset = self.normalize_symbol(metadata['asset'])
                    df['slug'] = normalized_asset
                    if metadata['asset'] != normalized_asset:
                        logger.info(f"Normalized asset '{metadata['asset']}' -> '{normalized_asset}' in {file_path.name}")
                else:
                    logger.warning(f"No slug information found in {file_path.name}")
                    return None
            else:
                df['slug'] = df['slug'].apply(self.normalize_symbol)
                logger.debug(f"Normalized existing slug column in {file_path.name}")

            df['source_file'] = file_path.name
            df['category'] = metadata['category']

            value_columns = [col for col in df.columns if col not in ['slug', 'metric', 'source_file', 'category']]

            category = metadata['category']
            metric = metadata['metric']

            column_mapping = {}
            for col in value_columns:
                if col in ['slug', 'source_file', 'category']:
                    continue

                if col == 'value':
                    new_col = f"{category}_{metric}"
                elif col in ['open', 'high', 'low', 'close', 'volume']:
                    new_col = f"{category}_{col}"
                else:
                    new_col = f"{category}_{col}"

                column_mapping[col] = new_col

            df.rename(columns=column_mapping, inplace=True)

            self.processing_stats['unique_slugs'].update(df['slug'].unique())
            self.processing_stats['categories'].add(category)

            logger.info(f"Loaded {file_path.name}: {len(df)} records, {len(df.columns)} columns")

            return df

        except Exception as e:
            logger.error(f"Failed to load {file_path.name}: {e}")
            return None
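
    # Column naming sketch: with category='social' and metric='sentiment', a 'value' column is
    # renamed to 'social_sentiment'; for an OHLCV file (category='ohlcv'), 'close' becomes
    # 'ohlcv_close'. The 'slug', 'source_file' and 'category' columns are left untouched.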

    def merge_dataframes_by_slug_datetime(self, dataframes: List[pd.DataFrame]) -> pd.DataFrame:
        """
        Merge multiple dataframes by slug and datetime with tolerance

        Args:
            dataframes: List of DataFrames to merge

        Returns:
            Merged DataFrame
        """
        if not dataframes:
            return pd.DataFrame()

        logger.info(f"Merging {len(dataframes)} dataframes...")

        merged = dataframes[0].copy()
        logger.info(f"Starting with base dataframe: {len(merged)} records")

        for i, df in enumerate(dataframes[1:], 1):
            logger.info(f"Merging dataframe {i+1}/{len(dataframes)}: {len(df)} records")

            try:
                merged = self._merge_with_time_tolerance(merged, df)
                logger.info(f"After merge {i}: {len(merged)} records")

            except Exception as e:
                logger.error(f"Failed to merge dataframe {i+1}: {e}")
                continue

        return merged

    def _merge_with_time_tolerance(self, left_df: pd.DataFrame, right_df: pd.DataFrame) -> pd.DataFrame:
        """
        Merge two dataframes with time tolerance

        Args:
            left_df: Left DataFrame
            right_df: Right DataFrame

        Returns:
            Merged DataFrame
        """
        left_reset = left_df.reset_index()
        right_reset = right_df.reset_index()

        common_slugs = set(left_reset['slug'].unique()) & set(right_reset['slug'].unique())

        if not common_slugs:
            logger.warning("No common slugs found, concatenating dataframes")
            combined = pd.concat([left_df, right_df], axis=0, sort=False)
            return combined.sort_index()

        merged_parts = []

        for slug in common_slugs:
            left_slug = left_reset[left_reset['slug'] == slug].copy()
            right_slug = right_reset[right_reset['slug'] == slug].copy()

            if left_slug.empty or right_slug.empty:
                continue

            left_slug = left_slug.sort_values('datetime')
            right_slug = right_slug.sort_values('datetime')

            try:
                merged_slug = pd.merge_asof(
                    left_slug,
                    right_slug,
                    on='datetime',
                    by='slug',
                    tolerance=self.time_tolerance,
                    direction='nearest',
                    suffixes=('', '_right')
                )

                duplicate_cols = [col for col in merged_slug.columns if col.endswith('_right')]
                for col in duplicate_cols:
                    base_col = col.replace('_right', '')
                    if base_col in merged_slug.columns:
                        merged_slug[base_col] = merged_slug[base_col].fillna(merged_slug[col])
                    else:
                        merged_slug[base_col] = merged_slug[col]
                    merged_slug.drop(columns=[col], inplace=True)

                merged_parts.append(merged_slug)

            except Exception as e:
                logger.warning(f"Failed to merge slug {slug}: {e}")
                slug_combined = pd.concat([left_slug, right_slug], axis=0, sort=False)
                merged_parts.append(slug_combined)

        left_only_slugs = set(left_reset['slug'].unique()) - common_slugs
        right_only_slugs = set(right_reset['slug'].unique()) - common_slugs

        for slug in left_only_slugs:
            merged_parts.append(left_reset[left_reset['slug'] == slug])

        for slug in right_only_slugs:
            merged_parts.append(right_reset[right_reset['slug'] == slug])

        if merged_parts:
            final_merged = pd.concat(merged_parts, axis=0, sort=False, ignore_index=True)
            final_merged.set_index('datetime', inplace=True)
            return final_merged.sort_index()
        else:
            return left_df
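
    # Tolerance sketch: merge_asof(direction='nearest', tolerance=1h) pairs each left row with
    # the closest right row within +/- 1 hour (e.g., a 12:00 left row matches a 12:30 right row);
    # if nothing falls inside the window, the right-hand columns stay NaN for that row.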

    def fill_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Comprehensive null filling strategy for the merged dataset

        Args:
            df: DataFrame with potential null values

        Returns:
            DataFrame with filled null values
        """
        logger.info("Applying comprehensive null filling strategy...")

        filled_df = df.copy()
        null_counts_before = filled_df.isnull().sum().sum()

        logger.info("Step 1: Forward filling within each asset...")
        for slug in filled_df['slug'].unique():
            slug_mask = filled_df['slug'] == slug
            filled_df.loc[slug_mask] = filled_df.loc[slug_mask].ffill()

        logger.info("Step 2: Backward filling within each asset...")
        for slug in filled_df['slug'].unique():
            slug_mask = filled_df['slug'] == slug
            filled_df.loc[slug_mask] = filled_df.loc[slug_mask].bfill()

        logger.info("Step 3: Filling remaining nulls with type-specific defaults...")

        for col in filled_df.columns:
            if filled_df[col].isnull().any():
                if any(keyword in col.lower() for keyword in ['price', 'usd', 'btc', 'eth', 'marketcap', 'volume']):
                    median_val = filled_df[col].median()
                    filled_df[col] = filled_df[col].fillna(median_val)
                    logger.debug(f"Filled {col} nulls with median: {median_val}")

                elif any(keyword in col.lower() for keyword in ['address', 'network', 'active', 'transaction']):
                    filled_df[col] = filled_df[col].fillna(0)
                    logger.debug(f"Filled {col} nulls with 0")

                elif any(keyword in col.lower() for keyword in ['exchange', 'inflow', 'outflow', 'balance']):
                    filled_df[col] = filled_df[col].fillna(0)
                    logger.debug(f"Filled {col} nulls with 0")

                elif any(keyword in col.lower() for keyword in ['supply', 'circulation', 'velocity']):
                    mean_val = filled_df[col].mean()
                    filled_df[col] = filled_df[col].fillna(mean_val)
                    logger.debug(f"Filled {col} nulls with mean: {mean_val}")

                elif any(keyword in col.lower() for keyword in ['dev', 'github', 'contributors']):
                    filled_df[col] = filled_df[col].fillna(0)
                    logger.debug(f"Filled {col} nulls with 0")

                elif any(keyword in col.lower() for keyword in ['social', 'sentiment', 'volume_4chan', 'volume_reddit']):
                    filled_df[col] = filled_df[col].fillna(0)
                    logger.debug(f"Filled {col} nulls with 0")

                elif any(keyword in col.lower() for keyword in ['open', 'high', 'low', 'close', 'ohlcv']):
                    filled_df[col] = filled_df[col].ffill().bfill()
                    logger.debug(f"Filled {col} nulls with forward/backward fill")

                elif any(keyword in col.lower() for keyword in ['funding', 'interest', 'whale', 'holders']):
                    filled_df[col] = filled_df[col].fillna(0)
                    logger.debug(f"Filled {col} nulls with 0")

                elif filled_df[col].dtype == 'object':
                    if col in ['slug', 'category', 'source_file', 'metric', 'development_alternative_slug_used']:
                        continue
                    else:
                        mode_val = filled_df[col].mode()
                        if len(mode_val) > 0:
                            filled_df[col] = filled_df[col].fillna(mode_val[0])
                        else:
                            filled_df[col] = filled_df[col].fillna('unknown')
                        logger.debug(f"Filled {col} nulls with mode/unknown")

                elif pd.api.types.is_numeric_dtype(filled_df[col]):
                    median_val = filled_df[col].median()
                    if pd.notna(median_val):
                        filled_df[col] = filled_df[col].fillna(median_val)
                        logger.debug(f"Filled {col} nulls with median: {median_val}")
                    else:
                        filled_df[col] = filled_df[col].fillna(0)
                        logger.debug(f"Filled {col} nulls with 0 (median was NaN)")

        null_counts_after = filled_df.isnull().sum().sum()
        nulls_filled = null_counts_before - null_counts_after

        logger.info("Null filling completed:")
        logger.info(f"  Nulls before: {null_counts_before:,}")
        logger.info(f"  Nulls after: {null_counts_after:,}")
        logger.info(f"  Nulls filled: {nulls_filled:,}")

        return filled_df
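
    # Ordering note (sketch): the keyword checks run top to bottom, so a column such as
    # 'transaction_volume' is filled with its median (it matches 'volume' in the first group)
    # before the 'transaction' -> 0 rule is ever reached.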

    def process_all_files(self) -> bool:
        """
        Process all Santiment parquet files

        Returns:
            True if successful, False otherwise
        """
        try:
            parquet_files = self.find_parquet_files()

            if not parquet_files:
                logger.warning("No Santiment parquet files found")

                try:
                    placeholder = pd.DataFrame({'slug': pd.Series(dtype='object')})
                    placeholder.index = pd.DatetimeIndex([], name='datetime')

                    self.output_dir.mkdir(parents=True, exist_ok=True)
                    out_path = self.output_dir / "merged_features.parquet"

                    placeholder.to_parquet(out_path, index=True)

                    self.placeholder_created = True
                    logger.info(f"Created placeholder Santiment merged_features.parquet with 0 rows at {out_path}")
                    return True
                except Exception as e:
                    logger.error(f"Failed to create placeholder Santiment file: {e}")
                    return False

            dataframes = []

            for file_path in parquet_files:
                try:
                    df = self.load_and_standardize_dataframe(file_path)
                    if df is not None:
                        dataframes.append(df)
                        self.processing_stats['files_processed'] += 1
                        self.processing_stats['total_records'] += len(df)
                    else:
                        self.processing_stats['files_failed'] += 1

                except Exception as e:
                    logger.error(f"Failed to process {file_path.name}: {e}")
                    self.processing_stats['files_failed'] += 1

            if not dataframes:
                logger.error("No dataframes were successfully loaded")
                return False

            logger.info("Starting merge process...")
            self.merged_data = self.merge_dataframes_by_slug_datetime(dataframes)

            if self.merged_data.empty:
                logger.error("Merged dataframe is empty")
                return False

            self.processing_stats['date_range'] = {
                'start': str(self.merged_data.index.min()),
                'end': str(self.merged_data.index.max()),
                'total_days': (self.merged_data.index.max() - self.merged_data.index.min()).days
            }

            logger.info("All files processed successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to process files: {e}")
            return False
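
    # Placeholder note (sketch): when no input files exist, an empty frame with only a 'slug'
    # column and a DatetimeIndex named 'datetime' is still written, and placeholder_created is
    # set so main() can skip the save and summary steps.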

    def save_merged_features(self, filename: str = "merged_features.parquet") -> bool:
        """
        Save the merged features to a parquet file with comprehensive null filling

        Args:
            filename: Output filename

        Returns:
            True if successful, False otherwise
        """
        if self.merged_data is None or self.merged_data.empty:
            logger.error("No merged data to save")
            return False

        try:
            output_path = self.output_dir / filename

            cleaned_df = self.merged_data.copy()

            null_columns = cleaned_df.columns[cleaned_df.isnull().all()].tolist()
            if null_columns:
                logger.info(f"Removing {len(null_columns)} completely null columns: {null_columns}")
                cleaned_df = cleaned_df.dropna(axis=1, how='all')

            logger.info("Applying comprehensive null filling...")
            cleaned_df = self.fill_missing_values(cleaned_df)

            columns_to_remove = ['metric', 'source_file', 'category', 'development_alternative_slug_used']
            existing_cols_to_remove = [col for col in columns_to_remove if col in cleaned_df.columns]
            if existing_cols_to_remove:
                logger.info(f"Removing unwanted columns: {existing_cols_to_remove}")
                cleaned_df = cleaned_df.drop(columns=existing_cols_to_remove)

            logger.info("Ensuring all slugs are in uppercase format...")
            cleaned_df['slug'] = cleaned_df['slug'].apply(lambda x: x.upper() if isinstance(x, str) else x)

            logger.info("Fixing data types for parquet compatibility...")
            for col in cleaned_df.columns:
                if cleaned_df[col].dtype == 'object':
                    sample_values = cleaned_df[col].dropna().head(100)
                    if len(sample_values) > 0:
                        try:
                            pd.to_numeric(sample_values, errors='raise')
                            cleaned_df[col] = pd.to_numeric(cleaned_df[col], errors='coerce')
                            logger.debug(f"Converted {col} to numeric")
                        except (ValueError, TypeError):
                            cleaned_df[col] = cleaned_df[col].astype(str)
                            logger.debug(f"Converted {col} to string")

            cleaned_df = cleaned_df.sort_index()
            cleaned_df = cleaned_df.sort_values(['slug'], kind='mergesort')

            remaining_nulls = cleaned_df.isnull().sum().sum()
            if remaining_nulls > 0:
                logger.warning(f"Warning: {remaining_nulls} null values remain after filling")
                null_cols = cleaned_df.columns[cleaned_df.isnull().any()].tolist()
                logger.warning(f"Columns with remaining nulls: {null_cols}")
            else:
                logger.info("✓ All null values successfully filled")

            try:
                cleaned_df.to_parquet(output_path, compression='snappy')
            except Exception as parquet_error:
                logger.error(f"Parquet save failed: {parquet_error}")

                logger.info("Analyzing columns for parquet compatibility...")
                for col in cleaned_df.columns:
                    try:
                        test_df = cleaned_df[[col]].copy()
                        test_df.to_parquet(output_path.with_suffix('.test.parquet'))
                        output_path.with_suffix('.test.parquet').unlink()
                    except Exception as col_error:
                        logger.error(f"Column {col} causing issues: {col_error}")
                        cleaned_df[col] = cleaned_df[col].astype(str)
                        logger.info(f"Converted problematic column {col} to string")

                cleaned_df.to_parquet(output_path, compression='snappy')

            logger.info(f"Merged features saved to {output_path}")
            logger.info(f"Final dataset: {len(cleaned_df)} records, {len(cleaned_df.columns)} columns")
            logger.info(f"Data completeness: {100 - (remaining_nulls / (len(cleaned_df) * len(cleaned_df.columns)) * 100):.2f}%")

            return True

        except Exception as e:
            logger.error(f"Failed to save merged features: {e}")
            return False
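
    # Fallback sketch: if the full-table to_parquet call fails, each column is written alone to a
    # throwaway '.test.parquet' file; columns that cannot be serialised are cast to str and the
    # whole table is written again.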

    def generate_summary_report(self) -> Dict:
        """
        Generate a comprehensive summary report

        Returns:
            Summary dictionary
        """
        summary = {
            'processing_timestamp': datetime.now().isoformat(),
            'files_statistics': {
                'files_found': self.processing_stats['files_found'],
                'files_processed': self.processing_stats['files_processed'],
                'files_failed': self.processing_stats['files_failed'],
                'success_rate': f"{(self.processing_stats['files_processed'] / max(1, self.processing_stats['files_found'])) * 100:.1f}%"
            },
            'data_statistics': {
                'total_records': self.processing_stats['total_records'],
                'unique_slugs': list(self.processing_stats['unique_slugs']),
                'categories_found': list(self.processing_stats['categories']),
                'date_range': self.processing_stats['date_range']
            }
        }

        if self.merged_data is not None:
            summary['merged_statistics'] = {
                'final_records': len(self.merged_data),
                'final_columns': len(self.merged_data.columns),
                'memory_usage_mb': f"{self.merged_data.memory_usage(deep=True).sum() / 1024 / 1024:.2f}",
                'slug_distribution': self.merged_data['slug'].value_counts().to_dict(),
                'null_percentage': f"{(self.merged_data.isnull().sum().sum() / (len(self.merged_data) * len(self.merged_data.columns))) * 100:.2f}%"
            }

        return summary

    def print_summary(self):
        """Print a comprehensive summary of the merge process"""
        summary = self.generate_summary_report()

        print("\n" + "=" * 60)
        print("SANTIMENT DATA MERGER SUMMARY")
        print("=" * 60)

        print("\nFile Processing:")
        print(f"  Files found: {summary['files_statistics']['files_found']}")
        print(f"  Files processed: {summary['files_statistics']['files_processed']}")
        print(f"  Files failed: {summary['files_statistics']['files_failed']}")
        print(f"  Success rate: {summary['files_statistics']['success_rate']}")

        print("\nData Overview:")
        print(f"  Total records processed: {summary['data_statistics']['total_records']:,}")
        print(f"  Unique assets (slugs): {len(summary['data_statistics']['unique_slugs'])}")
        print(f"  Categories found: {', '.join(summary['data_statistics']['categories_found'])}")

        if summary['data_statistics']['date_range']:
            print(f"  Date range: {summary['data_statistics']['date_range']['start']} to {summary['data_statistics']['date_range']['end']}")
            print(f"  Total days: {summary['data_statistics']['date_range']['total_days']}")

        if 'merged_statistics' in summary:
            print("\nMerged Dataset:")
            print(f"  Final records: {summary['merged_statistics']['final_records']:,}")
            print(f"  Final columns: {summary['merged_statistics']['final_columns']}")
            print(f"  Memory usage: {summary['merged_statistics']['memory_usage_mb']} MB")
            print(f"  Data completeness: {100 - float(summary['merged_statistics']['null_percentage'].rstrip('%')):.1f}%")

            print("\nTop Assets by Record Count:")
            slug_dist = summary['merged_statistics']['slug_distribution']
            for slug, count in list(slug_dist.items())[:5]:
                print(f"  {slug}: {count:,} records")

        print("=" * 60)


def main():
    """Main function to run the Santiment data merger"""
    logger.info("Starting Santiment Data Merger...")

    merger = SantimentDataMerger(
        source_dir="data/santiment",
        output_dir="data/santiment",
        time_tolerance_hours=1
    )

    try:
        success = merger.process_all_files()

        if not success:
            logger.error("Failed to process Santiment files")
            return False

        if merger.placeholder_created:
            logger.info("Placeholder Santiment dataset created; skipping save and summary.")
            return True

        save_success = merger.save_merged_features("merged_features.parquet")

        if not save_success:
            logger.error("Failed to save merged features")
            return False

        merger.print_summary()

        summary = merger.generate_summary_report()
        summary_path = merger.output_dir / "merge_summary.json"

        import json
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2, default=str)

        logger.info(f"Summary report saved to {summary_path}")
        logger.info("Santiment data merge completed successfully!")

        return True

    except Exception as e:
        logger.error(f"Santiment data merge failed: {e}")
        return False


if __name__ == "__main__":
    main()