Spaces:
Running
Running
""" | |
Advanced Adverse Drug Reaction (ADR) Analysis Tools

This module provides pharmacovigilance capabilities, including:
- Enhanced FAERS database searches with filtering
- Naranjo probability scale calculator
- Disproportionality analysis (PRR, ROR, IC)
- Case similarity analysis
- Temporal pattern analysis
""" | |
import requests | |
import re | |
import math | |
import logging | |
from datetime import datetime, timedelta | |
from typing import Dict, List, Any, Optional, Tuple | |
from collections import defaultdict, Counter | |
from caching import with_caching | |
from utils import with_error_handling, make_api_request | |
logger = logging.getLogger(__name__) | |
def enhanced_faers_search(
    drug_name: str,
    adverse_event: Optional[str] = None,
    age_range: Optional[str] = None,
    gender: Optional[str] = None,
    serious_only: bool = False,
    limit: int = 100
) -> Dict[str, Any]:
    """
    Enhanced FAERS search with filtering capabilities for pharmacovigilance analysis.

    The drug, adverse-event, seriousness and gender filters are sent to the
    openFDA drug event endpoint; the age filter is applied afterwards to the
    returned records by ``extract_case_details``.

    Args:
        drug_name: Drug name to search for
        adverse_event: Specific adverse event/reaction to filter by (optional)
        age_range: Age range filter like "18-65" or ">65" (optional)
        gender: Gender filter "1" (male) or "2" (female) (optional); any
            other value is ignored
        serious_only: If True, only return serious adverse events
        limit: Maximum number of results (default 100, clamped to 1..1000)

    Returns:
        Dict with keys "cases", "total_found", "filtered_count",
        "query_info" and "summary_statistics". When the API answers 404
        the same keys are returned with empty values plus a "message".

    Raises:
        ValueError: If drug_name is empty or only whitespace.
        requests.exceptions.RequestException: If no response is obtained or
            the API answers with a status other than 200/404.
    """
    if not drug_name or not drug_name.strip():
        raise ValueError("Drug name cannot be empty")

    # Build search query
    search_parts = [f'patient.drug.medicinalproduct:"{drug_name.strip()}"']
    # A whitespace-only adverse_event would otherwise yield an empty quoted term.
    event_term = adverse_event.strip() if adverse_event else ""
    if event_term:
        search_parts.append(f'patient.reaction.reactionmeddrapt:"{event_term}"')
    if serious_only:
        search_parts.append('serious:"1"')
    if gender in ["1", "2"]:
        search_parts.append(f'patient.patientsex:"{gender}"')

    base_url = "https://api.fda.gov/drug/event.json"
    query_params = {
        "search": " AND ".join(search_parts),
        "limit": min(max(1, limit), 1000)
    }

    # Echo of the request, shared by every return path.
    query_info = {
        "drug": drug_name,
        "adverse_event": adverse_event,
        "filters_applied": {
            "age_range": age_range,
            "gender": gender,
            "serious_only": serious_only
        }
    }

    response = make_api_request(base_url, query_params, timeout=15)
    # NOTE(review): other callers in this module treat the helper's result as
    # possibly falsy/None, so guard before dereferencing - confirm against utils.
    if response is None:
        raise requests.exceptions.RequestException("Enhanced FAERS search failed: no response")

    if response.status_code == 404:
        # Same keys as the success path so callers can index unconditionally.
        return {
            "cases": [],
            "total_found": 0,
            "filtered_count": 0,
            "query_info": query_info,
            "summary_statistics": {},
            "message": "No matching cases found"
        }
    if response.status_code != 200:
        raise requests.exceptions.RequestException(f"Enhanced FAERS search failed: {response.status_code}")

    data = response.json()
    cases = []
    for rec in data.get("results", []):
        case = extract_case_details(rec, age_range)
        if case:  # Only include if age filter passes
            cases.append(case)

    return {
        "cases": cases,
        "total_found": data.get("meta", {}).get("results", {}).get("total", 0),
        "filtered_count": len(cases),
        "query_info": query_info,
        "summary_statistics": calculate_case_statistics(cases)
    }
# Years represented by one unit of each openFDA age-unit code
# (800=decade, 801=year, 802=month, 803=week, 804=day, 805=hour).
_AGE_UNIT_TO_YEARS = {
    "800": 10.0,
    "801": 1.0,
    "802": 1.0 / 12.0,
    "803": 7.0 / 365.25,
    "804": 1.0 / 365.25,
    "805": 1.0 / (365.25 * 24.0),
}


def _is_flag_set(value: Any) -> bool:
    """Return True only when a FAERS flag field carries the value 1."""
    return str(value).strip() == "1"


def _age_in_years(age_value: Any, unit_code: Any) -> Optional[float]:
    """Convert a reported age and unit code to years; None if the age is not numeric."""
    try:
        age = float(age_value)
    except (TypeError, ValueError):
        return None
    factor = None
    if unit_code is not None:
        factor = _AGE_UNIT_TO_YEARS.get(str(unit_code).strip())
    # Unknown or missing unit: keep the value as-is (treated as years).
    return age if factor is None else age * factor


