Skip to content
Snippets Groups Projects
Commit 593c95e7 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Updates alerts specification schema to have different definitions for a...

Updates alerts specification schema to have different definitions for a threshold stream and a threshold batch trigger
parent c35617fa
Branches
Tags
No related merge requests found
......@@ -42,6 +42,8 @@ from schema import Schema, And, Or, Optional, SchemaError
SFEMC = "flame_sfemc" # describes the keyword used for the SFEMC alert handler
STREAM_TYPE = "stream"
BATCH_TYPE = "batch"
# Influx QL functions defined in the documentation https://docs.influxdata.com/influxdb/v1.6/query_language/functions/
INFLUX_QL_FUNCTIONS = (
......@@ -80,14 +82,34 @@ HANDLERS = {
]
}
THRESHOLD_TRIGGER = {
THRESHOLD_STREAM_TRIGGER = {
Optional("description"): str,
"metadata": {
"monitoring_type": STREAM_TYPE
},
"event_type": "threshold",
"metric": And(str, lambda s: len(s.split('.', 1)) == 2),
"condition": {
"threshold": Or(int, float),
Optional("resource_type"): {
And(str, lambda tag: tag not in INVALID_TAGS): str
},
"comparison_operator": And(str, lambda s: s in COMPARISON_OPERATORS)
},
"action": HANDLERS
}
THRESHOLD_BATCH_TRIGGER = {
Optional("description"): str,
"metadata": {
"monitoring_type": BATCH_TYPE
},
"event_type": "threshold",
"metric": And(str, lambda s: len(s.split('.', 1)) == 2),
"condition": {
"threshold": Or(int, float),
"granularity": And(int, lambda p: p > 0),
Optional("aggregation_method"): And(str, lambda s: s in INFLUX_QL_FUNCTIONS), # defaults to "mean"
"aggregation_method": And(str, lambda s: s in INFLUX_QL_FUNCTIONS),
Optional("resource_type"): {
And(str, lambda tag: tag not in INVALID_TAGS): str
},
......@@ -143,7 +165,7 @@ ALERTS_SPECIFICATION_SCHEMA = Schema({
str: {
"type": "eu.ict-flame.policies.Alert",
"triggers": And({
str: Or(THRESHOLD_TRIGGER, RELATIVE_TRIGGER, DEADMAN_TRIGGER)
str: Or(THRESHOLD_STREAM_TRIGGER, THRESHOLD_BATCH_TRIGGER, RELATIVE_TRIGGER, DEADMAN_TRIGGER)
}, lambda triggers: len(triggers) > 0)
}
}
......
......@@ -37,7 +37,7 @@ from toscaparser.tosca_template import ToscaTemplate
# CLMC-service imports
from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, SFEMC
from clmcservice.alertsapi.alerts_specification_schema import validate_clmc_alerts_specification
from clmcservice.alertsapi.alerts_specification_schema import validate_clmc_alerts_specification, STREAM_TYPE, BATCH_TYPE
from clmcservice.alertsapi.views import AlertsConfigurationAPI
from clmcservice import ROOT_DIR
......@@ -136,7 +136,7 @@ class TestAlertsConfigurationAPI(object):
if not isfile(alert_config_abs_path):
continue # skip directories
print(alert_config_abs_path, valid_expected)
print(alert_config_abs_path, "Is valid:", valid_expected)
with open(alert_config_abs_path, 'r') as fh:
yaml_content = load(fh)
......@@ -145,6 +145,7 @@ class TestAlertsConfigurationAPI(object):
# do not catch exceptions here since we are testing the clmc validator, the tosca parsing is tested in the previous test method
alert_tosca_spec = ToscaTemplate(yaml_dict_tpl=yaml_content)
valid_real, err = validate_clmc_alerts_specification(alert_tosca_spec.tpl, include_error=True)
print("Validation error:\n==========\n{0}\n==========".format(err))
assert valid_expected == valid_real, "CLMC alerts specification validator test failed for file: {0}".format(alert_config_abs_path)
def test_alerts_config_api_post(self, app_config):
......@@ -615,13 +616,12 @@ def extract_alert_configuration_data(alert_spec, sfemc_fqdn, sfemc_port):
for trigger in policy.triggers:
trigger_id = trigger.name
event_type = trigger.trigger_tpl["event_type"]
alert_period_integer = trigger.trigger_tpl["condition"]["granularity"]
topic_id = "{0}\n{1}\n{2}\n{3}".format(sfc, sfc_instance, policy_id, trigger_id)
topic_id = AlertsConfigurationAPI.get_hash(topic_id)
alert_id = topic_id
alert_type = get_alert_type(event_type, alert_period_integer)
alert_type = get_alert_type(event_type, trigger.trigger_tpl.get("metadata", {}))
alert_handlers = {}
for handler_url in trigger.trigger_tpl["action"]["implementation"]:
......@@ -638,23 +638,23 @@ def extract_alert_configuration_data(alert_spec, sfemc_fqdn, sfemc_port):
return sfc, sfc_instance, alerts
def get_alert_type(event_type, alert_period):
def get_alert_type(event_type, trigger_metadata):
"""
Retrieve the alert type (stream ot batch) based on the event type and alert period.
:param event_type: event type, e.g. threshold, relative, deadman, etc.
:param alert_period: the alert period
:param trigger_metadata: metadata for this trigger
:return: "batch" or "stream"
"""
if event_type in AlertsConfigurationAPI.DUAL_VERSION_TEMPLATES:
if alert_period < AlertsConfigurationAPI.STREAM_PERIOD_LIMIT:
return "stream"
else:
return "batch"
assert "monitoring_type" in trigger_metadata, "Dual version trigger type doesn't have monitoring_type field in its metadata"
monitoring_type = trigger_metadata.get("monitoring_type")
assert monitoring_type in {STREAM_TYPE, BATCH_TYPE}, "Monitoring type must be either stream or batch"
return monitoring_type
else:
events = {"relative": "batch", "deadman": "stream"}
events = {"relative": BATCH_TYPE, "deadman": STREAM_TYPE}
return events[event_type]
......
......@@ -36,7 +36,7 @@ from requests import post, get, delete
# CLMC-service imports
from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, TICKScriptTemplateFiller, fill_http_post_handler_vars, get_resource_spec_policy_triggers, get_alert_spec_policy_triggers
from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS, SFEMC, validate_clmc_alerts_specification
from clmcservice.alertsapi.alerts_specification_schema import STREAM_TYPE, BATCH_TYPE, COMPARISON_OPERATORS, SFEMC, validate_clmc_alerts_specification
# initialise logger
log = logging.getLogger('service_logger')
......@@ -48,8 +48,6 @@ class AlertsConfigurationAPI(object):
A class-based view for configuring alerts within CLMC.
"""
STREAM_PERIOD_LIMIT = 60 # if alert period is less than 60 seconds, then a stream template is used, otherwise use batch
DUAL_VERSION_TEMPLATES = {"threshold"} # this set defines all template types that are written in two versions (stream and batch)
KAPACITOR_TASK_API_PREFIX = "/kapacitor/v1/tasks"
......@@ -521,21 +519,26 @@ class AlertsConfigurationAPI(object):
condition = trigger.trigger_tpl["condition"]
critical_value = float(condition["threshold"])
alert_period_integer = condition["granularity"]
alert_period = "{0}s".format(alert_period_integer)
influx_function = condition.get("aggregation_method", "mean") # if not specified, use "mean"
# check for tag filtering
where_clause = None
# get granularity and aggregation method, defaults to None - not every alert type requires these fields
alert_period_integer = condition.get("granularity")
if alert_period_integer is not None:
alert_period = "{0}s".format(alert_period_integer)
else:
alert_period = None
influx_function = condition.get("aggregation_method")
# check for tag filtering (optional, if not specified include the sfc and sfc instance tags only)
tags = {}
if "resource_type" in trigger.trigger_tpl["condition"]:
tags = condition["resource_type"]
# make sure alert tasks are executing with queries for the given sfc and sfc instance - automatically add those tags using the metadata values
tags["flame_sfc"] = sfc
tags["flame_sfci"] = sfc_instance
# make sure alert tasks are executing with queries for the given sfc and sfc instance - automatically add those tags using the metadata values
tags["flame_sfc"] = sfc
tags["flame_sfci"] = sfc_instance
# build up the where clause from the tags dictionary
where_filters_list = map(lambda tag_name: '"{0}"=\'{1}\''.format(tag_name, tags[tag_name]), tags)
where_clause = " AND ".join(where_filters_list)
# build up the where clause from the tags dictionary
where_filters_list = map(lambda tag_name: '"{0}"=\'{1}\''.format(tag_name, tags[tag_name]), tags)
where_clause = " AND ".join(where_filters_list)
comparison_operator = COMPARISON_OPERATORS.get(condition.get("comparison_operator")) # if not specified, the comparison operator will be set to None
......@@ -546,10 +549,13 @@ class AlertsConfigurationAPI(object):
# check whether the template needs to be a stream or a batch
if event_type in self.DUAL_VERSION_TEMPLATES:
if alert_period_integer < self.STREAM_PERIOD_LIMIT:
monitoring_type = trigger_metadata.get("monitoring_type")
assert monitoring_type in {STREAM_TYPE, BATCH_TYPE}, "Alerts schema validation failed - dual version alert types should specify monitoring type as stream or batch in the metadata."
if monitoring_type == STREAM_TYPE:
template_id = "{0}-stream-template".format(event_type)
event_type = "{0}_stream".format(event_type)
else:
elif monitoring_type == BATCH_TYPE:
template_id = "{0}-batch-template".format(event_type)
event_type = "{0}_batch".format(event_type)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment