Skip to content
Snippets Groups Projects
views.py 12.4 KiB
Newer Older
#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
//      Created By :            Nikolay Stanchev
//      Created Date :          14-08-2018
//      Created for Project :   FLAME
"""

# Python standard libs
import logging
from urllib.parse import urlparse

# PIP installed libs
from pyramid.httpexceptions import HTTPBadRequest
from pyramid.view import view_defaults, view_config
from yaml import load, YAMLError
from toscaparser.tosca_template import ToscaTemplate
from requests import post

# CLMC-service imports
from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, TICKScriptTemplateFiller, fill_http_post_handler_vars
from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS,  validate_clmc_alerts_specification

# module logger, obtained by the shared 'service_logger' name so every module asking for that name gets the same instance
log = logging.getLogger(name='service_logger')


@view_defaults(renderer='json')
class AlertsConfigurationAPI(object):
    """
    A class-based view for configuring alerts within CLMC.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """

        self.request = request

    @view_config(route_name='alerts_configuration', request_method='POST')
    def post_alerts_specification(self):
        """
        The view for receiving and configuring alerts based on the TOSCA alerts specification document.

        Only the 'alert-spec' file input of the request is read by this view. The document is parsed as YAML, then as a TOSCA template,
        validated as a CLMC alerts specification, and finally every trigger of every policy is forwarded to Kapacitor.

        :return: a dictionary (rendered as JSON) with a status message and the service function chain ID / instance ID taken from the
                 specification metadata; "triggers_specification_errors" and "triggers_action_errors" are added only when Kapacitor
                 reported errors for tasks or handlers respectively

        :raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as alert-spec representing the TOSCA Alerts Specification,
                                or if that file cannot be parsed as YAML, parsed as a TOSCA document, or validated as a CLMC alerts specification
        """

        settings = self.request.registry.settings
        kapacitor_host, kapacitor_port = settings['kapacitor_host'], settings['kapacitor_port']

        alert_spec_reference = self.request.POST.get('alert-spec')

        # check that the specification file was sent (a missing field or a plain form value has no file/filename attributes)
        if not hasattr(alert_spec_reference, "file") or not hasattr(alert_spec_reference, "filename"):
            raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'alert-spec' representing the TOSCA Alerts Specification.")

        # extract alert specification file and filename
        alerts_input_filename = alert_spec_reference.filename
        alerts_input_file = alert_spec_reference.file
        if not alerts_input_filename.lower().endswith('.yaml'):
            raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'alert-spec' representing the TOSCA Alerts Specification.")

        # parse the alerts specification file
        # SECURITY NOTE(review): yaml.load is applied to user-uploaded content without an explicit safe Loader; depending on the
        # installed PyYAML version this can construct arbitrary Python objects (newer releases require a Loader argument) - consider safe_load
        try:
            alerts_yaml_content = load(alerts_input_file)
            adjust_tosca_definitions_import(alerts_yaml_content)
        except YAMLError as err:
            log.error("Couldn't parse user request file {0} to yaml format due to error: {1}".format(alerts_input_filename, err))
            # the failed parse has already consumed (part of) the stream - rewind it so the logged content is the full upload
            try:
                alerts_input_file.seek(0)
            except (AttributeError, OSError, ValueError):
                pass  # best effort only: if the stream cannot be rewound, log whatever is left in it
            log.error("Invalid content is: {0}".format(alerts_input_file.read()))
            raise HTTPBadRequest("Request alert specification file could not be parsed as valid YAML document.") from err

        try:
            tosca_tpl = ToscaTemplate(yaml_dict_tpl=alerts_yaml_content)
        except Exception as e:
            # any failure while building the TOSCA template is logged and reported to the client as a bad request
            log.error(e)
            raise HTTPBadRequest("Request alert specification file could not be parsed as a valid TOSCA document.") from e

        if not validate_clmc_alerts_specification(tosca_tpl.tpl):
            raise HTTPBadRequest("Request alert specification file could not be validated as a CLMC TOSCA alerts specification document.")

        metadata = tosca_tpl.tpl["metadata"]
        sfc, sfc_instance = metadata["sfc"], metadata["sfci"]
        db = sfc  # ASSUMPTION: database per service function chain, named after the service function chain ID

        # two lists to keep track of any errors while interacting with the Kapacitor HTTP API
        alert_tasks_errors = []
        alert_handlers_errors = []

        # iterate through every policy and extract all triggers of the given policy
        self._config_kapacitor_alerts(tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, alert_tasks_errors, alert_handlers_errors)

        return_msg = {"msg": "Alerts specification has been successfully validated and forwarded to Kapacitor", "service_function_chain_id": sfc,
                      "service_function_chain_instance_id": sfc_instance}

        # the error lists are only part of the response when Kapacitor actually reported something
        if alert_tasks_errors:
            return_msg["triggers_specification_errors"] = alert_tasks_errors

        if alert_handlers_errors:
            return_msg["triggers_action_errors"] = alert_handlers_errors

        return return_msg

    def _config_kapacitor_alerts(self, tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, alert_tasks_errors, alert_handlers_errors):
        """
        Configures the alerts task and alert handlers within Kapacitor.

        For every trigger of every policy, an enabled Kapacitor task is created from the template named after the trigger's event type,
        and every HTTP handler URL listed in the trigger's action is subscribed to the task's alert topic. Errors reported by Kapacitor
        are appended to the given lists instead of being raised.

        :param tosca_tpl: the parsed Tosca template object
        :param sfc: sfc ID
        :param sfc_instance: sfc instance ID
        :param db: Influx database ID
        :param kapacitor_host: default host is localhost (CLMC service running on the same machine as Kapacitor)
        :param kapacitor_port: default value to use is 9092
        :param alert_tasks_errors: the list for tracking errors while interacting with Kapacitor tasks
        :param alert_handlers_errors: the list for tracking errors while interacting with Kapacitor alert handlers
        """

        # the tasks endpoint is the same for every trigger, so it is built once
        kapacitor_api_tasks_url = "http://{0}:{1}/kapacitor/v1/tasks".format(kapacitor_host, kapacitor_port)

        for policy in tosca_tpl.policies:
            for trigger in policy.triggers:
                event_id = trigger.name
                event_type = trigger.trigger_tpl["event_type"]
                template_id = "{0}-template".format(event_type)
                measurement, field = trigger.trigger_tpl["metric"].split(".")

                condition = trigger.trigger_tpl["condition"]
                critical_value = float(condition["threshold"])
                alert_period = "{0}s".format(condition["granularity"])
                influx_function = condition.get("aggregation_method", "mean")  # if not specified, use "mean"

                # check for tag filtering
                where_clause = None
                if "resource_type" in condition:
                    # mapping of tag name -> tag value used to filter the queried measurement
                    tags = condition["resource_type"]
                    # make sure alert tasks are executing with queries for the given sfc and sfc instance
                    # tags["sfc"] = sfc TODO uncomment this line when we updated telegraf to name db after sfc
                    # tags["sfci"] = sfc_instance TODO uncomment this line when telegraf global tags are updated, currently we have sfc_i instead of sfci

                    # NOTE: if the template has its where clause defined as lambda (stream templates), then use "==" as comparison operator,
                    #       else if the template's where clause is defined as a string (batch templates), then use "=" as comparison operator
                    where_operator = TICKScriptTemplateFiller.get_comparison_operator(event_type)  # retrieves the correct comparison operator to use for building the where clause

                    # build up the where clause from the tags dictionary, e.g. "host"=='server1' AND "location"=='DC1';
                    # an empty (or null) resource_type leaves the where clause unset, i.e. no filtering
                    if tags:
                        where_clause = " AND ".join('"{0}"{1}\'{2}\''.format(tag_name, where_operator, tag_value) for tag_name, tag_value in tags.items())

                comparison_operator = COMPARISON_OPERATORS[condition.get("comparison_operator", "gte")]  # if not specified, use "gte" (>=)

                # generate topic and alert identifiers
                topic_id = "{0}.{1}.{2}".format(sfc, sfc_instance, event_id)  # scoped per service function chain instance (no two sfc instances report to the same topic)
                alert_id = "{0}.{1}.{2}.{3}".format(sfc, sfc_instance, policy.name, event_id)
                # build up the template vars dictionary depending on the event type (threshold, relative, etc.)
                # all extracted properties from the trigger are passed, the TICKScriptTemplateFiller entry point then forwards those to the appropriate function for template filling
                template_vars = TICKScriptTemplateFiller.fill_template_vars(event_type, db=db, measurement=measurement, field=field, influx_function=influx_function,
                                                                            critical_value=critical_value, comparison_operator=comparison_operator, alert_period=alert_period,
                                                                            topic_id=topic_id, where_clause=where_clause)

                # create and activate alert task through the kapacitor HTTP API
                kapacitor_http_request_body = {
                    "id": alert_id,
                    "template-id": template_id,
                    "dbrps": [{"db": db, "rp": "autogen"}],
                    "status": "enabled",
                    "vars": template_vars
                }

                # send the request and receive a response
                response = post(kapacitor_api_tasks_url, json=kapacitor_http_request_body)
                response_content = response.json()
                log.info("Kapacitor task creation response (status code %s): %s", response.status_code, response_content)

                # track all reported errors - a non-empty "error" field in the response is treated as a failure
                task_error = response_content.get("error", "")
                if task_error != "":
                    alert_tasks_errors.append({
                        "policy": policy.name,
                        "trigger": event_id,
                        "error": task_error
                    })

                # subscribe all http handlers listed in the trigger's action to the created topic
                http_handlers = trigger.trigger_tpl["action"]["implementation"]
                self._config_kapacitor_alert_handlers(kapacitor_host, kapacitor_port, policy.name, topic_id, event_id, http_handlers, alert_handlers_errors)

    def _config_kapacitor_alert_handlers(self, kapacitor_host, kapacitor_port, policy_id, topic_id, event_id, http_handlers, alert_handlers_errors):
        """
        Handles the configuration of HTTP Post alert handlers.

        Each handler URL is registered on the given Kapacitor topic; errors reported by Kapacitor are appended to alert_handlers_errors.

        :param kapacitor_host: default host is localhost (CLMC service running on the same machine as Kapacitor)
        :param kapacitor_port: default value to use is 9092
        :param policy_id: policy ID those triggers relate to
        :param topic_id: topic ID built of sfc, sfc instance and event_id
        :param event_id: name of trigger
        :param http_handlers: list of handlers to subscribe
        :param alert_handlers_errors: the list for tracking errors while interacting with Kapacitor alert handlers
        """

        kapacitor_api_handlers_url = "http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers".format(kapacitor_host, kapacitor_port, topic_id)
        for http_handler_url in http_handlers:
            # NOTE(review): the handler ID uses only the URL's host name, so two handler URLs on the same host produce the same ID
            http_handler_host = urlparse(http_handler_url).hostname
            handler_id = "{0}.{1}.{2}".format(policy_id, event_id, http_handler_host)
            kapacitor_http_request_body = fill_http_post_handler_vars(handler_id, http_handler_url)
            response = post(kapacitor_api_handlers_url, json=kapacitor_http_request_body)
            response_content = response.json()
            log.info("Kapacitor handler creation response (status code %s): %s", response.status_code, response_content)

            # a non-empty "error" field in the response is treated as a failure
            handler_error = response_content.get("error", "")
            if handler_error != "":
                alert_handlers_errors.append({
                    "policy": policy_id,
                    "trigger": event_id,
                    "handler": http_handler_url,
                    "error": handler_error
                })