def extract_case_details(rec: Dict, age_range: Optional[str] = None) -> Optional[Dict]:
    """
    Extract and structure case details from a FAERS record.

    Args:
        rec: One element of the openFDA "results" list.
        age_range: Optional age filter (e.g. "18-65", ">65") evaluated
            against the patient's age converted to years.

    Returns:
        A dict with report id, receive date, patient demographics, drugs,
        reactions, seriousness flags, reporter qualification and country,
        or None when the record has a usable age that fails ``age_range``.
        Records without a numeric age are never rejected by the filter.
    """
    patient = rec.get("patient", {})

    # Extract patient demographics. openFDA publishes the age as
    # "patientonsetage"; "patientage" is still honoured as a fallback.
    age_group = patient.get("patientagegroup")
    reported_age = patient.get("patientonsetage")
    if reported_age is None:
        reported_age = patient.get("patientage")
    age_unit = patient.get("patientonsetageunit")
    age_in_years = _age_in_years(reported_age, age_unit)
    gender = patient.get("patientsex")

    # Apply age filter if specified
    if age_range and age_in_years is not None:
        try:
            if not passes_age_filter(age_in_years, age_range):
                return None
        except (ValueError, TypeError):
            # Best effort: an unparseable age_range does not exclude the case.
            pass

    # Extract drug information
    drugs = [
        {
            "name": drug.get("medicinalproduct", ""),
            "characterization": drug.get("drugcharacterization"),  # 1=suspect, 2=concomitant, 3=interacting
            "indication": drug.get("drugindication", ""),
            "start_date": drug.get("drugstartdate", ""),
            "end_date": drug.get("drugenddate", ""),
            "dosage": drug.get("drugdosagetext", ""),
            "route": drug.get("drugadministrationroute", "")
        }
        for drug in patient.get("drug", [])
    ]

    # Extract reactions
    reactions = [
        {
            "term": reaction.get("reactionmeddrapt", ""),
            # 1=recovered, 2=recovering, 3=not recovered,
            # 4=recovered with sequelae, 5=fatal, 6=unknown
            "outcome": reaction.get("reactionoutcome")
        }
        for reaction in patient.get("reaction", [])
    ]

    # Extract seriousness criteria. "serious" is "1" for serious and "2" for
    # non-serious, so only an explicit 1 counts; bool(int(...)) would treat
    # "2" as serious and crash on null/non-numeric values.
    seriousness = {
        "serious": _is_flag_set(rec.get("serious")),
        "death": _is_flag_set(rec.get("seriousnessdeath")),
        "life_threatening": _is_flag_set(rec.get("seriousnesslifethreatening")),
        "hospitalization": _is_flag_set(rec.get("seriousnesshospitalization")),
        "disability": _is_flag_set(rec.get("seriousnessdisabling")),
        "congenital_anomaly": _is_flag_set(rec.get("seriousnesscongenitalanomali")),
        "other_serious": _is_flag_set(rec.get("seriousnessother"))
    }

    return {
        "safety_report_id": rec.get("safetyreportid"),
        "receive_date": rec.get("receivedate"),
        "patient": {
            "age": reported_age,           # value as reported
            "age_unit": age_unit,          # openFDA unit code, if any
            "age_in_years": age_in_years,  # normalised, None if not numeric
            "age_group": age_group,
            "gender": gender,  # 1=male, 2=female
            "weight": patient.get("patientweight")
        },
        "drugs": drugs,
        "reactions": reactions,
        "seriousness": seriousness,
        "reporter_qualification": rec.get("primarysource", {}).get("qualification"),  # 1=physician, 2=pharmacist, etc.
        "country": rec.get("occurcountry")
    }
def passes_age_filter(age: float, age_range: str) -> bool:
    """
    Check if age passes the specified filter.

    Supported filter forms: ">N", "<N", ">=N", "<=N" and "MIN-MAX"
    (inclusive on both ends). Any other form lets every age pass.

    Args:
        age: Age to test.
        age_range: Filter expression; surrounding whitespace is ignored.

    Returns:
        True when the age satisfies the filter or the filter form is not
        recognised.

    Raises:
        ValueError: If the numeric part of the filter cannot be parsed.
    """
    age_range = age_range.strip()
    # Two-character operators must be tested before their one-character
    # prefixes, otherwise ">=65" is parsed as ">" followed by "=65".
    if age_range.startswith(">="):
        return age >= float(age_range[2:])
    if age_range.startswith("<="):
        return age <= float(age_range[2:])
    if age_range.startswith(">"):
        return age > float(age_range[1:])
    if age_range.startswith("<"):
        return age < float(age_range[1:])
    if "-" in age_range:
        min_age, max_age = map(float, age_range.split("-"))
        return min_age <= age <= max_age
    return True
