# ----------------- Imports (Stdlib + Typing) ----------------- from fastapi import FastAPI, Query, HTTPException, Body from typing import Optional, List, Dict, Any, Tuple, Set import os import time import socket import logging import hashlib from functools import lru_cache from collections import Counter import requests import tldextract import math import nltk from nltk.sentiment import SentimentIntensityAnalyzer from geopy.geocoders import Nominatim from fastapi.middleware.cors import CORSMiddleware from countryinfo import CountryInfo from sentence_transformers import SentenceTransformer, util from domain_country_map import domain_country_map from time import monotonic from langdetect import detect, DetectorFactory import re from urllib.parse import urlparse, urlunparse, parse_qsl from concurrent.futures import ThreadPoolExecutor, as_completed from html import unescape import threading import difflib from starlette.middleware.gzip import GZipMiddleware from transformers import pipeline as hf_pipeline os.environ.setdefault("OMP_NUM_THREADS", "1") from fastapi.responses import PlainTextResponse, JSONResponse from datetime import datetime, timezone # ----------------- Torch Runtime Settings ----------------- import torch torch.set_num_threads(2) # ----------------- Optional Local Tokenizers ----------------- try: import sentencepiece as _spm _HAS_SENTENCEPIECE = True except Exception: _HAS_SENTENCEPIECE = False # ----------------- Runtime Modes / Speed Enum ----------------- from enum import Enum class Speed(str, Enum): fast = "fast" balanced = "balanced" max = "max" # ----------------- Global Model Handles / Pipelines ----------------- _local_pipes = {} _news_clf = None _sbert = None # ----------------- tldextract (PSL-cached) ----------------- _TLD_CACHE = os.getenv("TLDEXTRACT_CACHE", "/data/tld_cache") try: _tld = tldextract.TLDExtract(cache_dir=_TLD_CACHE, suffix_list_urls=None) except Exception: _tld = tldextract.extract # ----------------- Translation Runtime Flags ----------------- ALLOW_HF_REMOTE = os.getenv("ALLOW_HF_REMOTE", "0") == "1" _hf_bad_models: Set[str] = set() # ----------------- FastAPI App + Middleware ----------------- app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=False, allow_methods=["*"], allow_headers=["*"], ) app.add_middleware(GZipMiddleware, minimum_size=500) @app.api_route("/", methods=["GET", "HEAD"], include_in_schema=False) def root(): return JSONResponse({"ok": True, "service": "newsglobe-backend"}) @app.api_route("/healthz", methods=["GET", "HEAD"], include_in_schema=False) def healthz(): return PlainTextResponse("OK", status_code=200) @app.get("/favicon.ico", include_in_schema=False) def favicon(): return PlainTextResponse("", status_code=204) # ----------------- HTTP Session (connection pooling) ----------------- SESSION = requests.Session() ADAPTER = requests.adapters.HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=2) SESSION.mount("http://", ADAPTER) SESSION.mount("https://", ADAPTER) def _session_get(url, **kwargs): headers = kwargs.pop("headers", {}) headers.setdefault("User-Agent", "Mozilla/5.0 (compatible; NewsGlobe/1.0)") return SESSION.get(url, headers=headers, timeout=kwargs.pop("timeout", 12), **kwargs) # ----------------- Lightweight Reader Fallback (Jina) ----------------- def _try_jina_reader(url: str, timeout: int) -> Optional[str]: try: u = url.strip() if not u.startswith(("http://", "https://")): u = "https://" + u r = _session_get(f"https://r.jina.ai/{u}", timeout=timeout) if r.status_code == 200: txt = _clean_text(r.text) sents = _split_sentences(txt) if sents: best = " ".join(sents[:2]) return best if len(best) >= 80 else (sents[0] if sents else None) except Exception: pass return None # ----------------- Description Cleanup Helpers ----------------- BOILER_DESC = re.compile( r"(subscribe|sign in|sign up|enable javascript|cookies? (policy|settings)|" r"privacy (policy|notice)|continue reading|read more|click here|" r"accept (cookies|the terms)|by continuing|newsletter|advertisement|adblock)", re.I ) def _split_sentences(text: str) -> List[str]: parts = re.split(r"(?<=[\.\?\!])\s+(?=[A-Z0-9])", (text or "").strip()) out = [] for p in parts: out.extend(re.split(r"\s+[•–—]\s+", p)) return [p.strip() for p in out if p and len(p.strip()) >= 2] def _too_similar(a: str, b: str, thresh: float = 0.92) -> bool: a = (a or "").strip() b = (b or "").strip() if not a or not b: return False if a.lower() == b.lower(): return True if a.lower() in b.lower() or b.lower() in a.lower(): return True ratio = difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio() return ratio >= thresh def _dedupe_title_from_desc(title: str, desc: str) -> str: t = (title or "").strip() d = (desc or "").strip() if not t or not d: return d if d.lower().startswith(t.lower()): d = d[len(t):].lstrip(" -–:•|") d = d.replace(t, "").strip(" -–:•|") d = _clean_text(d) return d def _clean_text(s: str) -> str: s = unescape(s or "") s = re.sub(r"\s+", " ", s).strip() return s def _tidy_description(title: str, desc: str, source_name: str, max_chars: int = 240) -> str: if not desc: return "" desc = _dedupe_title_from_desc(title, desc) desc = BOILER_DESC.sub("", desc) desc = re.sub(r"\s+", " ", desc).strip(" -–:•|") sents = _split_sentences(desc) if not sents: sents = [desc] best = " ".join(sents[:2]).strip() if len(best) > max_chars: if len(sents[0]) <= max_chars * 0.9: best = sents[0] else: best = best[:max_chars].rsplit(" ", 1)[0].rstrip(",;:-–—") if _too_similar(title, best): for alt in sents[1:3]: if not _too_similar(title, alt): best = alt break if best and best[-1] not in ".!?": best += "." return best # ----------------- Inflight Request Coalescing ----------------- _inflight_locks: Dict[Tuple, threading.Lock] = {} _inflight_global_lock = threading.Lock() def _get_inflight_lock(key: Tuple) -> threading.Lock: with _inflight_global_lock: lk = _inflight_locks.get(key) if lk is None: lk = threading.Lock() _inflight_locks[key] = lk return lk # ----------------- Description Fetching (Cache + Extract) ----------------- DESC_CACHE_LOCK = threading.Lock() try: from bs4 import BeautifulSoup except Exception: BeautifulSoup = None DESC_FETCH_TIMEOUT = 3 DESC_MIN_LEN = 100 DESC_CACHE_TTL = 24 * 3600 MAX_DESC_FETCHES = 24 DESC_WORKERS = 12 DESC_CACHE: Dict[str, Dict[str, Any]] = {} def _now_mono(): # Monotonic for TTL calculations try: return monotonic() except Exception: return time.time() def _extract_desc_from_ld_json(html: str) -> Optional[str]: # Prefer LD-JSON when present (often cleaner summaries) if not html or not BeautifulSoup: return None try: soup = BeautifulSoup(html, "html.parser") for tag in soup.find_all("script", {"type": "application/ld+json"}): try: import json data = json.loads(tag.string or "") except Exception: continue def find_desc(obj): if not isinstance(obj, (dict, list)): return None if isinstance(obj, list): for it in obj: v = find_desc(it) if v: return v return None for key in ("description", "abstract", "articleBody"): val = obj.get(key) if isinstance(val, str): txt = _clean_text(val) if len(txt) >= 40: return txt for k, v in obj.items(): if isinstance(v, (dict, list)): got = find_desc(v) if got: return got return None d = find_desc(data) if d and len(d) >= 40: return d except Exception: pass return None # Heuristic to detect consent walls and jump to reader fallback CONSENT_HINTS = re.compile(r"(consent|gdpr|privacy choices|before you continue|we value your privacy)", re.I) def _looks_like_consent_wall(html: str) -> bool: if not html: return False if "consent.yahoo.com" in html.lower(): return True return bool(CONSENT_HINTS.search(html)) def _extract_desc_from_html(html: str) -> Optional[str]: html = html or "" if BeautifulSoup: soup = BeautifulSoup(html, "html.parser") ld = _extract_desc_from_ld_json(html) if ld: txt = _clean_text(ld) if 40 <= len(txt) <= 480: return txt for sel, attr in [ ('meta[property="og:description"]', "content"), ('meta[name="twitter:description"]', "content"), ('meta[name="description"]', "content"), ]: tag = soup.select_one(sel) if tag: txt = _clean_text(tag.get(attr, "")) if len(txt) >= 40: return txt for p in soup.find_all("p"): txt = _clean_text(p.get_text(" ")) if len(txt) >= 80: return txt else: for pat in [ r']+property=["\']og:description["\'][^>]+content=["\']([^"\']+)["\']', r']+name=["\']twitter:description["\'][^>]+content=["\']([^"\']+)["\']', r']+name=["\']description["\'][^>]+content=["\']([^"\']+)["\']', ]: m = re.search(pat, html, flags=re.I | re.S) if m: txt = _clean_text(m.group(1)) if len(txt) >= 40: return txt m = re.search(r"]*>(.*?)

