import asyncio
import logging
import os
import threading
from datetime import datetime, timedelta

import gradio as gr
import nest_asyncio
import pandas as pd
import plotly.express as px
import pymongo
import redis
from langchain.docstore.document import Document
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_mongodb import MongoDBAtlasVectorSearch

logging.basicConfig(level=logging.INFO)

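# MongoDB database/collection that store the alert embeddings, and the Redis stream to consume.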
database = "AlertSimAndRemediation"
collection = "alert_embed"
stream_name = "alerts"

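# In-memory state surfaced on the Gradio dashboard.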
latest_alert = "No alerts yet."
alert_count = 0

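# Embedding model used to vectorize alert descriptions.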
embedding_args = {
    "model_name": "BAAI/bge-large-en-v1.5",
    "model_kwargs": {"device": "cpu"},
    "encode_kwargs": {"normalize_embeddings": True},
}
embedding_model = HuggingFaceEmbeddings(**embedding_args)

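# MongoDB Atlas client and the collection that backs the vector store.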
connection = pymongo.MongoClient(os.environ["MONGO_URI"])
alert_collection = connection[database][collection]

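# Redis client used to consume the alert stream.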
r = redis.Redis(host=os.environ["REDIS_HOST"], password=os.environ["REDIS_PWD"], port=16652)


async def create_textual_description(entry_data):
    """Turn a raw Redis stream entry into a human-readable alert description."""
    entry_dict = {k.decode(): v.decode() for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = (
        f"A {severity} alert of category {category} was raised from the {source} source "
        f"for node {node} at {created_at}. The alert is {acknowledged}. "
        f"The recommended remedy is: {remedy}."
    )

    return description, entry_dict


async def save(entry):
    """Embed the alert description and store it in the MongoDB Atlas vector store."""
    MongoDBAtlasVectorSearch.from_documents(
        documents=[Document(
            page_content=entry["content"],
            metadata=entry["metadata"],
        )],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alerts stored successfully!")


async def listen_to_alerts(r):
    """Consume new entries from the Redis stream, embed them, and update dashboard state."""
    global latest_alert, alert_count
    try:
        last_id = "$"  # only read entries added after the listener starts

        while True:
            entries = r.xread({stream_name: last_id}, block=0, count=None)

            if entries:
                stream, new_entries = entries[0]

                for entry_id, entry_data in new_entries:
                    description, entry_dict = await create_textual_description(entry_data)
                    await save({
                        "content": description,
                        "metadata": entry_dict,
                    })
                    print(description)
                    latest_alert = description
                    alert_count += 1

                    last_id = entry_id
            await asyncio.sleep(1)

    except KeyboardInterrupt:
        print("Exiting...")


def run_alert_listener():
    asyncio.run(listen_to_alerts(r))


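# Run the Redis listener in a background thread so it doesn't block the Gradio app.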
alert_thread = threading.Thread(target=run_alert_listener, daemon=True)  # daemon: don't block shutdown
alert_thread.start()


def get_latest_alert():
    global latest_alert, alert_count
    return latest_alert, alert_count


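# Gradio dashboard showing the latest alert and a running alert count.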
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🚨 Alert Dashboard 🚨")

    with gr.Row():
        with gr.Column():
            latest_alert_box = gr.Textbox(label="Latest Alert", lines=3, interactive=False)
            alert_count_box = gr.Number(label="Alert Count", interactive=False)
            refresh_button = gr.Button("Refresh", variant="primary")

    refresh_button.click(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box])

    # Populate the dashboard on load and refresh it automatically every 30 seconds.
    app.load(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box], every=30)

app.launch()