def calculate_case_statistics(cases: List[Dict]) -> Dict[str, Any]:
    """
    Calculate summary statistics from case data.

    Args:
        cases: Case dicts shaped like the output of ``extract_case_details``
            (keys "patient", "seriousness", "reactions",
            "reporter_qualification").

    Returns:
        An empty dict when ``cases`` is empty; otherwise counts and
        percentages of serious/fatal cases, age statistics (mean, median,
        range), gender distribution, the ten most frequent reaction terms
        and the reporter-type distribution.
    """
    if not cases:
        return {}

    total = len(cases)

    # Demographics. Ages that are missing or not numeric are skipped rather
    # than aborting the whole summary.
    ages = []
    for case in cases:
        raw_age = case["patient"]["age"]
        if raw_age is None or raw_age == "":
            continue
        try:
            ages.append(float(raw_age))
        except (TypeError, ValueError):
            continue
    ages.sort()
    genders = [case["patient"]["gender"] for case in cases if case["patient"]["gender"]]

    if ages:
        middle = len(ages) // 2
        # Even-sized samples average the two central values.
        median_age = ages[middle] if len(ages) % 2 else (ages[middle - 1] + ages[middle]) / 2
        age_stats = {
            "mean": round(sum(ages) / len(ages), 1),
            "median": median_age,
            "range": [ages[0], ages[-1]]
        }
    else:
        age_stats = {"mean": None, "median": None, "range": None}

    # Outcomes
    serious_cases = sum(1 for case in cases if case["seriousness"]["serious"])
    fatal_cases = sum(1 for case in cases if case["seriousness"]["death"])

    # Reporter types
    reporter_types = [case["reporter_qualification"] for case in cases if case["reporter_qualification"]]

    # Most common reactions
    reaction_counts = Counter(r["term"] for case in cases for r in case["reactions"])

    return {
        "total_cases": total,
        "serious_cases": serious_cases,
        "serious_percentage": round(serious_cases / total * 100, 1),
        "fatal_cases": fatal_cases,
        "fatal_percentage": round(fatal_cases / total * 100, 1),
        "demographics": {
            "age_stats": age_stats,
            "gender_distribution": dict(Counter(genders))
        },
        "top_reactions": dict(reaction_counts.most_common(10)),
        "reporter_types": dict(Counter(reporter_types))
    }
def calculate_naranjo_score(
    adverse_reaction_after_drug: str,  # "yes", "no", "unknown"
    reaction_improved_after_stopping: str,  # "yes", "no", "unknown"
    reaction_reappeared_after_readministration: str,  # "yes", "no", "unknown"
    alternative_causes_exist: str,  # "yes", "no", "unknown"
    reaction_when_placebo_given: str,  # "yes", "no", "unknown"
    drug_detected_in_blood: str,  # "yes", "no", "unknown"
    reaction_worse_with_higher_dose: str,  # "yes", "no", "unknown"
    similar_reaction_to_drug_before: str,  # "yes", "no", "unknown"
    adverse_event_confirmed_objectively: str,  # "yes", "no", "unknown"
    reaction_appeared_after_suspected_drug_given: str  # "yes", "no", "unknown"
) -> Dict[str, Any]:
    """
    Score a case on the Naranjo Adverse Drug Reaction Probability Scale.

    Each of the ten answers is normalised (lower-cased, stripped) and mapped
    to the points of its question; the sum selects the causality category.
    Note that ``adverse_reaction_after_drug`` is the answer to the first
    question ("previous conclusive reports"), while
    ``reaction_appeared_after_suspected_drug_given`` answers the second.

    Args:
        All parameters should be "yes", "no", or "unknown"

    Returns:
        Dict with the total score, category, probability band,
        interpretation text, a per-question breakdown and scale metadata.

    Raises:
        ValueError: If any answer is not "yes", "no" or "unknown".
    """
    # (question text, supplied answer, points for yes / no / unknown),
    # listed in the order the scale asks them.
    scale_items = (
        ("Are there previous conclusive reports on this reaction?",
         adverse_reaction_after_drug, (1, 0, 0)),
        ("Did the adverse event appear after the suspected drug was administered?",
         reaction_appeared_after_suspected_drug_given, (2, -1, 0)),
        ("Did the adverse reaction improve when the drug was discontinued or a specific antagonist was administered?",
         reaction_improved_after_stopping, (1, 0, 0)),
        ("Did the adverse reaction reappear when the drug was readministered?",
         reaction_reappeared_after_readministration, (2, -1, 0)),
        ("Are there alternative causes (other than the drug) that could on their own have caused the reaction?",
         alternative_causes_exist, (-1, 2, 0)),
        ("Did the reaction reappear when a placebo was given?",
         reaction_when_placebo_given, (-1, 1, 0)),
        ("Was the drug detected in blood (or other fluids) in concentrations known to be toxic?",
         drug_detected_in_blood, (1, 0, 0)),
        ("Was the reaction more severe when the dose was increased or less severe when the dose was decreased?",
         reaction_worse_with_higher_dose, (1, 0, 0)),
        ("Did the patient have a similar reaction to the same or similar drugs in any previous exposure?",
         similar_reaction_to_drug_before, (1, 0, 0)),
        ("Was the adverse event confirmed by any objective evidence?",
         adverse_event_confirmed_objectively, (1, 0, 0)),
    )
    answer_slots = ("yes", "no", "unknown")

    question_details = []
    for text, raw_answer, points_by_slot in scale_items:
        answer = raw_answer.lower().strip()
        if answer not in answer_slots:
            raise ValueError(f"Invalid answer '{answer}'. Must be 'yes', 'no', or 'unknown'")
        question_details.append({
            "question": text,
            "answer": answer,
            "points": points_by_slot[answer_slots.index(answer)]
        })
    total_score = sum(detail["points"] for detail in question_details)

    # Score bands, highest threshold first; anything below 1 is "Doubtful".
    bands = (
        (9, "Definite", "≥95%", "The adverse reaction is definitely related to the drug."),
        (5, "Probable", "75-95%", "The adverse reaction is probably related to the drug."),
        (1, "Possible", "25-75%", "The adverse reaction is possibly related to the drug."),
    )
    for floor, category, probability, interpretation in bands:
        if total_score >= floor:
            break
    else:
        category = "Doubtful"
        probability = "<25%"
        interpretation = "The adverse reaction is doubtfully related to the drug."

    return {
        "total_score": total_score,
        "category": category,
        "probability": probability,
        "interpretation": interpretation,
        "question_breakdown": question_details,
        "scale_info": {
            "name": "Naranjo Adverse Drug Reaction Probability Scale",
            "reference": "Naranjo CA, et al. Clin Pharmacol Ther. 1981;30(2):239-245",
            "scoring": {
                "Definite": "≥9 points",
                "Probable": "5-8 points",
                "Possible": "1-4 points",
                "Doubtful": "≤0 points"
            }
        }
    }
