# pylint: disable=too-many-lines """ LDA controller for topic modeling and visualization. """ import re import logging from datetime import datetime, timedelta from collections import Counter, defaultdict import numpy as np from nltk.tokenize import word_tokenize from nltk.stem import WordNetLemmatizer from nltk.corpus import stopwords import torch from transformers import AutoTokenizer, AutoModelForSequenceClassification from sklearn.metrics import silhouette_score from gensim import corpora from gensim.models import LdaModel from gensim.models.coherencemodel import CoherenceModel import requests from langchain.schema import SystemMessage, HumanMessage from models.llm import gpt # pylint: disable=import-error from .keyword import FIN_KEYWORDS from .keyword_analysis import get_time_range, get_previous_time_range # Configure logger logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) class FinancialKeywordManager: """ Manages both static predefined financial keywords and dynamically fetched trending keywords. This class handles the fetching and combining of financial keywords from different sources. It provides mechanisms to fetch trending financial terms from external APIs and combine them with a predefined list of static financial keywords. Attributes: static_keywords (set): Predefined set of financial keywords trending_keywords (set): Dynamically fetched trending financial terms """ def __init__(self, static_keywords): """Initialize the FinancialKeywordManager with a set of static keywords.""" self.static_keywords = set(static_keywords) self.trending_keywords = set() def fetch_trending_keywords(self, days_back=30): """ Fetch trending keywords from external APIs Args: days_back (int): Number of days to look back for trending keywords Returns: None: Updates the trending_keywords set internally """ logger.info("Fetching trending keywords from last %d days...", days_back) trending = set() trending.update(self.fetch_from_eodhd(days_back)) self.trending_keywords = trending def _is_valid_keyword(self, keyword: str) -> bool: """ Applies a series of rules to determine if a keyword is a valid, non-noise financial term. This function is designed to filter out stock tickers, short codes, and common junk. Args: keyword (str): The keyword to validate. Returns: bool: True if the keyword is valid, False otherwise. """ # Rule 1: Basic length check if not 3 <= len(keyword) <= 30: return False # Rule 2: Exclude keywords that are purely numeric (e.g., "2024") if keyword.isdigit(): return False # Rule 3: Exclude stock tickers (e.g., 'aapl', 'tsla', 'googl') # Tickers are typically 1-5 characters, all lowercase or all uppercase. if len(keyword) <= 5 and (keyword.islower() or keyword.isupper()) and keyword.isalpha(): # We check if it's NOT in our static keywords to avoid filtering out valid short words. if keyword not in self.static_keywords: return False # Rule 4: Exclude keywords containing digits unless it's a known pattern (e.g., s&p 500) # This removes terms like 'q1-2024', '10k'. if any(char.isdigit() for char in keyword): # Allow specific patterns that are known to be valid if not re.search(r'(s&p 500|p/e|\d{2,4}q\d|\d{1,2}-k)', keyword): return False # Rule 5: Check against an expanded list of common junk words and stop words expanded_junk_terms = { 'announces', 'reports', 'says', 'inc', 'corp', 'ltd', 'company', 'llc', 'plc', 'what', 'why', 'how', 'when', 'where', 'who', 'you', 'your', 'our', 'the', 'and', 'for', 'with', 'that', 'this', 'news', 'article', 'today', 'new', 'all', 'from', 'shares', 'stock', 'investor', 'market', 'business', 'daily', 'update', 'alert', 'breaking', 'story', 'post', 'view', 'click', 'here', 'details' } if keyword in expanded_junk_terms: return False # If all checks pass, the keyword is considered valid return True def fetch_from_eodhd(self, days_back=30): # pylint: disable=too-many-locals, too-many-branches """ Fetch trending financial keywords from EODHD API. This method queries the EODHD news API to retrieve financial news articles, extracts keywords from various metadata fields, and filters them based on frequency and quality criteria. Args: days_back (int): Number of days to look back for news articles Returns: set: Set of trending financial keywords """ API_TOKEN = ' 685e0d635924b8.73169691' # pylint: disable=invalid-name base_url = 'https://eodhd.com/api/news' end_date = datetime.now() start_date = end_date - timedelta(days=days_back) params = { 'api_token': API_TOKEN, 'from': start_date.strftime('%Y-%m-%d'), 'to': end_date.strftime('%Y-%m-%d'), 'limit': 1000, 'tag': ('finance,economy,market,business,banking,stocks,trading,investment,forex,' 'cryptocurrency,bonds,commodities,inflation,recession,gdp,fed,treasury,' 'interest,rates,earnings,ipo,merger,acquisition,economic-data,' 'central-bank,monetary-policy,fiscal-policy') } try: # pylint: disable=too-many-nested-blocks response = requests.get(base_url, params=params, timeout=60) if response.status_code == 200: articles = response.json() keyword_counts = {} for article in articles: metadata_fields = ['tags', 'keywords', 'topics', 'entities', 'concepts'] # Extract keywords from various metadata fields for field in metadata_fields: field_data = article.get(field, []) if not field_data: continue keywords_to_process = [] if isinstance(field_data, list): keywords_to_process = [kw for kw in field_data if isinstance(kw, str)] elif isinstance(field_data, str): keywords_to_process = [ kw.strip() for kw in field_data.split(',') if kw.strip()] for kw in keywords_to_process: clean_kw = kw.lower().strip() if (3 <= len(clean_kw) <= 25 and not (kw.isupper() and len(kw) <= 4) and not clean_kw.startswith('http') and (' ' not in clean_kw or len(clean_kw.split()) <= 3)): keyword_counts[clean_kw] = keyword_counts.get(clean_kw, 0) + 1 title = article.get('title', '') if title: quoted_terms = re.findall(r'"([^"]{3,20})"', title) for term in quoted_terms: clean_term = term.lower().strip() if 3 <= len(clean_term) <= 20: keyword_counts[clean_term] = keyword_counts.get(clean_term, 0) + 1 company_pattern = ( r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)\s+' r'(?:Bank|Corp|Financial|Capital|Fund|Securities|Group|Holdings)' ) companies = re.findall(company_pattern, title) for company in companies: clean_company = company.strip().lower() if len(clean_company) > 3: keyword_counts[clean_company] = keyword_counts.get( clean_company, 0) + 1 min_frequency = 2 filtered_keywords = {} for kw, count in keyword_counts.items(): if count >= min_frequency and self._is_valid_keyword(kw): filtered_keywords[kw] = count trending_keywords = set(filtered_keywords.keys()) if len(trending_keywords) > 0: # Get the top keywords from the filtered list top_trending = sorted( filtered_keywords.items(), key=lambda x: x[1], reverse=True)[:15] sample_keywords = [f"{kw}({count})" for kw, count in top_trending] logger.info("EODHD top trending examples: %s", sample_keywords) return trending_keywords except Exception as e: # pylint: disable=broad-exception-caught logger.error("EODHD API error: %s", e) return set() class HeatedKeywordsAnalyzer: # pylint: disable=too-many-instance-attributes """ This class handles the extraction of financial terms from articles, calculates heating scores, performs LDA topic clustering, and analyzes sentiment of financial terms. Attributes: finance_keywords (dict): Comprehensive dictionary of financial keywords by category all_finance_keywords (set): Flattened set of all financial keywords keyword_manager (FinancialKeywordManager): Manager for static and trending keywords lemmatizer (WordNetLemmatizer): NLTK lemmatizer for word normalization stop_words (set): Stop words for text processing sentiment_tokenizer (AutoTokenizer): FinBERT tokenizer for sentiment analysis sentiment_model (AutoModelForSequenceClassification): FinBERT model for sentiment analysis vectorizer (CountVectorizer): Text vectorizer for LDA lda_model (LatentDirichletAllocation): LDA model for topic clustering feature_names (ndarray): Feature names from vectorizer """ def __init__(self, article_collection, keyword_collection): logger.info("Initializing HeatedKeywordsAnalyzer...") self.article_collection = article_collection self.keyword_collection = keyword_collection self.finance_keywords = FIN_KEYWORDS self.all_finance_keywords = set() for category_keywords in self.finance_keywords.values(): self.all_finance_keywords.update(category_keywords) self.keyword_manager = FinancialKeywordManager(self.all_finance_keywords) self.lemmatizer = WordNetLemmatizer() self.stop_words = set(stopwords.words('english')) self.stop_words.update(['said', 'says', 'according', 'report', 'news', 'article']) self.sentiment_tokenizer = None self.sentiment_model = None self._sentiment_initialized = False self.lda_model = None self.id2word = None self.topics_info = {} self.feature_names = None self.lda_model = None self.feature_names = None self.llm_model = gpt self.geographic_context = None # Will store the detected geographic context def _initialize_sentiment_model(self): """Lazy initialization of sentiment model.""" if not self._sentiment_initialized: logger.info("Initializing sentiment analysis model...") try: self.sentiment_tokenizer = AutoTokenizer.from_pretrained("yiyanghkust/finbert-tone") self.sentiment_model = AutoModelForSequenceClassification.from_pretrained( "yiyanghkust/finbert-tone") self._sentiment_initialized = True logger.info("Sentiment analysis model initialized.") except Exception as e: # pylint: disable=broad-exception-caught logger.error("Failed to initialize sentiment models: %s", e) self._sentiment_initialized = True # Prevent retry def fetch_articles(self, start_date, end_date, sample_size=None): """ Fetch articles from MongoDB within a specified date range. Args: start_date (datetime): Start date for article retrieval end_date (datetime): End date for article retrieval Returns: list: List of document dictionaries with 'text', 'title', 'content', and 'date' fields """ logger.info("Fetching articles from %s to %s", start_date.date(), end_date.date()) if sample_size: pipeline = [ {"$match": { "publishDate": { "$gte": start_date.strftime("%Y-%m-%d"), "$lte": end_date.strftime("%Y-%m-%d") } }}, {"$sample": {"size": sample_size}} ] cursor = self.article_collection.aggregate(pipeline) else: cursor = self.article_collection.find({ "publishDate": { "$gte": start_date.strftime("%Y-%m-%d"), "$lte": end_date.strftime("%Y-%m-%d") } }) articles_text = [ (f"Title: {doc.get('title', '')}\n" f"Content: {doc.get('content', '')}\n" f"sentimentScore: {doc.get('sentimentScore', '')}") for doc in cursor] documents = [] for article_text in articles_text: lines = article_text.split('\n') title = "" content = "" sentiment_score = 0 for line in lines: if line.startswith('Title: '): title = line[7:] elif line.startswith('Content: '): content = line[9:] elif line.startswith('sentimentScore: '): sentiment_score = float(line[16:]) if title or content: full_text = f"{title} {content}".strip() if len(full_text) > 50: documents.append({ 'text': full_text, 'title': title, 'content': content, 'sentimentScore': sentiment_score, 'date': start_date.strftime("%Y-%m-%d") }) return documents def extract_financial_terms(self, text): # pylint: disable=too-many-locals, too-many-branches, too-many-statements """ Extract financial terms from text with advanced matching techniques. Args: text (str): Text to extract financial terms from Returns: list: List of tuples (term, category) of extracted financial terms """ text_lower = text.lower() found_terms = [] used_spans = [] # Build a map of term variations to canonical terms and categories term_variations_map = {} for category, terms in self.finance_keywords.items(): for canonical_term in terms: variations = set() # Handle multi-word terms if ' ' in canonical_term: term_words = canonical_term.split() variations.add(canonical_term) # Add lemmatized form lemmatized_words = [self.lemmatizer.lemmatize(word) for word in term_words] lemmatized_term = ' '.join(lemmatized_words) variations.add(lemmatized_term) # Add plural form if not already plural if not canonical_term.endswith('s'): plural_words = term_words[:-1] + [term_words[-1] + 's'] plural_term = ' '.join(plural_words) variations.add(plural_term) # Add singular form if plural if canonical_term.endswith('s') and len(term_words[-1]) > 1: singular_words = term_words[:-1] + [term_words[-1][:-1]] singular_term = ' '.join(singular_words) variations.add(singular_term) # Handle single-word terms else: variations.add(canonical_term) lemmatized_term = self.lemmatizer.lemmatize(canonical_term) variations.add(lemmatized_term) if not canonical_term.endswith('s'): plural_term = canonical_term + 's' variations.add(plural_term) if canonical_term.endswith('s') and len(canonical_term) > 1: singular_term = canonical_term[:-1] variations.add(singular_term) for variation in variations: if variation and len(variation.strip()) > 0: term_variations_map[variation] = (canonical_term, category) # First process multi-word terms (to avoid partial matches) multi_word_variations = {k: v for k, v in term_variations_map.items() if ' ' in k} sorted_multi_word = sorted(multi_word_variations.keys(), key=len, reverse=True) for variation in sorted_multi_word: canonical_term, category = multi_word_variations[variation] start = 0 while True: idx = text_lower.find(variation, start) if idx == -1: break # Check for word boundaries if ((idx == 0 or not text_lower[idx-1].isalpha()) and (idx + len(variation) == len(text_lower) or not text_lower[idx + len(variation)].isalpha())): # Check if this span overlaps with previously found terms span_covered = any( s <= idx < e or s < idx + len(variation) <= e for s, e in used_spans) if not span_covered: found_terms.append((canonical_term, category)) used_spans.append((idx, idx + len(variation))) start = idx + len(variation) # Then process single-word terms single_word_variations = {k: v for k, v in term_variations_map.items() if ' ' not in k} tokens = word_tokenize(text_lower) offset = 0 for token in tokens: token_start = text_lower.find(token, offset) token_end = token_start + len(token) # Check if this token overlaps with previously found terms if not any(s <= token_start < e for s, e in used_spans): if token in single_word_variations: canonical_term, category = single_word_variations[token] found_terms.append((canonical_term, category)) offset = token_end return found_terms def analyze_sentiment(self, text): """ Analyze sentiment of text using FinBERT model. Args: text (str): Text to analyze sentiment for Returns: tuple: (sentiment_label, confidence_score) Note: Sentiment label is one of 'positive', 'neutral', or 'negative' Confidence score is a float between 0.0 and 1.0 """ try: if not self._sentiment_initialized: self._initialize_sentiment_model() inputs = self.sentiment_tokenizer( text[:512], return_tensors="pt", truncation=True, max_length=512 ) with torch.no_grad(): outputs = self.sentiment_model(**inputs) predictions = torch.nn.functional.softmax(outputs.logits, dim=-1) labels = ['negative', 'neutral', 'positive'] scores = predictions[0].numpy() predicted_label = labels[np.argmax(scores)] confidence = float(np.max(scores)) return predicted_label, confidence except Exception as e: # pylint: disable=broad-exception-caught logger.error("Sentiment analysis error: %s", e) return 'neutral', 0.5 def calculate_heating_scores(self, current_docs, previous_docs, filter_type=None): # pylint: disable=too-many-locals """ Calculate heating scores and perform LDA clustering. Args: current_docs (list): List of document dictionaries for current period previous_docs (list): List of document dictionaries for previous period filter_type (str): Time filter type for minimum topic determination Returns: tuple: (term_frequencies, heating_scores, lda_results) - term_frequencies: Counter of term frequencies - heating_scores: Dict mapping terms to heating scores - lda_results: Dict with LDA clustering results """ # Extract terms from current documents current_terms = [] for doc in current_docs: doc_terms = [term for term, _ in self.extract_financial_terms(doc['text'])] current_terms.extend(doc_terms) current_freq = Counter(current_terms) heating_scores = {} # Calculate heating scores if previous documents exist if previous_docs and len(previous_docs) > 0: previous_terms = [] for doc in previous_docs: doc_terms = [term for term, _ in self.extract_financial_terms(doc['text'])] previous_terms.extend(doc_terms) previous_freq = Counter(previous_terms) total_current = sum(current_freq.values()) total_previous = sum(previous_freq.values()) if total_current > 0 and total_previous > 0: for term in current_freq: current_pct = (current_freq[term] / total_current) * 100 previous_pct = (previous_freq.get(term, 0) / total_previous) * 100 if previous_pct > 0: heating_score = ((current_pct - previous_pct) / previous_pct) * 100 else: heating_score = 100.0 if current_pct > 0 else 0.0 heating_scores[term] = heating_score else: for term in current_freq: heating_scores[term] = 0.0 else: for term in current_freq: heating_scores[term] = 0.0 lda_results = self.perform_lda_clustering(current_docs, filter_type=filter_type) return current_freq, heating_scores, lda_results def filter_topics_by_financial_keywords(self, topics_info): """ Filters and enhances topic keywords. """ all_financial_keywords = self.all_finance_keywords for _, topic_info in topics_info.items(): llm_name = topic_info.get('llm_name', 'Unnamed Topic') words = topic_info['words'] weights = topic_info['weights'] financial_terms = [] financial_weights = [] for word, weight in zip(words, weights): word_clean = word.replace('_', ' ').lower() is_financial = False if word_clean in all_financial_keywords: is_financial = True else: for financial_term in all_financial_keywords: if ' ' in financial_term and word_clean in financial_term.split(): is_financial = True break if word_clean in financial_term or financial_term in word_clean: is_financial = True break if is_financial: financial_terms.append(word.replace('_', ' ')) financial_weights.append(weight) if financial_terms: sorted_pairs = sorted( zip(financial_terms, financial_weights), key=lambda x: x[1], reverse=True ) topic_info['words'] = [term for term, _ in sorted_pairs[:10]] topic_info['weights'] = [weight for _, weight in sorted_pairs[:10]] topic_info['word_weight_pairs'] = sorted_pairs[:10] topic_info['financial_terms_count'] = len(financial_terms) else: topic_info['words'] = [word.replace('_', ' ') for word in words[:5]] topic_info['weights'] = weights[:5] topic_info['word_weight_pairs'] = list(zip( [word.replace('_', ' ') for word in words[:5]], weights[:5] )) topic_info['financial_terms_count'] = 0 topic_info['llm_name'] = llm_name return topics_info def preprocess_documents(self, documents): """ Preprocess documents for LDA analysis while preserving financial terms. This method performs text preprocessing specifically designed for financial text analysis, including preserving multi-word financial terms by replacing spaces with underscores. Args: documents (list): List of document dictionaries Returns: list: List of preprocessed text strings """ processed_texts = [] for doc in documents: text = doc['text'].lower() protected_text = text # Get all financial terms all_financial_terms = [] for category_terms in self.finance_keywords.values(): all_financial_terms.extend(category_terms) # Sort terms by length (descending) to prioritize longer matches sorted_terms = sorted(all_financial_terms, key=len, reverse=True) # Replace spaces in multi-word terms with underscores to preserve them for term in sorted_terms: if ' ' in term: term_pattern = term.replace(' ', r'\s+') underscore_term = term.replace(' ', '_') protected_text = re.sub( r'\b' + term_pattern + r'\b', underscore_term, protected_text ) # Remove non-alphabetic characters except underscores text = re.sub(r'[^a-zA-Z\s_]', '', protected_text) text = ' '.join(text.split()) processed_texts.append(text) return processed_texts def _gensim_to_dense_matrix(self, gensim_corpus, num_topics): """ Helper to convert gensim's sparse output to a dense numpy array for silhouette scoring. """ num_docs = len(gensim_corpus) dense_matrix = np.zeros((num_docs, num_topics)) for i, doc_topics in enumerate(gensim_corpus): for topic_id, prob in doc_topics: dense_matrix[i, topic_id] = prob return dense_matrix def find_optimal_topics_gensim( self, corpus, id2word, tokenized_texts, documents_count, filter_type=None ): # pylint: disable=too-many-locals """ Dynamically determines the optimal number of topics for a gensim model. """ # Define minimum topics by filter type min_topics_by_filter = { 'daily': 10, 'week': 12, 'month': 20 } # Get minimum topics for this filter type min_topics = min_topics_by_filter.get(filter_type, 2) if documents_count < 100: topic_range = range(max(6, min_topics), min(12, documents_count // 8)) elif documents_count< 300: topic_range = range(max(12, min_topics), min(20, documents_count // 10)) else: topic_range = range(max(20, min_topics), min(35, documents_count // 25)) if len(topic_range) < 2: return max(min_topics, min(3, documents_count)) logger.info("Testing %d topic configurations with gensim...", len(topic_range)) perplexities, coherence_scores, silhouette_scores = [], [], [] for n_topics in topic_range: try: lda_model = LdaModel(corpus=corpus, id2word=id2word, num_topics=n_topics, random_state=42, passes=10, alpha='auto', eta='auto') # Metric 1: Perplexity perplexities.append(lda_model.log_perplexity(corpus)) # Metric 2: Coherence coherence_model = CoherenceModel(model=lda_model, texts=tokenized_texts, dictionary=id2word, coherence='c_v') coherence_scores.append(coherence_model.get_coherence()) # Metric 3: Silhouette doc_topic_matrix_sparse = lda_model.get_document_topics( corpus, minimum_probability=0) doc_topic_matrix_dense = self._gensim_to_dense_matrix( doc_topic_matrix_sparse, n_topics) if len(set(np.argmax(doc_topic_matrix_dense, axis=1))) > 1: silhouette_scores.append(silhouette_score( doc_topic_matrix_dense, np.argmax(doc_topic_matrix_dense, axis=1))) else: silhouette_scores.append(0.0) except Exception as e: # pylint: disable=broad-exception-caught logger.warning("Error evaluating %d topics: %s", n_topics, e) perplexities.append(float('inf')) coherence_scores.append(0.0) silhouette_scores.append(0.0) if not all([perplexities, coherence_scores, silhouette_scores]): return max(2, min(5, documents_count // 10)) norm_perp = [(max(perplexities) - p) / (max(perplexities) - min(perplexities)) if max(perplexities) != min(perplexities) else 0.5 for p in perplexities] norm_coh = [(c - min(coherence_scores)) / (max(coherence_scores) - min(coherence_scores)) if max(coherence_scores) != min(coherence_scores) else 0.5 for c in coherence_scores] norm_sil = [(s - min(silhouette_scores)) / (max(silhouette_scores) - min(silhouette_scores)) if max(silhouette_scores) != min(silhouette_scores) else 0.5 for s in silhouette_scores] combined_scores = [0.4 * nc + 0.3 * np + 0.3 * ns for np, nc, ns in zip(norm_perp, norm_coh, norm_sil)] optimal_topics = list(topic_range)[np.argmax(combined_scores)] logger.info("Optimal number of topics found: %d", optimal_topics) return optimal_topics def perform_lda_clustering(self, documents, n_topics=None, filter_type=None): """ Perform LDA clustering using gensim, with a sophisticated method for finding the optimal number of topics. """ try: if len(documents) < 5: logger.warning("Too few documents for meaningful LDA clustering.") return None self.geographic_context = self.detect_geographic_context(documents) logger.info("Detected geographic context: %s", self.geographic_context) processed_texts_str = self.preprocess_documents(documents) tokenized_texts = [text.split() for text in processed_texts_str] self.id2word = corpora.Dictionary(tokenized_texts) self.id2word.filter_extremes(no_below=max(2, len(documents) // 20), no_above=0.85) corpus = [self.id2word.doc2bow(text) for text in tokenized_texts] if not corpus or not any(corpus): logger.warning("Corpus is empty after preprocessing.") return None if n_topics is None: n_topics = self.find_optimal_topics_gensim( corpus, self.id2word, tokenized_texts, len(documents), filter_type) logger.info("Fitting FINAL gensim LDA model with %d topics...", n_topics) self.lda_model = LdaModel(corpus=corpus, id2word=self.id2word, num_topics=n_topics, random_state=42, passes=15, alpha='auto', eta='auto') topic_assignments = [] for doc_bow in corpus: topics = self.lda_model.get_document_topics(doc_bow, minimum_probability=0.3) best_topic = max(topics, key=lambda item: item[1]) if topics else None topic_assignments.append(best_topic[0] if best_topic else -1) # pylint: disable=unsubscriptable-object # Extract topic words with geographic context self.topics_info = self.extract_topic_words(n_topics) return { 'topic_assignments': topic_assignments, 'topics_info': self.topics_info, 'n_topics': n_topics, 'total_documents': len(documents), 'corpus': corpus, 'id2word': self.id2word, 'geographic_context': self.geographic_context # Include context in results } except Exception as e: # pylint: disable=broad-exception-caught logger.error("Gensim LDA clustering error: %s", e) return None def detect_geographic_context(self, documents): """ Detect the primary geographic context from a collection of documents. Args: documents (list): List of document dictionaries Returns: str: Primary geographic region (e.g., 'China', 'Global', 'US', etc.) """ geographic_indicators = { 'china': ['china', 'chinese', 'beijing', 'shanghai', 'yuan', 'renminbi', 'pboc', 'ccp'], 'us': ['united states', 'america', 'american', 'federal reserve', 'fed', 'dollar', 'usd', 'wall street'], 'europe': ['europe', 'european', 'euro', 'ecb', 'brexit', 'eu'], 'japan': ['japan', 'japanese', 'yen', 'tokyo', 'boj'], 'global': ['global', 'worldwide', 'international', 'world'] } region_scores = {region: 0 for region in geographic_indicators.keys()} # pylint: disable=consider-iterating-dictionary total_docs = len(documents) for doc in documents: text_lower = doc['text'].lower() for region, indicators in geographic_indicators.items(): for indicator in indicators: if indicator in text_lower: region_scores[region] += 1 # Normalize scores by document count for region in region_scores: region_scores[region] = region_scores[region] / total_docs if total_docs > 0 else 0 # Determine primary region primary_region = max(region_scores, key=region_scores.get) # If no clear geographic context, default to 'Global' if region_scores[primary_region] < 0.1: primary_region = 'global' return primary_region.title() def name_topic_with_llm(self, topic_keywords, geographic_context=None): """ Generates a concise, human-readable name for a topic using an LLM with geographic context. Args: topic_keywords (list): List of keywords for the topic geographic_context (str): Geographic context of the articles (e.g., 'China', 'US', 'Global') """ # Fallback name in case of LLM failure fallback_name = f"Topic: {', '.join(topic_keywords[:3])}" if not self.llm_model: logger.warning("LLM model not available. Using fallback topic name.") return fallback_name # Prepare the keywords for the prompt keywords_str = ", ".join(topic_keywords) # Construct geographic context string geo_context_str = "" if geographic_context and geographic_context.lower() != 'global': geo_context_str = f" in the {geographic_context} market" # Construct the prompt with geographic context system_message = SystemMessage( content=( "You are an expert financial analyst. Your task is to analyze a list of keywords " "from a financial news topic and create a concise, descriptive title for it. " "Consider the geographic context when creating the topic name." ) ) human_message = HumanMessage( content=( f"Keywords: {keywords_str}\n" f"Geographic Context: {geographic_context or 'Global'}\n\n" f"Based on these keywords from financial news{geo_context_str}, " "what is the core financial theme? " "Please provide a topic name that is 3-5 words long." " The name should be clear and professional. " "If the geographic context is significant to understanding the topic," " incorporate it naturally. " "Do NOT use quotes or any other formatting in your response. Just the topic name." ) ) try: response = self.llm_model.invoke([system_message, human_message]) topic_name = response.content.strip().replace('"', '').replace("'", "") return topic_name.title() except Exception as e: # pylint: disable=broad-exception-caught logger.warning("LLM naming failed: %s. Using fallback name.", e) return fallback_name def extract_topic_words(self, n_topics, n_top_words=15): """ Extracts top words for each topic and adds an LLM-generated descriptive name with geographic context. """ topics_info = {} if not self.lda_model: logger.error("LDA model is not available for extracting topic words.") return topics_info for topic_idx in range(n_topics): # Correctly get top words and their weights using the Gensim API topic_terms = self.lda_model.get_topic_terms(topic_idx, topn=n_top_words) # The id2word dictionary maps word IDs to the actual words top_words = [self.id2word[word_id] for word_id, _ in topic_terms] top_weights = [weight for _, weight in topic_terms] # Clean up underscores from multi-word terms before processing cleaned_words = [word.replace('_', ' ') for word in top_words] llm_generated_name = self.name_topic_with_llm( cleaned_words[:10], self.geographic_context ) topics_info[f'Topic_{topic_idx}'] = { 'llm_name': llm_generated_name, 'words': cleaned_words, 'weights': top_weights, 'word_weight_pairs': list(zip(cleaned_words, top_weights)) } filtered_topics = self.filter_topics_by_financial_keywords(topics_info) return filtered_topics def calculate_topic_sentiments_from_documents(self, lda_results, documents): """ Calculate topic sentiments using document-level sentiment scores. Args: lda_results (dict): LDA clustering results documents (list): List of document dictionaries with sentimentScore field Returns: dict: {topic_id: sentiment_score} where sentiment_score is -1.0 to 1.0 """ topic_sentiments = {} topic_assignments = lda_results.get('topic_assignments', []) # Group documents by their assigned topic topic_documents = {} for i, topic_id in enumerate(topic_assignments): if topic_id != -1: # Skip unassigned documents topic_key = f'Topic_{topic_id}' if topic_key not in topic_documents: topic_documents[topic_key] = [] topic_documents[topic_key].append(documents[i]) # Calculate average sentiment for each topic for topic_id, topic_docs in topic_documents.items(): if topic_docs: # Extract sentiment scores from documents sentiment_scores = [] for doc in topic_docs: sentiment_score = doc.get('sentimentScore', 0) sentiment_scores.append(sentiment_score) # Calculate average sentiment if sentiment_scores: avg_sentiment = sum(sentiment_scores) / len(sentiment_scores) topic_sentiments[topic_id] = avg_sentiment else: topic_sentiments[topic_id] = 0.0 else: topic_sentiments[topic_id] = 0.0 return topic_sentiments def get_lda_results(self, lda_results): """ Get LDA topic names, document counts and sentiment scores. """ topics_info = lda_results.get('topics_info', {}) topic_sentiments = lda_results.get('topic_sentiments', {}) topic_assignments = lda_results.get('topic_assignments', []) if not topics_info: logger.warning("No topic information available") return None # Calculate document counts for each topic doc_counts = Counter(t for t in topic_assignments if t != -1) topic_data = [] # Prepare data in a dictionary for topic_id, topic_info in topics_info.items(): topic_num = int(topic_id.split('_')[-1]) topic_name = topic_info.get('llm_name', f'Topic {topic_num + 1}') doc_count = doc_counts.get(topic_num, 0) sentiment_score = topic_sentiments.get(topic_id, 0.0) if doc_count > 0: topic_data.append({ 'topic_name': topic_name, 'doc_count': doc_count, 'sentiment_score': sentiment_score }) return topic_data def analyze_heated_keywords(self, filter_type, analyzer=None): # pylint: disable=too-many-locals """ Analyzes heated keywords for a specific time period. Fetches articles for the current and previous time periods, calculates heating scores, performs LDA topic clustering, and analyzes sentiment for financial keywords. Args: filter_type (str): Time filter type ('today', 'week', or 'month') analyzer (HeatedKeywordsAnalyzer, optional): Analyzer instance Returns: dict: Results containing term frequencies, heating scores, sentiment scores, LDA results, categorized terms, term weights, and current documents """ if analyzer is None: analyzer = self current_start, current_end = get_time_range(filter_type) prev_start, prev_end = get_previous_time_range(filter_type, current_start) current_docs = analyzer.fetch_articles(current_start, current_end) previous_docs = analyzer.fetch_articles(prev_start, prev_end) if len(current_docs) < 1: logger.warning("Insufficient documents (%d) for %s analysis", len(current_docs), filter_type) return None term_frequencies, heating_scores, lda_results = analyzer.calculate_heating_scores( current_docs, previous_docs) if not term_frequencies: logger.warning("No financial terms found for %s", filter_type) return None sentiment_scores = self.calculate_keyword_sentiments( current_docs, term_frequencies, analyzer) total_mentions = sum(term_frequencies.values()) term_weights = { term: (freq / total_mentions) * 100 for term, freq in term_frequencies.items()} results = { 'term_frequencies': term_frequencies, 'heating_scores': heating_scores, 'sentiment_scores': sentiment_scores, 'lda_results': lda_results, 'categorized_terms': analyzer.categorize_terms_thematically(term_frequencies), 'term_weights': term_weights, 'current_docs': current_docs } return results def calculate_keyword_sentiments(self, documents, term_frequencies, analyzer): """ Calculate sentiment scores for top keywords. """ sentiment_scores = {} top_terms = term_frequencies.most_common(30) for term, _ in top_terms: relevant_docs = [doc for doc in documents if term.lower() in doc['text'].lower()] if relevant_docs: relevant_sentences = [] for doc in relevant_docs[:5]: sentences = doc['text'].split('.') for sentence in sentences: if term.lower() in sentence.lower(): relevant_sentences.append(sentence.strip()) if relevant_sentences: combined_text = ' '.join(relevant_sentences[:3]) sentiment, confidence = analyzer.analyze_sentiment(combined_text) sentiment_scores[term] = (sentiment, confidence) else: sentiment_scores[term] = ('neutral', 0.5) else: sentiment_scores[term] = ('neutral', 0.5) return sentiment_scores def categorize_terms_thematically(self, term_frequencies): """ Categorize terms by financial themes. Groups financial terms into predefined categories based on the finance_keywords dictionary. Args: term_frequencies (Counter): Counter object with term frequencies Returns: dict: Dictionary with terms grouped by financial categories """ categorized = defaultdict(list) for term, frequency in term_frequencies.items(): found = False for category, terms in self.finance_keywords.items(): if term in terms: categorized[category].append({'term': term, 'frequency': frequency}) found = True break if not found: categorized['general'].append({'term': term, 'frequency': frequency}) return dict(categorized)