Commit 79506a79 authored by Nikolay Stanchev

Adds a PUT method to the alerts API used for creating and/or updating alerts

parent d7297066
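Both the POST and the new PUT endpoint accept the same multipart form input: a TOSCA alerts specification referenced as 'alert-spec' and a TOSCA resources specification referenced as 'resource-spec'. The sketch below shows how a client might exercise the two methods; the base URL, route path and file names are assumptions for illustration only, while the form field names and the POST/PUT semantics come from the diff itself.

import requests

ALERTS_URL = "http://localhost:9080/alerts"  # hypothetical host and path - the route mapping is not part of this commit

def send_alerts_spec(method):
    # re-open both YAML files on every call so each request sends the full content
    with open("alerts_spec.yaml", "rb") as alert_spec, open("resources_spec.yaml", "rb") as resource_spec:
        files = {"alert-spec": alert_spec, "resource-spec": resource_spec}
        return requests.request(method, ALERTS_URL, files=files).json()

print(send_alerts_spec("POST"))  # creates the Kapacitor tasks, topics and handlers
print(send_alerts_spec("POST"))  # a repeated POST reports the ID duplication errors
print(send_alerts_spec("PUT"))   # PUT deletes and re-creates the duplicates, so no errors are expected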
@@ -204,36 +204,7 @@ class TestAlertsConfigurationAPI(object):
assert "triggers_action_errors" not in clmc_service_response, "Unexpected error was returned for handlers specification"
# traverse through all alerts and check that they are created along with their respective handlers within Kapacitor
for alert in alerts:
alert_id = alert["task"]
alert_type = alert["type"]
topic_id = alert["topic"]
# check the Kapacitor task
kapacitor_response = get("http://{0}:{1}/kapacitor/v1/tasks/{2}".format(kapacitor_host, kapacitor_port, alert_id))
assert kapacitor_response.status_code == 200, "Alert with ID {0} was not created - test file {1}.".format(alert_id, alerts_test_file)
kapacitor_response_json = kapacitor_response.json()
assert "link" in kapacitor_response_json, "Incorrect response from kapacitor for alert with ID {0} - test file {1}".format(alert_id, alerts_test_file)
assert kapacitor_response_json["status"] == "enabled", "Alert with ID {0} was created but is disabled - test file {1}".format(alert_id, alerts_test_file)
assert kapacitor_response_json["executing"], "Alert with ID {0} was created and is enabled, but is not executing - test file {1}".format(alert_id, alerts_test_file)
assert kapacitor_response_json["type"] == alert_type, "Alert with ID {0} was created with the wrong type - test file {1}".format(alert_id, alerts_test_file)
# check the Kapacitor topic
kapacitor_response = get("http://{0}:{1}/kapacitor/v1/alerts/topics/{2}".format(kapacitor_host, kapacitor_port, topic_id))
assert kapacitor_response.status_code == 200, "Topic with ID {0} was not created - test file {1}".format(topic_id, alerts_test_file)
kapacitor_response_json = kapacitor_response.json()
assert kapacitor_response_json["id"] == topic_id, "Topic {0} was created with incorrect ID - test file {1}".format(topic_id, alerts_test_file)
# check that all handler IDs were created and each of them is subscribed to the correct topic ID
for handler_id in alert["handlers"]:
handler_url = alert["handlers"][handler_id]
kapacitor_response = get("http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers/{3}".format(kapacitor_host, kapacitor_port, topic_id, handler_id))
assert kapacitor_response.status_code == 200, "Handler with ID {0} for topic with ID {1} doesn't exist - test file {2}".format(handler_id, topic_id, alerts_test_file)
kapacitor_response_json = kapacitor_response.json()
assert kapacitor_response_json["id"] == handler_id, "Incorrect ID of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file)
assert kapacitor_response_json["kind"] == "post", "Incorrect kind of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file)
assert kapacitor_response_json["options"]["url"] == handler_url, "Incorrect url of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file)
check_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port, alerts_test_file)
# send the same spec again to check that error messages are returned (because of ID duplication)
with open(alert_spec_abs_path) as alert_spec:
@@ -248,6 +219,23 @@ class TestAlertsConfigurationAPI(object):
handlers_count = sum([len(alert["handlers"]) for alert in alerts])
assert len(clmc_service_response["triggers_action_errors"]) == handlers_count, "Expected errors were not returned for handlers specification"
# send the same request, but as a PUT instead of a POST
with open(alert_spec_abs_path) as alert_spec:
with open(valid_resource_spec_abs_path) as valid_resource_spec:
request.POST['alert-spec'] = FieldStorageMock(alerts_test_file, alert_spec) # a simple mock class is used to mimic the FieldStorage class
request.POST['resource-spec'] = FieldStorageMock(valid_resources_test_file, valid_resource_spec)
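# FieldStorageMock is assumed to expose only the two attributes the API inspects, roughly
# (hypothetical definition - the mock class itself is not shown in this diff):
#     class FieldStorageMock(object):
#         def __init__(self, filename, file):
#             self.filename = filename  # name of the uploaded file
#             self.file = file          # file-like object holding the YAML content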
clmc_service_response = AlertsConfigurationAPI(request).put_alerts_specification()
# no errors are expected now, since the PUT request must update existing alerts
assert (sfc, sfc_instance) == (clmc_service_response["service_function_chain_id"], clmc_service_response["service_function_chain_instance_id"]), \
"Incorrect extraction of metadata for file {0}".format(alerts_test_file)
assert "triggers_specification_errors" not in clmc_service_response, "Unexpected error was returned for triggers specification"
assert "triggers_action_errors" not in clmc_service_response, "Unexpected error was returned for handlers specification"
# traverse through all alerts and check that they were not deleted by the PUT request
check_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port, alerts_test_file)
# clean up at the end of the test
clean_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port)
def test_alerts_config_api_get(self, app_config):
@@ -536,6 +524,48 @@ def get_alert_type(event_type, alert_period):
return events[event_type]
def check_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port, alerts_test_file):
"""
Check that all Kapacitor resources are created with the expected values.
:param alerts: the list of alert objects to test
:param kapacitor_host: hostname of kapacitor
:param kapacitor_port: port number of kapacitor
:param alerts_test_file: name of the current test file
"""
for alert in alerts:
alert_id = alert["task"]
alert_type = alert["type"]
topic_id = alert["topic"]
# check the Kapacitor task
kapacitor_response = get("http://{0}:{1}/kapacitor/v1/tasks/{2}".format(kapacitor_host, kapacitor_port, alert_id))
assert kapacitor_response.status_code == 200, "Alert with ID {0} was not created - test file {1}.".format(alert_id, alerts_test_file)
kapacitor_response_json = kapacitor_response.json()
assert "link" in kapacitor_response_json, "Incorrect response from kapacitor for alert with ID {0} - test file {1}".format(alert_id, alerts_test_file)
assert kapacitor_response_json["status"] == "enabled", "Alert with ID {0} was created but is disabled - test file {1}".format(alert_id, alerts_test_file)
assert kapacitor_response_json["executing"], "Alert with ID {0} was created and is enabled, but is not executing - test file {1}".format(alert_id, alerts_test_file)
assert kapacitor_response_json["type"] == alert_type, "Alert with ID {0} was created with the wrong type - test file {1}".format(alert_id, alerts_test_file)
# check the Kapacitor topic
kapacitor_response = get("http://{0}:{1}/kapacitor/v1/alerts/topics/{2}".format(kapacitor_host, kapacitor_port, topic_id))
assert kapacitor_response.status_code == 200, "Topic with ID {0} was not created - test file {1}".format(topic_id, alerts_test_file)
kapacitor_response_json = kapacitor_response.json()
assert kapacitor_response_json["id"] == topic_id, "Topic {0} was created with incorrect ID - test file {1}".format(topic_id, alerts_test_file)
# check that all handler IDs were created and each of them is subscribed to the correct topic ID
for handler_id in alert["handlers"]:
handler_url = alert["handlers"][handler_id]
kapacitor_response = get("http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers/{3}".format(kapacitor_host, kapacitor_port, topic_id, handler_id))
assert kapacitor_response.status_code == 200, "Handler with ID {0} for topic with ID {1} doesn't exist - test file {2}".format(handler_id, topic_id, alerts_test_file)
kapacitor_response_json = kapacitor_response.json()
assert kapacitor_response_json["id"] == handler_id, "Incorrect ID of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file)
assert kapacitor_response_json["kind"] == "post", "Incorrect kind of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file)
assert kapacitor_response_json["options"]["url"] == handler_url, "Incorrect url of handler {0} in the Kapacitor response - test file {1}".format(handler_id, alerts_test_file)
def clean_kapacitor_alerts(alerts, kapacitor_host, kapacitor_port):
"""
A utility function to clean up Kapacitor from the configured alerts, topics and handlers.
......
@@ -150,7 +150,6 @@ class AlertsConfigurationAPI(object):
# parse the alert specification file into a tosca template (including validation)
tosca_tpl = self._parse_alert_spec(alert_spec_reference)
# TODO next release - uncomment
# sfc, sfc_instance = tosca_tpl.tpl["metadata"]["sfc"], tosca_tpl.tpl["metadata"]["sfci"]
sfc = tosca_tpl.tpl["metadata"]["servicefunctionchain"]
sfc_instance = "{0}_1".format(sfc)
@@ -200,14 +199,41 @@ class AlertsConfigurationAPI(object):
return {"deleted_alerts": alerts, "deleted_handlers": handlers}
@view_config(route_name='alerts_configuration', request_method='PUT')
def put_alerts_specification(self):
"""
The view for receiving and configuring alerts based on the TOSCA alerts specification document. This endpoint must also receive the TOSCA resources specification document for validation.
A PUT request will update any existing alerts and handlers that were registered before.
:return: a dictionary with a msg and optional keys for errors encountered while interacting with Kapacitor
:raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as alert-spec representing the TOSCA Alerts Specification
:raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as resource-spec representing the TOSCA Resources Specification
"""
return self._create_alerts_specification(patch_duplicates=True)
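# Note: the 'alerts_configuration' route name is assumed to be mapped to a concrete URL pattern in the
# application setup, e.g. something like config.add_route('alerts_configuration', '/alerts') - the actual
# pattern is not part of this diff; PUT and POST are simply two views registered on the same route.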
@view_config(route_name='alerts_configuration', request_method='POST')
def post_alerts_specification(self):
"""
The view for receiving and configuring alerts based on the TOSCA alerts specification document. This endpoint must also receive the TOSCA resources specification document for validation.
A POST request will not try to update existing alerts and handlers and will simply return the error message from Kapacitor.
:return: a dictionary with a msg and optional keys for errors encountered while interacting with Kapacitor
:raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as alert-spec representing the TOSCA Alerts Specification
:raises HTTPBadRequest: if the request doesn't contain a (YAML) file input referenced as resource-spec representing the TOSCA Resources Specification
"""
return self._create_alerts_specification(patch_duplicates=False)
def _create_alerts_specification(self, patch_duplicates=False):
"""
The shared implementation for creating (and optionally updating) alerts from an alert specification document.
:param patch_duplicates: (defaults to False) if set to True, existing resources will be overwritten
:return: a dictionary with a msg and optional keys for errors encountered while interacting with Kapacitor
"""
kapacitor_host, kapacitor_port = self.request.registry.settings['kapacitor_host'], self.request.registry.settings['kapacitor_port']
@@ -215,23 +241,15 @@ class AlertsConfigurationAPI(object):
alert_spec_reference = self.request.POST.get('alert-spec')
resource_spec_reference = self.request.POST.get('resource-spec')
# check that the resource specification file was sent
if not hasattr(resource_spec_reference, "file") or not hasattr(resource_spec_reference, "filename"):
raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'resource-spec' representing the TOSCA Resource Specification.")
# extract the required information from the resource specification and return an error if an exception was encountered
try:
resource_spec_sfc, resource_spec_sfc_instance, resource_spec_policy_triggers = get_resource_spec_policy_triggers(resource_spec_reference)
except Exception as e:
log.error("Couldn't extract resource specification event IDs due to error: {0}".format(e))
raise HTTPBadRequest("Couldn't extract resource specification event IDs - invalid TOSCA resource specification.")
# parse the resource specification file and extract the required information
resource_spec_sfc, resource_spec_sfc_instance, resource_spec_policy_triggers = self._parse_resource_spec(resource_spec_reference)
# parse the alert specification file into a tosca template (including validation)
tosca_tpl = self._parse_alert_spec(alert_spec_reference)
# extract the information needed from the alert spec. for validation between the two TOSCA documents
alert_spec_policy_triggers = get_alert_spec_policy_triggers(tosca_tpl)
# TODO next release - uncomment
# sfc, sfc_instance = tosca_tpl.tpl["metadata"]["sfc"], tosca_tpl.tpl["metadata"]["sfci"]
sfc = tosca_tpl.tpl["metadata"]["servicefunctionchain"]
sfc_instance = "{0}_1".format(sfc)
@@ -242,8 +260,10 @@ class AlertsConfigurationAPI(object):
db = sfc # database per service function chain, named after the service function chain ID
# iterate through every policy and extract all triggers of the given policy - the returned lists of errors will be empty if no errors were encountered
# while interacting with the Kapacitor HTTP API
alert_tasks_errors, alert_handlers_errors = self._config_kapacitor_alerts(tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, resource_spec_policy_triggers)
# while interacting with the Kapacitor HTTP API - the patch_duplicates flag determines whether existing resources are deleted and recreated or reported as errors
alert_tasks_errors, alert_handlers_errors = self._config_kapacitor_alerts(tosca_tpl, sfc, sfc_instance, db,
kapacitor_host, kapacitor_port, resource_spec_policy_triggers,
patch_duplicates=patch_duplicates)
return_msg = {"msg": "Alerts specification has been successfully validated and forwarded to Kapacitor", "service_function_chain_id": sfc,
"service_function_chain_instance_id": sfc_instance}
@@ -291,7 +311,7 @@ class AlertsConfigurationAPI(object):
if len(missing_policy_triggers) > 0:
raise HTTPBadRequest("Couldn't match the following policy triggers from the alerts specification with triggers defined in the resource specification: {0}".format(missing_policy_triggers))
def _config_kapacitor_alerts(self, tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, resource_spec_policy_triggers):
def _config_kapacitor_alerts(self, tosca_tpl, sfc, sfc_instance, db, kapacitor_host, kapacitor_port, resource_spec_policy_triggers, patch_duplicates=False):
"""
Configures the alerts task and alert handlers within Kapacitor.
@@ -302,6 +322,7 @@ class AlertsConfigurationAPI(object):
:param kapacitor_host: default host is localhost (CLMC service running on the same machine as Kapacitor)
:param kapacitor_port: default value to use is 9092
:param resource_spec_policy_triggers: the extracted policy-trigger strings from the resource specification
:param patch_duplicates: (defaults to False) if set to True, any duplication errors will be handled by first deleting the existing resource and then creating it
:return: the list for tracking errors while interacting with Kapacitor tasks and the list for tracking errors while interacting with Kapacitor alert handlers
"""
@@ -378,24 +399,36 @@ class AlertsConfigurationAPI(object):
log.info(response_content, response.status_code)
# track all reported errors
if response_content.get("error", "") != "":
alert_tasks_errors.append({
"policy": policy_id,
"trigger": event_id,
"error": response_content.get("error")
})
if response_content.get("error", "") != "": # Kapacitor always returns an error key which is set to an empty string if successful
capture_error = True
# check whether a Kapacitor task with the given identifier already exists; if so, delete it and create it again
kapacitor_api_tasks_instance_url = "{0}/{1}".format(kapacitor_api_tasks_url, task_id)
if patch_duplicates and get(kapacitor_api_tasks_instance_url).status_code == 200:
# this will only happen if the patch_duplicates flag is set to True
delete(kapacitor_api_tasks_instance_url)
response = post(kapacitor_api_tasks_url, json=kapacitor_http_request_body)
response_content = response.json()
capture_error = response_content.get("error", "") != ""
# if the error was not handled by patching an existing Kapacitor task, then add it to the list of errors
if capture_error:
alert_tasks_errors.append({
"policy": policy_id,
"trigger": event_id,
"error": response_content["error"]
})
# extract http handlers
http_handlers = trigger.trigger_tpl["action"]["implementation"]
# subscribe all http handlers to the created topic
self._config_kapacitor_alert_handlers(kapacitor_host, kapacitor_port, sfc, sfc_instance, policy_id, resource_spec_trigger_id, topic_id, event_id,
http_handlers, alert_handlers_errors)
http_handlers, alert_handlers_errors, patch_duplicates=patch_duplicates)
return alert_tasks_errors, alert_handlers_errors
def _config_kapacitor_alert_handlers(self, kapacitor_host, kapacitor_port, sfc, sfc_i, policy_id, trigger_id,
topic_id, event_id, http_handlers, alert_handlers_errors):
topic_id, event_id, http_handlers, alert_handlers_errors, patch_duplicates=False):
"""
Handles the configuration of HTTP Post alert handlers.
@@ -409,6 +442,7 @@ class AlertsConfigurationAPI(object):
:param event_id: name of trigger
:param http_handlers: list of handlers to subscribe
:param alert_handlers_errors: the list for tracking errors while interacting with Kapacitor alert handlers
:param patch_duplicates: (defaults to False) if set to True, any duplication errors will be handled by first deleting the existing resource and then creating it
"""
kapacitor_api_handlers_url = "http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers".format(kapacitor_host, kapacitor_port, topic_id)
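# The JSON body later posted to this handlers endpoint is assumed to have the shape below, inferred from
# the keys asserted in the tests (id, kind and options.url); its exact construction sits outside this hunk:
#     {"id": handler_id, "kind": "post", "options": {"url": http_handler_url}}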
@@ -429,12 +463,24 @@ class AlertsConfigurationAPI(object):
log.info(response_content, response.status_code)
if response_content.get("error", "") != "":
alert_handlers_errors.append({
"policy": policy_id,
"trigger": event_id,
"handler": http_handler_url,
"error": response_content.get("error")
})
capture_error = True
# check whether a Kapacitor handler with the given identifier already exists; if so, delete it and create it again
kapacitor_api_handlers_instance_url = "{0}/{1}".format(kapacitor_api_handlers_url, handler_id)
if patch_duplicates and get(kapacitor_api_handlers_instance_url).status_code == 200:
# this will only happen if the patch_duplicates flag is set to True
delete(kapacitor_api_handlers_instance_url)
response = post(kapacitor_api_handlers_url, json=kapacitor_http_request_body)
response_content = response.json()
capture_error = response_content.get("error", "") != ""
# if the error was not handled by patching an existing Kapacitor handler, then add it to the list of errors
if capture_error:
alert_handlers_errors.append({
"policy": policy_id,
"trigger": event_id,
"handler": http_handler_url,
"error": response_content["error"]
})
def _parse_alert_spec(self, alert_spec_reference):
"""
@@ -480,6 +526,28 @@ class AlertsConfigurationAPI(object):
return tosca_tpl
def _parse_resource_spec(self, resource_spec_reference):
"""
Parses a resource specification file into a TOSCA template and extracts the information needed to compare it with the CLMC alerts specification.
:param resource_spec_reference: the resource specification file received in the request
:return: sfc ID, sfc instance ID, list of policy-trigger identifiers found in the resource spec.
"""
# check that the resource specification file was sent
if not hasattr(resource_spec_reference, "file") or not hasattr(resource_spec_reference, "filename"):
raise HTTPBadRequest("Request to this API endpoint must include a (YAML) file input referenced as 'resource-spec' representing the TOSCA Resource Specification.")
# extract the required information from the resource specification and return an error if an exception was encountered
try:
resource_spec_sfc, resource_spec_sfc_instance, resource_spec_policy_triggers = get_resource_spec_policy_triggers(resource_spec_reference)
except Exception as e:
log.error("Couldn't extract resource specification event IDs due to error: {0}".format(e))
raise HTTPBadRequest("Couldn't extract resource specification event IDs - invalid TOSCA resource specification.")
return resource_spec_sfc, resource_spec_sfc_instance, resource_spec_policy_triggers
@staticmethod
def get_hash(message):
"""
......