def _faers_total_count(base_url: str, search: str, timeout: int = 10) -> Optional[int]:
    """Return meta.results.total for a FAERS search, or None if the request did not succeed."""
    response = make_api_request(base_url, {"search": search, "limit": 1}, timeout=timeout)
    if response and response.status_code == 200:
        return response.json().get("meta", {}).get("results", {}).get("total", 0)
    return None


def disproportionality_analysis(
    drug_name: str,
    adverse_event: str,
    background_limit: int = 10000
) -> Dict[str, Any]:
    """
    Perform disproportionality analysis to detect potential drug-adverse event signals.

    Builds a 2x2 contingency table from three FAERS count queries (drug +
    event, drug only, event only) plus a fixed estimate of the database
    size, then delegates to ``calculate_disproportionality_measures`` for
    the Proportional Reporting Ratio (PRR), Reporting Odds Ratio (ROR) and
    Information Component (IC) with confidence intervals.

    Args:
        drug_name: Drug of interest
        adverse_event: Adverse event of interest
        background_limit: Currently unused; kept for backward compatibility.

    Returns:
        Dict with PRR, ROR and IC results, the contingency table, and
        "data_sources" stating for each count whether it came from the API
        or from a fallback estimate. When no drug-event cases are found, or
        when any exception occurs, a short dict with "signal_detected":
        False and "case_count": 0 is returned instead of raising.
    """
    direct_source = "FAERS API direct query"
    fallback_source = "Fallback estimate (FAERS API query failed)"
    try:
        base_url = "https://api.fda.gov/drug/event.json"
        drug_term = f'patient.drug.medicinalproduct:"{drug_name}"'
        event_term = f'patient.reaction.reactionmeddrapt:"{adverse_event}"'

        # Get cases for drug + adverse event (a). A failed request is
        # treated the same as zero matching cases.
        a = _faers_total_count(base_url, f"{drug_term} AND {event_term}") or 0
        if a == 0:
            return {
                "drug": drug_name,
                "adverse_event": adverse_event,
                "message": "No cases found for this drug-adverse event combination",
                "signal_detected": False,
                "case_count": 0
            }

        # Get total cases for drug (a + b)
        total_drug_cases = _faers_total_count(base_url, drug_term)
        if total_drug_cases is not None:
            b = max(total_drug_cases - a, 1)  # Ensure b is at least 1
            drug_source = direct_source
        else:
            b = max(a * 5, 10)  # Conservative estimate
            drug_source = fallback_source

        # Get total cases for adverse event (a + c)
        total_ae_cases = _faers_total_count(base_url, event_term)
        if total_ae_cases is not None:
            c = max(total_ae_cases - a, 1)  # Avoid zero
            ae_source = direct_source
        else:
            c = max(a * 10, 100)  # Conservative estimate
            ae_source = fallback_source

        # Estimate total background cases (d)
        # Use a reasonable estimate based on FAERS database size
        total_cases_estimate = 15000000  # Approximate FAERS database size
        d = max(total_cases_estimate - a - b - c, 1000)

        # Calculate disproportionality measures
        results = calculate_disproportionality_measures(a, b, c, d)

        # Add metadata
        results.update({
            "drug": drug_name,
            "adverse_event": adverse_event,
            "contingency_table": {
                "drug_ae": a,
                "drug_other_ae": b,
                "other_drug_ae": c,
                "other_drug_other_ae": d,
                "total": a + b + c + d
            },
            # Report the real provenance: b and c may be fallback estimates.
            "data_sources": {
                "drug_ae_cases": direct_source,
                "total_drug_cases": drug_source,
                "total_ae_cases": ae_source,
                "background_estimate": "Statistical approximation"
            },
            "data_notes": [
                "This analysis uses FAERS data which has inherent limitations",
                "Results should be interpreted by qualified pharmacovigilance professionals",
                "Background estimates are approximations due to API limitations",
                "Consider confounding factors and reporting biases"
            ]
        })
        return results
    except Exception as e:
        # Top-level boundary: callers get a structured failure, never an exception.
        logger.error("Error in disproportionality analysis: %s", e)
        return {
            "drug": drug_name,
            "adverse_event": adverse_event,
            "error": str(e),
            "message": "Analysis failed due to data access issues",
            "signal_detected": False,
            "case_count": 0
        }
