|
import os |
|
import streamlit as st |
|
import time |
|
import io |
|
import logging |
|
import json |
|
from components.dashboard import Dashboard |
|
|
|
|
|
st.set_page_config( |
|
page_title="Cybersecurity IDS Dashboard", |
|
page_icon="🛡️", |
|
layout="wide", |
|
initial_sidebar_state="expanded", |
|
) |
|
|
|
logging.basicConfig(level=logging.ERROR) |
|
|
|
def get_api_token(): |
|
"""Retrieves the Hugging Face API token from Streamlit secrets.""" |
|
try: |
|
return st.secrets["HUGGING_FACE_API_TOKEN"] |
|
except KeyError: |
|
st.warning( |
|
"Please add your Hugging Face API token to Streamlit secrets. " |
|
"See the Streamlit documentation for instructions." |
|
) |
|
st.stop() |
|
|
|
def load_sample_logs(filepath="sample_logs.json"): |
|
"""Loads sample logs from a JSON file, or returns default logs if file not found.""" |
|
try: |
|
with open(filepath, "r") as f: |
|
return json.load(f) |
|
except FileNotFoundError: |
|
return [ |
|
"Failed SSH login attempt from IP 192.168.1.10", |
|
"Multiple port scan detected from IP 10.0.0.5", |
|
"Suspicious outbound connection to known malicious IP", |
|
"Brute force attack detected on admin portal", |
|
] |
|
|
|
def process_log(log, dashboard, placeholder): |
|
"""Processes a single log entry and updates the dashboard.""" |
|
try: |
|
from PurpleTeamIDS import analyze_security_log |
|
threat_data = analyze_security_log(log) |
|
dashboard.threat_analysis.process_new_threat(threat_data) |
|
placeholder.write(dashboard.threat_analysis.display_threats()) |
|
except ImportError: |
|
st.error("PurpleTeamIDS module not found. Please install the necessary dependencies.") |
|
st.stop() |
|
except Exception as e: |
|
st.error(f"Error processing log: {e}") |
|
logging.error(f"Error processing log: {e}") |
|
|
|
def main(): |
|
"""Main function to render the Cybersecurity IDS Dashboard.""" |
|
st.title("Cybersecurity Intrusion Detection Dashboard") |
|
st.markdown("Monitor and analyze security logs for potential threats.") |
|
|
|
|
|
api_token = get_api_token() |
|
|
|
|
|
dashboard = Dashboard() |
|
|
|
|
|
css_path = os.getenv("CSS_PATH", "styles/custom.css") |
|
try: |
|
with open(css_path) as f: |
|
st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True) |
|
except FileNotFoundError: |
|
st.error(f"CSS file not found: {css_path}") |
|
|
|
|
|
dashboard.render_main_content() |
|
|
|
|
|
uploaded_file = st.file_uploader("Upload Security Log File", type=["log", "txt"]) |
|
if uploaded_file is not None: |
|
try: |
|
stringio = io.StringIO(uploaded_file.getvalue().decode("utf-8")) |
|
placeholder = st.empty() |
|
progress_bar = st.progress(0) |
|
lines = stringio.readlines() |
|
total_lines = len(lines) |
|
for i, line in enumerate(lines): |
|
process_log(line.strip(), dashboard, placeholder) |
|
progress_bar.progress((i + 1) / total_lines) |
|
time.sleep(1) |
|
progress_bar.empty() |
|
except Exception as e: |
|
st.error(f"Error processing log file: {e}") |
|
logging.error(f"Error processing log file: {e}") |
|
|
|
else: |
|
|
|
sample_logs = load_sample_logs() |
|
placeholder = st.empty() |
|
progress_bar = st.progress(0) |
|
total_logs = len(sample_logs) |
|
for i, log in enumerate(sample_logs): |
|
process_log(log, dashboard, placeholder) |
|
progress_bar.progress((i + 1) / total_logs) |
|
time.sleep(1) |
|
progress_bar.empty() |
|
|
|
st.markdown("---") |
|
st.markdown("Footer: Example Cybersecurity Application") |
|
|
|
if __name__ == "__main__": |
|
main() |