"""
LDA collector for topic modeling and visualization.
"""
import logging
from controllers.lda import HeatedKeywordsAnalyzer # pylint: disable=import-error
from controllers.keyword_analysis import get_time_range, get_previous_time_range # pylint: disable=import-error
from models.database import article_collection, keywords_collection, lda_collection # pylint: disable=import-error
# Configure logger
logger = logging.getLogger(__name__)
def analyze_heated_keywords(filter_type, analyzer=None):  # pylint: disable=too-many-locals
    """
    Analyze heated keywords for a specific time period.

    Fetches articles for the current and previous time periods, calculates
    heating scores, performs LDA topic clustering, and analyzes sentiment for
    financial keywords.

    Args:
        filter_type (str): Time filter type ('today', 'week', or 'month')
        analyzer (HeatedKeywordsAnalyzer, optional): Analyzer instance; a new
            one is created from the default collections when omitted.

    Returns:
        dict: Results containing term frequencies, heating scores, sentiment
            scores, LDA results, categorized terms, term weights, and current
            documents. Returns None when there is insufficient data.
    """
if analyzer is None:
analyzer = HeatedKeywordsAnalyzer(article_collection, keywords_collection)
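    # The previous window (same length, immediately preceding the current one)
    # serves as the baseline against which heating scores are computed.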
current_start, current_end = get_time_range(filter_type)
prev_start, prev_end = get_previous_time_range(filter_type, current_start)
logger.info("Fetching articles for current period: %s to %s",
current_start.strftime('%Y-%m-%d'), current_end.strftime('%Y-%m-%d'))
current_docs = analyzer.fetch_articles(current_start, current_end)
logger.info("Fetching articles for previous period: %s to %s",
prev_start.strftime('%Y-%m-%d'), prev_end.strftime('%Y-%m-%d'))
previous_docs = analyzer.fetch_articles(prev_start, prev_end)
    if not current_docs:
        logger.warning("Insufficient documents (%d) for %s analysis",
                       len(current_docs), filter_type)
        return None
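    # Heating scores compare term frequencies across the two windows; the
    # analyzer also runs LDA topic clustering over the current documents.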
term_frequencies, heating_scores, lda_results = analyzer.calculate_heating_scores(
current_docs, previous_docs)
if not term_frequencies:
logger.warning("No financial terms found for %s", filter_type)
return None
sentiment_scores = calculate_keyword_sentiments(current_docs, term_frequencies, analyzer)
# Calculate topic sentiments using document sentiment scores
if lda_results and lda_results.get('topic_assignments'):
topic_sentiments = analyzer.calculate_topic_sentiments_from_documents(
lda_results, current_docs
)
lda_results['topic_sentiments'] = topic_sentiments
logger.info("Calculated topic sentiments using document sentiment scores")
    total_mentions = sum(term_frequencies.values())
    term_weights = {
        term: (freq / total_mentions) * 100
        for term, freq in term_frequencies.items()
    }
results = {
'term_frequencies': term_frequencies,
'heating_scores': heating_scores,
'sentiment_scores': sentiment_scores,
'lda_results': lda_results,
'categorized_terms': analyzer.categorize_terms_thematically(term_frequencies),
'term_weights': term_weights,
'current_docs': current_docs
}
return results
def calculate_keyword_sentiments(documents, term_frequencies, analyzer):
    """
    Calculate sentiment scores for the top keywords.

    Args:
        documents (list): Current-period documents, each with a 'text' field.
        term_frequencies (collections.Counter): Term -> mention count.
        analyzer (HeatedKeywordsAnalyzer): Provides analyze_sentiment().

    Returns:
        dict: Term -> (sentiment, confidence) tuples; terms with no matching
            sentences default to ('neutral', 0.5).
    """
sentiment_scores = {}
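    # term_frequencies is expected to be a collections.Counter, so the 30 most
    # frequent terms can be taken directly via most_common().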
top_terms = term_frequencies.most_common(30)
    for term, _ in top_terms:
        relevant_docs = [doc for doc in documents if term.lower() in doc['text'].lower()]
        # Gather sentences that mention the term (naive split on periods),
        # looking at no more than five matching documents per term.
        relevant_sentences = []
        for doc in relevant_docs[:5]:
            for sentence in doc['text'].split('.'):
                if term.lower() in sentence.lower():
                    relevant_sentences.append(sentence.strip())
        if relevant_sentences:
            combined_text = ' '.join(relevant_sentences[:3])
            sentiment, confidence = analyzer.analyze_sentiment(combined_text)
            sentiment_scores[term] = (sentiment, confidence)
        else:
            # No matching documents or sentences: fall back to a neutral score.
            sentiment_scores[term] = ('neutral', 0.5)
return sentiment_scores
def update_lda_result(filter_type, lda_results):
    """
    Update LDA results in the MongoDB collection.

    Args:
        filter_type (str): Time filter type ('today', 'week', or 'month')
        lda_results (list): List of topic data from get_lda_results()

    Returns:
        bool: True if successful, False otherwise
    """
try:
# Prepare document for MongoDB
document = {
'_id': filter_type,
'result': lda_results,
}
# Upsert document (insert if not exists, update if exists)
lda_collection.replace_one(
{'_id': filter_type},
document,
upsert=True
)
logger.info("Successfully updated LDA results for %s in MongoDB", filter_type)
return True
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("Error updating LDA results for %s: %s", filter_type, e)
return False
def collect():
    """
    Main collection function that runs the full LDA analysis pipeline.

    For each supported time period, the pipeline performs the complete
    analysis:
    - Fetching articles for the current and previous windows
    - Calculating heating scores
    - Performing LDA topic clustering
    - Persisting the LDA visualization payload to MongoDB

    Returns:
        dict: Results keyed by time period ('today', 'week', 'month')
    """
logger.info("Initializing new analysis run...")
analyzer = HeatedKeywordsAnalyzer(article_collection, keywords_collection)
# Fetch trending keywords
analyzer.keyword_manager.fetch_trending_keywords(days_back=30)
results = {}
time_filters = ["daily", "week", "month"]
for filter_type in time_filters:
logger.info("=" * 60)
logger.info("RUNNING ANALYSIS - %s", filter_type.upper())
logger.info("=" * 60)
analysis_results = analyze_heated_keywords(filter_type, analyzer)
if analysis_results:
results[filter_type] = analysis_results
# display_heated_keywords(filter_type, analysis_results)
            # Build the LDA visualization payload and persist it to MongoDB.
lda_results = analyzer.get_lda_results(analysis_results['lda_results'])
update_lda_result(filter_type, lda_results)
logger.info("Collection completed successfully.")
return results
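

if __name__ == "__main__":
    # Minimal usage sketch: configure logging and run the full pipeline end to
    # end. Assumes the MongoDB collections imported from models.database are
    # reachable in this environment.
    logging.basicConfig(level=logging.INFO)
    all_results = collect()
    for period, data in all_results.items():
        logger.info("%s: %d terms across %d documents",
                    period, len(data['term_frequencies']), len(data['current_docs']))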