|
import os

import requests
from bs4 import BeautifulSoup
from elasticsearch import Elasticsearch
from langchain.tools import tool
|
|
|
# Shared Elasticsearch client used by the tools defined below.
#
# Each connection setting can be overridden through an environment variable so
# the module is not tied to a single developer machine. The fallbacks are the
# values that were previously hard-coded, so existing setups keep working.
# NOTE(review): the fallback password is a secret committed to source control;
# rotate it and drop the default once ES_PASSWORD is provided everywhere.
es = Elasticsearch(
    os.environ.get("ES_URL", "https://localhost:9200"),
    basic_auth=(
        os.environ.get("ES_USERNAME", "elastic"),
        os.environ.get("ES_PASSWORD", "dVJI85*y60R3ZVbECj1w"),
    ),
    ca_certs=os.environ.get(
        "ES_CA_CERTS",
        "/Volumes/macOS/Projects/PFE UM6P/elasticsearch-8.12.1/config/certs/http_ca.crt",
    ),
)
|
|
|
|
|
# Index queried by both tools, and the document fields each hit is trimmed to.
_EVENTS_INDEX = "all_events_full"
_EVENT_SOURCE_FIELDS = [
    "event_id",
    "event_title",
    "event_date",
    "category",
    "attribute_tags",
    "type",
    "value",
]


class EventSearchTool():
    # Groups the Elasticsearch-backed event lookup tools. The functions take no
    # ``self``: each one is wrapped by the ``tool`` decorator when the class
    # body is executed.
    # NOTE(review): the docstrings below are presumably consumed by ``tool`` as
    # the tool descriptions, so their wording is left untouched - confirm
    # before editing them.

    @tool("Event search Tool")
    def search(keyword: str):
        """Useful tool to search for an indicator of compromise or an security event
        Parameters:
        - keyword: The keyword to search for
        Returns:
        - A list of events that match the keyword
        """
        # Match the keyword against the "value" field of the indexed events.
        query = {
            "match": {"value": {
                "query": keyword
            }}
        }

        # At most five hits are requested, each restricted to the shared
        # field list.
        res = es.search(
            size=5,
            index=_EVENTS_INDEX,
            query=query,
            knn=None,
            _source=_EVENT_SOURCE_FIELDS,
        )
        hits = res["hits"]["hits"]

        # Return only the stored documents, dropping the hit metadata.
        return [hit["_source"] for hit in hits]

    @tool("Event search by event_id Tool")
    def get_event_by_id(id:str):
        """Useful tool to search for an event by its id, and return the full event details
        Parameters:
        - id: The event id to search for
        Returns:
        - The full details of the event with the specified id
        """
        # ``id`` shadows the builtin, but the parameter name is part of the
        # tool's interface and is therefore kept.

        # Fail fast with a real exception when the cluster does not answer.
        # The previous ``raise "ElasticNotReachable"`` raised a plain str,
        # which Python 3 rejects with a TypeError instead of the intended error.
        if not es.ping():
            raise ConnectionError("ElasticNotReachable")

        res = es.search(
            index=_EVENTS_INDEX,
            query={"match": {"event_id": id}},
            _source=_EVENT_SOURCE_FIELDS,
        )
        hits = res["hits"]["hits"]

        # A list is returned (empty when nothing matches); every entry is
        # limited to the shared field list.
        return [hit["_source"] for hit in hits]
|
|
|
|