|
"""Module for collecting and managing article data from DynamoDB to MongoDB.""" |
|
from venv import logger |
|
from datetime import datetime, timedelta |
|
from pymongo.errors import PyMongoError |
|
from models.database import article_collection |
|
|
|
from .utils import scan_dynamodb_table, delete_old_documents, upsert_item |
|
|
|
def _coerce_entity_scores(entity_list):
    """Convert per-entity numeric fields to float in place; no-op for non-lists."""
    if not isinstance(entity_list, list):
        return
    for entity in entity_list:
        if not isinstance(entity, dict):
            continue
        for field in ("sentimentScore", "occurrence"):
            if field not in entity:
                continue
            try:
                entity[field] = float(entity[field])
            except (ValueError, TypeError):
                # Best effort: an unconvertible value is replaced by 0.0
                # rather than failing the whole article.
                entity[field] = 0.0


def _process_article_item(item):
    """
    Normalise a single article item in place and upsert it into MongoDB.

    The item is mutated as follows before being handed to ``upsert_item``:

    - the top-level ``sentimentScore`` is converted to ``float``
      (0.0 when the key is absent);
    - ``id`` is renamed to ``_id``;
    - for every dict in ``entityList`` (when it is a list), the
      ``sentimentScore`` and ``occurrence`` fields are converted to
      ``float``, falling back to 0.0 when conversion fails.

    Items without a usable ``entityList`` are still upserted; only the
    entity conversion step is skipped for them.

    Args:
        item (dict): The article item to process.

    Returns:
        None

    Note:
        ValueError, KeyError, TypeError and PyMongoError raised while
        processing are caught and logged here, not propagated; the
        affected item is skipped.
    """
    try:
        item["sentimentScore"] = float(item.get("sentimentScore", 0.0))

        # A missing "id" raises KeyError (logged below) instead of writing a
        # document whose _id is None, which every id-less item would share.
        item["_id"] = item.pop("id")

        _coerce_entity_scores(item.get("entityList"))

        upsert_item(article_collection, item)
    except (ValueError, KeyError, TypeError, PyMongoError) as e:
        # The failure may happen before "id" was renamed, so fall back to it.
        # NOTE(review): `logger` is imported from the stdlib `venv` module at
        # the top of the file, which looks like an accidental auto-import;
        # a module-level logging.getLogger(__name__) is probably intended.
        logger.error("Error processing item with _id %s: %s",
                     item.get("_id", item.get("id", "unknown")), e)
|
|
|
|
|
def upsert_documents(filter_date):
    """
    Feed articles from the DynamoDB ``Article_China`` table into MongoDB.

    This is a thin wrapper: it passes the table name, the filter date and
    ``_process_article_item`` (the per-item callback) to
    ``scan_dynamodb_table``. The callback converts ``sentimentScore`` to
    float and renames ``id`` to ``_id`` before upserting.

    Args:
        filter_date (str): Date string forwarded unchanged to
            ``scan_dynamodb_table``; ``collect`` supplies it in
            'YYYY-MM-DD' format.

    Returns:
        None

    Note:
        Scanning, date filtering and pagination live in
        ``scan_dynamodb_table`` (``.utils``) and are not visible here.
        Presumably it pages through results and may raise boto3
        ``ClientError`` / ``BotoCoreError`` - TODO confirm against that
        helper.
    """
    table_name = "Article_China"
    item_handler = _process_article_item
    scan_dynamodb_table(table_name, filter_date, item_handler)
|
|
|
|
|
def collect(retention_days=60):
    """
    Run one article collection cycle over a rolling retention window.

    The cycle performs, in order:

    1. Compute the cutoff date ``retention_days`` days before the current
       local time (``datetime.now()``), formatted as 'YYYY-MM-DD'.
    2. Call ``upsert_documents`` with that cutoff to copy articles from
       DynamoDB into MongoDB.
    3. Call ``delete_old_documents`` with the same cutoff on
       ``article_collection`` (presumably removing documents dated before
       the cutoff - confirm in ``.utils``).

    Args:
        retention_days (int): Size of the rolling window in days.
            Defaults to 60, the value that was previously hard-coded.

    Returns:
        None

    Raises:
        ValueError: If ``retention_days`` is negative, since that would
            place the cutoff in the future.
        Exception: Propagates any exceptions from ``upsert_documents`` or
            ``delete_old_documents``; nothing is caught here.
    """
    if retention_days < 0:
        raise ValueError(
            f"retention_days must be non-negative, got {retention_days}"
        )

    cutoff = datetime.now() - timedelta(days=retention_days)
    filter_date = cutoff.strftime('%Y-%m-%d')

    # Upsert first, then prune, so both steps share one cutoff.
    upsert_documents(filter_date)
    delete_old_documents(article_collection, filter_date, logger)
|
|