def calculate_disproportionality_measures(a: int, b: int, c: int, d: int) -> Dict[str, Any]:
    """
    Calculate PRR, ROR, and IC with confidence intervals.

    2x2 contingency table:
                        AE of Interest    Other AEs
    Drug of Interest          a               b
    Other Drugs               c               d

    Any measure that is undefined for the given counts (a zero
    denominator) is reported as 0 with a [0, 0] interval and never flags
    a signal, instead of raising.

    Args:
        a: Reports with the drug and the event.
        b: Reports with the drug and other events.
        c: Reports with other drugs and the event.
        d: Reports with other drugs and other events.

    Returns:
        Dict with one entry per measure (value, 95% CI, signal flag,
        interpretation), the overall signal flag, the case count ``a`` and
        a "Strong" / "Moderate" / "Weak/None" signal strength.
    """
    drug_total = a + b
    other_total = c + d
    grand_total = drug_total + other_total

    # Proportional Reporting Ratio (PRR); needs c > 0 so the comparator
    # proportion is non-zero.
    prr = 0
    prr_ci_lower = prr_ci_upper = 0
    if drug_total > 0 and other_total > 0 and c > 0:
        prr = (a / drug_total) / (c / other_total)
        # PRR 95% CI (using log transformation)
        if a > 0:
            log_prr = math.log(prr)
            variance = 1 / a + 1 / c - 1 / drug_total - 1 / other_total
            se_log_prr = math.sqrt(max(variance, 0.0))  # guard float round-off
            prr_ci_lower = math.exp(log_prr - 1.96 * se_log_prr)
            prr_ci_upper = math.exp(log_prr + 1.96 * se_log_prr)

    # Reporting Odds Ratio (ROR)
    ror = (a * d) / (b * c) if b > 0 and c > 0 else 0
    # ROR 95% CI
    if a > 0 and b > 0 and c > 0 and d > 0:
        log_ror = math.log(ror)
        se_log_ror = math.sqrt(1 / a + 1 / b + 1 / c + 1 / d)
        ror_ci_lower = math.exp(log_ror - 1.96 * se_log_ror)
        ror_ci_upper = math.exp(log_ror + 1.96 * se_log_ror)
    else:
        ror_ci_lower = ror_ci_upper = 0

    # Information Component (IC): log2 of observed over expected count.
    expected = (drug_total * (a + c)) / grand_total if grand_total > 0 else 0
    ic = math.log2(a / expected) if expected > 0 and a > 0 else 0
    # IC 95% CI (simplified approximation)
    if a > 0:
        ic_se = 1 / (math.log(2) * math.sqrt(a))
        ic_ci_lower = ic - 1.96 * ic_se
        ic_ci_upper = ic + 1.96 * ic_se
    else:
        ic_ci_lower = ic_ci_upper = 0

    # Signal detection criteria
    prr_signal = prr >= 2.0 and prr_ci_lower > 1.0 and a >= 3
    ror_signal = ror >= 2.0 and ror_ci_lower > 1.0 and a >= 3
    ic_signal = ic_ci_lower > 0 and a >= 3
    signal_detected = prr_signal or ror_signal or ic_signal

    if prr_signal and ror_signal and ic_signal:
        signal_strength = "Strong"
    elif signal_detected:
        signal_strength = "Moderate"
    else:
        signal_strength = "Weak/None"

    return {
        "proportional_reporting_ratio": {
            "value": round(prr, 3),
            "confidence_interval_95": [round(prr_ci_lower, 3), round(prr_ci_upper, 3)],
            "signal_detected": prr_signal,
            "interpretation": "PRR ≥2 with lower CI >1 suggests potential signal" if prr_signal else "No signal detected by PRR criteria"
        },
        "reporting_odds_ratio": {
            "value": round(ror, 3),
            "confidence_interval_95": [round(ror_ci_lower, 3), round(ror_ci_upper, 3)],
            "signal_detected": ror_signal,
            "interpretation": "ROR ≥2 with lower CI >1 suggests potential signal" if ror_signal else "No signal detected by ROR criteria"
        },
        "information_component": {
            "value": round(ic, 3),
            "confidence_interval_95": [round(ic_ci_lower, 3), round(ic_ci_upper, 3)],
            "signal_detected": ic_signal,
            "interpretation": "IC lower CI >0 suggests potential signal" if ic_signal else "No signal detected by IC criteria"
        },
        "overall_signal_detected": signal_detected,
        "case_count": a,
        "signal_strength": signal_strength
    }
