""" Cache Manager for Smart Auto-Complete Provides efficient caching of API responses to improve performance """ import hashlib import json import logging import time from typing import Any, Dict, List, Optional, Union import threading from collections import OrderedDict logger = logging.getLogger(__name__) class CacheManager: """ In-memory cache manager with TTL (Time To Live) support Uses LRU (Least Recently Used) eviction policy """ def __init__(self, settings=None): """ Initialize the cache manager Args: settings: Application settings object """ self.settings = settings # Cache configuration self.max_size = getattr(settings, 'CACHE_MAX_SIZE', 1000) if settings else 1000 self.default_ttl = getattr(settings, 'CACHE_TTL', 3600) if settings else 3600 # 1 hour # Cache storage self._cache = OrderedDict() self._timestamps = {} self._access_counts = {} # Thread safety self._lock = threading.RLock() # Statistics self._stats = { 'hits': 0, 'misses': 0, 'evictions': 0, 'sets': 0 } logger.info(f"Cache manager initialized with max_size={self.max_size}, ttl={self.default_ttl}s") def get(self, key: str) -> Optional[Any]: """ Get a value from the cache Args: key: Cache key Returns: Cached value or None if not found/expired """ with self._lock: try: # Generate hash key for consistency hash_key = self._generate_key_hash(key) # Check if key exists if hash_key not in self._cache: self._stats['misses'] += 1 return None # Check if expired if self._is_expired(hash_key): self._remove_key(hash_key) self._stats['misses'] += 1 return None # Move to end (mark as recently used) value = self._cache[hash_key] self._cache.move_to_end(hash_key) self._access_counts[hash_key] = self._access_counts.get(hash_key, 0) + 1 self._stats['hits'] += 1 logger.debug(f"Cache hit for key: {key[:50]}...") return value except Exception as e: logger.error(f"Error getting from cache: {str(e)}") self._stats['misses'] += 1 return None def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: """ Set a value in the cache Args: key: Cache key value: Value to cache ttl: Time to live in seconds (uses default if None) Returns: True if successfully cached, False otherwise """ with self._lock: try: # Generate hash key hash_key = self._generate_key_hash(key) # Use default TTL if not specified cache_ttl = ttl if ttl is not None else self.default_ttl # Check if we need to evict items if len(self._cache) >= self.max_size and hash_key not in self._cache: self._evict_lru() # Store the value self._cache[hash_key] = value self._timestamps[hash_key] = time.time() + cache_ttl self._access_counts[hash_key] = 1 # Move to end (mark as recently used) self._cache.move_to_end(hash_key) self._stats['sets'] += 1 logger.debug(f"Cached value for key: {key[:50]}... (TTL: {cache_ttl}s)") return True except Exception as e: logger.error(f"Error setting cache: {str(e)}") return False def delete(self, key: str) -> bool: """ Delete a key from the cache Args: key: Cache key to delete Returns: True if key was deleted, False if not found """ with self._lock: try: hash_key = self._generate_key_hash(key) if hash_key in self._cache: self._remove_key(hash_key) logger.debug(f"Deleted cache key: {key[:50]}...") return True return False except Exception as e: logger.error(f"Error deleting from cache: {str(e)}") return False def clear(self) -> bool: """ Clear all items from the cache Returns: True if cache was cleared successfully """ with self._lock: try: self._cache.clear() self._timestamps.clear() self._access_counts.clear() logger.info("Cache cleared") return True except Exception as e: logger.error(f"Error clearing cache: {str(e)}") return False def cleanup_expired(self) -> int: """ Remove all expired items from the cache Returns: Number of items removed """ with self._lock: try: current_time = time.time() expired_keys = [] for hash_key, expiry_time in self._timestamps.items(): if current_time > expiry_time: expired_keys.append(hash_key) for hash_key in expired_keys: self._remove_key(hash_key) if expired_keys: logger.info(f"Cleaned up {len(expired_keys)} expired cache entries") return len(expired_keys) except Exception as e: logger.error(f"Error cleaning up expired items: {str(e)}") return 0 def get_stats(self) -> Dict[str, Union[int, float]]: """ Get cache statistics Returns: Dictionary with cache statistics """ with self._lock: total_requests = self._stats['hits'] + self._stats['misses'] hit_rate = (self._stats['hits'] / total_requests * 100) if total_requests > 0 else 0 return { 'size': len(self._cache), 'max_size': self.max_size, 'hits': self._stats['hits'], 'misses': self._stats['misses'], 'hit_rate': round(hit_rate, 2), 'evictions': self._stats['evictions'], 'sets': self._stats['sets'] } def get_cache_info(self) -> Dict[str, Any]: """ Get detailed cache information Returns: Dictionary with detailed cache info """ with self._lock: current_time = time.time() # Count expired items expired_count = sum(1 for expiry_time in self._timestamps.values() if current_time > expiry_time) # Get most accessed keys top_keys = sorted(self._access_counts.items(), key=lambda x: x[1], reverse=True)[:5] return { 'total_items': len(self._cache), 'expired_items': expired_count, 'active_items': len(self._cache) - expired_count, 'top_accessed_keys': [key[:20] + '...' for key, count in top_keys], 'memory_usage_estimate': self._estimate_memory_usage(), 'stats': self.get_stats() } def _generate_key_hash(self, key: str) -> str: """Generate a consistent hash for the cache key""" return hashlib.md5(key.encode('utf-8')).hexdigest() def _is_expired(self, hash_key: str) -> bool: """Check if a cache entry is expired""" if hash_key not in self._timestamps: return True return time.time() > self._timestamps[hash_key] def _remove_key(self, hash_key: str): """Remove a key and its associated data""" if hash_key in self._cache: del self._cache[hash_key] if hash_key in self._timestamps: del self._timestamps[hash_key] if hash_key in self._access_counts: del self._access_counts[hash_key] def _evict_lru(self): """Evict the least recently used item""" if self._cache: # Get the first item (least recently used) lru_key = next(iter(self._cache)) self._remove_key(lru_key) self._stats['evictions'] += 1 logger.debug("Evicted LRU cache entry") def _estimate_memory_usage(self) -> str: """Estimate memory usage of the cache""" try: # Rough estimation based on string representation total_size = 0 for key, value in self._cache.items(): total_size += len(str(key)) + len(str(value)) # Convert to human readable format if total_size < 1024: return f"{total_size} bytes" elif total_size < 1024 * 1024: return f"{total_size / 1024:.1f} KB" else: return f"{total_size / (1024 * 1024):.1f} MB" except Exception: return "Unknown" class SimpleDiskCache: """ Simple disk-based cache for persistence (optional enhancement) This is a basic implementation - in production, consider using Redis or similar """ def __init__(self, cache_dir: str = "./cache"): """ Initialize disk cache Args: cache_dir: Directory to store cache files """ import os self.cache_dir = cache_dir # Create cache directory if it doesn't exist os.makedirs(cache_dir, exist_ok=True) logger.info(f"Disk cache initialized at: {cache_dir}") def _get_file_path(self, key: str) -> str: """Get file path for a cache key""" import os hash_key = hashlib.md5(key.encode('utf-8')).hexdigest() return os.path.join(self.cache_dir, f"{hash_key}.json") def get(self, key: str) -> Optional[Any]: """Get value from disk cache""" try: import os file_path = self._get_file_path(key) if not os.path.exists(file_path): return None with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) # Check expiry if time.time() > data.get('expires_at', 0): os.remove(file_path) return None return data.get('value') except Exception as e: logger.error(f"Error reading from disk cache: {str(e)}") return None def set(self, key: str, value: Any, ttl: int = 3600) -> bool: """Set value in disk cache""" try: file_path = self._get_file_path(key) data = { 'value': value, 'created_at': time.time(), 'expires_at': time.time() + ttl } with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) return True except Exception as e: logger.error(f"Error writing to disk cache: {str(e)}") return False