Spaces:
Sleeping
Sleeping
Update api_clients/openfda_client.py
Browse files- api_clients/openfda_client.py +133 -0
api_clients/openfda_client.py
CHANGED
|
@@ -0,0 +1,133 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
# api_clients/openfda_client.py
|
| 2 |
+
"""
|
| 3 |
+
Client for the OpenFDA API.
|
| 4 |
+
This module specializes in fetching critical, real-world drug safety data,
|
| 5 |
+
including the most frequent adverse events and active enforcement reports (recalls).
|
| 6 |
+
"""
|
| 7 |
+
import asyncio
|
| 8 |
+
import aiohttp
|
| 9 |
+
from urllib.parse import quote
|
| 10 |
+
from .config import OPENFDA_BASE_URL, REQUEST_HEADERS
|
| 11 |
+
|
| 12 |
+
async def get_adverse_events(session: aiohttp.ClientSession, drug_name: str, top_n: int = 5) -> list[dict]:
    """
    Finds the most frequently reported adverse events for a given drug.

    This function uses the 'count' feature of the OpenFDA API to get a summary
    of the most common patient reactions, which is far more efficient than
    downloading individual reports.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        drug_name (str): The brand or generic name of the drug.
        top_n (int): The number of top adverse events to return.

    Returns:
        list[dict]: A list of top adverse events, e.g., [{'term': 'Nausea', 'count': 5000}].
                    Returns an empty list on failure (including a request
                    timeout) or if no results are found.
    """
    if not drug_name:
        return []

    # The name is embedded inside a double-quoted phrase, so backslashes and
    # double quotes must be escaped or they would terminate the phrase early
    # and change the meaning of the query.
    safe_name = drug_name.replace('\\', '\\\\').replace('"', '\\"')

    # OpenFDA uses Lucene query syntax. We search in both brand name and generic name fields.
    search_query = (
        f'(patient.drug.openfda.brand_name:"{safe_name}" '
        f'OR patient.drug.openfda.generic_name:"{safe_name}")'
    )

    params = {
        'search': search_query,
        'count': 'patient.reaction.reactionmeddrapt.exact',  # The field for patient reactions
        'limit': top_n,
    }

    url = f"{OPENFDA_BASE_URL}/drug/event.json"
    # An explicit ClientTimeout replaces the deprecated bare-number form.
    timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with session.get(url, params=params, headers=REQUEST_HEADERS, timeout=timeout) as resp:
            if resp.status == 404:  # 404 means no results found for the query
                return []
            resp.raise_for_status()
            data = await resp.json()
            return data.get('results', [])

    # A total-timeout expiry surfaces as asyncio.TimeoutError, which is not a
    # ClientError; catch it too so the "empty list on failure" contract holds.
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        print(f"An error occurred fetching adverse events for '{drug_name}': {e}")
        return []
|
| 54 |
+
|
| 55 |
+
async def check_for_recalls(session: aiohttp.ClientSession, drug_name: str, limit: int = 3) -> list[dict]:
    """
    Checks for recent, ongoing drug enforcement reports (recalls) for a given drug.

    The query filters on 'status:Ongoing' and sorts by report date, newest first.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        drug_name (str): The brand or generic name of the drug.
        limit (int): The maximum number of recall reports to return.

    Returns:
        list[dict]: A list of recall reports, each with 'reason',
                    'classification' and 'report_date' keys ("N/A" when the
                    field is missing from the API record).
                    Returns an empty list on failure (including a request
                    timeout) or if no recalls are found.
    """
    if not drug_name:
        return []

    # Do NOT percent-encode the name here: the HTTP client already encodes
    # everything passed via `params`, so pre-encoding would make the API search
    # for literal text such as "Tylenol%20PM". Only the characters that are
    # special inside a quoted phrase (backslash, double quote) need escaping.
    safe_name = drug_name.replace('\\', '\\\\').replace('"', '\\"')

    # We search for the drug name and filter for 'Ongoing' status to find active recalls.
    search_query = f'"{safe_name}" AND status:Ongoing'

    params = {
        'search': search_query,
        'sort': 'report_date:desc',  # Get the most recent ones first
        'limit': limit,
    }

    url = f"{OPENFDA_BASE_URL}/drug/enforcement.json"
    # An explicit ClientTimeout replaces the deprecated bare-number form.
    timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with session.get(url, params=params, headers=REQUEST_HEADERS, timeout=timeout) as resp:
            if resp.status == 404:  # Treated as "no matching recalls"
                return []
            resp.raise_for_status()
            data = await resp.json()

            # Reduce each raw record to the three fields callers need.
            return [
                {
                    "reason": r.get("reason_for_recall", "N/A"),
                    "classification": r.get("classification", "N/A"),  # Class I is most serious
                    "report_date": r.get("report_date", "N/A"),
                }
                for r in data.get('results', [])
            ]

    # A total-timeout expiry surfaces as asyncio.TimeoutError, which is not a
    # ClientError; catch it too so the "empty list on failure" contract holds.
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        print(f"An error occurred fetching recalls for '{drug_name}': {e}")
        return []
|
| 105 |
+
|
| 106 |
+
async def get_safety_profile(session: aiohttp.ClientSession, drug_name: str) -> dict:
    """
    A high-level orchestrator that gathers a complete safety profile for a single drug
    by concurrently fetching adverse events and recalls.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        drug_name (str): The drug to profile.

    Returns:
        dict: A dictionary containing 'adverse_events' and 'recalls' keys.
              Each value is a list; a sub-task that failed contributes an
              empty list.
    """
    labels = ("adverse_events", "recalls")

    # Run both API calls in parallel; return_exceptions=True makes gather()
    # hand back a failure as a value instead of raising it here.
    outcomes = await asyncio.gather(
        get_adverse_events(session, drug_name),
        check_for_recalls(session, drug_name),
        return_exceptions=True,
    )

    safety_data = {}
    for key, value in zip(labels, outcomes):
        # Check against BaseException: a cancelled sub-task is reported as
        # asyncio.CancelledError, which does not derive from Exception on
        # Python 3.8+ and would otherwise leak into the result.
        if isinstance(value, BaseException):
            print(f"Sub-task for {key} failed for {drug_name}: {value}")
            value = []  # Ensure return type is consistent (list)
        safety_data[key] = value

    return safety_data
|