diff --git a/docs/AlertsConfiguration.md b/docs/AlertsConfiguration.md index ab7a1e9bf04d809dcf7a2fed48727f2a76393a91..fdd91048a38eca65860527ef03cb2ed7b3009e38 100644 --- a/docs/AlertsConfiguration.md +++ b/docs/AlertsConfiguration.md @@ -78,6 +78,7 @@ topology_template: implementation: - http://sfemc.flame.eu/notify - http://companyA.alert-handler.flame.eu/high-latency + - low_requests_policy: type: eu.ict-flame.policies.StateChange triggers: @@ -100,6 +101,62 @@ topology_template: implementation: - http://sfemc.flame.eu/notify - http://companyA.alert-handler.flame.eu/low-requests + + - requests_diff_policy: + type: eu.ict-flame.policies.StateChange + triggers: + increase_in_requests: + description: | + This event triggers when the number of requests has increased relative to the number of requests received + 120 seconds ago. + event_type: relative + metric: storage.requests + condition: + threshold: 100 # requests have increased by at least 100 + granularity: 120 + resource_type: + sf_package: storage + sf: storage-users + location: watershed + comparison_operator: gte + action: + implementation: + - http://sfemc.flame.eu/notify + decrease_in_requests: + description: | + This event triggers when the number of requests has decreased relative to the number of requests received + 120 seconds ago. + event_type: relative + metric: storage.requests + condition: + threshold: -100 # requests have decreased by at least 100 + granularity: 120 + resource_type: + sf_package: storage + sf: storage-users + location: watershed + comparison_operator: lte + action: + implementation: + - http://sfemc.flame.eu/notify + + - missing_measurement_policy: + type: eu.ict-flame.policies.StateChange + triggers: + missing_storage_measurements: + description: This event triggers when the number of storage measurements reported falls below the threshold value. + event_type: deadman + # deadman trigger instances monitor the whole measurement (storage in this case), so simply put a star for field value + # to be compliant with the <measurement>.<field> format + metric: storage.* + condition: + threshold: 0 # if requests are less than or equal to 0 (in other words, no measurements are reported) + granularity: 60 # check for for missing data for the last 60 seconds + resource_type: + sf_package: storage + action: + implementation: + - http://sfemc.flame.eu/notify ``` diff --git a/src/service/clmcservice/alertsapi/alerts_specification_schema.py b/src/service/clmcservice/alertsapi/alerts_specification_schema.py index ed17c3a207d46b2fffae29254be250e27484ac13..74a5170d96bfe06832a8522489df5765d0a7ae8d 100644 --- a/src/service/clmcservice/alertsapi/alerts_specification_schema.py +++ b/src/service/clmcservice/alertsapi/alerts_specification_schema.py @@ -22,13 +22,11 @@ // Created for Project : FLAME """ - # Python standard libs from re import compile, IGNORECASE # PIP installed libs -from schema import Schema, And, Or, Optional - +from schema import Schema, And, Or, Optional, SchemaError """ This module defines the schema objects for the TOSCA Alert Specification: @@ -42,7 +40,6 @@ This module defines the schema objects for the TOSCA Alert Specification: * the condition section must specify threshold, granularity, aggregation_method, comparison_operator """ - # Influx QL functions defined in the documentation https://docs.influxdata.com/influxdb/v1.6/query_language/functions/ INFLUX_QL_FUNCTIONS = ( "count", "mean", "median", "mode", "sum", "first", "last", "max", "min" @@ -64,11 +61,9 @@ URL_REGEX = compile( r'(?:[/?#][^\s]*)?$', # URL path or query parameters IGNORECASE) - # Global tags allowed to be used for filtering in the trigger condition CLMC_INFORMATION_MODEL_GLOBAL_TAGS = ("sfc", "sfci", "sf_package", "sf", "sf_endpoint", "host", "location") - ALERTS_SPECIFICATION_SCHEMA = Schema({ "tosca_definitions_version": And(str, lambda v: v == "tosca_simple_profile_for_nfv_1_0_0"), Optional("description"): str, @@ -90,11 +85,11 @@ ALERTS_SPECIFICATION_SCHEMA = Schema({ "condition": { "threshold": Or(int, float), "granularity": int, - "aggregation_method": And(str, lambda s: s in INFLUX_QL_FUNCTIONS), + Optional("aggregation_method"): And(str, lambda s: s in INFLUX_QL_FUNCTIONS), Optional("resource_type"): { And(str, lambda s: s in CLMC_INFORMATION_MODEL_GLOBAL_TAGS): str }, - "comparison_operator": And(str, lambda s: s in COMPARISON_OPERATORS) + Optional("comparison_operator"): And(str, lambda s: s in COMPARISON_OPERATORS) }, "action": { "implementation": @@ -111,79 +106,24 @@ ALERTS_SPECIFICATION_SCHEMA = Schema({ }) -def fill_http_post_handler_vars(handler_id, handler_url): +def validate_clmc_alerts_specification(tosca_yaml_tpl, include_error=False): """ - Creates a dictionary object ready to be posted to kapacitor to create an alert handler. + CLMC validation for the TOSCA alerts specification, uses the schema defined in alerts_specification_schema.py - :param handler_id: handler identifier - :param handler_url: url to post alerts to + :param tosca_yaml_tpl: the tosca template to validate (as python dictionary object) + :param include_error: a flag indicating whether the output of the function should include a caught SchemaError + (if set to True and no error is thrown, returns None as the error object) - :return: a dictionary object ready to be posted to kapacitor to create an alert handler. + :return: True/False if the tosca_tpl is valid/invalid along with any error (None if no error) that was thrown during validation (if argument include_error is set to True) """ - return { - "id": handler_id, - "kind": "post", - "options": { - "url": handler_url - } - } - - -def fill_threshold_template_vars(db, measurement, field, influx_function, critical_value, comparison_operator, alert_period, topic_id, where_clause=None): - """ - Creates a dictionary object ready to be posted to kapacitor to create a task from template. - - :param db: db name - :param measurement: measurement name - :param field: field name - :param influx_function: influx function to use for querying - :param critical_value: critical value to compare with - :param comparison_operator: type of comparison - :param alert_period: alert period to query influx - :param topic_id: topic identifier - :param where_clause: optional argument for filtering the influx query by tag values - - :return: a dictionary object ready to be posted to kapacitor to create a task from template. - """ - - comparison_lambda = "\"real_value\" {0} {1}".format(comparison_operator, critical_value) # build up lambda string, e.g. "real_value" >= 10 - - tempalte_vars = { - "db": { - "type": "string", - "value": db - }, - "measurement": { - "type": "string", - "value": measurement - }, - "field": { - "type": "string", - "value": field - }, - "influxFunction": { - "type": "string", - "value": influx_function - }, - "comparisonLambda": { - "type": "lambda", - "value": comparison_lambda - }, - "alertPeriod": { - "type": "duration", - "value": alert_period - }, - "topicID": { - "type": "string", - "value": topic_id - } - } - - if where_clause is not None: - tempalte_vars["whereClause"] = { - "type": "string", - "value": where_clause - } + try: + ALERTS_SPECIFICATION_SCHEMA.validate(tosca_yaml_tpl) + valid, err = True, None + except SchemaError as e: + valid, err = False, e - return tempalte_vars + if include_error: + return valid, err + else: + return valid diff --git a/src/service/clmcservice/alertsapi/utilities.py b/src/service/clmcservice/alertsapi/utilities.py index ce10967c18c9ab13414b74fafaf2c40dd1e7c178..7cec89edaa928bf678c69a3cb96f63624403028d 100644 --- a/src/service/clmcservice/alertsapi/utilities.py +++ b/src/service/clmcservice/alertsapi/utilities.py @@ -26,12 +26,8 @@ # Python standard libs from os.path import join -# PIP installed libs -from schema import SchemaError - # CLMC-service imports from clmcservice import ROOT_DIR -from clmcservice.alertsapi.alerts_specification_schema import ALERTS_SPECIFICATION_SCHEMA CLMC_ALERTS_TOSCA_DEFINITIONS_REL_PATH = ["resources", "tosca", "flame_clmc_alerts_definitions.yaml"] @@ -58,24 +54,207 @@ def adjust_tosca_definitions_import(alert_spec): pass # nothing to replace if the import is not specified (either imports are missed, or no reference to the clmc tosca definitions file) -def validate_clmc_alerts_specification(tosca_yaml_tpl, include_error=False): +def fill_http_post_handler_vars(handler_id, handler_url): """ - CLMC validation for the TOSCA alerts specification, uses the schema defined in alerts_specification_schema.py + Creates a dictionary object ready to be posted to kapacitor to create an alert handler. - :param tosca_yaml_tpl: the tosca template to validate (as python dictionary object) - :param include_error: a flag indicating whether the output of the function should include a caught SchemaError - (if set to True and no error is thrown, returns None as the error object) + :param handler_id: handler identifier + :param handler_url: url to post alerts to - :return: True/False if the tosca_tpl is valid/invalid along with any error (None if no error) that was thrown during validation (if argument include_error is set to True) + :return: a dictionary object ready to be posted to kapacitor to create an alert handler. """ - try: - ALERTS_SPECIFICATION_SCHEMA.validate(tosca_yaml_tpl) - valid, err = True, None - except SchemaError as e: - valid, err = False, e - - if include_error: - return valid, err - else: - return valid + return { + "id": handler_id, + "kind": "post", + "options": { + "url": handler_url + } + } + + +class TICKScriptTemplateFiller: + """ + A utility class used for TICK script templates filtering. + """ + + @staticmethod + def fill_template_vars(template_type, **kwargs): + """ + A utility function acting as an entry poiny to the fill_<template_type>_template_vars() functions defined below. + + :param template_type: the template type - e.g. + :param kwargs: keyword arguments to forward to the actual function that will be used + + :return: the result of the actual function that will be used. + """ + + fill_function_name = "_fill_{0}_template_vars".format(template_type) + fill_function = getattr(TICKScriptTemplateFiller, fill_function_name) # python functions are first-class objects ! + + # TODO is this the best way to avoid long if-else chains ? + + return fill_function(**kwargs) + + @staticmethod + def _fill_threshold_template_vars(db=None, measurement=None, field=None, influx_function=None, critical_value=None, + comparison_operator=None, alert_period=None, topic_id=None, where_clause=None): + """ + Creates a dictionary object ready to be posted to kapacitor to create a "threshold" task from template. + + :param db: db name + :param measurement: measurement name + :param field: field name + :param influx_function: influx function to use for querying + :param critical_value: critical value to compare with + :param comparison_operator: type of comparison + :param alert_period: alert period to query influx + :param topic_id: topic identifier + :param where_clause: (OPTIONAL) argument for filtering the influx query by tag values + + :return: a dictionary object ready to be posted to kapacitor to create a "threshold" task from template. + """ + + comparison_lambda = "\"real_value\" {0} {1}".format(comparison_operator, critical_value) # build up lambda string, e.g. "real_value" >= 10 + + template_vars = { + "db": { + "type": "string", + "value": db + }, + "measurement": { + "type": "string", + "value": measurement + }, + "field": { + "type": "string", + "value": field + }, + "influxFunction": { + "type": "string", + "value": influx_function + }, + "comparisonLambda": { + "type": "lambda", + "value": comparison_lambda + }, + "alertPeriod": { + "type": "duration", + "value": alert_period + }, + "topicID": { + "type": "string", + "value": topic_id + } + } + + if where_clause is not None: + template_vars["whereClause"] = { + "type": "string", + "value": where_clause + } + + return template_vars + + @staticmethod + def _fill_relative_template_vars(db=None, measurement=None, field=None, critical_value=None, comparison_operator=None, + alert_period=None, topic_id=None, where_clause=None): + """ + Creates a dictionary object ready to be posted to kapacitor to create a "relative" task from template. + + :param db: db name + :param measurement: measurement name + :param field: field name + :param critical_value: critical value to compare with + :param comparison_operator: type of comparison + :param alert_period: alert period to use for relative comparison + :param topic_id: topic identifier + :param where_clause: (OPTIONAL) argument for filtering the influx query by tag values + + :return: a dictionary object ready to be posted to kapacitor to create a "relative" task from template. + """ + + select_lambda = '"{0}"'.format(field) + + comparison_lambda = '"diff" {0} {1}'.format(comparison_operator, critical_value) + + template_vars = { + "db": { + "type": "string", + "value": db + }, + "measurement": { + "type": "string", + "value": measurement + }, + "selectLambda": { + "type": "lambda", + "value": select_lambda + }, + "comparisonLambda": { + "type": "lambda", + "value": comparison_lambda + }, + "alertPeriod": { + "type": "duration", + "value": alert_period + }, + "topicID": { + "type": "string", + "value": topic_id + } + } + + if where_clause is not None: + template_vars["whereClause"] = { + "type": "lambda", + "value": where_clause + } + + return template_vars + + @staticmethod + def _fill_deadman_template_vars(db=None, measurement=None, critical_value=None, alert_period=None, topic_id=None, where_clause=None): + """ + Creates a dictionary object ready to be posted to kapacitor to create a "deadman" task from template. + + :param db: db name + :param measurement: measurement name + :param critical_value: critical value to compare with + :param alert_period: alert period to use for relative comparison + :param topic_id: topic identifier + :param where_clause: (OPTIONAL) argument for filtering the influx query by tag values + + :return: a dictionary object ready to be posted to kapacitor to create a "deadman" task from template. + """ + + template_vars = { + "db": { + "type": "string", + "value": db + }, + "measurement": { + "type": "string", + "value": measurement + }, + "alertPeriod": { + "type": "duration", + "value": alert_period + }, + "throughputThreshold": { + "type": "int", + "value": critical_value + }, + "topicID": { + "type": "string", + "value": topic_id + } + } + + if where_clause is not None: + template_vars["whereClause"] = { + "type": "lambda", + "value": where_clause + } + + return template_vars diff --git a/src/service/clmcservice/alertsapi/views.py b/src/service/clmcservice/alertsapi/views.py index 19f67dd0fa6333a94230334fdf3da31e1772d56b..0053d7fa1866cdca04a9a400d0d437a42f530b1e 100644 --- a/src/service/clmcservice/alertsapi/views.py +++ b/src/service/clmcservice/alertsapi/views.py @@ -33,8 +33,8 @@ from toscaparser.tosca_template import ToscaTemplate from requests import post # CLMC-service imports -from clmcservice.alertsapi.utilities import validate_clmc_alerts_specification, adjust_tosca_definitions_import -from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS, fill_threshold_template_vars, fill_http_post_handler_vars +from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, TICKScriptTemplateFiller, fill_http_post_handler_vars +from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS, validate_clmc_alerts_specification # initialise logger log = logging.getLogger('service_logger') @@ -99,37 +99,53 @@ class AlertsConfigurationAPI(object): event_type = trigger.trigger_tpl["event_type"] template_id = "{0}_template".format(event_type) measurement, field = trigger.trigger_tpl["metric"].split(".") - critical_value = float(trigger.trigger_tpl["condition"]["threshold"]) - alert_period = "{0}s".format(trigger.trigger_tpl["condition"]["granularity"]) - influx_function = trigger.trigger_tpl["condition"]["aggregation_method"] + + condition = trigger.trigger_tpl["condition"] + critical_value = float(condition["threshold"]) + alert_period = "{0}s".format(condition["granularity"]) + influx_function = condition.get("aggregation_method", "mean") # if not specified, use "mean" + + # check for tag filtering where_clause = None if "resource_type" in trigger.trigger_tpl["condition"]: - tags = trigger.trigger_tpl["conditon"]["resource_type"] - where_clause = " AND ".join(map(lambda tag_name: "{0}={1}".format(tag_name, tags[tag_name]), tags)) - comparison_operator = COMPARISON_OPERATORS[trigger.trigger_tpl["condition"]["comparison_operator"]] - http_handlers = trigger.trigger_tpl["action"]["implementation"] - topic_id = "{0}___{1}___{2}".format(sfc, sfc_instance, event_id) # scoped per service function chain instance (no two sfc instances report to the same topic) - alert_id = "{0}___{1}".format(policy.name, event_id) + tags = condition["resource_type"] + where_clause = " AND ".join(map(lambda tag_name: '"{0}"=\'{1}\''.format(tag_name, tags[tag_name]), tags)) + + comparison_operator = COMPARISON_OPERATORS[condition.get("comparison_operator", "gte")] # if not specified, use "gte" (>=) + + # generate topic and alert identifiers + topic_id = "{0}.{1}.{2}".format(sfc, sfc_instance, event_id) # scoped per service function chain instance (no two sfc instances report to the same topic) + alert_id = "{0}.{1}.{2}.{3}".format(sfc, sfc_instance, policy.name, event_id) - # create and activate alert task + # built up the template vars dictionary depending on the event type (threshold, relative, etc.) + # all extracted properties from the trigger are passed, the TICKScriptTemplateFiller entry point then forwards those to the appropriate function + template_vars = TICKScriptTemplateFiller.fill_template_vars(event_type, db=db, measurement=measurement, field=field, influx_function=influx_function, + critical_value=critical_value, comparison_operator=comparison_operator, alert_period=alert_period, + topic_id=topic_id, where_clause=where_clause) + + # create and activate alert task through the kapacitor HTTP API kapacitor_api_tasks_url = "http://localhost:9092/kapacitor/v1/tasks" - if event_type == "threshold": - kapacitor_http_request_body = { - "id": alert_id, - "template-id": template_id, - "dbrps": [{"db": db, "rp": "autogen"}], - "status": "enabled", - "vars": fill_threshold_template_vars(db, measurement, field, influx_function, critical_value, - comparison_operator, alert_period, topic_id, where_clause=where_clause) - } - - response = post(kapacitor_api_tasks_url, data=kapacitor_http_request_body) - response_content = response.json() - log.info(response_content) + kapacitor_http_request_body = { + "id": alert_id, + "template-id": template_id, + "dbrps": [{"db": db, "rp": "autogen"}], + "status": "enabled", + "vars": template_vars + } + + # send the request and receive a response + response = post(kapacitor_api_tasks_url, data=kapacitor_http_request_body) + response_content = response.json() + # log the response + log.info(response_content) + + # exttranc http handlers + http_handlers = trigger.trigger_tpl["action"]["implementation"] + # subscribe all http handlers to the created topic kapacitor_api_handlers_url = "http://localhost:9092/kapacitor/v1/alerts/topics/{0}/handlers".format(topic_id) for http_handler_url in http_handlers: - handler_id = "{0}___{1}___{2}".format(policy.name, event_id, http_handler_url) + handler_id = "{0}.{1}.{2}".format(policy.name, event_id, http_handler_url) kapacitor_http_request_body = fill_http_post_handler_vars(handler_id, http_handler_url) response = post(kapacitor_api_handlers_url, data=kapacitor_http_request_body) response_content = response.json() diff --git a/src/service/clmcservice/resources/TICKscript/deadman_template.tick b/src/service/clmcservice/resources/TICKscript/deadman_template.tick new file mode 100644 index 0000000000000000000000000000000000000000..a772b8199dc6737f2e2c4e4dcb87237380305c52 --- /dev/null +++ b/src/service/clmcservice/resources/TICKscript/deadman_template.tick @@ -0,0 +1,26 @@ +var db string // database per service function chain, so db is named after sfc + +var rp = 'autogen' // default value for the retention policy + +var measurement string + +var whereClause = lambda: True // default value is a function which returns TRUE, hence no filtering of the query result + +var messageValue = 'TRUE' // default value is TRUE, as this is what SFEMC expects as a notification for an event rule + +var alertPeriod duration + +var throughputThreshold float // alerts will trigger if data points reported durign the alert period fall bellow this value + +var topicID string + + +stream + | from() + .database(db) + .retentionPolicy(rp) + .measurement(measurement) + .where(whereClause) + | deadman(throughputThreshold, alertPeriod) + .message(messageValue) + .topic(topicID) diff --git a/src/service/clmcservice/resources/TICKscript/relative_template.tick b/src/service/clmcservice/resources/TICKscript/relative_template.tick new file mode 100644 index 0000000000000000000000000000000000000000..e158ab9d2cd6c6a15dec29f2e30c1d895c7dfef0 --- /dev/null +++ b/src/service/clmcservice/resources/TICKscript/relative_template.tick @@ -0,0 +1,43 @@ +var db string // database per service function chain, so db is named after sfc + +var rp = 'autogen' // default value for the retention policy + +var measurement string + +var selectLambda lambda // must be a lambda specifying the field to select e.g. "requests" + +var whereClause = lambda: True // default value is a function which returns TRUE, hence no filtering of the query result + +var messageValue = 'TRUE' // default value is TRUE, as this is what SFEMC expects as a notification for an event rule + +var comparisonLambda lambda // comparison function e.g. "diff" > 40 + +var alertPeriod duration + +var topicID string + + +var data = stream + | from() + .database(db) + .retentionPolicy(rp) + .measurement(measurement) + .where(whereClause) + | eval(selectLambda) + .as('value') + +var past = data + | shift(alertPeriod) + +var current = data + +past + | join(current) // NOTE: join buffers a given data point until a point with the correct timestamp to join on arrives + .as('past', 'current') + | eval(lambda: float("current.value" - "past.value")) + .keep() + .as('diff') + | alert() + .crit(comparisonLambda) + .message(messageValue) + .topic(topicID) \ No newline at end of file diff --git a/src/service/clmcservice/resources/TICKscript/threshold_template.tick b/src/service/clmcservice/resources/TICKscript/threshold_template.tick index ed1e6468a9a88f2c19a203d43180be238e765e60..3118be43061029e76844abc267685ddda2216c44 100644 --- a/src/service/clmcservice/resources/TICKscript/threshold_template.tick +++ b/src/service/clmcservice/resources/TICKscript/threshold_template.tick @@ -12,7 +12,7 @@ var whereClause = 'TRUE' // default value is TRUE, hence no filtering of the qu var messageValue = 'TRUE' // default value is TRUE, as this is what SFEMC expects as a notification for an event rule -var comparisonLambda lambda +var comparisonLambda lambda // comparison function e.g. "real_value" > 40 var alertPeriod duration