""" Script to update category collection in MongoDB with sites from Article_China DynamoDB. Reads records from Article_China based on a delta parameter for lastModifiedDate, extracts unique site-category pairs from specified categories, and updates MongoDB category collection with aggregated data. """ import logging import datetime from typing import Dict, List, Tuple from collections import defaultdict from dataclasses import dataclass from botocore.exceptions import ClientError from models.database import category_collection # pylint: disable=import-error from ..utils import get_client_connection # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger('category_update') # DynamoDB table name ARTICLE_CHINA_TABLE = 'Article_China' SCAN_LIMIT = 50 # Limit for scan operations as requested # Target categories with unknown site lists TARGET_CATEGORIES = [ "Dragon Street China Markets", "Beijing Briefs", "Knowledge Hub" ] @dataclass class CategoryUpdater: """Manages the collection and updating of category-site relationships from DynamoDB to MongoDB. This class handles the complete workflow of: 1. Querying recent articles from DynamoDB's Article_China table 2. Extracting unique site-category pairs 3. Grouping sites by their categories 4. Updating MongoDB with the latest category-site relationships The class supports both incremental updates (via delta days) and full refreshes. Attributes: delta (int): Default lookback period in days for incremental updates (-1 for full refresh) logger (Logger): Configured logger instance for tracking operations Typical usage example: >>> updater = CategoryUpdater() >>> updater.collect() # Default 1-day delta >>> updater.collect(delta=7) # Weekly refresh >>> updater.collect(delta=-1) # Full rebuild """ #def __init__(self): # self.delta = 1 Default delta value delta: int = 1 # Now a type-hinted class field with default def get_articles_by_delta(delta: int) -> List[Dict]: """ Query Article_China based on delta parameter and target categories. Args: delta: Number of days to look back. If -1, get all records. 
        Returns:
            List of article records matching the criteria
        """
        dynamodb = get_client_connection()
        articles = []

        try:
            # Build the category part of the filter expression
            target_categories_values = {}
            filter_conditions = []
            for i, category in enumerate(TARGET_CATEGORIES):
                attribute_name = f':category{i}'
                target_categories_values[attribute_name] = {'S': category}
                filter_conditions.append(f"category = {attribute_name}")
            category_filter = f"({' OR '.join(filter_conditions)})"

            if delta == -1:
                logger.info("Retrieving all articles from Article_China for target categories")
                # Scan with only the category filter
                scan_params = {
                    'TableName': ARTICLE_CHINA_TABLE,
                    'FilterExpression': category_filter,
                    'ExpressionAttributeValues': target_categories_values,
                    'Limit': SCAN_LIMIT
                }
            else:
                # Calculate the cutoff date for the lookback window
                cutoff_date = (datetime.datetime.now()
                               - datetime.timedelta(days=delta)).strftime('%Y-%m-%dT%H:%M:%S')
                logger.info("Retrieving articles modified after %s for target categories", cutoff_date)

                # Add the date filter to the expression attribute values
                target_categories_values[':cutoff_date'] = {'S': cutoff_date}
                scan_params = {
                    'TableName': ARTICLE_CHINA_TABLE,
                    'FilterExpression': f"LastModifiedDate >= :cutoff_date AND {category_filter}",
                    'ExpressionAttributeValues': target_categories_values,
                    'Limit': SCAN_LIMIT
                }

            # Perform the initial scan
            response = dynamodb.scan(**scan_params)
            articles.extend(response.get('Items', []))

            # Continue scanning while more pages remain
            while 'LastEvaluatedKey' in response:
                logger.debug("Continuing scan, found %s articles so far", len(articles))
                scan_params['ExclusiveStartKey'] = response['LastEvaluatedKey']
                response = dynamodb.scan(**scan_params)
                articles.extend(response.get('Items', []))

            logger.info("Retrieved %s articles total", len(articles))
            return articles

        except ClientError as e:
            logger.error("Error scanning Article_China table: %s", e)
            raise

    def extract_unique_site_categories(self, articles: List[Dict]) -> List[Tuple[str, str]]:
        """
        Extract unique site-category pairs from articles.

        Args:
            articles: List of article records

        Returns:
            List of tuples containing (site, category) pairs
        """
        site_category_pairs = set()

        try:
            for article in articles:
                site = article.get('site', {}).get('S')
                category = article.get('category', {}).get('S')
                if site and category:
                    site_category_pairs.add((site, category))

            result = list(site_category_pairs)
            logger.info("Extracted %s unique site-category pairs", len(result))
            return result

        except Exception as e:
            logger.error("Error extracting site-category pairs: %s", e)
            raise

    def group_sites_by_category(self, site_category_pairs: List[Tuple[str, str]]) -> Dict[str, List[str]]:
        """
        Group sites by category.

        Args:
            site_category_pairs: List of (site, category) tuples

        Returns:
            Dictionary mapping categories to lists of sites
        """
        category_sites = defaultdict(set)

        try:
            for site, category in site_category_pairs:
                category_sites[category].add(site)

            # Convert sets to lists for JSON serialization
            result = {category: list(sites) for category, sites in category_sites.items()}
            logger.info("Grouped sites into %s categories", len(result))
            return result

        except Exception as e:
            logger.error("Error grouping sites by category: %s", e)
            raise

    def update_mongodb_categories(self, category_sites: Dict[str, List[str]]) -> None:
        """
        Update the MongoDB category collection with a category-to-sites mapping.
        ``category_collection`` is imported from mongodb.py in the database folder.

        Args:
            category_sites: Dictionary mapping categories to lists of sites
        """
        try:
            if not category_sites:
                logger.info("No category-sites mappings to add to MongoDB")
                return

            logger.info("Updating %s categories in MongoDB", len(category_sites))

            # Update each category document
            for category, sites in category_sites.items():
                try:
                    # Upsert with $addToSet so only new, unique sites are appended to the array
                    result = category_collection.update_one(
                        {"_id": category},
                        {
                            "$set": {"category": category},
                            "$addToSet": {"site": {"$each": sites}}
                        },
                        upsert=True
                    )

                    if result.upserted_id:
                        logger.info("Created new category '%s' with %s sites", category, len(sites))
                    else:
                        logger.info("Updated category '%s' with %s sites", category, len(sites))

                except Exception as e:
                    logger.error("Error updating category '%s' in MongoDB: %s", category, e)
                    raise

        except Exception as e:
            logger.error("Error updating MongoDB categories: %s", e)
            raise

    def collect(self, delta: Optional[int] = None) -> None:
        """
        Main entry point: update the MongoDB category collection with
        site-category pairs from Article_China.

        Args:
            delta: Number of days to look back for modified articles. If -1,
                get all articles. Defaults to the instance's ``delta`` field.
        """
        if delta is None:
            delta = self.delta

        try:
            logger.info("Starting category update with delta = %s", delta)

            # Get articles based on delta and target categories
            articles = self.get_articles_by_delta(delta)

            # Extract unique site-category pairs
            site_category_pairs = self.extract_unique_site_categories(articles)
            if not site_category_pairs:
                logger.info("No site-category pairs found in articles, nothing to update")
                return

            # Group sites by category
            category_sites = self.group_sites_by_category(site_category_pairs)

            # Update MongoDB with the category-sites mapping
            self.update_mongodb_categories(category_sites)

            logger.info("Category update completed successfully")

        except Exception as e:
            logger.error("Category update failed: %s", e)
            raise
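

# Minimal sketch of a command-line entry point, assuming this module may be
# invoked directly (the original script does not show how it is launched).
# Because of the relative ``from ..utils import ...`` import, it must be run
# as part of its package, e.g. ``python -m <package>.<module> --delta 7``.
# The ``--delta`` flag name is illustrative, not an existing interface.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Update the MongoDB category collection from Article_China"
    )
    parser.add_argument(
        "--delta", type=int, default=1,
        help="Days to look back for modified articles (-1 for a full rebuild)"
    )
    args = parser.parse_args()

    # Run the full workflow: scan DynamoDB, group sites by category, update MongoDB
    CategoryUpdater().collect(delta=args.delta)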