pharmacy-mcp / brand_to_generic.py
Chris McMaster
Improved drug parsing and generic matching
819adf9
from __future__ import annotations
import os
import re
import time
import functools
import logging
from typing import Dict, List, Optional
import requests
import csv
from io import StringIO
try:
import pandas as pd
except ImportError:
pd = None
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Single requests session reused by _get() for every outbound call.
_session = requests.Session()
# Request timeouts, in seconds.
# DEFAULT_TIMEOUT is the default for make_api_request_with_timeout().
DEFAULT_TIMEOUT = 5  # Reduced from 10
# NOTE(review): FAST_TIMEOUT is not referenced anywhere in the visible file.
FAST_TIMEOUT = 3  # For quick checks
# Global PBS dataframe; installed via set_pbs_data() and read by _pbs_lookup().
# None means no data has been loaded.
pbs_data: Optional["pd.DataFrame"] = None
# When True, the network-backed lookup helpers return [] without calling out.
# Toggled via set_testing_mode().
TESTING_MODE = False
def set_pbs_data(data: Optional["pd.DataFrame"]):
    """Set (or clear) the global PBS dataframe.

    Args:
        data: The dataframe to install, or ``None`` to clear the current one.

    Side effects:
        Rebinds the module-level ``pbs_data`` and empties the ``_pbs_lookup``
        memoisation cache.
    """
    global pbs_data
    pbs_data = data
    # _pbs_lookup is lru_cache'd on the brand string alone, so anything it
    # computed against the previous frame -- including the empty list returned
    # while no data was loaded -- would otherwise be served forever.
    _pbs_lookup.cache_clear()
    if pbs_data is not None:
        logger.info("PBS data updated. Shape: %s", pbs_data.shape)
    else:
        logger.info("PBS data cleared.")
def set_testing_mode(is_testing: bool):
    """Enable/disable testing mode to bypass external API calls.

    Args:
        is_testing: ``True`` to make the network-backed lookups return ``[]``
            without contacting any service, ``False`` to restore normal
            behaviour.

    Side effects:
        Rebinds the module-level ``TESTING_MODE`` flag and empties the
        memoisation caches of the network-backed lookups.
    """
    global TESTING_MODE
    TESTING_MODE = is_testing
    # The lookups are lru_cache'd on the brand string only. Without a reset,
    # real results cached earlier would leak into testing mode, and the empty
    # lists cached during testing would persist after it is switched off.
    for cached_lookup in (
        _rxnorm_lookup,
        _openfda_ndc,
        _dpd_lookup,
        _pubchem_synonym_lookup,
    ):
        cached_lookup.cache_clear()
    if TESTING_MODE:
        logger.warning("Testing mode is enabled. External API calls will be bypassed.")
class _Throttle:
    """Simple host-level throttle (~1 rps).

    Remembers, per host, when the last request was let through and sleeps
    just long enough to keep consecutive requests at least ``gap`` seconds
    apart. State is class-level, so it is shared by every caller in the
    process.
    """

    # host -> time.monotonic() reading taken when the last request was released
    _stamp: Dict[str, float] = {}

    @classmethod
    def wait(cls, host: str, gap: float = 1.0):
        """Block until at least *gap* seconds have passed since the last call for *host*.

        Args:
            host: Key used to group requests (the URL's network location).
            gap: Minimum spacing between two requests to the same host, in seconds.
        """
        last = cls._stamp.get(host)
        # The first request to a host is never delayed.
        if last is not None:
            # A monotonic clock is used on purpose: with wall-clock time a
            # backwards clock adjustment would make the elapsed time negative
            # and turn into an arbitrarily long sleep.
            remaining = gap - (time.monotonic() - last)
            if remaining > 0:
                time.sleep(remaining)
        cls._stamp[host] = time.monotonic()
