diff --git a/src/service/clmcservice/alertsapi/alerts_specification_schema.py b/src/service/clmcservice/alertsapi/alerts_specification_schema.py index 7b48d43e0aabe10ab1d8b8b3dc52895221b309d9..50097ed6bcb765ec2e7a88c88c1459ca3c1fc0af 100644 --- a/src/service/clmcservice/alertsapi/alerts_specification_schema.py +++ b/src/service/clmcservice/alertsapi/alerts_specification_schema.py @@ -42,6 +42,8 @@ from schema import Schema, And, Or, Optional, SchemaError SFEMC = "flame_sfemc" # describes the keyword used for the SFEMC alert handler +STREAM_TYPE = "stream" +BATCH_TYPE = "batch" # Influx QL functions defined in the documentation https://docs.influxdata.com/influxdb/v1.6/query_language/functions/ INFLUX_QL_FUNCTIONS = ( @@ -80,14 +82,34 @@ HANDLERS = { ] } -THRESHOLD_TRIGGER = { +THRESHOLD_STREAM_TRIGGER = { Optional("description"): str, + "metadata": { + "monitoring_type": STREAM_TYPE + }, + "event_type": "threshold", + "metric": And(str, lambda s: len(s.split('.', 1)) == 2), + "condition": { + "threshold": Or(int, float), + Optional("resource_type"): { + And(str, lambda tag: tag not in INVALID_TAGS): str + }, + "comparison_operator": And(str, lambda s: s in COMPARISON_OPERATORS) + }, + "action": HANDLERS +} + +THRESHOLD_BATCH_TRIGGER = { + Optional("description"): str, + "metadata": { + "monitoring_type": BATCH_TYPE + }, "event_type": "threshold", "metric": And(str, lambda s: len(s.split('.', 1)) == 2), "condition": { "threshold": Or(int, float), "granularity": And(int, lambda p: p > 0), - Optional("aggregation_method"): And(str, lambda s: s in INFLUX_QL_FUNCTIONS), # defaults to "mean" + "aggregation_method": And(str, lambda s: s in INFLUX_QL_FUNCTIONS), Optional("resource_type"): { And(str, lambda tag: tag not in INVALID_TAGS): str }, @@ -143,7 +165,7 @@ ALERTS_SPECIFICATION_SCHEMA = Schema({ str: { "type": "eu.ict-flame.policies.Alert", "triggers": And({ - str: Or(THRESHOLD_TRIGGER, RELATIVE_TRIGGER, DEADMAN_TRIGGER) + str: Or(THRESHOLD_STREAM_TRIGGER, THRESHOLD_BATCH_TRIGGER, RELATIVE_TRIGGER, DEADMAN_TRIGGER) }, lambda triggers: len(triggers) > 0) } } diff --git a/src/service/clmcservice/alertsapi/tests.py b/src/service/clmcservice/alertsapi/tests.py index c77b74bb0c8b66e819107cece0f86042bafa1868..62a33ccee73c8fdd8d61ab71012e03253b56c7dd 100644 --- a/src/service/clmcservice/alertsapi/tests.py +++ b/src/service/clmcservice/alertsapi/tests.py @@ -37,7 +37,7 @@ from toscaparser.tosca_template import ToscaTemplate # CLMC-service imports from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, SFEMC -from clmcservice.alertsapi.alerts_specification_schema import validate_clmc_alerts_specification +from clmcservice.alertsapi.alerts_specification_schema import validate_clmc_alerts_specification, STREAM_TYPE, BATCH_TYPE from clmcservice.alertsapi.views import AlertsConfigurationAPI from clmcservice import ROOT_DIR @@ -136,7 +136,7 @@ class TestAlertsConfigurationAPI(object): if not isfile(alert_config_abs_path): continue # skip directories - print(alert_config_abs_path, valid_expected) + print(alert_config_abs_path, "Is valid:", valid_expected) with open(alert_config_abs_path, 'r') as fh: yaml_content = load(fh) @@ -145,6 +145,7 @@ class TestAlertsConfigurationAPI(object): # do not catch exceptions here since we are testing the clmc validator, the tosca parsing is tested in the previous test method alert_tosca_spec = ToscaTemplate(yaml_dict_tpl=yaml_content) valid_real, err = validate_clmc_alerts_specification(alert_tosca_spec.tpl, include_error=True) + print("Validation error:\n==========\n{0}\n==========".format(err)) assert valid_expected == valid_real, "CLMC alerts specification validator test failed for file: {0}".format(alert_config_abs_path) def test_alerts_config_api_post(self, app_config): @@ -615,13 +616,12 @@ def extract_alert_configuration_data(alert_spec, sfemc_fqdn, sfemc_port): for trigger in policy.triggers: trigger_id = trigger.name event_type = trigger.trigger_tpl["event_type"] - alert_period_integer = trigger.trigger_tpl["condition"]["granularity"] topic_id = "{0}\n{1}\n{2}\n{3}".format(sfc, sfc_instance, policy_id, trigger_id) topic_id = AlertsConfigurationAPI.get_hash(topic_id) alert_id = topic_id - alert_type = get_alert_type(event_type, alert_period_integer) + alert_type = get_alert_type(event_type, trigger.trigger_tpl.get("metadata", {})) alert_handlers = {} for handler_url in trigger.trigger_tpl["action"]["implementation"]: @@ -638,23 +638,23 @@ def extract_alert_configuration_data(alert_spec, sfemc_fqdn, sfemc_port): return sfc, sfc_instance, alerts -def get_alert_type(event_type, alert_period): +def get_alert_type(event_type, trigger_metadata): """ Retrieve the alert type (stream ot batch) based on the event type and alert period. :param event_type: event type, e.g. threshold, relative, deadman, etc. - :param alert_period: the alert period + :param trigger_metadata: metadata for this trigger :return: "batch" or "stream" """ if event_type in AlertsConfigurationAPI.DUAL_VERSION_TEMPLATES: - if alert_period < AlertsConfigurationAPI.STREAM_PERIOD_LIMIT: - return "stream" - else: - return "batch" + assert "monitoring_type" in trigger_metadata, "Dual version trigger type doesn't have monitoring_type field in its metadata" + monitoring_type = trigger_metadata.get("monitoring_type") + assert monitoring_type in {STREAM_TYPE, BATCH_TYPE}, "Monitoring type must be either stream or batch" + return monitoring_type else: - events = {"relative": "batch", "deadman": "stream"} + events = {"relative": BATCH_TYPE, "deadman": STREAM_TYPE} return events[event_type] diff --git a/src/service/clmcservice/alertsapi/views.py b/src/service/clmcservice/alertsapi/views.py index 6c7a28c200b52523c52a4c4f5d08a4e1a89c35a2..c27f0db5d8c38b40e5e93f3d1032a4f9e0c353ae 100644 --- a/src/service/clmcservice/alertsapi/views.py +++ b/src/service/clmcservice/alertsapi/views.py @@ -36,7 +36,7 @@ from requests import post, get, delete # CLMC-service imports from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, TICKScriptTemplateFiller, fill_http_post_handler_vars, get_resource_spec_policy_triggers, get_alert_spec_policy_triggers -from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS, SFEMC, validate_clmc_alerts_specification +from clmcservice.alertsapi.alerts_specification_schema import STREAM_TYPE, BATCH_TYPE, COMPARISON_OPERATORS, SFEMC, validate_clmc_alerts_specification # initialise logger log = logging.getLogger('service_logger') @@ -48,8 +48,6 @@ class AlertsConfigurationAPI(object): A class-based view for configuring alerts within CLMC. """ - STREAM_PERIOD_LIMIT = 60 # if alert period is less than 60 seconds, then a stream template is used, otherwise use batch - DUAL_VERSION_TEMPLATES = {"threshold"} # this set defines all template types that are written in two versions (stream and batch) KAPACITOR_TASK_API_PREFIX = "/kapacitor/v1/tasks" @@ -521,21 +519,26 @@ class AlertsConfigurationAPI(object): condition = trigger.trigger_tpl["condition"] critical_value = float(condition["threshold"]) - alert_period_integer = condition["granularity"] - alert_period = "{0}s".format(alert_period_integer) - influx_function = condition.get("aggregation_method", "mean") # if not specified, use "mean" - # check for tag filtering - where_clause = None + # get granularity and aggregation method, defaults to None - not every alert type requires these fields + alert_period_integer = condition.get("granularity") + if alert_period_integer is not None: + alert_period = "{0}s".format(alert_period_integer) + else: + alert_period = None + influx_function = condition.get("aggregation_method") + + # check for tag filtering (optional, if not specified include the sfc and sfc instance tags only) + tags = {} if "resource_type" in trigger.trigger_tpl["condition"]: tags = condition["resource_type"] - # make sure alert tasks are executing with queries for the given sfc and sfc instance - automatically add those tags using the metadata values - tags["flame_sfc"] = sfc - tags["flame_sfci"] = sfc_instance + # make sure alert tasks are executing with queries for the given sfc and sfc instance - automatically add those tags using the metadata values + tags["flame_sfc"] = sfc + tags["flame_sfci"] = sfc_instance - # build up the where clause from the tags dictionary - where_filters_list = map(lambda tag_name: '"{0}"=\'{1}\''.format(tag_name, tags[tag_name]), tags) - where_clause = " AND ".join(where_filters_list) + # build up the where clause from the tags dictionary + where_filters_list = map(lambda tag_name: '"{0}"=\'{1}\''.format(tag_name, tags[tag_name]), tags) + where_clause = " AND ".join(where_filters_list) comparison_operator = COMPARISON_OPERATORS.get(condition.get("comparison_operator")) # if not specified, the comparison operator will be set to None @@ -546,10 +549,13 @@ class AlertsConfigurationAPI(object): # check whether the template needs to be a stream or a batch if event_type in self.DUAL_VERSION_TEMPLATES: - if alert_period_integer < self.STREAM_PERIOD_LIMIT: + monitoring_type = trigger_metadata.get("monitoring_type") + assert monitoring_type in {STREAM_TYPE, BATCH_TYPE}, "Alerts schema validation failed - dual version alert types should specify monitoring type as stream or batch in the metadata." + + if monitoring_type == STREAM_TYPE: template_id = "{0}-stream-template".format(event_type) event_type = "{0}_stream".format(event_type) - else: + elif monitoring_type == BATCH_TYPE: template_id = "{0}-batch-template".format(event_type) event_type = "{0}_batch".format(event_type)