Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import json | |
| import sqlite3 | |
| import threading | |
| from typing import Any, List, Optional | |
| from datetime import datetime | |
| from pydantic import BaseModel, Field | |
| from fastapi import APIRouter, Query, HTTPException | |
| # ===== Config ===== | |
def _pick_writable_dir():
    """Return the first candidate directory that can be created and written to.

    Candidates are tried in order: the ``DB_DIR`` environment variable,
    ``/data``, ``/app/data``, ``<cwd>/data`` and the system temp directory.
    Unset/empty candidates are skipped. If no candidate passes the write
    probe, the current working directory is returned without being probed.

    Returns:
        str: Path of the chosen directory.
    """
    candidates = [
        os.getenv("DB_DIR"),  # <- recommended: set this in env
        "/data",  # HF Spaces/container convention
        "/app/data",  # container-friendly
        os.path.join(os.getcwd(), "data"),
        tempfile.gettempdir(),
    ]
    for directory in candidates:
        if not directory:
            continue
        try:
            os.makedirs(directory, exist_ok=True)
            # Probe with a uniquely named temporary file rather than a fixed
            # file name: a fixed name can collide with an existing entry or
            # with another process probing the same directory, which would
            # make a perfectly writable directory look unusable.
            with tempfile.TemporaryFile(mode="w", dir=directory) as probe:
                probe.write("ok")
        except (OSError, ValueError):
            # Cannot create or write here (or the path is unusable): next.
            continue
        return directory
    # absolute fallback: current working directory
    return os.getcwd()
# Directory that holds the SQLite file; resolved once, when the module loads.
DB_DIR = _pick_writable_dir()
# Full path of the database file. The file name comes from the DB_FILE
# environment variable and falls back to "app.db".
DB_PATH = os.path.join(DB_DIR, os.environ.get("DB_FILE", "app.db"))
| # ===== SQLite bootstrap ===== | |
# Lock taken around use of the shared connection created below.
_db_lock = threading.Lock()

# Single module-wide connection. check_same_thread=False disables sqlite3's
# "same thread only" guard so the connection can be used from any thread.
_conn = sqlite3.connect(DB_PATH, check_same_thread=False)

# Create the schema on first run; both statements are no-ops afterwards.
_conn.execute(
    """
    CREATE TABLE IF NOT EXISTS reports (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id TEXT NOT NULL,
        report_date TEXT,
        ocr_text TEXT,
        anomalies TEXT,
        measurements TEXT,
        created_at TEXT DEFAULT (datetime('now'))
    )
    """
)
_conn.execute(
    "CREATE INDEX IF NOT EXISTS idx_reports_user_created ON reports(user_id, created_at DESC)"
)
_conn.commit()
def _safe_parse_json(text):
    """Decode a stored JSON (or legacy Python-literal) string.

    Args:
        text: Raw column value, normally ``str`` or ``None``.

    Returns:
        The decoded object. ``None`` is returned when *text* is ``None``,
        blank, or cannot be decoded by either parser. A value that is not
        ``str``/``bytes`` is returned unchanged.
    """
    # ``ast`` is imported here because the module-level imports do not
    # provide it; without it the legacy fallback below could never succeed.
    import ast

    if text is None:
        return None
    if not isinstance(text, (str, bytes)):
        # Nothing to parse: the value is not text.
        return text
    s = text.strip()
    if not s:
        return None
    # Try real JSON first
    try:
        return json.loads(s)
    except ValueError:  # includes json.JSONDecodeError
        pass
    # Common legacy cases: Python repr (single quotes, True/False/None)
    try:
        return ast.literal_eval(s)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return None  # or return s if you prefer to surface the raw text
def _row_to_dict(row: sqlite3.Row) -> dict:
    """Map a ``reports`` row (accessed by position) to a plain dict.

    Positions 0-6 are read as id, user_id, report_date, ocr_text, anomalies,
    measurements and created_at. The ``anomalies`` and ``measurements``
    values are passed through ``_safe_parse_json``; the rest are copied as-is.
    """
    columns = (
        "id",
        "user_id",
        "report_date",
        "ocr_text",
        "anomalies",
        "measurements",
        "created_at",
    )
    parsed_columns = ("anomalies", "measurements")
    record = {}
    for position, column in enumerate(columns):
        value = row[position]
        if column in parsed_columns:
            value = _safe_parse_json(value)
        record[column] = value
    return record
