from fastapi import FastAPI, Query, HTTPException, Body from typing import Optional, List, Dict, Any, Tuple, Set import os import time import socket import logging import hashlib from functools import lru_cache from collections import Counter import requests import tldextract import math import nltk from nltk.sentiment import SentimentIntensityAnalyzer from geopy.geocoders import Nominatim from geopy.exc import GeocoderUnavailable, GeocoderTimedOut from fastapi.middleware.cors import CORSMiddleware from countryinfo import CountryInfo from sentence_transformers import SentenceTransformer, util from domain_country_map import domain_country_map from time import monotonic from langdetect import detect, DetectorFactory import re from urllib.parse import urlparse, urlunparse, parse_qsl from concurrent.futures import ThreadPoolExecutor, as_completed from html import unescape import threading import difflib from starlette.middleware.gzip import GZipMiddleware from transformers import pipeline as hf_pipeline os.environ.setdefault("OMP_NUM_THREADS", "1") from fastapi.responses import PlainTextResponse import torch torch.set_num_threads(2) # Optional runtime check for local OPUS tokenizers try: import sentencepiece as _spm # noqa: F401 _HAS_SENTENCEPIECE = True except Exception: _HAS_SENTENCEPIECE = False from enum import Enum class Speed(str, Enum): fast = "fast" balanced = "balanced" max = "max" _local_pipes = {} _news_clf = None _sbert = None # set a writable cache for tldextract and avoid network PSL fetches _TLD_CACHE = os.getenv("TLDEXTRACT_CACHE", "/data/tld_cache") try: # suffix_list_urls=None => use cached public suffix list only (no HTTP on startup) _tld = tldextract.TLDExtract(cache_dir=_TLD_CACHE, suffix_list_urls=None) except Exception: # safe fallback: still parses domains without PSL refresh _tld = tldextract.extract # --- Translation runtime flags / caches --- ALLOW_HF_REMOTE = os.getenv("ALLOW_HF_REMOTE", "0") == "1" # default OFF _hf_bad_models: Set[str] = set() def _translate_local(text: str, src: str, tgt: str) -> Optional[str]: if not _HAS_SENTENCEPIECE: # Avoid attempting to download/instantiate Marian tokenizers without sentencepiece return None model_id = opus_model_for(src, tgt) if not model_id: return None key = model_id try: if key not in _local_pipes: _local_pipes[key] = hf_pipeline("translation", model=model_id) out = _local_pipes[key](text, max_length=512) return out[0]["translation_text"] except Exception as e: log.warning("Local translate failed for %s: %s", model_id, e) return None def fetch_gdelt_multi(limit=120, query=None, language=None, timespan="48h", category=None, speed: Speed = Speed.balanced): # If user forced a language, honor it (but add a small English boost for coverage) if language: primary = fetch_gdelt_articles(limit=limit, query=query, language=language, timespan=timespan, category=category) # tiny English booster to catch global wires booster = fetch_gdelt_articles(limit=max(10, limit // 6), query=query, language="en", timespan=timespan, category=category) return primary + booster # Otherwise rotate across multiple languages if speed == Speed.fast: langs = LANG_ROTATION[:3] # quicker timespan = "24h" elif speed == Speed.balanced: langs = LANG_ROTATION[:8] # good mix timespan = "48h" else: langs = LANG_ROTATION # max coverage timespan = "3d" per_lang = max(8, math.ceil(limit / len(langs))) out = [] for lg in langs: out.extend(fetch_gdelt_articles(limit=per_lang, query=query, language=lg, timespan=timespan, category=category)) # Optional: add a few English pulls biased to different source countries (broadens outlets) if speed != Speed.fast: per_cc = max(4, limit // 30) if speed == Speed.max else max(2, limit // 40) for cc in COUNTRY_SEEDS[: (8 if speed == Speed.balanced else 16)]: out.extend( fetch_gdelt_articles( limit=per_cc, query=query, language="en", timespan=timespan, category=category, extra_tokens=[f"sourcecountry:{cc}"] ) ) return out def get_news_clf(): global _news_clf if _news_clf is None: _news_clf = hf_pipeline( "text-classification", model="cardiffnlp/tweet-topic-21-multi", top_k=1, ) return _news_clf def get_sbert(): global _sbert if _sbert is None: _sbert = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") return _sbert # globals SESSION = requests.Session() ADAPTER = requests.adapters.HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=2) SESSION.mount("http://", ADAPTER) SESSION.mount("https://", ADAPTER) def _session_get(url, **kwargs): headers = kwargs.pop("headers", {}) headers.setdefault("User-Agent", "Mozilla/5.0 (compatible; NewsGlobe/1.0)") return SESSION.get(url, headers=headers, timeout=kwargs.pop("timeout", 12), **kwargs) def _try_jina_reader(url: str, timeout: int) -> Optional[str]: try: u = url.strip() if not u.startswith(("http://", "https://")): u = "https://" + u r = _session_get(f"https://r.jina.ai/{u}", timeout=timeout) if r.status_code == 200: txt = _clean_text(r.text) sents = _split_sentences(txt) if sents: best = " ".join(sents[:2]) return best if len(best) >= 80 else (sents[0] if sents else None) except Exception: pass return None # --- description cleanup helpers --- BOILER_DESC = re.compile( r"(subscribe|sign in|sign up|enable javascript|cookies? (policy|settings)|" r"privacy (policy|notice)|continue reading|read more|click here|" r"accept (cookies|the terms)|by continuing|newsletter|advertisement|adblock)", re.I ) def _split_sentences(text: str) -> List[str]: # light-weight splitter good enough for news blurbs parts = re.split(r"(?<=[\.\?\!])\s+(?=[A-Z0-9])", (text or "").strip()) # also break on " • " and long dashes if present out = [] for p in parts: out.extend(re.split(r"\s+[•–—]\s+", p)) return [p.strip() for p in out if p and len(p.strip()) >= 2] def _tidy_description(title: str, desc: str, source_name: str, max_chars: int = 240) -> str: if not desc: return "" # remove repeated title desc = _dedupe_title_from_desc(title, desc) # strip obvious boilerplate desc = BOILER_DESC.sub("", desc) desc = re.sub(r"\s+", " ", desc).strip(" -–:•|") # choose first 1–2 sentences that look like summary sents = _split_sentences(desc) if not sents: sents = [desc] best = " ".join(sents[:2]).strip() # soft truncate at sentence boundary if len(best) > max_chars: # try only first sentence if len(sents[0]) <= max_chars * 0.9: best = sents[0] else: best = best[:max_chars].rsplit(" ", 1)[0].rstrip(",;:-–—") # avoid parroting the headline if _too_similar(title, best): # try next sentence if we have it for alt in sents[1:3]: if not _too_similar(title, alt): best = alt break # ensure it ends neatly if best and best[-1] not in ".!?": best += "." return best def _too_similar(a: str, b: str, thresh: float = 0.92) -> bool: """Return True if strings are near-duplicates (or one contains the other).""" a = (a or "").strip() b = (b or "").strip() if not a or not b: return False if a.lower() == b.lower(): return True if a.lower() in b.lower() or b.lower() in a.lower(): return True ratio = difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio() return ratio >= thresh def _dedupe_title_from_desc(title: str, desc: str) -> str: """If the description contains the title, strip it and tidy up.""" t = (title or "").strip() d = (desc or "").strip() if not t or not d: return d # Remove exact leading title if d.lower().startswith(t.lower()): d = d[len(t):].lstrip(" -–:•|") # Remove inner repeats d = d.replace(t, "").strip(" -–:•|") d = _clean_text(d) return d # Prevent duplicate upstream fetches when identical requests arrive together _inflight_locks: Dict[Tuple, threading.Lock] = {} _inflight_global_lock = threading.Lock() def _get_inflight_lock(key: Tuple) -> threading.Lock: with _inflight_global_lock: lk = _inflight_locks.get(key) if lk is None: lk = threading.Lock() _inflight_locks[key] = lk return lk DESC_CACHE_LOCK = threading.Lock() try: from bs4 import BeautifulSoup # optional but nice to have except Exception: BeautifulSoup = None # -------- Description fetching config -------- DESC_FETCH_TIMEOUT = 3 # seconds per URL DESC_MIN_LEN = 100 # consider shorter text as "weak" and try to upgrade DESC_CACHE_TTL = 24 * 3600 # 24h MAX_DESC_FETCHES = 24 # cap number of fetches per request DESC_WORKERS = 12 # parallel workers # url -> {"text": str, "t": monotonic()} DESC_CACHE: Dict[str, Dict[str, Any]] = {} def _now_mono(): try: return monotonic() except Exception: return time.time() def _clean_text(s: str) -> str: s = unescape(s or "") s = re.sub(r"\s+", " ", s).strip() return s def _extract_desc_from_ld_json(html: str) -> Optional[str]: if not html or not BeautifulSoup: return None try: soup = BeautifulSoup(html, "html.parser") for tag in soup.find_all("script", {"type": "application/ld+json"}): try: import json data = json.loads(tag.string or "") except Exception: continue def find_desc(obj): if not isinstance(obj, (dict, list)): return None if isinstance(obj, list): for it in obj: v = find_desc(it) if v: return v return None # dict for key in ("description", "abstract", "articleBody"): val = obj.get(key) if isinstance(val, str): txt = _clean_text(val) if len(txt) >= 40: return txt # nested for k, v in obj.items(): if isinstance(v, (dict, list)): got = find_desc(v) if got: return got return None d = find_desc(data) if d and len(d) >= 40: return d except Exception: pass return None CONSENT_HINTS = re.compile(r"(consent|gdpr|privacy choices|before you continue|we value your privacy)", re.I) def _looks_like_consent_wall(html: str) -> bool: if not html: return False if "consent.yahoo.com" in html.lower(): # common interstitial return True # generic phrasing return bool(CONSENT_HINTS.search(html)) def _extract_desc_from_html(html: str) -> Optional[str]: html = html or "" if BeautifulSoup: soup = BeautifulSoup(html, "html.parser") # ✅ JSON-LD early ld = _extract_desc_from_ld_json(html) if ld: txt = _clean_text(ld) if 40 <= len(txt) <= 480: return txt for sel, attr in [ ('meta[property="og:description"]', "content"), ('meta[name="twitter:description"]', "content"), ('meta[name="description"]', "content"), ]: tag = soup.select_one(sel) if tag: txt = _clean_text(tag.get(attr, "")) if len(txt) >= 40: return txt # Fallback: first meaningful