def _get(url: str, **kw):
    """Throttled, best-effort HTTP GET through the shared session.

    Args:
        url: Absolute URL to fetch.
        **kw: Extra keyword arguments forwarded to ``Session.get`` (for
            example ``params``). Values given here override the defaults
            chosen below, including ``timeout`` and ``verify``.

    Returns:
        The response object when the request succeeds with a non-error
        status, otherwise ``None``. Failures are logged as warnings and
        never raised.
    """
    host = requests.utils.urlparse(url).netloc
    _Throttle.wait(host)
    # Use the module-wide default instead of a second hard-coded value.
    requests_kwargs = {"timeout": DEFAULT_TIMEOUT}
    if host in ("dmd.nhs.uk", "www.nhsbsa.nhs.uk"):
        # SECURITY(review): certificate verification is switched off for these
        # hosts, which exposes the calls to man-in-the-middle tampering.
        # Kept as-is to preserve behaviour; prefer pinning a CA bundle.
        requests_kwargs["verify"] = False
        logger.warning("Disabling SSL certificate verification for host: %s", host)
    # Merge instead of unpacking two dicts into the call: a duplicate key
    # (e.g. a caller-supplied timeout) used to raise TypeError, which the
    # handler below silently converted into a failed request.
    requests_kwargs.update(kw)
    try:
        r = _session.get(url, **requests_kwargs)
        r.raise_for_status()
        return r
    except Exception as exc:
        # Deliberately broad: callers treat any failure as "no data".
        logger.warning("%s → %s", url, exc)
        return None
# RxNav endpoint listing the concepts related to an RxCUI via "tradename_of".
_RX_RE_FMT = (
    "https://rxnav.nlm.nih.gov/REST/rxcui/{rxcui}/related.json?rela=tradename_of"
)


@functools.lru_cache(maxsize=512)
def _rxnorm_lookup(brand: str):
    """Look *brand* up in RxNorm and return one record per related concept.

    The brand name is first resolved to an RxCUI (the first id returned is
    used), then the concepts related to that id are fetched. Returns ``[]``
    in testing mode, when the brand cannot be resolved, or when the second
    request fails. Results are memoised per brand string.
    """
    if TESTING_MODE:
        return []

    id_response = _get(
        "https://rxnav.nlm.nih.gov/REST/rxcui.json", params={"name": brand}
    )
    if not id_response:
        return []
    rxnorm_ids = id_response.json().get("idGroup", {}).get("rxnormId")
    if not rxnorm_ids:
        return []

    rxcui = rxnorm_ids[0]
    related_url = _RX_RE_FMT.format(rxcui=rxcui)
    related = _get(related_url)
    if not related:
        return []

    concept_groups = related.json().get("relatedGroup", {}).get("conceptGroup", [])
    return [
        {
            "generic_name": concept["name"],
            "strength": concept.get("strength"),
            # SCDS/SCD etc. not ideal but indicative
            "dosage_form": concept.get("tty"),
            "route": None,
            "country": "US",
            "source": "RxNorm",
            "ids": {"rxcui": concept["rxcui"]},
            "source_url": related_url,
        }
        for group in concept_groups
        for concept in group.get("conceptProperties", [])
    ]
# openFDA NDC directory search endpoint.
_OPENFDA_NDC = "https://api.fda.gov/drug/ndc.json"


@functools.lru_cache(maxsize=512)
def _openfda_ndc(brand: str):
    """Search the openFDA NDC directory for *brand* (at most 20 products).

    Returns one record per product, or ``[]`` in testing mode or when the
    request fails. Only the first active ingredient's strength is reported.
    Results are memoised per brand string.
    """
    if TESTING_MODE:
        return []
    r = _get(_OPENFDA_NDC, params={"search": f'brand_name:"{brand}"', "limit": 20})
    if not r:
        return []

    out: List[dict] = []
    for prod in r.json().get("results", []):
        api_gn = prod.get("generic_name")
        if isinstance(api_gn, str):
            display_gn = api_gn
        elif isinstance(api_gn, list):
            # Join list elements, coercing each one to a string.
            display_gn = ", ".join(str(g) for g in api_gn)
        else:
            # None or any other unexpected type.
            display_gn = ""

        # A missing, null or empty ingredient list must not abort the whole
        # lookup: indexing [0] on an empty list raised IndexError before.
        ingredients = prod.get("active_ingredients") or [{}]

        out.append(
            {
                "generic_name": display_gn,
                "strength": ingredients[0].get("strength"),
                "dosage_form": prod.get("dosage_form"),
                "route": prod.get("route"),
                "country": "US",
                "source": "openFDA-NDC",
                "ids": {"ndc": prod.get("product_ndc"), "spl_id": prod.get("spl_id")},
                "source_url": _OPENFDA_NDC,
            }
        )
    return out
# Health Canada Drug Product Database: drug product endpoint.
_DPD = "https://health-products.canada.ca/api/drug/drugproduct/"


