"""
crawl_news.py – Crawls a list of RSS feeds, grabs full-text when needed,
merges with any existing Parquet in Filebase S3 and uploads the fresh file.
✱ 2025-07-11 – switched backend to Filebase S3
• Uses boto3 pointed at Filebase's S3-compatible endpoint
• No local caching of seen URLs: state lives in S3 under seen_urls.txt
Requirements:
• FILEBASE_ENDPOINT env var, e.g. https://s3.filebase.com
• FILEBASE_ACCESS_KEY and FILEBASE_SECRET_KEY env vars
• FILEBASE_BUCKET env var with your bucket name
"""
import os
import sys
import asyncio
import tempfile
from datetime import datetime
from io import BytesIO
from pathlib import Path
from dotenv import load_dotenv
import feedparser
import trafilatura
import pandas as pd
import rich.console
from crawl4ai import AsyncWebCrawler
# Make the repository root importable so data_cloud can be resolved
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from data_cloud.cloud_utils import StorageHandler
# ─── Configuration ────────────────────────────────────────────────────────────
load_dotenv()
FEED_URLS = [
"https://www.marketwatch.com/rss/topstories",
"https://thedefiant.io/feed/",
"https://www.coindesk.com/arc/outboundfeeds/rss/?outputType=xml",
"https://cointelegraph.com/rss",
"https://cryptopotato.com/feed/",
"https://cryptoslate.com/feed/",
"https://cryptonews.com/news/feed/",
"https://smartliquidity.info/feed/",
"https://www.cnbc.com/id/10000664/device/rss/rss.html",
"https://time.com/nextadvisor/feed/",
]
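# Tuning knobs: articles older than MAX_AGE_DAYS are dropped at merge time,
# RSS summaries shorter than MIN_SUMMARY_LEN trigger a full-page crawl,
# extracted text shorter than MIN_CRAWL_LEN characters is discarded, and
# CRAWL_CONCURRENCY caps how many pages are fetched in parallel.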
MAX_AGE_DAYS = 1
MIN_SUMMARY_LEN = 200
MIN_CRAWL_LEN = 100
CRAWL_CONCURRENCY = 4
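# S3 object layout: news-latest.parquet holds the rolling article window
# (url, text, timestamp columns); seen_urls.txt stores one processed URL per line.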
S3_NEWS_PATH = "news/crawled_news/news-latest.parquet"
S3_SEEN_PATH = "news/crawled_news/seen_urls.txt"
# Filebase S3 creds & endpoint ------------------------------------------------
FILEBASE_ENDPOINT = os.getenv("FILEBASE_ENDPOINT")
FILEBASE_ACCESS_KEY = os.getenv("FILEBASE_ACCESS_KEY")
FILEBASE_SECRET_KEY = os.getenv("FILEBASE_SECRET_KEY")
FILEBASE_BUCKET = os.getenv("FILEBASE_BUCKET")
if not (FILEBASE_ENDPOINT and FILEBASE_ACCESS_KEY and FILEBASE_SECRET_KEY and FILEBASE_BUCKET):
print("[ERROR] FILEBASE_ENDPOINT, FILEBASE_ACCESS_KEY, FILEBASE_SECRET_KEY, and FILEBASE_BUCKET must be set")
sys.exit(1)
# Silence logs ----------------------------------------------------------------
rich.console.Console.print = lambda *a, **k: None
os.environ.update({
"RICH_NO_COLOR": "1",
"RICH_DISABLE": "1",
"CRAWL4AI_LOG_LEVEL": "CRITICAL",
})
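# Note: monkey-patching Console.print above turns all rich console output into
# a no-op, so only this script's own plain print() calls reach stdout.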
# ─── Main routine ─────────────────────────────────────────────────────────────
async def main() -> None:
    # Set up the storage handler pointed at Filebase's S3-compatible endpoint
    storage = StorageHandler(
        endpoint_url=FILEBASE_ENDPOINT,
        access_key=FILEBASE_ACCESS_KEY,
        secret_key=FILEBASE_SECRET_KEY,
        bucket_name=FILEBASE_BUCKET,
        local_base="data",
    )

    # Load the seen-URL cache from S3 only; do not fall back to local state or create it locally
    seen_urls: set[str] = set()
    try:
        seen_data = storage.s3.get_object(Bucket=storage.bucket, Key=S3_SEEN_PATH)['Body'].read()
        text = seen_data.decode()
        seen_urls = {line.strip() for line in text.splitlines() if line.strip()}
        print(f"[INFO] Loaded {len(seen_urls)} seen URLs from S3")
    except Exception:
        print("[INFO] No seen URLs found in S3; treating the cache as empty.")
        seen_urls = set()
    # Fetch & parse RSS feeds -------------------------------------------------
    to_crawl, immediate = [], []
    now_utc = datetime.utcnow()
    for url in FEED_URLS:
        feed = feedparser.parse(url)
        new_count = 0
        for e in feed.entries:
            # Skip entries that carry no timestamp at all
            ts = e.get("published_parsed") or e.get("updated_parsed")
            if not ts:
                continue
            link = e.link
            if link in seen_urls:
                continue
            new_count += 1
            # Prefer full <content> blocks, then sufficiently long summaries;
            # anything shorter is queued for a full-page crawl
            content = e.get("content")
            if content:
                txt = "".join(p.value for p in content).strip()
                if len(txt) >= MIN_CRAWL_LEN:
                    immediate.append({"url": link, "text": txt, "timestamp": now_utc.isoformat()})
                    seen_urls.add(link)
                    continue
            summ = e.get("summary", "").strip()
            if len(summ) >= MIN_SUMMARY_LEN:
                immediate.append({"url": link, "text": summ, "timestamp": now_utc.isoformat()})
                seen_urls.add(link)
            else:
                to_crawl.append(link)
        print(f"• Feed {url} -> {new_count} new items")
    # Selective crawl for short summaries ------------------------------------
    crawled = []
    if to_crawl:
        print(f"[INFO] Crawling {len(to_crawl)} pages…")
        async with AsyncWebCrawler(
            seeds=to_crawl,
            max_pages=len(to_crawl),
            concurrency=CRAWL_CONCURRENCY,
            obey_robots_txt=True,
        ) as crawler:
            pages = await asyncio.gather(*(crawler.arun(u) for u in to_crawl))
            # Each crawl result is treated as an iterable of fetched pages; keep
            # extracted article text only if it clears the minimum-length threshold
            for sub in pages:
                for page in sub:
                    if page.url not in seen_urls:
                        txt = trafilatura.extract(page.html, favor_recall=True)
                        if txt and len(txt.strip()) >= MIN_CRAWL_LEN:
                            crawled.append({"url": page.url, "text": txt.strip(), "timestamp": now_utc.isoformat()})
                            seen_urls.add(page.url)
# Merge, filter & dedupe --------------------------------------------------
new_results = immediate + crawled
if not new_results:
print("[WARNING] No new articles to process")
return
df_new = pd.DataFrame(new_results)
df_new["timestamp"] = pd.to_datetime(df_new["timestamp"], utc=True)
# Load existing Parquet (cloud or local)
df_old = pd.DataFrame()
try:
parquet_bytes = storage.download(S3_NEWS_PATH)
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
tmp.write(parquet_bytes)
tmp_path = tmp.name
df_old = pd.read_parquet(tmp_path)
os.remove(tmp_path)
print(f"[INFO] Loaded {len(df_old)} existing articles from {storage.get_last_mode()}")
except Exception:
print(f"[INFO] No existing Parquet found in cloud or local storage.")
df = pd.concat([df_old, df_new], ignore_index=True)
cutoff = pd.Timestamp.utcnow() - pd.Timedelta(days=MAX_AGE_DAYS)
df = df[df.timestamp >= cutoff]
df = df.sort_values("timestamp").drop_duplicates("url", keep="last")
print(f"[DEBUG] old rows: {len(df_old)}, new rows: {len(df_new)}, merged: {len(df)}")
    # Upload updated Parquet to S3 only
    parquet_buf = BytesIO()
    df.to_parquet(parquet_buf, index=False)
    data = parquet_buf.getvalue()
    if not data:
        raise RuntimeError("Refusing to upload empty Parquet")
    storage.s3.put_object(Bucket=storage.bucket, Key=S3_NEWS_PATH, Body=data, ContentType="application/octet-stream")
    print(f"[OK] Parquet updated: S3:{S3_NEWS_PATH}")

    # Persist seen URLs to S3 only
    seen_body = "\n".join(sorted(seen_urls)) + "\n"
    storage.s3.put_object(Bucket=storage.bucket, Key=S3_SEEN_PATH, Body=seen_body.encode(), ContentType="text/plain")
    print(f"[OK] Seen URLs updated: S3:{S3_SEEN_PATH}")
    # Upload all files in data/crawled-news to S3 under news/ (no local fallback)
    local_news_dir = os.path.join("data", "crawled-news")
    s3_news_prefix = "news/crawled_news/"
    for root, _, files in os.walk(local_news_dir):
        for fname in files:
            local_path = os.path.join(root, fname)
            rel_path = os.path.relpath(local_path, local_news_dir)
            s3_key = s3_news_prefix + rel_path.replace("\\", "/")
            with open(local_path, "rb") as f:
                file_bytes = f.read()
            storage.s3.put_object(Bucket=storage.bucket, Key=s3_key, Body=file_bytes, ContentType="application/octet-stream")
            print(f"[OK] Uploaded {local_path} -> S3:{s3_key}")
if __name__ == "__main__":
    asyncio.run(main())
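# Typical invocation (assuming the Filebase variables are exported or supplied via .env):
#   python crawl_news.py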