"""
Daily keyword pipeline collector for combined keyword detection, analysis, and cleanup.
"""
import logging
from datetime import datetime
from typing import Optional

from models.database import article_collection, keywords_collection, summary_collection  # pylint: disable=import-error
from controllers.keyword import (  # pylint: disable=import-error
    fetch_articles_for_period,
    fetch_historical_keywords,
    run_llm_extraction,
    calculate_metrics_and_save_for_date,
)
from controllers.keyword_analysis import (  # pylint: disable=import-error
    analyze_keywords_from_database,
    save_summary_to_database,
    cleanup_old_keywords,
)

# Configure logger
logger = logging.getLogger(__name__)


def collect():
    """
    Daily pipeline: keyword extraction + analysis + cleanup.

    This function orchestrates the complete daily pipeline:
    1. Fetch the latest articles (past day), run keyword detection, and store the keywords
    2. Perform analysis for the "today", "week", and "month" periods
    3. Delete keywords older than 60 days from the keywords collection
    """
logger.info("Starting daily keyword analysis pipeline")
try:
# Step 1: Daily keyword detection (past day articles)
logger.info("Step 1: Daily keyword detection")
latest_date = _get_latest_article_date()
if not latest_date:
logger.warning("No articles found in database. Exiting pipeline.")
return
# Check if we already have keywords for this date
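        # Keyword documents are keyed by the date string ("YYYY-MM-DD") as _id,
        # so rerunning the pipeline on the same day skips extraction.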
        existing_doc = keywords_collection.find_one({"_id": latest_date.strftime("%Y-%m-%d")})
        if existing_doc:
            logger.info("Keywords already exist for %s, skipping extraction",
                        latest_date.strftime("%Y-%m-%d"))
        else:
            _extract_keywords_for_date(latest_date)

        # Step 2: Analysis for today, week, month periods
        logger.info("Step 2: Performing analysis for all periods")
        _perform_analysis_for_all_periods()

        # Step 3: Cleanup (delete keywords older than 60 days)
        logger.info("Step 3: Cleaning up old keywords")
        deleted_count = cleanup_old_keywords(keywords_collection, latest_date)
        logger.info("Cleanup completed: deleted %d old records", deleted_count)

        logger.info("Daily keyword pipeline completed successfully")
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error("Error in daily keyword pipeline: %s", str(e))
        raise


def _get_latest_article_date() -> Optional[datetime]:
    """
    Get the latest article date from the database.

    Returns:
        Optional[datetime]: Latest article date, or None if no articles are found
    """
    logger.info("Finding the most recent date with articles")
    latest_article = article_collection.find().sort("publishDate", -1).limit(1)
    latest_articles_list = list(latest_article)
    if not latest_articles_list:
        logger.warning("No articles found in database")
        return None

    latest_date_str = latest_articles_list[0]['publishDate']
    latest_date = datetime.strptime(latest_date_str, "%Y-%m-%d")
    logger.info("Processing articles for %s", latest_date.date())
    return latest_date


def _extract_keywords_for_date(target_date: datetime) -> None:
    """
    Extract keywords for a specific date.

    Args:
        target_date: Date for which to extract keywords
    """
    # Get articles for the target date
    start_date = target_date.replace(hour=0, minute=0, second=0, microsecond=0)
    end_date = target_date.replace(hour=23, minute=59, second=59, microsecond=999999)
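    # The bounds cover the full calendar day of target_date; fetch_articles_for_period
    # is assumed to treat start_date and end_date as inclusive.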
    articles = fetch_articles_for_period(article_collection, start_date, end_date)
    if not articles:
        logger.warning("No articles found for %s", target_date.date())
        return

    # Get historical keywords for context
    historical_keywords = fetch_historical_keywords(keywords_collection)

    # Extract keywords using LLM
    logger.info("Extracting keywords from articles for %s", target_date.date())
    extracted_keywords = run_llm_extraction(articles, historical_keywords)
    if extracted_keywords:
        # Save to database
        calculate_metrics_and_save_for_date(keywords_collection, extracted_keywords, target_date)
        logger.info("Saved %d keywords for %s",
                    len(extracted_keywords), target_date.strftime('%Y-%m-%d'))
    else:
        logger.warning("No keywords extracted from articles for %s", target_date.date())


def _perform_analysis_for_all_periods() -> None:
    """
    Perform analysis for today, week, and month periods.
    """
    periods = ["today", "week", "month"]
    for period in periods:
        try:
            logger.info("Analyzing keywords for %s period", period)
            # Perform analysis using existing database data
            results = analyze_keywords_from_database(period, keywords_collection)
            if results:
                # Save results to Summary collection
                save_summary_to_database(period, results, summary_collection)
                logger.info("Analysis completed for %s: %d terms, %d themes",
                            period, len(results.get('terms', [])), len(results.get('themes', [])))
            else:
                logger.warning("No analysis results for %s period", period)
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.error("Error analyzing %s period: %s", period, str(e))
            # Continue with other periods even if one fails
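

# Minimal sketch for running the pipeline by hand (e.g. for a manual backfill).
# Assumption: in production, collect() is invoked by the collector scheduler
# rather than by executing this module directly.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    collect()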