@functools.lru_cache(maxsize=512)
def _dpd_lookup(brand: str):
    """Query the Health Canada DPD for *brand*.

    Emits one record for every entry under a product's ``active_ingredient``
    key. Returns ``[]`` in testing mode or when the request fails. Results
    are memoised per brand string.
    """
    if TESTING_MODE:
        return []

    query = {"brandname": brand, "lang": "en", "type": "json"}
    response = _get(_DPD, params=query)
    if not response:
        return []

    return [
        {
            "generic_name": ingredient.get("ingredient_name"),
            "strength": ingredient.get("strength"),
            "dosage_form": product.get("dosage_form"),
            "route": product.get("route_of_administration"),
            "country": "CA",
            "source": "Health Canada DPD",
            "ids": {"din": product.get("drug_identification_number")},
            "source_url": _DPD,
        }
        for product in response.json()
        for ingredient in product.get("active_ingredient", [])
    ]
# Strength expression inside an li_form string, compiled once at import time.
# Layout: number + unit, optionally followed by "/unit" or " unit", an
# optional parenthesised note, and an optional "in <volume>" clause.
# Every unit is followed by (?![a-z]) so that it cannot match the first
# letters of an ordinary word (e.g. the "l" of "lozenges").
_LI_FORM_STRENGTH_RE = re.compile(
    r"\d[\d.,\s]*"
    r"(?:mg|mcg|g|mL|L|microlitres|micrograms|nanograms|IU|%|mmol)(?![a-z])"
    r"(?:[\s/][\d.,\s]*(?:mg|mcg|g|mL|L|microlitres|doses?)(?![a-z]))?"
    r"(?:\s*\(.*?\))?"
    r"(?:\s+in\s+[\d.,\s]*(?:mL|L|g|mg)(?![a-z]))?",
    re.IGNORECASE,
)


def _parse_li_form(li_form_str: Optional[str]) -> Dict[str, Optional[str]]:
    """Parse strength and dosage form from an li_form string.

    Args:
        li_form_str: Free text such as ``"Tablet 500 mg"``. ``None``, blank
            strings and non-string values (e.g. a NaN cell) are accepted.

    Returns:
        A dict with keys ``"strength"`` and ``"dosage_form"``. The strength is
        the first strength expression found; the dosage form is the text
        around it with separating commas removed. When no strength is found
        the whole stripped string is returned as the dosage form. Either
        value is ``None`` when nothing usable remains.
    """
    if not isinstance(li_form_str, str) or not li_form_str.strip():
        return {"strength": None, "dosage_form": None}

    match = _LI_FORM_STRENGTH_RE.search(li_form_str)
    if match is None:
        # No recognisable strength: treat the whole text as the form.
        return {"strength": None, "dosage_form": li_form_str.strip()}

    strength = match.group(0).strip()
    # Text on either side of the strength, minus the comma that joined it.
    before = li_form_str[: match.start()].strip().rstrip(",").strip()
    after = li_form_str[match.end():].strip().lstrip(",").strip()
    form = " ".join(part for part in (before, after) if part)
    return {"strength": strength or None, "dosage_form": form or None}
def _clean_cell(value) -> str:
    """Return *value* stripped when it is a string, otherwise ``""``."""
    # Dataframe cells can hold NaN/None/numbers; only strings have .strip().
    return value.strip() if isinstance(value, str) else ""


@functools.lru_cache(maxsize=512)
def _pbs_lookup(brand: str):
    """Find *brand* in the loaded PBS dataframe (case-insensitive exact match).

    Returns one record per matching row, or ``[]`` when no data is loaded,
    the ``brand_name`` column is missing, or nothing matches.

    Note: results are memoised on the brand string only, so the cache also
    remembers the empty result produced while no data is loaded.
    """
    if pbs_data is None or pbs_data.empty:
        logger.warning("PBS data not loaded or empty. Skipping PBS lookup for '%s'.", brand)
        return []
    if 'brand_name' not in pbs_data.columns:
        logger.error("PBS data does not contain 'brand_name' column. Skipping lookup.")
        return []

    brand_lower = brand.lower()
    results_df = pbs_data[pbs_data['brand_name'].str.lower() == brand_lower]
    if results_df.empty:
        return []

    out = []
    source_url = "https://huggingface.co/datasets/cmcmaster/pbs_items"
    for _, row in results_df.iterrows():
        li_form = row.get("li_form")
        # Hand the parser a string or None only; a NaN cell is neither.
        parsed_form_strength = _parse_li_form(
            li_form if isinstance(li_form, str) else None
        )
        out.append(
            {
                "generic_name": _clean_cell(row.get("drug_name")) or None,
                "strength": parsed_form_strength["strength"],
                "dosage_form": parsed_form_strength["dosage_form"],
                "route": _clean_cell(row.get("manner_of_administration")) or None,
                "country": "AU",
                "source": "PBS (via Hugging Face Dataset)",
                "ids": {"pbs_item_code": _clean_cell(row.get("pbs_code"))},
                "source_url": source_url,
            }
        )
    return out
