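"""
Normalize and merge intraday stock features with news sentiment data.

Pipeline:
    1. Load stock features and news records from parquet files (news may be
       stored as one JSON string per row).
    2. Normalize symbols and Unix-millisecond timestamps in both datasets,
       expanding news entity mentions into per-symbol records on 30-minute
       intervals.
    3. For each stock symbol/interval, aggregate sentiment and match-score
       features from news that falls within a configurable time tolerance.
    4. Merge the aggregates onto the stock data and save the result to parquet.
"""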
import pandas as pd
import numpy as np
import json
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def normalize_stock_data(df_stocks):
    """
    Normalize stock data to ensure consistent format for merging.
    """
    logger.info("=== NORMALIZING STOCK DATA ===")
    df_stocks = df_stocks.copy()
    
    # Normalize symbol to uppercase and strip whitespace
    df_stocks['symbol'] = df_stocks['symbol'].astype(str).str.upper().str.strip()
    
    # Ensure interval_timestamp is int64 (Unix timestamp in milliseconds)
    if 'interval_timestamp' in df_stocks.columns:
        # Coerce to numeric and drop unparseable rows (astype('int64') fails on NaN)
        ts = pd.to_numeric(df_stocks['interval_timestamp'], errors='coerce')
        if ts.isna().any():
            logger.warning(f"Dropping {int(ts.isna().sum())} stock rows with unparseable timestamps")
        df_stocks = df_stocks.loc[ts.notna()].copy()
        df_stocks['interval_timestamp'] = ts[ts.notna()].astype('int64')
        logger.info(f"Stock timestamp range: {df_stocks['interval_timestamp'].min()} to {df_stocks['interval_timestamp'].max()}")
        logger.info(f"Stock timestamp sample: {df_stocks['interval_timestamp'].head().tolist()}")
    
    logger.info(f"Stock symbols sample: {df_stocks['symbol'].unique()[:10].tolist()}")
    logger.info(f"Stock data shape: {df_stocks.shape}")
    
    return df_stocks

def normalize_news_data(df_news):
    """
    Normalize news data to ensure consistent format for merging.
    """
    logger.info("=== NORMALIZING NEWS DATA ===")
    df_news = df_news.copy()
    
    # Extract entities and create individual records
    news_records = []
    
    for idx, row in df_news.iterrows():
        entities = row.get('entities', [])
        
        # Only proceed if entities is a non-empty list or ndarray
        if not isinstance(entities, (list, np.ndarray)) or len(entities) == 0:
            continue
            
        # Convert published_at to timestamp
        try:
            # pd.to_datetime handles strings, datetime objects, and Timestamps
            published_dt = pd.to_datetime(row['published_at'])
        except Exception:
            published_dt = pd.NaT
        if pd.isna(published_dt):
            logger.warning(f"Could not parse published_at for row {idx}")
            continue
        
        # Process each entity
        for entity in entities:
            if not isinstance(entity, dict):
                continue
                
            # Only process equity type entities with symbols
            if entity.get('type') == 'equity' and 'symbol' in entity:
                symbol = str(entity['symbol']).upper().strip()
                
                # Create 30-minute intervals (matching your stock data)
                interval_dt = published_dt.floor('30min')
                # Convert to Unix timestamp in milliseconds
                interval_timestamp = int(interval_dt.timestamp() * 1000)
                
                news_records.append({
                    'symbol': symbol,
                    'interval_timestamp': interval_timestamp,
                    'published_at': published_dt,
                    'sentiment_score': entity.get('sentiment_score', 0),
                    'match_score': entity.get('match_score', 0),
                    'highlights_count': len(entity.get('highlights', [])),
                    'news_uuid': row.get('uuid', ''),
                    'news_title': row.get('title', ''),
                    'news_source': row.get('source', ''),
                    'relevance_score': row.get('relevance_score', 0)
                })
    
    if not news_records:
        logger.warning("No valid news records found")
        return pd.DataFrame()
    
    df_news_normalized = pd.DataFrame(news_records)
    logger.info(f"Normalized news data shape: {df_news_normalized.shape}")
    # Log columns that are completely null and those that aren't
    null_columns = [col for col in df_news_normalized.columns if df_news_normalized[col].isnull().all()]
    not_null_columns = [col for col in df_news_normalized.columns if not df_news_normalized[col].isnull().all()]
    logger.info(f"Completely null columns: {null_columns}")
    logger.info(f"Non-null columns: {not_null_columns}")
    logger.info(f"News symbols sample: {df_news_normalized['symbol'].unique()[:10].tolist()}")
    logger.info(f"News timestamp range: {df_news_normalized['interval_timestamp'].min()} to {df_news_normalized['interval_timestamp'].max()}")
    logger.info(f"News timestamp sample: {df_news_normalized['interval_timestamp'].head().tolist()}")
    
    return df_news_normalized

def find_nearest_timestamp_matches(df_stocks, df_news, time_tolerance_minutes=30):
    """
    Find the nearest timestamp matches within a tolerance window.
    This handles cases where timestamps don't align exactly.
    """
    logger.info(f"=== FINDING NEAREST TIMESTAMP MATCHES (tolerance: {time_tolerance_minutes} min) ===")
    
    if df_news.empty:
        return df_stocks.assign(**{col: 0 for col in [
            'news_sentiment_mean', 'news_sentiment_std', 'news_sentiment_min', 'news_sentiment_max',
            'news_match_score_mean', 'news_match_score_max', 'news_highlights_count',
            'news_articles_count', 'latest_news_timestamp', 'news_sentiment_range',
            'news_activity_score', 'news_mentions_count'
        ]})
    
    # Convert tolerance to milliseconds
    tolerance_ms = time_tolerance_minutes * 60 * 1000
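    # News timestamps were floored to 30-minute interval starts in
    # normalize_news_data, so the tolerance compares interval starts rather
    # than exact publication times.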
    
    # Get unique combinations for efficient processing
    stock_keys = df_stocks[['symbol', 'interval_timestamp']].drop_duplicates()
    
    matched_records = []
    
    for _, stock_row in stock_keys.iterrows():
        symbol = stock_row['symbol']
        stock_timestamp = stock_row['interval_timestamp']
        
        # Find news for this symbol
        symbol_news = df_news[df_news['symbol'] == symbol].copy()
        
        if symbol_news.empty:
            continue
        
        # Calculate time differences
        symbol_news['time_diff'] = abs(symbol_news['interval_timestamp'] - stock_timestamp)
        
        # Filter within tolerance
        nearby_news = symbol_news[symbol_news['time_diff'] <= tolerance_ms]
        
        if nearby_news.empty:
            continue
        
        # Aggregate the nearby news
        agg_data = {
            'symbol': symbol,
            'interval_timestamp': stock_timestamp,
            'news_sentiment_mean': nearby_news['sentiment_score'].mean(),
            'news_sentiment_std': nearby_news['sentiment_score'].std(),
            'news_sentiment_min': nearby_news['sentiment_score'].min(),
            'news_sentiment_max': nearby_news['sentiment_score'].max(),
            'news_match_score_mean': nearby_news['match_score'].mean(),
            'news_match_score_max': nearby_news['match_score'].max(),
            'news_highlights_count': nearby_news['highlights_count'].sum(),
            'news_articles_count': len(nearby_news),
            'latest_news_timestamp': nearby_news['published_at'].max(),
            'news_mentions_count': len(nearby_news)
        }
        
        # Calculate additional features
        agg_data['news_sentiment_range'] = agg_data['news_sentiment_max'] - agg_data['news_sentiment_min']
        agg_data['news_activity_score'] = agg_data['news_match_score_mean'] + agg_data['news_match_score_max']
        
        # Fill NaN values
        for key, value in agg_data.items():
            if pd.isna(value) and key not in ['symbol', 'interval_timestamp', 'latest_news_timestamp']:
                agg_data[key] = 0
        
        matched_records.append(agg_data)
    
    if matched_records:
        df_matched_news = pd.DataFrame(matched_records)
        logger.info(f"Found {len(df_matched_news)} symbol-timestamp matches")
        
        # Merge with stock data
        df_result = df_stocks.merge(
            df_matched_news,
            on=['symbol', 'interval_timestamp'],
            how='left'
        )
    else:
        logger.warning("No timestamp matches found within tolerance")
        df_result = df_stocks.copy()
    
    # Fill remaining NaN values for stocks without news
    news_columns = [
        'news_sentiment_mean', 'news_sentiment_std', 'news_sentiment_min', 'news_sentiment_max',
        'news_match_score_mean', 'news_match_score_max', 'news_highlights_count',
        'news_articles_count', 'news_sentiment_range', 'news_activity_score', 'news_mentions_count'
    ]
    
    for col in news_columns:
        if col in df_result.columns:
            df_result[col] = df_result[col].fillna(0)
    
    # Report results
    if 'news_articles_count' in df_result.columns:
        stocks_with_news = len(df_result[df_result['news_articles_count'] > 0])
        total_news_articles = df_result['news_articles_count'].sum()
        logger.info(f"Successfully matched news for {stocks_with_news} stock records out of {len(df_result)}")
        logger.info(f"Total news articles matched: {total_news_articles}")
    
    return df_result

def diagnose_data_alignment(df_stocks, df_news):
    """
    Diagnose alignment issues between stock and news data.
    """
    logger.info("=== DATA ALIGNMENT DIAGNOSIS ===")
    
    # Check symbol overlap
    stock_symbols = set(df_stocks['symbol'].unique()) if 'symbol' in df_stocks.columns else set()
    news_symbols = set(df_news['symbol'].unique()) if len(df_news) > 0 and 'symbol' in df_news.columns else set()
    
    common_symbols = stock_symbols.intersection(news_symbols)
    
    logger.info(f"Stock symbols: {len(stock_symbols)} unique")
    logger.info(f"News symbols: {len(news_symbols)} unique")
    logger.info(f"Common symbols: {len(common_symbols)}")
    logger.info(f"Common symbols sample: {list(common_symbols)[:10]}")
    
    # Check timestamp ranges
    if 'interval_timestamp' in df_stocks.columns:
        stock_ts_min = df_stocks['interval_timestamp'].min()
        stock_ts_max = df_stocks['interval_timestamp'].max()
        stock_ts_range = pd.to_datetime([stock_ts_min, stock_ts_max], unit='ms')
        logger.info(f"Stock timestamp range: {stock_ts_range[0]} to {stock_ts_range[1]}")
    
    if len(df_news) > 0 and 'interval_timestamp' in df_news.columns:
        news_ts_min = df_news['interval_timestamp'].min()
        news_ts_max = df_news['interval_timestamp'].max()
        news_ts_range = pd.to_datetime([news_ts_min, news_ts_max], unit='ms')
        logger.info(f"News timestamp range: {news_ts_range[0]} to {news_ts_range[1]}")
        
        # Check for timestamp overlap
        if 'interval_timestamp' in df_stocks.columns:
            overlap_start = max(stock_ts_min, news_ts_min)
            overlap_end = min(stock_ts_max, news_ts_max)
            if overlap_start <= overlap_end:
                overlap_range = pd.to_datetime([overlap_start, overlap_end], unit='ms')
                logger.info(f"Timestamp overlap: {overlap_range[0]} to {overlap_range[1]}")
            else:
                logger.warning("No timestamp overlap between stock and news data")

def parse_json_news_file(news_file_path):
    """
    Parse news file that contains JSON records (one per line or structured).
    """
    logger.info(f"Parsing news file: {news_file_path}")
    
    try:
        # Try reading as parquet first
        df_news = pd.read_parquet(news_file_path)
        logger.info(f"Successfully read parquet file with shape: {df_news.shape}")
        
        # Check if the data contains JSON strings that need parsing
        if len(df_news) > 0 and len(df_news.columns) == 1 and isinstance(df_news.iloc[0, 0], str) and df_news.iloc[0, 0]:
            logger.info("Detected JSON strings in single column, parsing...")
            json_records = []
            for idx, row in df_news.iterrows():
                try:
                    json_data = json.loads(row.iloc[0])
                    json_records.append(json_data)
                except json.JSONDecodeError as e:
                    logger.warning(f"Failed to parse JSON at row {idx}: {e}")
                    continue
            
            if json_records:
                df_news = pd.DataFrame(json_records)
                logger.info(f"Parsed {len(json_records)} JSON records")
        
        return df_news
        
    except Exception as e:
        logger.error(f"Error reading news file: {e}")
        return pd.DataFrame()

def main(stocks_file_path, news_file_path, output_file_path, time_tolerance_minutes=30):
    """
    Main function to normalize and merge stock and news data.
    """
    try:
        logger.info("=== STARTING DATA NORMALIZATION AND MERGE ===")
        
        # Step 1: Load stock data
        logger.info("Step 1: Loading stock data...")
        df_stocks = pd.read_parquet(stocks_file_path)
        logger.info(f"Loaded stock data with shape: {df_stocks.shape}")
        
        # Step 2: Load and parse news data
        logger.info("Step 2: Loading news data...")
        df_news_raw = parse_json_news_file(news_file_path)
        
        if df_news_raw.empty:
            logger.warning("No news data found, creating stock data with empty news columns")
            df_stocks = normalize_stock_data(df_stocks)
            # Add empty news columns
            for col in ['news_sentiment_mean', 'news_sentiment_std', 'news_sentiment_min', 
                       'news_sentiment_max', 'news_match_score_mean', 'news_match_score_max',
                       'news_highlights_count', 'news_articles_count', 'latest_news_timestamp',
                       'news_sentiment_range', 'news_activity_score', 'news_mentions_count']:
                df_stocks[col] = 0 if col != 'latest_news_timestamp' else None
            df_stocks.to_parquet(output_file_path, index=False)
            logger.info("Saved stock data with empty news columns")
            return df_stocks
        
        # Step 3: Normalize both datasets
        logger.info("Step 3: Normalizing stock data...")
        df_stocks_norm = normalize_stock_data(df_stocks)
        
        logger.info("Step 4: Normalizing news data...")
        df_news_norm = normalize_news_data(df_news_raw)
        
        # Step 5: Diagnose alignment
        logger.info("Step 5: Diagnosing data alignment...")
        diagnose_data_alignment(df_stocks_norm, df_news_norm)
        
        # Step 6: Find nearest timestamp matches and merge
        logger.info("Step 6: Finding nearest timestamp matches and merging...")
        df_merged = find_nearest_timestamp_matches(
            df_stocks_norm, 
            df_news_norm, 
            time_tolerance_minutes=time_tolerance_minutes
        )
        
        # Step 7: Save results
        logger.info("Step 7: Saving merged data...")
        df_merged.to_parquet(output_file_path, index=False)
        logger.info(f"Saved merged data to {output_file_path}")
        
        # Final report
        logger.info("=== MERGE COMPLETED ===")
        logger.info(f"Final dataset shape: {df_merged.shape}")
        
        news_cols = [col for col in df_merged.columns if col.startswith('news_')]
        logger.info(f"News columns added: {len(news_cols)}")
        
        if 'news_articles_count' in df_merged.columns:
            total_articles = df_merged['news_articles_count'].sum()
            records_with_news = len(df_merged[df_merged['news_articles_count'] > 0])
            logger.info(f"Total news articles merged: {total_articles}")
            logger.info(f"Stock records with news: {records_with_news} / {len(df_merged)}")
        
        return df_merged
        
    except Exception as e:
        logger.error(f"Error in main process: {e}")
        import traceback
        logger.error(traceback.format_exc())
        raise

# Example usage
if __name__ == "__main__":
    import os
    
    # Update these paths to match your actual file locations
    base_dir = "data/"  # Update this
    stocks_file = os.path.join(base_dir, "merged/features/stocks_features.parquet")
    news_file = os.path.join(base_dir, "marketaux/news/news_latest.parquet")
    output_file = os.path.join(base_dir, "merged/features/stocks_features.parquet")
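    # Note: output_file matches stocks_file above, so the merged result
    # overwrites the existing stock features file in place.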

    # Check if stocks_features.parquet exists before running
    if not os.path.exists(stocks_file):
        logger.error(f"Input file missing: {stocks_file}")
        raise SystemExit(1)

    # Run the merge with a 24-hour tolerance (adjust as needed)
    df_result = main(
        stocks_file_path=stocks_file,
        news_file_path=news_file,
        output_file_path=output_file,
        time_tolerance_minutes=60*24 # Adjust this based on your needs
    )