diff --git a/src/service/clmcservice/alertsapi/tests.py b/src/service/clmcservice/alertsapi/tests.py index be88402c696ca0a8c809c1722d6235c95dcae649..59da0fbc681f5c5b8c66dc811bebaff4d064d9da 100644 --- a/src/service/clmcservice/alertsapi/tests.py +++ b/src/service/clmcservice/alertsapi/tests.py @@ -204,36 +204,7 @@ class TestAlertsConfigurationAPI(object): assert "triggers_action_errors" not in clmc_service_response, "Unexpected error was returned for handlers specification" # traverse through all alerts and check that they are created along with their respective handlers within Kapacitor - for alert in alerts: - - alert_id = alert["task"] - alert_type = alert["type"] - topic_id = alert["topic"] - - # check the Kapacitor task - kapacitor_response = get("http://{0}:{1}/kapacitor/v1/tasks/{2}".format(kapacitor_host, kapacitor_port, alert_id)) - assert kapacitor_response.status_code == 200, "Alert with ID {0} was not created - test file {1}.".format(alert_id, alerts_test_file) - kapacitor_response_json = kapacitor_response.json() - assert "link" in kapacitor_response_json, "Incorrect response from kapacitor for alert with ID {0} - test file {1}".format(alert_id, alerts_test_file) - assert kapacitor_response_json["status"] == "enabled", "Alert with ID {0} was created but is disabled - test file {1}".format(alert_id, alerts_test_file) - assert kapacitor_response_json["executing"], "Alert with ID {0} was created and is enabled, but is not executing - test file {1}".format(alert_id, alerts_test_file) - assert kapacitor_response_json["type"] == alert_type, "Alert with ID {0} was created with the wrong type - test file {1}".format(alert_id, alerts_test_file) - - # check the Kapacitor topic - kapacitor_response = get("http://{0}:{1}/kapacitor/v1/alerts/topics/{2}".format(kapacitor_host, kapacitor_port, topic_id)) - assert kapacitor_response.status_code == 200, "Topic with ID {0} was not created - test file {1}".format(topic_id, alerts_test_file) - kapacitor_response_json = kapacitor_response.json() - assert kapacitor_response_json["id"] == topic_id, "Topic {0} was created with incorrect ID - test file {1}".format(topic_id, alerts_test_file) - - # check that all handler IDs were created and each of them is subscribed to the correct topic ID - for handler_id in alert["handlers"]: - handler_url = alert["handlers"][handler_id] - kapacitor_response = get("http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers/{3}".format(kapacitor_host, kapacitor_port, topic_id, handler_id)) - assert kapacitor_response.status_code == 200, "Handler with ID {0} for topic with ID {1} doesn't exist - test file {2}".format(handler_id, topic_id, alerts_test_file) - kapacitor_response_json = kapacitor_response.json() - assert kapacitor_response_json["id"] == handler_id, "Incorrect ID of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file) - assert kapacitor_response_json["kind"] == "post", "Incorrect kind of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file) - assert kapacitor_response_json["options"]["url"] == handler_url, "Incorrect url of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file) + check_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port, alerts_test_file) # send the same spec again to check that error messages are returned (because of ID duplication) with open(alert_spec_abs_path) as alert_spec: @@ -248,6 +219,23 @@ class TestAlertsConfigurationAPI(object): handlers_count = sum([len(alert["handlers"]) for alert in alerts]) assert len(clmc_service_response["triggers_action_errors"]) == handlers_count, "Expected errors were not returned for handlers specification" + # send the same request but as a PUT method instead of POST + with open(alert_spec_abs_path) as alert_spec: + with open(valid_resource_spec_abs_path) as valid_resource_spec: + request.POST['alert-spec'] = FieldStorageMock(alerts_test_file, alert_spec) # a simple mock class is used to mimic the FieldStorage class + request.POST['resource-spec'] = FieldStorageMock(valid_resources_test_file, valid_resource_spec) + clmc_service_response = AlertsConfigurationAPI(request).put_alerts_specification() + + # no errors are expected now, since the PUT request must update existing alerts + assert (sfc, sfc_instance) == (clmc_service_response["service_function_chain_id"], clmc_service_response["service_function_chain_instance_id"]), \ + "Incorrect extraction of metadata for file {0}". format(alerts_test_file) + assert "triggers_specification_errors" not in clmc_service_response, "Unexpected error was returned for triggers specification" + assert "triggers_action_errors" not in clmc_service_response, "Unexpected error was returned for handlers specification" + + # traverse through all alerts and check that they were not deleted by the PUT request + check_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port, alerts_test_file) + + # clean-up in the end of the test clean_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port) def test_alerts_config_api_get(self, app_config): @@ -536,6 +524,48 @@ def get_alert_type(event_type, alert_period): return events[event_type] +def check_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port, alerts_test_file): + """ + Check that all Kapactior resources are created with the expected values. + + :param alerts: the list of alert objects to test + :param kapacitor_host: hostname of kapacitor + :param kapacitor_port: port number of kapacitor + :param alerts_test_file: name of the current test file + """ + + for alert in alerts: + + alert_id = alert["task"] + alert_type = alert["type"] + topic_id = alert["topic"] + + # check the Kapacitor task + kapacitor_response = get("http://{0}:{1}/kapacitor/v1/tasks/{2}".format(kapacitor_host, kapacitor_port, alert_id)) + assert kapacitor_response.status_code == 200, "Alert with ID {0} was not created - test file {1}.".format(alert_id, alerts_test_file) + kapacitor_response_json = kapacitor_response.json() + assert "link" in kapacitor_response_json, "Incorrect response from kapacitor for alert with ID {0} - test file {1}".format(alert_id, alerts_test_file) + assert kapacitor_response_json["status"] == "enabled", "Alert with ID {0} was created but is disabled - test file {1}".format(alert_id, alerts_test_file) + assert kapacitor_response_json["executing"], "Alert with ID {0} was created and is enabled, but is not executing - test file {1}".format(alert_id, alerts_test_file) + assert kapacitor_response_json["type"] == alert_type, "Alert with ID {0} was created with the wrong type - test file {1}".format(alert_id, alerts_test_file) + + # check the Kapacitor topic + kapacitor_response = get("http://{0}:{1}/kapacitor/v1/alerts/topics/{2}".format(kapacitor_host, kapacitor_port, topic_id)) + assert kapacitor_response.status_code == 200, "Topic with ID {0} was not created - test file {1}".format(topic_id, alerts_test_file) + kapacitor_response_json = kapacitor_response.json() + assert kapacitor_response_json["id"] == topic_id, "Topic {0} was created with incorrect ID - test file {1}".format(topic_id, alerts_test_file) + + # check that all handler IDs were created and each of them is subscribed to the correct topic ID + for handler_id in alert["handlers"]: + handler_url = alert["handlers"][handler_id] + kapacitor_response = get("http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers/{3}".format(kapacitor_host, kapacitor_port, topic_id, handler_id)) + assert kapacitor_response.status_code == 200, "Handler with ID {0} for topic with ID {1} doesn't exist - test file {2}".format(handler_id, topic_id, alerts_test_file) + kapacitor_response_json = kapacitor_response.json() + assert kapacitor_response_json["id"] == handler_id, "Incorrect ID of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file) + assert kapacitor_response_json["kind"] == "post", "Incorrect kind of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file) + assert kapacitor_response_json["options"]["url"] == handler_url, "Incorrect url of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file) + + def clean_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port): """ A utility function to clean up Kapacitor from the configured alerts, topics and handlers. diff --git a/src/service/clmcservice/alertsapi/views.py b/src/service/clmcservice/alertsapi/views.py index edd5495f957a692e401d97adfb9301babed4d098..ddb1f71a47cf8177cd561639fd01fb589beac193 100644 --- a/src/service/clmcservice/alertsapi/views.py +++ b/src/service/clmcservice/alertsapi/views.py @@ -150,7 +150,6 @@ class AlertsConfigurationAPI(object): # parse the alert specification file into a tosca template (including validation) tosca_tpl = self._parse_alert_spec(alert_spec_reference) - # TODO next release - uncomment # sfc, sfc_instance = tosca_tpl.tpl["metadata"]["sfc"], tosca_tpl.tpl["metadata"]["sfci"] sfc = tosca_tpl.tpl["metadata"]["servicefunctionchain"] sfc_instance = "{0}_1".format(sfc) @@ -200,14 +199,41 @@ class AlertsConfigurationAPI(object): return {"deleted_alerts": alerts, "deleted_handlers": handlers} + @view_config(route_name='alerts_configuration', request_method='PUT') + def put_alerts_specification(self): + """ + The view for receiving and configuring alerts based on the TOSCA alerts specification document. This endpoint must also receive the TOSCA resources specification document for validation. + A PUT request will update any existing alerts and handlers that were registered before. + + :return: a dictionary with a msg and optional keys for errors encountered while interacting with Kapacitor + + :raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as alert-spec representing the TOSCA Alerts Specification + :raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as resource-spec representing the TOSCA Resources Specification + """ + + return self._create_alerts_specification(patch_duplicates=True) + @view_config(route_name='alerts_configuration', request_method='POST') def post_alerts_specification(self): """ The view for receiving and configuring alerts based on the TOSCA alerts specification document. This endpoint must also receive the TOSCA resources specification document for validation. + A POST request will not try to update existing alerts and handlers and will simply return the error message from Kapacitor. :return: a dictionary with a msg and optional keys for errors encountered while interacting with Kapacitor :raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as alert-spec representing the TOSCA Alerts Specification + :raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as resource-spec representing the TOSCA Resources Specification + """ + + return self._create_alerts_specification(patch_duplicates=False) + + def _create_alerts_specification(self, patch_duplicates=False): + """ + The base source code for creating (with or without updates) alerts from an alert specification document. + + :param patch_duplicates: (defaults to False) if set to True, existing resources will be overwritten + + :return: a dictionary with a msg and optional keys for errors encountered while interacting with Kapacitor """ kapacitor_host, kapacitor_port = self.request.registry.settings['kapacitor_host'], self.request.registry.settings['kapacitor_port'] @@ -215,23 +241,15 @@ class AlertsConfigurationAPI(object): alert_spec_reference = self.request.POST.get('alert-spec') resource_spec_reference = self.request.POST.get('resource-spec') - # check that the resource specification file was sent - if not hasattr(resource_spec_reference, "file") or not hasattr(resource_spec_reference, "filename"): - raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'resource-spec' representing the TOSCA Resource Specification.") - - # extract the required information from the resource specification and return an error if an exception was encountered - try: - resource_spec_sfc, resource_spec_sfc_instance, resource_spec_policy_triggers = get_resource_spec_policy_triggers(resource_spec_reference) - except Exception as e: - log.error("Couldn't extract resource specification event IDs due to error: {0}".format(e)) - raise HTTPBadRequest("Couldn't extract resource specification event IDs - invalid TOSCA resource specification.") + # parse the resource specification file and extract the required information + resource_spec_sfc, resource_spec_sfc_instance, resource_spec_policy_triggers = self._parse_resource_spec(resource_spec_reference) # parse the alert specification file into a tosca template (including validation) tosca_tpl = self._parse_alert_spec(alert_spec_reference) # extract the information needed from the alert spec. for validation between the two TOSCA documents alert_spec_policy_triggers = get_alert_spec_policy_triggers(tosca_tpl) - # TODO next release - uncomment + # sfc, sfc_instance = tosca_tpl.tpl["metadata"]["sfc"], tosca_tpl.tpl["metadata"]["sfci"] sfc = tosca_tpl.tpl["metadata"]["servicefunctionchain"] sfc_instance = "{0}_1".format(sfc) @@ -242,8 +260,10 @@ class AlertsConfigurationAPI(object): db = sfc # database per service function chain, named after the service function chain ID # iterate through every policy and extract all triggers of the given policy - the returned lists of errors will be empty if no errors were encountered - # while interacting with the Kapacitor HTTP API - alert_tasks_errors, alert_handlers_errors = self._config_kapacitor_alerts(tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, resource_spec_policy_triggers) + # while interacting with the Kapacitor HTTP API, the patch_flag is set to False so that existing resources are not recreated + alert_tasks_errors, alert_handlers_errors = self._config_kapacitor_alerts(tosca_tpl, sfc, sfc_instance, db, + kapacitor_host, kapacitor_port, resource_spec_policy_triggers, + patch_duplicates=patch_duplicates) return_msg = {"msg": "Alerts specification has been successfully validated and forwarded to Kapacitor", "service_function_chain_id": sfc, "service_function_chain_instance_id": sfc_instance} @@ -291,7 +311,7 @@ class AlertsConfigurationAPI(object): if len(missing_policy_triggers) > 0: raise HTTPBadRequest("Couldn't match the following policy triggers from the alerts specification with triggers defined in the resource specification: {0}".format(missing_policy_triggers)) - def _config_kapacitor_alerts(self, tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, resource_spec_policy_triggers): + def _config_kapacitor_alerts(self, tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, resource_spec_policy_triggers, patch_duplicates=False): """ Configures the alerts task and alert handlers within Kapacitor. @@ -302,6 +322,7 @@ class AlertsConfigurationAPI(object): :param kapacitor_host: default host is localhost (CLMC service running on the same machine as Kapacitor) :param kapacitor_port: default value to use is 9092 :param resource_spec_policy_triggers: the extracted policy-trigger strings from the resource specification + :param patch_duplicates: (defaults to False) if set to True, any duplication errors will be handled by first deleting the existing resource and then creating it :return: the list for tracking errors while interacting with Kapacitor tasks and the list for tracking errors while interacting with Kapacitor alert handlers """ @@ -378,24 +399,36 @@ class AlertsConfigurationAPI(object): log.info(response_content, response.status_code) # track all reported errors - if response_content.get("error", "") != "": - alert_tasks_errors.append({ - "policy": policy_id, - "trigger": event_id, - "error": response_content.get("error") - }) + if response_content.get("error", "") != "": # Kapacitor always returns an error key which is set to an empty string if successful + capture_error = True + # check if a Kapacitor task with the given identifier already exists, if so, delete it and create it again + kapacitor_api_tasks_instance_url = "{0}/{1}".format(kapacitor_api_tasks_url, task_id) + if patch_duplicates and get(kapacitor_api_tasks_instance_url).status_code == 200: + # this will only happen if the patch_duplicates flag is set to True + delete(kapacitor_api_tasks_instance_url) + response = post(kapacitor_api_tasks_url, json=kapacitor_http_request_body) + response_content = response.json() + capture_error = response_content.get("error", "") != "" + + # if the error was not handled by patching an existing Kapacitor task, then add it in the list of errors + if capture_error: + alert_tasks_errors.append({ + "policy": policy_id, + "trigger": event_id, + "error": response_content["error"] + }) # extract http handlers http_handlers = trigger.trigger_tpl["action"]["implementation"] # subscribe all http handlers to the created topic self._config_kapacitor_alert_handlers(kapacitor_host, kapacitor_port, sfc, sfc_instance, policy_id, resource_spec_trigger_id, topic_id, event_id, - http_handlers, alert_handlers_errors) + http_handlers, alert_handlers_errors, patch_duplicates=patch_duplicates) return alert_tasks_errors, alert_handlers_errors def _config_kapacitor_alert_handlers(self, kapacitor_host, kapacitor_port, sfc, sfc_i, policy_id, trigger_id, - topic_id, event_id, http_handlers, alert_handlers_errors): + topic_id, event_id, http_handlers, alert_handlers_errors, patch_duplicates=False): """ Handles the configuration of HTTP Post alert handlers. @@ -409,6 +442,7 @@ class AlertsConfigurationAPI(object): :param event_id: name of trigger :param http_handlers: list of handlers to subscribe :param alert_handlers_errors: the list for tracking errors while interacting with Kapacitor alert handlers + :param patch_duplicates: (defaults to False) if set to True, any duplication errors will be handled by first deleting the existing resource and then creating it """ kapacitor_api_handlers_url = "http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers".format(kapacitor_host, kapacitor_port, topic_id) @@ -429,12 +463,24 @@ class AlertsConfigurationAPI(object): log.info(response_content, response.status_code) if response_content.get("error", "") != "": - alert_handlers_errors.append({ - "policy": policy_id, - "trigger": event_id, - "handler": http_handler_url, - "error": response_content.get("error") - }) + capture_error = True + # check if a Kapacitor handler with the given identifier already exists, if so, delete it and create it again + kapacitor_api_handlers_instance_url = "{0}/{1}".format(kapacitor_api_handlers_url, handler_id) + if patch_duplicates and get(kapacitor_api_handlers_instance_url).status_code == 200: + # this will only happen if the patch_duplicates flag is set to True + delete(kapacitor_api_handlers_instance_url) + response = post(kapacitor_api_handlers_url, json=kapacitor_http_request_body) + response_content = response.json() + capture_error = response_content.get("error", "") != "" + + # if the error was not handled by patching an existing Kapacitor handler, then add it in the list of errors + if capture_error: + alert_handlers_errors.append({ + "policy": policy_id, + "trigger": event_id, + "handler": http_handler_url, + "error": response_content["error"] + }) def _parse_alert_spec(self, alert_spec_reference): """ @@ -480,6 +526,28 @@ class AlertsConfigurationAPI(object): return tosca_tpl + def _parse_resource_spec(self, resource_spec_reference): + """ + Parses a resource specification file to TOSCA template and extracts the information needed to compare it with the CLMC alerts specification. + + :param resource_spec_reference: the resource specification file received in the request + + :return: sfc ID, sfc instance ID, list of policy-trigger identifiers found in the resource spec. + """ + + # check that the resource specification file was sent + if not hasattr(resource_spec_reference, "file") or not hasattr(resource_spec_reference, "filename"): + raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'resource-spec' representing the TOSCA Resource Specification.") + + # extract the required information from the resource specification and return an error if an exception was encountered + try: + resource_spec_sfc, resource_spec_sfc_instance, resource_spec_policy_triggers = get_resource_spec_policy_triggers(resource_spec_reference) + except Exception as e: + log.error("Couldn't extract resource specification event IDs due to error: {0}".format(e)) + raise HTTPBadRequest("Couldn't extract resource specification event IDs - invalid TOSCA resource specification.") + + return resource_spec_sfc, resource_spec_sfc_instance, resource_spec_policy_triggers + @staticmethod def get_hash(message): """