for p in soup.find_all("p"): txt = _clean_text(p.get_text(" ")) if len(txt) >= 80: return txt else: # regex fallbacks (as you had) for pat in [ r']+property=["\']og:description["\'][^>]+content=["\']([^"\']+)["\']', r']+name=["\']twitter:description["\'][^>]+content=["\']([^"\']+)["\']', r']+name=["\']description["\'][^>]+content=["\']([^"\']+)["\']', ]: m = re.search(pat, html, flags=re.I | re.S) if m: txt = _clean_text(m.group(1)) if len(txt) >= 40: return txt m = re.search(r"]*>(.*?)

", html, flags=re.I | re.S) if m: txt = _clean_text(re.sub("<[^>]+>", " ", m.group(1))) if len(txt) >= 80: return txt # JSON-LD as last regex-free fallback not available w/o bs4 return None def _desc_cache_get(url: str) -> Optional[str]: if not url: return None entry = DESC_CACHE.get(url) if not entry: return None if _now_mono() - entry["t"] > DESC_CACHE_TTL: DESC_CACHE.pop(url, None) return None return entry["text"] def _desc_cache_put(url: str, text: str): if url and text: with DESC_CACHE_LOCK: DESC_CACHE[url] = {"text": text, "t": _now_mono()} def _attempt_fetch(url: str, timeout: int) -> Optional[str]: headers = { "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", } try: r = _session_get(url, headers=headers, timeout=timeout, allow_redirects=True) if r.status_code != 200: return None ct = (r.headers.get("Content-Type") or "").lower() txt = r.text or "" if "html" not in ct and " Optional[str]: """Fetch and cache a best-effort article description from the page (incl. AMP retries).""" if not url: return None cached = _desc_cache_get(url) if cached: return cached # Try the original URL first desc = _attempt_fetch(url, DESC_FETCH_TIMEOUT) if not desc: # Try common AMP variants amp_candidates = [] try: p = urlparse(url) # /amp path if not p.path.endswith("/amp"): amp_candidates.append(urlunparse(p._replace(path=(p.path.rstrip("/") + "/amp")))) # ?amp q = p.query amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "amp=1")))) # ?outputType=amp (CNN, some US sites) amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "outputType=amp")))) except Exception: pass for amp_url in amp_candidates: desc = _attempt_fetch(amp_url, DESC_FETCH_TIMEOUT) if desc: break if desc: _desc_cache_put(url, desc) return desc return None def _needs_desc_upgrade(a: Dict[str, Any]) -> bool: url = a.get("url") or "" if not url: return False title = (a.get("title") or "").strip() desc = (a.get("description") or "").strip() if not desc or desc.lower().startswith("no description"): return True if len(desc) < DESC_MIN_LEN: return True # NEW: if desc ≈ title, trigger upgrade if _too_similar(title, desc): return True return False def prefetch_descriptions(raw_articles: List[Dict[str, Any]], speed: Speed = Speed.balanced): candidates, seen = [], set() max_fetches = 6 if speed == Speed.fast else 8 if speed == Speed.balanced else 16 timeout = 1 if speed == Speed.fast else 2 workers = 3 if speed == Speed.fast else 4 if speed == Speed.balanced else 8 for a in raw_articles: url = a.get("url"); if not url or url in seen: continue seen.add(url) if _needs_desc_upgrade(a) and not _desc_cache_get(url): candidates.append(url) if len(candidates) >= max_fetches: break if not candidates: return with ThreadPoolExecutor(max_workers=workers) as ex: futs = [ex.submit(fetch_page_description, u) for u in candidates] for _ in as_completed(futs): pass def prefetch_descriptions_async(raw_articles, speed: Speed = Speed.balanced): threading.Thread(target=prefetch_descriptions, args=(raw_articles, speed), daemon=True).start() # news_clf = pipeline("text-classification", model="cardiffnlp/tweet-topic-21-multi", top_k=1) DetectorFactory.seed = 0 # deterministic SECTION_HINTS = { "sports": "sports", "sport": "sports", "business": "business", "money": "business", "market": "business", "tech": "technology", "technology": "technology", "sci": "science", "science": "science", "health": "health", "wellness": "health", "entertainment": "entertainment", "culture": "entertainment", "showbiz": "entertainment", "crime": "crime", "world": "general", "weather": "weather", "environment": "environment", "climate": "environment", "travel": "travel", "politics": "politics", "election": "politics", } KEYWORDS = { "sports": r"\b(NBA|NFL|MLB|NHL|Olympic|goal|match|tournament|coach|transfer)\b", "business": r"\b(stocks?|earnings|IPO|merger|acquisition|revenue|inflation|market)\b", "technology": r"\b(AI|software|chip|semiconductor|app|startup|cyber|hack|quantum|robot)\b", "science": r"\b(researchers?|study|physics|astronomy|genome|spacecraft|telescope)\b", "health": r"\b(virus|vaccine|disease|hospital|doctor|public health|covid)\b", "entertainment": r"\b(movie|film|box office|celebrity|series|show|album|music)\b", "crime": r"\b(arrested|charged|police|homicide|fraud|theft|court|lawsuit)\b", "weather": r"\b(hurricane|storm|flood|heatwave|blizzard|tornado|forecast)\b", "environment": r"\b(climate|emissions|wildfire|deforestation|biodiversity)\b", "travel": r"\b(flight|airline|airport|tourism|visa|cruise|hotel)\b", "politics": r"\b(president|parliament|congress|minister|policy|campaign|election)\b", } def _infer_category_from_url_path(url_path: str) -> Optional[str]: parts = [p for p in url_path.lower().split("/") if p] for p in parts: if p in SECTION_HINTS: return SECTION_HINTS[p] # also try hyphenated tokens like 'us-business' or 'tech-news' for p in parts: for tok in re.split(r"[-_]", p): if tok in SECTION_HINTS: return SECTION_HINTS[tok] return None def _infer_category_from_text(text: str) -> Optional[str]: if not text: return None for cat, pat in KEYWORDS.items(): if re.search(pat, text, flags=re.I): return cat return None def infer_category(article_url, title, description, provided): if provided: p = provided.strip().lower() if p: return p # url rules try: p = urlparse(article_url).path or "" cat = _infer_category_from_url_path(p) if cat: return cat except Exception: pass # keyword rules text = f"{title or ''} {description or ''}".strip() cat = _infer_category_from_text(text) if cat: return cat try: preds = get_news_clf()(text[:512]) # lazy-loaded if isinstance(preds[0], list): label = preds[0][0]["label"] else: label = preds[0]["label"] return label.lower() except Exception as e: log.warning(f"ML category failed: {e}") return "general" BOILER = re.compile(r"\b(live updates|breaking|what we know|in pictures|opinion)\b", re.I) def _norm_text(s: str) -> str: s = (s or "").strip() s = re.sub(r"\s+", " ", s) return s def _cluster_text(a): base = f"{a.get('orig_title') or a.get('title') or ''} {a.get('orig_description') or a.get('description') or ''}" base = BOILER.sub("", base) base = re.sub(r"\b(\d{1,2}:\d{2}\s?(AM|PM))|\b(\d{1,2}\s\w+\s\d{4})", "", base, flags=re.I) return _norm_text(base) def _canonical_url(u: str) -> str: if not u: return u p = urlparse(u) # drop tracking params qs = [(k, v) for (k, v) in parse_qsl(p.query, keep_blank_values=False) if not k.lower().startswith(("utm_", "fbclid", "gclid"))] clean = p._replace(query="&".join([f"{k}={v}" for k, v in qs]), fragment="") # some sites add trailing slashes inconsistently path = clean.path.rstrip("/") or "/" clean = clean._replace(path=path) return urlunparse(clean) def detect_lang(text: str) -> Optional[str]: try: return detect(text) # returns 'en','fr','de',... except Exception: return None def _embed_texts(texts: List[str]): embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False) return embs # ---- cache helpers ---- CACHE_TTL_SECS = 900 SIM_THRESHOLD = 0.6 _events_cache: Dict[Tuple, Dict[str, Any]] = {} def cache_key_for(q, category, language, limit_each, translate=False, target_lang=None, speed=Speed.balanced): return (q or "", category or "", language or "", int(limit_each or 50), bool(translate), (target_lang or "").lower(), speed.value) _first_real_build = True # module-global def get_or_build_events_cache(q, category, language, translate, target_lang, limit_each, speed=Speed.balanced): global _first_real_build key = cache_key_for(q, category, language, limit_each, translate, target_lang, speed) now = monotonic() if speed == Speed.fast: use_timespan, use_limit = "24h", min(limit_each, 20) elif speed == Speed.balanced: use_timespan, use_limit = "48h", min(limit_each, 150) else: # max use_timespan, use_limit = "3d", limit_each entry = _events_cache.get(key) if entry and now - entry["t"] < CACHE_TTL_SECS: log.info(f"CACHE HIT for {key}") return key, entry["enriched"], entry["clusters"] lock = _get_inflight_lock(key) with lock: entry = _events_cache.get(key) if entry and now - entry["t"] < CACHE_TTL_SECS: log.info(f"CACHE HIT (post-lock) for {key}") return key, entry["enriched"], entry["clusters"] if _first_real_build: use_timespan = "24h" if use_timespan != "24h" else use_timespan use_limit = min(use_limit, 100) log.info(f"CACHE MISS for {key} — fetching (timespan={use_timespan}, limit_each={use_limit})") raw = combine_raw_articles( category=category, # providers may use it; inference ignores it query=q, language=language, limit_each=use_limit, timespan=use_timespan, speed=speed, ) prefetch_descriptions_async(raw, speed) enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] if category: cat_norm = (category or "").strip().lower() enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm] else: enriched = enriched_all clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD, speed=speed) _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters} _first_real_build = False return key, enriched, clusters # Which languages to rotate when user didn't restrict language LANG_ROTATION = ["en", "es", "fr", "de", "ar", "ru", "pt", "zh", "hi", "ja", "ko"] # A few sourcecountry seeds for English to diversify outlets (optional) COUNTRY_SEEDS = ["US", "GB", "IN", "CA", "AU", "ZA", "SG", "NG", "DE", "FR", "BR", "MX", "ES", "RU", "JP", "KR", "CN"] # ----------------- Config / Keys ----------------- USE_GNEWS_API = False USE_NEWSDATA_API = False USE_GDELT_API = True USE_NEWSAPI = False NEWSAPI_KEY = os.getenv("NEWSAPI_KEY", "ea734c66dc4044fa8e4501ad7b90e753") GNEWS_API_KEY = os.getenv("GNEWS_API_KEY", "5419897c95e8a4b21074e0d3fe95a3dd") NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY", "pub_1feb49a71a844719af68d0844fb43a61") HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN") logging.basicConfig( level=logging.WARNING, format="%(levelname)s:%(name)s:%(message)s", ) log = logging.getLogger("newsglobe") log.setLevel(logging.WARNING) fetch_log = logging.getLogger("newsglobe.fetch_summary") fetch_log.setLevel(logging.INFO) _fetch_handler = logging.StreamHandler() _fetch_handler.setLevel(logging.INFO) _fetch_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s")) fetch_log.addHandler(_fetch_handler) fetch_log.propagate = False # don't pass to root (which is WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING) logging.getLogger("requests.packages.urllib3").setLevel(logging.WARNING) logging.getLogger("sentence_transformers").setLevel(logging.WARNING) logging.getLogger("transformers").setLevel(logging.WARNING) for name in ("urllib3", "urllib3.connectionpool", "requests.packages.urllib3"): lg = logging.getLogger(name) lg.setLevel(logging.ERROR) lg.propagate = False def _newsapi_enabled() -> bool: if not NEWSAPI_KEY: log.warning("NewsAPI disabled: missing NEWSAPI_KEY env var") return False return True def cluster_id(cluster, enriched_articles): urls = sorted([(enriched_articles[i].get("url") or "") for i in cluster["indices"] if enriched_articles[i].get("url")]) base = "|".join(urls) if urls else "empty" return hashlib.md5(base.encode("utf-8")).hexdigest()[:10] # ----------------- NLTK / VADER ----------------- NLTK_DATA_DIR = os.environ.get("NLTK_DATA", "/app/nltk_data") # Make sure NLTK looks in the baked, writable dir first if NLTK_DATA_DIR not in nltk.data.path: nltk.data.path.insert(0, NLTK_DATA_DIR) try: nltk.data.find("sentiment/vader_lexicon") except LookupError: # As a fallback, try downloading into the writable dir (won't run if already baked) try: os.makedirs(NLTK_DATA_DIR, exist_ok=True) nltk.download("vader_lexicon", download_dir=NLTK_DATA_DIR, quiet=True) except Exception: pass # don't crash if download is blocked try: _vader = SentimentIntensityAnalyzer() except Exception: _vader = None def classify_sentiment(text: str) -> str: if not text: return "neutral" if _vader is None: return "neutral" scores = _vader.polarity_scores(text) c = scores["compound"] return "positive" if c >= 0.2 else "negative" if c <= -0.2 else "neutral" # ----------------- Geocode helpers ----------------- def get_country_centroid(country_name): if not country_name or country_name == "Unknown": return {"lat": 0, "lon": 0, "country": "Unknown"} try: country = CountryInfo(country_name) latlng = country.capital_latlng() return {"lat": latlng[0], "lon": latlng[1], "country": country_name} except Exception as e: log.info(f"Could not get centroid for {country_name}: {e}") return {"lat": 0, "lon": 0, "country": country_name or "Unknown"} def resolve_domain_to_ip(domain): if not domain: return None try: return socket.gethostbyname(domain) except socket.gaierror: return None def geolocate_ip(ip): try: r = _session_get(f"https://ipwho.is/{ip}?fields=success,country,latitude,longitude", timeout=8) j = r.json() if j.get("success"): return {"lat": j["latitude"], "lon": j["longitude"], "country": j["country"]} except Exception: pass return None geolocator = Nominatim(user_agent="newsglobe-app (contact: you@example.com)") domain_geo_cache: Dict[str, Dict[str, Any]] = {} MAJOR_OUTLETS = { "bbc.co.uk": "United Kingdom", "theguardian.com": "United Kingdom", "reuters.com": "United States", "aljazeera.com": "Qatar", "lemonde.fr": "France", "dw.com": "Germany", "abc.net.au": "Australia", "ndtv.com": "India", "globo.com": "Brazil", "elpais.com": "Spain", "lefigaro.fr": "France", "kyodonews.net": "Japan", "straitstimes.com": "Singapore", "thesun.my": "Malaysia", # <-- add this } def geocode_source(source_text: str, domain: str = "", do_network: bool = False): cache_key = f"{source_text}|{domain}" if cache_key in domain_geo_cache: return domain_geo_cache[cache_key] ext = _tld(domain or "") fqdn = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else "" # 0) Major outlets / domain map if fqdn in MAJOR_OUTLETS: coords = get_country_centroid(MAJOR_OUTLETS[fqdn]); domain_geo_cache[cache_key] = coords; return coords if ext.domain in domain_country_map: coords = get_country_centroid(domain_country_map[ext.domain]); domain_geo_cache[cache_key] = coords; return coords # 1) Suffix fallback (instant) coords = get_country_centroid(_suffix_country(ext.suffix)) domain_geo_cache[cache_key] = coords # 2) Optional async refinement (never block hot path) if do_network: threading.Thread(target=_refine_geo_async, args=(cache_key, source_text, fqdn), daemon=True).start() return coords def _suffix_country(suffix: Optional[str]) -> str: s = (suffix or "").split(".")[-1] m = { "au":"Australia","uk":"United Kingdom","gb":"United Kingdom","ca":"Canada","in":"India","us":"United States", "ng":"Nigeria","de":"Germany","fr":"France","jp":"Japan","sg":"Singapore","za":"South Africa","nz":"New Zealand", "ie":"Ireland","it":"Italy","es":"Spain","se":"Sweden","ch":"Switzerland","nl":"Netherlands","br":"Brazil", "my":"Malaysia","id":"Indonesia","ph":"Philippines","th":"Thailand","vn":"Vietnam","sa":"Saudi Arabia", "ae":"United Arab Emirates","tr":"Turkey","mx":"Mexico","ar":"Argentina","cl":"Chile","co":"Colombia", "il":"Israel","kr":"South Korea","cn":"China","tw":"Taiwan","hk":"Hong Kong" } return m.get(s, "United States" if s in ("com","org","net") else "Unknown") def _refine_geo_async(cache_key, source_text, fqdn): try: # Try IP geo (cheap) ip = resolve_domain_to_ip(fqdn) if fqdn else None if ip: coords = geolocate_ip(ip) if coords: domain_geo_cache[cache_key] = coords return # Try Nominatim FAST (lower timeout) location = geolocator.geocode(f"{source_text} News Headquarters", timeout=2) if location and hasattr(location, "raw"): coords = { "lat": location.latitude, "lon": location.longitude, "country": location.raw.get("address", {}).get("country", "Unknown"), } domain_geo_cache[cache_key] = coords except Exception: pass # ----------------- HuggingFace translate (optional) ----------------- HF_MODEL_PRIMARY = None # disable NLLB remote (avoids 404 spam); use OPUS + pivot/LibreTranslate # 2-letter ISO -> NLLB codes NLLB_CODES = { "en": "eng_Latn", "es": "spa_Latn", "fr": "fra_Latn", "de": "deu_Latn", "it": "ita_Latn", "pt": "por_Latn", "zh": "zho_Hans", "ru": "rus_Cyrl", "ar": "arb_Arab", "hi": "hin_Deva", "ja": "jpn_Jpan", "ko": "kor_Hang", } # OPUS-MT model map for common pairs (expand as needed) def opus_model_for(src2: str, tgt2: str) -> Optional[str]: pairs = { ("es", "en"): "Helsinki-NLP/opus-mt-es-en", ("en", "es"): "Helsinki-NLP/opus-mt-en-es", ("fr", "en"): "Helsinki-NLP/opus-mt-fr-en", ("en", "fr"): "Helsinki-NLP/opus-mt-en-fr", ("de", "en"): "Helsinki-NLP/opus-mt-de-en", ("en", "de"): "Helsinki-NLP/opus-mt-en-de", ("pt", "en"): "Helsinki-NLP/opus-mt-pt-en", ("en", "pt"): "Helsinki-NLP/opus-mt-en-pt", ("it", "en"): "Helsinki-NLP/opus-mt-it-en", ("en", "it"): "Helsinki-NLP/opus-mt-en-it", ("ru", "en"): "Helsinki-NLP/opus-mt-ru-en", ("en", "ru"): "Helsinki-NLP/opus-mt-en-ru", ("zh", "en"): "Helsinki-NLP/opus-mt-zh-en", ("en", "zh"): "Helsinki-NLP/opus-mt-en-zh", ("ja", "en"): "Helsinki-NLP/opus-mt-ja-en", ("en", "ja"): "Helsinki-NLP/opus-mt-en-ja", ("ko", "en"): "Helsinki-NLP/opus-mt-ko-en", ("en", "ko"): "Helsinki-NLP/opus-mt-en-ko", ("hi", "en"): "Helsinki-NLP/opus-mt-hi-en", ("en", "hi"): "Helsinki-NLP/opus-mt-en-hi", ("ar", "en"): "Helsinki-NLP/opus-mt-ar-en", ("en", "ar"): "Helsinki-NLP/opus-mt-en-ar", } return pairs.get((src2, tgt2)) SUPPORTED = {"en", "fr", "de", "es", "it", "hi", "ar", "ru", "ja", "ko", "pt", "zh"} LIBRETRANSLATE_URL = os.getenv("LIBRETRANSLATE_URL") # e.g., http://127.0.0.1:5000 def _translate_via_libre(text: str, src: str, tgt: str) -> Optional[str]: url = LIBRETRANSLATE_URL if not url or not text or src == tgt: return None try: r = SESSION.post( f"{url.rstrip('/')}/translate", json={"q": text, "source": src, "target": tgt, "format": "text"}, timeout=6 ) if r.status_code == 200: j = r.json() out = j.get("translatedText") return out if isinstance(out, str) and out else None else: log.warning("LibreTranslate HTTP %s: %s", r.status_code, r.text[:200]) except Exception as e: log.warning("LibreTranslate failed: %s", e) return None def _hf_call(model_id: str, payload: dict) -> Optional[str]: # require both a token and explicit opt-in if not (HUGGINGFACE_API_TOKEN and ALLOW_HF_REMOTE): return None if model_id in _hf_bad_models: return None url = f"https://api-inference.huggingface.co/models/{model_id}" headers = { "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}", "HF-API-KEY": HUGGINGFACE_API_TOKEN, "Accept": "application/json", "Content-Type": "application/json", } try: r = requests.post(url, headers=headers, json=payload, timeout=25) if r.status_code != 200: if r.status_code == 404: _hf_bad_models.add(model_id) log.warning("HF %s -> 404: Not Found (disabled for this process)", model_id) else: log.warning("HF %s -> %s: %s", model_id, r.status_code, r.text[:300]) return None j = r.json() except Exception as e: log.warning("HF request failed: %s", e) return None if isinstance(j, list) and j and isinstance(j[0], dict): if "generated_text" in j[0]: return j[0]["generated_text"] if "translation_text" in j[0]: return j[0]["translation_text"] if isinstance(j, dict) and "generated_text" in j: return j["generated_text"] if isinstance(j, str): return j return None @lru_cache(maxsize=4096) def _translate_cached(text: str, src: str, tgt: str) -> str: if not text or src == tgt: return text # 0) Local LibreTranslate (fast & free, if running) out = _translate_via_libre(text, src, tgt) if out: return out # 1) OPUS serverless (direct pair) – try this first opus_model = opus_model_for(src, tgt) if opus_model: out = _hf_call(opus_model, {"inputs": text}) if out: return out # 2) NLLB serverless (optional; disabled if HF_MODEL_PRIMARY is None) try: if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES): out = _hf_call( HF_MODEL_PRIMARY, { "inputs": text, "parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]}, "options": {"wait_for_model": True}, }, ) if out: return out except Exception: pass # 3) Two-hop pivot via English for non-English↔non-English if src != "en" and tgt != "en": step_en = _translate_cached(text, src, "en") if step_en and step_en != text: out = _translate_cached(step_en, "en", tgt) if out: return out # 4) Local OPUS fallback (direct pair with local pipeline) out = _translate_local(text, src, tgt) if out: return out log.warning("All translate paths failed (%s->%s); returning original.", src, tgt) return text def translate_text(text: str, target_lang: Optional[str], fallback_src: Optional[str] = None) -> str: if not text or not target_lang: return text tgt = target_lang.lower() if tgt not in SUPPORTED: return text src = (fallback_src or detect_lang(text) or "en").lower() if src == tgt: return text if src not in SUPPORTED: if src.startswith("zh"): src = "zh" elif src.startswith("pt"): src = "pt" elif src[:2] in SUPPORTED: src = src[:2] else: src = "en" return _translate_cached(text, src, tgt) # ----------------- FastAPI ----------------- app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=False, allow_methods=["*"], allow_headers=["*"], ) app.add_middleware(GZipMiddleware, minimum_size=500) # === Warm config === WARM_LIMIT_EACH = 20 # smaller bite to prime caches WARM_TIMESPAN = "24h" # narrower GDELT window for faster first fetch WARM_PREFETCH_DESCRIPTIONS = False def _fmt_mmss(ms: float) -> str: total_sec = int(round(ms / 1000.0)) m, s = divmod(total_sec, 60) return f"{m}:{s:02d}" def _warm_once(): try: log.info("WARM: starting background warm-up (limit_each=%d, timespan=%s)", WARM_LIMIT_EACH, WARM_TIMESPAN) t0 = time.perf_counter() # models (you already call these in startup, but keep them here too) get_sbert() get_news_clf() # fetch a small set with shorter timespan t1 = time.perf_counter() raw = combine_raw_articles( category=None, query=None, language="en", limit_each=WARM_LIMIT_EACH, timespan=WARM_TIMESPAN, log_summary=False # ← silence warm-up summary ) t_fetch = (time.perf_counter() - t1) * 1000 # optional: skip description prefetch during warm to save time if WARM_PREFETCH_DESCRIPTIONS: prefetch_descriptions_async(raw) # enrich + cluster once (no translation on warm) t2 = time.perf_counter() enriched = [enrich_article(a, language="en", translate=False, target_lang=None) for a in raw] t_enrich = (time.perf_counter() - t2) * 1000 t3 = time.perf_counter() clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD) t_cluster = (time.perf_counter() - t3) * 1000 # stash in cache under the common default key so /news and /events hit warm data key = cache_key_for(q=None, category=None, language="en", limit_each=WARM_LIMIT_EACH, translate=False, target_lang=None, speed=Speed.balanced) # 👈 add speed _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters} t_total = (time.perf_counter() - t0) * 1000 log.info( "WARM: fetch=%s, enrich=%s, cluster=%s, total=%s (raw=%d, enriched=%d, clusters=%d)", _fmt_mmss(t_fetch), _fmt_mmss(t_enrich), _fmt_mmss(t_cluster), _fmt_mmss(t_total), len(raw), len(enriched), len(clusters), ) except Exception as e: log.warning(f"WARM: failed: {e}") @app.on_event("startup") def warm(): # keep your existing model warms get_sbert() get_news_clf() # fire-and-forget warm in a background thread so startup stays snappy threading.Thread(target=_warm_once, daemon=True).start() # ----------------- Providers ----------------- # ISO -> GDELT 'sourcelang:' names (keep yours) _GDELT_LANG = { "en": "english", "es": "spanish", "fr": "french", "de": "german", "it": "italian", "pt": "portuguese", "ru": "russian", "ar": "arabic", "hi": "hindi", "ja": "japanese", "ko": "korean", "zh": "chinese", } def _gdelt_safe_query(user_q, language): parts = [] if user_q: q = user_q.strip() if len(q) < 3: q = f'"{q}" news' parts.append(q) if language and (lg := _GDELT_LANG.get(language.lower())): parts.append(f"sourcelang:{lg}") if not parts: # rotate or randomly choose one to diversify parts.append("sourcelang:english") return " ".join(parts) def fetch_gdelt_articles( limit=50, query=None, language=None, timespan="3d", category=None, extra_tokens: Optional[List[str]] = None ): q = _gdelt_safe_query(query, language) if extra_tokens: q = f"{q} " + " ".join(extra_tokens) url = "https://api.gdeltproject.org/api/v2/doc/doc" params = { "query": q, "mode": "ArtList", "format": "json", "sort": "DateDesc", "maxrecords": int(min(250, max(1, limit))), "timespan": timespan, } headers = { "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)", "Accept": "application/json", } def _do_request(p): r = _session_get(url, params=p, timeout=10) log.info(f"GDELT URL: {r.url} (status={r.status_code})") if r.status_code != 200: log.warning(f"GDELT HTTP {r.status_code}: {r.text[:400]}") return None try: return r.json() except Exception: ct = r.headers.get("Content-Type", "") log.warning(f"GDELT non-JSON response. CT={ct}. Body[:400]: {r.text[:400]}") return None data = _do_request(params) if data is None: # Retry narrower and smaller if needed p2 = {**params, "timespan": "24h", "maxrecords": min(100, params["maxrecords"])} data = _do_request(p2) if not data: return [] arts = data.get("articles") or [] results = [] for a in arts: desc = a.get("description") or a.get("content") or "" title = a.get("title") or "" if desc and ( desc.strip().lower() == title.strip().lower() or (len(desc) <= 60 and _too_similar(title, desc)) ): desc = "" desc = desc or "No description available" results.append( { "title": title, "url": a.get("url"), "source": {"name": a.get("domain") or "GDELT"}, "description": desc, "publishedAt": a.get("seendate"), "api_source": "gdelt", "gdelt_sourcecountry": a.get("sourcecountry"), # Keep the user's chosen category only for debugging/reference; do NOT use for inference. "requested_category": category, } ) log.info(f"GDELT returned {len(results)}") return results def fetch_newsdata_articles(category=None, limit=20, query=None, language=None): base_url = "https://newsdata.io/api/1/news" allowed = [ "business", "entertainment", "environment", "food", "health", "politics", "science", "sports", "technology", "top", "world", ] params = {"apikey": NEWSDATA_API_KEY, "language": (language or "en")} if category and category in allowed: params["category"] = category if query: params["q"] = query all_articles, next_page = [], None while len(all_articles) < limit: if next_page: params["page"] = next_page resp = _session_get(base_url, params=params, timeout=12) if resp.status_code != 200: break data = resp.json() articles = data.get("results", []) for a in articles: a["api_source"] = "newsdata" all_articles.extend(articles) next_page = data.get("nextPage") if not next_page: break # normalize timestamps if available for a in all_articles: a["publishedAt"] = a.get("pubDate") return all_articles[:limit] def fetch_gnews_articles(limit=20, query=None, language=None): url = f"https://gnews.io/api/v4/top-headlines?lang={(language or 'en')}&max={limit}&token={GNEWS_API_KEY}" if query: url += f"&q={requests.utils.quote(query)}" try: r = _session_get(url, timeout=12) if r.status_code != 200: return [] arts = r.json().get("articles", []) for a in arts: a["api_source"] = "gnews" return arts except Exception: return [] NEWSAPI_COUNTRIES = ["us", "gb", "ca", "au", "in", "za", "sg", "ie", "nz"] def fetch_newsapi_headlines_multi(limit=50, language=None): if not _newsapi_enabled(): return [] all_ = [] per = max(1, math.ceil(limit / max(1, len(NEWSAPI_COUNTRIES)))) per = min(per, 100) # NewsAPI pageSize cap for c in NEWSAPI_COUNTRIES: url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per}&apiKey={NEWSAPI_KEY}" r = _session_get(url, timeout=12) if r.status_code != 200: log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}") continue arts = r.json().get("articles", []) for a in arts: a["api_source"] = "newsapi" all_.extend(arts) time.sleep(0.2) return all_[:limit] # ✅ enforce exact limit def fetch_newsapi_articles(category=None, limit=20, query=None, language=None): if not _newsapi_enabled(): return [] # If a query is provided, use /everything (language allowed here) if query: url = f"https://newsapi.org/v2/everything?pageSize={limit}&apiKey={NEWSAPI_KEY}&q={requests.utils.quote(query)}" if language: url += f"&language={language}" try: r = _session_get(url, timeout=12) if r.status_code != 200: log.warning(f"NewsAPI /everything HTTP {r.status_code}: {r.text[:200]}") return [] arts = r.json().get("articles", []) for a in arts: a["api_source"] = "newsapi" # DO NOT stamp category here; we infer later return arts[:limit] except Exception as e: log.warning(f"NewsAPI /everything request failed: {e}") return [] # Otherwise, rotate /top-headlines by country (no language param) results = [] per_country = max(5, limit // len(NEWSAPI_COUNTRIES)) for c in NEWSAPI_COUNTRIES: url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per_country}&apiKey={NEWSAPI_KEY}" if category: url += f"&category={category}" try: r = _session_get(url, timeout=12) if r.status_code != 200: log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}") continue arts = r.json().get("articles", []) for a in arts: a["api_source"] = "newsapi" # DO NOT stamp category here; we infer later results.extend(arts) except Exception as e: log.warning(f"NewsAPI top-headlines {c} failed: {e}") time.sleep(0.2) return results[:limit] def normalize_newsdata_article(article): return { "title": article.get("title"), "url": article.get("link"), "source": {"name": article.get("source_id", "NewsData.io")}, "description": article.get("description") or "No description available", "publishedAt": article.get("publishedAt"), "api_source": article.get("api_source", "newsdata"), "category": ((article.get("category") or [None])[0] if isinstance(article.get("category"), list) else article.get("category")), } # ----------------- Enrichment ----------------- def enrich_article(a, language=None, translate=False, target_lang=None): # Normalize source name source_name = (a.get("source", {}) or {}).get("name", "").strip() or "Unknown" s_lower = source_name.lower() if "newsapi" in s_lower: source_name = "NewsAPI" elif "gnews" in s_lower: source_name = "GNews" elif "newsdata" in s_lower: source_name = "NewsData.io" # Canonicalize URL & derive domain article_url = _canonical_url(a.get("url") or "") try: ext = _tld(article_url) domain = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else "" except Exception: domain = "" # Country guess (GDELT provides ISO2) country_guess = None if a.get("api_source") == "gdelt": sc = a.get("gdelt_sourcecountry") if sc: iso2map = { "US": "United States", "GB": "United Kingdom", "AU": "Australia", "CA": "Canada", "IN": "India", "DE": "Germany", "FR": "France", "IT": "Italy", "ES": "Spain", "BR": "Brazil", "JP": "Japan", "CN": "China", "RU": "Russia", "KR": "South Korea", "ZA": "South Africa", "NG": "Nigeria", "MX": "Mexico", "AR": "Argentina", "CL": "Chile", "CO": "Colombia", "NL": "Netherlands", "SE": "Sweden", "NO": "Norway", "DK": "Denmark", "FI": "Finland", "IE": "Ireland", "PL": "Poland", "PT": "Portugal", "GR": "Greece", "TR": "Turkey", "IL": "Israel", "SA": "Saudi Arabia", "AE": "United Arab Emirates", "SG": "Singapore", "MY": "Malaysia", "TH": "Thailand", "PH": "Philippines", "ID": "Indonesia", "NZ": "New Zealand", } country_guess = iso2map.get(str(sc).upper(), sc if len(str(sc)) > 2 else None) coords = get_country_centroid(country_guess) if country_guess else geocode_source(source_name, domain, do_network=False) # Title / description (raw) title = (a.get("title") or "").strip() or "(untitled)" description = (a.get("description") or "").strip() if description.lower().startswith("no description"): description = "" # Prefer cached page summary when weak/title-like cached_desc = _desc_cache_get(article_url) need_upgrade = ( (not description) or description.lower().startswith("no description") or len(description) < DESC_MIN_LEN or _too_similar(title, description) ) if need_upgrade and cached_desc: description = cached_desc if description: description = _tidy_description(title, description, source_name) if (not description) or _too_similar(title, description): description = f"Quick take: {title.rstrip('.')}." # Save originals for categorization and debug orig_title = title orig_description = description # Language detection / sentiment detected_lang = (detect_lang(f"{title} {description}") or "").lower() ml_text = f"{orig_title}. {orig_description}".strip() sentiment = classify_sentiment(f"{orig_title} {orig_description}") # Stable id & category (ALWAYS infer; ignore provider/requested categories) seed = f"{source_name}|{article_url}|{title}" uid = hashlib.md5(seed.encode("utf-8")).hexdigest()[:12] cat = infer_category(article_url, orig_title, orig_description, None) return { "id": uid, "title": title, "url": article_url, "source": source_name, "lat": coords["lat"], "lon": coords["lon"], "country": coords.get("country", "Unknown"), "description": description, "sentiment": sentiment, "api_source": a.get("api_source", "unknown"), "publishedAt": a.get("publishedAt"), "_ml_text": ml_text, "orig_title": orig_title, "orig_description": orig_description, "detected_lang": detected_lang, "translated": False, "category": cat, } # ----------------- Clustering into Events ----------------- # sbert_model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2") # cluster_articles() def cluster_articles(articles: List[Dict[str, Any]], sim_threshold=SIM_THRESHOLD, speed=Speed.balanced): if speed == Speed.fast: articles = articles[:150] # early cap sim_threshold = max(sim_threshold, 0.64) elif speed == Speed.balanced: articles = articles[:] sim_threshold = max(sim_threshold, 0.62) texts = [_cluster_text(a) for a in articles] embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False) clusters = [] # [{indices:[...], centroid:tensor}] centroids = [] for i, emb in enumerate(embs): best_idx, best_sim = -1, -1.0 for ci, c_emb in enumerate(centroids): sim = util.cos_sim(emb, c_emb).item() if sim > sim_threshold and sim > best_sim: best_sim, best_idx = sim, ci if best_idx >= 0: clusters[best_idx]["indices"].append(i) idxs = clusters[best_idx]["indices"] new_c = embs[idxs].mean(dim=0) new_c = new_c / new_c.norm() centroids[best_idx] = new_c clusters[best_idx]["centroid"] = new_c else: # use texts[i] now (titles[] no longer exists) event_id = hashlib.md5(texts[i].encode("utf-8")).hexdigest()[:10] clusters.append({"id": event_id, "indices": [i], "centroid": emb}) centroids.append(emb) # second-pass merge to reduce fragmenting merged = _merge_close_clusters(clusters, embs, threshold=0.70) # keep ids stable: recompute with URLs of member articles for c in merged: c["id"] = cluster_id(c, articles) return merged def event_payload_from_cluster(cluster, enriched_articles): idxs = cluster["indices"] arts = [enriched_articles[i] for i in idxs] title_counts = Counter([a["title"] for a in arts]) canonical_title = title_counts.most_common(1)[0][0] keywords = list({w.lower() for t in title_counts for w in t.split() if len(w) > 3})[:8] sources = {a["source"] for a in arts} countries = {a["country"] for a in arts if a["country"] and a["country"] != "Unknown"} ts = [a.get("publishedAt") for a in arts if a.get("publishedAt")] return { "event_id": cluster_id(cluster, enriched_articles), # <-- stable id "title": canonical_title, "keywords": keywords, "article_count": len(arts), "source_count": len(sources), "country_count": len(countries), "time_range": {"min": min(ts) if ts else None, "max": max(ts) if ts else None}, "sample_urls": [a["url"] for a in arts[:3] if a.get("url")], } def aggregate_event_by_country(cluster, enriched_articles): idxs = cluster["indices"] arts = [enriched_articles[i] for i in idxs] by_country: Dict[str, Dict[str, Any]] = {} for a in arts: c = a.get("country") or "Unknown" if c not in by_country: coords = get_country_centroid(c) by_country[c] = {"country": c, "lat": coords["lat"], "lon": coords["lon"], "articles": []} by_country[c]["articles"].append(a) # summarize per country results = [] for c, block in by_country.items(): arr = block["articles"] # avg sentiment mapped to -1/0/+1 to_num = {"negative": -1, "neutral": 0, "positive": 1} vals = [to_num.get(a["sentiment"], 0) for a in arr] avg = sum(vals) / max(len(vals), 1) avg_sent = "positive" if avg > 0.15 else "negative" if avg < -0.15 else "neutral" top_sources = [s for s, _ in Counter([a["source"] for a in arr]).most_common(3)] # tiny extractive summary: top 2 headlines summary = " • ".join([a["title"] for a in arr[:2]]) results.append( { "country": c, "lat": block["lat"], "lon": block["lon"], "count": len(arr), "avg_sentiment": avg_sent, "top_sources": top_sources, "summary": summary, "samples": [ { "title": a["title"], "orig_title": a.get("orig_title"), "orig_description": a.get("orig_description"), # 👈 add this "url": a["url"], "source": a["source"], "sentiment": a["sentiment"], "detected_lang": a.get("detected_lang"), } for a in arr[:5] ], } ) return results def _merge_close_clusters(clusters, embs, threshold=0.68): # clusters: [{"indices":[...], "centroid":tensor}, ...] – add centroid in your first pass merged = [] used = set() for i in range(len(clusters)): if i in used: continue base = clusters[i] group = [i] for j in range(i + 1, len(clusters)): if j in used: continue sim = util.cos_sim(base["centroid"], clusters[j]["centroid"]).item() if sim >= threshold: group.append(j) # merge those groups all_idx = [] cents = [] for g in group: used.add(g) all_idx.extend(clusters[g]["indices"]) cents.append(clusters[g]["centroid"]) # new centroid newc = torch.stack(cents, dim=0).mean(dim=0) newc = newc / newc.norm() merged.append({"indices": sorted(set(all_idx)), "centroid": newc}) return merged # ----------------- Endpoints ----------------- prefetch = False @app.get("/events") def get_events( q: Optional[str] = Query(None), category: Optional[str] = Query(None), language: Optional[str] = Query(None), translate: Optional[bool] = Query(False), target_lang: Optional[str] = Query(None), limit_each: int = Query(150, ge=5, le=250), max_events: int = Query(15, ge=5, le=50), min_countries: int = Query(2, ge=1, le=50), min_articles: int = Query(2, ge=1, le=200), speed: Speed = Query(Speed.balanced), ): # always build cache on untranslated data cache_key, enriched, clusters = get_or_build_events_cache( q, category, language, False, None, limit_each, speed=speed ) # optional post-translate view (does not mutate cache) view = enriched if translate and target_lang: view = [dict(i) for i in enriched] for i in view: src_hint = i.get("detected_lang") i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) i["translated"] = True events = [event_payload_from_cluster(c, view) for c in clusters] events = [e for e in events if (e["country_count"] >= min_countries and e["article_count"] >= min_articles)] events.sort(key=lambda e: e["article_count"], reverse=True) return {"events": events[:max_events], "cache_key": "|".join(map(str, cache_key))} @app.get("/event/{event_id}") def get_event_details( event_id: str, cache_key: Optional[str] = Query(None), q: Optional[str] = Query(None), category: Optional[str] = Query(None), language: Optional[str] = Query(None), translate: Optional[bool] = Query(False), target_lang: Optional[str] = Query(None), limit_each: int = Query(150, ge=5, le=250), ): # /event/{event_id} if cache_key: parts = cache_key.split("|") if len(parts) != 7: raise HTTPException(status_code=400, detail="Bad cache_key") speed_str = parts[6] try: speed_obj = Speed(speed_str) # "fast" | "balanced" | "max" except ValueError: speed_obj = Speed.balanced key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), parts[4] == "True", parts[5].lower(), speed_str) else: speed_obj = Speed.balanced key_tuple = cache_key_for(q, category, language, limit_each, translate, target_lang, speed=speed_obj) entry = _events_cache.get(key_tuple) if not entry: # always build untranslated _, enriched, clusters = get_or_build_events_cache( q, category, language, False, None, limit_each, speed=speed_obj ) else: enriched, clusters = entry["enriched"], entry["clusters"] # optional post-translate view (do not mutate cache) eview = enriched if translate and target_lang: eview = [dict(i) for i in enriched] for i in eview: src_hint = i.get("detected_lang") i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) i["translated"] = True cluster = next((c for c in clusters if cluster_id(c, enriched) == event_id), None) if not cluster: raise HTTPException(status_code=404, detail="Event not found with current filters") payload = event_payload_from_cluster(cluster, eview) countries = aggregate_event_by_country(cluster, eview) payload["articles_in_event"] = sum(c["count"] for c in countries) return {"event": payload, "countries": countries} @app.get("/news") def get_news( cache_key: Optional[str] = Query(None), category: Optional[str] = Query(None), sentiment: Optional[str] = Query(None), q: Optional[str] = Query(None), language: Optional[str] = Query(None), translate: Optional[bool] = Query(False), target_lang: Optional[str] = Query(None), limit_each: int = Query(100, ge=5, le=100), lite: bool = Query(True), speed: Speed = Query(Speed.balanced), page: int = Query(1, ge=1), page_size: int = Query(120, ge=5, le=300), ): enriched: List[Dict[str, Any]] = [] # Pull from cache if provided if cache_key: parts = cache_key.split("|") if len(parts) == 7: key_tuple = ( parts[0], # q parts[1], # category parts[2], # language int(parts[3]), # limit_each parts[4] == "True", # translate parts[5].lower(), # target_lang parts[6], # speed ) entry = _events_cache.get(key_tuple) if entry: enriched = entry["enriched"] # Fetch fresh if no cached items if not enriched: raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each, speed=speed) prefetch_descriptions_async(raw, speed) enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] if category: cat_norm = (category or "").strip().lower() enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm] else: enriched = enriched_all else: # If we got cached items but want to ensure the selected category is enforced: if category: cat_norm = (category or "").strip().lower() enriched = [e for e in enriched if (e.get("category") or "").lower() == cat_norm] # Translation (optional) if translate and target_lang: enriched = [dict(i) for i in enriched] for i in enriched: i["original_title"] = i.get("orig_title") or i.get("title") i["original_description"] = i.get("orig_description") or i.get("description") src_hint = i.get("detected_lang") i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) i["translated"] = True i["translated_from"] = (src_hint or "").lower() i["translated_to"] = target_lang.lower() # Optional sentiment filter if sentiment: s = sentiment.strip().lower() enriched = [i for i in enriched if i.get("sentiment", "").lower() == s] # Pagination total = len(enriched) start = (page - 1) * page_size end = start + page_size items = [dict(i) for i in enriched[start:end]] # Trim debug fields if lite: drop = {"_ml_text"} for i in items: for k in drop: i.pop(k, None) return { "items": items, "total": total, "page": page, "page_size": page_size } def combine_raw_articles(category=None, query=None, language=None, limit_each=30, timespan="3d", speed=Speed.balanced, log_summary: bool = True): if speed == Speed.fast: timespan = "24h" limit_each = min(limit_each, 20) elif speed == Speed.balanced: timespan = "48h" limit_each = min(limit_each, 150) a1 = [] if USE_NEWSAPI: if not query: a1 = fetch_newsapi_headlines_multi(limit=limit_each, language=language) else: a1 = fetch_newsapi_articles(category=category, limit=limit_each, query=query, language=language) a2 = [] if USE_NEWSDATA_API: a2 = [ normalize_newsdata_article(a) for a in fetch_newsdata_articles(category=category, limit=limit_each, query=query, language=language) if a.get("link") ] a3 = fetch_gnews_articles(limit=limit_each, query=query, language=language) if USE_GNEWS_API else [] # a4 = fetch_gdelt_articles( # limit=min(100, limit_each * 2), # query=query, # language=language, # timespan=timespan, # category=category # ) gdelt_limit = limit_each a4 = fetch_gdelt_multi( limit=gdelt_limit, query=query, language=language, # if provided, we honor it (with small EN boost) timespan=timespan, category=category, speed=speed, ) # Dedup by canonical URL (maintain source precedence) seen, merged = set(), [] for a in a1 + a3 + a2 + a4: if a.get("url"): a["url"] = _canonical_url(a["url"]) url = a["url"] if url not in seen: seen.add(url) merged.append(a) if log_summary: fetch_log.info("----- Article Fetch Summary -----") fetch_log.info(f"📊 NewsAPI returned: {len(a1)} articles") fetch_log.info(f"📊 NewsData.io returned: {len(a2)} articles") fetch_log.info(f"📊 GNews returned: {len(a3)} articles") fetch_log.info(f"📊 GDELT returned: {len(a4)} articles") fetch_log.info(f"✅ Total merged articles after deduplication: {len(merged)}") fetch_log.info("---------------------------------") return merged @app.get("/related") def related_articles( id: Optional[str] = Query(None, description="article id from /news"), title: Optional[str] = Query(None), description: Optional[str] = Query(None), q: Optional[str] = Query(None), category: Optional[str] = Query(None), language: Optional[str] = Query(None), limit_each: int = Query(50, ge=5, le=100), k: int = Query(10, ge=1, le=50), ): # ensure we have a working article list (enriched) to search over raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each) enriched = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] if not enriched: return {"items": []} # pick the query vector if id: base = next((a for a in enriched if a.get("id") == id), None) if not base: raise HTTPException(404, "article id not found in current fetch") query_text = base["_ml_text"] else: text = f"{title or ''} {description or ''}".strip() if not text: raise HTTPException(400, "provide either id or title/description") query_text = text corpus_texts = [a["_ml_text"] for a in enriched] corpus_embs = _embed_texts(corpus_texts) query_emb = _embed_texts([query_text])[0] # cosine similarities sims = util.cos_sim(query_emb, corpus_embs).cpu().numpy().flatten() # take top-k excluding the query itself (if id provided) idxs = sims.argsort()[::-1] items = [] for idx in idxs: a = enriched[idx] if id and a["id"] == id: continue items.append({**a, "similarity": float(sims[idx])}) if len(items) >= k: break return {"items": items} @app.middleware("http") async def timing_middleware(request, call_next): start = time.perf_counter() response = None try: response = await call_next(request) return response finally: dur_ms = (time.perf_counter() - start) * 1000 # log.info(f"{request.method} {request.url.path} -> {dur_ms:.1f} ms ({_fmt_mmss(dur_ms)})") if response is not None: try: response.headers["X-Process-Time-ms"] = f"{dur_ms:.1f}" except Exception: pass @app.post("/client-metric") def client_metric(payload: Dict[str, Any] = Body(...)): name = (payload.get("name") or "").strip() # Drop redraw spam if it ever slips through again if name in {"Load all article markers on globe", "Load event country markers on globe"}: return {"ok": True} return {"ok": True} @app.get("/diag/translate") def diag_translate(): remote = _hf_call("Helsinki-NLP/opus-mt-es-en", {"inputs":"Hola mundo"}) local = _translate_local("Hola mundo", "es", "en") libre = _translate_via_libre("Hola mundo", "es", "en") return { "token": bool(HUGGINGFACE_API_TOKEN), "remote_ok": bool(remote), "local_ok": bool(local), "libre_ok": bool(libre), "sample": libre or remote or local } @app.get("/", include_in_schema=False) def root(): return {"ok": True, "service": "newsglobe-backend"} @app.get("/favicon.ico", include_in_schema=False) def favicon(): return PlainTextResponse("", status_code=204)