def find_similar_cases(
    reference_case_id: str,
    similarity_threshold: float = 0.7,
    limit: int = 50
) -> Dict[str, Any]:
    """
    Look up FAERS cases that resemble a given reference report.

    The reference report is fetched, its first listed drug is used to pull
    candidate cases through ``enhanced_faers_search``, and each candidate
    is scored against the reference on drugs, reactions and patient data.

    Args:
        reference_case_id: FAERS safety report ID to use as reference
        similarity_threshold: Minimum similarity score (0-1)
        limit: Maximum number of similar cases to return

    Returns:
        Dict with the reference drugs/reactions (lower-cased), the matching
        cases sorted by descending score (truncated to ``limit``), the
        untruncated match count, and a summary of the most frequently
        shared drugs and reactions over all matches.

    Raises:
        ValueError: If the reference case cannot be fetched or lists no drugs.
    """
    # Imported here, as in the rest of this function's history, rather than at module level.
    from drug_data_endpoints import fetch_event_details

    try:
        ref_case = fetch_event_details(reference_case_id)
    except Exception as e:
        raise ValueError(f"Could not fetch reference case {reference_case_id}: {e}")

    # presumably "drugs" and "reactions" are lists of plain strings - verify against fetch_event_details
    ref_drugs = [name.lower() for name in ref_case["drugs"]]
    ref_reactions = [term.lower() for term in ref_case["reactions"]]
    if not ref_drugs:
        raise ValueError("Reference case has no drug information")

    # Over-fetch candidates for the first reference drug so that enough
    # remain after the similarity threshold is applied.
    candidates = enhanced_faers_search(
        drug_name=ref_drugs[0],
        limit=min(limit * 3, 500)
    )
    ref_patient = ref_case.get("full_record", {}).get("patient", {})

    matches = []
    for candidate in candidates["cases"]:
        candidate_drugs = [entry["name"].lower() for entry in candidate["drugs"] if entry["name"]]
        candidate_reactions = [entry["term"].lower() for entry in candidate["reactions"] if entry["term"]]

        # The reference report itself is not a "similar" case.
        if candidate["safety_report_id"] == reference_case_id:
            continue

        score = calculate_case_similarity(
            ref_drugs, ref_reactions,
            candidate_drugs, candidate_reactions,
            ref_patient,
            candidate.get("patient", {})
        )
        if score < similarity_threshold:
            continue

        matches.append({
            "case": candidate,
            "similarity_score": score,
            "similarity_factors": get_similarity_factors(
                ref_drugs, ref_reactions, candidate_drugs, candidate_reactions
            )
        })

    # Best matches first
    matches.sort(key=lambda match: match["similarity_score"], reverse=True)

    shared_drug_lists = [match["similarity_factors"]["shared_drugs"] for match in matches]
    shared_reaction_lists = [match["similarity_factors"]["shared_reactions"] for match in matches]

    return {
        "reference_case_id": reference_case_id,
        "reference_drugs": ref_drugs,
        "reference_reactions": ref_reactions,
        "similar_cases": matches[:limit],
        "total_similar_found": len(matches),
        "similarity_threshold": similarity_threshold,
        "analysis_summary": {
            "most_common_shared_drugs": get_most_common_shared_elements(shared_drug_lists),
            "most_common_shared_reactions": get_most_common_shared_elements(shared_reaction_lists)
        }
    }
def calculate_case_similarity(
    ref_drugs: List[str], ref_reactions: List[str],
    case_drugs: List[str], case_reactions: List[str],
    ref_patient: Dict, case_patient: Dict
) -> float:
    """
    Calculate similarity score between two cases.

    The score is 0.5 * drug Jaccard index + 0.4 * reaction Jaccard index
    + 0.1 * patient similarity, rounded to three decimals. Patient
    similarity averages whichever of the age and gender comparisons are
    available on both sides.

    Args:
        ref_drugs: Drug names of the reference case.
        ref_reactions: Reaction terms of the reference case.
        case_drugs: Drug names of the compared case.
        case_reactions: Reaction terms of the compared case.
        ref_patient: Raw FAERS patient dict of the reference case; age is
            read from "patientonsetage" (falling back to "patientage") and
            gender from "patientsex".
        case_patient: Structured patient dict of the compared case (keys
            "age" and "gender").

    Returns:
        Similarity in the range 0-1.
    """
    def jaccard(left: List[str], right: List[str]) -> float:
        left_set, right_set = set(left), set(right)
        union_size = len(left_set | right_set)
        return len(left_set & right_set) / union_size if union_size > 0 else 0

    drug_similarity = jaccard(ref_drugs, case_drugs)
    reaction_similarity = jaccard(ref_reactions, case_reactions)

    # Patient similarity (age and gender)
    patient_similarity = 0
    similarity_factors = 0

    # Age similarity. openFDA publishes the age as "patientonsetage"; reading
    # only "patientage" silently dropped the age component.
    # NOTE(review): assumes both ages use the same unit - confirm with callers.
    ref_age = ref_patient.get("patientonsetage")
    if ref_age is None:
        ref_age = ref_patient.get("patientage")
    case_age = case_patient.get("age")
    if ref_age and case_age:
        try:
            age_diff = abs(float(ref_age) - float(case_age))
        except (ValueError, TypeError):
            pass  # non-numeric age: leave the age component out
        else:
            patient_similarity += max(0, 1 - age_diff / 50)  # Normalize by 50 years
            similarity_factors += 1

    # Gender similarity; compared as text so "1" and 1 count as equal.
    ref_gender = ref_patient.get("patientsex")
    case_gender = case_patient.get("gender")
    if ref_gender and case_gender:
        if str(ref_gender) == str(case_gender):
            patient_similarity += 1
        similarity_factors += 1

    if similarity_factors > 0:
        patient_similarity /= similarity_factors

    # Weighted overall similarity
    # Drugs and reactions are most important, patient characteristics less so
    overall_similarity = (
        0.5 * drug_similarity +
        0.4 * reaction_similarity +
        0.1 * patient_similarity
    )
    return round(overall_similarity, 3)
