OxbridgeEconomics commited on
Commit
9d2a0e1
·
1 Parent(s): 1ef74bf

fix: update import paths and remove obsolete workspace file

Browse files
app/collectors/finfast/article.py CHANGED
@@ -2,7 +2,7 @@
2
  from venv import logger
3
  from datetime import datetime, timedelta
4
  from pymongo.errors import PyMongoError
5
- from ...database.mongodb import article_collection
6
  from .utils import scan_dynamodb_table, delete_old_documents, upsert_item
7
 
8
  def _process_article_item(item):
 
2
  from venv import logger
3
  from datetime import datetime, timedelta
4
  from pymongo.errors import PyMongoError
5
+ from database.mongodb import article_collection
6
  from .utils import scan_dynamodb_table, delete_old_documents, upsert_item
7
 
8
  def _process_article_item(item):
app/collectors/finfast/category.py ADDED
@@ -0,0 +1,261 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Script to update category collection in MongoDB with sites from Article_China DynamoDB.
3
+ Reads records from Article_China based on a delta parameter for lastModifiedDate,
4
+ extracts unique site-category pairs from specified categories, and updates
5
+ MongoDB category collection with aggregated data.
6
+ """
7
+
8
+ import logging
9
+ import datetime
10
+ from typing import Dict, List, Tuple
11
+ from collections import defaultdict
12
+ from dataclasses import dataclass
13
+ from botocore.exceptions import ClientError
14
+ from ..utils import get_client_connection
15
+ from database.mongodb import category_collection
16
+
17
+ # Configure logging
18
+ logging.basicConfig(
19
+ level=logging.INFO,
20
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
21
+ datefmt='%Y-%m-%d %H:%M:%S'
22
+ )
23
+ logger = logging.getLogger('category_update')
24
+
25
+ # DynamoDB table name
26
+ ARTICLE_CHINA_TABLE = 'Article_China'
27
+ SCAN_LIMIT = 50 # Limit for scan operations as requested
28
+
29
+ # Target categories with unknown site lists
30
+ TARGET_CATEGORIES = [
31
+ "Dragon Street China Markets",
32
+ "Beijing Briefs",
33
+ "Knowledge Hub"
34
+ ]
35
+
36
+ @dataclass
37
+ class CategoryUpdater:
38
+ """Manages the collection and updating of category-site relationships from DynamoDB to MongoDB.
39
+
40
+ This class handles the complete workflow of:
41
+ 1. Querying recent articles from DynamoDB's Article_China table
42
+ 2. Extracting unique site-category pairs
43
+ 3. Grouping sites by their categories
44
+ 4. Updating MongoDB with the latest category-site relationships
45
+
46
+ The class supports both incremental updates (via delta days) and full refreshes.
47
+
48
+ Attributes:
49
+ delta (int): Default lookback period in days for incremental updates (-1 for full refresh)
50
+ logger (Logger): Configured logger instance for tracking operations
51
+
52
+ Typical usage example:
53
+ >>> updater = CategoryUpdater()
54
+ >>> updater.collect() # Default 1-day delta
55
+ >>> updater.collect(delta=7) # Weekly refresh
56
+ >>> updater.collect(delta=-1) # Full rebuild
57
+ """
58
+ #def __init__(self):
59
+ # self.delta = 1 Default delta value
60
+ delta: int = 1 # Now a type-hinted class field with default
61
+
62
+ def get_articles_by_delta(delta: int) -> List[Dict]:
63
+ """
64
+ Query Article_China based on delta parameter and target categories.
65
+
66
+ Args:
67
+ delta: Number of days to look back. If -1, get all records.
68
+
69
+ Returns:
70
+ List of article records matching the criteria
71
+ """
72
+ dynamodb = get_client_connection()
73
+ articles = []
74
+
75
+ try:
76
+ # Format target categories for filter expression
77
+ target_categories_values = {}
78
+ filter_conditions = []
79
+
80
+ for i, category in enumerate(TARGET_CATEGORIES):
81
+ attribute_name = f':category{i}'
82
+ target_categories_values[attribute_name] = {'S': category}
83
+ filter_conditions.append(f"category = {attribute_name}")
84
+
85
+ category_filter = f"({' OR '.join(filter_conditions)})"
86
+
87
+ if delta == -1:
88
+ logger.info("Retrieving all articles from Article_China for target categories")
89
+ # Scan with only category filter
90
+ scan_params = {
91
+ 'TableName': ARTICLE_CHINA_TABLE,
92
+ 'FilterExpression': category_filter,
93
+ 'ExpressionAttributeValues': target_categories_values,
94
+ 'Limit': SCAN_LIMIT
95
+ }
96
+ else:
97
+ # Calculate cutoff date
98
+ cutoff_date = (datetime.datetime.now() - datetime.timedelta(days
99
+ =delta)).strftime('%Y-%m-%dT%H:%M:%S')
100
+ logger.info("Retrieving articles modified after %s for target categories", cutoff_date)
101
+
102
+ # Add date filter to expression attribute values
103
+ target_categories_values[':cutoff_date'] = {'S': cutoff_date}
104
+
105
+ scan_params = {
106
+ 'TableName': ARTICLE_CHINA_TABLE,
107
+ 'FilterExpression': f"LastModifiedDate >= :cutoff_date AND {category_filter}",
108
+ 'ExpressionAttributeValues': target_categories_values,
109
+ 'Limit': SCAN_LIMIT
110
+ }
111
+
112
+ # Perform initial scan
113
+ response = dynamodb.scan(**scan_params)
114
+ articles.extend(response.get('Items', []))
115
+
116
+ # Continue scanning if there are more items
117
+ while 'LastEvaluatedKey' in response:
118
+ logger.debug("Continuing scan, found %s articles so far", len(articles))
119
+ scan_params['ExclusiveStartKey'] = response['LastEvaluatedKey']
120
+ response = dynamodb.scan(**scan_params)
121
+ articles.extend(response.get('Items', []))
122
+
123
+ logger.info("Retrieved %s articles total", len(articles))
124
+ return articles
125
+
126
+ except ClientError as e:
127
+ logger.error("Error scanning Article_China table: %s", e)
128
+ raise
129
+
130
+
131
+ def extract_unique_site_categories(articles: List[Dict]) -> List[Tuple[str, str]]:
132
+ """
133
+ Extract unique site-category pairs from articles.
134
+
135
+ Args:
136
+ articles: List of article records
137
+
138
+ Returns:
139
+ List of tuples containing (site, category) pairs
140
+ """
141
+ site_category_pairs = set()
142
+
143
+ try:
144
+ for article in articles:
145
+ site = article.get('site', {}).get('S')
146
+ category = article.get('category', {}).get('S')
147
+
148
+ if site and category:
149
+ site_category_pairs.add((site, category))
150
+
151
+ result = list(site_category_pairs)
152
+ logger.info("Extracted %s unique site-category pairs", len(result))
153
+ return result
154
+
155
+ except Exception as e:
156
+ logger.error("Error extracting site-category pairs: %s", e)
157
+ raise
158
+
159
+
160
+ def group_sites_by_category(site_category_pairs: List[Tuple[str, str]]) -> Dict[str, List[str]]:
161
+ """
162
+ Group sites by category.
163
+
164
+ Args:
165
+ site_category_pairs: List of (site, category) tuples
166
+
167
+ Returns:
168
+ Dictionary mapping categories to lists of sites
169
+ """
170
+ category_sites = defaultdict(set)
171
+
172
+ try:
173
+ for site, category in site_category_pairs:
174
+ category_sites[category].add(site)
175
+
176
+ # Convert sets to lists for JSON serialization
177
+ result = {category: list(sites) for category, sites in category_sites.items()}
178
+ logger.info("Grouped sites into %s categories", len(result))
179
+ return result
180
+
181
+ except Exception as e:
182
+ logger.error("Error grouping sites by category: %s", e)
183
+ raise
184
+
185
+
186
+ def update_mongodb_categories(category_sites: Dict[str, List[str]]) -> None:
187
+ """
188
+ Update MongoDB category collection with category-sites mapping.
189
+
190
+ category_collection is imported from mongodb.py in database folder
191
+
192
+ Args:
193
+ category_sites: Dictionary mapping categories to lists of sites
194
+ """
195
+
196
+ try:
197
+ if not category_sites:
198
+ logger.info("No category-sites mappings to add to MongoDB")
199
+ return
200
+
201
+ logger.info("Updating %s categories in MongoDB", len(category_sites))
202
+
203
+ # Update each category document
204
+ for category, sites in category_sites.items():
205
+ try:
206
+ # Use upsert with $addToSet to add unique sites to the array
207
+ result = category_collection.update_one(
208
+ {"_id": category},
209
+ {
210
+ "$set": {"category": category},
211
+ "$addToSet": {"site": {"$each": sites}}
212
+ },
213
+ upsert=True
214
+ )
215
+
216
+ if result.upserted_id:
217
+ logger.info("Created new category '%s' with %s sites", category, len(sites))
218
+ else:
219
+ logger.info("Updated category '%s' with %s sites", category, len(sites))
220
+
221
+ except Exception as e:
222
+ logger.error("Error updating category '%s' in MongoDB: %s", category, e)
223
+ raise
224
+
225
+ except Exception as e:
226
+ logger.error("Error updating MongoDB categories: %s", e)
227
+ raise
228
+
229
+
230
+ def collect(delta: int = 1) -> None:
231
+ """
232
+ Main function to update MongoDB category collection with site-category pairs from Article_China.
233
+
234
+ Args:
235
+ delta: Number of days to look back for modified articles.
236
+ If -1, get all articles.
237
+ """
238
+ try:
239
+ logger.info("Starting category update with delta = %s", delta)
240
+
241
+ # Get articles based on delta and target categories
242
+ articles = get_articles_by_delta(delta)
243
+
244
+ # Extract unique site-category pairs
245
+ site_category_pairs = extract_unique_site_categories(articles)
246
+
247
+ if not site_category_pairs:
248
+ logger.info("No site-category pairs found in articles, nothing to update")
249
+ return
250
+
251
+ # Group sites by category
252
+ category_sites = group_sites_by_category(site_category_pairs)
253
+
254
+ # Update MongoDB with category-sites mapping
255
+ update_mongodb_categories(category_sites)
256
+
257
+ logger.info("Category update completed successfully")
258
+
259
+ except Exception as e:
260
+ logger.error("Category update failed: %s", e)
261
+ raise
app/collectors/finfast/entity.py CHANGED
@@ -1,7 +1,7 @@
1
  """Module for collecting and managing entity data from DynamoDB to MongoDB."""
2
  from datetime import datetime, timedelta
3
  from pymongo.errors import PyMongoError
4
- from ...database.mongodb import entity_collection
5
  from .utils import scan_dynamodb_table, delete_old_documents, upsert_item
6
 
7
  def _process_entity_item(item):
 
1
  """Module for collecting and managing entity data from DynamoDB to MongoDB."""
2
  from datetime import datetime, timedelta
3
  from pymongo.errors import PyMongoError
4
+ from database.mongodb import entity_collection
5
  from .utils import scan_dynamodb_table, delete_old_documents, upsert_item
6
 
7
  def _process_entity_item(item):
app/controllers/category.py CHANGED
@@ -4,7 +4,7 @@ Category Controller - Business logic for handling category data.
4
  This module contains functions that interact with the database
5
  to fetch and process data sorted by category
6
  """
