import json
import os
import shutil

import numpy as np
import pandas as pd


def parse_news_data(file_path):
    """Parse a news file containing one JSON object per line (NDJSON).

    Kept as a fallback loader; main() reads the parquet dump instead.
    """
    news_data = []
    with open(file_path, 'r') as f:
        for line in f:
            if line.strip():
                try:
                    news_data.append(json.loads(line))
                except json.JSONDecodeError as e:
                    print(f"Error parsing line: {line[:100]}...")
                    print(f"Error: {e}")
                    continue
    return news_data


def extract_sentiment_features(news_data):
    """Extract per-symbol sentiment features from a list of news articles."""
    sentiment_features = {}

    for article in news_data:
        # Article-level info (title/description are read but not yet used
        # in any feature)
        published_at = article.get('published_at')
        title = article.get('title', '')
        description = article.get('description', '')

        # Entities are the stocks mentioned in the article. Records loaded
        # from parquet can carry NaN/None instead of a list when the field
        # is missing, so guard before iterating (a defensive assumption
        # about the upstream data).
        entities = article.get('entities', [])
        if entities is None or isinstance(entities, float):
            continue

        for entity in entities:
            if entity.get('type') == 'equity':
                symbol = entity.get('symbol', '').lower()  # normalize case
                if symbol:
                    if symbol not in sentiment_features:
                        sentiment_features[symbol] = {
                            'news_sentiment_scores': [],
                            'news_match_scores': [],
                            'news_mentions_count': 0,
                            'news_articles_count': 0,
                            'latest_news_timestamp': None,
                            'news_highlights_count': 0,
                        }

                    # Collect sentiment and match scores
                    sentiment_score = entity.get('sentiment_score')
                    match_score = entity.get('match_score')
                    if sentiment_score is not None:
                        sentiment_features[symbol]['news_sentiment_scores'].append(sentiment_score)
                    if match_score is not None:
                        sentiment_features[symbol]['news_match_scores'].append(match_score)

                    # Count highlights
                    highlights = entity.get('highlights', [])
                    if highlights is not None:
                        sentiment_features[symbol]['news_highlights_count'] += len(highlights)

                    # Track the most recent mention; ISO-8601 timestamp
                    # strings compare correctly as plain strings
                    if published_at:
                        latest = sentiment_features[symbol]['latest_news_timestamp']
                        if latest is None or published_at > latest:
                            sentiment_features[symbol]['latest_news_timestamp'] = published_at

                    sentiment_features[symbol]['news_mentions_count'] += 1

        # Count each article at most once per symbol it mentions
        mentioned_symbols = set(
            entity.get('symbol', '').lower()
            for entity in entities
            if entity.get('type') == 'equity' and entity.get('symbol')
        )
        for symbol in mentioned_symbols:
            if symbol in sentiment_features:
                sentiment_features[symbol]['news_articles_count'] += 1

    return sentiment_features


def aggregate_sentiment_features(sentiment_data):
    """Aggregate raw per-symbol score lists into the final feature metrics."""
    aggregated = {}

    for symbol, data in sentiment_data.items():
        sentiment_scores = data['news_sentiment_scores']
        match_scores = data['news_match_scores']

        features = {
            'news_sentiment_mean': np.mean(sentiment_scores) if sentiment_scores else None,
            'news_sentiment_std': np.std(sentiment_scores) if len(sentiment_scores) > 1 else None,
            'news_sentiment_min': np.min(sentiment_scores) if sentiment_scores else None,
            'news_sentiment_max': np.max(sentiment_scores) if sentiment_scores else None,
            'news_match_score_mean': np.mean(match_scores) if match_scores else None,
            'news_match_score_max': np.max(match_scores) if match_scores else None,
            'news_mentions_count': data['news_mentions_count'],
            'news_articles_count': data['news_articles_count'],
            'news_highlights_count': data['news_highlights_count'],
            'latest_news_timestamp': data['latest_news_timestamp'],
            'news_sentiment_range': (np.max(sentiment_scores) - np.min(sentiment_scores))
                                    if sentiment_scores else None,
            # Mentions weighted by how confidently articles matched the symbol
            'news_activity_score': (data['news_mentions_count'] * np.mean(match_scores))
                                   if match_scores else 0,
        }
        aggregated[symbol] = features

    return aggregated
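
# Illustrative shape of one article record as consumed by
# extract_sentiment_features above. The field names are inferred from the
# .get() calls in this module rather than from MarketAux API documentation,
# so treat this sketch as an assumption:
#
#   {
#       "published_at": "2025-01-01T12:00:00Z",
#       "title": "...",
#       "description": "...",
#       "entities": [
#           {"type": "equity", "symbol": "AAPL",
#            "sentiment_score": 0.61, "match_score": 12.9,
#            "highlights": [...]}
#       ]
#   }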


def merge_with_existing_features(news_features, existing_features_file):
    """Merge news features into the existing market-data feature table."""
    # Load existing features (parquet preferred, CSV as a fallback)
    if existing_features_file.endswith('.parquet'):
        df_existing = pd.read_parquet(existing_features_file)
    else:
        df_existing = pd.read_csv(existing_features_file)

    print(f"Loaded existing features: {df_existing.shape}")
    print(f"News features available for {len(news_features)} symbols")

    news_columns = [
        'news_sentiment_mean', 'news_sentiment_std', 'news_sentiment_min',
        'news_sentiment_max', 'news_match_score_mean', 'news_match_score_max',
        'news_mentions_count', 'news_articles_count', 'news_highlights_count',
        'latest_news_timestamp', 'news_sentiment_range', 'news_activity_score',
    ]

    # Initialize the news columns. The timestamp column holds strings, so
    # give it object dtype up front rather than float NaN to avoid dtype
    # upcasting when rows are filled in below.
    for col in news_columns:
        df_existing[col] = np.nan
    df_existing['latest_news_timestamp'] = df_existing['latest_news_timestamp'].astype(object)

    # Fill in news features where a symbol matches. News symbols were
    # lowercased during extraction, so normalize the frame's symbols the
    # same way before the lookup.
    symbols_matched = 0
    for idx, row in df_existing.iterrows():
        symbol = str(row['symbol']).lower()
        if symbol in news_features:
            for col in news_columns:
                # Keys in news_features already use the final column names
                df_existing.loc[idx, col] = news_features[symbol].get(col, None)
            symbols_matched += 1

    print(f"Matched news features for {symbols_matched} symbols "
          f"out of {len(df_existing)} total records")
    return df_existing
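
# Performance note: the row-wise .loc writes above are simple but slow on
# large frames. A vectorized alternative is sketched below under the same
# lowercase-symbol assumption (not a drop-in replacement, since it does not
# maintain the symbols_matched counter):
#
#   key = df_existing['symbol'].str.lower()
#   news_df = pd.DataFrame.from_dict(news_features, orient='index')
#   for col in news_df.columns:
#       df_existing[col] = key.map(news_df[col])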


def main():
    # Configuration. Note existing_features_file and output_file point at
    # the same path, so the feature table is updated in place.
    news_file = os.path.join('data', 'marketaux', 'news', 'news_latest.parquet')
    existing_features_file = os.path.join('data', 'merged', 'features', 'merged_features.parquet')
    output_file = os.path.join('data', 'merged', 'features', 'merged_features.parquet')

    # Bail out gracefully when the news file is missing
    if not os.path.exists(news_file):
        print(f"WARNING: News file not found: {news_file}")
        print("This usually happens when MarketAux API keys are exhausted.")
        print("Skipping news sentiment merge and keeping existing features unchanged.")
        if os.path.exists(existing_features_file):
            shutil.copy2(existing_features_file, output_file)
            print(f"Copied existing features to output: {output_file}")
        else:
            print(f"WARNING: No existing features file found at {existing_features_file}")
        return

    print("Step 1: Loading news data from parquet...")
    try:
        news_df = pd.read_parquet(news_file)
        news_data = news_df.to_dict(orient='records')
        print(f"Loaded {len(news_data)} news articles from {news_file}")
    except Exception as e:
        print(f"ERROR: Failed to load news data: {e}")
        print("Skipping news sentiment merge.")
        # Copy existing features as a fallback
        if os.path.exists(existing_features_file):
            shutil.copy2(existing_features_file, output_file)
            print(f"Copied existing features to output: {output_file}")
        return

    print("Step 2: Extracting sentiment features...")
    sentiment_data = extract_sentiment_features(news_data)
    print(f"Extracted sentiment data for {len(sentiment_data)} symbols")

    print("Step 3: Aggregating sentiment metrics...")
    news_features = aggregate_sentiment_features(sentiment_data)

    # Display a sample of the extracted features
    print("\nSample of extracted news features:")
    for symbol, features in list(news_features.items())[:3]:
        print(f"\n{symbol.upper()}:")
        for key, value in features.items():
            if value is not None:
                if isinstance(value, float):
                    print(f"  {key}: {value:.4f}")
                else:
                    print(f"  {key}: {value}")

    print("\nStep 4: Merging with existing features...")
    try:
        merged_df = merge_with_existing_features(news_features, existing_features_file)

        # Remove the 'links.pulsex' column if present
        if 'links.pulsex' in merged_df.columns:
            merged_df = merged_df.drop(columns=['links.pulsex'])

        print("Step 5: Saving merged features...")
        merged_df.to_parquet(output_file, index=False)
        print(f"Saved merged features to {output_file}")
        print(f"Final dataset shape: {merged_df.shape}")

        # Summarize news feature coverage
        news_cols = [col for col in merged_df.columns if col.startswith('news_')]
        print("\nNews feature coverage:")
        for col in news_cols:
            non_null_count = merged_df[col].notna().sum()
            coverage = non_null_count / len(merged_df) * 100
            print(f"  {col}: {non_null_count}/{len(merged_df)} ({coverage:.1f}%)")
    except Exception as e:
        print(f"Error during merging: {e}")
        print("Make sure your merged_features.parquet file exists and is accessible")


if __name__ == "__main__":
    main()
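
# Expected layout relative to the working directory (the paths above are
# relative, so run this script from the project root):
#
#   data/marketaux/news/news_latest.parquet       <- MarketAux news dump
#   data/merged/features/merged_features.parquet  <- feature table, read and
#                                                    overwritten in place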