def get_similarity_factors(
    ref_drugs: List[str], ref_reactions: List[str],
    case_drugs: List[str], case_reactions: List[str]
) -> Dict[str, List[str]]:
    """
    Break the overlap between two cases down into shared and unique items.

    Returns:
        Dict of lists: drugs/reactions present in both cases, and those
        present only in the reference or only in the compared case. The
        lists come from set operations, so duplicates are removed and the
        element order is not meaningful.
    """
    reference_drug_set = set(ref_drugs)
    compared_drug_set = set(case_drugs)
    reference_reaction_set = set(ref_reactions)
    compared_reaction_set = set(case_reactions)

    factors = {}
    factors["shared_drugs"] = list(reference_drug_set & compared_drug_set)
    factors["shared_reactions"] = list(reference_reaction_set & compared_reaction_set)
    factors["unique_to_reference_drugs"] = list(reference_drug_set - compared_drug_set)
    factors["unique_to_case_drugs"] = list(compared_drug_set - reference_drug_set)
    factors["unique_to_reference_reactions"] = list(reference_reaction_set - compared_reaction_set)
    factors["unique_to_case_reactions"] = list(compared_reaction_set - reference_reaction_set)
    return factors
def get_most_common_shared_elements(element_lists: List[List[str]]) -> Dict[str, int]:
    """
    Tally elements across several lists and keep the ten most frequent.

    Returns:
        Dict mapping element to occurrence count, most frequent first.
    """
    tally = Counter(item for group in element_lists for item in group)
    return dict(tally.most_common(10))
