|
"""Module for collecting and managing entity data from DynamoDB to MongoDB.""" |
|
from datetime import datetime, timedelta |
|
from pymongo.errors import PyMongoError |
|
from models.database import entity_collection |
|
|
|
from .utils import scan_dynamodb_table, delete_old_documents, upsert_item |
|
|
|
def _process_entity_item(item):
    """
    Normalise one entity record in place and upsert it into MongoDB.

    The record's numeric fields are coerced (``sentimentScore`` and
    ``confidenceLevel`` to ``float``, ``occurrence`` to ``int``; a missing key
    falls back to zero), a composite ``_id`` of the form
    ``'{entity}-{articleID}'`` is stored on the record, and the record is then
    passed to ``upsert_item`` together with ``entity_collection``.

    Failures are reported rather than raised: a ``ValueError``, ``KeyError``,
    ``TypeError`` or ``PyMongoError`` occurring anywhere in these steps is
    caught and printed, and the function returns normally. Any other exception
    type propagates to the caller.

    Args:
        item (dict): The entity record to process. It is mutated in place;
            fields converted before a failure keep their converted value.

    Returns:
        None

    Note:
        A missing ``entity`` or ``articleID`` is replaced by an empty string,
        so the generated ``_id`` can degenerate to ``'-'``.
        NOTE(review): such records would share an ``_id`` -- confirm that the
        source table always supplies both keys.
    """
    # (field, converter, default) triples, applied strictly in this order.
    numeric_fields = (
        ("sentimentScore", float, 0.0),
        ("confidenceLevel", float, 0.0),
        ("occurrence", int, 0),
    )

    try:
        for field, convert, fallback in numeric_fields:
            item[field] = convert(item.get(field, fallback))

        entity = item.get('entity', '')
        article_id = item.get('articleID', '')
        item["_id"] = f"{entity}-{article_id}"

        upsert_item(entity_collection, item)

        print(f"Successfully processed item: {item['_id']}")
    except (ValueError, KeyError, TypeError, PyMongoError) as exc:
        # '_id' may not have been assigned yet if a conversion failed first.
        print(f"Error processing item with _id {item.get('_id', 'unknown')}: {exc}")
|
|
|
|
|
def upsert_documents(filter_date):
    """
    Feed entity records from the ``article_entity_china`` table into MongoDB.

    This is a thin wrapper: it calls ``scan_dynamodb_table`` with the table
    name, the given ``filter_date`` and ``_process_entity_item`` as the
    per-item handler. All scanning logic lives in ``scan_dynamodb_table``,
    which is imported from ``.utils`` and not shown in this module.

    Args:
        filter_date (str): Date string forwarded unchanged to
            ``scan_dynamodb_table``. The caller in this module supplies it in
            'YYYY-MM-DD' format.

    Returns:
        None

    Note:
        NOTE(review): presumably the helper keeps only records published on
        or after ``filter_date``, follows DynamoDB pagination, and may raise
        botocore ``ClientError`` / ``BotoCoreError`` -- verify against
        ``scan_dynamodb_table``; none of this is visible here.
    """
    source_table = 'article_entity_china'
    scan_dynamodb_table(source_table, filter_date, _process_entity_item)
|
|
|
|
|
def collect():
    """
    Run one entity collection cycle over a 30-day window.

    Steps, in order:
        1. Compute the cutoff date, 30 days before the current time, as a
           'YYYY-MM-DD' string.
        2. Call ``upsert_documents`` with that cutoff.
        3. Call ``delete_old_documents`` with ``entity_collection`` and the
           same cutoff.

    Args:
        None

    Returns:
        None

    Raises:
        Exception: Nothing is caught here, so any exception raised by
            ``upsert_documents`` or ``delete_old_documents`` propagates.

    Note:
        The cutoff comes from ``datetime.now()`` with no timezone, i.e. the
        host's local clock.
        NOTE(review): earlier documentation mentions HKT -- confirm that the
        host clock is in the intended zone.
        NOTE(review): presumably ``delete_old_documents`` removes records
        dated before the cutoff; verify against its definition in ``.utils``.
    """
    window_start = datetime.now() - timedelta(days=30)
    # date.isoformat() renders the same 'YYYY-MM-DD' text as strftime('%Y-%m-%d').
    cutoff = window_start.date().isoformat()

    upsert_documents(cutoff)
    delete_old_documents(entity_collection, cutoff)
|
|