"""Utility functions for Summary calculations. This module contains utility functions for both Content Flow Tracker and Entity Analysis, extracted and merged from the previous content/utils.py and entity/utils.py files. """ import json from datetime import datetime, timedelta from typing import Dict, Any, List from collections import defaultdict from models.database import article_collection, entity_collection # pylint: disable=import-error # Entity type full names mapping ENTITY_TYPE_FULL_NAMES = { "GPE": "Geopolitical Entities (Countries/Cities)", "LOC": "Locations (Non-political)", "ORG": "Organizations", "PERSON": "People", "PROD": "Products", "PRODUCT": "Products", "PRODCAT": "Product Categories", "PRODUCT_CATEGORY": "Product Categories", "COMPANY": "Companies", "FINANCIAL_ASSET": "Financial Assets", "ECONOMIC_INDICATOR": "Economic Indicators", "EVENT": "Events", "LANGUAGE": "Languages", "NORP": "Nationalities/Religious/Political Groups", "LAW": "Laws/Legal Documents", "FAC": "Facilities/Landmarks", "INDUSTRY": "Industries", } # Allowed entity types for analysis ALLOWED_ENTITY_TYPES = { "GPE", "ORG", "PERSON", "COMPANY", "FINANCIAL_ASSET", "ECONOMIC_INDICATOR", "INDUSTRY" } # Load entity normalization mapping def _load_entity_mapping() -> Dict[str, str]: """Load entity normalization mapping from JSON file.""" try: with open("mapping.json", 'r', encoding='utf-8') as f: return json.load(f) except FileNotFoundError: return {} def normalize_entity_name(entity_name: str) -> str: """ Normalize entity names using the mapping file. Parameters ---------- entity_name : str The original entity name to normalize. Returns ------- str The normalized entity name, or original if no mapping found. """ if not entity_name: return entity_name # Convert to string and clean normalized = str(entity_name).strip() # Apply basic replacements normalized = normalized.replace("U.S.", "US") normalized = normalized.replace("consumer price index", "CPI") normalized = normalized.replace("Gross Domestic Product", "GDP") # Load and apply mapping mapping = _load_entity_mapping() return mapping.get(normalized, normalized) def aggregate_entities(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Aggregate duplicate entities by summing their occurrence counts. Parameters ---------- entities : List[Dict[str, Any]] A list of entity dictionaries where each dictionary must contain: - 'entity' (str): The name of the entity - 'type' (str): The type/category of the entity - 'occurrence' (int): The count of occurrences for this entity Returns ------- List[Dict[str, Any]] A list of unique entity dictionaries with aggregated occurrence counts, where each dictionary contains: - 'entity' (str): The normalized entity name - 'type' (str): The entity type (unchanged) - 'occurrence' (int): The summed occurrence count across all duplicates """ aggregated = {} for entity in entities: # Normalize entity name normalized_name = normalize_entity_name(entity['entity']) key = (normalized_name, entity['type']) if key in aggregated: aggregated[key] += entity['occurrence'] else: aggregated[key] = entity['occurrence'] # Convert back to list of dictionaries result = [] for (entity_name, entity_type), count in aggregated.items(): result.append({ 'entity': entity_name, 'type': entity_type, 'occurrence': count }) return result def _build_sentiment_lookup(sentiment_results: list) -> Dict: """Build sentiment lookup dictionary from sentiment aggregation results.""" sentiment_lookup = {} for result in sentiment_results: key = (result["_id"]["entity"], result["_id"]["type"]) sentiment_lookup[key] = round(result["avgSentiment"], 3) return sentiment_lookup def _process_entity_with_sentiment(mentions_result: Dict, sentiment_lookup: Dict) -> Dict[str, Any]: """Process a single entity result and add sentiment information.""" entity_id = mentions_result["_id"] entity_key = (entity_id["entity"], entity_id["type"]) return { "entityName": entity_id["entity"].replace("_", " "), "mentions": mentions_result["mentions"], "sentiment": sentiment_lookup.get(entity_key) } def _get_latest_publish_date_from_collection(collection) -> datetime: """Return the latest publish date found in the specified collection. Parameters ---------- collection: MongoDB collection to query for the latest publishDate. Returns ------- datetime Latest publish date found, or current date if collection is empty. """ latest_doc = collection.find_one( sort=[("publishDate", -1)], projection={"publishDate": 1} ) if latest_doc and "publishDate" in latest_doc: return datetime.strptime(latest_doc["publishDate"], "%Y-%m-%d") return datetime.today() def _time_range(filter_type: str, collection) -> tuple[str, str]: """Calculate *inclusive* start / end date strings using rolling window approach. Uses rolling window logic: - today: only the latest date - weekly: latest date - 6 days (total 7 days) - monthly: latest date - 29 days (total 30 days) Parameters ---------- filter_type: One of ``today``, ``week``/``weekly`` or ``month``/``monthly``. Any unrecognised value will fall back to *all time* where the start date is ``datetime.min``. collection: MongoDB collection to get the latest date from. Returns ------- tuple[str, str] Start and end dates as strings in YYYY-MM-DD format. """ latest_date = _get_latest_publish_date_from_collection(collection) if filter_type in {"today"}: start = latest_date.date() elif filter_type in {"week", "weekly"}: # Latest date minus 6 days (total 7 days) start = (latest_date - timedelta(days=6)).date() elif filter_type in {"month", "monthly"}: # Latest date minus 29 days (total 30 days) start = (latest_date - timedelta(days=29)).date() else: start = datetime.min.date() return str(start), str(latest_date.date()) def get_content_flow_data(time_filter: str) -> Dict[str, Any]: """Return aggregated *Content Flow Tracker* data for the given period. Uses rolling window approach: - today: only the latest date - weekly: latest date - 6 days (total 7 days) - monthly: latest date - 29 days (total 30 days) Parameters ---------- time_filter: Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time). Returns ------- Dict[str, Any] Dictionary containing title, dateRange, and aggregated content flow data. """ start, end = _time_range(time_filter, article_collection) pipeline = [ {"$match": {"publishDate": {"$gte": start, "$lte": end}}}, {"$group": {"_id": {"source": "$site", "category": "$category"}, "count": {"$sum": 1}}}, {"$sort": {"count": -1}}, ] results = list(article_collection.aggregate(pipeline)) data = [ { "category": r["_id"].get("category", "Uncategorized"), "source": r["_id"]["source"], "count": r["count"], } for r in results ] return { "title": f"Content Flow Tracker {time_filter.capitalize()}", "dateRange": {"start": start, "end": end}, "data": data, } def get_entity_analysis_data(time_filter: str) -> Dict[str, Any]: """Return *Entity Analysis* data for the given period with sentiment information. Uses rolling window approach: - today: only the latest date - weekly: latest date - 6 days (total 7 days) - monthly: latest date - 29 days (total 30 days) Parameters ---------- time_filter: Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time). Returns ------- Dict[str, Any] Dictionary containing title, dateRange, and aggregated entity analysis data with sentiment. """ start, end = _time_range(time_filter, entity_collection) # Get mentions count pipeline mentions_pipeline = [ {"$match": {"publishDate": {"$gte": start, "$lte": end}}}, {"$group": {"_id": {"entity": "$entity", "type": "$entityType"}, "mentions": {"$sum": "$occurrence"}}}, {"$sort": {"mentions": -1}}, ] # Get sentiment data pipeline sentiment_pipeline = [ {"$match": { "publishDate": {"$gte": start, "$lte": end}, "sentimentScore": {"$exists": True, "$ne": None} }}, {"$group": { "_id": {"entity": "$entity", "type": "$entityType"}, "avgSentiment": {"$avg": "$sentimentScore"}, }} ] mentions_results = list(entity_collection.aggregate(mentions_pipeline)) sentiment_results = list(entity_collection.aggregate(sentiment_pipeline)) # Filter to only include allowed entity types mentions_results = [r for r in mentions_results if r["_id"]["type"] in ALLOWED_ENTITY_TYPES] sentiment_results = [r for r in sentiment_results if r["_id"]["type"] in ALLOWED_ENTITY_TYPES] # Convert mentions results to format expected by aggregate_entities entities_for_aggregation = [] for result in mentions_results: entities_for_aggregation.append({ 'entity': result['_id']['entity'], 'type': result['_id']['type'], 'occurrence': result['mentions'] }) # Normalize and aggregate entities aggregated_entities = aggregate_entities(entities_for_aggregation) # Rebuild mentions results with normalized names normalized_mentions_results = [] for agg_entity in aggregated_entities: normalized_mentions_results.append({ '_id': {'entity': agg_entity['entity'], 'type': agg_entity['type']}, 'mentions': agg_entity['occurrence'] }) # Rebuild sentiment lookup with normalized names normalized_sentiment_lookup = {} for result in sentiment_results: normalized_name = normalize_entity_name(result["_id"]["entity"]) key = (normalized_name, result["_id"]["type"]) if key in normalized_sentiment_lookup: # Average multiple sentiment scores for the same normalized entity normalized_sentiment_lookup[key] = ( normalized_sentiment_lookup[key] + result["avgSentiment"]) / 2 else: normalized_sentiment_lookup[key] = result["avgSentiment"] entity_types: Dict[str, Any] = {} for mentions_result in normalized_mentions_results: entity_type = mentions_result["_id"]["type"] if entity_type not in entity_types: entity_types[entity_type] = { "fullName": ENTITY_TYPE_FULL_NAMES.get(entity_type, entity_type), "entities": [], } entity_types[entity_type]["entities"].append( _process_entity_with_sentiment(mentions_result, normalized_sentiment_lookup) ) # Keep only the top 10 per type for entity_data in entity_types.values(): entity_data["entities"] = sorted( entity_data["entities"], key=lambda x: -x["mentions"] )[:10] return { "title": f"Top Entities - {time_filter.capitalize()}", "dateRange": {"start": start, "end": end}, "data": entity_types, } def get_sentiment_analysis_data(time_filter: str) -> Dict[str, Any]: """Return aggregated *Sentiment Analysis* data for articles by category for the given period. Uses rolling window approach: - today: only the latest date - weekly: latest date - 6 days (total 7 days) - monthly: latest date - 29 days (total 30 days) Parameters ---------- time_filter: Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time). Returns ------- Dict[str, Any] Dictionary containing title, dateRange, and sentiment data by category and date. """ start, end = _time_range(time_filter, article_collection) # Convert time_filter to match the original logic if time_filter == "today": start_date = datetime.strptime(end, "%Y-%m-%d").date() num_days = 1 elif time_filter in {"week", "weekly"}: start_date = datetime.strptime(start, "%Y-%m-%d").date() num_days = 7 elif time_filter in {"month", "monthly"}: start_date = datetime.strptime(start, "%Y-%m-%d").date() num_days = 30 else: start_date = datetime.strptime(start, "%Y-%m-%d").date() end_date = datetime.strptime(end, "%Y-%m-%d").date() num_days = (end_date - start_date).days + 1 # Query articles with sentiment scores query = { "publishDate": {"$gte": start, "$lte": end}, "sentimentScore": {"$exists": True} } daily_scores = defaultdict(lambda: defaultdict(list)) # Aggregate sentiment scores by category and date for doc in list(article_collection.find(query)): category = doc.get("category", "Unknown") score = doc.get("sentimentScore") if category and score is not None and doc.get("publishDate"): daily_scores[category][doc.get("publishDate")].append(score) # Generate nested data structure: date -> category -> sentiment data = {} for i in range(num_days): day = (start_date + timedelta(days=i)).isoformat() data[day] = {} for category in daily_scores: scores = daily_scores[category].get(day, []) if scores: # This checks if scores is not empty data[day][category] = sum(scores) / len(scores) return { "title": f"Sentiment Analysis by Category {time_filter.capitalize()}", "dateRange": {"start": start, "end": end}, "data": data }