#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 14-08-2018
// Created for Project : FLAME
"""
# Python standard libs
import logging
from urllib.parse import urlparse
# PIP installed libs
from pyramid.httpexceptions import HTTPBadRequest
from pyramid.view import view_defaults, view_config
from yaml import load, YAMLError
from toscaparser.tosca_template import ToscaTemplate
from requests import post
# CLMC-service imports
from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, TICKScriptTemplateFiller, fill_http_post_handler_vars
from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS, validate_clmc_alerts_specification
# initialise logger
log = logging.getLogger('service_logger')
@view_defaults(renderer='json')
class AlertsConfigurationAPI(object):
"""
A class-based view for configuring alerts within CLMC.
"""
def __init__(self, request):
"""
Initialises the instance of the view with the request argument.
:param request: client's call request
"""
self.request = request
@view_config(route_name='alerts_configuration', request_method='POST')
def post_alerts_specification(self):
"""
The view for receiving and configuring alerts based on the TOSCA alerts specification document. This endpoint must also receive the TOSCA resources specification document for validation.
:raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as alert-spec representing the TOSCA Alerts Specification
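
        Illustrative usage (a sketch only - the host and URL path below are assumptions, since the
        'alerts_configuration' route is referenced here only by name):

            requests.post("http://<clmc-service-host>/alerts",
                          files={"alert-spec": open("alerts_specification.yaml", "rb")})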
"""
kapacitor_host, kapacitor_port = self.request.registry.settings['kapacitor_host'], self.request.registry.settings['kapacitor_port']
alert_spec_reference = self.request.POST.get('alert-spec')
# check that the specification file was sent
if not hasattr(alert_spec_reference, "file") or not hasattr(alert_spec_reference, "filename"):
raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'alert-spec' representing the TOSCA Alerts Specification.")
# extract alert specification file and filename
alerts_input_filename = alert_spec_reference.filename
alerts_input_file = alert_spec_reference.file
if not alerts_input_filename.lower().endswith('.yaml'):
raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'alert-spec' representing the TOSCA Alerts Specification.")
        try:
            alerts_yaml_content = load(alerts_input_file)
            adjust_tosca_definitions_import(alerts_yaml_content)
        except YAMLError as err:
log.error("Couldn't parse user request file {0} to yaml format due to error: {1}".format(alerts_input_filename, err))
log.error("Invalid content is: {0}".format(alerts_input_file.read()))
raise HTTPBadRequest("Request alert specification file could not be parsed as valid YAML document.")
try:
tosca_tpl = ToscaTemplate(yaml_dict_tpl=alerts_yaml_content)
except Exception as e:
log.error(e)
raise HTTPBadRequest("Request alert specification file could not be parsed as a valid TOSCA document.")
valid_alert_spec = validate_clmc_alerts_specification(tosca_tpl.tpl)
if not valid_alert_spec:
raise HTTPBadRequest("Request alert specification file could not be validated as a CLMC TOSCA alerts specification document.")
sfc, sfc_instance = tosca_tpl.tpl["metadata"]["sfc"], tosca_tpl.tpl["metadata"]["sfci"]
db = sfc # ASSUMPTION: database per service function chain, named after the service function chain ID
# two lists to keep track of any errors while interacting with the Kapacitor HTTP API
alert_tasks_errors = []
alert_handlers_errors = []
        # iterate through every policy and configure its triggers as alert tasks and handlers in Kapacitor
self._config_kapacitor_alerts(tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, alert_tasks_errors, alert_handlers_errors)
return_msg = {"msg": "Alerts specification has been successfully validated and forwarded to Kapacitor", "service_function_chain_id": sfc,
"service_function_chain_instance_id": sfc_instance}
if len(alert_tasks_errors) > 0:
return_msg["triggers_specification_errors"] = alert_tasks_errors
if len(alert_handlers_errors) > 0:
return_msg["triggers_action_errors"] = alert_handlers_errors
return return_msg
def _config_kapacitor_alerts(self, tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, alert_tasks_errors, alert_handlers_errors):
"""
        Configures the alert tasks and alert handlers within Kapacitor.
        :param tosca_tpl: the parsed TOSCA template object
        :param sfc: service function chain ID
        :param sfc_instance: service function chain instance ID
        :param db: InfluxDB database ID
        :param kapacitor_host: the Kapacitor host to use (default is localhost, since the CLMC service runs on the same machine as Kapacitor)
        :param kapacitor_port: the Kapacitor port to use (default is 9092)
        :param alert_tasks_errors: the list used to track errors while interacting with Kapacitor tasks
        :param alert_handlers_errors: the list used to track errors while interacting with Kapacitor alert handlers
"""
for policy in tosca_tpl.policies:
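            # for every trigger in this policy, two interactions with the Kapacitor HTTP API are performed:
            #   1) create and enable an alert task derived from a pre-defined TICK script template (POST /kapacitor/v1/tasks)
            #   2) subscribe the trigger's HTTP handlers to the generated alert topic (see _config_kapacitor_alert_handlers)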
for trigger in policy.triggers:
event_id = trigger.name
event_type = trigger.trigger_tpl["event_type"]
template_id = "{0}-template".format(event_type)
measurement, field = trigger.trigger_tpl["metric"].split(".")
condition = trigger.trigger_tpl["condition"]
critical_value = float(condition["threshold"])
alert_period = "{0}s".format(condition["granularity"])
influx_function = condition.get("aggregation_method", "mean") # if not specified, use "mean"
# check for tag filtering
where_clause = None
if "resource_type" in trigger.trigger_tpl["condition"]:
tags = condition["resource_type"]
# make sure alert tasks are executing with queries for the given sfc and sfc instance
                    # tags["sfc"] = sfc  # TODO: uncomment this line when Telegraf is updated to name the database after the sfc
                    # tags["sfci"] = sfc_instance  # TODO: uncomment this line when the Telegraf global tags are updated (currently the tag is sfc_i instead of sfci)
# NOTE: if the template has its where clause defined as lambda (stream templates), then use "==" as comparison operator,
# else if the template's where clause is defined as a string (batch templates), then use "=" as comparison operator
comparison_operator = TICKScriptTemplateFiller.get_comparison_operator(event_type) # retrieves the correct comparison operator to use for building the where clause
# build up the where clause from the tags dictionary
where_clause = " AND ".join(map(lambda tag_name: '"{0}"{1}\'{2}\''.format(tag_name, comparison_operator, tags[tag_name]), tags))
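                    # e.g. with tags {"host": "host1", "location": "DC1"} and the "=" operator (illustrative values),
                    # the resulting clause is: "host"='host1' AND "location"='DC1'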
comparison_operator = COMPARISON_OPERATORS[condition.get("comparison_operator", "gte")] # if not specified, use "gte" (>=)
# generate topic and alert identifiers
topic_id = "{0}.{1}.{2}".format(sfc, sfc_instance, event_id) # scoped per service function chain instance (no two sfc instances report to the same topic)
alert_id = "{0}.{1}.{2}.{3}".format(sfc, sfc_instance, policy.name, event_id)
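                # e.g. with sfc "MSDemo", sfc instance "MSDemo_1", policy "scale_policy" and trigger "high_latency" (illustrative values):
                #   topic_id = "MSDemo.MSDemo_1.high_latency"
                #   alert_id = "MSDemo.MSDemo_1.scale_policy.high_latency"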
                # build up the template vars dictionary depending on the event type (threshold, relative, etc.)
                # all properties extracted from the trigger are passed; the TICKScriptTemplateFiller entry point then forwards them to the appropriate template-filling function
template_vars = TICKScriptTemplateFiller.fill_template_vars(event_type, db=db, measurement=measurement, field=field, influx_function=influx_function,
critical_value=critical_value, comparison_operator=comparison_operator, alert_period=alert_period,
topic_id=topic_id, where_clause=where_clause)
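                # the exact contents of template_vars are determined by TICKScriptTemplateFiller; for the Kapacitor
                # /tasks API the "vars" payload is expected to map variable names to {"type": ..., "value": ...} objects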
# create and activate alert task through the kapacitor HTTP API
kapacitor_api_tasks_url = "http://{0}:{1}/kapacitor/v1/tasks".format(kapacitor_host, kapacitor_port)
kapacitor_http_request_body = {
"id": alert_id,
"template-id": template_id,
"dbrps": [{"db": db, "rp": "autogen"}],
"status": "enabled",
"vars": template_vars
}
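                # this payload instructs Kapacitor to instantiate the pre-defined TICK script template identified by
                # "template-id" as an enabled task, bound to the given database/retention policy pair and parameterised with "vars"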
# send the request and receive a response
response = post(kapacitor_api_tasks_url, json=kapacitor_http_request_body)
response_content = response.json()
# log the response
                log.info("Kapacitor task creation response: {0} (status code: {1})".format(response_content, response.status_code))
# track all reported errors
if response_content.get("error", "") != "":
alert_tasks_errors.append({
"policy": policy.name,
"trigger": event_id,
"error": response_content.get("error")
})
http_handlers = trigger.trigger_tpl["action"]["implementation"]
# subscribe all http handlers to the created topic
self._config_kapacitor_alert_handlers(kapacitor_host, kapacitor_port, policy.name, topic_id, event_id, http_handlers, alert_handlers_errors)
def _config_kapacitor_alert_handlers(self, kapacitor_host, kapacitor_port, policy_id, topic_id, event_id, http_handlers, alert_handlers_errors):
"""
Handles the configuration of HTTP Post alert handlers.
        :param kapacitor_host: the Kapacitor host to use (default is localhost, since the CLMC service runs on the same machine as Kapacitor)
        :param kapacitor_port: the Kapacitor port to use (default is 9092)
        :param policy_id: the ID of the policy the trigger belongs to
        :param topic_id: topic ID built from the sfc, sfc instance and event_id
        :param event_id: the name of the trigger
        :param http_handlers: the list of HTTP handler URLs to subscribe to the topic
        :param alert_handlers_errors: the list used to track errors while interacting with Kapacitor alert handlers
"""
kapacitor_api_handlers_url = "http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers".format(kapacitor_host, kapacitor_port, topic_id)
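        # NOTE: fill_http_post_handler_vars is expected to build the Kapacitor handler definition; for the
        # /alerts/topics/<topic_id>/handlers API this typically has the form (assumed shape):
        #   {"id": <handler_id>, "kind": "post", "options": {"url": <http_handler_url>}}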
for http_handler_url in http_handlers:
http_handler_host = urlparse(http_handler_url).hostname
handler_id = "{0}.{1}.{2}".format(policy_id, event_id, http_handler_host)
kapacitor_http_request_body = fill_http_post_handler_vars(handler_id, http_handler_url)
response = post(kapacitor_api_handlers_url, json=kapacitor_http_request_body)
response_content = response.json()
            log.info("Kapacitor handler subscription response: {0} (status code: {1})".format(response_content, response.status_code))
if response_content.get("error", "") != "":
alert_handlers_errors.append({
"policy": policy_id,
"trigger": event_id,
"handler": http_handler_url,
"error": response_content.get("error")
})