#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
//      Created By :            Nikolay Stanchev
//      Created Date :          14-08-2018
//      Created for Project :   FLAME
"""

# Python standard libs
import logging
from urllib.parse import urlparse

# PIP installed libs
from pyramid.httpexceptions import HTTPBadRequest
from pyramid.view import view_defaults, view_config
from yaml import safe_load, YAMLError
from toscaparser.tosca_template import ToscaTemplate
from requests import post

# CLMC-service imports
from clmcservice.alertsapi.utilities import adjust_tosca_definitions_import, TICKScriptTemplateFiller, fill_http_post_handler_vars
from clmcservice.alertsapi.alerts_specification_schema import COMPARISON_OPERATORS, validate_clmc_alerts_specification

# initialise logger
log = logging.getLogger('service_logger')


@view_defaults(renderer='json')
class AlertsConfigurationAPI(object):
    """
    A class-based view for configuring alerts within CLMC.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """

        self.request = request

    @view_config(route_name='alerts_configuration', request_method='POST')
    def post_alerts_specification(self):
        """
        The view for receiving and configuring alerts based on the TOSCA alerts specification document.
        This endpoint must also receive the TOSCA resources specification document for validation.
        :raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as alert-spec representing the TOSCA Alerts Specification
        """

        kapacitor_host, kapacitor_port = self.request.registry.settings['kapacitor_host'], self.request.registry.settings['kapacitor_port']

        alert_spec_reference = self.request.POST.get('alert-spec')

        # check that the specification file was sent
        if not hasattr(alert_spec_reference, "file") or not hasattr(alert_spec_reference, "filename"):
            raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'alert-spec' representing the TOSCA Alerts Specification.")

        # extract the alert specification file and its filename
        alerts_input_filename = alert_spec_reference.filename
        alerts_input_file = alert_spec_reference.file

        if not alerts_input_filename.lower().endswith('.yaml'):
            raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'alert-spec' representing the TOSCA Alerts Specification.")

        # parse the alerts specification file - use safe_load since the file comes from an untrusted client
        try:
            alerts_yaml_content = safe_load(alerts_input_file)
            adjust_tosca_definitions_import(alerts_yaml_content)
        except YAMLError as err:
            log.error("Couldn't parse user request file {0} to yaml format due to error: {1}".format(alerts_input_filename, err))
            log.error("Invalid content is: {0}".format(alerts_input_file.read()))
            raise HTTPBadRequest("Request alert specification file could not be parsed as a valid YAML document.")

        # parse the YAML content as a TOSCA template
        try:
            tosca_tpl = ToscaTemplate(yaml_dict_tpl=alerts_yaml_content)
        except Exception as e:
            log.error(e)
            raise HTTPBadRequest("Request alert specification file could not be parsed as a valid TOSCA document.")

        # validate that the TOSCA document follows the CLMC alerts specification
        valid_alert_spec = validate_clmc_alerts_specification(tosca_tpl.tpl)
        if not valid_alert_spec:
            raise HTTPBadRequest("Request alert specification file could not be validated as a CLMC TOSCA alerts specification document.")

        sfc, sfc_instance = tosca_tpl.tpl["metadata"]["sfc"], tosca_tpl.tpl["metadata"]["sfci"]
        db = sfc  # ASSUMPTION: one database per service function chain, named after the service function chain ID

        # two lists to keep track of any errors while interacting with the Kapacitor HTTP API
        alert_tasks_errors = []
        alert_handlers_errors = []

        # iterate through every policy and extract all triggers of the given policy, configuring the respective
        # Kapacitor alert tasks and alert handlers
        self._config_kapacitor_alerts(tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, alert_tasks_errors, alert_handlers_errors)

        return_msg = {"msg": "Alerts specification has been successfully validated and forwarded to Kapacitor",
                      "service_function_chain_id": sfc,
                      "service_function_chain_instance_id": sfc_instance}

        if len(alert_tasks_errors) > 0:
            return_msg["triggers_specification_errors"] = alert_tasks_errors

        if len(alert_handlers_errors) > 0:
            return_msg["triggers_action_errors"] = alert_handlers_errors

        return return_msg
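
    # A minimal usage sketch for this endpoint (illustrative only - the service URL, route path and file name below are
    # assumptions; the actual path is whatever the 'alerts_configuration' route is mapped to in the Pyramid configuration):
    #
    #   from requests import post
    #
    #   with open("alerts_specification.yaml", "rb") as alert_spec:
    #       response = post("http://localhost:9080/alerts", files={"alert-spec": alert_spec})
    #       print(response.json())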

    def _config_kapacitor_alerts(self, tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, alert_tasks_errors, alert_handlers_errors):
        """
        Configures the alert tasks and alert handlers within Kapacitor.

        :param tosca_tpl: the parsed TOSCA template object
        :param sfc: service function chain ID
        :param sfc_instance: service function chain instance ID
        :param db: InfluxDB database name
        :param kapacitor_host: Kapacitor host (default is localhost, i.e. the CLMC service runs on the same machine as Kapacitor)
        :param kapacitor_port: Kapacitor port (default is 9092)
        :param alert_tasks_errors: the list for tracking errors while interacting with Kapacitor tasks
        :param alert_handlers_errors: the list for tracking errors while interacting with Kapacitor alert handlers
        """

        for policy in tosca_tpl.policies:
            for trigger in policy.triggers:
                event_id = trigger.name
                event_type = trigger.trigger_tpl["event_type"]
                template_id = "{0}-template".format(event_type)
                measurement, field = trigger.trigger_tpl["metric"].split(".")
                condition = trigger.trigger_tpl["condition"]
                critical_value = float(condition["threshold"])
                alert_period = "{0}s".format(condition["granularity"])
                influx_function = condition.get("aggregation_method", "mean")  # if not specified, use "mean"

                # check for tag filtering
                where_clause = None
                if "resource_type" in condition:
                    tags = condition["resource_type"]

                    # make sure alert tasks are executing with queries for the given sfc and sfc instance
                    # tags["sfc"] = sfc  # TODO: uncomment this line once telegraf has been updated to name the db after the sfc
                    # tags["sfci"] = sfc_instance  # TODO: uncomment this line once telegraf global tags have been updated - currently we have sfc_i instead of sfci

                    # NOTE: if the template has its where clause defined as a lambda (stream templates), then use "==" as the comparison operator,
                    # else if the template's where clause is defined as a string (batch templates), then use "=" as the comparison operator
                    comparison_operator = TICKScriptTemplateFiller.get_comparison_operator(event_type)  # retrieves the correct comparison operator to use for building the where clause

                    # build up the where clause from the tags dictionary
                    where_clause = " AND ".join(map(lambda tag_name: '"{0}"{1}\'{2}\''.format(tag_name, comparison_operator, tags[tag_name]), tags))

                comparison_operator = COMPARISON_OPERATORS[condition.get("comparison_operator", "gte")]  # if not specified, use "gte" (>=)

                # generate topic and alert identifiers
                topic_id = "{0}.{1}.{2}".format(sfc, sfc_instance, event_id)  # scoped per service function chain instance (no two sfc instances report to the same topic)
                alert_id = "{0}.{1}.{2}.{3}".format(sfc, sfc_instance, policy.name, event_id)
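
                # For illustration (hypothetical values): with sfc "companyA-chain", sfc instance "companyA-chain_1",
                # policy "high_load_policy" and trigger "high_requests", the identifiers built above would be
                # topic_id = "companyA-chain.companyA-chain_1.high_requests" and
                # alert_id = "companyA-chain.companyA-chain_1.high_load_policy.high_requests"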

                # build up the template vars dictionary depending on the event type (threshold, relative, etc.) -
                # all extracted properties from the trigger are passed, the TICKScriptTemplateFiller entry point then forwards those to the appropriate function for template filling
                template_vars = TICKScriptTemplateFiller.fill_template_vars(event_type, db=db, measurement=measurement, field=field,
                                                                            influx_function=influx_function, critical_value=critical_value,
                                                                            comparison_operator=comparison_operator, alert_period=alert_period,
                                                                            topic_id=topic_id, where_clause=where_clause)

                # create and activate an alert task through the Kapacitor HTTP API
                kapacitor_api_tasks_url = "http://{0}:{1}/kapacitor/v1/tasks".format(kapacitor_host, kapacitor_port)
                kapacitor_http_request_body = {
                    "id": alert_id,
                    "template-id": template_id,
                    "dbrps": [{"db": db, "rp": "autogen"}],
                    "status": "enabled",
                    "vars": template_vars
                }

                # send the request and receive a response
                response = post(kapacitor_api_tasks_url, json=kapacitor_http_request_body)
                response_content = response.json()

                # log the response
                log.info("Kapacitor task creation response: {0}, status code: {1}".format(response_content, response.status_code))

                # track all reported errors
                if response_content.get("error", "") != "":
                    alert_tasks_errors.append({
                        "policy": policy.name,
                        "trigger": event_id,
                        "error": response_content.get("error")
                    })

                # extract the http handlers of the trigger's action
                http_handlers = trigger.trigger_tpl["action"]["implementation"]

                # subscribe all http handlers to the created topic
                self._config_kapacitor_alert_handlers(kapacitor_host, kapacitor_port, policy.name, topic_id, event_id, http_handlers, alert_handlers_errors)

    def _config_kapacitor_alert_handlers(self, kapacitor_host, kapacitor_port, policy_id, topic_id, event_id, http_handlers, alert_handlers_errors):
        """
        Handles the configuration of HTTP POST alert handlers.

        :param kapacitor_host: Kapacitor host (default is localhost, i.e. the CLMC service runs on the same machine as Kapacitor)
        :param kapacitor_port: Kapacitor port (default is 9092)
        :param policy_id: the ID of the policy these triggers relate to
        :param topic_id: the topic ID built from the sfc, sfc instance and event_id
        :param event_id: the name of the trigger
        :param http_handlers: the list of HTTP handler URLs to subscribe
        :param alert_handlers_errors: the list for tracking errors while interacting with Kapacitor alert handlers
        """

        kapacitor_api_handlers_url = "http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers".format(kapacitor_host, kapacitor_port, topic_id)

        for http_handler_url in http_handlers:
            http_handler_host = urlparse(http_handler_url).hostname
            handler_id = "{0}.{1}.{2}".format(policy_id, event_id, http_handler_host)
            kapacitor_http_request_body = fill_http_post_handler_vars(handler_id, http_handler_url)
            response = post(kapacitor_api_handlers_url, json=kapacitor_http_request_body)
            response_content = response.json()
            log.info("Kapacitor handler subscription response: {0}, status code: {1}".format(response_content, response.status_code))

            if response_content.get("error", "") != "":
                alert_handlers_errors.append({
                    "policy": policy_id,
                    "trigger": event_id,
                    "handler": http_handler_url,
                    "error": response_content.get("error")
                })
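

# ---------------------------------------------------------------------------------------------------------------------
# For reference, a sketch of the trigger fields this module reads from the TOSCA alerts specification. The values and
# identifiers below are hypothetical and only illustrate the keys accessed above; the surrounding TOSCA boilerplate
# (tosca_definitions_version, imports, topology_template, etc.) is omitted:
#
#   metadata:
#     sfc: companyA-chain                     # service function chain ID, also used as the Influx database name
#     sfci: companyA-chain_1                  # service function chain instance ID
#   ...
#   policies:
#     - high_load_policy:
#         ...
#         triggers:
#           high_requests:                    # trigger name, used as the event ID
#             event_type: threshold           # selects the Kapacitor task template "<event_type>-template"
#             metric: nginx.requests          # "<measurement>.<field>" within the sfc database
#             condition:
#               threshold: 100                # critical value
#               granularity: 30               # alert period in seconds
#               aggregation_method: mean      # optional, defaults to "mean"
#               comparison_operator: gte      # optional, defaults to "gte" (>=)
#               resource_type:                # optional tag filters used to build the where clause
#                 location: DC1
#             action:
#               implementation:               # list of HTTP handler URLs subscribed to the alert topic
#                 - http://alert-handler.example.com/notify
# ---------------------------------------------------------------------------------------------------------------------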