import json
import copy
import logging
import time
from datetime import datetime
from dateutil import parser as dateparser
from dateutil import zoneinfo
import warnings
import re
import hashlib
# Module-level logger; none of the functions below currently log through it.
logger = logging.getLogger(__name__)
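def parse_timestamp(timestring):
    """Parse *timestring* and return it as epoch milliseconds.

    Args:
        timestring: Date/time text accepted by ``dateutil.parser.parse``. It may
            end in a zone abbreviation from the local ``tzmap`` table or any zone
            name present in dateutil's bundled zone file.

    Returns:
        float: POSIX timestamp of the parsed value multiplied by 1000.

    Raises:
        ValueError: the string names a zone that is neither in ``tzmap`` nor in
            the bundled zone file. dateutil only emits ``UnknownTimezoneWarning``
            and silently falls back to UTC in that case, which would shift the
            alert time; the warning is promoted to an error instead.
            (``ValueError`` subclasses ``Exception``, so existing callers that
            catch the former generic ``Exception`` still work.)

    Note:
        Ambiguous abbreviations such as ``IST`` are pinned here to the zone this
        feed is known to use; extend ``tzmap`` when a new abbreviation appears.
        A string with no zone at all parses as a naive datetime and
        ``.timestamp()`` then interprets it in the process's local time.
    """
    tzmap = {
        "EDT": "US/Eastern",
        "EST": "US/Eastern",
        "PST": "US/Pacific",
        "PDT": "US/Pacific",
        "GMT": "GMT",
        "UTC": "UTC",
        "IST": "Asia/Kolkata",
        "CDT": "US/Central",
        "CEST": "Europe/Belgrade",
    }
    tzinfos = dict(zoneinfo.get_zonefile_instance().zones)
    for abbrev, zone_name in tzmap.items():
        tzinfos[abbrev] = zoneinfo.gettz(zone_name)

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        dt_obj = dateparser.parse(timestring, tzinfos=tzinfos)

    # Inspect every recorded warning, not just the last one and only when it
    # is alone: any unrelated warning raised during parsing would otherwise
    # hide the unknown-timezone case and let the UTC fallback through.
    unknown_tz = [
        entry for entry in caught
        if issubclass(entry.category, dateparser.UnknownTimezoneWarning)
    ]
    if unknown_tz:
        reason = str(unknown_tz[0].message).split(".", 1)[0]
        raise ValueError(
            reason + ". Please add custom dictionary 'tzmap' with definition for timezone"
        )

    # Return timestamp in milliseconds
    return dt_obj.timestamp() * 1000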
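def _build_aggregate_alert(s_alert, alert_json):
    """Return a deep copy of *s_alert* with the fields shared by both aggregate
    branches: the "email" element as payload, type AGGREGATE, alertId as key."""
    aggr_alert = copy.deepcopy(s_alert)
    aggr_alert.sourcePayload = json.dumps(alert_json.get("email"))
    aggr_alert.alertType = "AGGREGATE"
    aggr_alert.key = str(alert_json.get("alertId"))
    return aggr_alert


def main(alert_payload, resolver_config, s_alert):
    """Translate one Pearl-DDoS alert payload into two alert objects.

    Args:
        alert_payload: JSON text of a single alert event (see the sample at the
            bottom of this file). It is external input: its values are copied
            through into the alert fields verbatim and are not validated here,
            so consumers must treat them as untrusted text.
        resolver_config: accepted for interface compatibility; not used.
        s_alert: template alert object. It is deep-copied for every emitted
            alert and must support attribute assignment and
            ``addAttribute(name, value)``.

    Returns:
        list: ``[primary_alert, aggregate_alert]``. The primary alert is keyed
        on ``"<alertId>#<signatures>"``; the aggregate alert is keyed on
        ``alertId`` alone and, for an open event, carries an MD5 fingerprint of
        the primary key in ``impactedServices``.

    Raises:
        json.JSONDecodeError: *alert_payload* is not valid JSON.
        ValueError: propagated from ``parse_timestamp`` for an unknown zone.
    """
    alert_json = json.loads(alert_payload)
    status = alert_json.get("status")
    is_open = status == "open"
    # Only "close" fills the clear-side fields of the primary alert, but any
    # status other than "open" takes the clear branch for the aggregate alert.
    is_close = status == "close"

    impact = alert_json.get("impact")
    signatures = alert_json.get("signatures")
    managed_objects = alert_json.get("managedObjects")
    # Absent fields render as "None" inside message/key text instead of the
    # former TypeError from concatenating None with str.
    message = f"{alert_json.get('subject')}. Impact: {impact}. Signatures: {signatures}"
    event_ms = parse_timestamp(alert_json.get("createdDate"))

    p_alert = copy.deepcopy(s_alert)
    p_alert.sourcePayload = json.dumps(alert_json)
    p_alert.assetName = managed_objects
    # Short name is the first dotted component; None when the field is missing
    # rather than failing on None.split().
    p_alert.addAttribute(
        "asset_shortname",
        managed_objects.split(".", 1)[0] if isinstance(managed_objects, str) else None,
    )
    # The feed has no separate address field; the managed object name is reused
    # as the asset "IP address", as in the original mapping.
    p_alert.assetIpAddress = managed_objects
    p_alert.addAttribute("impact", impact)
    p_alert.addAttribute("signatures", signatures)
    p_alert.message = message if is_open else None
    p_alert.clearMessage = message if is_close else None
    p_alert.addAttribute("description", alert_json.get("description"))
    p_alert.alertType = signatures
    p_alert.addAttribute("url", alert_json.get("url"))
    p_alert.raisedAt = event_ms if is_open else 0
    p_alert.clearedAt = event_ms if is_close else 0
    p_alert.severity = "MAJOR"
    p_alert.sourceMechanism = "Pearl-DDoS"
    p_alert.key = f"{alert_json.get('alertId')}#{signatures}"
    p_alert.payloadDictionary = alert_json

    # MD5 serves only as a short, stable fingerprint of the alert key for the
    # aggregate's impactedServices field; it carries no integrity guarantee.
    key_digest = hashlib.md5(p_alert.key.encode("utf-8")).hexdigest()

    # Aggregate alert keyed on the alertId, covering every signature alert of
    # the same event.
    aggr_alert = _build_aggregate_alert(s_alert, alert_json)
    if is_open:
        aggr_alert.raisedAt = event_ms
        aggr_alert.severity = "CRITICAL"
        aggr_alert.message = f"AGGREGATE: {message}"
        aggr_alert.impactedServices = key_digest
        aggr_alert.addAttribute("aggregateMode", "APPEND")
        aggr_alert.aggregateMode = "APPEND"
    else:
        aggr_alert.clearedAt = event_ms
        aggr_alert.impactedServices = None

    return [p_alert, aggr_alert]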
# Reference payload (a "close" event) listing the fields main() reads:
# alertId, createdDate, status, subject, description, url, signatures,
# impact and managedObjects. It is a bare string literal with no runtime
# effect; the addresses and URL in it are sample values only.
'''
{
"alertId": 86004,
"createdDate": "2022-11-24 17:04:45 IST",
"status": "close",
"subject": "[mDDoS] Host Detection alert #86004 incoming to 10.10.2.10",
"description": "DoS host detection alert started at 2022-11-24 11:19:42. alert ended at 2022-11-24 11:34:45 UTC.",
"host": "10.10.2.10",
"url": "https://10.10.2.10/catalyst/#/ddos/attack-view/86004",
"signatures": "SIP (bps)",
"impact": "103.68Mbps/9kpps",
"importance": "High",
"managedObjects": "ABC_XYZ_Pune",
"alert_duration": 15
}
'''