def _to_json_text(value):
    """Return *value* in a form sqlite3 can bind to a JSON-bearing TEXT column."""
    if value is None or isinstance(value, (str, bytes)):
        # Already bindable; text is stored as given so that callers passing
        # pre-serialized JSON keep working.
        return value
    return json.dumps(value, ensure_ascii=False)


def db_insert_report(payload: dict) -> int:
    """Insert one report row and return its id.

    Args:
        payload: Mapping with the keys ``user_id``, ``report_date``,
            ``ocr_text``, ``anomalies`` and ``measurements``; missing keys
            are stored as NULL. ``anomalies`` and ``measurements`` may be
            lists/dicts/numbers: they are serialized to JSON text, because
            sqlite3 cannot bind containers and the read path decodes these
            columns as JSON.

    Returns:
        int: ``lastrowid`` of the inserted row.

    Raises:
        sqlite3.Error: If the insert fails (for example ``user_id`` is NULL).
        TypeError: If ``anomalies``/``measurements`` are not JSON-serializable.
    """
    params = (
        payload.get("user_id"),
        payload.get("report_date"),
        payload.get("ocr_text"),
        _to_json_text(payload.get("anomalies")),
        _to_json_text(payload.get("measurements")),
    )
    # ``with _conn`` commits on success and rolls back on error, so a failed
    # insert does not leave the shared connection inside an open transaction.
    with _db_lock, _conn:
        cur = _conn.execute(
            """
            INSERT INTO reports (
                user_id, report_date, ocr_text,
                anomalies, measurements
            )
            VALUES (?, ?, ?, ?, ?)
            """,
            params,
        )
        return cur.lastrowid
def db_fetch_reports(user_id: str, limit: int = 50, offset: int = 0) -> List[dict]:
    """Return one page of a user's reports, newest first.

    Args:
        user_id: Value matched against the ``user_id`` column.
        limit: Maximum number of rows to return.
        offset: Number of matching rows to skip.

    Returns:
        A list of dicts built by ``_row_to_dict``, ordered by
        ``datetime(created_at)`` descending, then ``id`` descending.
    """
    # Hold the same lock the write helpers use: the connection is shared
    # across threads and ``row_factory`` is mutated on it here.
    with _db_lock:
        _conn.row_factory = sqlite3.Row
        cur = _conn.cursor()
        cur.execute(
            """
            SELECT id, user_id, report_date, ocr_text,
                   anomalies, measurements, created_at
            FROM reports
            WHERE user_id = ?
            ORDER BY datetime(created_at) DESC, id DESC
            LIMIT ? OFFSET ?
            """,
            (user_id, limit, offset),
        )
        rows = cur.fetchall()
    # JSON decoding needs no database access, so it runs outside the lock.
    return [_row_to_dict(r) for r in rows]
def db_get_report(report_id: int) -> Optional[dict]:
    """Fetch a single report by primary key.

    Args:
        report_id: Value matched against the ``id`` column.

    Returns:
        The report as a dict built by ``_row_to_dict``, or ``None`` when no
        row has that id.
    """
    # Hold the same lock the write helpers use: the connection is shared
    # across threads and ``row_factory`` is mutated on it here.
    with _db_lock:
        _conn.row_factory = sqlite3.Row
        cur = _conn.cursor()
        cur.execute(
            """
            SELECT id, user_id, report_date, ocr_text,
                   anomalies, measurements, created_at
            FROM reports
            WHERE id = ?
            """,
            (report_id,),
        )
        row = cur.fetchone()
    if row is None:
        return None
    return _row_to_dict(row)
def db_delete_report(report_id: int) -> bool:
    """Delete the report with the given id.

    Returns:
        bool: ``True`` if a row was removed, ``False`` if no row had that id.
    """
    with _db_lock:
        cursor = _conn.cursor()
        cursor.execute("DELETE FROM reports WHERE id = ?", (report_id,))
        _conn.commit()
        removed = cursor.rowcount
        return removed > 0
