TTOPM commited on
Commit
9a706fd
·
verified ·
1 Parent(s): ed72f3c

Create utils.py

Browse files
Files changed (1) hide show
  1. belel-shield/dashboard/utils.py +95 -0
belel-shield/dashboard/utils.py ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json, os, time, hashlib, psutil
2
+ from pathlib import Path
3
+ from datetime import datetime
4
+ from dateutil import parser as dateparser
5
+
6
+ HOME = Path.home()
7
+ DATA_DIR = HOME / ".belel"
8
+ ALERTS_JSON = DATA_DIR / "gideon_alerts.json"
9
+ ALERTS_JSONL = DATA_DIR / "gideon_alerts.jsonl"
10
+ BLOCKLIST_CACHE = DATA_DIR / "belel-blocklist.json"
11
+ CHECKSUM_FILE = DATA_DIR / "belel-blocklist.checksums.json"
12
+ GEOLITE_DB = DATA_DIR / "GeoLite2-City.mmdb"
13
+
14
+ def sha256_file(p: Path) -> str:
15
+ h = hashlib.sha256()
16
+ with open(p, "rb") as f:
17
+ for chunk in iter(lambda: f.read(8192), b""):
18
+ h.update(chunk)
19
+ return h.hexdigest()
20
+
21
+ def load_alerts():
22
+ # Supports array JSON or JSON Lines
23
+ if ALERTS_JSONL.exists():
24
+ rows = []
25
+ with open(ALERTS_JSONL, "r") as f:
26
+ for line in f:
27
+ line=line.strip()
28
+ if not line: continue
29
+ try:
30
+ rows.append(json.loads(line))
31
+ except: pass
32
+ return rows
33
+ if ALERTS_JSON.exists():
34
+ try:
35
+ return json.loads(ALERTS_JSON.read_text())
36
+ except: return []
37
+ return []
38
+
39
+ def normalize_alerts(rows):
40
+ out=[]
41
+ for r in rows:
42
+ ts = r.get("timestamp") or r.get("ts")
43
+ try: dt = dateparser.parse(ts) if ts else None
44
+ except: dt = None
45
+ out.append({
46
+ "timestamp": ts,
47
+ "dt": dt,
48
+ "ip": r.get("ip"),
49
+ "port": r.get("port"),
50
+ "host": r.get("host"),
51
+ "reason": r.get("reason") or r.get("alert"),
52
+ "alert": r.get("alert")
53
+ })
54
+ return out
55
+
56
+ def load_blocklist_status():
57
+ status = {"present": BLOCKLIST_CACHE.exists(), "checksum_ok": None, "expected": None, "actual": None}
58
+ if not BLOCKLIST_CACHE.exists() or not CHECKSUM_FILE.exists():
59
+ return status
60
+ try:
61
+ meta = json.loads(CHECKSUM_FILE.read_text())
62
+ status["expected"] = meta.get("hash")
63
+ actual = sha256_file(BLOCKLIST_CACHE)
64
+ status["actual"] = actual
65
+ status["checksum_ok"] = (status["expected"] or "").lower() == actual.lower()
66
+ return status
67
+ except:
68
+ return status
69
+
70
+ def firewall_present():
71
+ # quick check for iptables on Linux
72
+ return (os.name=="posix" and (Path("/sbin/iptables").exists() or Path("/usr/sbin/iptables").exists()))
73
+
74
+ def process_running(name_contains="gideon_scanner"):
75
+ for p in psutil.process_iter(attrs=["name","cmdline"]):
76
+ try:
77
+ cmd = " ".join(p.info.get("cmdline") or [])
78
+ if name_contains in cmd:
79
+ return True
80
+ except: pass
81
+ return False
82
+
83
+ def geoip_lookup(ip, reader):
84
+ # reader: geoip2.database.Reader, may be None
85
+ if not reader or not ip: return None
86
+ try:
87
+ r = reader.city(ip)
88
+ return {
89
+ "lat": r.location.latitude,
90
+ "lon": r.location.longitude,
91
+ "city": r.city.name,
92
+ "country": r.country.iso_code
93
+ }
94
+ except:
95
+ return None