"""
Keyword analysis controller: time-range helpers, heating-score calculation,
thematic grouping, and summary persistence.
"""
from datetime import datetime, timedelta
import logging

# Configure logger
logger = logging.getLogger(__name__)


def get_time_range(filter_type: str) -> tuple[datetime, datetime]:
"""
Calculate time range for analysis periods.
Args:
filter_type: One of 'today', 'week', or 'month'
Returns:
tuple: (start_date, end_date) as datetime objects
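
    Example (illustrative; the exact values depend on when it is called)::

        start, end = get_time_range("week")
        # end is datetime.now(); start is midnight six days earlier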
"""
end_date = datetime.now()
if filter_type == "today":
start_date = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
elif filter_type == "week":
start_date = end_date - timedelta(days=6)
start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
elif filter_type == "month":
start_date = end_date - timedelta(days=29)
start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
else:
# Default to today
start_date = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
return start_date, end_date


def get_previous_time_range(
filter_type: str,
current_start: datetime
) -> tuple[datetime, datetime]:
"""
Calculate previous time range for comparison.
Args:
filter_type: One of 'today', 'week', or 'month'
current_start: Start date of current period
Returns:
tuple: (previous_start, previous_end) as datetime objects
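
    Example::

        >>> get_previous_time_range("week", datetime(2024, 5, 8))
        (datetime.datetime(2024, 5, 1, 0, 0), datetime.datetime(2024, 5, 7, 23, 59, 59))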
"""
if filter_type == "today":
# Previous day
previous_end = current_start - timedelta(seconds=1)
previous_start = previous_end.replace(hour=0, minute=0, second=0, microsecond=0)
elif filter_type == "week":
# Previous week (7 days before)
previous_end = current_start - timedelta(seconds=1)
previous_start = previous_end - timedelta(days=6)
previous_start = previous_start.replace(hour=0, minute=0, second=0, microsecond=0)
elif filter_type == "month":
# Previous month (30 days before)
previous_end = current_start - timedelta(seconds=1)
previous_start = previous_end - timedelta(days=29)
previous_start = previous_start.replace(hour=0, minute=0, second=0, microsecond=0)
else:
# Default to previous day
previous_end = current_start - timedelta(seconds=1)
previous_start = previous_end.replace(hour=0, minute=0, second=0, microsecond=0)
return previous_start, previous_end


def calculate_heating_scores_from_database(  # pylint: disable=too-many-locals, too-many-branches, too-many-statements
filter_type: str,
keywords_collection
) -> tuple[dict, dict, dict, dict]:  # (heating_scores, current, previous, categories)
"""
Calculate heating scores by comparing stored keywords from database.
Args:
filter_type: Analysis period ('today', 'week', 'month')
keywords_collection: MongoDB collection containing keywords
Returns:
tuple: (heating_scores, current_keywords, previous_keywords, keyword_categories)
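
    Expected keyword document shape (field values illustrative)::

        {"_id": "2024-05-08",
         "keywords": [{"keyword": "rates", "frequency": 3, "category": "macro"}]}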
"""
heating_scores = {}
    keyword_categories = {}  # Maps each keyword to its category
# Get current period date range
current_start, current_end = get_time_range(filter_type)
prev_start, prev_end = get_previous_time_range(filter_type, current_start)
logger.info("Fetching stored keywords for heating calculation")
logger.info("Current: %s to %s",
current_start.strftime('%Y-%m-%d'), current_end.strftime('%Y-%m-%d'))
logger.info("Previous: %s to %s",
prev_start.strftime('%Y-%m-%d'), prev_end.strftime('%Y-%m-%d'))
# Get current period keywords from database
current_keywords = {}
current_docs_found = 0
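    # "_id" values are "%Y-%m-%d" strings, so the lexicographic $gte/$lte range
    # below lines up with chronological order.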
current_cursor = keywords_collection.find({
"_id": {
"$gte": current_start.strftime("%Y-%m-%d"),
"$lte": current_end.strftime("%Y-%m-%d")
}
})
for doc in current_cursor:
current_docs_found += 1
if 'keywords' in doc:
for kw in doc['keywords']:
keyword = kw.get('keyword', '')
frequency = kw.get('frequency', 0)
category = kw.get('category', 'other_categories') # Retrieve category
if keyword in current_keywords:
current_keywords[keyword] += frequency
else:
current_keywords[keyword] = frequency
# Store category mapping (use latest category if keyword appears multiple times)
keyword_categories[keyword] = category
# For daily analysis, if no current keywords found, try to find most recent data
if filter_type == "today" and current_docs_found == 0: # pylint: disable=too-many-nested-blocks
logger.warning("No keywords found for today. Looking for most recent available data...")
# Find the most recent date with keywords
most_recent_doc = keywords_collection.find_one(
sort=[("_id", -1)],
projection={"_id": 1}
)
if most_recent_doc:
most_recent_date_str = most_recent_doc['_id']
most_recent_date = datetime.strptime(most_recent_date_str, "%Y-%m-%d")
logger.info("Using most recent available date: %s", most_recent_date_str)
# Recalculate date ranges using the most recent date
current_start = most_recent_date.replace(hour=0, minute=0, second=0, microsecond=0)
current_end = most_recent_date.replace(
hour=23, minute=59, second=59, microsecond=999999)
prev_start, prev_end = get_previous_time_range(filter_type, current_start)
logger.info("Adjusted current: %s to %s",
current_start.strftime('%Y-%m-%d'), current_end.strftime('%Y-%m-%d'))
logger.info("Adjusted previous: %s to %s",
prev_start.strftime('%Y-%m-%d'), prev_end.strftime('%Y-%m-%d'))
# Re-fetch current period keywords with adjusted date range
current_keywords = {}
current_docs_found = 0
current_cursor = keywords_collection.find({
"_id": {
"$gte": current_start.strftime("%Y-%m-%d"),
"$lte": current_end.strftime("%Y-%m-%d")
}
})
for doc in current_cursor:
current_docs_found += 1
if 'keywords' in doc:
for kw in doc['keywords']:
keyword = kw.get('keyword', '')
frequency = kw.get('frequency', 0)
category = kw.get('category', 'other_categories')
if keyword in current_keywords:
current_keywords[keyword] += frequency
else:
current_keywords[keyword] = frequency
keyword_categories[keyword] = category
else:
logger.error("No keywords found in database at all")
return {}, {}, {}, {}
# Get previous period keywords from database
previous_keywords = {}
previous_docs_found = 0
previous_cursor = keywords_collection.find({
"_id": {
"$gte": prev_start.strftime("%Y-%m-%d"),
"$lte": prev_end.strftime("%Y-%m-%d")
}
})
for doc in previous_cursor:
previous_docs_found += 1
if 'keywords' in doc:
for kw in doc['keywords']:
keyword = kw.get('keyword', '')
frequency = kw.get('frequency', 0)
category = kw.get('category', 'other_categories') # Retrieve category
if keyword in previous_keywords:
previous_keywords[keyword] += frequency
else:
previous_keywords[keyword] = frequency
# Store category mapping for previous keywords too
if keyword not in keyword_categories:
keyword_categories[keyword] = category
    # Calculate totals (used only for the log lines below)
    total_current = sum(current_keywords.values())
    total_previous = sum(previous_keywords.values())
logger.info("Current period: %d database records, %d terms, total freq: %d",
current_docs_found, len(current_keywords), total_current)
logger.info("Previous period: %d database records, %d terms, total freq: %d",
previous_docs_found, len(previous_keywords), total_previous)
if not previous_keywords:
# No previous data - assign 0.0 for existing terms
logger.info("No previous data - assigning neutral heating scores")
for term in current_keywords:
heating_scores[term] = 0.0
else:
logger.info("Calculating heating scores")
# Calculate heating scores
for term, current_freq in current_keywords.items():
previous_freq = previous_keywords.get(term, 0)
if previous_freq > 0:
# Term existed before - calculate percentage change in frequency
freq_change = ((current_freq - previous_freq) / previous_freq) * 100
heating_scores[term] = round(freq_change, 1)
else:
# New term - assign high positive heating
heating_scores[term] = 200.0
# Handle terms that disappeared (were in previous but not current)
for term in previous_keywords:
if term not in current_keywords:
heating_scores[term] = -200.0
return heating_scores, current_keywords, previous_keywords, keyword_categories
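
# Illustrative call, assuming a running MongoDB instance and hypothetical
# database/collection names "finfast" and "keywords" (neither is defined here):
#
#     from pymongo import MongoClient
#     keywords = MongoClient()["finfast"]["keywords"]
#     scores, current, previous, categories = (
#         calculate_heating_scores_from_database("week", keywords))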


def calculate_heating_scores_for_period(
current_terms_data: list,
previous_terms_data: list
) -> dict:
"""
Calculate heating scores between two periods.
Args:
current_terms_data: List of current period keyword data
previous_terms_data: List of previous period keyword data
Returns:
dict: Heating scores for each keyword
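
    Example::

        >>> calculate_heating_scores_for_period(
        ...     [{"keyword": "rates", "frequency": 30}],
        ...     [{"keyword": "rates", "frequency": 20}, {"keyword": "gold", "frequency": 5}])
        {'rates': 50.0, 'gold': -200.0}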
"""
heating_scores = {}
# Create frequency maps
current_freq = {term['keyword']: term['frequency'] for term in current_terms_data}
previous_freq = {term['keyword']: term['frequency'] for term in previous_terms_data}
# Calculate heating scores
for term, current_freq_val in current_freq.items():
previous_freq_val = previous_freq.get(term, 0)
if previous_freq_val > 0:
# Term existed before - calculate percentage change
freq_change = ((current_freq_val - previous_freq_val) / previous_freq_val) * 100
heating_scores[term] = round(freq_change, 1)
else:
# New term - assign high positive heating
heating_scores[term] = 200.0
# Handle terms that disappeared
for term in previous_freq:
if term not in current_freq:
heating_scores[term] = -200.0
return heating_scores


def create_thematic_analysis(terms_with_data: list) -> list:
"""
Create thematic groupings from keyword data.
Args:
terms_with_data: List of keyword data with categories
Returns:
list: List of theme objects
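
    Example (illustrative)::

        create_thematic_analysis(
            [{"keyword": "cpi", "category": "inflation", "frequency": 4, "popularity": 10.0}])
        # -> [{'theme_name': 'inflation', 'description': 'Keywords related to inflation',
        #      'terms': ['cpi'], 'total_frequency': 4, 'heating_score': 10.0}]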
"""
themes = {}
# Group terms by category
for term in terms_with_data:
category = term.get('category', 'other_categories')
keyword = term.get('keyword', '')
frequency = term.get('frequency', 0)
if category not in themes:
themes[category] = {
'theme_name': category,
'description': f"Keywords related to {category}",
'terms': [],
'total_frequency': 0,
'heating_score': 0.0
}
themes[category]['terms'].append(keyword)
themes[category]['total_frequency'] += frequency
# Calculate average heating score for each theme
for theme in themes.values():
if theme['terms']:
avg_heating = sum(term.get('popularity', 0.0) for term in terms_with_data
if term.get('keyword') in theme['terms']) / len(theme['terms'])
theme['heating_score'] = round(avg_heating, 1)
return list(themes.values())


def analyze_keywords_from_database(
filter_type: str,
keywords_collection
) -> dict:
"""
Analyze keywords using existing database data (no re-extraction).
Args:
filter_type: Analysis period ('today', 'week', 'month')
keywords_collection: MongoDB collection containing keywords
Returns:
dict: Analysis results with keywords and themes
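
    Result shape::

        {"terms": [{"keyword": ..., "category": ..., "frequency": ...,
                    "popularity": ..., "new": ..., ...}],
         "themes": [{"theme_name": ..., "terms": [...], ...}]}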
"""
logger.info("Starting database analysis for %s", filter_type)
# Get heating scores from database (now includes categories)
heating_scores, current_keywords, previous_keywords, keyword_categories = (
calculate_heating_scores_from_database(filter_type, keywords_collection)
)
# Convert to standardized format
terms_data = []
for keyword, frequency in current_keywords.items():
term_data = {
'keyword': keyword,
            'category': keyword_categories.get(keyword, 'other_categories'),  # category recorded in the database
'frequency': frequency,
'popularity': heating_scores.get(keyword, 0.0),
'new': keyword not in previous_keywords,
'variations': [], # Will be populated from database if needed
'importanceScore': 0.8 if keyword not in previous_keywords else 0.5,
'context': f"Found {frequency} times in {filter_type} period"
}
terms_data.append(term_data)
# Sort by frequency and popularity
terms_data.sort(key=lambda x: (x['frequency'], abs(x['popularity'])), reverse=True)
# Create thematic analysis
themes_data = create_thematic_analysis(terms_data)
results = {
'terms': terms_data,
'themes': themes_data
}
logger.info("Analysis completed: %d terms, %d themes", len(terms_data), len(themes_data))
return results


def save_summary_to_database(
filter_type: str,
results_data: dict,
summary_collection
) -> None:
"""
Save analysis results to Summary collection.
Args:
filter_type: Analysis period ('today', 'week', 'month')
results_data: Analysis results to save
summary_collection: MongoDB collection to save results
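
    Stored document shape (one document per period, keyed by filter_type)::

        {"_id": "week", "keywords": [...], "categories": [...]}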
"""
try:
# Process keywords for storage
processed_keywords = []
for term in results_data.get('terms', [])[:50]: # Limit to top 50
processed_term = term.copy()
# Ensure label is present
processed_term['label'] = processed_term.get('keyword', '').title()
# Rename heating_score to popularity if present
if 'heating_score' in processed_term:
processed_term['popularity'] = processed_term.pop('heating_score')
# Rename sentiment fields to camelCase
if 'sentiment_score' in processed_term:
processed_term['sentimentScore'] = processed_term.pop('sentiment_score')
if 'sentiment_label' in processed_term:
processed_term['sentimentLabel'] = processed_term.pop('sentiment_label')
processed_keywords.append(processed_term)
# Process categories/themes for storage
processed_categories = []
for theme in results_data.get('themes', []):
processed_categories.append({
"category": theme.get('theme_name', ''),
"description": theme.get('description', ''),
"keywords": theme.get('terms', []),
"frequency": theme.get('total_frequency', 0),
"popularity": theme.get('heating_score', 0.0)
})
# Create document for storage
document = {
"_id": filter_type, # Fixed IDs: "today", "week", "month"
"keywords": processed_keywords,
"categories": processed_categories
}
        # Remove any old results for this specific analysis period and insert the new ones
        if processed_keywords and processed_categories:
            summary_collection.delete_one({"_id": filter_type})
            result = summary_collection.insert_one(document)
            logger.info("Saved %s summary to database (ID: %s)",
                        filter_type, result.inserted_id)
except Exception as e:
logger.error("Error saving summary to database: %s", str(e))
raise


def cleanup_old_keywords(keywords_collection, latest_date: datetime) -> int:
"""
Remove keywords older than 60 days from the latest date.
Args:
keywords_collection: MongoDB collection containing keywords
latest_date: Latest article date to use as reference
Returns:
int: Number of deleted records
"""
logger.info("Cleaning up records to maintain a 60-day window from the latest article")
# The anchor for the 60-day window is the latest article date, not exactly today's date
cutoff_date = latest_date - timedelta(days=60)
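    # "_id" values are "%Y-%m-%d" strings, so the lexicographic $lt comparison
    # below deletes exactly the chronologically older documents.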
result = keywords_collection.delete_many({
"_id": {"$lt": cutoff_date.strftime("%Y-%m-%d")}
})
logger.info("Cutoff Date: %s, based on latest article on %s",
cutoff_date.strftime('%Y-%m-%d'), latest_date.strftime('%Y-%m-%d'))
logger.info("Deleted %d old records", result.deleted_count)
# Show current data range
oldest_doc = keywords_collection.find_one(
sort=[("_id", 1)],
projection={"_id": 1}
)
newest_doc = keywords_collection.find_one(
sort=[("_id", -1)],
projection={"_id": 1}
)
if oldest_doc and newest_doc:
logger.info("Current data range: %s to %s", oldest_doc['_id'], newest_doc['_id'])
return result.deleted_count
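

# A minimal end-to-end sketch, assuming a local MongoDB instance and the
# hypothetical database/collection names "finfast", "keywords", and "summary"
# (none of which are defined in this module):
#
#     from pymongo import MongoClient
#
#     db = MongoClient()["finfast"]
#     results = analyze_keywords_from_database("week", db["keywords"])
#     save_summary_to_database("week", results, db["summary"])
#     cleanup_old_keywords(db["keywords"], datetime.now())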