Weirui-Leo committed on
Commit 754cdf8 · 1 Parent(s): 9a5a0a3

fix: added finfast collectors

app/collectors/finfast/__init__.py ADDED
File without changes
app/collectors/finfast/article.py ADDED
@@ -0,0 +1,114 @@
+ """Module for collecting and managing article data from DynamoDB to MongoDB."""
+ import logging
+ from datetime import datetime, timedelta
+ from database import FinFastMongoClient as MongodbClient
+ from pymongo.errors import PyMongoError
+ from .utils import scan_dynamodb_table, delete_old_documents
+
+ logger = logging.getLogger(__name__)
+ collection = MongodbClient["FinFAST_China"]["Article"]
+
+
+ def _process_article_item(item):
+     """
+     Process a single article item by converting data types and upserting to MongoDB.
+
+     Args:
+         item (dict): The article item to process
+
+     Raises:
+         ValueError: If data type conversion fails
+         KeyError: If required keys are missing
+         TypeError: If type conversion fails
+         PyMongoError: If MongoDB operation fails
+     """
+     try:
+         # Convert sentimentScore to float
+         item["sentimentScore"] = float(item.get("sentimentScore", 0.0))
+         # Set _id and remove id
+         item["_id"] = item.pop("id", None)
+
+         # Convert entityList inner values to float (for MongoDB compatibility)
+         if "entityList" in item and isinstance(item["entityList"], list):
+             for entity in item["entityList"]:
+                 if isinstance(entity, dict):
+                     if "sentimentScore" in entity:
+                         try:
+                             entity["sentimentScore"] = float(entity["sentimentScore"])
+                         except (ValueError, TypeError):
+                             entity["sentimentScore"] = 0.0
+                     if "occurrence" in entity:
+                         try:
+                             entity["occurrence"] = float(entity["occurrence"])
+                         except (ValueError, TypeError):
+                             entity["occurrence"] = 0.0
+
+         # Upsert into MongoDB
+         collection.update_one(
+             {'_id': item['_id']},
+             {'$set': item},
+             upsert=True
+         )
+         print(f"Successfully processed item: {item['_id']}")
+
+     except (ValueError, KeyError, TypeError, PyMongoError) as e:
+         logger.error("Error processing item with _id %s: %s",
+                      item.get('_id', 'unknown'), e)
+
+
+ def upsert_documents(filter_date):
+     """
+     Scan and upsert documents from DynamoDB Article_China table to MongoDB collection.
+
+     This function scans the DynamoDB table for articles published on or after the specified
+     filter date, processes each item by converting data types and field names, then upserts
+     them into the MongoDB collection. The operation handles pagination automatically using
+     LastEvaluatedKey from DynamoDB scan responses.
+
+     Args:
+         filter_date (str): The minimum publish date in 'YYYY-MM-DD' format to filter articles.
+
+     Raises:
+         ClientError: If there is an error during DynamoDB scan operation
+         BotoCoreError: If there is a boto3 core error
+
+     Note:
+         - Converts sentimentScore to float type
+         - Renames 'id' field to '_id' for MongoDB compatibility
+         - Uses upsert operation to avoid duplicates
+     """
+     scan_dynamodb_table("Article_China", filter_date, _process_article_item)
+
+
+ def collect():
+     """
+     Main collection function that orchestrates the article data collection process.
+
+     This function performs a complete data collection cycle by:
+     1. Calculating a filter date (60 days ago from current date)
+     2. Scanning and upserting recent articles from DynamoDB to MongoDB
+     3. Deleting articles older than 60 days from MongoDB to maintain data freshness
+
+     The function maintains a rolling 60-day window of article data, ensuring that
+     only recent and relevant articles are kept in the MongoDB collection.
+
+     Args:
+         None
+
+     Returns:
+         None
+
+     Raises:
+         Exception: Propagates any exceptions from upsert_documents or delete_old_documents.
+
+     Note:
+         This function is typically called as part of a scheduled data collection process.
+     """
+     # Calculate date 60 days ago
+     sixty_days_ago = datetime.now() - timedelta(days=60)
+     filter_date = sixty_days_ago.strftime('%Y-%m-%d')
+
+     # Scan and process items
+     upsert_documents(filter_date)
+
+     # Delete documents older than 60 days
+     delete_old_documents(collection, filter_date, logger)
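For reference, a minimal sketch of the transformation _process_article_item applies to one deserialized record before the upsert. The item below is illustrative, not real table data, and it skips the per-field error handling of the actual function; numeric DynamoDB attributes normally deserialize to Decimal, with strings standing in here.

    # Hypothetical deserialized DynamoDB item (field values are made up)
    item = {
        "id": "art-123",
        "publishDate": "2024-01-15",
        "sentimentScore": "0.42",
        "entityList": [{"entity": "ACME", "sentimentScore": "0.8", "occurrence": "3"}],
    }

    # Same conversions the collector performs
    item["sentimentScore"] = float(item.get("sentimentScore", 0.0))
    item["_id"] = item.pop("id", None)  # MongoDB uses _id as the primary key
    for entity in item.get("entityList", []):
        entity["sentimentScore"] = float(entity.get("sentimentScore", 0.0))
        entity["occurrence"] = float(entity.get("occurrence", 0.0))

    # item is now: {'publishDate': '2024-01-15', 'sentimentScore': 0.42,
    #               'entityList': [{'entity': 'ACME', 'sentimentScore': 0.8, 'occurrence': 3.0}],
    #               '_id': 'art-123'}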
app/collectors/finfast/entity.py ADDED
@@ -0,0 +1,103 @@
+ """Module for collecting and managing entity data from DynamoDB to MongoDB."""
+ from datetime import datetime, timedelta
+ from database import FinFastMongoClient as MongodbClient
+ from pymongo.errors import PyMongoError
+ from .utils import scan_dynamodb_table, delete_old_documents
+
+
+ entity_collection = MongodbClient["FinFAST_China"]["Entity"]
+
+
+ def _process_entity_item(item):
+     """
+     Process a single entity item by converting data types and upserting to MongoDB.
+
+     Args:
+         item (dict): The entity item to process
+
+     Raises:
+         ValueError: If data type conversion fails
+         KeyError: If required keys are missing
+         TypeError: If type conversion fails
+         PyMongoError: If MongoDB operation fails
+     """
+     try:
+         # Convert fields to appropriate types
+         item["sentimentScore"] = float(item.get("sentimentScore", 0.0))
+         item["confidenceLevel"] = float(item.get("confidenceLevel", 0.0))
+         item["occurrence"] = int(item.get("occurrence", 0))
+         # Create composite _id
+         item["_id"] = f"{item.get('entity', '')}-{item.get('articleID', '')}"
+
+         # Upsert into MongoDB
+         entity_collection.update_one(
+             {'_id': item['_id']},
+             {'$set': item},
+             upsert=True
+         )
+         print(f"Successfully processed item: {item['_id']}")
+
+     except (ValueError, KeyError, TypeError, PyMongoError) as e:
+         print(f"Error processing item with _id {item.get('_id', 'unknown')}: {e}")
+
+
+ def upsert_documents(filter_date):
+     """
+     Scan and upsert entity documents from DynamoDB article_entity_china table to MongoDB collection.
+
+     This function scans the DynamoDB table for entity records published on or after the specified
+     filter date, processes each item by converting data types and creating composite IDs, then
+     upserts them into the MongoDB collection. The operation handles pagination automatically using
+     LastEvaluatedKey from DynamoDB scan responses.
+
+     Args:
+         filter_date (str): The minimum publish date in 'YYYY-MM-DD' format to filter entity records.
+
+     Raises:
+         ClientError: If there is an error during DynamoDB scan operation
+         BotoCoreError: If there is a boto3 core error
+
+     Note:
+         - Converts sentimentScore and confidenceLevel to float type
+         - Converts occurrence to int type
+         - Creates composite _id using format: '{entity}-{articleID}'
+         - Uses upsert operation to avoid duplicates
+     """
+     scan_dynamodb_table('article_entity_china', filter_date, _process_entity_item)
+
+
+ def collect():
+     """
+     Main collection function that orchestrates the entity data collection process.
+
+     This function performs a complete entity data collection cycle by:
+     1. Calculating a filter date (30 days ago from current date)
+     2. Scanning and upserting recent entity records from DynamoDB to MongoDB
+     3. Deleting entity records older than 30 days from MongoDB to maintain data freshness
+
+     The function maintains a rolling 30-day window of entity data, ensuring that
+     only recent and relevant entity associations are kept in the MongoDB collection.
+
+     Args:
+         None
+
+     Returns:
+         None
+
+     Raises:
+         Exception: Propagates any exceptions from upsert_documents or delete_old_documents.
+
+     Note:
+         This function is typically called as part of a scheduled data collection process.
+         The comment below mentions the HKT timezone, but the calculation uses the local system clock.
+     """
+     # Calculate date 30 days ago (accounting for HKT timezone)
+     thirty_days_ago = datetime.now() - timedelta(days=30)
+     filter_date = thirty_days_ago.strftime('%Y-%m-%d')
+
+     # Scan and process items
+     upsert_documents(filter_date)
+
+     # Delete documents older than 30 days
+     delete_old_documents(entity_collection, filter_date)
+
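The composite _id makes the entity upserts idempotent: re-running the collector for the same entity/article pair overwrites the existing document rather than inserting a duplicate. A small illustration of the key construction, with made-up values:

    # Hypothetical entity record after deserialization
    item = {"entity": "ACME Corp", "articleID": "art-123",
            "sentimentScore": "0.8", "confidenceLevel": "0.95", "occurrence": "3"}

    item["sentimentScore"] = float(item.get("sentimentScore", 0.0))
    item["confidenceLevel"] = float(item.get("confidenceLevel", 0.0))
    item["occurrence"] = int(item.get("occurrence", 0))
    item["_id"] = f"{item.get('entity', '')}-{item.get('articleID', '')}"

    print(item["_id"])  # -> ACME Corp-art-123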
app/collectors/finfast/utils.py ADDED
@@ -0,0 +1,107 @@
+ """Utility functions for FinFast data collection from DynamoDB to MongoDB."""
+
+ import os
+ import boto3
+ from boto3.dynamodb.types import TypeDeserializer
+ from pymongo.errors import PyMongoError
+ from botocore.exceptions import ClientError, BotoCoreError
+
+
+ def get_dynamodb_client():
+     """
+     Create and return a DynamoDB client using AWS credentials from environment variables.
+
+     Returns:
+         boto3.client: Configured DynamoDB client
+     """
+     aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
+     aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
+
+     return boto3.client(
+         service_name='dynamodb',
+         region_name='us-east-1',
+         aws_access_key_id=aws_access_key_id,
+         aws_secret_access_key=aws_secret_access_key
+     )
+
+
+ def scan_dynamodb_table(table_name, filter_date, item_processor):
+     """
+     Scan a DynamoDB table and process items using the provided processor function.
+
+     Args:
+         table_name (str): Name of the DynamoDB table to scan
+         filter_date (str): Filter date in 'YYYY-MM-DD' format
+         item_processor (callable): Function to process each item
+
+     Raises:
+         ClientError: If there is an error during DynamoDB scan operation
+         BotoCoreError: If there is a boto3 core error
+     """
+     dynamodb = get_dynamodb_client()
+     last_evaluated_key = None
+     deserializer = TypeDeserializer()
+
+     try:
+         while True:
+             # Prepare scan parameters
+             scan_params = {
+                 'TableName': table_name,
+                 'FilterExpression': 'publishDate >= :date',
+                 'ExpressionAttributeValues': {
+                     ':date': {'S': filter_date}
+                 }
+             }
+
+             # Add ExclusiveStartKey if it exists
+             if last_evaluated_key:
+                 scan_params['ExclusiveStartKey'] = last_evaluated_key
+
+             # Scan the table
+             response = dynamodb.scan(**scan_params)
+             items = [
+                 {k: deserializer.deserialize(v) for k, v in item.items()}
+                 for item in response.get('Items', [])
+             ]
+
+             # Process items using the provided processor
+             for item in items:
+                 item_processor(item)
+
+             last_evaluated_key = response.get('LastEvaluatedKey')
+             if not last_evaluated_key:
+                 break
+
+     except (ClientError, BotoCoreError) as e:
+         print(f"Error in scan operation: {e}")
+         raise
+
+
+ def delete_old_documents(collection, cutoff_date, use_logger=None):
+     """
+     Delete documents from MongoDB collection that are older than the specified cutoff date.
+
+     Args:
+         collection: MongoDB collection object
+         cutoff_date (str): The cutoff date in 'YYYY-MM-DD' format
+         use_logger: Optional logger; if provided, messages are logged, otherwise printed
+
+     Raises:
+         PyMongoError: If there is an error during the delete operation
+     """
+     try:
+         result = collection.delete_many({
+             'publishDate': {'$lt': cutoff_date}
+         })
+         if use_logger:
+             use_logger.info("Deleted %d documents older than %s", result.deleted_count, cutoff_date)
+         else:
+             print(f"Deleted {result.deleted_count} documents older than {cutoff_date}")
+     except PyMongoError as e:
+         if use_logger:
+             use_logger.error("Error deleting old documents: %s", e)
+         else:
+             print(f"Error deleting old documents: {e}")
+         raise
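The commit itself does not show how these collectors are invoked. A minimal, hypothetical entry point is sketched below; the import path follows the repository layout, but the wrapper function and the __main__ guard are assumptions, not part of this commit.

    # Hypothetical scheduler entry point for the FinFast collectors
    from app.collectors.finfast import article, entity

    def run_finfast_collectors():
        # Each collect() call upserts the recent window and prunes older documents.
        article.collect()  # 60-day rolling window of articles
        entity.collect()   # 30-day rolling window of entity records

    if __name__ == "__main__":
        run_finfast_collectors()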