# ----------------- Imports (Stdlib + Typing) -----------------
import os
# Cap OpenMP threads before torch is pulled in (via sentence_transformers below);
# setting this after the import has no effect.
os.environ.setdefault("OMP_NUM_THREADS", "1")
from fastapi import FastAPI, Query, HTTPException, Body
from typing import Optional, List, Dict, Any, Tuple, Set
import time
import socket
import logging
import hashlib
from functools import lru_cache
from collections import Counter
import requests
import tldextract
import math
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from geopy.geocoders import Nominatim
from fastapi.middleware.cors import CORSMiddleware
from countryinfo import CountryInfo
from sentence_transformers import SentenceTransformer, util
from domain_country_map import domain_country_map
from time import monotonic
from langdetect import detect, DetectorFactory
import re
from urllib.parse import urlparse, urlunparse, parse_qsl
from concurrent.futures import ThreadPoolExecutor, as_completed
from html import unescape
import threading
import difflib
from starlette.middleware.gzip import GZipMiddleware
from transformers import pipeline as hf_pipeline
os.environ.setdefault("OMP_NUM_THREADS", "1")
from fastapi.responses import PlainTextResponse, JSONResponse
from datetime import datetime, timezone
# ----------------- Torch Runtime Settings -----------------
import torch
torch.set_num_threads(2)
# ----------------- Optional Local Tokenizers -----------------
try:
import sentencepiece as _spm
_HAS_SENTENCEPIECE = True
except Exception:
_HAS_SENTENCEPIECE = False
# ----------------- Runtime Modes / Speed Enum -----------------
from enum import Enum
class Speed(str, Enum):
fast = "fast"
balanced = "balanced"
max = "max"
# ----------------- Global Model Handles / Pipelines -----------------
_local_pipes = {}
_news_clf = None
_sbert = None
# ----------------- tldextract (PSL-cached) -----------------
_TLD_CACHE = os.getenv("TLDEXTRACT_CACHE", "/data/tld_cache")
try:
_tld = tldextract.TLDExtract(cache_dir=_TLD_CACHE, suffix_list_urls=None)
except Exception:
_tld = tldextract.extract
# ----------------- Translation Runtime Flags -----------------
ALLOW_HF_REMOTE = os.getenv("ALLOW_HF_REMOTE", "0") == "1"
_hf_bad_models: Set[str] = set()
# ----------------- FastAPI App + Middleware -----------------
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=500)
@app.api_route("/", methods=["GET", "HEAD"], include_in_schema=False)
def root():
return JSONResponse({"ok": True, "service": "newsglobe-backend"})
@app.api_route("/healthz", methods=["GET", "HEAD"], include_in_schema=False)
def healthz():
return PlainTextResponse("OK", status_code=200)
@app.get("/favicon.ico", include_in_schema=False)
def favicon():
return PlainTextResponse("", status_code=204)
# ----------------- HTTP Session (connection pooling) -----------------
SESSION = requests.Session()
ADAPTER = requests.adapters.HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=2)
SESSION.mount("http://", ADAPTER)
SESSION.mount("https://", ADAPTER)
def _session_get(url, **kwargs):
headers = kwargs.pop("headers", {})
headers.setdefault("User-Agent", "Mozilla/5.0 (compatible; NewsGlobe/1.0)")
return SESSION.get(url, headers=headers, timeout=kwargs.pop("timeout", 12), **kwargs)
# ----------------- Lightweight Reader Fallback (Jina) -----------------
def _try_jina_reader(url: str, timeout: int) -> Optional[str]:
try:
u = url.strip()
if not u.startswith(("http://", "https://")):
u = "https://" + u
r = _session_get(f"https://r.jina.ai/{u}", timeout=timeout)
if r.status_code == 200:
txt = _clean_text(r.text)
sents = _split_sentences(txt)
if sents:
best = " ".join(sents[:2])
return best if len(best) >= 80 else (sents[0] if sents else None)
except Exception:
pass
return None
# ----------------- Description Cleanup Helpers -----------------
BOILER_DESC = re.compile(
r"(subscribe|sign in|sign up|enable javascript|cookies? (policy|settings)|"
r"privacy (policy|notice)|continue reading|read more|click here|"
r"accept (cookies|the terms)|by continuing|newsletter|advertisement|adblock)",
re.I
)
def _split_sentences(text: str) -> List[str]:
parts = re.split(r"(?<=[\.\?\!])\s+(?=[A-Z0-9])", (text or "").strip())
out = []
for p in parts:
out.extend(re.split(r"\s+[•–—]\s+", p))
return [p.strip() for p in out if p and len(p.strip()) >= 2]
def _too_similar(a: str, b: str, thresh: float = 0.92) -> bool:
a = (a or "").strip()
b = (b or "").strip()
if not a or not b:
return False
if a.lower() == b.lower():
return True
if a.lower() in b.lower() or b.lower() in a.lower():
return True
ratio = difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()
return ratio >= thresh
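# Illustrative behavior (not executed; assumes the default 0.92 threshold):
# >>> _too_similar("Fed raises rates", "Fed raises rates again")   # containment
# True
# >>> _too_similar("Fed raises rates", "Markets rally on tech gains")
# False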
def _dedupe_title_from_desc(title: str, desc: str) -> str:
t = (title or "").strip()
d = (desc or "").strip()
if not t or not d:
return d
if d.lower().startswith(t.lower()):
d = d[len(t):].lstrip(" -–:•|")
d = d.replace(t, "").strip(" -–:•|")
d = _clean_text(d)
return d
def _clean_text(s: str) -> str:
s = unescape(s or "")
s = re.sub(r"\s+", " ", s).strip()
return s
def _tidy_description(title: str, desc: str, source_name: str, max_chars: int = 240) -> str:
if not desc:
return ""
desc = _dedupe_title_from_desc(title, desc)
desc = BOILER_DESC.sub("", desc)
desc = re.sub(r"\s+", " ", desc).strip(" -–:•|")
sents = _split_sentences(desc)
if not sents:
sents = [desc]
best = " ".join(sents[:2]).strip()
if len(best) > max_chars:
if len(sents[0]) <= max_chars * 0.9:
best = sents[0]
else:
best = best[:max_chars].rsplit(" ", 1)[0].rstrip(",;:-–—")
if _too_similar(title, best):
for alt in sents[1:3]:
if not _too_similar(title, alt):
best = alt
break
if best and best[-1] not in ".!?":
best += "."
return best
# ----------------- Inflight Request Coalescing -----------------
_inflight_locks: Dict[Tuple, threading.Lock] = {}
_inflight_global_lock = threading.Lock()
def _get_inflight_lock(key: Tuple) -> threading.Lock:
with _inflight_global_lock:
lk = _inflight_locks.get(key)
if lk is None:
lk = threading.Lock()
_inflight_locks[key] = lk
return lk
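# Usage sketch (assumed pattern, mirroring get_or_build_events_cache further down):
# acquire the per-key lock, then re-check the cache before doing the expensive build,
# so concurrent requests for the same key collapse into a single fetch.
# >>> lock = _get_inflight_lock(("q", "category", "en"))
# >>> with lock:
# ...     pass  # re-check cache here, build only if still missing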
# ----------------- Description Fetching (Cache + Extract) -----------------
DESC_CACHE_LOCK = threading.Lock()
try:
from bs4 import BeautifulSoup
except Exception:
BeautifulSoup = None
DESC_FETCH_TIMEOUT = 3
DESC_MIN_LEN = 100
DESC_CACHE_TTL = 24 * 3600
MAX_DESC_FETCHES = 24
DESC_WORKERS = 12
DESC_CACHE: Dict[str, Dict[str, Any]] = {}
def _now_mono():
# Monotonic for TTL calculations
try:
return monotonic()
except Exception:
return time.time()
def _extract_desc_from_ld_json(html: str) -> Optional[str]:
# Prefer LD-JSON when present (often cleaner summaries)
if not html or not BeautifulSoup:
return None
try:
soup = BeautifulSoup(html, "html.parser")
for tag in soup.find_all("script", {"type": "application/ld+json"}):
try:
import json
data = json.loads(tag.string or "")
except Exception:
continue
def find_desc(obj):
if not isinstance(obj, (dict, list)):
return None
if isinstance(obj, list):
for it in obj:
v = find_desc(it)
if v:
return v
return None
for key in ("description", "abstract", "articleBody"):
val = obj.get(key)
if isinstance(val, str):
txt = _clean_text(val)
if len(txt) >= 40:
return txt
for k, v in obj.items():
if isinstance(v, (dict, list)):
got = find_desc(v)
if got:
return got
return None
d = find_desc(data)
if d and len(d) >= 40:
return d
except Exception:
pass
return None
# Heuristic to detect consent walls and jump to reader fallback
CONSENT_HINTS = re.compile(r"(consent|gdpr|privacy choices|before you continue|we value your privacy)", re.I)
def _looks_like_consent_wall(html: str) -> bool:
if not html:
return False
if "consent.yahoo.com" in html.lower():
return True
return bool(CONSENT_HINTS.search(html))
def _extract_desc_from_html(html: str) -> Optional[str]:
html = html or ""
if BeautifulSoup:
soup = BeautifulSoup(html, "html.parser")
ld = _extract_desc_from_ld_json(html)
if ld:
txt = _clean_text(ld)
if 40 <= len(txt) <= 480:
return txt
for sel, attr in [
('meta[property="og:description"]', "content"),
('meta[name="twitter:description"]', "content"),
('meta[name="description"]', "content"),
]:
tag = soup.select_one(sel)
if tag:
txt = _clean_text(tag.get(attr, ""))
if len(txt) >= 40:
return txt
for p in soup.find_all("p"):
txt = _clean_text(p.get_text(" "))
if len(txt) >= 80:
return txt
else:
for pat in [
r'<meta[^>]+property=["\']og:description["\'][^>]+content=["\']([^"\']+)["\']',
r'<meta[^>]+name=["\']twitter:description["\'][^>]+content=["\']([^"\']+)["\']',
r'<meta[^>]+name=["\']description["\'][^>]+content=["\']([^"\']+)["\']',
]:
m = re.search(pat, html, flags=re.I | re.S)
if m:
txt = _clean_text(m.group(1))
if len(txt) >= 40:
return txt
m = re.search(r"<p[^>]*>(.*?)</p>", html, flags=re.I | re.S)
if m:
txt = _clean_text(re.sub("<[^>]+>", " ", m.group(1)))
if len(txt) >= 80:
return txt
return None
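# Illustrative example (not executed): a meta description of at least 40 characters is
# returned directly; shorter fragments fall through to <p>-text extraction.
# >>> _extract_desc_from_html('<meta name="description" content="A forty-plus character summary of the article for preview cards.">')
# 'A forty-plus character summary of the article for preview cards.'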
def _desc_cache_get(url: str) -> Optional[str]:
if not url:
return None
with DESC_CACHE_LOCK:
entry = DESC_CACHE.get(url)
if not entry:
return None
if _now_mono() - entry["t"] > DESC_CACHE_TTL:
DESC_CACHE.pop(url, None)
return None
return entry["text"]
def _desc_cache_put(url: str, text: str):
if url and text:
with DESC_CACHE_LOCK:
DESC_CACHE[url] = {"text": text, "t": _now_mono()}
def _attempt_fetch(url: str, timeout: int) -> Optional[str]:
# Fetch page and extract description; fallback to reader if needed
headers = {
"User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:[email protected])",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
try:
r = _session_get(url, headers=headers, timeout=timeout, allow_redirects=True)
if r.status_code != 200:
return None
ct = (r.headers.get("Content-Type") or "").lower()
txt = r.text or ""
if "html" not in ct and "<html" not in txt.lower():
return None
if _looks_like_consent_wall(txt):
jd = _try_jina_reader(url, timeout)
if jd:
return jd
return None
desc = _extract_desc_from_html(txt)
if desc and 40 <= len(desc) <= 480:
return desc
except Exception:
pass
jd = _try_jina_reader(url, timeout)
if jd and 40 <= len(jd) <= 480:
return jd
return None
def fetch_page_description(url: str) -> Optional[str]:
# Public entry: consult cache -> fetch -> AMP variants -> cache
if not url:
return None
cached = _desc_cache_get(url)
if cached:
return cached
desc = _attempt_fetch(url, DESC_FETCH_TIMEOUT)
if not desc:
amp_candidates = []
try:
p = urlparse(url)
if not p.path.endswith("/amp"):
amp_candidates.append(urlunparse(p._replace(path=(p.path.rstrip("/") + "/amp"))))
q = p.query
amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "amp=1"))))
amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "outputType=amp"))))
except Exception:
pass
for amp_url in amp_candidates:
desc = _attempt_fetch(amp_url, DESC_FETCH_TIMEOUT)
if desc:
break
if desc:
_desc_cache_put(url, desc)
return desc
return None
def _needs_desc_upgrade(a: Dict[str, Any]) -> bool:
# Decide if we should try to refetch a better description
url = a.get("url") or ""
if not url:
return False
title = (a.get("title") or "").strip()
desc = (a.get("description") or "").strip()
if not desc or desc.lower().startswith("no description"):
return True
if len(desc) < DESC_MIN_LEN:
return True
if _too_similar(title, desc):
return True
return False
def prefetch_descriptions(raw_articles: List[Dict[str, Any]], speed: Speed = Speed.balanced):
# Parallel prefetch for weak descriptions (bounded to avoid stampedes)
candidates, seen = [], set()
max_fetches = 6 if speed == Speed.fast else 8 if speed == Speed.balanced else 16
timeout = 1 if speed == Speed.fast else 2
workers = 3 if speed == Speed.fast else 4 if speed == Speed.balanced else 8
for a in raw_articles:
url = a.get("url")
if not url or url in seen:
continue
seen.add(url)
if _needs_desc_upgrade(a) and not _desc_cache_get(url):
candidates.append(url)
if len(candidates) >= max_fetches:
break
if not candidates:
return
with ThreadPoolExecutor(max_workers=workers) as ex:
futs = [ex.submit(fetch_page_description, u) for u in candidates]
for _ in as_completed(futs):
pass
def prefetch_descriptions_async(raw_articles, speed: Speed = Speed.balanced):
threading.Thread(target=prefetch_descriptions, args=(raw_articles, speed), daemon=True).start()
# ----------------- Category / Keyword Heuristics -----------------
DetectorFactory.seed = 0
SECTION_HINTS = {
"sports": "sports",
"sport": "sports",
"business": "business",
"money": "business",
"market": "business",
"tech": "technology",
"technology": "technology",
"sci": "science",
"science": "science",
"health": "health",
"wellness": "health",
"entertainment": "entertainment",
"culture": "entertainment",
"showbiz": "entertainment",
"crime": "crime",
"world": "general",
"weather": "weather",
"environment": "environment",
"climate": "environment",
"travel": "travel",
"politics": "politics",
"election": "politics",
}
KEYWORDS = {
"sports": r"\b(NBA|NFL|MLB|NHL|Olympic|goal|match|tournament|coach|transfer)\b",
"business": r"\b(stocks?|earnings|IPO|merger|acquisition|revenue|inflation|market|tax|budget|inflation|revenue|deficit)\b",
"technology": r"\b(AI|software|chip|semiconductor|app|startup|cyber|hack|quantum|robot)\b",
"science": r"\b(researchers?|study|physics|astronomy|genome|spacecraft|telescope)\b",
"health": r"\b(virus|vaccine|disease|hospital|doctor|public health|covid|recall|FDA|contamination|disease outbreak)\b",
"entertainment": r"\b(movie|film|box office|celebrity|series|show|album|music|)\b",
"crime": r"\b(arrested|charged|police|homicide|fraud|theft|court|lawsuit)\b",
"weather": r"\b(hurricane|storm|flood|heatwave|blizzard|tornado|forecast)\b",
"environment": r"\b(climate|emissions|wildfire|deforestation|biodiversity)\b",
"travel": r"\b(flight|airline|airport|tourism|visa|cruise|hotel)\b",
"politics": r"\b(president|parliament|congress|minister|policy|campaign|election|rally|protest|demonstration)\b",
}
# ----------------- Category normalization to frontend set -----------------
FRONTEND_CATS = {
"politics","technology","sports","business","entertainment",
"science","health","crime","weather","environment","travel",
"viral","general"
}
ML_TO_FRONTEND = {
"arts_&_culture": "entertainment",
"business": "business",
"business_&_entrepreneurs": "business",
"celebrity_&_pop_culture": "entertainment",
"crime": "crime",
"diaries_&_daily_life": "viral",
"entertainment": "entertainment",
"environment": "environment",
"fashion_&_style": "entertainment",
"film_tv_&_video": "entertainment",
"fitness_&_health": "health",
"food_&_dining": "entertainment",
"general": "general",
"learning_&_educational": "science",
"news_&_social_concern": "politics",
"politics": "politics",
"science_&_technology": "science",
"sports": "sports",
"technology": "technology",
"travel_&_adventure": "travel",
"other_hobbies": "viral"
}
def normalize_category(c: Optional[str]) -> str:
s = (c or "").strip().lower()
if not s:
return "general"
if s in FRONTEND_CATS:
return s
return ML_TO_FRONTEND.get(s, "general")
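# Examples (not executed; derived from the maps above):
# >>> normalize_category("Sports")
# 'sports'
# >>> normalize_category("science_&_technology")
# 'science'
# >>> normalize_category("obituaries")   # unknown label
# 'general'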
def get_news_clf():
# Lazy-init topic classifier
global _news_clf
if _news_clf is None:
_news_clf = hf_pipeline(
"text-classification",
model="cardiffnlp/tweet-topic-21-multi",
top_k=1,
)
return _news_clf
def _infer_category_from_url_path(url_path: str) -> Optional[str]:
# Order: provided -> URL path -> keyword -> ML fallback
parts = [p for p in url_path.lower().split("/") if p]
for p in parts:
if p in SECTION_HINTS:
return SECTION_HINTS[p]
for p in parts:
for tok in re.split(r"[-_]", p):
if tok in SECTION_HINTS:
return SECTION_HINTS[tok]
return None
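# Examples (not executed; based on SECTION_HINTS above):
# >>> _infer_category_from_url_path("/news/sport/football/match-report")
# 'sports'
# >>> _infer_category_from_url_path("/2024/05/world-news")   # tokens split on - and _
# 'general'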
def _infer_category_from_text(text: str) -> Optional[str]:
if not text:
return None
for cat, pat in KEYWORDS.items():
if re.search(pat, text, flags=re.I):
return cat
return None
def infer_category(article_url, title, description, provided):
if provided:
got = normalize_category(provided)
if got:
return got
try:
p = urlparse(article_url).path or ""
cat = _infer_category_from_url_path(p)
if cat:
return normalize_category(cat)
except Exception:
pass
text = f"{title or ''} {description or ''}".strip()
cat = _infer_category_from_text(text)
if cat:
return normalize_category(cat)
try:
preds = get_news_clf()(text[:512])
label = preds[0][0]["label"] if isinstance(preds[0], list) else preds[0]["label"]
return normalize_category(label)
except Exception:
return "general"
# ----------------- Language Detection / Embeddings -----------------
def detect_lang(text: str) -> Optional[str]:
try:
return detect(text)
except Exception:
return None
def get_sbert():
global _sbert
if _sbert is None:
_sbert = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
return _sbert
def _embed_texts(texts: List[str]):
embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False)
return embs
# ----------------- NLTK / VADER Sentiment -----------------
NLTK_DATA_DIR = os.environ.get("NLTK_DATA", "/app/nltk_data")
if NLTK_DATA_DIR not in nltk.data.path:
nltk.data.path.insert(0, NLTK_DATA_DIR)
try:
nltk.data.find("sentiment/vader_lexicon")
except LookupError:
try:
os.makedirs(NLTK_DATA_DIR, exist_ok=True)
nltk.download("vader_lexicon", download_dir=NLTK_DATA_DIR, quiet=True)
except Exception:
pass
try:
_vader = SentimentIntensityAnalyzer()
except Exception:
_vader = None
def classify_sentiment(text: str) -> str:
if not text:
return "neutral"
if _vader is None:
return "neutral"
scores = _vader.polarity_scores(text)
c = scores["compound"]
return "positive" if c >= 0.2 else "negative" if c <= -0.2 else "neutral"
# ----------------- Geocoding / Domain → Country -----------------
def get_country_centroid(country_name):
if not country_name or country_name == "Unknown":
return {"lat": 0, "lon": 0, "country": "Unknown"}
try:
country = CountryInfo(country_name)
latlng = country.capital_latlng()
return {"lat": latlng[0], "lon": latlng[1], "country": country_name}
except Exception as e:
log.info(f"Could not get centroid for {country_name}: {e}")
return {"lat": 0, "lon": 0, "country": country_name or "Unknown"}
def resolve_domain_to_ip(domain):
if not domain:
return None
try:
return socket.gethostbyname(domain)
except socket.gaierror:
return None
def geolocate_ip(ip):
try:
r = _session_get(f"https://ipwho.is/{ip}?fields=success,country,latitude,longitude", timeout=8)
j = r.json()
if j.get("success"):
return {"lat": j["latitude"], "lon": j["longitude"], "country": j["country"]}
except Exception:
pass
return None
# Nominatim for a light refinement pass (async)
geolocator = Nominatim(user_agent="newsglobe-app (contact: [email protected])")
domain_geo_cache: Dict[str, Dict[str, Any]] = {}
MAJOR_OUTLETS = {
"bbc.co.uk": "United Kingdom",
"theguardian.com": "United Kingdom",
"reuters.com": "United States",
"aljazeera.com": "Qatar",
"lemonde.fr": "France",
"dw.com": "Germany",
"abc.net.au": "Australia",
"ndtv.com": "India",
"globo.com": "Brazil",
"elpais.com": "Spain",
"lefigaro.fr": "France",
"kyodonews.net": "Japan",
"straitstimes.com": "Singapore",
"thesun.my": "Malaysia",
}
def geocode_source(source_text: str, domain: str = "", do_network: bool = False):
cache_key = f"{source_text}|{domain}"
if cache_key in domain_geo_cache:
return domain_geo_cache[cache_key]
ext = _tld(domain or "")
fqdn = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else ""
if fqdn in MAJOR_OUTLETS:
coords = get_country_centroid(MAJOR_OUTLETS[fqdn]); domain_geo_cache[cache_key] = coords; return coords
if ext.domain in domain_country_map:
coords = get_country_centroid(domain_country_map[ext.domain]); domain_geo_cache[cache_key] = coords; return coords
coords = get_country_centroid(_suffix_country(ext.suffix))
domain_geo_cache[cache_key] = coords
if do_network:
threading.Thread(target=_refine_geo_async, args=(cache_key, source_text, fqdn), daemon=True).start()
return coords
def _suffix_country(suffix: Optional[str]) -> str:
s = (suffix or "").split(".")[-1]
m = {
"au":"Australia","uk":"United Kingdom","gb":"United Kingdom","ca":"Canada","in":"India","us":"United States",
"ng":"Nigeria","de":"Germany","fr":"France","jp":"Japan","sg":"Singapore","za":"South Africa","nz":"New Zealand",
"ie":"Ireland","it":"Italy","es":"Spain","se":"Sweden","ch":"Switzerland","nl":"Netherlands","br":"Brazil",
"my":"Malaysia","id":"Indonesia","ph":"Philippines","th":"Thailand","vn":"Vietnam","sa":"Saudi Arabia",
"ae":"United Arab Emirates","tr":"Turkey","mx":"Mexico","ar":"Argentina","cl":"Chile","co":"Colombia",
"il":"Israel","kr":"South Korea","cn":"China","tw":"Taiwan","hk":"Hong Kong"
}
return m.get(s, "United States" if s in ("com","org","net") else "Unknown")
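# Examples (not executed): only the last label of the suffix is considered.
# >>> _suffix_country("co.uk")
# 'United Kingdom'
# >>> _suffix_country("com")
# 'United States'
# >>> _suffix_country("io")
# 'Unknown'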
def _refine_geo_async(cache_key, source_text, fqdn):
try:
ip = resolve_domain_to_ip(fqdn) if fqdn else None
if ip:
coords = geolocate_ip(ip)
if coords:
domain_geo_cache[cache_key] = coords
return
location = geolocator.geocode(f"{source_text} News Headquarters", timeout=2)
if location and hasattr(location, "raw"):
coords = {
"lat": location.latitude,
"lon": location.longitude,
"country": location.raw.get("address", {}).get("country", "Unknown"),
}
domain_geo_cache[cache_key] = coords
except Exception:
pass
# ----------------- Translation (HF / Libre / Local) -----------------
HF_MODEL_PRIMARY = None
NLLB_CODES = {
"en": "eng_Latn",
"es": "spa_Latn",
"fr": "fra_Latn",
"de": "deu_Latn",
"it": "ita_Latn",
"pt": "por_Latn",
"zh": "zho_Hans",
"ru": "rus_Cyrl",
"ar": "arb_Arab",
"hi": "hin_Deva",
"ja": "jpn_Jpan",
"ko": "kor_Hang",
}
def opus_model_for(src2: str, tgt2: str) -> Optional[str]:
pairs = {
("es", "en"): "Helsinki-NLP/opus-mt-es-en",
("en", "es"): "Helsinki-NLP/opus-mt-en-es",
("fr", "en"): "Helsinki-NLP/opus-mt-fr-en",
("en", "fr"): "Helsinki-NLP/opus-mt-en-fr",
("de", "en"): "Helsinki-NLP/opus-mt-de-en",
("en", "de"): "Helsinki-NLP/opus-mt-en-de",
("pt", "en"): "Helsinki-NLP/opus-mt-pt-en",
("en", "pt"): "Helsinki-NLP/opus-mt-en-pt",
("it", "en"): "Helsinki-NLP/opus-mt-it-en",
("en", "it"): "Helsinki-NLP/opus-mt-en-it",
("ru", "en"): "Helsinki-NLP/opus-mt-ru-en",
("en", "ru"): "Helsinki-NLP/opus-mt-en-ru",
("zh", "en"): "Helsinki-NLP/opus-mt-zh-en",
("en", "zh"): "Helsinki-NLP/opus-mt-en-zh",
("ja", "en"): "Helsinki-NLP/opus-mt-ja-en",
("en", "ja"): "Helsinki-NLP/opus-mt-en-ja",
("ko", "en"): "Helsinki-NLP/opus-mt-ko-en",
("en", "ko"): "Helsinki-NLP/opus-mt-en-ko",
("hi", "en"): "Helsinki-NLP/opus-mt-hi-en",
("en", "hi"): "Helsinki-NLP/opus-mt-en-hi",
("ar", "en"): "Helsinki-NLP/opus-mt-ar-en",
("en", "ar"): "Helsinki-NLP/opus-mt-en-ar",
}
return pairs.get((src2, tgt2))
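# Examples (not executed): only the listed en<->xx pairs resolve to an OPUS model;
# anything else returns None and falls through to the other translation paths.
# >>> opus_model_for("es", "en")
# 'Helsinki-NLP/opus-mt-es-en'
# >>> opus_model_for("de", "fr")   # no direct pair -> None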
SUPPORTED = {"en", "fr", "de", "es", "it", "hi", "ar", "ru", "ja", "ko", "pt", "zh"}
LIBRETRANSLATE_URL = os.getenv("LIBRETRANSLATE_URL")
def _lt_lang(code: str) -> str:
if not code:
return code
c = code.lower()
# LibreTranslate uses zh-Hans; normalize zh* to zh-Hans
if c.startswith("zh"):
return "zh-Hans"
return c
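# Examples (not executed):
# >>> _lt_lang("zh-CN")
# 'zh-Hans'
# >>> _lt_lang("PT")
# 'pt'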
def _translate_via_libre(text: str, src: str, tgt: str) -> Optional[str]:
url = LIBRETRANSLATE_URL
if not url or not text or src == tgt:
return None
payload = {
"q": text,
"source": _lt_lang(src),
"target": _lt_lang(tgt),
"format": "text",
}
# First call can be slow while LT warms models; retry once.
for attempt in (1, 2):
try:
r = SESSION.post(
f"{url.rstrip('/')}/translate",
json=payload,
timeout=15 # was 6
)
if r.status_code == 200:
j = r.json()
out = j.get("translatedText")
return out if isinstance(out, str) and out else None
else:
log.warning("LibreTranslate HTTP %s: %s", r.status_code, r.text[:200])
return None
except Exception as e:
if attempt == 2:
log.warning("LibreTranslate failed: %s", e)
return None
time.sleep(0.5)
def _hf_call(model_id: str, payload: dict) -> Optional[str]:
if not (HUGGINGFACE_API_TOKEN and ALLOW_HF_REMOTE):
return None
if model_id in _hf_bad_models:
return None
url = f"https://api-inference.huggingface.co/models/{model_id}"
headers = {
"Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}",
"HF-API-KEY": HUGGINGFACE_API_TOKEN,
"Accept": "application/json",
"Content-Type": "application/json",
}
try:
r = requests.post(url, headers=headers, json=payload, timeout=25)
if r.status_code != 200:
if r.status_code == 404:
_hf_bad_models.add(model_id)
log.warning("HF %s -> 404: Not Found (disabled for this process)", model_id)
else:
log.warning("HF %s -> %s: %s", model_id, r.status_code, r.text[:300])
return None
j = r.json()
except Exception as e:
log.warning("HF request failed: %s", e)
return None
if isinstance(j, list) and j and isinstance(j[0], dict):
if "generated_text" in j[0]:
return j[0]["generated_text"]
if "translation_text" in j[0]:
return j[0]["translation_text"]
if isinstance(j, dict) and "generated_text" in j:
return j["generated_text"]
if isinstance(j, str):
return j
return None
@lru_cache(maxsize=4096)
def _translate_cached(text: str, src: str, tgt: str) -> str:
if not text or src == tgt:
return text
out = _translate_via_libre(text, src, tgt)
if out:
return out
opus_model = opus_model_for(src, tgt)
if opus_model:
out = _hf_call(opus_model, {"inputs": text})
if out:
return out
try:
if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES):
out = _hf_call(
HF_MODEL_PRIMARY,
{
"inputs": text,
"parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]},
"options": {"wait_for_model": True},
},
)
if out:
return out
except Exception:
pass
if src != "en" and tgt != "en":
step_en = _translate_cached(text, src, "en")
if step_en and step_en != text:
out = _translate_cached(step_en, "en", tgt)
if out:
return out
out = _translate_local(text, src, tgt)
if out:
return out
log.warning("All translate paths failed (%s->%s); returning original.", src, tgt)
return text
def translate_text(text: str, target_lang: Optional[str], fallback_src: Optional[str] = None) -> str:
if not text or not target_lang:
return text
tgt = target_lang.lower()
if tgt not in SUPPORTED:
return text
src = (fallback_src or detect_lang(text) or "en").lower()
if src == tgt:
return text
if src not in SUPPORTED:
if src.startswith("zh"):
src = "zh"
elif src.startswith("pt"):
src = "pt"
elif src[:2] in SUPPORTED:
src = src[:2]
else:
src = "en"
return _translate_cached(text, src, tgt)
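# Usage sketch: output depends on which backend responds (LibreTranslate, remote HF,
# local OPUS); on total failure the original text comes back unchanged.
# >>> translate_text("Bonjour le monde", "en")   # source auto-detected as 'fr'
# >>> translate_text("Hello", "xx")              # unsupported target -> returned as-is
# 'Hello'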
def _translate_local(text: str, src: str, tgt: str) -> Optional[str]:
if not _HAS_SENTENCEPIECE:
return None
model_id = opus_model_for(src, tgt)
if not model_id:
return None
key = model_id
try:
if key not in _local_pipes:
_local_pipes[key] = hf_pipeline("translation", model=model_id)
out = _local_pipes[key](text, max_length=512)
return out[0]["translation_text"]
except Exception as e:
log.warning("Local translate failed for %s: %s", model_id, e)
return None
# ----------------- Warmup Settings & Routine -----------------
WARM_LIMIT_EACH = 20
WARM_TIMESPAN = "24h"
WARM_PREFETCH_DESCRIPTIONS = False
def _fmt_mmss(ms: float) -> str:
total_sec = int(round(ms / 1000.0))
m, s = divmod(total_sec, 60)
return f"{m}:{s:02d}"
def _warm_once():
try:
log.info("WARM: starting background warm-up (limit_each=%d, timespan=%s)", WARM_LIMIT_EACH, WARM_TIMESPAN)
t0 = time.perf_counter()
get_sbert()
get_news_clf()
t1 = time.perf_counter()
raw = combine_raw_articles(
category=None, query=None, language="en",
limit_each=WARM_LIMIT_EACH, timespan=WARM_TIMESPAN,
log_summary=False
)
t_fetch = (time.perf_counter() - t1) * 1000
if WARM_PREFETCH_DESCRIPTIONS:
prefetch_descriptions_async(raw)
t2 = time.perf_counter()
enriched = [enrich_article(a, language="en", translate=False, target_lang=None) for a in raw]
t_enrich = (time.perf_counter() - t2) * 1000
t3 = time.perf_counter()
clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD)
t_cluster = (time.perf_counter() - t3) * 1000
key = cache_key_for(q=None, category=None, language="en",
limit_each=WARM_LIMIT_EACH, translate=False, target_lang=None,
speed=Speed.balanced)
_events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters}
t_total = (time.perf_counter() - t0) * 1000
log.info(
"WARM: fetch=%s, enrich=%s, cluster=%s, total=%s (raw=%d, enriched=%d, clusters=%d)",
_fmt_mmss(t_fetch), _fmt_mmss(t_enrich), _fmt_mmss(t_cluster), _fmt_mmss(t_total),
len(raw), len(enriched), len(clusters),
)
except Exception as e:
log.warning(f"WARM: failed: {e}")
@app.on_event("startup")
def warm():
try:
_translate_cached.cache_clear()
except Exception:
pass
get_sbert()
get_news_clf()
threading.Thread(target=_warm_once, daemon=True).start()
# ----------------- GDELT Query Helpers -----------------
_GDELT_LANG = {
"en": "english",
"es": "spanish",
"fr": "french",
"de": "german",
"it": "italian",
"pt": "portuguese",
"ru": "russian",
"ar": "arabic",
"hi": "hindi",
"ja": "japanese",
"ko": "korean",
"zh": "chinese",
}
def _gdelt_safe_query(user_q, language):
parts = []
if user_q:
q = user_q.strip()
if len(q) < 3:
q = f'"{q}" news'
parts.append(q)
if language and (lg := _GDELT_LANG.get(language.lower())):
parts.append(f"sourcelang:{lg}")
if not parts:
parts.append("sourcelang:english")
return " ".join(parts)
# ----------------- GDELT Fetchers -----------------
def fetch_gdelt_articles(
limit=50,
query=None,
language=None,
timespan="3d",
category=None,
extra_tokens: Optional[List[str]] = None,
start_utc: Optional[datetime] = None,
end_utc: Optional[datetime] = None,
):
q = _gdelt_safe_query(query, language)
if extra_tokens:
q = f"{q} " + " ".join(extra_tokens)
url = "https://api.gdeltproject.org/api/v2/doc/doc"
params = {
"query": q,
"mode": "ArtList",
"format": "json",
"sort": "DateDesc",
"maxrecords": int(min(250, max(1, limit))),
}
if start_utc and end_utc:
params["startdatetime"] = _gdelt_fmt(start_utc)
params["enddatetime"] = _gdelt_fmt(end_utc)
else:
params["timespan"] = timespan
headers = {
"User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:[email protected])",
"Accept": "application/json",
}
def _do_request(p):
r = _session_get(url, params=p, timeout=10)
log.info(f"GDELT URL: {r.url} (status={r.status_code})")
if r.status_code != 200:
log.warning(f"GDELT HTTP {r.status_code}: {r.text[:400]}")
return None
try:
return r.json()
except Exception:
ct = r.headers.get("Content-Type", "")
log.warning(f"GDELT non-JSON response. CT={ct}. Body[:400]: {r.text[:400]}")
return None
data = _do_request(params)
if data is None:
p2 = {**params, "timespan": "24h", "maxrecords": min(100, params["maxrecords"])}
data = _do_request(p2)
if not data:
return []
arts = data.get("articles") or []
results = []
for a in arts:
desc = a.get("description") or a.get("content") or ""
title = a.get("title") or ""
if desc and (
desc.strip().lower() == title.strip().lower() or
(len(desc) <= 60 and _too_similar(title, desc))
):
desc = ""
desc = desc or "No description available"
results.append(
{
"title": title,
"url": a.get("url"),
"source": {"name": a.get("domain") or "GDELT"},
"description": desc,
"publishedAt": a.get("seendate"),
"api_source": "gdelt",
"gdelt_sourcecountry": a.get("sourcecountry"),
"requested_category": category,
}
)
log.info(f"GDELT returned {len(results)}")
return results
def fetch_gdelt_multi(
limit=120, query=None, language=None, timespan="48h",
category=None, speed: Speed = Speed.balanced,
start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None
):
if language:
primary = fetch_gdelt_articles(limit=limit, query=query, language=language,
timespan=timespan, category=category,
start_utc=start_utc, end_utc=end_utc)
booster = fetch_gdelt_articles(limit=max(10, limit // 6), query=query, language="en",
timespan=timespan, category=category,
start_utc=start_utc, end_utc=end_utc)
return primary + booster
if speed == Speed.fast:
langs = LANG_ROTATION[:3]
timespan = "24h"
elif speed == Speed.balanced:
langs = LANG_ROTATION[:8]
timespan = "48h"
else:
langs = LANG_ROTATION
timespan = "3d"
per_lang = max(8, math.ceil(limit / len(langs)))
out = []
for lg in langs:
out.extend(fetch_gdelt_articles(limit=per_lang, query=query, language=lg,
timespan=timespan, category=category,
start_utc=start_utc, end_utc=end_utc))
if speed != Speed.fast:
per_cc = max(4, limit // 30) if speed == Speed.max else max(2, limit // 40)
for cc in COUNTRY_SEEDS[: (8 if speed == Speed.balanced else 16)]:
out.extend(fetch_gdelt_articles(
limit=per_cc, query=query, language="en",
timespan=timespan, category=category,
extra_tokens=[f"sourcecountry:{cc}"],
start_utc=start_utc, end_utc=end_utc
))
return out
# ----------------- Provider Flags / Keys / Logging -----------------
USE_GNEWS_API = True
USE_NEWSDATA_API = True
USE_GDELT_API = True
USE_NEWSAPI = True
NEWSAPI_KEY = os.getenv("NEWSAPI_KEY", "3b2d3fde45b84cdbb3f706dfb0110df4")
GNEWS_API_KEY = os.getenv("GNEWS_API_KEY", "5419897c95e8a4b21074e0d3fe95a3dd")
NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY", "pub_1feb49a71a844719af68d0844fb43a61")
HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
logging.basicConfig(
level=logging.WARNING,
format="%(levelname)s:%(name)s:%(message)s",
)
log = logging.getLogger("newsglobe")
log.setLevel(logging.WARNING)
fetch_log = logging.getLogger("newsglobe.fetch_summary")
fetch_log.setLevel(logging.INFO)
_fetch_handler = logging.StreamHandler()
_fetch_handler.setLevel(logging.INFO)
_fetch_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
fetch_log.addHandler(_fetch_handler)
fetch_log.propagate = False
for name in ("urllib3", "urllib3.connectionpool", "requests.packages.urllib3"):
lg = logging.getLogger(name)
lg.setLevel(logging.ERROR)
lg.propagate = False
def _newsapi_enabled() -> bool:
if not NEWSAPI_KEY:
log.warning("NewsAPI disabled: missing NEWSAPI_KEY env var")
return False
return True
# ----------------- Clustering Helpers -----------------
def cluster_id(cluster, enriched_articles):
urls = sorted([(enriched_articles[i].get("url") or "") for i in cluster["indices"] if enriched_articles[i].get("url")])
base = "|".join(urls) if urls else "empty"
return hashlib.md5(base.encode("utf-8")).hexdigest()[:10]
BOILER = re.compile(r"\b(live updates|breaking|what we know|in pictures|opinion)\b", re.I)
def _norm_text(s: str) -> str:
s = (s or "").strip()
s = re.sub(r"\s+", " ", s)
return s
def _cluster_text(a):
base = f"{a.get('orig_title') or a.get('title') or ''} {a.get('orig_description') or a.get('description') or ''}"
base = BOILER.sub("", base)
base = re.sub(r"\b(\d{1,2}:\d{2}\s?(AM|PM))|\b(\d{1,2}\s\w+\s\d{4})", "", base, flags=re.I)
return _norm_text(base)
def _canonical_url(u: str) -> str:
if not u:
return u
p = urlparse(u)
qs = [(k, v) for (k, v) in parse_qsl(p.query, keep_blank_values=False) if not k.lower().startswith(("utm_", "fbclid", "gclid"))]
clean = p._replace(query="&".join([f"{k}={v}" for k, v in qs]), fragment="")
path = clean.path.rstrip("/") or "/"
clean = clean._replace(path=path)
return urlunparse(clean)
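# Example (not executed): tracking parameters, fragments, and trailing slashes are stripped.
# >>> _canonical_url("https://example.com/story/?utm_source=x&id=7#share")
# 'https://example.com/story?id=7'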
# ----------------- Normalizers / Enrichment -----------------
def normalize_newsdata_article(article):
return {
"title": article.get("title"),
"url": article.get("link"),
"source": {"name": article.get("source_id", "NewsData.io")},
"description": article.get("description") or "No description available",
"publishedAt": article.get("publishedAt"),
"api_source": article.get("api_source", "newsdata"),
"category": ((article.get("category") or [None])[0] if isinstance(article.get("category"), list) else article.get("category")),
}
def enrich_article(a, language=None, translate=False, target_lang=None):
source_name = (a.get("source", {}) or {}).get("name", "").strip() or "Unknown"
s_lower = source_name.lower()
if "newsapi" in s_lower:
source_name = "NewsAPI"
elif "gnews" in s_lower:
source_name = "GNews"
elif "newsdata" in s_lower:
source_name = "NewsData.io"
article_url = _canonical_url(a.get("url") or "")
try:
ext = _tld(article_url)
domain = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else ""
except Exception:
domain = ""
country_guess = None
if a.get("api_source") == "gdelt":
sc = a.get("gdelt_sourcecountry")
if sc:
iso2map = {
"US": "United States", "GB": "United Kingdom", "AU": "Australia", "CA": "Canada", "IN": "India",
"DE": "Germany", "FR": "France", "IT": "Italy", "ES": "Spain", "BR": "Brazil", "JP": "Japan",
"CN": "China", "RU": "Russia", "KR": "South Korea", "ZA": "South Africa", "NG": "Nigeria",
"MX": "Mexico", "AR": "Argentina", "CL": "Chile", "CO": "Colombia", "NL": "Netherlands",
"SE": "Sweden", "NO": "Norway", "DK": "Denmark", "FI": "Finland", "IE": "Ireland", "PL": "Poland",
"PT": "Portugal", "GR": "Greece", "TR": "Turkey", "IL": "Israel", "SA": "Saudi Arabia",
"AE": "United Arab Emirates", "SG": "Singapore", "MY": "Malaysia", "TH": "Thailand",
"PH": "Philippines", "ID": "Indonesia", "NZ": "New Zealand",
}
country_guess = iso2map.get(str(sc).upper(), sc if len(str(sc)) > 2 else None)
coords = get_country_centroid(country_guess) if country_guess else geocode_source(source_name, domain, do_network=False)
title = (a.get("title") or "").strip() or "(untitled)"
description = (a.get("description") or "").strip()
if description.lower().startswith("no description"):
description = ""
cached_desc = _desc_cache_get(article_url)
need_upgrade = (
(not description)
or description.lower().startswith("no description")
or len(description) < DESC_MIN_LEN
or _too_similar(title, description)
)
if need_upgrade and cached_desc:
description = cached_desc
if description:
description = _tidy_description(title, description, source_name)
if (not description) or _too_similar(title, description):
description = f"Quick take: {title.rstrip('.')}."
orig_title = title
orig_description = description
detected_lang = (detect_lang(f"{title} {description}") or "").lower()
ml_text = f"{orig_title}. {orig_description}".strip()
sentiment = classify_sentiment(f"{orig_title} {orig_description}")
seed = f"{source_name}|{article_url}|{title}"
uid = hashlib.md5(seed.encode("utf-8")).hexdigest()[:12]
provided = a.get("category")
if provided and normalize_category(provided) != "general":
cat = normalize_category(provided)
else:
cat = infer_category(article_url, orig_title, orig_description, provided)
return {
"id": uid,
"title": title,
"url": article_url,
"source": source_name,
"lat": coords["lat"],
"lon": coords["lon"],
"country": coords.get("country", "Unknown"),
"description": description,
"sentiment": sentiment,
"api_source": a.get("api_source", "unknown"),
"publishedAt": a.get("publishedAt"),
"_ml_text": ml_text,
"orig_title": orig_title,
"orig_description": orig_description,
"detected_lang": detected_lang,
"translated": False,
"category": cat,
}
# ----------------- Clustering (Semantic, single-pass + merge) -----------------
def cluster_articles(articles: List[Dict[str, Any]], sim_threshold=0.6, speed: Speed = Speed.balanced):
if speed == Speed.fast:
articles = articles[:150]
sim_threshold = max(sim_threshold, 0.64)
elif speed == Speed.balanced:
articles = articles[:]
sim_threshold = max(sim_threshold, 0.62)
texts = [_cluster_text(a) for a in articles]
embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False)
clusters = []
centroids = []
for i, emb in enumerate(embs):
best_idx, best_sim = -1, -1.0
for ci, c_emb in enumerate(centroids):
sim = util.cos_sim(emb, c_emb).item()
if sim > sim_threshold and sim > best_sim:
best_sim, best_idx = sim, ci
if best_idx >= 0:
clusters[best_idx]["indices"].append(i)
idxs = clusters[best_idx]["indices"]
new_c = embs[idxs].mean(dim=0)
new_c = new_c / new_c.norm()
centroids[best_idx] = new_c
clusters[best_idx]["centroid"] = new_c
else:
event_id = hashlib.md5(texts[i].encode("utf-8")).hexdigest()[:10]
clusters.append({"id": event_id, "indices": [i], "centroid": emb})
centroids.append(emb)
merged = _merge_close_clusters(clusters, embs, threshold=0.70)
for c in merged:
c["id"] = cluster_id(c, articles)
return merged
def event_payload_from_cluster(cluster, enriched_articles):
idxs = cluster["indices"]
arts = [enriched_articles[i] for i in idxs]
title_counts = Counter([a["title"] for a in arts])
canonical_title = title_counts.most_common(1)[0][0]
keywords = list({w.lower() for t in title_counts for w in t.split() if len(w) > 3})[:8]
sources = {a["source"] for a in arts}
countries = {a["country"] for a in arts if a["country"] and a["country"] != "Unknown"}
ts = [a.get("publishedAt") for a in arts if a.get("publishedAt")]
return {
"event_id": cluster_id(cluster, enriched_articles),
"title": canonical_title,
"keywords": keywords,
"article_count": len(arts),
"source_count": len(sources),
"country_count": len(countries),
"time_range": {"min": min(ts) if ts else None, "max": max(ts) if ts else None},
"sample_urls": [a["url"] for a in arts[:3] if a.get("url")],
}
def aggregate_event_by_country(cluster, enriched_articles, max_samples: int | None = 5):
idxs = cluster["indices"]
arts = [enriched_articles[i] for i in idxs]
by_country: Dict[str, Dict[str, Any]] = {}
for a in arts:
c = a.get("country") or "Unknown"
if c not in by_country:
coords = get_country_centroid(c)
by_country[c] = {"country": c, "lat": coords["lat"], "lon": coords["lon"], "articles": []}
by_country[c]["articles"].append(a)
results = []
for c, block in by_country.items():
arr = block["articles"]
to_num = {"negative": -1, "neutral": 0, "positive": 1}
vals = [to_num.get(a["sentiment"], 0) for a in arr]
avg = sum(vals) / max(len(vals), 1)
avg_sent = "positive" if avg > 0.15 else "negative" if avg < -0.15 else "neutral"
top_sources = [s for s, _ in Counter([a["source"] for a in arr]).most_common(3)]
summary = " • ".join([a["title"] for a in arr[:2]])
use = arr if (max_samples in (None, 0) or max_samples < 0) else arr[:max_samples]
results.append(
{
"country": c,
"lat": block["lat"],
"lon": block["lon"],
"count": len(arr),
"avg_sentiment": avg_sent,
"top_sources": top_sources,
"summary": summary,
"samples": [
{
"title": a["title"],
"orig_title": a.get("orig_title"),
"orig_description": a.get("orig_description"),
"url": a["url"],
"source": a["source"],
"sentiment": a["sentiment"],
"detected_lang": a.get("detected_lang"),
}
# for a in arr[:5]
for a in use
],
}
)
return results
def _merge_close_clusters(clusters, embs, threshold=0.68):
merged = []
used = set()
for i in range(len(clusters)):
if i in used:
continue
base = clusters[i]
group = [i]
for j in range(i + 1, len(clusters)):
if j in used:
continue
sim = util.cos_sim(base["centroid"], clusters[j]["centroid"]).item()
if sim >= threshold:
group.append(j)
all_idx = []
cents = []
for g in group:
used.add(g)
all_idx.extend(clusters[g]["indices"])
cents.append(clusters[g]["centroid"])
newc = torch.stack(cents, dim=0).mean(dim=0)
newc = newc / newc.norm()
merged.append({"indices": sorted(set(all_idx)), "centroid": newc})
return merged
# ----------------- Event Cache / Keys -----------------
CACHE_TTL_SECS = 900
SIM_THRESHOLD = 0.6
_events_cache: Dict[Tuple, Dict[str, Any]] = {}
# -------- Date parsing helpers (Option B) --------
ISO_BASIC_RE = re.compile(r'^(\d{4})(\d{2})(\d{2})(?:[T ]?(\d{2})(\d{2})(\d{2}))?(Z|[+-]\d{2}:?\d{2})?$')
def _parse_user_dt(s: Optional[str], which: str) -> Optional[datetime]:
"""Parse query 'start'/'end' into UTC-aware datetimes."""
if not s:
return None
s = s.strip()
try:
# Normalize Z
if s.endswith("Z"):
s = s[:-1] + "+00:00"
# Date-only
if re.match(r'^\d{4}-\d{2}-\d{2}$', s):
s = s + ("T00:00:00+00:00" if which == "start" else "T23:59:59+00:00")
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
except Exception:
m = ISO_BASIC_RE.match(s)
if m:
yyyy, MM, dd, hh, mm, ss, tz = m.groups()
hh = hh or ("00" if which == "start" else "23")
mm = mm or ("00" if which == "start" else "59")
ss = ss or ("00" if which == "start" else "59")
return datetime(int(yyyy), int(MM), int(dd), int(hh), int(mm), int(ss), tzinfo=timezone.utc)
return None
def _gdelt_fmt(dt: datetime) -> str:
return dt.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")
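# Example (not executed):
# >>> _gdelt_fmt(datetime(2024, 5, 1, 12, 30, tzinfo=timezone.utc))
# '20240501123000'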
def _parse_any_pubdate(s: Optional[str]) -> Optional[datetime]:
"""Best-effort parse of provider publishedAt strings to UTC."""
if not s:
return None
try:
t = s.strip()
if t.endswith("Z"):
t = t[:-1] + "+00:00"
return datetime.fromisoformat(t).astimezone(timezone.utc)
except Exception:
m = ISO_BASIC_RE.match(s)
if m:
yyyy, MM, dd, hh, mm, ss, tz = m.groups()
hh = hh or "00"; mm = mm or "00"; ss = ss or "00"
return datetime(int(yyyy), int(MM), int(dd), int(hh), int(mm), int(ss), tzinfo=timezone.utc)
return None
def cache_key_for(
q, category, language, limit_each,
translate=False, target_lang=None,
start_utc: Optional[datetime] = None,
end_utc: Optional[datetime] = None,
speed: Speed = Speed.balanced
):
return (
q or "", category or "", language or "", int(limit_each or 50),
bool(translate), (target_lang or "").lower(),
(start_utc and _gdelt_fmt(start_utc)) or "",
(end_utc and _gdelt_fmt(end_utc)) or "",
speed.value,
)
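# Example (not executed): this tuple, joined with "|", is the cache_key string returned
# by /events and accepted back by /event/{event_id} and /news.
# >>> cache_key_for("ai", "technology", "en", 150, speed=Speed.fast)
# ('ai', 'technology', 'en', 150, False, '', '', '', 'fast')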
_first_real_build = True
def get_or_build_events_cache(
q, category, language, translate, target_lang, limit_each,
start_utc: Optional[datetime] = None,
end_utc: Optional[datetime] = None,
speed: Speed = Speed.balanced
):
global _first_real_build
key = cache_key_for(q, category, language, limit_each, translate, target_lang, start_utc, end_utc, speed)
now = monotonic()
if speed == Speed.fast:
use_timespan, use_limit = "24h", min(limit_each, 20)
elif speed == Speed.balanced:
use_timespan, use_limit = "48h", min(limit_each, 100)
else:
use_timespan, use_limit = "3d", limit_each
entry = _events_cache.get(key)
if entry and now - entry["t"] < CACHE_TTL_SECS:
log.info(f"CACHE HIT for {key}")
return key, entry["enriched"], entry["clusters"]
lock = _get_inflight_lock(key)
with lock:
entry = _events_cache.get(key)
if entry and now - entry["t"] < CACHE_TTL_SECS:
log.info(f"CACHE HIT (post-lock) for {key}")
return key, entry["enriched"], entry["clusters"]
if _first_real_build and not (start_utc and end_utc):
use_timespan = "24h" if use_timespan != "24h" else use_timespan
use_limit = min(use_limit, 100)
log.info(f"CACHE MISS for {key} — fetching (timespan={use_timespan}, limit_each={use_limit}, start={start_utc}, end={end_utc})")
raw = combine_raw_articles(
category=category,
query=q,
language=language,
limit_each=use_limit,
timespan=use_timespan,
speed=speed,
start_utc=start_utc,
end_utc=end_utc,
)
prefetch_descriptions_async(raw, speed)
enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw]
if category:
cat_norm = (category or "").strip().lower()
enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm]
else:
enriched = enriched_all
clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD, speed=speed)
_events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters}
_first_real_build = False
return key, enriched, clusters
# ----------------- Language Rotation / Seeds -----------------
LANG_ROTATION = ["en", "es", "fr", "de", "ar", "ru", "pt", "zh", "hi", "ja", "ko"]
COUNTRY_SEEDS = ["US", "GB", "IN", "CA", "AU", "ZA", "SG", "NG", "DE", "FR", "BR", "MX", "ES", "RU", "JP", "KR", "CN"]
# ----------------- Other Providers (NewsData/GNews/NewsAPI) -----------------
def fetch_newsdata_articles(category=None, limit=20, query=None, language=None):
base_url = "https://newsdata.io/api/1/news"
allowed = [
"business",
"entertainment",
"environment",
"food",
"health",
"politics",
"science",
"sports",
"technology",
"top",
"world",
]
params = {"apikey": NEWSDATA_API_KEY, "language": (language or "en")}
if category and category in allowed:
params["category"] = category
if query:
params["q"] = query
all_articles, next_page = [], None
while len(all_articles) < limit:
if next_page:
params["page"] = next_page
resp = _session_get(base_url, params=params, timeout=12)
if resp.status_code != 200:
break
data = resp.json()
articles = data.get("results", [])
for a in articles:
a["api_source"] = "newsdata"
all_articles.extend(articles)
next_page = data.get("nextPage")
if not next_page:
break
for a in all_articles:
a["publishedAt"] = a.get("pubDate")
return all_articles[:limit]
def fetch_gnews_articles(limit=20, query=None, language=None):
url = f"https://gnews.io/api/v4/top-headlines?lang={(language or 'en')}&max={limit}&token={GNEWS_API_KEY}"
if query:
url += f"&q={requests.utils.quote(query)}"
try:
r = _session_get(url, timeout=12)
if r.status_code != 200:
return []
arts = r.json().get("articles", [])
for a in arts:
a["api_source"] = "gnews"
return arts
except Exception:
return []
NEWSAPI_COUNTRIES = ["us", "gb", "ca", "au", "in", "za", "sg", "ie", "nz"]
def fetch_newsapi_headlines_multi(limit=50, language=None):
if not _newsapi_enabled():
return []
all_ = []
per = max(1, math.ceil(limit / max(1, len(NEWSAPI_COUNTRIES))))
per = min(per, 100)
for c in NEWSAPI_COUNTRIES:
url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per}&apiKey={NEWSAPI_KEY}"
r = _session_get(url, timeout=12)
if r.status_code != 200:
log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}")
continue
arts = r.json().get("articles", [])
for a in arts:
a["api_source"] = "newsapi"
all_.extend(arts)
time.sleep(0.2)
return all_[:limit]
def fetch_newsapi_articles(
category=None,
limit=20,
query=None,
language=None,
start_utc: Optional[datetime] = None,
end_utc: Optional[datetime] = None,
):
if not _newsapi_enabled():
return []
if query:
url = f"https://newsapi.org/v2/everything?pageSize={limit}&apiKey={NEWSAPI_KEY}&q={requests.utils.quote(query)}"
if language:
url += f"&language={language}"
# NEW: date range for /everything
if start_utc:
url += f"&from={start_utc.date().isoformat()}"
if end_utc:
url += f"&to={end_utc.date().isoformat()}"
try:
r = _session_get(url, timeout=12)
if r.status_code != 200:
log.warning(f"NewsAPI /everything HTTP {r.status_code}: {r.text[:200]}")
return []
arts = r.json().get("articles", [])
for a in arts:
a["api_source"] = "newsapi"
return arts[:limit]
except Exception as e:
log.warning(f"NewsAPI /everything request failed: {e}")
return []
results = []
per_country = max(5, limit // len(NEWSAPI_COUNTRIES))
for c in NEWSAPI_COUNTRIES:
url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per_country}&apiKey={NEWSAPI_KEY}"
if category:
url += f"&category={category}"
try:
r = _session_get(url, timeout=12)
if r.status_code != 200:
log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}")
continue
arts = r.json().get("articles", [])
for a in arts:
a["api_source"] = "newsapi"
results.extend(arts)
except Exception as e:
log.warning(f"NewsAPI top-headlines {c} failed: {e}")
time.sleep(0.2)
return results[:limit]
# ----------------- Provider Combiner / Dedup -----------------
def combine_raw_articles(category=None, query=None, language=None, limit_each=30,
timespan="3d", speed=Speed.balanced, log_summary: bool = True,
start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None):
if speed == Speed.fast:
timespan = "24h"
limit_each = min(limit_each, 20)
elif speed == Speed.balanced:
timespan = "48h"
limit_each = min(limit_each, 100)
a1 = []
if USE_NEWSAPI:
if not query:
a1 = fetch_newsapi_headlines_multi(limit=limit_each, language=language)
else:
a1 = fetch_newsapi_articles(category=category, limit=limit_each, query=query,
language=language, start_utc=start_utc, end_utc=end_utc)
a2 = []
if USE_NEWSDATA_API:
a2 = [
normalize_newsdata_article(a)
for a in fetch_newsdata_articles(category=category, limit=limit_each, query=query, language=language)
if a.get("link")
]
a3 = fetch_gnews_articles(limit=limit_each, query=query, language=language) if USE_GNEWS_API else []
a4 = fetch_gdelt_multi(
limit=limit_each, query=query, language=language,
timespan=timespan, category=category, speed=speed,
start_utc=start_utc, end_utc=end_utc
) if USE_GDELT_API else []
seen, merged = set(), []
for a in a1 + a3 + a2 + a4:
if a.get("url"):
a["url"] = _canonical_url(a["url"])
url = a["url"]
if url not in seen:
seen.add(url)
merged.append(a)
    # Apply the date filter locally (for providers that can't filter server-side)
if start_utc or end_utc:
s_ts = start_utc.timestamp() if start_utc else None
e_ts = end_utc.timestamp() if end_utc else None
def _in_range(row):
dt = _parse_any_pubdate(row.get("publishedAt") or "")
if not dt:
return False
t = dt.timestamp()
if s_ts and t < s_ts: return False
if e_ts and t > e_ts: return False
return True
merged = [a for a in merged if _in_range(a)]
if log_summary:
fetch_log.info("----- Article Fetch Summary -----")
fetch_log.info(f"📊 NewsAPI returned: {len(a1)} articles")
fetch_log.info(f"📊 NewsData.io returned: {len(a2)} articles")
fetch_log.info(f"📊 GNews returned: {len(a3)} articles")
fetch_log.info(f"📊 GDELT returned: {len(a4)} articles")
fetch_log.info(f"✅ Total merged articles after deduplication: {len(merged)}")
fetch_log.info("---------------------------------")
return merged
# ----------------- API: /events -----------------
@app.get("/events")
def get_events(
q: Optional[str] = Query(None),
category: Optional[str] = Query(None),
language: Optional[str] = Query(None),
translate: Optional[bool] = Query(False),
target_lang: Optional[str] = Query(None),
limit_each: int = Query(150, ge=5, le=250),
max_events: int = Query(15, ge=5, le=50),
min_countries: int = Query(2, ge=1, le=50),
min_articles: int = Query(2, ge=1, le=200),
speed: Speed = Query(Speed.balanced),
start: Optional[str] = Query(None),
end: Optional[str] = Query(None),
):
start_dt = _parse_user_dt(start, "start")
end_dt = _parse_user_dt(end, "end")
if start_dt and end_dt and start_dt > end_dt:
start_dt, end_dt = end_dt, start_dt # swap
cache_key, enriched, clusters = get_or_build_events_cache(
q, category, language, False, None, limit_each,
start_utc=start_dt, end_utc=end_dt, speed=speed
)
view = enriched
if translate and target_lang:
view = [dict(i) for i in enriched]
for i in view:
src_hint = i.get("detected_lang")
i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint)
i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint)
i["translated"] = True
events = [event_payload_from_cluster(c, view) for c in clusters]
events = [e for e in events if (e["country_count"] >= min_countries and e["article_count"] >= min_articles)]
events.sort(key=lambda e: e["article_count"], reverse=True)
return {"events": events[:max_events], "cache_key": "|".join(map(str, cache_key))}
# ----------------- API: /event/{event_id} -----------------
@app.get("/event/{event_id}")
def get_event_details(
event_id: str,
cache_key: Optional[str] = Query(None),
q: Optional[str] = Query(None),
category: Optional[str] = Query(None),
language: Optional[str] = Query(None),
translate: Optional[bool] = Query(False),
target_lang: Optional[str] = Query(None),
limit_each: int = Query(150, ge=5, le=250),
max_samples: int = Query(5, ge=0, le=1000),
start: Optional[str] = Query(None),
end: Optional[str] = Query(None),
):
start_dt = _parse_user_dt(start, "start")
end_dt = _parse_user_dt(end, "end")
if cache_key:
parts = cache_key.split("|")
if len(parts) == 9:
speed_str = parts[8]
try:
speed_obj = Speed(speed_str)
except ValueError:
speed_obj = Speed.balanced
key_tuple = (parts[0], parts[1], parts[2], int(parts[3]),
parts[4] == "True", parts[5].lower(),
parts[6], parts[7], speed_str)
elif len(parts) == 7: # backwards compat
speed_str = parts[6]
try:
speed_obj = Speed(speed_str)
except ValueError:
speed_obj = Speed.balanced
key_tuple = (parts[0], parts[1], parts[2], int(parts[3]),
parts[4] == "True", parts[5].lower(), "", "", speed_str)
else:
raise HTTPException(status_code=400, detail="Bad cache_key")
else:
speed_obj = Speed.balanced
key_tuple = cache_key_for(q, category, language, limit_each, translate, target_lang,
start_utc=start_dt, end_utc=end_dt, speed=speed_obj)
entry = _events_cache.get(key_tuple)
if not entry:
_, enriched, clusters = get_or_build_events_cache(
q, category, language, False, None, limit_each,
start_utc=start_dt, end_utc=end_dt, speed=speed_obj
)
else:
enriched, clusters = entry["enriched"], entry["clusters"]
eview = enriched
if translate and target_lang:
eview = [dict(i) for i in enriched]
for i in eview:
src_hint = i.get("detected_lang")
i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint)
i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint)
i["translated"] = True
cluster = next((c for c in clusters if cluster_id(c, enriched) == event_id), None)
if not cluster:
raise HTTPException(status_code=404, detail="Event not found with current filters")
payload = event_payload_from_cluster(cluster, eview)
countries = aggregate_event_by_country(cluster, eview, max_samples=max_samples)
payload["articles_in_event"] = sum(c["count"] for c in countries)
return {"event": payload, "countries": countries}
# ----------------- API: /news -----------------
@app.get("/news")
def get_news(
cache_key: Optional[str] = Query(None),
category: Optional[str] = Query(None),
sentiment: Optional[str] = Query(None),
q: Optional[str] = Query(None),
language: Optional[str] = Query(None),
translate: Optional[bool] = Query(False),
target_lang: Optional[str] = Query(None),
limit_each: int = Query(100, ge=5, le=100),
lite: bool = Query(True),
speed: Speed = Query(Speed.balanced),
page: int = Query(1, ge=1),
page_size: int = Query(120, ge=5, le=300),
start: Optional[str] = Query(None),
end: Optional[str] = Query(None),
):
start_dt = _parse_user_dt(start, "start")
end_dt = _parse_user_dt(end, "end")
enriched: List[Dict[str, Any]] = []
if cache_key:
parts = cache_key.split("|")
if len(parts) == 9:
key_tuple = (parts[0], parts[1], parts[2], int(parts[3]),
parts[4] == "True", parts[5].lower(), parts[6], parts[7], parts[8])
entry = _events_cache.get(key_tuple)
if entry:
enriched = entry["enriched"]
elif len(parts) == 7: # backwards compat
key_tuple = (parts[0], parts[1], parts[2], int(parts[3]),
parts[4] == "True", parts[5].lower(), "", "", parts[6])
entry = _events_cache.get(key_tuple)
if entry:
enriched = entry["enriched"]
if not enriched:
raw = combine_raw_articles(category=category, query=q, language=language,
limit_each=limit_each, speed=speed,
start_utc=start_dt, end_utc=end_dt)
prefetch_descriptions_async(raw, speed)
enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw]
if category:
cat_norm = (category or "").strip().lower()
enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm]
else:
enriched = enriched_all
else:
if category:
cat_norm = (category or "").strip().lower()
enriched = [e for e in enriched if (e.get("category") or "").lower() == cat_norm]
if translate and target_lang:
enriched = [dict(i) for i in enriched]
for i in enriched:
i["original_title"] = i.get("orig_title") or i.get("title")
i["original_description"] = i.get("orig_description") or i.get("description")
src_hint = i.get("detected_lang")
i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint)
i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint)
i["translated"] = True
i["translated_from"] = (src_hint or "").lower()
i["translated_to"] = target_lang.lower()
if sentiment:
s = sentiment.strip().lower()
enriched = [i for i in enriched if i.get("sentiment", "").lower() == s]
total = len(enriched)
offset = (page - 1) * page_size
end_idx = offset + page_size
items = [dict(i) for i in enriched[offset:end_idx]]
if lite:
drop = {"_ml_text"}
for i in items:
for k in drop:
i.pop(k, None)
return {
"items": items,
"total": total,
"page": page,
"page_size": page_size
}
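# Example request (illustrative): paginated articles, filtered to negative-sentiment
# technology coverage and translated into Spanish.
#   GET /news?category=technology&sentiment=negative&translate=true&target_lang=es&page=1&page_size=50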
# ----------------- API: /related -----------------
@app.get("/related")
def related_articles(
id: Optional[str] = Query(None, description="article id from /news"),
title: Optional[str] = Query(None),
description: Optional[str] = Query(None),
q: Optional[str] = Query(None),
category: Optional[str] = Query(None),
language: Optional[str] = Query(None),
limit_each: int = Query(50, ge=5, le=100),
k: int = Query(10, ge=1, le=50),
):
raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each)
enriched = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw]
if not enriched:
return {"items": []}
if id:
base = next((a for a in enriched if a.get("id") == id), None)
if not base:
raise HTTPException(404, "article id not found in current fetch")
query_text = base["_ml_text"]
else:
text = f"{title or ''} {description or ''}".strip()
if not text:
raise HTTPException(400, "provide either id or title/description")
query_text = text
corpus_texts = [a["_ml_text"] for a in enriched]
corpus_embs = _embed_texts(corpus_texts)
query_emb = _embed_texts([query_text])[0]
sims = util.cos_sim(query_emb, corpus_embs).cpu().numpy().flatten()
idxs = sims.argsort()[::-1]
items = []
for idx in idxs:
a = enriched[idx]
if id and a["id"] == id:
continue
items.append({**a, "similarity": float(sims[idx])})
if len(items) >= k:
break
return {"items": items}
# ----------------- Middleware: Request Timing -----------------
@app.middleware("http")
async def timing_middleware(request, call_next):
start = time.perf_counter()
response = None
try:
response = await call_next(request)
return response
finally:
dur_ms = (time.perf_counter() - start) * 1000
if response is not None:
try:
response.headers["X-Process-Time-ms"] = f"{dur_ms:.1f}"
except Exception:
pass
# ----------------- Misc: Client Metrics -----------------
@app.post("/client-metric")
def client_metric(payload: Dict[str, Any] = Body(...)):
name = (payload.get("name") or "").strip()
if name in {"Load all article markers on globe", "Load event country markers on globe"}:
return {"ok": True}
return {"ok": True}
# ----------------- Diagnostics: Translation Health -----------------
@app.get("/diag/translate")
def diag_translate(
src: str = Query("pt"),
tgt: str = Query("en"),
text: str = Query("Olá mundo")
):
# Try each path explicitly (same order your runtime uses)
libre = _translate_via_libre(text, src, tgt)
remote = None
local = None
opus_id = opus_model_for(src, tgt)
if opus_id:
remote = _hf_call(opus_id, {"inputs": text})
local = _translate_local(text, src, tgt)
# Optional: try primary NLLB if configured
nllb = None
if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES):
nllb = _hf_call(
HF_MODEL_PRIMARY,
{
"inputs": text,
"parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]},
"options": {"wait_for_model": True},
},
)
sample_out = libre or remote or local or nllb
out_lang = detect_lang(sample_out or "") or None
return {
"src": src, "tgt": tgt, "text": text,
"libre_url": LIBRETRANSLATE_URL,
"token_present": bool(HUGGINGFACE_API_TOKEN),
"libre_ok": bool(libre),
"remote_ok": bool(remote),
"local_ok": bool(local),
"nllb_ok": bool(nllb),
"sample_out": sample_out,
"sample_out_lang_detected": out_lang,
"lang_match": (out_lang == tgt)
}
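# Example request (illustrative):
#   GET /diag/translate?src=pt&tgt=en&text=Ol%C3%A1%20mundo
# The response reports which paths (LibreTranslate, remote HF, local OPUS, NLLB) produced
# output and whether the detected output language matches the requested target.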