"""Utility functions for loading data from S3 and updating DynamoDB tables."""
import os
import glob
import boto3
import pandas as pd

# Credentials are read from the environment once, when the module is imported.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")


def download_files_from_s3(folder, bucket='finfast-china'):
    """Download the .parquet files under ``folder/`` in S3 and concatenate them.

    Every object whose key starts with ``f"{folder}/"`` and ends with
    ``.parquet`` is downloaded to a local path equal to its S3 key (relative
    to the current working directory). Then every ``*.parquet`` file directly
    inside the local ``folder`` is read. This includes files that were already
    present locally. The files are concatenated with a fresh RangeIndex.

    Args:
        folder: S3 key prefix (without the trailing slash). It is also used as
            the local download directory, which is created if missing.
        bucket: Source S3 bucket name. Defaults to ``'finfast-china'``.

    Returns:
        One DataFrame holding the rows of all local parquet files in
        ``folder``, or an empty DataFrame when there are none.
    """
    os.makedirs(folder, exist_ok=True)
    client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )

    # A single list_objects_v2 call returns at most 1000 keys. The paginator
    # follows continuation tokens so that no objects are silently skipped.
    paginator = client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=f"{folder}/"):
        # 'Contents' is omitted entirely when a page holds no objects.
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.parquet'):
                # NOTE(review): keys under nested sub-prefixes need their local
                # parent directory to exist. They are also not picked up by the
                # non-recursive glob below.
                client.download_file(bucket, key, key)

    # Sort the paths so the row order of the result is deterministic.
    file_paths = sorted(glob.glob(os.path.join(folder, '*.parquet')))
    if not file_paths:
        # pd.concat raises ValueError when given an empty list.
        return pd.DataFrame()
    return pd.concat(
        [pd.read_parquet(file_path) for file_path in file_paths],
        ignore_index=True,
    )


def get_client_connection(region_name='us-east-1'):
    """Create a low-level DynamoDB client from the module-level credentials.

    Args:
        region_name: AWS region of the DynamoDB endpoint. Defaults to
            ``'us-east-1'``.

    Returns:
        A new boto3 DynamoDB client. Each call creates a new client.
    """
    return boto3.client(
        service_name='dynamodb',
        region_name=region_name,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )


def update_entity(table_name, row):
    """Set ``total_occurrence`` on one entity item and print the response.

    The item is addressed by the string key attributes ``entity`` and
    ``entityType``. ``total_occurrence`` is written as a DynamoDB number.

    Args:
        table_name: Name of the DynamoDB table.
        row: Mapping (for example, a DataFrame row) that provides
            ``'entity'``, ``'entitytype'`` and ``'total_occurrence'``.
    """
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName=table_name,
        Key={
            'entity': {'S': row['entity']},
            # NOTE(review): this reads 'entitytype' (lowercase t), but
            # delete_entity reads 'entityType'. Confirm which column name
            # callers actually supply.
            'entityType': {'S': row['entitytype']},
        },
        UpdateExpression='SET total_occurrence = :total_occurrence',
        ExpressionAttributeValues={
            # The low-level client expects DynamoDB numbers as strings.
            ':total_occurrence': {'N': str(row['total_occurrence'])},
        },
    )
    print(response)


# Updates a category in the specified DynamoDB table.
def update_category(table_name, row):
    """Set the ``cnt`` attribute of one category item and print the response.

    The item is addressed by the string key attributes ``site`` and
    ``category``. ``cnt`` is written as a DynamoDB number.

    Args:
        table_name: Name of the DynamoDB table.
        row: Mapping (for example, a DataFrame row) that provides ``'site'``,
            ``'category'`` and ``'count'``.
    """
    dynamodb = get_client_connection()
    response = dynamodb.update_item(
        TableName=table_name,
        Key={
            'site': {'S': row['site']},
            'category': {'S': row['category']},
        },
        UpdateExpression='SET cnt = :cnt',
        ExpressionAttributeValues={
            # The low-level client expects DynamoDB numbers as strings.
            ':cnt': {'N': str(row['count'])},
        },
    )
    print(response)


def delete_entity(table_name, row):
    """Delete one entity item from ``table_name``.

    Args:
        table_name: Name of the DynamoDB table.
        row: Mapping that provides string values for ``'entity'`` and
            ``'entityType'``. NOTE(review): update_entity reads
            ``'entitytype'`` instead. Confirm which name callers use.
    """
    dynamodb = get_client_connection()
    dynamodb.delete_item(
        TableName=table_name,
        Key={
            'entity': {'S': row['entity']},
            'entityType': {'S': row['entityType']},
        },
    )


def scan(table_name):
    """Delete every item in ``table_name`` and return the final scan response.

    Despite its name, this function empties the table. It scans page by page
    and deletes each item it finds, addressing every item by the table's
    primary-key attributes only.

    Args:
        table_name: Name of the DynamoDB table to clear.

    Returns:
        The response of the last ``scan`` call, which is the page that has
        no ``LastEvaluatedKey``.
    """
    dynamodb = get_client_connection()

    # DeleteItem's Key must contain exactly the primary-key attributes.
    # Scanned items carry every attribute, so project them onto the key schema.
    key_schema = dynamodb.describe_table(TableName=table_name)['Table']['KeySchema']
    key_names = [element['AttributeName'] for element in key_schema]

    response = dynamodb.scan(TableName=table_name)
    while True:
        # Delete this page's items before checking for another page, so the
        # last (or only) page is also processed.
        for item in response.get('Items', []):
            dynamodb.delete_item(
                TableName=table_name,
                Key={name: item[name] for name in key_names},
            )
        if 'LastEvaluatedKey' not in response:
            return response
        response = dynamodb.scan(
            TableName=table_name,
            ExclusiveStartKey=response['LastEvaluatedKey'],
        )