def temporal_analysis(
    drug_name: str,
    adverse_event: str = None,
    limit: int = 500
) -> Dict[str, Any]:
    """
    Summarise when adverse events for a drug are reported relative to drug start.

    Cases come from ``enhanced_faers_search``. For every drug entry whose
    name equals ``drug_name`` (case-insensitive) and that has a start date,
    the number of days between the drug start date and the report's receive
    date is used as a stand-in for time to onset; only values between 0 and
    365 days are kept. Dates that are not in YYYYMMDD form are skipped.

    Args:
        drug_name: Drug to analyze
        adverse_event: Specific adverse event (optional)
        limit: Maximum cases to analyze

    Returns:
        Dict with onset statistics (median, mean, range, percentiles,
        distribution), reporting trends and textual interpretations, or a
        short dict with a "message" when the search returns no cases.
    """
    search_result = enhanced_faers_search(
        drug_name=drug_name,
        adverse_event=adverse_event,
        limit=limit
    )
    cases = search_result["cases"]
    if not cases:
        return {
            "drug": drug_name,
            "adverse_event": adverse_event,
            "message": "No cases found for temporal analysis"
        }

    date_format = "%Y%m%d"
    onset_times = []
    reporting_dates = []
    for case in cases:
        for drug in case["drugs"]:
            # Only entries for the drug under analysis that carry a start date.
            if drug["name"].lower() != drug_name.lower() or not drug["start_date"]:
                continue
            try:
                started = datetime.strptime(drug["start_date"], date_format)
                # The receive date stands in for the reaction onset date.
                if case["receive_date"]:
                    received = datetime.strptime(case["receive_date"], date_format)
                    days_elapsed = (received - started).days
                    # Keep only plausible onset intervals (within one year).
                    if 0 <= days_elapsed <= 365:
                        onset_times.append(days_elapsed)
                        reporting_dates.append(received)
            except (ValueError, TypeError):
                continue

    temporal_stats = {}
    if onset_times:
        onset_times.sort()
        sample_size = len(onset_times)
        if sample_size >= 10:
            ninetieth = onset_times[9 * sample_size // 10]
        else:
            ninetieth = max(onset_times)
        temporal_stats["time_to_onset"] = {
            "median_days": onset_times[sample_size // 2],
            "mean_days": round(sum(onset_times) / sample_size, 1),
            "range_days": [min(onset_times), max(onset_times)],
            "percentiles": {
                "25th": onset_times[sample_size // 4],
                "75th": onset_times[3 * sample_size // 4],
                "90th": ninetieth
            },
            "distribution": categorize_onset_times(onset_times)
        }
    if reporting_dates:
        # Chronological order before looking at yearly trends
        reporting_dates.sort()
        temporal_stats["reporting_trends"] = analyze_reporting_trends(reporting_dates)

    return {
        "drug": drug_name,
        "adverse_event": adverse_event,
        "total_cases_analyzed": len(cases),
        "cases_with_temporal_data": len(onset_times),
        "temporal_analysis": temporal_stats,
        "interpretation": interpret_temporal_patterns(temporal_stats)
    }
def categorize_onset_times(onset_times: List[int]) -> Dict[str, int]:
    """
    Count onset times (in days) per clinically relevant period.

    Each value lands in the first bucket whose upper bound (1, 7, 28, 90,
    365 days, inclusive) it does not exceed; values above 365 are not
    counted at all.
    """
    # (inclusive upper bound in days, bucket key), in ascending order
    bucket_bounds = (
        (1, "immediate_0_1_day"),
        (7, "acute_1_7_days"),
        (28, "subacute_1_4_weeks"),
        (90, "delayed_1_3_months"),
        (365, "late_3_12_months"),
    )
    categories = {key: 0 for _, key in bucket_bounds}
    for days in onset_times:
        for upper_bound, key in bucket_bounds:
            if days <= upper_bound:
                categories[key] += 1
                break
    return categories
def analyze_reporting_trends(reporting_dates: List[datetime]) -> Dict[str, Any]:
    """
    Analyze trends in adverse event reporting over time.

    Reports are counted per calendar year. With at least three distinct
    years, the average yearly count of the earliest years is compared with
    that of the latest years; the comparison windows never overlap (one
    year each for 3 years of data, two for 4-5, three for 6 or more).

    Args:
        reporting_dates: Report dates; order does not matter.

    Returns:
        Dict with "yearly_counts", "date_range" ([first year, last year],
        or None for empty input), "trend" ("increasing" when the recent
        average exceeds the early one by more than 20%, "decreasing" when
        it is more than 20% lower, "stable" otherwise, or
        "insufficient_data") and "peak_year".
    """
    year_counts = Counter(date.year for date in reporting_dates)
    if not year_counts:
        # Nothing to analyse; min()/max() below would fail on empty input.
        return {
            "yearly_counts": {},
            "date_range": None,
            "trend": "insufficient_data",
            "peak_year": None
        }

    years = sorted(year_counts)
    if len(years) >= 3:
        # Cap the window at half the span so "early" and "recent" are
        # disjoint; overlapping windows bias the result towards "stable".
        window = min(3, len(years) // 2)
        early_avg = sum(year_counts[year] for year in years[:window]) / window
        recent_avg = sum(year_counts[year] for year in years[-window:]) / window
        if recent_avg > early_avg * 1.2:
            trend = "increasing"
        elif recent_avg < early_avg * 0.8:
            trend = "decreasing"
        else:
            trend = "stable"
    else:
        trend = "insufficient_data"

    return {
        "yearly_counts": dict(year_counts),
        "date_range": [years[0], years[-1]],
        "trend": trend,
        "peak_year": max(year_counts, key=lambda year: year_counts[year])
    }
def interpret_temporal_patterns(temporal_stats: Dict) -> List[str]:
    """
    Turn temporal statistics into human-readable interpretation notes.

    Args:
        temporal_stats: Dict that may contain "time_to_onset" (with
            "median_days" and optionally "distribution") and
            "reporting_trends" (with "trend").

    Returns:
        List of interpretation sentences; a single "insufficient data"
        sentence when nothing else applies.
    """
    notes = []

    onset_data = temporal_stats.get("time_to_onset") if "time_to_onset" in temporal_stats else None
    if "time_to_onset" in temporal_stats:
        median_onset = onset_data["median_days"]
        # (inclusive upper bound of the median in days, message), ascending
        onset_messages = (
            (1, "Immediate onset pattern suggests Type A (dose-dependent) reaction or acute hypersensitivity"),
            (7, "Acute onset pattern typical of many drug allergies and dose-related effects"),
            (28, "Subacute onset may suggest immune-mediated or cumulative toxicity"),
            (90, "Delayed onset pattern may indicate idiosyncratic reactions or chronic toxicity"),
        )
        for ceiling, message in onset_messages:
            if median_onset <= ceiling:
                notes.append(message)
                break
        else:
            notes.append("Late onset suggests possible chronic effects or delayed hypersensitivity")

        # Share of reactions that occurred within the first day
        distribution = onset_data.get("distribution", {})
        total_with_onset = sum(distribution.values())
        if total_with_onset > 0:
            immediate_pct = distribution.get("immediate_0_1_day", 0) / total_with_onset * 100
            if immediate_pct > 50:
                notes.append(f"High proportion ({immediate_pct:.1f}%) of immediate reactions suggests acute mechanism")

    if "reporting_trends" in temporal_stats:
        trend_messages = {
            "increasing": "Increasing reporting trend may indicate growing awareness or emerging safety signal",
            "decreasing": "Decreasing reporting trend may suggest improved safety monitoring or reduced use",
        }
        trend = temporal_stats["reporting_trends"]["trend"]
        if trend in trend_messages:
            notes.append(trend_messages[trend])

    if not notes:
        notes.append("Insufficient temporal data for meaningful interpretation")
    return notes