"""Utility functions for FinFast data collection from DynamoDB to MongoDB""" import os import boto3 from boto3.dynamodb.types import TypeDeserializer from pymongo.errors import PyMongoError from botocore.exceptions import ClientError, BotoCoreError def get_dynamodb_client(): """ Create and return a DynamoDB client using AWS credentials from environment variables. Returns: boto3.client: Configured DynamoDB client """ aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID') aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY') return boto3.client( service_name='dynamodb', region_name='us-east-1', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key ) def scan_dynamodb_table(table_name, filter_date, item_processor): """ Scan a DynamoDB table and process items using the provided processor function. Args: table_name (str): Name of the DynamoDB table to scan filter_date (str): Filter date in 'YYYY-MM-DD' format item_processor (callable): Function to process each item Raises: ClientError: If there is an error during DynamoDB scan operation BotoCoreError: If there is a boto3 core error """ dynamodb = get_dynamodb_client() last_evaluated_key = None deserializer = TypeDeserializer() try: while True: # Prepare scan parameters scan_params = { 'TableName': table_name, 'FilterExpression': 'publishDate >= :date', 'ExpressionAttributeValues': { ':date': {'S': filter_date} } } # Add ExclusiveStartKey if it exists if last_evaluated_key: scan_params['ExclusiveStartKey'] = last_evaluated_key # Scan the table response = dynamodb.scan(**scan_params) items = [ {k: deserializer.deserialize(v) for k, v in item.items()} for item in response.get('Items', []) ] # Process items using the provided processor for item in items: item_processor(item) last_evaluated_key = response.get('LastEvaluatedKey') if not last_evaluated_key: break except (ClientError, BotoCoreError) as e: print(f"Error in scan operation: {e}") raise def delete_old_documents(collection, cutoff_date, use_logger=None): """ Delete documents from MongoDB collection that are older than the specified cutoff date. Args: collection: MongoDB collection object cutoff_date (str): The cutoff date in 'YYYY-MM-DD' format use_logger: Optional logger object. If provided, will use logger, otherwise use print Raises: PyMongoError: If there is an error during the delete operation """ try: result = collection.delete_many({ 'publishDate': {'$lt': cutoff_date} }) message = f"Deleted {result.deleted_count} documents older than {cutoff_date}" if use_logger: use_logger.info("Deleted %d documents older than %s", result.deleted_count, cutoff_date) else: print(message) except PyMongoError as e: error_message = f"Error deleting old documents: {e}" if use_logger: use_logger.error("Error deleting old documents: %s", e) else: print(error_message) raise def upsert_item(collection, item): """Helper function to upsert an item into a MongoDB collection.""" collection.update_one( {'_id': item['_id']}, {'$set': item}, upsert=True ) print(f"Successfully processed item: {item['_id']}")