", html, flags=re.I | re.S) if m: txt = _clean_text(re.sub("<[^>]+>", " ", m.group(1))) if len(txt) >= 80: return txt return None def _desc_cache_get(url: str) -> Optional[str]: if not url: return None with DESC_CACHE_LOCK: entry = DESC_CACHE.get(url) if not entry: return None if _now_mono() - entry["t"] > DESC_CACHE_TTL: DESC_CACHE.pop(url, None) return None return entry["text"] def _desc_cache_put(url: str, text: str): if url and text: with DESC_CACHE_LOCK: DESC_CACHE[url] = {"text": text, "t": _now_mono()} def _attempt_fetch(url: str, timeout: int) -> Optional[str]: # Fetch page and extract description; fallback to reader if needed headers = { "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", } try: r = _session_get(url, headers=headers, timeout=timeout, allow_redirects=True) if r.status_code != 200: return None ct = (r.headers.get("Content-Type") or "").lower() txt = r.text or "" if "html" not in ct and " Optional[str]: # Public entry: consult cache -> fetch -> AMP variants -> cache if not url: return None cached = _desc_cache_get(url) if cached: return cached desc = _attempt_fetch(url, DESC_FETCH_TIMEOUT) if not desc: amp_candidates = [] try: p = urlparse(url) if not p.path.endswith("/amp"): amp_candidates.append(urlunparse(p._replace(path=(p.path.rstrip("/") + "/amp")))) q = p.query amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "amp=1")))) amp_candidates.append(urlunparse(p._replace(query=(q + ("&" if q else "") + "outputType=amp")))) except Exception: pass for amp_url in amp_candidates: desc = _attempt_fetch(amp_url, DESC_FETCH_TIMEOUT) if desc: break if desc: _desc_cache_put(url, desc) return desc return None def _needs_desc_upgrade(a: Dict[str, Any]) -> bool: # Decide if we should try to refetch a better description url = a.get("url") or "" if not url: return False title = (a.get("title") or "").strip() desc = (a.get("description") or "").strip() if not desc or desc.lower().startswith("no description"): return True if len(desc) < DESC_MIN_LEN: return True if _too_similar(title, desc): return True return False def prefetch_descriptions(raw_articles: List[Dict[str, Any]], speed: Speed = Speed.balanced): # Parallel prefetch for weak descriptions (bounded to avoid stampedes) candidates, seen = [], set() max_fetches = 6 if speed == Speed.fast else 8 if speed == Speed.balanced else 16 timeout = 1 if speed == Speed.fast else 2 workers = 3 if speed == Speed.fast else 4 if speed == Speed.balanced else 8 for a in raw_articles: url = a.get("url") if not url or url in seen: continue seen.add(url) if _needs_desc_upgrade(a) and not _desc_cache_get(url): candidates.append(url) if len(candidates) >= max_fetches: break if not candidates: return with ThreadPoolExecutor(max_workers=workers) as ex: futs = [ex.submit(fetch_page_description, u) for u in candidates] for _ in as_completed(futs): pass def prefetch_descriptions_async(raw_articles, speed: Speed = Speed.balanced): threading.Thread(target=prefetch_descriptions, args=(raw_articles, speed), daemon=True).start() # ----------------- Category / Keyword Heuristics ----------------- DetectorFactory.seed = 0 SECTION_HINTS = { "sports": "sports", "sport": "sports", "business": "business", "money": "business", "market": "business", "tech": "technology", "technology": "technology", "sci": "science", "science": "science", "health": "health", "wellness": "health", "entertainment": "entertainment", "culture": "entertainment", "showbiz": "entertainment", "crime": "crime", "world": "general", "weather": "weather", "environment": "environment", "climate": "environment", "travel": "travel", "politics": "politics", "election": "politics", } KEYWORDS = { "sports": r"\b(NBA|NFL|MLB|NHL|Olympic|goal|match|tournament|coach|transfer)\b", "business": r"\b(stocks?|earnings|IPO|merger|acquisition|revenue|inflation|market|tax|budget|inflation|revenue|deficit)\b", "technology": r"\b(AI|software|chip|semiconductor|app|startup|cyber|hack|quantum|robot)\b", "science": r"\b(researchers?|study|physics|astronomy|genome|spacecraft|telescope)\b", "health": r"\b(virus|vaccine|disease|hospital|doctor|public health|covid|recall|FDA|contamination|disease outbreak)\b", "entertainment": r"\b(movie|film|box office|celebrity|series|show|album|music|)\b", "crime": r"\b(arrested|charged|police|homicide|fraud|theft|court|lawsuit)\b", "weather": r"\b(hurricane|storm|flood|heatwave|blizzard|tornado|forecast)\b", "environment": r"\b(climate|emissions|wildfire|deforestation|biodiversity)\b", "travel": r"\b(flight|airline|airport|tourism|visa|cruise|hotel)\b", "politics": r"\b(president|parliament|congress|minister|policy|campaign|election|rally|protest|demonstration)\b", } # ----------------- Category normalization to frontend set ----------------- FRONTEND_CATS = { "politics","technology","sports","business","entertainment", "science","health","crime","weather","environment","travel", "viral","general" } ML_TO_FRONTEND = { "arts_&_culture": "entertainment", "business": "business", "business_&_entrepreneurs": "business", "celebrity_&_pop_culture": "entertainment", "crime": "crime", "diaries_&_daily_life": "viral", "entertainment": "entertainment", "environment": "environment", "fashion_&_style": "entertainment", "film_tv_&_video": "entertainment", "fitness_&_health": "health", "food_&_dining": "entertainment", "general": "general", "learning_&_educational": "science", "news_&_social_concern": "politics", "politics": "politics", "science_&_technology": "science", "sports": "sports", "technology": "technology", "travel_&_adventure": "travel", "other_hobbies": "viral" } def normalize_category(c: Optional[str]) -> str: s = (c or "").strip().lower() if not s: return "general" if s in FRONTEND_CATS: return s return ML_TO_FRONTEND.get(s, "general") def get_news_clf(): # Lazy-init topic classifier global _news_clf if _news_clf is None: _news_clf = hf_pipeline( "text-classification", model="cardiffnlp/tweet-topic-21-multi", top_k=1, ) return _news_clf def _infer_category_from_url_path(url_path: str) -> Optional[str]: # Order: provided -> URL path -> keyword -> ML fallback parts = [p for p in url_path.lower().split("/") if p] for p in parts: if p in SECTION_HINTS: return SECTION_HINTS[p] for p in parts: for tok in re.split(r"[-_]", p): if tok in SECTION_HINTS: return SECTION_HINTS[tok] return None def _infer_category_from_text(text: str) -> Optional[str]: if not text: return None for cat, pat in KEYWORDS.items(): if re.search(pat, text, flags=re.I): return cat return None def infer_category(article_url, title, description, provided): if provided: got = normalize_category(provided) if got: return got try: p = urlparse(article_url).path or "" cat = _infer_category_from_url_path(p) if cat: return normalize_category(cat) except Exception: pass text = f"{title or ''} {description or ''}".strip() cat = _infer_category_from_text(text) if cat: return normalize_category(cat) try: preds = get_news_clf()(text[:512]) label = preds[0][0]["label"] if isinstance(preds[0], list) else preds[0]["label"] return normalize_category(label) except Exception: return "general" # ----------------- Language Detection / Embeddings ----------------- def detect_lang(text: str) -> Optional[str]: try: return detect(text) except Exception: return None def get_sbert(): global _sbert if _sbert is None: _sbert = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") return _sbert def _embed_texts(texts: List[str]): embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False) return embs # ----------------- NLTK / VADER Sentiment ----------------- NLTK_DATA_DIR = os.environ.get("NLTK_DATA", "/app/nltk_data") if NLTK_DATA_DIR not in nltk.data.path: nltk.data.path.insert(0, NLTK_DATA_DIR) try: nltk.data.find("sentiment/vader_lexicon") except LookupError: try: os.makedirs(NLTK_DATA_DIR, exist_ok=True) nltk.download("vader_lexicon", download_dir=NLTK_DATA_DIR, quiet=True) except Exception: pass try: _vader = SentimentIntensityAnalyzer() except Exception: _vader = None def classify_sentiment(text: str) -> str: if not text: return "neutral" if _vader is None: return "neutral" scores = _vader.polarity_scores(text) c = scores["compound"] return "positive" if c >= 0.2 else "negative" if c <= -0.2 else "neutral" # ----------------- Geocoding / Domain → Country ----------------- def get_country_centroid(country_name): if not country_name or country_name == "Unknown": return {"lat": 0, "lon": 0, "country": "Unknown"} try: country = CountryInfo(country_name) latlng = country.capital_latlng() return {"lat": latlng[0], "lon": latlng[1], "country": country_name} except Exception as e: log.info(f"Could not get centroid for {country_name}: {e}") return {"lat": 0, "lon": 0, "country": country_name or "Unknown"} def resolve_domain_to_ip(domain): if not domain: return None try: return socket.gethostbyname(domain) except socket.gaierror: return None def geolocate_ip(ip): try: r = _session_get(f"https://ipwho.is/{ip}?fields=success,country,latitude,longitude", timeout=8) j = r.json() if j.get("success"): return {"lat": j["latitude"], "lon": j["longitude"], "country": j["country"]} except Exception: pass return None # Nominatim for a light refinement pass (async) geolocator = Nominatim(user_agent="newsglobe-app (contact: you@example.com)") domain_geo_cache: Dict[str, Dict[str, Any]] = {} MAJOR_OUTLETS = { "bbc.co.uk": "United Kingdom", "theguardian.com": "United Kingdom", "reuters.com": "United States", "aljazeera.com": "Qatar", "lemonde.fr": "France", "dw.com": "Germany", "abc.net.au": "Australia", "ndtv.com": "India", "globo.com": "Brazil", "elpais.com": "Spain", "lefigaro.fr": "France", "kyodonews.net": "Japan", "straitstimes.com": "Singapore", "thesun.my": "Malaysia", } def geocode_source(source_text: str, domain: str = "", do_network: bool = False): cache_key = f"{source_text}|{domain}" if cache_key in domain_geo_cache: return domain_geo_cache[cache_key] ext = _tld(domain or "") fqdn = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else "" if fqdn in MAJOR_OUTLETS: coords = get_country_centroid(MAJOR_OUTLETS[fqdn]); domain_geo_cache[cache_key] = coords; return coords if ext.domain in domain_country_map: coords = get_country_centroid(domain_country_map[ext.domain]); domain_geo_cache[cache_key] = coords; return coords coords = get_country_centroid(_suffix_country(ext.suffix)) domain_geo_cache[cache_key] = coords if do_network: threading.Thread(target=_refine_geo_async, args=(cache_key, source_text, fqdn), daemon=True).start() return coords def _suffix_country(suffix: Optional[str]) -> str: s = (suffix or "").split(".")[-1] m = { "au":"Australia","uk":"United Kingdom","gb":"United Kingdom","ca":"Canada","in":"India","us":"United States", "ng":"Nigeria","de":"Germany","fr":"France","jp":"Japan","sg":"Singapore","za":"South Africa","nz":"New Zealand", "ie":"Ireland","it":"Italy","es":"Spain","se":"Sweden","ch":"Switzerland","nl":"Netherlands","br":"Brazil", "my":"Malaysia","id":"Indonesia","ph":"Philippines","th":"Thailand","vn":"Vietnam","sa":"Saudi Arabia", "ae":"United Arab Emirates","tr":"Turkey","mx":"Mexico","ar":"Argentina","cl":"Chile","co":"Colombia", "il":"Israel","kr":"South Korea","cn":"China","tw":"Taiwan","hk":"Hong Kong" } return m.get(s, "United States" if s in ("com","org","net") else "Unknown") def _refine_geo_async(cache_key, source_text, fqdn): try: ip = resolve_domain_to_ip(fqdn) if fqdn else None if ip: coords = geolocate_ip(ip) if coords: domain_geo_cache[cache_key] = coords return location = geolocator.geocode(f"{source_text} News Headquarters", timeout=2) if location and hasattr(location, "raw"): coords = { "lat": location.latitude, "lon": location.longitude, "country": location.raw.get("address", {}).get("country", "Unknown"), } domain_geo_cache[cache_key] = coords except Exception: pass # ----------------- Translation (HF / Libre / Local) ----------------- HF_MODEL_PRIMARY = None NLLB_CODES = { "en": "eng_Latn", "es": "spa_Latn", "fr": "fra_Latn", "de": "deu_Latn", "it": "ita_Latn", "pt": "por_Latn", "zh": "zho_Hans", "ru": "rus_Cyrl", "ar": "arb_Arab", "hi": "hin_Deva", "ja": "jpn_Jpan", "ko": "kor_Hang", } def opus_model_for(src2: str, tgt2: str) -> Optional[str]: pairs = { ("es", "en"): "Helsinki-NLP/opus-mt-es-en", ("en", "es"): "Helsinki-NLP/opus-mt-en-es", ("fr", "en"): "Helsinki-NLP/opus-mt-fr-en", ("en", "fr"): "Helsinki-NLP/opus-mt-en-fr", ("de", "en"): "Helsinki-NLP/opus-mt-de-en", ("en", "de"): "Helsinki-NLP/opus-mt-en-de", ("pt", "en"): "Helsinki-NLP/opus-mt-pt-en", ("en", "pt"): "Helsinki-NLP/opus-mt-en-pt", ("it", "en"): "Helsinki-NLP/opus-mt-it-en", ("en", "it"): "Helsinki-NLP/opus-mt-en-it", ("ru", "en"): "Helsinki-NLP/opus-mt-ru-en", ("en", "ru"): "Helsinki-NLP/opus-mt-en-ru", ("zh", "en"): "Helsinki-NLP/opus-mt-zh-en", ("en", "zh"): "Helsinki-NLP/opus-mt-en-zh", ("ja", "en"): "Helsinki-NLP/opus-mt-ja-en", ("en", "ja"): "Helsinki-NLP/opus-mt-en-ja", ("ko", "en"): "Helsinki-NLP/opus-mt-ko-en", ("en", "ko"): "Helsinki-NLP/opus-mt-en-ko", ("hi", "en"): "Helsinki-NLP/opus-mt-hi-en", ("en", "hi"): "Helsinki-NLP/opus-mt-en-hi", ("ar", "en"): "Helsinki-NLP/opus-mt-ar-en", ("en", "ar"): "Helsinki-NLP/opus-mt-en-ar", } return pairs.get((src2, tgt2)) SUPPORTED = {"en", "fr", "de", "es", "it", "hi", "ar", "ru", "ja", "ko", "pt", "zh"} LIBRETRANSLATE_URL = os.getenv("LIBRETRANSLATE_URL") def _lt_lang(code: str) -> str: if not code: return code c = code.lower() # LibreTranslate uses zh-Hans; normalize zh* to zh-Hans if c.startswith("zh"): return "zh-Hans" return c def _translate_via_libre(text: str, src: str, tgt: str) -> Optional[str]: url = LIBRETRANSLATE_URL if not url or not text or src == tgt: return None payload = { "q": text, "source": _lt_lang(src), "target": _lt_lang(tgt), "format": "text", } # First call can be slow while LT warms models; retry once. for attempt in (1, 2): try: r = SESSION.post( f"{url.rstrip('/')}/translate", json=payload, timeout=15 # was 6 ) if r.status_code == 200: j = r.json() out = j.get("translatedText") return out if isinstance(out, str) and out else None else: log.warning("LibreTranslate HTTP %s: %s", r.status_code, r.text[:200]) return None except Exception as e: if attempt == 2: log.warning("LibreTranslate failed: %s", e) return None time.sleep(0.5) def _hf_call(model_id: str, payload: dict) -> Optional[str]: if not (HUGGINGFACE_API_TOKEN and ALLOW_HF_REMOTE): return None if model_id in _hf_bad_models: return None url = f"https://api-inference.huggingface.co/models/{model_id}" headers = { "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}", "HF-API-KEY": HUGGINGFACE_API_TOKEN, "Accept": "application/json", "Content-Type": "application/json", } try: r = requests.post(url, headers=headers, json=payload, timeout=25) if r.status_code != 200: if r.status_code == 404: _hf_bad_models.add(model_id) log.warning("HF %s -> 404: Not Found (disabled for this process)", model_id) else: log.warning("HF %s -> %s: %s", model_id, r.status_code, r.text[:300]) return None j = r.json() except Exception as e: log.warning("HF request failed: %s", e) return None if isinstance(j, list) and j and isinstance(j[0], dict): if "generated_text" in j[0]: return j[0]["generated_text"] if "translation_text" in j[0]: return j[0]["translation_text"] if isinstance(j, dict) and "generated_text" in j: return j["generated_text"] if isinstance(j, str): return j return None @lru_cache(maxsize=4096) def _translate_cached(text: str, src: str, tgt: str) -> str: if not text or src == tgt: return text out = _translate_via_libre(text, src, tgt) if out: return out opus_model = opus_model_for(src, tgt) if opus_model: out = _hf_call(opus_model, {"inputs": text}) if out: return out try: if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES): out = _hf_call( HF_MODEL_PRIMARY, { "inputs": text, "parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]}, "options": {"wait_for_model": True}, }, ) if out: return out except Exception: pass if src != "en" and tgt != "en": step_en = _translate_cached(text, src, "en") if step_en and step_en != text: out = _translate_cached(step_en, "en", tgt) if out: return out out = _translate_local(text, src, tgt) if out: return out log.warning("All translate paths failed (%s->%s); returning original.", src, tgt) return text def translate_text(text: str, target_lang: Optional[str], fallback_src: Optional[str] = None) -> str: if not text or not target_lang: return text tgt = target_lang.lower() if tgt not in SUPPORTED: return text src = (fallback_src or detect_lang(text) or "en").lower() if src == tgt: return text if src not in SUPPORTED: if src.startswith("zh"): src = "zh" elif src.startswith("pt"): src = "pt" elif src[:2] in SUPPORTED: src = src[:2] else: src = "en" return _translate_cached(text, src, tgt) def _translate_local(text: str, src: str, tgt: str) -> Optional[str]: if not _HAS_SENTENCEPIECE: return None model_id = opus_model_for(src, tgt) if not model_id: return None key = model_id try: if key not in _local_pipes: _local_pipes[key] = hf_pipeline("translation", model=model_id) out = _local_pipes[key](text, max_length=512) return out[0]["translation_text"] except Exception as e: log.warning("Local translate failed for %s: %s", model_id, e) return None # ----------------- Warmup Settings & Routine ----------------- WARM_LIMIT_EACH = 20 WARM_TIMESPAN = "24h" WARM_PREFETCH_DESCRIPTIONS = False def _fmt_mmss(ms: float) -> str: total_sec = int(round(ms / 1000.0)) m, s = divmod(total_sec, 60) return f"{m}:{s:02d}" def _warm_once(): try: log.info("WARM: starting background warm-up (limit_each=%d, timespan=%s)", WARM_LIMIT_EACH, WARM_TIMESPAN) t0 = time.perf_counter() get_sbert() get_news_clf() t1 = time.perf_counter() raw = combine_raw_articles( category=None, query=None, language="en", limit_each=WARM_LIMIT_EACH, timespan=WARM_TIMESPAN, log_summary=False ) t_fetch = (time.perf_counter() - t1) * 1000 if WARM_PREFETCH_DESCRIPTIONS: prefetch_descriptions_async(raw) t2 = time.perf_counter() enriched = [enrich_article(a, language="en", translate=False, target_lang=None) for a in raw] t_enrich = (time.perf_counter() - t2) * 1000 t3 = time.perf_counter() clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD) t_cluster = (time.perf_counter() - t3) * 1000 key = cache_key_for(q=None, category=None, language="en", limit_each=WARM_LIMIT_EACH, translate=False, target_lang=None, speed=Speed.balanced) _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters} t_total = (time.perf_counter() - t0) * 1000 log.info( "WARM: fetch=%s, enrich=%s, cluster=%s, total=%s (raw=%d, enriched=%d, clusters=%d)", _fmt_mmss(t_fetch), _fmt_mmss(t_enrich), _fmt_mmss(t_cluster), _fmt_mmss(t_total), len(raw), len(enriched), len(clusters), ) except Exception as e: log.warning(f"WARM: failed: {e}") @app.on_event("startup") def warm(): try: _translate_cached.cache_clear() except Exception: pass get_sbert() get_news_clf() threading.Thread(target=_warm_once, daemon=True).start() # ----------------- GDELT Query Helpers ----------------- _GDELT_LANG = { "en": "english", "es": "spanish", "fr": "french", "de": "german", "it": "italian", "pt": "portuguese", "ru": "russian", "ar": "arabic", "hi": "hindi", "ja": "japanese", "ko": "korean", "zh": "chinese", } def _gdelt_safe_query(user_q, language): parts = [] if user_q: q = user_q.strip() if len(q) < 3: q = f'"{q}" news' parts.append(q) if language and (lg := _GDELT_LANG.get(language.lower())): parts.append(f"sourcelang:{lg}") if not parts: parts.append("sourcelang:english") return " ".join(parts) # ----------------- GDELT Fetchers ----------------- def fetch_gdelt_articles( limit=50, query=None, language=None, timespan="3d", category=None, extra_tokens: Optional[List[str]] = None, start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None, ): q = _gdelt_safe_query(query, language) if extra_tokens: q = f"{q} " + " ".join(extra_tokens) url = "https://api.gdeltproject.org/api/v2/doc/doc" params = { "query": q, "mode": "ArtList", "format": "json", "sort": "DateDesc", "maxrecords": int(min(250, max(1, limit))), } if start_utc and end_utc: params["startdatetime"] = _gdelt_fmt(start_utc) params["enddatetime"] = _gdelt_fmt(end_utc) else: params["timespan"] = timespan headers = { "User-Agent": "Mozilla/5.0 (compatible; NewsGlobe/1.0; +mailto:you@yourdomain.com)", "Accept": "application/json", } def _do_request(p): r = _session_get(url, params=p, timeout=10) log.info(f"GDELT URL: {r.url} (status={r.status_code})") if r.status_code != 200: log.warning(f"GDELT HTTP {r.status_code}: {r.text[:400]}") return None try: return r.json() except Exception: ct = r.headers.get("Content-Type", "") log.warning(f"GDELT non-JSON response. CT={ct}. Body[:400]: {r.text[:400]}") return None data = _do_request(params) if data is None: p2 = {**params, "timespan": "24h", "maxrecords": min(100, params["maxrecords"])} data = _do_request(p2) if not data: return [] arts = data.get("articles") or [] results = [] for a in arts: desc = a.get("description") or a.get("content") or "" title = a.get("title") or "" if desc and ( desc.strip().lower() == title.strip().lower() or (len(desc) <= 60 and _too_similar(title, desc)) ): desc = "" desc = desc or "No description available" results.append( { "title": title, "url": a.get("url"), "source": {"name": a.get("domain") or "GDELT"}, "description": desc, "publishedAt": a.get("seendate"), "api_source": "gdelt", "gdelt_sourcecountry": a.get("sourcecountry"), "requested_category": category, } ) log.info(f"GDELT returned {len(results)}") return results def fetch_gdelt_multi( limit=120, query=None, language=None, timespan="48h", category=None, speed: Speed = Speed.balanced, start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None ): if language: primary = fetch_gdelt_articles(limit=limit, query=query, language=language, timespan=timespan, category=category, start_utc=start_utc, end_utc=end_utc) booster = fetch_gdelt_articles(limit=max(10, limit // 6), query=query, language="en", timespan=timespan, category=category, start_utc=start_utc, end_utc=end_utc) return primary + booster if speed == Speed.fast: langs = LANG_ROTATION[:3] timespan = "24h" elif speed == Speed.balanced: langs = LANG_ROTATION[:8] timespan = "48h" else: langs = LANG_ROTATION timespan = "3d" per_lang = max(8, math.ceil(limit / len(langs))) out = [] for lg in langs: out.extend(fetch_gdelt_articles(limit=per_lang, query=query, language=lg, timespan=timespan, category=category, start_utc=start_utc, end_utc=end_utc)) if speed != Speed.fast: per_cc = max(4, limit // 30) if speed == Speed.max else max(2, limit // 40) for cc in COUNTRY_SEEDS[: (8 if speed == Speed.balanced else 16)]: out.extend(fetch_gdelt_articles( limit=per_cc, query=query, language="en", timespan=timespan, category=category, extra_tokens=[f"sourcecountry:{cc}"], start_utc=start_utc, end_utc=end_utc )) return out # ----------------- Provider Flags / Keys / Logging ----------------- USE_GNEWS_API = True USE_NEWSDATA_API = True USE_GDELT_API = True USE_NEWSAPI = True NEWSAPI_KEY = os.getenv("NEWSAPI_KEY", "3b2d3fde45b84cdbb3f706dfb0110df4") GNEWS_API_KEY = os.getenv("GNEWS_API_KEY", "5419897c95e8a4b21074e0d3fe95a3dd") NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY", "pub_1feb49a71a844719af68d0844fb43a61") HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN") logging.basicConfig( level=logging.WARNING, format="%(levelname)s:%(name)s:%(message)s", ) log = logging.getLogger("newsglobe") log.setLevel(logging.WARNING) fetch_log = logging.getLogger("newsglobe.fetch_summary") fetch_log.setLevel(logging.INFO) _fetch_handler = logging.StreamHandler() _fetch_handler.setLevel(logging.INFO) _fetch_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s")) fetch_log.addHandler(_fetch_handler) fetch_log.propagate = False for name in ("urllib3", "urllib3.connectionpool", "requests.packages.urllib3"): lg = logging.getLogger(name) lg.setLevel(logging.ERROR) lg.propagate = False def _newsapi_enabled() -> bool: if not NEWSAPI_KEY: log.warning("NewsAPI disabled: missing NEWSAPI_KEY env var") return False return True # ----------------- Clustering Helpers ----------------- def cluster_id(cluster, enriched_articles): urls = sorted([(enriched_articles[i].get("url") or "") for i in cluster["indices"] if enriched_articles[i].get("url")]) base = "|".join(urls) if urls else "empty" return hashlib.md5(base.encode("utf-8")).hexdigest()[:10] BOILER = re.compile(r"\b(live updates|breaking|what we know|in pictures|opinion)\b", re.I) def _norm_text(s: str) -> str: s = (s or "").strip() s = re.sub(r"\s+", " ", s) return s def _cluster_text(a): base = f"{a.get('orig_title') or a.get('title') or ''} {a.get('orig_description') or a.get('description') or ''}" base = BOILER.sub("", base) base = re.sub(r"\b(\d{1,2}:\d{2}\s?(AM|PM))|\b(\d{1,2}\s\w+\s\d{4})", "", base, flags=re.I) return _norm_text(base) def _canonical_url(u: str) -> str: if not u: return u p = urlparse(u) qs = [(k, v) for (k, v) in parse_qsl(p.query, keep_blank_values=False) if not k.lower().startswith(("utm_", "fbclid", "gclid"))] clean = p._replace(query="&".join([f"{k}={v}" for k, v in qs]), fragment="") path = clean.path.rstrip("/") or "/" clean = clean._replace(path=path) return urlunparse(clean) # ----------------- Normalizers / Enrichment ----------------- def normalize_newsdata_article(article): return { "title": article.get("title"), "url": article.get("link"), "source": {"name": article.get("source_id", "NewsData.io")}, "description": article.get("description") or "No description available", "publishedAt": article.get("publishedAt"), "api_source": article.get("api_source", "newsdata"), "category": ((article.get("category") or [None])[0] if isinstance(article.get("category"), list) else article.get("category")), } def enrich_article(a, language=None, translate=False, target_lang=None): source_name = (a.get("source", {}) or {}).get("name", "").strip() or "Unknown" s_lower = source_name.lower() if "newsapi" in s_lower: source_name = "NewsAPI" elif "gnews" in s_lower: source_name = "GNews" elif "newsdata" in s_lower: source_name = "NewsData.io" article_url = _canonical_url(a.get("url") or "") try: ext = _tld(article_url) domain = ".".join([p for p in (ext.domain, ext.suffix) if p]) if (ext.domain or ext.suffix) else "" except Exception: domain = "" country_guess = None if a.get("api_source") == "gdelt": sc = a.get("gdelt_sourcecountry") if sc: iso2map = { "US": "United States", "GB": "United Kingdom", "AU": "Australia", "CA": "Canada", "IN": "India", "DE": "Germany", "FR": "France", "IT": "Italy", "ES": "Spain", "BR": "Brazil", "JP": "Japan", "CN": "China", "RU": "Russia", "KR": "South Korea", "ZA": "South Africa", "NG": "Nigeria", "MX": "Mexico", "AR": "Argentina", "CL": "Chile", "CO": "Colombia", "NL": "Netherlands", "SE": "Sweden", "NO": "Norway", "DK": "Denmark", "FI": "Finland", "IE": "Ireland", "PL": "Poland", "PT": "Portugal", "GR": "Greece", "TR": "Turkey", "IL": "Israel", "SA": "Saudi Arabia", "AE": "United Arab Emirates", "SG": "Singapore", "MY": "Malaysia", "TH": "Thailand", "PH": "Philippines", "ID": "Indonesia", "NZ": "New Zealand", } country_guess = iso2map.get(str(sc).upper(), sc if len(str(sc)) > 2 else None) coords = get_country_centroid(country_guess) if country_guess else geocode_source(source_name, domain, do_network=False) title = (a.get("title") or "").strip() or "(untitled)" description = (a.get("description") or "").strip() if description.lower().startswith("no description"): description = "" cached_desc = _desc_cache_get(article_url) need_upgrade = ( (not description) or description.lower().startswith("no description") or len(description) < DESC_MIN_LEN or _too_similar(title, description) ) if need_upgrade and cached_desc: description = cached_desc if description: description = _tidy_description(title, description, source_name) if (not description) or _too_similar(title, description): description = f"Quick take: {title.rstrip('.')}." orig_title = title orig_description = description detected_lang = (detect_lang(f"{title} {description}") or "").lower() ml_text = f"{orig_title}. {orig_description}".strip() sentiment = classify_sentiment(f"{orig_title} {orig_description}") seed = f"{source_name}|{article_url}|{title}" uid = hashlib.md5(seed.encode("utf-8")).hexdigest()[:12] provided = a.get("category") if provided and normalize_category(provided) != "general": cat = normalize_category(provided) else: cat = infer_category(article_url, orig_title, orig_description, provided) return { "id": uid, "title": title, "url": article_url, "source": source_name, "lat": coords["lat"], "lon": coords["lon"], "country": coords.get("country", "Unknown"), "description": description, "sentiment": sentiment, "api_source": a.get("api_source", "unknown"), "publishedAt": a.get("publishedAt"), "_ml_text": ml_text, "orig_title": orig_title, "orig_description": orig_description, "detected_lang": detected_lang, "translated": False, "category": cat, } # ----------------- Clustering (Semantic, single-pass + merge) ----------------- def cluster_articles(articles: List[Dict[str, Any]], sim_threshold=0.6, speed: Speed = Speed.balanced): if speed == Speed.fast: articles = articles[:150] sim_threshold = max(sim_threshold, 0.64) elif speed == Speed.balanced: articles = articles[:] sim_threshold = max(sim_threshold, 0.62) texts = [_cluster_text(a) for a in articles] embs = get_sbert().encode(texts, convert_to_tensor=True, normalize_embeddings=True, show_progress_bar=False) clusters = [] centroids = [] for i, emb in enumerate(embs): best_idx, best_sim = -1, -1.0 for ci, c_emb in enumerate(centroids): sim = util.cos_sim(emb, c_emb).item() if sim > sim_threshold and sim > best_sim: best_sim, best_idx = sim, ci if best_idx >= 0: clusters[best_idx]["indices"].append(i) idxs = clusters[best_idx]["indices"] new_c = embs[idxs].mean(dim=0) new_c = new_c / new_c.norm() centroids[best_idx] = new_c clusters[best_idx]["centroid"] = new_c else: event_id = hashlib.md5(texts[i].encode("utf-8")).hexdigest()[:10] clusters.append({"id": event_id, "indices": [i], "centroid": emb}) centroids.append(emb) merged = _merge_close_clusters(clusters, embs, threshold=0.70) for c in merged: c["id"] = cluster_id(c, articles) return merged def event_payload_from_cluster(cluster, enriched_articles): idxs = cluster["indices"] arts = [enriched_articles[i] for i in idxs] title_counts = Counter([a["title"] for a in arts]) canonical_title = title_counts.most_common(1)[0][0] keywords = list({w.lower() for t in title_counts for w in t.split() if len(w) > 3})[:8] sources = {a["source"] for a in arts} countries = {a["country"] for a in arts if a["country"] and a["country"] != "Unknown"} ts = [a.get("publishedAt") for a in arts if a.get("publishedAt")] return { "event_id": cluster_id(cluster, enriched_articles), "title": canonical_title, "keywords": keywords, "article_count": len(arts), "source_count": len(sources), "country_count": len(countries), "time_range": {"min": min(ts) if ts else None, "max": max(ts) if ts else None}, "sample_urls": [a["url"] for a in arts[:3] if a.get("url")], } def aggregate_event_by_country(cluster, enriched_articles, max_samples: int | None = 5): idxs = cluster["indices"] arts = [enriched_articles[i] for i in idxs] by_country: Dict[str, Dict[str, Any]] = {} for a in arts: c = a.get("country") or "Unknown" if c not in by_country: coords = get_country_centroid(c) by_country[c] = {"country": c, "lat": coords["lat"], "lon": coords["lon"], "articles": []} by_country[c]["articles"].append(a) results = [] for c, block in by_country.items(): arr = block["articles"] to_num = {"negative": -1, "neutral": 0, "positive": 1} vals = [to_num.get(a["sentiment"], 0) for a in arr] avg = sum(vals) / max(len(vals), 1) avg_sent = "positive" if avg > 0.15 else "negative" if avg < -0.15 else "neutral" top_sources = [s for s, _ in Counter([a["source"] for a in arr]).most_common(3)] summary = " • ".join([a["title"] for a in arr[:2]]) use = arr if (max_samples in (None, 0) or max_samples < 0) else arr[:max_samples] results.append( { "country": c, "lat": block["lat"], "lon": block["lon"], "count": len(arr), "avg_sentiment": avg_sent, "top_sources": top_sources, "summary": summary, "samples": [ { "title": a["title"], "orig_title": a.get("orig_title"), "orig_description": a.get("orig_description"), "url": a["url"], "source": a["source"], "sentiment": a["sentiment"], "detected_lang": a.get("detected_lang"), } # for a in arr[:5] for a in use ], } ) return results def _merge_close_clusters(clusters, embs, threshold=0.68): merged = [] used = set() for i in range(len(clusters)): if i in used: continue base = clusters[i] group = [i] for j in range(i + 1, len(clusters)): if j in used: continue sim = util.cos_sim(base["centroid"], clusters[j]["centroid"]).item() if sim >= threshold: group.append(j) all_idx = [] cents = [] for g in group: used.add(g) all_idx.extend(clusters[g]["indices"]) cents.append(clusters[g]["centroid"]) newc = torch.stack(cents, dim=0).mean(dim=0) newc = newc / newc.norm() merged.append({"indices": sorted(set(all_idx)), "centroid": newc}) return merged # ----------------- Event Cache / Keys ----------------- CACHE_TTL_SECS = 900 SIM_THRESHOLD = 0.6 _events_cache: Dict[Tuple, Dict[str, Any]] = {} # -------- Date parsing helpers (Option B) -------- ISO_BASIC_RE = re.compile(r'^(\d{4})(\d{2})(\d{2})(?:[T ]?(\d{2})(\d{2})(\d{2}))?(Z|[+-]\d{2}:?\d{2})?$') def _parse_user_dt(s: Optional[str], which: str) -> Optional[datetime]: """Parse query 'start'/'end' into UTC-aware datetimes.""" if not s: return None s = s.strip() try: # Normalize Z if s.endswith("Z"): s = s[:-1] + "+00:00" # Date-only if re.match(r'^\d{4}-\d{2}-\d{2}$', s): s = s + ("T00:00:00+00:00" if which == "start" else "T23:59:59+00:00") dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) except Exception: m = ISO_BASIC_RE.match(s) if m: yyyy, MM, dd, hh, mm, ss, tz = m.groups() hh = hh or ("00" if which == "start" else "23") mm = mm or ("00" if which == "start" else "59") ss = ss or ("00" if which == "start" else "59") return datetime(int(yyyy), int(MM), int(dd), int(hh), int(mm), int(ss), tzinfo=timezone.utc) return None def _gdelt_fmt(dt: datetime) -> str: return dt.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S") def _parse_any_pubdate(s: Optional[str]) -> Optional[datetime]: """Best-effort parse of provider publishedAt strings to UTC.""" if not s: return None try: t = s.strip() if t.endswith("Z"): t = t[:-1] + "+00:00" return datetime.fromisoformat(t).astimezone(timezone.utc) except Exception: m = ISO_BASIC_RE.match(s) if m: yyyy, MM, dd, hh, mm, ss, tz = m.groups() hh = hh or "00"; mm = mm or "00"; ss = ss or "00" return datetime(int(yyyy), int(MM), int(dd), int(hh), int(mm), int(ss), tzinfo=timezone.utc) return None def cache_key_for( q, category, language, limit_each, translate=False, target_lang=None, start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None, speed: Speed = Speed.balanced ): return ( q or "", category or "", language or "", int(limit_each or 50), bool(translate), (target_lang or "").lower(), (start_utc and _gdelt_fmt(start_utc)) or "", (end_utc and _gdelt_fmt(end_utc)) or "", speed.value, ) _first_real_build = True def get_or_build_events_cache( q, category, language, translate, target_lang, limit_each, start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None, speed: Speed = Speed.balanced ): global _first_real_build key = cache_key_for(q, category, language, limit_each, translate, target_lang, start_utc, end_utc, speed) now = monotonic() if speed == Speed.fast: use_timespan, use_limit = "24h", min(limit_each, 20) elif speed == Speed.balanced: use_timespan, use_limit = "48h", min(limit_each, 100) else: use_timespan, use_limit = "3d", limit_each entry = _events_cache.get(key) if entry and now - entry["t"] < CACHE_TTL_SECS: log.info(f"CACHE HIT for {key}") return key, entry["enriched"], entry["clusters"] lock = _get_inflight_lock(key) with lock: entry = _events_cache.get(key) if entry and now - entry["t"] < CACHE_TTL_SECS: log.info(f"CACHE HIT (post-lock) for {key}") return key, entry["enriched"], entry["clusters"] if _first_real_build and not (start_utc and end_utc): use_timespan = "24h" if use_timespan != "24h" else use_timespan use_limit = min(use_limit, 100) log.info(f"CACHE MISS for {key} — fetching (timespan={use_timespan}, limit_each={use_limit}, start={start_utc}, end={end_utc})") raw = combine_raw_articles( category=category, query=q, language=language, limit_each=use_limit, timespan=use_timespan, speed=speed, start_utc=start_utc, end_utc=end_utc, ) prefetch_descriptions_async(raw, speed) enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] if category: cat_norm = (category or "").strip().lower() enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm] else: enriched = enriched_all clusters = cluster_articles(enriched, sim_threshold=SIM_THRESHOLD, speed=speed) _events_cache[key] = {"t": monotonic(), "enriched": enriched, "clusters": clusters} _first_real_build = False return key, enriched, clusters # ----------------- Language Rotation / Seeds ----------------- LANG_ROTATION = ["en", "es", "fr", "de", "ar", "ru", "pt", "zh", "hi", "ja", "ko"] COUNTRY_SEEDS = ["US", "GB", "IN", "CA", "AU", "ZA", "SG", "NG", "DE", "FR", "BR", "MX", "ES", "RU", "JP", "KR", "CN"] # ----------------- Other Providers (NewsData/GNews/NewsAPI) ----------------- def fetch_newsdata_articles(category=None, limit=20, query=None, language=None): base_url = "https://newsdata.io/api/1/news" allowed = [ "business", "entertainment", "environment", "food", "health", "politics", "science", "sports", "technology", "top", "world", ] params = {"apikey": NEWSDATA_API_KEY, "language": (language or "en")} if category and category in allowed: params["category"] = category if query: params["q"] = query all_articles, next_page = [], None while len(all_articles) < limit: if next_page: params["page"] = next_page resp = _session_get(base_url, params=params, timeout=12) if resp.status_code != 200: break data = resp.json() articles = data.get("results", []) for a in articles: a["api_source"] = "newsdata" all_articles.extend(articles) next_page = data.get("nextPage") if not next_page: break for a in all_articles: a["publishedAt"] = a.get("pubDate") return all_articles[:limit] def fetch_gnews_articles(limit=20, query=None, language=None): url = f"https://gnews.io/api/v4/top-headlines?lang={(language or 'en')}&max={limit}&token={GNEWS_API_KEY}" if query: url += f"&q={requests.utils.quote(query)}" try: r = _session_get(url, timeout=12) if r.status_code != 200: return [] arts = r.json().get("articles", []) for a in arts: a["api_source"] = "gnews" return arts except Exception: return [] NEWSAPI_COUNTRIES = ["us", "gb", "ca", "au", "in", "za", "sg", "ie", "nz"] def fetch_newsapi_headlines_multi(limit=50, language=None): if not _newsapi_enabled(): return [] all_ = [] per = max(1, math.ceil(limit / max(1, len(NEWSAPI_COUNTRIES)))) per = min(per, 100) for c in NEWSAPI_COUNTRIES: url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per}&apiKey={NEWSAPI_KEY}" r = _session_get(url, timeout=12) if r.status_code != 200: log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}") continue arts = r.json().get("articles", []) for a in arts: a["api_source"] = "newsapi" all_.extend(arts) time.sleep(0.2) return all_[:limit] def fetch_newsapi_articles( category=None, limit=20, query=None, language=None, start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None, ): if not _newsapi_enabled(): return [] if query: url = f"https://newsapi.org/v2/everything?pageSize={limit}&apiKey={NEWSAPI_KEY}&q={requests.utils.quote(query)}" if language: url += f"&language={language}" # NEW: date range for /everything if start_utc: url += f"&from={start_utc.date().isoformat()}" if end_utc: url += f"&to={end_utc.date().isoformat()}" try: r = _session_get(url, timeout=12) if r.status_code != 200: log.warning(f"NewsAPI /everything HTTP {r.status_code}: {r.text[:200]}") return [] arts = r.json().get("articles", []) for a in arts: a["api_source"] = "newsapi" return arts[:limit] except Exception as e: log.warning(f"NewsAPI /everything request failed: {e}") return [] results = [] per_country = max(5, limit // len(NEWSAPI_COUNTRIES)) for c in NEWSAPI_COUNTRIES: url = f"https://newsapi.org/v2/top-headlines?country={c}&pageSize={per_country}&apiKey={NEWSAPI_KEY}" if category: url += f"&category={category}" try: r = _session_get(url, timeout=12) if r.status_code != 200: log.warning(f"NewsAPI top-headlines {c} -> HTTP {r.status_code}: {r.text[:200]}") continue arts = r.json().get("articles", []) for a in arts: a["api_source"] = "newsapi" results.extend(arts) except Exception as e: log.warning(f"NewsAPI top-headlines {c} failed: {e}") time.sleep(0.2) return results[:limit] # ----------------- Provider Combiner / Dedup ----------------- def combine_raw_articles(category=None, query=None, language=None, limit_each=30, timespan="3d", speed=Speed.balanced, log_summary: bool = True, start_utc: Optional[datetime] = None, end_utc: Optional[datetime] = None): if speed == Speed.fast: timespan = "24h" limit_each = min(limit_each, 20) elif speed == Speed.balanced: timespan = "48h" limit_each = min(limit_each, 100) a1 = [] if USE_NEWSAPI: if not query: a1 = fetch_newsapi_headlines_multi(limit=limit_each, language=language) else: a1 = fetch_newsapi_articles(category=category, limit=limit_each, query=query, language=language, start_utc=start_utc, end_utc=end_utc) a2 = [] if USE_NEWSDATA_API: a2 = [ normalize_newsdata_article(a) for a in fetch_newsdata_articles(category=category, limit=limit_each, query=query, language=language) if a.get("link") ] a3 = fetch_gnews_articles(limit=limit_each, query=query, language=language) if USE_GNEWS_API else [] a4 = fetch_gdelt_multi( limit=limit_each, query=query, language=language, timespan=timespan, category=category, speed=speed, start_utc=start_utc, end_utc=end_utc ) if USE_GDELT_API else [] seen, merged = set(), [] for a in a1 + a3 + a2 + a4: if a.get("url"): a["url"] = _canonical_url(a["url"]) url = a["url"] if url not in seen: seen.add(url) merged.append(a) #Apply date filter locally (for providers that can’t filter server-side) if start_utc or end_utc: s_ts = start_utc.timestamp() if start_utc else None e_ts = end_utc.timestamp() if end_utc else None def _in_range(row): dt = _parse_any_pubdate(row.get("publishedAt") or "") if not dt: return False t = dt.timestamp() if s_ts and t < s_ts: return False if e_ts and t > e_ts: return False return True merged = [a for a in merged if _in_range(a)] if log_summary: fetch_log.info("----- Article Fetch Summary -----") fetch_log.info(f"📊 NewsAPI returned: {len(a1)} articles") fetch_log.info(f"📊 NewsData.io returned: {len(a2)} articles") fetch_log.info(f"📊 GNews returned: {len(a3)} articles") fetch_log.info(f"📊 GDELT returned: {len(a4)} articles") fetch_log.info(f"✅ Total merged articles after deduplication: {len(merged)}") fetch_log.info("---------------------------------") return merged # ----------------- API: /events ----------------- @app.get("/events") def get_events( q: Optional[str] = Query(None), category: Optional[str] = Query(None), language: Optional[str] = Query(None), translate: Optional[bool] = Query(False), target_lang: Optional[str] = Query(None), limit_each: int = Query(150, ge=5, le=250), max_events: int = Query(15, ge=5, le=50), min_countries: int = Query(2, ge=1, le=50), min_articles: int = Query(2, ge=1, le=200), speed: Speed = Query(Speed.balanced), start: Optional[str] = Query(None), end: Optional[str] = Query(None), ): start_dt = _parse_user_dt(start, "start") end_dt = _parse_user_dt(end, "end") if start_dt and end_dt and start_dt > end_dt: start_dt, end_dt = end_dt, start_dt # swap cache_key, enriched, clusters = get_or_build_events_cache( q, category, language, False, None, limit_each, start_utc=start_dt, end_utc=end_dt, speed=speed ) view = enriched if translate and target_lang: view = [dict(i) for i in enriched] for i in view: src_hint = i.get("detected_lang") i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) i["translated"] = True events = [event_payload_from_cluster(c, view) for c in clusters] events = [e for e in events if (e["country_count"] >= min_countries and e["article_count"] >= min_articles)] events.sort(key=lambda e: e["article_count"], reverse=True) return {"events": events[:max_events], "cache_key": "|".join(map(str, cache_key))} # ----------------- API: /event/{event_id} ----------------- @app.get("/event/{event_id}") def get_event_details( event_id: str, cache_key: Optional[str] = Query(None), q: Optional[str] = Query(None), category: Optional[str] = Query(None), language: Optional[str] = Query(None), translate: Optional[bool] = Query(False), target_lang: Optional[str] = Query(None), limit_each: int = Query(150, ge=5, le=250), max_samples: int = Query(5, ge=0, le=1000), start: Optional[str] = Query(None), end: Optional[str] = Query(None), ): start_dt = _parse_user_dt(start, "start") end_dt = _parse_user_dt(end, "end") if cache_key: parts = cache_key.split("|") if len(parts) == 9: speed_str = parts[8] try: speed_obj = Speed(speed_str) except ValueError: speed_obj = Speed.balanced key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), parts[4] == "True", parts[5].lower(), parts[6], parts[7], speed_str) elif len(parts) == 7: # backwards compat speed_str = parts[6] try: speed_obj = Speed(speed_str) except ValueError: speed_obj = Speed.balanced key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), parts[4] == "True", parts[5].lower(), "", "", speed_str) else: raise HTTPException(status_code=400, detail="Bad cache_key") else: speed_obj = Speed.balanced key_tuple = cache_key_for(q, category, language, limit_each, translate, target_lang, start_utc=start_dt, end_utc=end_dt, speed=speed_obj) entry = _events_cache.get(key_tuple) if not entry: _, enriched, clusters = get_or_build_events_cache( q, category, language, False, None, limit_each, start_utc=start_dt, end_utc=end_dt, speed=speed_obj ) else: enriched, clusters = entry["enriched"], entry["clusters"] eview = enriched if translate and target_lang: eview = [dict(i) for i in enriched] for i in eview: src_hint = i.get("detected_lang") i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) i["translated"] = True cluster = next((c for c in clusters if cluster_id(c, enriched) == event_id), None) if not cluster: raise HTTPException(status_code=404, detail="Event not found with current filters") payload = event_payload_from_cluster(cluster, eview) countries = aggregate_event_by_country(cluster, eview, max_samples=max_samples) payload["articles_in_event"] = sum(c["count"] for c in countries) return {"event": payload, "countries": countries} # ----------------- API: /news ----------------- @app.get("/news") def get_news( cache_key: Optional[str] = Query(None), category: Optional[str] = Query(None), sentiment: Optional[str] = Query(None), q: Optional[str] = Query(None), language: Optional[str] = Query(None), translate: Optional[bool] = Query(False), target_lang: Optional[str] = Query(None), limit_each: int = Query(100, ge=5, le=100), lite: bool = Query(True), speed: Speed = Query(Speed.balanced), page: int = Query(1, ge=1), page_size: int = Query(120, ge=5, le=300), start: Optional[str] = Query(None), end: Optional[str] = Query(None), ): start_dt = _parse_user_dt(start, "start") end_dt = _parse_user_dt(end, "end") enriched: List[Dict[str, Any]] = [] if cache_key: parts = cache_key.split("|") if len(parts) == 9: key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), parts[4] == "True", parts[5].lower(), parts[6], parts[7], parts[8]) entry = _events_cache.get(key_tuple) if entry: enriched = entry["enriched"] elif len(parts) == 7: # backwards compat key_tuple = (parts[0], parts[1], parts[2], int(parts[3]), parts[4] == "True", parts[5].lower(), "", "", parts[6]) entry = _events_cache.get(key_tuple) if entry: enriched = entry["enriched"] if not enriched: raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each, speed=speed, start_utc=start_dt, end_utc=end_dt) prefetch_descriptions_async(raw, speed) enriched_all = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] if category: cat_norm = (category or "").strip().lower() enriched = [e for e in enriched_all if (e.get("category") or "").lower() == cat_norm] else: enriched = enriched_all else: if category: cat_norm = (category or "").strip().lower() enriched = [e for e in enriched if (e.get("category") or "").lower() == cat_norm] if translate and target_lang: enriched = [dict(i) for i in enriched] for i in enriched: i["original_title"] = i.get("orig_title") or i.get("title") i["original_description"] = i.get("orig_description") or i.get("description") src_hint = i.get("detected_lang") i["title"] = translate_text(i.get("title") or "", target_lang, fallback_src=src_hint) i["description"] = translate_text(i.get("description") or "", target_lang, fallback_src=src_hint) i["translated"] = True i["translated_from"] = (src_hint or "").lower() i["translated_to"] = target_lang.lower() if sentiment: s = sentiment.strip().lower() enriched = [i for i in enriched if i.get("sentiment", "").lower() == s] total = len(enriched) offset = (page - 1) * page_size end_idx = offset + page_size items = [dict(i) for i in enriched[offset:end_idx]] if lite: drop = {"_ml_text"} for i in items: for k in drop: i.pop(k, None) return { "items": items, "total": total, "page": page, "page_size": page_size } # ----------------- API: /related ----------------- @app.get("/related") def related_articles( id: Optional[str] = Query(None, description="article id from /news"), title: Optional[str] = Query(None), description: Optional[str] = Query(None), q: Optional[str] = Query(None), category: Optional[str] = Query(None), language: Optional[str] = Query(None), limit_each: int = Query(50, ge=5, le=100), k: int = Query(10, ge=1, le=50), ): raw = combine_raw_articles(category=category, query=q, language=language, limit_each=limit_each) enriched = [enrich_article(a, language=language, translate=False, target_lang=None) for a in raw] if not enriched: return {"items": []} if id: base = next((a for a in enriched if a.get("id") == id), None) if not base: raise HTTPException(404, "article id not found in current fetch") query_text = base["_ml_text"] else: text = f"{title or ''} {description or ''}".strip() if not text: raise HTTPException(400, "provide either id or title/description") query_text = text corpus_texts = [a["_ml_text"] for a in enriched] corpus_embs = _embed_texts(corpus_texts) query_emb = _embed_texts([query_text])[0] sims = util.cos_sim(query_emb, corpus_embs).cpu().numpy().flatten() idxs = sims.argsort()[::-1] items = [] for idx in idxs: a = enriched[idx] if id and a["id"] == id: continue items.append({**a, "similarity": float(sims[idx])}) if len(items) >= k: break return {"items": items} # ----------------- Middleware: Request Timing ----------------- @app.middleware("http") async def timing_middleware(request, call_next): start = time.perf_counter() response = None try: response = await call_next(request) return response finally: dur_ms = (time.perf_counter() - start) * 1000 if response is not None: try: response.headers["X-Process-Time-ms"] = f"{dur_ms:.1f}" except Exception: pass # ----------------- Misc: Client Metrics ----------------- @app.post("/client-metric") def client_metric(payload: Dict[str, Any] = Body(...)): name = (payload.get("name") or "").strip() if name in {"Load all article markers on globe", "Load event country markers on globe"}: return {"ok": True} return {"ok": True} # ----------------- Diagnostics: Translation Health ----------------- @app.get("/diag/translate") def diag_translate( src: str = Query("pt"), tgt: str = Query("en"), text: str = Query("Olá mundo") ): # Try each path explicitly (same order your runtime uses) libre = _translate_via_libre(text, src, tgt) remote = None local = None opus_id = opus_model_for(src, tgt) if opus_id: remote = _hf_call(opus_id, {"inputs": text}) local = _translate_local(text, src, tgt) # Optional: try primary NLLB if configured nllb = None if HF_MODEL_PRIMARY and (src in NLLB_CODES) and (tgt in NLLB_CODES): nllb = _hf_call( HF_MODEL_PRIMARY, { "inputs": text, "parameters": {"src_lang": NLLB_CODES[src], "tgt_lang": NLLB_CODES[tgt]}, "options": {"wait_for_model": True}, }, ) sample_out = libre or remote or local or nllb out_lang = detect_lang(sample_out or "") or None return { "src": src, "tgt": tgt, "text": text, "libre_url": LIBRETRANSLATE_URL, "token_present": bool(HUGGINGFACE_API_TOKEN), "libre_ok": bool(libre), "remote_ok": bool(remote), "local_ok": bool(local), "nllb_ok": bool(nllb), "sample_out": sample_out, "sample_out_lang_detected": out_lang, "lang_match": (out_lang == tgt) }