| # ===== Pydantic Schemas ===== | |
class ReportIn(BaseModel):
    """Request body used by ``save_report`` to create a report."""

    # Required owner identifier; ``save_report`` rejects an empty value.
    user_id: str = Field(..., example="[email protected]")
    # Optional date string for the report (the example uses YYYY-MM-DD).
    report_date: Optional[str] = Field(None, example="2025-09-07")
    # Optional text of the report; stored in the ``ocr_text`` column.
    ocr_text: Optional[str] = None
    # Optional free-form payload; stored in the ``anomalies`` column.
    anomalies: Optional[Any] = None
    # Optional free-form payload; stored in the ``measurements`` column.
    measurements: Optional[Any] = None
class ReportOut(BaseModel):
    """Report record; the fields mirror the keys built by ``_row_to_dict``.

    NOTE(review): this model is not referenced elsewhere in the visible
    code - confirm whether it is meant to be a ``response_model``.
    """

    # Row id from the ``reports`` table.
    id: int
    user_id: str
    report_date: Optional[str]
    ocr_text: Optional[str]
    # Decoded value of the ``anomalies`` column (any JSON-compatible shape).
    anomalies: Optional[Any]
    # Decoded value of the ``measurements`` column (any JSON-compatible shape).
    measurements: Optional[Any]
    # Timestamp text; the table default is SQLite's ``datetime('now')``.
    created_at: str
| # ===== Router ===== | |
# Router for the report endpoints, tagged "Reports".
# NOTE(review): none of the handler functions visible in this file carries a
# route decorator - confirm how they are registered on this router.
router = APIRouter(tags=["Reports"])
def save_report(body: ReportIn):
    """Store a report and return its new id.

    Args:
        body: Validated report payload.

    Returns:
        dict: ``{"ok": True, "id": <new row id>}``.

    Raises:
        HTTPException: 400 when ``user_id`` is empty.
    """
    if not body.user_id:
        raise HTTPException(status_code=400, detail="user_id is required")
    # Log only the owner. The full body also carries the report text and
    # findings, which should not be written to stdout.
    print(f"Saving report for user: {body.user_id}")
    rid = db_insert_report(body.dict())
    return {"ok": True, "id": rid}
def list_reports(
    user_id: str = Query(..., description="User email to filter by"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """Return one page of reports for ``user_id``.

    ``limit`` is constrained to 1-200 (default 50) and ``offset`` to >= 0
    (default 0); the lookup itself is delegated to ``db_fetch_reports``.
    """
    page = db_fetch_reports(user_id=user_id, limit=limit, offset=offset)
    return page
def get_report(report_id: int):
    """Return the report with ``report_id``.

    Raises:
        HTTPException: 404 when ``db_get_report`` finds nothing.
    """
    report = db_get_report(report_id)
    if report:
        return report
    raise HTTPException(status_code=404, detail="Report not found")
def delete_report(report_id: int):
    """Delete the report with ``report_id``.

    Returns:
        dict: ``{"ok": True, "deleted": report_id}`` when a row was removed.

    Raises:
        HTTPException: 404 when no row had that id.
    """
    was_deleted = db_delete_report(report_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Report not found")
    return {"ok": True, "deleted": report_id}
if __name__ == "__main__":
    # Manual smoke test, run only when the file is executed as a script.
    # The commented-out lines build a sample report and save it; the live
    # line prints whatever is stored for the sample user.
    # req = ReportIn(
    #     user_id="[email protected]",
    #     report_date="2025-09-08",
    #     ocr_text="Medical Report - Cancer Patient Name: Carol Davis Age: 55 Gender: Female Clinical History: Recent biopsy confirms breast cancer (invasive ductal carcinoma). No lymph node involvement. PET scan negative for metastasis.",
    #     anomalies=[
    #         {"findings":"BREAST CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},
    #         {"findings":"CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},
    #         {"findings":"BRAIN CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},
    #     ],
    #     measurements=[],
    # )
    # save_report(req)
    print(db_fetch_reports(user_id="[email protected]"))