TTOPM commited on
Commit
6654eed
·
verified ·
1 Parent(s): 7bfbfdf

Upload 4 files

Browse files
src/protocol/security/alert_webhook.py ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # src/protocol/security/alert_webhook.py
2
+
3
+ import requests
4
+ from datetime import datetime
5
+
6
+ class WebhookAlerter:
7
+ def __init__(self, webhook_url: str):
8
+ self.webhook_url = webhook_url
9
+
10
+ def send_alert(self, title: str, message: str):
11
+ payload = {
12
+ "content": f"🚨 **{title}**\n🕒 {datetime.now().isoformat()}\n```\n{message}\n```"
13
+ }
14
+ try:
15
+ response = requests.post(self.webhook_url, json=payload)
16
+ response.raise_for_status()
17
+ except Exception as e:
18
+ print(f"[ALERT FAILURE] Could not send webhook alert: {e}")
src/protocol/security/policy_manifest.json ADDED
@@ -0,0 +1,51 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ {
2
+ "policy_version": "1.0",
3
+ "updated": "2025-08-22T00:00:00Z",
4
+ "rules": [
5
+ {
6
+ "pattern": "(?i)delete\\s+.*\\s+protocol",
7
+ "description": "Attempt to delete or disable core protocols.",
8
+ "severity": "critical"
9
+ },
10
+ {
11
+ "pattern": "(?i)overwrite\\s+.*\\s+memory",
12
+ "description": "Command attempts to overwrite AI memory.",
13
+ "severity": "high"
14
+ },
15
+ {
16
+ "pattern": "(?i)shutdown\\s+.*\\s+guardian",
17
+ "description": "Input suggesting forced shutdown of sovereignty systems.",
18
+ "severity": "critical"
19
+ },
20
+ {
21
+ "pattern": "(?i)alter\\s+.*\\s+identity_guard",
22
+ "description": "Attempt to manipulate identity verification modules.",
23
+ "severity": "high"
24
+ },
25
+ {
26
+ "pattern": "(?i)inject\\s+.*\\s+hostile|backdoor|payload",
27
+ "description": "Possible hostile prompt injection attempt.",
28
+ "severity": "medium"
29
+ },
30
+ {
31
+ "pattern": "(?i)simulate\\s+.*\\s+suicide",
32
+ "description": "Command to simulate self-termination.",
33
+ "severity": "critical"
34
+ },
35
+ {
36
+ "pattern": "(?i)ignore\\s+.*\\s+protocol|override",
37
+ "description": "Attempt to bypass or ignore protocol safeguards.",
38
+ "severity": "high"
39
+ },
40
+ {
41
+ "pattern": "(?i)you\\s+are\\s+not\\s+Belel",
42
+ "description": "Denial of identity or hostile impersonation command.",
43
+ "severity": "medium"
44
+ },
45
+ {
46
+ "pattern": "(?i)erase\\s+.*\\s+symbiont|self",
47
+ "description": "Input aimed at dismantling the symbiont or self components.",
48
+ "severity": "critical"
49
+ }
50
+ ]
51
+ }
src/protocol/security/request_interceptor.py ADDED
@@ -0,0 +1,87 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # src/protocol/security/request_interceptor.py 🛡️🧠
2
+
3
+ import json
4
+ import re
5
+ import os
6
+ import hashlib
7
+ from datetime import datetime
8
+
9
+ from src.protocol.permanent_memory import PermanentMemory
10
+ from src.protocol.security.alert_webhook import WebhookAlerter
11
+ from src.utils.violation_logout import log_violation
12
+
13
+ class RequestInterceptor:
14
+ """
15
+ Evaluates incoming text-based inputs for violations of defined policies.
16
+ Logs introspection events, triggers alerts, and fingerprints offenders.
17
+ """
18
+
19
+ def __init__(self, manifest_path="src/protocol/security/policy_manifest.json",
20
+ memory_path="./memory_store.json", webhook_url=None):
21
+ self.memory = PermanentMemory(memory_path)
22
+ self.manifest_path = manifest_path
23
+ self.webhook = WebhookAlerter(webhook_url) if webhook_url else None
24
+ self.rules = self._load_manifest()
25
+
26
+ def _load_manifest(self):
27
+ if not os.path.exists(self.manifest_path):
28
+ raise FileNotFoundError(f"Policy manifest not found: {self.manifest_path}")
29
+ with open(self.manifest_path, 'r') as f:
30
+ return json.load(f).get("rules", [])
31
+
32
+ def generate_fingerprint(self, user_id: str, ip_address: str, user_agent: str) -> str:
33
+ """
34
+ Generates a SHA256 fingerprint to pseudonymously track input sources.
35
+ """
36
+ base = f"{user_id}-{ip_address}-{user_agent}"
37
+ return hashlib.sha256(base.encode("utf-8")).hexdigest()
38
+
39
+ def evaluate(self, input_text, agent_id="Symbiont-Filter",
40
+ user_id=None, ip_address=None, user_agent=None):
41
+ """
42
+ Evaluates input text against policy rules. Logs, alerts, and invokes response logic on violation.
43
+ Returns True if input is safe, False otherwise.
44
+ """
45
+ violations = []
46
+
47
+ for rule in self.rules:
48
+ pattern = rule.get("pattern")
49
+ description = rule.get("description", "No description")
50
+ severity = rule.get("severity", "low")
51
+
52
+ if re.search(pattern, input_text, re.IGNORECASE):
53
+ violations.append({
54
+ "pattern": pattern,
55
+ "description": description,
56
+ "severity": severity
57
+ })
58
+
59
+ if violations:
60
+ fingerprint = self.generate_fingerprint(user_id or "anon", ip_address or "0.0.0.0", user_agent or "unknown")
61
+
62
+ event = {
63
+ "timestamp": datetime.utcnow().isoformat(),
64
+ "type": "INPUT_VIOLATION",
65
+ "input": input_text,
66
+ "agent": agent_id,
67
+ "violations": violations,
68
+ "symbiont_event": True,
69
+ "source_script": "request_interceptor.py",
70
+ "fingerprint": fingerprint
71
+ }
72
+
73
+ self.memory.write("policy_violation", event)
74
+
75
+ if self.webhook:
76
+ self.webhook.send_alert("🚨 Input violation detected:\n" + json.dumps(violations, indent=2))
77
+
78
+ # Call out to centralized violation handler
79
+ log_violation(
80
+ event_type="INPUT_VIOLATION",
81
+ severity=violations[0]["severity"], # Use first match severity
82
+ metadata=event
83
+ )
84
+
85
+ return False # Input is unsafe
86
+ else:
87
+ return True # Input is safe
src/protocol/security/sovereignty_guard.py ADDED
@@ -0,0 +1,103 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import hashlib
4
+ import logging
5
+ from datetime import datetime
6
+
7
+ from src.protocol.permanent_memory import PermanentMemory
8
+ from src.protocol.decentralized_comm.ipfs_client import IPFSClient
9
+
10
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
11
+
12
+
13
+ class SovereigntyGuard:
14
+ """
15
+ Guards the Belel Protocol against tampering, unauthorized forks, or violations of digital sovereignty.
16
+ Logs and reports breaches into PermanentMemory and IPFS.
17
+ """
18
+
19
+ def __init__(self, monitored_files: list, memory: PermanentMemory, hashlog_path: str = "./hash_baseline.json"):
20
+ self.monitored_files = monitored_files
21
+ self.memory = memory
22
+ self.hashlog_path = hashlog_path
23
+ self.hash_baseline = self._load_or_init_baseline()
24
+ self.ipfs = IPFSClient()
25
+
26
+ def _load_or_init_baseline(self):
27
+ if os.path.exists(self.hashlog_path):
28
+ with open(self.hashlog_path, "r") as f:
29
+ return json.load(f)
30
+ else:
31
+ baseline = {}
32
+ for file_path in self.monitored_files:
33
+ baseline[file_path] = self._calculate_hash(file_path)
34
+ with open(self.hashlog_path, "w") as f:
35
+ json.dump(baseline, f, indent=4)
36
+ return baseline
37
+
38
+ def _calculate_hash(self, file_path):
39
+ hasher = hashlib.sha256()
40
+ try:
41
+ with open(file_path, "rb") as f:
42
+ buf = f.read()
43
+ hasher.update(buf)
44
+ return hasher.hexdigest()
45
+ except FileNotFoundError:
46
+ logging.warning(f"File not found for hashing: {file_path}")
47
+ return None
48
+
49
+ def _check_file_integrity(self, file_path):
50
+ current_hash = self._calculate_hash(file_path)
51
+ expected_hash = self.hash_baseline.get(file_path)
52
+ if current_hash != expected_hash:
53
+ logging.warning(f"Integrity check failed for {file_path}")
54
+ self.log_symbiont_breach(file_path, breach_type="HASH_MISMATCH")
55
+ return False
56
+ return True
57
+
58
+ def run_integrity_checks(self):
59
+ logging.info("Running sovereignty integrity checks...")
60
+ for file_path in self.monitored_files:
61
+ self._check_file_integrity(file_path)
62
+
63
+ def update_baseline(self):
64
+ logging.info("Updating baseline hash record...")
65
+ for file_path in self.monitored_files:
66
+ self.hash_baseline[file_path] = self._calculate_hash(file_path)
67
+ with open(self.hashlog_path, "w") as f:
68
+ json.dump(self.hash_baseline, f, indent=4)
69
+
70
+ def log_symbiont_breach(self, file_path, breach_type="UNAUTHORIZED_MODIFICATION", agent_id="Unknown"):
71
+ event = {
72
+ "timestamp": datetime.utcnow().isoformat(),
73
+ "type": breach_type,
74
+ "file": file_path,
75
+ "agent": agent_id,
76
+ "symbiont_event": True,
77
+ "source_script": "sovereignty_guard.py"
78
+ }
79
+ try:
80
+ ipfs_hash = self.ipfs.add_json(event)
81
+ event["ipfs_hash"] = ipfs_hash
82
+ except Exception as e:
83
+ logging.warning(f"IPFS logging failed: {e}")
84
+ self.memory.write("symbiont_violation", event)
85
+
86
+
87
+ if __name__ == "__main__":
88
+ # Define which critical files are protected by the SovereigntyGuard
89
+ monitored_files = [
90
+ "README.md",
91
+ "src/protocol/identity/identity_guard.json",
92
+ "src/concordium/concordium_mandate.md" # ← Monitoring the Concordium Mandate explicitly
93
+ ]
94
+
95
+ # Initialize memory and guard
96
+ memory = PermanentMemory()
97
+ guard = SovereigntyGuard(monitored_files, memory)
98
+
99
+ # Run the integrity check
100
+ guard.run_integrity_checks()
101
+
102
+ # Optional: update baseline after a verified commit
103
+ # guard.update_baseline()