Commit 2f8634bf authored by Nikolay Stanchev

Implementation for the alerts API - TODO unit and integration testing

parent 51ec331e
@@ -78,6 +78,7 @@ topology_template:
              implementation:
                - http://sfemc.flame.eu/notify
                - http://companyA.alert-handler.flame.eu/high-latency
    - low_requests_policy:
        type: eu.ict-flame.policies.StateChange
        triggers:
@@ -100,6 +101,62 @@ topology_template:
              implementation:
                - http://sfemc.flame.eu/notify
                - http://companyA.alert-handler.flame.eu/low-requests
    - requests_diff_policy:
        type: eu.ict-flame.policies.StateChange
        triggers:
          increase_in_requests:
            description: |
              This event triggers when the number of requests has increased relative to the number of requests received
              120 seconds ago.
            event_type: relative
            metric: storage.requests
            condition:
              threshold: 100  # requests have increased by at least 100
              granularity: 120
              resource_type:
                sf_package: storage
                sf: storage-users
                location: watershed
              comparison_operator: gte
            action:
              implementation:
                - http://sfemc.flame.eu/notify
          decrease_in_requests:
            description: |
              This event triggers when the number of requests has decreased relative to the number of requests received
              120 seconds ago.
            event_type: relative
            metric: storage.requests
            condition:
              threshold: -100  # requests have decreased by at least 100
              granularity: 120
              resource_type:
                sf_package: storage
                sf: storage-users
                location: watershed
              comparison_operator: lte
            action:
              implementation:
                - http://sfemc.flame.eu/notify
    - missing_measurement_policy:
        type: eu.ict-flame.policies.StateChange
        triggers:
          missing_storage_measurements:
            description: This event triggers when the number of storage measurements reported falls below the threshold value.
            event_type: deadman
            # deadman trigger instances monitor the whole measurement (storage in this case), so simply put a star for the field value
            # to be compliant with the <measurement>.<field> format
            metric: storage.*
            condition:
              threshold: 0  # if requests are less than or equal to 0 (in other words, no measurements are reported)
              granularity: 60  # check for missing data over the last 60 seconds
              resource_type:
                sf_package: storage
            action:
              implementation:
                - http://sfemc.flame.eu/notify
```
...
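Before such a document is submitted to the alerts API, it can be checked against the CLMC alerts schema introduced later in this commit. A minimal sketch, assuming the full specification above is saved as alert_spec.yaml (the file name is only illustrative):

```
import yaml  # PyYAML

from clmcservice.alertsapi.alerts_specification_schema import validate_clmc_alerts_specification

# parse the TOSCA alerts document into a python dictionary and validate it against the CLMC schema
with open("alert_spec.yaml") as spec_file:
    alert_spec = yaml.safe_load(spec_file)

print(validate_clmc_alerts_specification(alert_spec))  # True if the document satisfies the schema
```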
@@ -22,13 +22,11 @@
// Created for Project : FLAME
"""

# Python standard libs
from re import compile, IGNORECASE

# PIP installed libs
-from schema import Schema, And, Or, Optional
from schema import Schema, And, Or, Optional, SchemaError

"""
This module defines the schema objects for the TOSCA Alert Specification:
@@ -42,7 +40,6 @@ This module defines the schema objects for the TOSCA Alert Specification:
    * the condition section must specify threshold, granularity, aggregation_method, comparison_operator
"""

# Influx QL functions defined in the documentation https://docs.influxdata.com/influxdb/v1.6/query_language/functions/
INFLUX_QL_FUNCTIONS = (
    "count", "mean", "median", "mode", "sum", "first", "last", "max", "min"
@@ -64,11 +61,9 @@ URL_REGEX = compile(
    r'(?:[/?#][^\s]*)?$',  # URL path or query parameters
    IGNORECASE)

# Global tags allowed to be used for filtering in the trigger condition
CLMC_INFORMATION_MODEL_GLOBAL_TAGS = ("sfc", "sfci", "sf_package", "sf", "sf_endpoint", "host", "location")

ALERTS_SPECIFICATION_SCHEMA = Schema({
    "tosca_definitions_version": And(str, lambda v: v == "tosca_simple_profile_for_nfv_1_0_0"),
    Optional("description"): str,
@@ -90,11 +85,11 @@ ALERTS_SPECIFICATION_SCHEMA = Schema({
                "condition": {
                    "threshold": Or(int, float),
                    "granularity": int,
-                   "aggregation_method": And(str, lambda s: s in INFLUX_QL_FUNCTIONS),
                    Optional("aggregation_method"): And(str, lambda s: s in INFLUX_QL_FUNCTIONS),
                    Optional("resource_type"): {
                        And(str, lambda s: s in CLMC_INFORMATION_MODEL_GLOBAL_TAGS): str
                    },
-                   "comparison_operator": And(str, lambda s: s in COMPARISON_OPERATORS)
                    Optional("comparison_operator"): And(str, lambda s: s in COMPARISON_OPERATORS)
                },
                "action": {
                    "implementation":
@@ -111,79 +106,24 @@ ALERTS_SPECIFICATION_SCHEMA = Schema({
})
-def fill_http_post_handler_vars(handler_id, handler_url):
-def fill_threshold_template_vars(db, measurement, field, influx_function, critical_value, comparison_operator, alert_period, topic_id, where_clause=None):
(both functions are moved to clmcservice/alertsapi/utilities.py; see below)

def validate_clmc_alerts_specification(tosca_yaml_tpl, include_error=False):
    """
    CLMC validation for the TOSCA alerts specification, uses the schema defined in alerts_specification_schema.py

    :param tosca_yaml_tpl: the tosca template to validate (as python dictionary object)
    :param include_error: a flag indicating whether the output of the function should include a caught SchemaError
                          (if set to True and no error is thrown, returns None as the error object)

    :return: True/False if the tosca_tpl is valid/invalid along with any error (None if no error) that was thrown during validation (if argument include_error is set to True)
    """

    try:
        ALERTS_SPECIFICATION_SCHEMA.validate(tosca_yaml_tpl)
        valid, err = True, None
    except SchemaError as e:
        valid, err = False, e

    if include_error:
        return valid, err
    else:
        return valid
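Since aggregation_method and comparison_operator are now optional in the condition sub-schema, a trigger that only supplies threshold and granularity can still pass validation, and the include_error flag exposes why a document fails. A small sketch of the two calling conventions; the broken template below is made up for illustration:

```
broken_tpl = {"tosca_definitions_version": "tosca_simple_yaml_1_0"}  # unsupported version, missing sections

# boolean-only result
print(validate_clmc_alerts_specification(broken_tpl))  # False

# result plus the caught SchemaError (None when the template is valid)
valid, error = validate_clmc_alerts_specification(broken_tpl, include_error=True)
print(valid, error is not None)  # False True
```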
@@ -26,12 +26,8 @@
# Python standard libs
from os.path import join

-# PIP installed libs
-from schema import SchemaError

# CLMC-service imports
from clmcservice import ROOT_DIR
-from clmcservice.alertsapi.alerts_specification_schema import ALERTS_SPECIFICATION_SCHEMA

CLMC_ALERTS_TOSCA_DEFINITIONS_REL_PATH = ["resources", "tosca", "flame_clmc_alerts_definitions.yaml"]
@@ -58,24 +54,207 @@ def adjust_tosca_definitions_import(alert_spec):
        pass  # nothing to replace if the import is not specified (either imports are missed, or no reference to the clmc tosca definitions file)
-def validate_clmc_alerts_specification(tosca_yaml_tpl, include_error=False):
(the function is moved to clmcservice/alertsapi/alerts_specification_schema.py; see above)

def fill_http_post_handler_vars(handler_id, handler_url):
    """
    Creates a dictionary object ready to be posted to kapacitor to create an alert handler.

    :param handler_id: handler identifier
    :param handler_url: url to post alerts to

    :return: a dictionary object ready to be posted to kapacitor to create an alert handler.
    """

    return {
        "id": handler_id,
        "kind": "post",
        "options": {
            "url": handler_url
        }
    }


class TICKScriptTemplateFiller:
    """
    A utility class used for TICK script template filling.
    """
    @staticmethod
    def fill_template_vars(template_type, **kwargs):
        """
        A utility function acting as an entry point to the fill_<template_type>_template_vars() functions defined below.

        :param template_type: the template type, e.g. threshold, relative or deadman
        :param kwargs: keyword arguments to forward to the actual function that will be used
        :return: the result of the actual function that will be used.
        """

        fill_function_name = "_fill_{0}_template_vars".format(template_type)
        fill_function = getattr(TICKScriptTemplateFiller, fill_function_name)  # python functions are first-class objects!

        # TODO is this the best way to avoid long if-else chains?
        return fill_function(**kwargs)
    @staticmethod
    def _fill_threshold_template_vars(db=None, measurement=None, field=None, influx_function=None, critical_value=None,
                                      comparison_operator=None, alert_period=None, topic_id=None, where_clause=None):
        """
        Creates a dictionary object ready to be posted to kapacitor to create a "threshold" task from template.

        :param db: db name
        :param measurement: measurement name
        :param field: field name
        :param influx_function: influx function to use for querying
        :param critical_value: critical value to compare with
        :param comparison_operator: type of comparison
        :param alert_period: alert period to query influx
        :param topic_id: topic identifier
        :param where_clause: (OPTIONAL) argument for filtering the influx query by tag values

        :return: a dictionary object ready to be posted to kapacitor to create a "threshold" task from template.
        """

        comparison_lambda = "\"real_value\" {0} {1}".format(comparison_operator, critical_value)  # build up lambda string, e.g. "real_value" >= 10

        template_vars = {
            "db": {
                "type": "string",
                "value": db
            },
            "measurement": {
                "type": "string",
                "value": measurement
            },
            "field": {
                "type": "string",
                "value": field
            },
            "influxFunction": {
                "type": "string",
                "value": influx_function
            },
            "comparisonLambda": {
                "type": "lambda",
                "value": comparison_lambda
            },
            "alertPeriod": {
                "type": "duration",
                "value": alert_period
            },
            "topicID": {
                "type": "string",
                "value": topic_id
            }
        }

        if where_clause is not None:
            template_vars["whereClause"] = {
                "type": "string",
                "value": where_clause
            }

        return template_vars
    @staticmethod
    def _fill_relative_template_vars(db=None, measurement=None, field=None, critical_value=None, comparison_operator=None,
                                     alert_period=None, topic_id=None, where_clause=None):
        """
        Creates a dictionary object ready to be posted to kapacitor to create a "relative" task from template.

        :param db: db name
        :param measurement: measurement name
        :param field: field name
        :param critical_value: critical value to compare with
        :param comparison_operator: type of comparison
        :param alert_period: alert period to use for relative comparison
        :param topic_id: topic identifier
        :param where_clause: (OPTIONAL) argument for filtering the influx query by tag values

        :return: a dictionary object ready to be posted to kapacitor to create a "relative" task from template.
        """

        select_lambda = '"{0}"'.format(field)
        comparison_lambda = '"diff" {0} {1}'.format(comparison_operator, critical_value)

        template_vars = {
            "db": {
                "type": "string",
                "value": db
            },
            "measurement": {
                "type": "string",
                "value": measurement
            },
            "selectLambda": {
                "type": "lambda",
                "value": select_lambda
            },
            "comparisonLambda": {
                "type": "lambda",
                "value": comparison_lambda
            },
            "alertPeriod": {
                "type": "duration",
                "value": alert_period
            },
            "topicID": {
                "type": "string",
                "value": topic_id
            }
        }

        if where_clause is not None:
            template_vars["whereClause"] = {
                "type": "lambda",
                "value": where_clause
            }

        return template_vars
    @staticmethod
    def _fill_deadman_template_vars(db=None, measurement=None, critical_value=None, alert_period=None, topic_id=None, where_clause=None):
        """
        Creates a dictionary object ready to be posted to kapacitor to create a "deadman" task from template.

        :param db: db name
        :param measurement: measurement name
        :param critical_value: critical value to compare with
        :param alert_period: alert period within which the number of reported points is checked
        :param topic_id: topic identifier
        :param where_clause: (OPTIONAL) argument for filtering the influx query by tag values

        :return: a dictionary object ready to be posted to kapacitor to create a "deadman" task from template.
        """

        template_vars = {
            "db": {
                "type": "string",
                "value": db
            },
            "measurement": {
                "type": "string",
                "value": measurement
            },
            "alertPeriod": {
                "type": "duration",
                "value": alert_period
            },
            "throughputThreshold": {
                "type": "int",
                "value": critical_value
            },
            "topicID": {
                "type": "string",
                "value": topic_id
            }
        }

        if where_clause is not None:
            template_vars["whereClause"] = {
                "type": "lambda",
                "value": where_clause
            }

        return template_vars
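A usage sketch of the class above (values are assumptions for illustration): fill_template_vars() resolves _fill_<event_type>_template_vars via getattr, so the keyword arguments simply have to match the parameters of the selected helper. For the deadman trigger of the missing_measurement_policy in the example specification, this could look as follows:

```
# "deadman" dispatches to _fill_deadman_template_vars; the db and sfc/sfc instance names are assumed
task_vars = TICKScriptTemplateFiller.fill_template_vars(
    "deadman",
    db="companyA-sfc",
    measurement="storage",
    critical_value=0.0,
    alert_period="60s",
    topic_id="companyA-sfc.companyA-sfc-instance.missing_storage_measurements",
    where_clause='"sf_package"=\'storage\''
)

# handler dictionary for subscribing the SFEMC notification endpoint to the alert topic
handler = fill_http_post_handler_vars(
    "missing_measurement_policy.missing_storage_measurements.http://sfemc.flame.eu/notify",
    "http://sfemc.flame.eu/notify"
)
```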
@@ -33,8 +33,8 @@ from toscaparser.tosca_template import ToscaTemplate
from requests import post

# CLMC-service imports
-from clmcservice.alertsapi.utilities import validate_clmc_alerts_specification, adjust_tosca_definitions_import
-from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS, fill_threshold_template_vars, fill_http_post_handler_vars
from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, TICKScriptTemplateFiller, fill_http_post_handler_vars
from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS, validate_clmc_alerts_specification

# initialise logger
log = logging.getLogger('service_logger')
@@ -99,37 +99,53 @@ class AlertsConfigurationAPI(object):
event_type = trigger.trigger_tpl["event_type"]
template_id = "{0}_template".format(event_type)
measurement, field = trigger.trigger_tpl["metric"].split(".")

-critical_value = float(trigger.trigger_tpl["condition"]["threshold"])
-alert_period = "{0}s".format(trigger.trigger_tpl["condition"]["granularity"])
-influx_function = trigger.trigger_tpl["condition"]["aggregation_method"]
condition = trigger.trigger_tpl["condition"]
critical_value = float(condition["threshold"])
alert_period = "{0}s".format(condition["granularity"])
influx_function = condition.get("aggregation_method", "mean")  # if not specified, use "mean"

# check for tag filtering
where_clause = None
if "resource_type" in trigger.trigger_tpl["condition"]:
-    tags = trigger.trigger_tpl["conditon"]["resource_type"]
-    where_clause = " AND ".join(map(lambda tag_name: "{0}={1}".format(tag_name, tags[tag_name]), tags))
    tags = condition["resource_type"]
    where_clause = " AND ".join(map(lambda tag_name: '"{0}"=\'{1}\''.format(tag_name, tags[tag_name]), tags))

-comparison_operator = COMPARISON_OPERATORS[trigger.trigger_tpl["condition"]["comparison_operator"]]
-http_handlers = trigger.trigger_tpl["action"]["implementation"]
-topic_id = "{0}___{1}___{2}".format(sfc, sfc_instance, event_id)  # scoped per service function chain instance (no two sfc instances report to the same topic)
-alert_id = "{0}___{1}".format(policy.name, event_id)
comparison_operator = COMPARISON_OPERATORS[condition.get("comparison_operator", "gte")]  # if not specified, use "gte" (>=)

-# create and activate alert task
# generate topic and alert identifiers
topic_id = "{0}.{1}.{2}".format(sfc, sfc_instance, event_id)  # scoped per service function chain instance (no two sfc instances report to the same topic)
alert_id = "{0}.{1}.{2}.{3}".format(sfc, sfc_instance, policy.name, event_id)

# build up the template vars dictionary depending on the event type (threshold, relative, etc.)
# all extracted properties from the trigger are passed, the TICKScriptTemplateFiller entry point then forwards those to the appropriate function
template_vars = TICKScriptTemplateFiller.fill_template_vars(event_type, db=db, measurement=measurement, field=field, influx_function=influx_function,
                                                            critical_value=critical_value, comparison_operator=comparison_operator, alert_period=alert_period,
                                                            topic_id=topic_id, where_clause=where_clause)

# create and activate alert task through the kapacitor HTTP API
kapacitor_api_tasks_url = "http://localhost:9092/kapacitor/v1/tasks"
-if event_type == "threshold":
kapacitor_http_request_body = {
    "id": alert_id,
    "template-id": template_id,
    "dbrps": [{"db": db, "rp": "autogen"}],
    "status": "enabled",
-    "vars": fill_threshold_template_vars(db, measurement, field, influx_function, critical_value,
-                                         comparison_operator, alert_period, topic_id, where_clause=where_clause)
    "vars": template_vars
}

# send the request and receive a response
response = post(kapacitor_api_tasks_url, data=kapacitor_http_request_body)
response_content = response.json()
# log the response
log.info(response_content)

# extract http handlers
http_handlers = trigger.trigger_tpl["action"]["implementation"]

# subscribe all http handlers to the created topic
kapacitor_api_handlers_url = "http://localhost:9092/kapacitor/v1/alerts/topics/{0}/handlers".format(topic_id)
for http_handler_url in http_handlers:
-    handler_id = "{0}___{1}___{2}".format(policy.name, event_id, http_handler_url)
    handler_id = "{0}.{1}.{2}".format(policy.name, event_id, http_handler_url)
    kapacitor_http_request_body = fill_http_post_handler_vars(handler_id, http_handler_url)
    response = post(kapacitor_api_handlers_url, data=kapacitor_http_request_body)
    response_content = response.json()
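Putting the pieces together, for the increase_in_requests trigger of the requests_diff_policy in the example specification, the task-creation body sent to Kapacitor would look roughly like the following sketch; the sfc and sfc instance identifiers are assumed, and "gte" is assumed to map to ">=":

```
kapacitor_http_request_body = {
    "id": "companyA-sfc.companyA-sfc-instance.requests_diff_policy.increase_in_requests",
    "template-id": "relative_template",  # derived from event_type "relative"
    "dbrps": [{"db": "companyA-sfc", "rp": "autogen"}],
    "status": "enabled",
    "vars": TICKScriptTemplateFiller.fill_template_vars(
        "relative",
        db="companyA-sfc", measurement="storage", field="requests",
        critical_value=100.0, comparison_operator=">=", alert_period="120s",
        topic_id="companyA-sfc.companyA-sfc-instance.increase_in_requests",
        where_clause='"sf_package"=\'storage\' AND "sf"=\'storage-users\' AND "location"=\'watershed\''
    )
}
```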
...
var db string // database per service function chain, so db is named after sfc
var rp = 'autogen' // default value for the retention policy
var measurement string
var whereClause = lambda: True // default value is a function which returns TRUE, hence no filtering of the query result
var messageValue = 'TRUE' // default value is TRUE, as this is what SFEMC expects as a notification for an event rule
var alertPeriod duration
var throughputThreshold float // alerts will trigger if data points reported during the alert period fall below this value
var topicID string

stream
    | from()
        .database(db)
        .retentionPolicy(rp)
        .measurement(measurement)
        .where(whereClause)
    | deadman(throughputThreshold, alertPeriod)
        .message(messageValue)
        .topic(topicID)
...
var db string // database per service function chain, so db is named after sfc
var rp = 'autogen' // default value for the retention policy
var measurement string
var selectLambda lambda // must be a lambda specifying the field to select e.g. "requests"
var whereClause = lambda: True // default value is a function which returns TRUE, hence no filtering of the query result
var messageValue = 'TRUE' // default value is TRUE, as this is what SFEMC expects as a notification for an event rule
var comparisonLambda lambda // comparison function e.g. "diff" > 40
var alertPeriod duration
var topicID string

var data = stream
    | from()
        .database(db)
        .retentionPolicy(rp)
        .measurement(measurement)
        .where(whereClause)
    | eval(selectLambda)
        .as('value')

var past = data
    | shift(alertPeriod)

var current = data

past
    | join(current) // NOTE: join buffers a given data point until a point with the correct timestamp to join on arrives
        .as('past', 'current')
    | eval(lambda: float("current.value" - "past.value"))
        .keep()
        .as('diff')
    | alert()
        .crit(comparisonLambda)
        .message(messageValue)
        .topic(topicID)
\ No newline at end of file
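The selectLambda and comparisonLambda variables of this template are produced by _fill_relative_template_vars(); for the decrease_in_requests trigger of the example specification they would be built roughly as follows (a sketch, not part of the commit):

```
field = "requests"  # from the metric "storage.requests"
comparison_operator, critical_value = "<=", -100.0  # "lte" trigger with threshold -100

select_lambda = '"{0}"'.format(field)                                             # '"requests"'
comparison_lambda = '"diff" {0} {1}'.format(comparison_operator, critical_value)  # '"diff" <= -100.0'
```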
@@ -12,7 +12,7 @@ var whereClause = 'TRUE' // default value is TRUE, hence no filtering of the query result
var messageValue = 'TRUE' // default value is TRUE, as this is what SFEMC expects as a notification for an event rule

-var comparisonLambda lambda
var comparisonLambda lambda // comparison function e.g. "real_value" > 40

var alertPeriod duration
...