""" LDA collector for topic modeling and visualization. """ import logging from datetime import timedelta from controllers.lda import HeatedKeywordsAnalyzer # pylint: disable=import-error from controllers.keyword_analysis import get_time_range, get_previous_time_range # pylint: disable=import-error from models.database import article_collection, keywords_collection, lda_collection # pylint: disable=import-error # Configure logger logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def analyze_heated_keywords(filter_type, analyzer=None): # pylint: disable=too-many-locals """ Analyzes heated keywords for a specific time period. Fetches articles for the current and previous time periods, calculates heating scores, performs LDA topic clustering, and analyzes sentiment for financial keywords. For daily analysis, implements fallback logic to find the most recent available articles if current day articles are not found. Args: filter_type (str): Time filter type ('today', 'week', or 'month') analyzer (HeatedKeywordsAnalyzer, optional): Analyzer instance Returns: tuple: (results_dict, adjusted_start_date, adjusted_end_date) where: - results_dict: Results containing term frequencies, heating scores, sentiment scores, LDA results, categorized terms, term weights, and current documents - adjusted_start_date: The actual start date used for analysis - adjusted_end_date: The actual end date used for analysis """ if analyzer is None: analyzer = HeatedKeywordsAnalyzer(article_collection, keywords_collection) current_start, current_end = get_time_range(filter_type) prev_start, prev_end = get_previous_time_range(filter_type, current_start) logger.info("Fetching articles for current period: %s to %s", current_start.strftime('%Y-%m-%d'), current_end.strftime('%Y-%m-%d')) current_docs = analyzer.fetch_articles(current_start, current_end) # Implement fallback logic for daily analysis when no current articles found if len(current_docs) < 5 and filter_type == "daily": logger.warning( "Insufficient articles (%d) found for current day.", len(current_docs)) # Find articles going back up to 30 days to get at least 5 articles max_days_back = 3 days_back = 1 found_sufficient_articles = False while days_back <= max_days_back and not found_sufficient_articles: # Go back one day at a time to find a single day with sufficient articles fallback_date = current_start - timedelta(days=days_back) fallback_start = fallback_date.replace(hour=0, minute=0, second=0, microsecond=0) fallback_end = fallback_date.replace(hour=23, minute=59, second=59, microsecond=999999) logger.info( "Searching for articles on %s (going back %d days)", fallback_date.strftime('%Y-%m-%d'), days_back ) # Fetch articles for this single day fallback_docs = analyzer.fetch_articles(fallback_start, fallback_end) if len(fallback_docs) >= 5: # Found sufficient articles for a single day, use this as our current period current_start = fallback_start current_end = fallback_end current_docs = fallback_docs # Calculate the previous period for comparison prev_start, prev_end = get_previous_time_range("daily", current_start) found_sufficient_articles = True logger.info( "Found sufficient articles (%d) for single day: %s", len(current_docs), fallback_date.strftime('%Y-%m-%d') ) logger.info("Adjusted previous period: %s to %s", prev_start.strftime('%Y-%m-%d'), prev_end.strftime('%Y-%m-%d')) else: logger.info("Only found %d articles on %s", len(fallback_docs), fallback_date.strftime('%Y-%m-%d')) days_back += 1 if not found_sufficient_articles: 
logger.error("Could not find at least 5 articles in the last %d days", max_days_back) return None, None, None logger.info("Fetching articles for previous period: %s to %s", prev_start.strftime('%Y-%m-%d'), prev_end.strftime('%Y-%m-%d')) previous_docs = analyzer.fetch_articles(prev_start, prev_end) if len(current_docs) < 1: logger.warning("Insufficient documents (%d) for %s analysis", len(current_docs), filter_type) return None, None, None term_frequencies, heating_scores, lda_results = analyzer.calculate_heating_scores( current_docs, previous_docs, filter_type) if not term_frequencies: logger.warning("No financial terms found for %s", filter_type) return None, None, None sentiment_scores = calculate_keyword_sentiments(current_docs, term_frequencies, analyzer) # Calculate topic sentiments using document sentiment scores if lda_results and lda_results.get('topic_assignments'): topic_sentiments = analyzer.calculate_topic_sentiments_from_documents( lda_results, current_docs ) lda_results['topic_sentiments'] = topic_sentiments logger.info("Calculated topic sentiments using document sentiment scores") total_mentions = sum(term_frequencies.values()) term_weights = {term: (freq / total_mentions) * 100 for term, freq in term_frequencies.items()} results = { 'term_frequencies': term_frequencies, 'heating_scores': heating_scores, 'sentiment_scores': sentiment_scores, 'lda_results': lda_results, 'categorized_terms': analyzer.categorize_terms_thematically(term_frequencies), 'term_weights': term_weights, 'current_docs': current_docs } return results, current_start, current_end def calculate_keyword_sentiments(documents, term_frequencies, analyzer): """ Calculate sentiment scores for top keywords. """ sentiment_scores = {} top_terms = term_frequencies.most_common(30) for term, _ in top_terms: relevant_docs = [doc for doc in documents if term.lower() in doc['text'].lower()] if relevant_docs: relevant_sentences = [] for doc in relevant_docs[:5]: sentences = doc['text'].split('.') for sentence in sentences: if term.lower() in sentence.lower(): relevant_sentences.append(sentence.strip()) if relevant_sentences: combined_text = ' '.join(relevant_sentences[:3]) sentiment, confidence = analyzer.analyze_sentiment(combined_text) sentiment_scores[term] = (sentiment, confidence) else: sentiment_scores[term] = ('neutral', 0.5) else: sentiment_scores[term] = ('neutral', 0.5) return sentiment_scores def update_lda_result(filter_type, lda_results, start_date, end_date): """ Update LDA results in MongoDB collection. Args: filter_type (str): Time filter type ('today', 'week', 'month') lda_results (list): List of topic data from get_lda_results() start_date (datetime): Start date of the analysis period end_date (datetime): End date of the analysis period Returns: bool: True if successful, False otherwise """ try: # Prepare document for MongoDB document = { '_id': filter_type, 'result': lda_results, 'dateRange': f"{start_date.strftime('%Y-%m-%d')} - {end_date.strftime('%Y-%m-%d')}" } # Upsert document (insert if not exists, update if exists) lda_collection.replace_one( {'_id': filter_type}, document, upsert=True ) logger.info("Successfully updated LDA results for %s in MongoDB", filter_type) return True except Exception as e: # pylint: disable=broad-exception-caught logger.error("Error updating LDA results for %s: %s", filter_type, e) return False def collect(): """ Main collection function that runs the full LDA analysis pipeline. 
def collect():
    """
    Main collection function that runs the full LDA analysis pipeline.

    This function performs the complete analysis, including:
    - Fetching articles for the daily, weekly, and monthly time periods
    - Calculating heating scores
    - Performing LDA topic clustering
    - Storing LDA visualization results in MongoDB

    Returns:
        dict: Results for all time periods ('daily', 'week', 'month')
    """
    logger.info("Initializing new analysis run...")
    analyzer = HeatedKeywordsAnalyzer(article_collection, keywords_collection)

    # Fetch trending keywords
    analyzer.keyword_manager.fetch_trending_keywords(days_back=30)

    results = {}
    time_filters = ["daily", "week", "month"]

    for filter_type in time_filters:
        logger.info("=" * 60)
        logger.info("RUNNING ANALYSIS - %s", filter_type.upper())
        logger.info("=" * 60)

        analysis_results, adjusted_start, adjusted_end = analyze_heated_keywords(
            filter_type, analyzer)
        if analysis_results:
            results[filter_type] = analysis_results
            # display_heated_keywords(filter_type, analysis_results)

            # Generate LDA visualization data and persist it to MongoDB
            lda_results = analyzer.get_lda_results(analysis_results['lda_results'])
            update_lda_result(filter_type, lda_results, adjusted_start, adjusted_end)

    logger.info("Collection completed successfully.")
    return results
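
# A minimal sketch of a direct entry point; assumes this module is run from a
# context where models.database can reach MongoDB. In production the pipeline
# may invoke collect() from a scheduler instead.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    collect()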