7
- from ..database.mongodb import category_collection
8
  def get_categories():
9
 
10
  """
 
4
  This module contains functions that interact with the database
5
  to fetch and process data sorted by category
6
  """
7
+ from database.mongodb import category_collection
8
  def get_categories():
9
 
10
  """
app/controllers/finfast-summary-backend.code-workspace DELETED
@@ -1 +0,0 @@
1
- {}
 
 
app/controllers/summary/utils.py CHANGED
@@ -6,7 +6,7 @@ from datetime import datetime, timedelta
6
  from typing import Dict, Any, List, DefaultDict
7
  from collections import defaultdict
8
 
9
- from ...database.mongodb import article_collection, entity_collection
10
 
11
 
12
  def _get_latest_publish_date_from_collection(collection) -> datetime:
 
6
  from typing import Dict, Any, List, DefaultDict
7
  from collections import defaultdict
8
 
9
+ from database.mongodb import article_collection, entity_collection
10
 
11
 
12
  def _get_latest_publish_date_from_collection(collection) -> datetime:
app/jobs.json CHANGED
@@ -15,7 +15,7 @@
15
  },
16
  {
17
  "id": "daily_category_update",
18
- "func": "collectors.category_update:collect",
19
  "trigger": "cron",
20
  "hour": 16,
21
  "minute": 0
 
15
  },
16
  {
17
  "id": "daily_category_update",
18
+ "func": "collectors.finfast.category:collect",
19
  "trigger": "cron",
20
  "hour": 16,
21
  "minute": 0
app/routes/category.py CHANGED
@@ -8,7 +8,7 @@ Routes:
8
  - GET /category: Fetch all categories.
9
  """
10
  from flask import jsonify
11
- from ..controllers.category import get_categories
12
  from . import category_bp
13
 
14
  @category_bp.route("/category", methods=["GET"])
 
8
  - GET /category: Fetch all categories.
9
  """
10
  from flask import jsonify
11
+ from controllers.category import get_categories
12
  from . import category_bp
13
 
14
  @category_bp.route("/category", methods=["GET"])
app/routes/summary.py CHANGED
@@ -2,7 +2,7 @@
2
 
3
  import importlib
4
  from flask import jsonify
5
- from ..controllers.summary import get_summary_data
6
  from . import summary_bp
7
 
8
  @summary_bp.route('', methods=['GET'])
 
2
 
3
  import importlib
4
  from flask import jsonify
5
+ from controllers.summary import get_summary_data
6
  from . import summary_bp
7
 
8
  @summary_bp.route('', methods=['GET'])