@functools.lru_cache(maxsize=512)
def _pubchem_synonym_lookup(brand: str):
    """Resolve *brand* through PubChem's compound-synonym endpoint.

    The first synonym PubChem lists for the name is reported as the generic
    name. Returns a single-record list, or ``[]`` in testing mode, when the
    request fails, or when PubChem returns no synonyms. Results are memoised
    per brand string.
    """
    if TESTING_MODE:
        return []
    url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/{requests.utils.quote(brand)}/synonyms/JSON"
    r = _get(url)
    if not r:
        return []

    # Both the information list and its synonym list may be absent or empty;
    # indexing [0] on either used to raise IndexError before the guard ran.
    information = r.json().get("InformationList", {}).get("Information") or [{}]
    syns = information[0].get("Synonym") or []
    if not syns or not syns[0]:
        return []

    return [
        {
            "generic_name": syns[0],
            "strength": None,
            "dosage_form": None,
            "route": None,
            "country": None,
            "source": "PubChem",
            "ids": {},
            "source_url": url,
        }
    ]
def brand_lookup(
    brand_name: str, *, prefer_countries: Optional[List[str]] = None
) -> dict:
    """Resolve *brand_name* to generic + strength/form using multiple datasets.

    Sources are tried in order. The first one that returns any records wins:
    its records are de-duplicated, optionally re-ordered, and returned
    immediately; later sources are not queried. A source that raises is
    logged and skipped.

    Args:
        brand_name: Brand to look up; surrounding whitespace is ignored.
        prefer_countries: ISO alpha-2 codes. Records whose ``country`` is in
            this list are placed first; ties are ordered by country code.

    Returns:
        ``{"brand_searched": <stripped brand>, "results": [<record>, ...]}``.
        ``results`` is empty when the brand is blank or no source matched.
    """
    brand = (brand_name or "").strip()
    if not brand:
        # Nothing to search for: avoid querying every source (and building a
        # malformed PubChem URL) for an empty name.
        return {"brand_searched": brand, "results": []}

    for fn in (
        _pbs_lookup,
        # _rxnorm_lookup,  These three fail, so skip them for now
        # _openfda_ndc,
        # _dpd_lookup,
        _pubchem_synonym_lookup,
    ):
        try:
            current_results: List[dict] = fn(brand)
            if current_results:
                # Later records overwrite earlier ones that share the same
                # (generic name, strength, country) key.
                uniq = {
                    (rec["generic_name"], rec.get("strength"), rec.get("country")): rec
                    for rec in current_results
                }
                processed_results = list(uniq.values())
                if prefer_countries:
                    processed_results.sort(
                        key=lambda r: (
                            0 if r["country"] in prefer_countries else 1,
                            r["country"] or "",
                        )
                    )
                return {"brand_searched": brand, "results": processed_results}
        except Exception:
            # Best effort: one failing source must not block the next.
            logger.exception("%s failed", fn.__name__)
    return {"brand_searched": brand, "results": []}
def make_api_request_with_timeout(url: str, params: dict, timeout: int = DEFAULT_TIMEOUT):
    """Issue a GET request with a configurable timeout.

    Args:
        url: Address to request.
        params: Query-string parameters.
        timeout: Seconds to wait before giving up.

    Returns:
        The response object (whatever its HTTP status), or ``None`` when the
        request timed out or failed; both cases are logged as warnings.
    """
    try:
        return requests.get(url, params=params, timeout=timeout)
    except requests.exceptions.Timeout:
        # Handled first: Timeout is also a RequestException.
        logger.warning(f"Timeout after {timeout}s for {url}")
    except requests.exceptions.RequestException as err:
        logger.warning(f"Request failed for {url}: {err}")
    return None
# Script entry point: look up the brand given as the first command-line
# argument (falling back to a sample brand) and pretty-print the result.
if __name__ == "__main__":
    import pprint
    import sys

    cli_args = sys.argv[1:]
    query = cli_args[0] if cli_args else "Panadol Rapid"
    pprint.pp(brand_lookup(query))