"""Utility functions for Summary calculations. |
|
This module contains utility functions for both Content Flow Tracker and Entity Analysis, |
|
extracted and merged from the previous content/utils.py and entity/utils.py files. |
|
""" |
|
import json |
|
from datetime import datetime, timedelta |
|
from typing import Dict, Any, List |
|
from collections import defaultdict |
|
|
|
from models.database import article_collection, entity_collection |
|
|
|
|
|


ENTITY_TYPE_FULL_NAMES = {
    "GPE": "Geopolitical Entities (Countries/Cities)",
    "LOC": "Locations (Non-political)",
    "ORG": "Organizations",
    "PERSON": "People",
    "PROD": "Products",
    "PRODUCT": "Products",
    "PRODCAT": "Product Categories",
    "PRODUCT_CATEGORY": "Product Categories",
    "COMPANY": "Companies",
    "FINANCIAL_ASSET": "Financial Assets",
    "ECONOMIC_INDICATOR": "Economic Indicators",
    "EVENT": "Events",
    "LANGUAGE": "Languages",
    "NORP": "Nationalities/Religious/Political Groups",
    "LAW": "Laws/Legal Documents",
    "FAC": "Facilities/Landmarks",
    "INDUSTRY": "Industries",
}


ALLOWED_ENTITY_TYPES = {
    "GPE", "ORG", "PERSON", "COMPANY",
    "FINANCIAL_ASSET", "ECONOMIC_INDICATOR", "INDUSTRY",
}


def _load_entity_mapping() -> Dict[str, str]:
"""Load entity normalization mapping from JSON file.""" |
    try:
        with open("mapping.json", 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def normalize_entity_name(entity_name: str) -> str:
""" |
|
Normalize entity names using the mapping file. |
|
|
|
Parameters |
|
---------- |
|
entity_name : str |
|
The original entity name to normalize. |
|
|
|
Returns |
|
------- |
|
str |
|
The normalized entity name, or original if no mapping found. |
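
    Examples
    --------
    Illustrative sketch; the final result depends on the entries in
    ``mapping.json`` (here assumed to have no entry for the input):

    >>> normalize_entity_name(" U.S. economy ")  # doctest: +SKIP
    'US economy'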
    """
    if not entity_name:
        return entity_name

    normalized = str(entity_name).strip()

    normalized = normalized.replace("U.S.", "US")
    normalized = normalized.replace("consumer price index", "CPI")
    normalized = normalized.replace("Gross Domestic Product", "GDP")

    mapping = _load_entity_mapping()
    return mapping.get(normalized, normalized)


def aggregate_entities(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
""" |
|
Aggregate duplicate entities by summing their occurrence counts. |
|
|
|
Parameters |
|
---------- |
|
entities : List[Dict[str, Any]] |
|
A list of entity dictionaries where each dictionary must contain: |
|
- 'entity' (str): The name of the entity |
|
- 'type' (str): The type/category of the entity |
|
- 'occurrence' (int): The count of occurrences for this entity |
|
|
|
Returns |
|
------- |
|
List[Dict[str, Any]] |
|
A list of unique entity dictionaries with aggregated occurrence counts, |
|
where each dictionary contains: |
|
- 'entity' (str): The normalized entity name |
|
- 'type' (str): The entity type (unchanged) |
|
- 'occurrence' (int): The summed occurrence count across all duplicates |
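
    Examples
    --------
    Illustrative sketch (assumes ``mapping.json`` has no entry for ``"US"``,
    so only the built-in ``"U.S." -> "US"`` replacement applies):

    >>> aggregate_entities([
    ...     {"entity": "U.S.", "type": "GPE", "occurrence": 3},
    ...     {"entity": "US", "type": "GPE", "occurrence": 2},
    ... ])  # doctest: +SKIP
    [{'entity': 'US', 'type': 'GPE', 'occurrence': 5}]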
    """
    aggregated = {}

    for entity in entities:
        normalized_name = normalize_entity_name(entity['entity'])
        key = (normalized_name, entity['type'])

        if key in aggregated:
            aggregated[key] += entity['occurrence']
        else:
            aggregated[key] = entity['occurrence']

    result = []
    for (entity_name, entity_type), count in aggregated.items():
        result.append({
            'entity': entity_name,
            'type': entity_type,
            'occurrence': count
        })

    return result


def _build_sentiment_lookup(sentiment_results: list) -> Dict:
"""Build sentiment lookup dictionary from sentiment aggregation results.""" |
    sentiment_lookup = {}
    for result in sentiment_results:
        key = (result["_id"]["entity"], result["_id"]["type"])
        sentiment_lookup[key] = round(result["avgSentiment"], 3)
    return sentiment_lookup


def _process_entity_with_sentiment(mentions_result: Dict, sentiment_lookup: Dict) -> Dict[str, Any]:
"""Process a single entity result and add sentiment information.""" |
    entity_id = mentions_result["_id"]
    entity_key = (entity_id["entity"], entity_id["type"])

    return {
        "entityName": entity_id["entity"].replace("_", " "),
        "mentions": mentions_result["mentions"],
        "sentiment": sentiment_lookup.get(entity_key)
    }


def _get_latest_publish_date_from_collection(collection) -> datetime:
"""Return the latest publish date found in the specified collection. |
|
|
|
Parameters |
|
---------- |
|
collection: |
|
MongoDB collection to query for the latest publishDate. |
|
|
|
Returns |
|
------- |
|
datetime |
|
Latest publish date found, or current date if collection is empty. |
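
    Examples
    --------
    Illustrative only; the result depends on the stored documents
    (publishDate values are "YYYY-MM-DD" strings):

    >>> _get_latest_publish_date_from_collection(article_collection)  # doctest: +SKIP
    datetime.datetime(2024, 3, 10, 0, 0)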
    """
    latest_doc = collection.find_one(
        sort=[("publishDate", -1)], projection={"publishDate": 1}
    )
    if latest_doc and "publishDate" in latest_doc:
        return datetime.strptime(latest_doc["publishDate"], "%Y-%m-%d")
    return datetime.today()


def _time_range(filter_type: str, collection) -> tuple[str, str]:
"""Calculate *inclusive* start / end date strings using rolling window approach. |
|
|
|
Uses rolling window logic: |
|
- today: only the latest date |
|
- weekly: latest date - 6 days (total 7 days) |
|
- monthly: latest date - 29 days (total 30 days) |
|
|
|
Parameters |
|
---------- |
|
filter_type: |
|
One of ``today``, ``week``/``weekly`` or ``month``/``monthly``. Any |
|
unrecognised value will fall back to *all time* where the start date is |
|
``datetime.min``. |
|
collection: |
|
MongoDB collection to get the latest date from. |
|
|
|
Returns |
|
------- |
|
tuple[str, str] |
|
Start and end dates as strings in YYYY-MM-DD format. |
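
    Examples
    --------
    Illustrative sketch, assuming the latest publishDate in the collection is
    ``"2024-03-10"``:

    >>> _time_range("weekly", article_collection)  # doctest: +SKIP
    ('2024-03-04', '2024-03-10')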
    """
    latest_date = _get_latest_publish_date_from_collection(collection)

    if filter_type in {"today"}:
        start = latest_date.date()
    elif filter_type in {"week", "weekly"}:
        start = (latest_date - timedelta(days=6)).date()
    elif filter_type in {"month", "monthly"}:
        start = (latest_date - timedelta(days=29)).date()
    else:
        start = datetime.min.date()

    return str(start), str(latest_date.date())


def get_content_flow_data(time_filter: str) -> Dict[str, Any]:
"""Return aggregated *Content Flow Tracker* data for the given period. |
|
|
|
Uses rolling window approach: |
|
- today: only the latest date |
|
- weekly: latest date - 6 days (total 7 days) |
|
- monthly: latest date - 29 days (total 30 days) |
|
|
|
Parameters |
|
---------- |
|
time_filter: |
|
Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time). |
|
|
|
Returns |
|
------- |
|
Dict[str, Any] |
|
Dictionary containing title, dateRange, and aggregated content flow data. |
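
    Examples
    --------
    Illustrative return shape (categories, sources and counts depend on the
    articles actually stored):

    >>> get_content_flow_data("weekly")  # doctest: +SKIP
    {'title': 'Content Flow Tracker Weekly',
     'dateRange': {'start': '2024-03-04', 'end': '2024-03-10'},
     'data': [{'category': 'Markets', 'source': 'example-site', 'count': 12}]}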
    """
    start, end = _time_range(time_filter, article_collection)

    pipeline = [
        {"$match": {"publishDate": {"$gte": start, "$lte": end}}},
        {"$group": {"_id": {"source": "$site", "category": "$category"}, "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
    ]

    results = list(article_collection.aggregate(pipeline))

    data = [
        {
            "category": r["_id"].get("category", "Uncategorized"),
            "source": r["_id"]["source"],
            "count": r["count"],
        }
        for r in results
    ]

    return {
        "title": f"Content Flow Tracker {time_filter.capitalize()}",
        "dateRange": {"start": start, "end": end},
        "data": data,
    }


def get_entity_analysis_data(time_filter: str) -> Dict[str, Any]:
"""Return *Entity Analysis* data for the given period with sentiment information. |
|
|
|
Uses rolling window approach: |
|
- today: only the latest date |
|
- weekly: latest date - 6 days (total 7 days) |
|
- monthly: latest date - 29 days (total 30 days) |
|
|
|
Parameters |
|
---------- |
|
time_filter: |
|
Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time). |
|
|
|
Returns |
|
------- |
|
Dict[str, Any] |
|
Dictionary containing title, dateRange, and aggregated entity analysis data with sentiment. |
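
    Examples
    --------
    Illustrative return shape (entity names, counts and scores depend on the
    stored documents):

    >>> get_entity_analysis_data("weekly")  # doctest: +SKIP
    {'title': 'Top Entities - Weekly',
     'dateRange': {'start': '2024-03-04', 'end': '2024-03-10'},
     'data': {'ORG': {'fullName': 'Organizations',
                      'entities': [{'entityName': 'Federal Reserve',
                                    'mentions': 42,
                                    'sentiment': 0.12}]}}}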
    """
    start, end = _time_range(time_filter, entity_collection)

    mentions_pipeline = [
        {"$match": {"publishDate": {"$gte": start, "$lte": end}}},
        {"$group": {"_id": {"entity": "$entity", "type": "$entityType"},
                    "mentions": {"$sum": "$occurrence"}}},
        {"$sort": {"mentions": -1}},
    ]

    sentiment_pipeline = [
        {"$match": {
            "publishDate": {"$gte": start, "$lte": end},
            "sentimentScore": {"$exists": True, "$ne": None}
        }},
        {"$group": {
            "_id": {"entity": "$entity", "type": "$entityType"},
            "avgSentiment": {"$avg": "$sentimentScore"},
        }}
    ]

    mentions_results = list(entity_collection.aggregate(mentions_pipeline))
    sentiment_results = list(entity_collection.aggregate(sentiment_pipeline))

    # Restrict both result sets to the entity types that are reported on.
    mentions_results = [r for r in mentions_results if r["_id"]["type"] in ALLOWED_ENTITY_TYPES]
    sentiment_results = [r for r in sentiment_results if r["_id"]["type"] in ALLOWED_ENTITY_TYPES]

    # Re-shape the grouped results so aggregate_entities() can normalize and merge them.
    entities_for_aggregation = []
    for result in mentions_results:
        entities_for_aggregation.append({
            'entity': result['_id']['entity'],
            'type': result['_id']['type'],
            'occurrence': result['mentions']
        })

    aggregated_entities = aggregate_entities(entities_for_aggregation)

    normalized_mentions_results = []
    for agg_entity in aggregated_entities:
        normalized_mentions_results.append({
            '_id': {'entity': agg_entity['entity'], 'type': agg_entity['type']},
            'mentions': agg_entity['occurrence']
        })

    # Build a sentiment lookup keyed by (normalized entity name, type).
    normalized_sentiment_lookup = {}
    for result in sentiment_results:
        normalized_name = normalize_entity_name(result["_id"]["entity"])
        key = (normalized_name, result["_id"]["type"])
        if key in normalized_sentiment_lookup:
            # Simple unweighted average of the per-raw-name averages; raw names
            # that normalize to the same key are not weighted by mention count.
            normalized_sentiment_lookup[key] = (
                normalized_sentiment_lookup[key] + result["avgSentiment"]) / 2
        else:
            normalized_sentiment_lookup[key] = result["avgSentiment"]

    entity_types: Dict[str, Any] = {}
    for mentions_result in normalized_mentions_results:
        entity_type = mentions_result["_id"]["type"]

        if entity_type not in entity_types:
            entity_types[entity_type] = {
                "fullName": ENTITY_TYPE_FULL_NAMES.get(entity_type, entity_type),
                "entities": [],
            }

        entity_types[entity_type]["entities"].append(
            _process_entity_with_sentiment(mentions_result, normalized_sentiment_lookup)
        )

    # Keep only the ten most-mentioned entities per type.
    for entity_data in entity_types.values():
        entity_data["entities"] = sorted(
            entity_data["entities"], key=lambda x: -x["mentions"]
        )[:10]

    return {
        "title": f"Top Entities - {time_filter.capitalize()}",
        "dateRange": {"start": start, "end": end},
        "data": entity_types,
    }


def get_sentiment_analysis_data(time_filter: str) -> Dict[str, Any]:
"""Return aggregated *Sentiment Analysis* data for articles by category for the given period. |
|
|
|
Uses rolling window approach: |
|
- today: only the latest date |
|
- weekly: latest date - 6 days (total 7 days) |
|
- monthly: latest date - 29 days (total 30 days) |
|
|
|
Parameters |
|
---------- |
|
time_filter: |
|
Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time). |
|
|
|
Returns |
|
------- |
|
Dict[str, Any] |
|
Dictionary containing title, dateRange, and sentiment data by category and date. |
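
    Examples
    --------
    Illustrative return shape (dates, categories and scores depend on the
    stored articles):

    >>> get_sentiment_analysis_data("today")  # doctest: +SKIP
    {'title': 'Sentiment Analysis by Category Today',
     'dateRange': {'start': '2024-03-10', 'end': '2024-03-10'},
     'data': {'2024-03-10': {'Markets': 0.21, 'Technology': -0.05}}}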
    """
    start, end = _time_range(time_filter, article_collection)

    # Work out how many calendar days the window spans so that every day in the
    # range gets an entry, even if no article was published on it.
    if time_filter == "today":
        start_date = datetime.strptime(end, "%Y-%m-%d").date()
        num_days = 1
    elif time_filter in {"week", "weekly"}:
        start_date = datetime.strptime(start, "%Y-%m-%d").date()
        num_days = 7
    elif time_filter in {"month", "monthly"}:
        start_date = datetime.strptime(start, "%Y-%m-%d").date()
        num_days = 30
    else:
        start_date = datetime.strptime(start, "%Y-%m-%d").date()
        end_date = datetime.strptime(end, "%Y-%m-%d").date()
        num_days = (end_date - start_date).days + 1

    query = {
        "publishDate": {"$gte": start, "$lte": end},
        "sentimentScore": {"$exists": True}
    }

    daily_scores = defaultdict(lambda: defaultdict(list))

    for doc in list(article_collection.find(query)):
        category = doc.get("category", "Unknown")
        score = doc.get("sentimentScore")
        if category and score is not None and doc.get("publishDate"):
            daily_scores[category][doc.get("publishDate")].append(score)

    # Average the scores per category for each day in the window; days without
    # scores for a given category simply omit that category.
    data = {}
    for i in range(num_days):
        day = (start_date + timedelta(days=i)).isoformat()
        data[day] = {}
        for category in daily_scores:
            scores = daily_scores[category].get(day, [])
            if scores:
                data[day][category] = sum(scores) / len(scores)

    return {
        "title": f"Sentiment Analysis by Category {time_filter.capitalize()}",
        "dateRange": {"start": start, "end": end},
        "data": data
    }