Weirui-Leo committed on
Commit 2646146 · 1 Parent(s): 35830b5

fix: all pylint errors

app/__init__.py ADDED
File without changes
app/app.py CHANGED
@@ -1,22 +1,28 @@
 """Flask application entry point."""

+from dataclasses import dataclass
 import json
 import logging
 from flask import Flask
 from flask_apscheduler import APScheduler
 from asgiref.wsgi import WsgiToAsgi
-from routes.category_router import category_bp
-from routes.summary import summary_bp
+from app.routes import category_bp, summary_bp


+@dataclass
 class Config:
     """
     Config class for application settings.

     Attributes:
-        SCHEDULER_API_ENABLED (bool): Indicates whether the scheduler's API is enabled.
+        scheduler_api_enabled (bool): Indicates whether the scheduler's API is enabled.
     """
-    with open('jobs.json', 'r', encoding='utf-8') as jobs_file:
-        JOBS = json.load(jobs_file)
-    SCHEDULER_API_ENABLED = True
+    scheduler_api_enabled: bool = True
+    jobs: dict = None
+
+    def __post_init__(self):
+        if self.jobs is None:
+            with open('jobs.json', 'r', encoding='utf-8') as jobs_file:
+                self.jobs = json.load(jobs_file)


 def create_app():
     """
app/collectors/category_update.py ADDED
@@ -0,0 +1,261 @@
+"""
+Script to update category collection in MongoDB with sites from Article_China DynamoDB.
+Reads records from Article_China based on a delta parameter for lastModifiedDate,
+extracts unique site-category pairs from specified categories, and updates
+MongoDB category collection with aggregated data.
+"""
+
+import logging
+import datetime
+from typing import Dict, List, Tuple
+from collections import defaultdict
+from dataclasses import dataclass
+from botocore.exceptions import ClientError
+from .utils import get_client_connection
+from ..database.mongodb import category_collection
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    datefmt='%Y-%m-%d %H:%M:%S'
+)
+logger = logging.getLogger('category_update')
+
+# DynamoDB table name
+ARTICLE_CHINA_TABLE = 'Article_China'
+SCAN_LIMIT = 50  # Limit for scan operations as requested
+
+# Target categories with unknown site lists
+TARGET_CATEGORIES = [
+    "Dragon Street China Markets",
+    "Beijing Briefs",
+    "Knowledge Hub"
+]
+
+
+@dataclass
+class CategoryUpdater:
+    """Manages the collection and updating of category-site relationships from DynamoDB to MongoDB.
+
+    This class handles the complete workflow of:
+    1. Querying recent articles from DynamoDB's Article_China table
+    2. Extracting unique site-category pairs
+    3. Grouping sites by their categories
+    4. Updating MongoDB with the latest category-site relationships
+
+    The class supports both incremental updates (via delta days) and full refreshes.
+
+    Attributes:
+        delta (int): Default lookback period in days for incremental updates (-1 for full refresh)
+        logger (Logger): Configured logger instance for tracking operations
+
+    Typical usage example:
+        >>> updater = CategoryUpdater()
+        >>> updater.collect()           # Default 1-day delta
+        >>> updater.collect(delta=7)    # Weekly refresh
+        >>> updater.collect(delta=-1)   # Full rebuild
+    """
+    # def __init__(self):
+    #     self.delta = 1  # Default delta value
+    delta: int = 1  # Now a type-hinted class field with default
+
+
+def get_articles_by_delta(delta: int) -> List[Dict]:
+    """
+    Query Article_China based on delta parameter and target categories.
+
+    Args:
+        delta: Number of days to look back. If -1, get all records.
+
+    Returns:
+        List of article records matching the criteria
+    """
+    dynamodb = get_client_connection()
+    articles = []
+
+    try:
+        # Format target categories for filter expression
+        target_categories_values = {}
+        filter_conditions = []
+
+        for i, category in enumerate(TARGET_CATEGORIES):
+            attribute_name = f':category{i}'
+            target_categories_values[attribute_name] = {'S': category}
+            filter_conditions.append(f"category = {attribute_name}")
+
+        category_filter = f"({' OR '.join(filter_conditions)})"
+
+        if delta == -1:
+            logger.info("Retrieving all articles from Article_China for target categories")
+            # Scan with only category filter
+            scan_params = {
+                'TableName': ARTICLE_CHINA_TABLE,
+                'FilterExpression': category_filter,
+                'ExpressionAttributeValues': target_categories_values,
+                'Limit': SCAN_LIMIT
+            }
+        else:
+            # Calculate cutoff date
+            cutoff_date = (datetime.datetime.now()
+                           - datetime.timedelta(days=delta)).strftime('%Y-%m-%dT%H:%M:%S')
+            logger.info("Retrieving articles modified after %s for target categories", cutoff_date)
+
+            # Add date filter to expression attribute values
+            target_categories_values[':cutoff_date'] = {'S': cutoff_date}
+
+            scan_params = {
+                'TableName': ARTICLE_CHINA_TABLE,
+                'FilterExpression': f"LastModifiedDate >= :cutoff_date AND {category_filter}",
+                'ExpressionAttributeValues': target_categories_values,
+                'Limit': SCAN_LIMIT
+            }
+
+        # Perform initial scan
+        response = dynamodb.scan(**scan_params)
+        articles.extend(response.get('Items', []))
+
+        # Continue scanning if there are more items
+        while 'LastEvaluatedKey' in response:
+            logger.debug("Continuing scan, found %s articles so far", len(articles))
+            scan_params['ExclusiveStartKey'] = response['LastEvaluatedKey']
+            response = dynamodb.scan(**scan_params)
+            articles.extend(response.get('Items', []))
+
+        logger.info("Retrieved %s articles total", len(articles))
+        return articles
+
+    except ClientError as e:
+        logger.error("Error scanning Article_China table: %s", e)
+        raise
+
+
+def extract_unique_site_categories(articles: List[Dict]) -> List[Tuple[str, str]]:
+    """
+    Extract unique site-category pairs from articles.
+
+    Args:
+        articles: List of article records
+
+    Returns:
+        List of tuples containing (site, category) pairs
+    """
+    site_category_pairs = set()
+
+    try:
+        for article in articles:
+            site = article.get('site', {}).get('S')
+            category = article.get('category', {}).get('S')
+
+            if site and category:
+                site_category_pairs.add((site, category))
+
+        result = list(site_category_pairs)
+        logger.info("Extracted %s unique site-category pairs", len(result))
+        return result
+
+    except Exception as e:
+        logger.error("Error extracting site-category pairs: %s", e)
+        raise
+
+
+def group_sites_by_category(site_category_pairs: List[Tuple[str, str]]) -> Dict[str, List[str]]:
+    """
+    Group sites by category.
+
+    Args:
+        site_category_pairs: List of (site, category) tuples
+
+    Returns:
+        Dictionary mapping categories to lists of sites
+    """
+    category_sites = defaultdict(set)
+
+    try:
+        for site, category in site_category_pairs:
+            category_sites[category].add(site)
+
+        # Convert sets to lists for JSON serialization
+        result = {category: list(sites) for category, sites in category_sites.items()}
+        logger.info("Grouped sites into %s categories", len(result))
+        return result
+
+    except Exception as e:
+        logger.error("Error grouping sites by category: %s", e)
+        raise
+
+
+def update_mongodb_categories(category_sites: Dict[str, List[str]]) -> None:
+    """
+    Update MongoDB category collection with category-sites mapping.
+
+    category_collection is imported from mongodb.py in database folder
+
+    Args:
+        category_sites: Dictionary mapping categories to lists of sites
+    """
+    try:
+        if not category_sites:
+            logger.info("No category-sites mappings to add to MongoDB")
+            return
+
+        logger.info("Updating %s categories in MongoDB", len(category_sites))
+
+        # Update each category document
+        for category, sites in category_sites.items():
+            try:
+                # Use upsert with $addToSet to add unique sites to the array
+                result = category_collection.update_one(
+                    {"_id": category},
+                    {
+                        "$set": {"category": category},
+                        "$addToSet": {"site": {"$each": sites}}
+                    },
+                    upsert=True
+                )
+
+                if result.upserted_id:
+                    logger.info("Created new category '%s' with %s sites", category, len(sites))
+                else:
+                    logger.info("Updated category '%s' with %s sites", category, len(sites))
+
+            except Exception as e:
+                logger.error("Error updating category '%s' in MongoDB: %s", category, e)
+                raise
+
+    except Exception as e:
+        logger.error("Error updating MongoDB categories: %s", e)
+        raise
+
+
+def collect(delta: int = 1) -> None:
+    """
+    Main function to update MongoDB category collection with site-category pairs from Article_China.
+
+    Args:
+        delta: Number of days to look back for modified articles.
+            If -1, get all articles.
+    """
+    try:
+        logger.info("Starting category update with delta = %s", delta)
+
+        # Get articles based on delta and target categories
+        articles = get_articles_by_delta(delta)
+
+        # Extract unique site-category pairs
+        site_category_pairs = extract_unique_site_categories(articles)
+
+        if not site_category_pairs:
+            logger.info("No site-category pairs found in articles, nothing to update")
+            return
+
+        # Group sites by category
+        category_sites = group_sites_by_category(site_category_pairs)
+
+        # Update MongoDB with category-sites mapping
+        update_mongodb_categories(category_sites)
+
+        logger.info("Category update completed successfully")
+
+    except Exception as e:
+        logger.error("Category update failed: %s", e)
+        raise
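A quick way to exercise this collector locally, assuming AWS credentials and the MongoDB URI are configured and the repository root is on PYTHONPATH (the import path below reflects the package layout, not necessarily the deployment):

    # Hypothetical smoke test for the new collector.
    from app.collectors import category_update

    category_update.collect(delta=7)    # backfill the last 7 days
    category_update.collect(delta=-1)   # or rebuild from a full table scan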
app/collectors/finfast/article.py CHANGED
@@ -1,12 +1,9 @@
 """Module for collecting and managing article data from DynamoDB to MongoDB."""
 from venv import logger
 from datetime import datetime, timedelta
-from database import FinFastMongoClient as MongodbClient
 from pymongo.errors import PyMongoError
-from .utils import scan_dynamodb_table, delete_old_documents
-
-collection = MongodbClient["FinFAST_China"]["Article"]
-
+from ...database.mongodb import article_collection
+from .utils import scan_dynamodb_table, delete_old_documents, upsert_item

 def _process_article_item(item):
     """
@@ -28,27 +25,23 @@ def _process_article_item(item):
         item["_id"] = item.pop("id", None)

         # Convert entityList inner values to float (for MongoDB compatibility)
-        if "entityList" in item and isinstance(item["entityList"], list):
-            for entity in item["entityList"]:
-                if isinstance(entity, dict):
-                    if "sentimentScore" in entity:
-                        try:
-                            entity["sentimentScore"] = float(entity["sentimentScore"])
-                        except (ValueError, TypeError):
-                            entity["sentimentScore"] = 0.0
-                    if "occurrence" in entity:
-                        try:
-                            entity["occurrence"] = float(entity["occurrence"])
-                        except (ValueError, TypeError):
-                            entity["occurrence"] = 0.0
+        if "entityList" not in item or not isinstance(item["entityList"], list):
+            return
+        for entity in item["entityList"]:
+            if isinstance(entity, dict):
+                if "sentimentScore" in entity:
+                    try:
+                        entity["sentimentScore"] = float(entity["sentimentScore"])
+                    except (ValueError, TypeError):
+                        entity["sentimentScore"] = 0.0
+                if "occurrence" in entity:
+                    try:
+                        entity["occurrence"] = float(entity["occurrence"])
+                    except (ValueError, TypeError):
+                        entity["occurrence"] = 0.0

         # Upsert into MongoDB
-        collection.update_one(
-            {'_id': item['_id']},
-            {'$set': item},
-            upsert=True
-        )
-        print(f"Successfully processed item: {item['_id']}")
+        upsert_item(article_collection, item)

     except (ValueError, KeyError, TypeError, PyMongoError) as e:
         logger.error("Error processing item with _id %s: %s",
@@ -111,5 +104,4 @@ def collect():
     upsert_documents(filter_date)

     # Delete documents older than 60 days
-    delete_old_documents(collection, filter_date, logger)
-
+    delete_old_documents(article_collection, filter_date, logger)
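The float-coercion pattern kept in this hunk matters because DynamoDB hands numeric attributes back as strings (via the low-level client) or Decimals (via the resource API), which PyMongo will not always accept. A standalone illustration of the same try/except fallback, using invented values:

    # Demonstration only; the raw values are made up.
    for raw in ("0.82", None, "n/a"):
        try:
            value = float(raw)
        except (ValueError, TypeError):
            value = 0.0
        print(raw, "->", value)   # 0.82, then 0.0, then 0.0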
app/collectors/finfast/entity.py CHANGED
@@ -1,12 +1,8 @@
 """Module for collecting and managing entity data from DynamoDB to MongoDB."""
 from datetime import datetime, timedelta
-from database import FinFastMongoClient as MongodbClient
 from pymongo.errors import PyMongoError
-from .utils import scan_dynamodb_table, delete_old_documents
-
-
-entity_collection = MongodbClient["FinFAST_China"]["Entity"]
-
+from ...database.mongodb import entity_collection
+from .utils import scan_dynamodb_table, delete_old_documents, upsert_item

 def _process_entity_item(item):
     """
@@ -30,11 +26,8 @@ def _process_entity_item(item):
         item["_id"] = f"{item.get('entity', '')}-{item.get('articleID', '')}"

         # Upsert into MongoDB
-        entity_collection.update_one(
-            {'_id': item['_id']},
-            {'$set': item},
-            upsert=True
-        )
+        upsert_item(entity_collection, item)
+
         print(f"Successfully processed item: {item['_id']}")

     except (ValueError, KeyError, TypeError, PyMongoError) as e:
@@ -100,4 +93,3 @@ def collect():

     # Delete documents older than 30 days
     delete_old_documents(entity_collection, filter_date)
-
app/collectors/finfast/utils.py CHANGED
@@ -105,4 +105,12 @@ def delete_old_documents(collection, cutoff_date, use_logger=None):
     else:
         print(error_message)
     raise
-
+
+def upsert_item(collection, item):
+    """Helper function to upsert an item into a MongoDB collection."""
+    collection.update_one(
+        {'_id': item['_id']},
+        {'$set': item},
+        upsert=True
+    )
+    print(f"Successfully processed item: {item['_id']}")
app/collectors/utils.py ADDED
@@ -0,0 +1,98 @@
+"""Utility functions"""
+import os
+import glob
+import boto3
+import pandas as pd
+
+AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
+AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
+
+# Downloads .parquet files from an S3 bucket and concatenates them into a single pandas DataFrame.
+def download_files_from_s3(folder):
+    """Download data files"""
+    if not os.path.exists(folder):
+        os.makedirs(folder)
+    client = boto3.client(
+        's3',
+        aws_access_key_id=AWS_ACCESS_KEY_ID,
+        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+    )
+    response = client.list_objects_v2(Bucket='finfast-china', Prefix=f"{folder}/")
+    for obj in response['Contents']:
+        key = obj['Key']
+        if key.endswith('.parquet'):
+            client.download_file('finfast-china', key, key)
+    file_paths = glob.glob(os.path.join(folder, '*.parquet'))
+    return pd.concat([pd.read_parquet(file_path) for file_path in file_paths], ignore_index=True)
+
+# Returns a DynamoDB client connection.
+def get_client_connection():
+    """Get DynamoDB connection"""
+    dynamodb = boto3.client(
+        service_name='dynamodb',
+        region_name='us-east-1',
+        aws_access_key_id=AWS_ACCESS_KEY_ID,
+        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
+    )
+    return dynamodb
+
+# Updates an entity in the specified DynamoDB table.
+def update_entity(table_name, row):
+    """Update entity data in the table"""
+    dynamodb = get_client_connection()
+    response = dynamodb.update_item(
+        TableName=table_name,
+        Key={
+            'entity': {'S': row['entity']},
+            'entityType': {'S': row['entitytype']}
+        },
+        UpdateExpression='SET total_occurrence = :total_occurrence',
+        ExpressionAttributeValues={
+            ':total_occurrence': {'N': str(row['total_occurrence'])}
+        }
+    )
+    print(response)
+
+# Updates a category in the specified DynamoDB table.
+def update_category(table_name, row):
+    """Update category data in the table"""
+    dynamodb = get_client_connection()
+    response = dynamodb.update_item(
+        TableName=table_name,
+        Key={
+            'site': {'S': row['site']},
+            'category': {'S': row['category']}
+        },
+        UpdateExpression='SET cnt = :cnt',
+        ExpressionAttributeValues={
+            ':cnt': {'N': str(row['count'])},
+        }
+    )
+    print(response)
+
+# Deletes an entity from the specified DynamoDB table.
+def delete_entity(table_name, row):
+    """Delete entity from the table"""
+    dynamodb = get_client_connection()
+    dynamodb.delete_item(
+        TableName=table_name,
+        Key={
+            'entity': {'S': row['entity']},
+            'entityType': {'S': row['entityType']}
+        }
+    )
+
+# Scans the specified DynamoDB table and deletes all items.
+def scan(table_name):
+    """Scan records from the table"""
+    dynamodb = get_client_connection()
+    response = dynamodb.scan(TableName=table_name)
+    while 'LastEvaluatedKey' in response:
+        for item in response['Items']:
+            dynamodb.delete_item(
+                TableName=table_name,
+                Key=dict(item)
+            )
+        response = dynamodb.scan(TableName=table_name,
+                                 ExclusiveStartKey=response['LastEvaluatedKey'])
+    return response
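A short usage sketch for the S3 helper, assuming the finfast-china bucket is reachable with the configured credentials; the 'entity' prefix is a guess for illustration, not a name confirmed by this commit:

    df = download_files_from_s3('entity')   # downloads and concatenates the parquet files
    print(df.shape)
    print(df.head())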
app/controllers/category.py CHANGED
@@ -4,7 +4,7 @@ Category Controller - Business logic for handling category data.
 This module contains functions that interact with the database
 to fetch and process data sorted by category
 """
-from database.mongodb import category_collection
+from ..database.mongodb import category_collection
 def get_categories():

     """
app/controllers/summary/__init__.py CHANGED
@@ -3,8 +3,8 @@ import os
 import importlib
 from concurrent.futures import ThreadPoolExecutor
 from typing import Dict, Any
-from .utils import get_content_flow_data, get_entity_analysis_data, get_sentiment_analysis_data, get_entity_sentiment_data
-
+from .utils import get_content_flow_data, get_entity_analysis_data
+from .utils import get_sentiment_analysis_data, get_entity_sentiment_data

 def _run_process(args):
     """
@@ -55,7 +55,8 @@ def process(module):
     return charts


-def get_summary_data(include_content: bool = True, include_entity: bool = True, include_sentiment: bool = True) -> Dict[str, Any]:
+def get_summary_data(include_content: bool = True,
+                     include_entity: bool = True, include_sentiment: bool = True) -> Dict[str, Any]:
     """
     Get complete summary dashboard data for all time periods.
app/controllers/summary/content/__init__.py CHANGED
@@ -26,4 +26,4 @@ __all__ = [
     "get_today_content_flow_data",
     "get_weekly_content_flow_data",
     "get_monthly_content_flow_data",
-]
\ No newline at end of file
+]
app/controllers/summary/content/weekly.py CHANGED
@@ -9,4 +9,4 @@ from ..utils import get_content_flow_data

 def process():
     """Return Content Flow Tracker data for the *latest 7 days*."""
-    return get_content_flow_data("week")
\ No newline at end of file
+    return get_content_flow_data("week")
app/controllers/summary/entity/__init__.py CHANGED
@@ -24,4 +24,4 @@ __all__ = [
     "get_today_entity_analysis_data",
     "get_weekly_entity_analysis_data",
    "get_monthly_entity_analysis_data",
-]
\ No newline at end of file
+]
app/controllers/summary/sentiment/__init__.py CHANGED
@@ -35,4 +35,4 @@ __all__ = [
     "get_monthly_sentiment_data",
     "get_entity_sentiment_data",
     "get_entities_sentiment_data",
-]
\ No newline at end of file
+]
app/controllers/summary/utils.py CHANGED
@@ -3,10 +3,10 @@ This module contains utility functions for both Content Flow Tracker and Entity
 extracted and merged from the previous content/utils.py and entity/utils.py files.
 """
 from datetime import datetime, timedelta
-from typing import Dict, Any
+from typing import Dict, Any, List, DefaultDict
 from collections import defaultdict

-from database.mongodb import article_collection, entity_collection
+from ...database.mongodb import article_collection, entity_collection


 def _get_latest_publish_date_from_collection(collection) -> datetime:
@@ -206,32 +206,27 @@ def get_sentiment_analysis_data(time_filter: str) -> Dict[str, Any]:
     Dictionary containing title, dateRange, and sentiment data by category and date.
     """
     start, end = _time_range(time_filter, article_collection)
+    start_date = datetime.strptime(start, "%Y-%m-%d").date()

-    # Convert time_filter to match the original logic
+    # Determine num_days based on time_filter (reduced variables)
     if time_filter == "today":
-        start_date = datetime.strptime(end, "%Y-%m-%d").date()
         num_days = 1
     elif time_filter in {"week", "weekly"}:
-        start_date = datetime.strptime(start, "%Y-%m-%d").date()
         num_days = 7
     elif time_filter in {"month", "monthly"}:
-        start_date = datetime.strptime(start, "%Y-%m-%d").date()
         num_days = 30
     else:
-        start_date = datetime.strptime(start, "%Y-%m-%d").date()
         end_date = datetime.strptime(end, "%Y-%m-%d").date()
         num_days = (end_date - start_date).days + 1

     # Query articles with sentiment scores
-    query = {
+    docs = list(article_collection.find({
         "publishDate": {"$gte": start, "$lte": end},
         "sentimentScore": {"$exists": True}
-    }
+    }))

-    docs = list(article_collection.find(query))
+    # Aggregate sentiment scores by category and date (using defaultdict)
     daily_scores = defaultdict(lambda: defaultdict(list))
-
-    # Aggregate sentiment scores by category and date
     for doc in docs:
         category = doc.get("category", "Unknown")
         score = doc.get("sentimentScore")
@@ -240,14 +235,13 @@ def get_sentiment_analysis_data(time_filter: str) -> Dict[str, Any]:
             daily_scores[category][pub_date].append(score)

     # Generate nested data structure: date -> category -> sentiment
-    data = {}
-    for i in range(num_days):
-        day = (start_date + timedelta(days=i)).isoformat()
-        data[day] = {}
-        for category in daily_scores:
-            scores = daily_scores[category].get(day, [])
-            avg_score = sum(scores) / len(scores) if scores else None
-            data[day][category] = avg_score
+    data = {
+        day: {
+            # Average this category's scores for the specific day, if any
+            category: (sum(date_scores[day]) / len(date_scores[day])
+                       if day in date_scores and date_scores[day] else None)
+            for category, date_scores in daily_scores.items()
+        }
+        for day in ((start_date + timedelta(days=i)).isoformat()
+                    for i in range(num_days))
+    }

     return {
         "title": f"Sentiment Analysis by Category — {time_filter.capitalize()}",
@@ -255,6 +249,72 @@ def get_sentiment_analysis_data(time_filter: str) -> Dict[str, Any]:
         "data": data
     }

+def _calculate_volatility_scores(
+        entities: Dict[str, Any], start_date: datetime.date, num_days: int
+) -> Dict[str, float]:
+    """Calculate volatility scores for entities."""
+    volatility_scores = {}
+    for entity, date_scores in entities.items():
+        # Calculate all sentiment values for this entity
+        all_values = []
+        for i in range(num_days):
+            day = (start_date + timedelta(days=i)).isoformat()
+            scores = date_scores.get(day, [])
+            avg = sum(scores) / len(scores) if scores else None
+            if avg is not None:
+                all_values.append(avg)
+
+        # Calculate volatility (range: max - min)
+        if len(all_values) > 1:
+            volatility = max(all_values) - min(all_values)
+        elif len(all_values) == 1:
+            volatility = abs(all_values[0])  # Use absolute value for single data point
+        else:
+            volatility = 0  # No data points
+
+        volatility_scores[entity] = volatility
+    return volatility_scores
+
+
+def _get_selected_entities(
+        sentiment_by_type: DefaultDict[str, Any],
+        start_date: datetime.date, num_days: int, top_n: int
+) -> Dict[str, List[str]]:
+    """Select top entities based on volatility scores."""
+    selected_entities = {}
+    for entity_type, entities in sentiment_by_type.items():
+        volatility_scores = _calculate_volatility_scores(entities, start_date, num_days)
+        # Select top N entities with highest volatility
+        top_entities = sorted(volatility_scores.items(),
+                              key=lambda x: x[1], reverse=True)[:top_n]
+        selected_entities[entity_type] = [entity for entity, _ in top_entities]
+    return selected_entities
+
+
+def _build_result_data(
+        sentiment_by_type: DefaultDict[str, Any],
+        selected_entities: Dict[str, List[str]],
+        start_date: datetime.date,
+        num_days: int,
+) -> Dict[str, Any]:
+    """Build the final result data structure."""
+    data = {}
+    for i in range(num_days):
+        day = (start_date + timedelta(days=i)).isoformat()
+        for entity_type, entities in sentiment_by_type.items():
+            if entity_type not in data:
+                data[entity_type] = {}
+            if day not in data[entity_type]:
+                data[entity_type][day] = {}
+
+            # Only include selected top entities
+            for entity in selected_entities[entity_type]:
+                date_scores = entities.get(entity, {})
+                scores = date_scores.get(day, [])
+                avg = sum(scores) / len(scores) if scores else None
+                data[entity_type][day][entity.replace("_", " ")] = avg
+    return data
+

 def get_entity_sentiment_data(time_filter: str = "weekly") -> Dict[str, Any]:
     """Return *Entity Sentiment Analysis* data for the given period.
@@ -276,9 +336,9 @@ def get_entity_sentiment_data(time_filter: str = "weekly") -> Dict[str, Any]:
     """
     start, end = _time_range(time_filter, entity_collection)

-    # Convert to date for calculations
-    start_date = datetime.strptime(start, "%Y-%m-%d").date()
-    end_date = datetime.strptime(end, "%Y-%m-%d").date()
+    # Convert to date for calculations --> omitted as too many local variables
+    # start_date = datetime.strptime(start, "%Y-%m-%d").date()
+    # end_date = datetime.strptime(end, "%Y-%m-%d").date()

     # Calculate num_days based on sentiment logic
     if time_filter == "today":
@@ -288,7 +348,8 @@ def get_entity_sentiment_data(time_filter: str = "weekly") -> Dict[str, Any]:
     elif time_filter in {"month", "monthly"}:
         num_days = 30
     else:
-        num_days = (end_date - start_date).days + 1
+        num_days = (datetime.strptime(end, "%Y-%m-%d").date()
+                    - datetime.strptime(start, "%Y-%m-%d").date()).days + 1

     # Query entities with sentiment scores
     query = {
@@ -313,50 +374,12 @@ def get_entity_sentiment_data(time_filter: str = "weekly") -> Dict[str, Any]:

     # Filter top 10 entities per entityType based on sentiment volatility (range)
     top_n = 10
-    selected_entities = {}
-
-    for entity_type, entities in sentiment_by_type.items():
-        volatility_scores = {}
-        for entity, date_scores in entities.items():
-            # Calculate all sentiment values for this entity
-            all_values = []
-            for i in range(num_days):
-                day = (start_date + timedelta(days=i)).isoformat()
-                scores = date_scores.get(day, [])
-                avg = sum(scores) / len(scores) if scores else None
-                if avg is not None:
-                    all_values.append(avg)
-
-            # Calculate volatility (range: max - min)
-            if len(all_values) > 1:
-                volatility = max(all_values) - min(all_values)
-            elif len(all_values) == 1:
-                volatility = abs(all_values[0])  # Use absolute value for single data point
-            else:
-                volatility = 0  # No data points
-
-            volatility_scores[entity] = volatility
-
-        # Select top N entities with highest volatility
-        top_entities = sorted(volatility_scores.items(), key=lambda x: x[1], reverse=True)[:top_n]
-        selected_entities[entity_type] = [entity for entity, _ in top_entities]
+    selected_entities = _get_selected_entities(
+        sentiment_by_type, datetime.strptime(start, "%Y-%m-%d").date(),
+        num_days, top_n)

     # Generate nested data structure: entityType -> date -> entity -> sentiment
-    data = {}
-    for i in range(num_days):
-        day = (start_date + timedelta(days=i)).isoformat()
-        for entity_type, entities in sentiment_by_type.items():
-            if entity_type not in data:
-                data[entity_type] = {}
-            if day not in data[entity_type]:
-                data[entity_type][day] = {}
-
-            # Only include selected top entities
-            for entity in selected_entities[entity_type]:
-                date_scores = entities.get(entity, {})
-                scores = date_scores.get(day, [])
-                avg = sum(scores) / len(scores) if scores else None
-                data[entity_type][day][entity.replace("_", " ")] = avg
+    data = _build_result_data(
+        sentiment_by_type, selected_entities,
+        datetime.strptime(start, "%Y-%m-%d").date(), num_days)

     return {
         "title": f"Entity Sentiment Analysis — {time_filter.capitalize()}",
app/database/__init__.py CHANGED
@@ -1,5 +1,5 @@
 """Module for Mongodb database"""
-from database.mongodb import MongodbClient, FinFastMongodbClient
+from .mongodb import MongodbClient, FinFastMongodbClient

 MongoClient = MongodbClient
 FinFastMongoClient = FinFastMongodbClient
app/database/mongodb.py CHANGED
@@ -1,9 +1,18 @@
 """MongoDB database interaction module."""
 import os
 from pymongo import MongoClient
-
+"""
 MongodbClient = MongoClient(os.getenv('MONGODB_URI'))
 FinFastMongodbClient = MongoClient(os.getenv("MONGODB_FINFAST_URI"))
 article_collection = FinFastMongodbClient["FinFAST_China"]["Article"]
 category_collection = FinFastMongodbClient["FinFAST_China"]["Category"]
 entity_collection = FinFastMongodbClient["FinFAST_China"]["Entity"]
+"""
+MONGODB_URI = "mongodb+srv://finfast:[email protected]/?retryWrites=true&w=majority&appName=MarcoEconomics"
+MONGODB_FINFAST_URI = "mongodb+srv://user:[email protected]/?retryWrites=true&w=majority&appName=Cluster"
+MongodbClient = MongoClient(MONGODB_URI)
+FinFastMongodbClient = MongoClient(MONGODB_FINFAST_URI)
+article_collection = FinFastMongodbClient["FinFAST_China"]["Article"]
+category_collection = FinFastMongodbClient["FinFAST_China"]["Category"]
+entity_collection = FinFastMongodbClient["FinFAST_China"]["Entity"]
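For comparison, the environment-driven pattern retained above in the docstring can keep credentials out of source control while still allowing a local fallback; a minimal sketch, not part of this commit:

    # Hypothetical alternative: URI from the environment, localhost as fallback.
    FINFAST_URI = os.getenv("MONGODB_FINFAST_URI", "mongodb://localhost:27017")
    FinFastMongodbClient = MongoClient(FINFAST_URI)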
app/routes/__init__.py CHANGED
@@ -2,4 +2,4 @@
 from flask import Blueprint

 category_bp = Blueprint("category", __name__)
-summary_bp = Blueprint('summary', __name__)
+summary_bp = Blueprint("summary", __name__)
app/routes/category_router.py CHANGED
@@ -7,8 +7,8 @@ from the MongoDB database.
 Routes:
     - GET /api/category: Fetch all categories.
 """
-from controllers.category import get_categories
 from flask import jsonify
+from ..controllers.category import get_categories
 from . import category_bp

 @category_bp.route("/api/category", methods=["GET"])
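Once the app is running, the route can be smoke-tested from Python (the port is an assumption; Flask defaults to 5000):

    import requests
    print(requests.get("http://localhost:5000/api/category").json())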
app/routes/summary.py CHANGED
@@ -2,7 +2,7 @@

 import importlib
 from flask import jsonify
-from controllers.summary import get_summary_data
+from ..controllers.summary import get_summary_data
 from . import summary_bp

 @summary_bp.route('', methods=['GET'])
jobs.json CHANGED
@@ -12,5 +12,13 @@
         "trigger": "cron",
         "hour": 23,
         "minute": 45
+    },
+    {
+        "id": "daily_category_update",
+        "func": "collectors.category_update:collect",
+        "trigger": "cron",
+        "hour": 16,
+        "minute": 0
     }
+
 ]
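The new entry is equivalent to registering the job programmatically with Flask-APScheduler; a sketch assuming a scheduler instance like the one created in create_app:

    # Hypothetical programmatic equivalent of the jobs.json entry.
    scheduler.add_job(
        id='daily_category_update',
        func='collectors.category_update:collect',
        trigger='cron',
        hour=16,
        minute=0,
    )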