"""Cache management for radiology report structuring results.

This module provides the CacheManager class that handles caching of
structured radiology report results to improve performance and reduce
API calls. Supports both sample-based and custom text caching with
JSON file persistence.

Example usage:
    cache_manager = CacheManager(cache_dir="cache")
    cached_result = cache_manager.get_cached_result(report_text, sample_id)
    if not cached_result:
        result = process_report(report_text)
        cache_manager.cache_result(report_text, result, sample_id)
"""

import hashlib
import json
import logging
import os
import time
from typing import Any

from langextract.data import AnnotatedDocument, CharInterval, Extraction

logger = logging.getLogger(__name__)
class CacheManager:
    """Manages caching of radiology report structuring results.

    Provides file-backed, in-memory caching for structured radiology
    report results, with automatic cache key generation for both
    predefined samples ("sample_<id>") and ad-hoc custom texts
    ("custom_<md5>").

    Attributes:
        cache_dir: Directory path for cache storage.
        cache_file: Full path to the cache JSON file.
    """

    def __init__(self, cache_dir: str = "cache"):
        """Initializes the CacheManager with the specified cache directory.

        Args:
            cache_dir: Directory path for cache storage. Defaults to "cache".
        """
        self.cache_dir = cache_dir
        self.cache_file = os.path.join(cache_dir, "sample_cache.json")
        self._cache_data: dict[str, Any] = {}
        self._load_cache()

    def _ensure_cache_dir(self) -> None:
        """Ensures the cache directory exists, creating it if necessary."""
        os.makedirs(self.cache_dir, exist_ok=True)

    def _load_cache(self) -> None:
        """Loads existing cache data from file into memory.

        Best-effort: if the cache file does not exist or cannot be
        loaded, initializes with an empty cache instead of raising.
        """
        try:
            if os.path.exists(self.cache_file):
                with open(self.cache_file, "r", encoding="utf-8") as f:
                    self._cache_data = json.load(f)
                logger.info("Loaded cache with %d entries", len(self._cache_data))
            else:
                self._cache_data = {}
                logger.info("No existing cache file found, starting with empty cache")
        except Exception as e:  # deliberately broad: a corrupt cache is not fatal
            logger.error(f"Error loading cache: {e}")
            self._cache_data = {}

    def _save_cache(self) -> None:
        """Saves current cache data to the JSON file.

        Ensures the cache directory exists before writing. Best-effort:
        persistence failures are logged, never raised, so a broken disk
        does not take down report processing.
        """
        try:
            self._ensure_cache_dir()
            with open(self.cache_file, "w", encoding="utf-8") as f:
                json.dump(self._cache_data, f, indent=2, ensure_ascii=False)
            logger.info("Saved cache with %d entries", len(self._cache_data))
        except Exception as e:
            logger.error(f"Error saving cache: {e}")

    def _get_cache_key(self, text: str, sample_id: str | None = None) -> str:
        """Generates a cache key for the given text and optional sample ID.

        Args:
            text: The input text to generate a key for.
            sample_id: Optional sample identifier for predefined samples.

        Returns:
            A string cache key: "sample_<id>" for samples (never
            double-prefixed) or "custom_<md5-of-text>" otherwise.
        """
        if sample_id:
            # Avoid a double "sample_" prefix when the caller already
            # passes a prefixed identifier.
            if sample_id.startswith("sample_"):
                return sample_id
            return f"sample_{sample_id}"
        # MD5 is acceptable here: the digest is only a cache key,
        # not a security token.
        return f"custom_{hashlib.md5(text.encode('utf-8')).hexdigest()}"

    def get_cached_result(self, text: str, sample_id: str | None = None) -> dict | None:
        """Gets cached result for given text.

        Args:
            text: The input text to look up.
            sample_id: Optional sample identifier for predefined samples.

        Returns:
            The cached result if found, None otherwise. A cached falsy
            value (e.g. an empty dict) still counts as a hit and is
            returned as-is.
        """
        cache_key = self._get_cache_key(text, sample_id)
        result = self._cache_data.get(cache_key)
        # Explicit None check so legitimately-empty cached results are
        # not mistaken for cache misses.
        if result is not None:
            logger.info("Cache hit for key: %s", cache_key)
        return result

    def _dict_to_extraction(self, extraction_dict: dict[str, Any]) -> Extraction:
        """Converts a cached extraction dictionary to an Extraction object."""
        char_interval = None
        interval_data = extraction_dict.get("char_interval")
        if interval_data:
            char_interval = CharInterval(
                start_pos=interval_data.get("start_pos"),
                end_pos=interval_data.get("end_pos"),
            )
        return Extraction(
            extraction_text=extraction_dict.get("extraction_text", ""),
            extraction_class=extraction_dict.get("extraction_class", ""),
            attributes=extraction_dict.get("attributes", {}),
            char_interval=char_interval,
            alignment_status=extraction_dict.get("alignment_status"),
        )

    def convert_cached_response_to_annotated_document(
        self, cached_response: dict[str, Any]
    ) -> AnnotatedDocument:
        """Converts a cached response to an AnnotatedDocument.

        Args:
            cached_response: Cached response dict, optionally containing
                an "annotated_document_json" entry with "extractions".

        Returns:
            An AnnotatedDocument with reconstructed Extraction objects
            (empty when the cached response carries no extractions).
        """
        doc_json = cached_response.get("annotated_document_json") or {}
        extractions = [
            self._dict_to_extraction(d) for d in doc_json.get("extractions", [])
        ]
        return AnnotatedDocument(text="", extractions=extractions)

    def cache_result(
        self, text: str, result: dict[str, Any] | Any, sample_id: str | None = None
    ) -> None:
        """Caches result for given text and persists the cache to disk.

        Args:
            text: The input text to cache results for.
            result: The structured result to cache (must be JSON-serializable
                for persistence to succeed).
            sample_id: Optional sample identifier for predefined samples.
        """
        cache_key = self._get_cache_key(text, sample_id)
        self._cache_data[cache_key] = result
        self._save_cache()
        logger.info("Cached result for key: %s", cache_key)

    def clear_cache(self) -> None:
        """Clears all cached results and saves the empty cache to file."""
        self._cache_data = {}
        self._save_cache()
        logger.info("Cache cleared")

    def remove_sample(self, sample_id: str) -> bool:
        """Removes a specific sample from cache.

        Args:
            sample_id: The sample identifier to remove. May be given
                with or without the "sample_" prefix.

        Returns:
            True if the sample was found and removed, False otherwise.
        """
        # Route through the shared key builder so ids that already carry
        # the "sample_" prefix (stored un-doubled by cache_result) are
        # found too — building f"sample_{sample_id}" directly missed them.
        cache_key = self._get_cache_key("", sample_id)
        if cache_key in self._cache_data:
            del self._cache_data[cache_key]
            self._save_cache()
            logger.info("Removed sample %s from cache", sample_id)
            return True
        logger.warning("Sample %s not found in cache", sample_id)
        return False

    def prepopulate_cache_with_samples(
        self,
        sample_reports: list[dict[str, Any]],
        structurer_callable,
        force_refresh: bool = False,
    ) -> None:
        """Prepopulates cache with sample reports.

        Processes each sample report through ``structurer_callable`` and
        caches the structured result, sleeping between samples to rate
        limit upstream API calls. A lock file guards against concurrent
        prepopulation runs.

        Args:
            sample_reports: List of sample report dicts with 'id' and 'text'.
            structurer_callable: Function to call for structuring reports;
                takes the report text and returns the result to cache.
            force_refresh: If True, reprocesses samples even if already
                cached (and ignores an existing lock file).
        """
        if not sample_reports:
            logger.info("No sample reports provided for cache prepopulation")
            return
        logger.info(
            "Starting cache prepopulation with %d samples", len(sample_reports)
        )
        lock_file = os.path.join(self.cache_dir, ".cache_lock")
        if os.path.exists(lock_file) and not force_refresh:
            logger.info("Cache prepopulation already in progress or recently completed")
            return
        try:
            self._ensure_cache_dir()
            with open(lock_file, "w") as f:
                f.write(str(os.getpid()))
            for i, sample in enumerate(sample_reports):
                sample_id = sample.get("id")
                sample_text = sample.get("text", "")
                if not sample_id or not sample_text:
                    logger.warning("Sample %d missing id or text, skipping", i)
                    continue
                if not force_refresh and self.get_cached_result(sample_text, sample_id):
                    logger.info("Sample %s already cached, skipping", sample_id)
                    continue
                logger.info(
                    "Processing sample %s (%d/%d)",
                    sample_id,
                    i + 1,
                    len(sample_reports),
                )
                try:
                    result = structurer_callable(sample_text)
                    self.cache_result(sample_text, result, sample_id)
                    logger.info("Successfully cached sample %s", sample_id)
                except Exception as e:
                    # One bad sample must not abort the whole prepopulation.
                    logger.error("Error processing sample %s: %s", sample_id, e)
                    continue
                time.sleep(6)  # crude rate limit between upstream API calls
            logger.info("Cache prepopulation completed")
            self._save_cache()
        except Exception as e:
            logger.error("Error during cache prepopulation: %s", e)
        finally:
            # Always release the lock, even on failure.
            if os.path.exists(lock_file):
                os.remove(lock_file)

    def get_cache_stats(self) -> dict[str, Any]:
        """Gets cache statistics.

        Returns:
            Dictionary with total/sample/custom entry counts, the cache
            file path, and whether the cache file exists on disk.
        """
        sample_count = sum(1 for key in self._cache_data if key.startswith("sample_"))
        custom_count = sum(1 for key in self._cache_data if key.startswith("custom_"))
        return {
            "total_entries": len(self._cache_data),
            "sample_entries": sample_count,
            "custom_entries": custom_count,
            "cache_file": self.cache_file,
            "cache_file_exists": os.path.exists(self.cache_file),
        }