#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 22-08-2018
## Created for Project : FLAME
"""
import datetime
from json import load
from os import listdir
from os.path import join, dirname
from time import sleep

from requests import post, get, delete, put
from schema import Schema, And, Or, Optional, SchemaError

from clmctest.alerts.alert_handler_server import LOG_TEST_FOLDER_PATH
# NOTE(review): SFEMC is referenced by the tests below but never imported — confirm its source module.
# port on which the test nginx service listens - used when sending test requests to generate load
NGINX_PORT = 80
def is_valid_timestamp(str_timestamp):
    """
    Checks whether a string timestamp is a valid UTC timestamp in the expected format.

    :param str_timestamp: the string timestamp to validate, e.g. "2018-08-22T12:00:00Z"
    :return: True if the string matches the "%Y-%m-%dT%H:%M:%SZ" format, False otherwise
    """
    try:
        # the original called a bare `strptime`, which is undefined - use the imported datetime module
        datetime.datetime.strptime(str_timestamp, "%Y-%m-%dT%H:%M:%SZ")
        return True
    except ValueError:
        return False
def is_valid_details_string(details):
    """
    Checks whether an alert "details" string has the expected comma-separated key=value structure.

    The string must contain exactly the four keys "db", "sfc", "sfci" and "policy",
    e.g. "db=mydb, sfc=chain, sfci=chain_1, policy=policy1".

    :param details: the details string to validate
    :return: True if the string is valid, False otherwise
    """
    try:
        pairs = (item.split("=") for item in details.split(","))
        details_dict = {key.strip(): value.strip() for key, value in pairs}
        # exactly these four keys must be present, nothing more
        return len(details_dict) == 4 and all(k in details_dict for k in ("db", "sfc", "sfci", "policy"))
    except Exception:
        # any malformed input (missing '=', too many '=' in a pair, etc.) counts as invalid
        return False
# Expected JSON structure of an alert notification written by the test alert handler
# server - each alert log file's content is validated against this schema.
JSON_BODY_SCHEMA = Schema({
    "message": "TRUE",
    "id": str,
    "level": "CRITICAL",
    "duration": int,
    "previousLevel": str,
    # details must contain exactly the db, sfc, sfci and policy key-value pairs
    "details": And(str, is_valid_details_string),
    # time must be a UTC timestamp string, e.g. "2018-08-22T12:00:00Z"
    "time": And(str, is_valid_timestamp),
    "data": {
        "series": [
            {
                "name": str,
                Optional("tags"): {
                    str: str
                },
                "columns": [
                    str
                ],
                "values": [
                    [
                        Or(str, int)
                    ]
                ]
            }
        ]
    }
})
class TestAlerts(object):
    """
    Integration tests for the CLMC alerts API - creating, fetching, triggering and
    deleting alerts defined through a TOSCA alert specification.
    """

    def test_alert_triggers(self, rspec_config, set_up_tear_down_fixture):
        """
        Test is implemented using the following steps:
            * Send to clmc service a POST request with TOSCA alert spec. and resource spec. files
            * Check that the registered alerts can be fetched with a GET request
            * Wait 10 seconds for Kapacitor to configure and start executing the defined tasks
            * Send some test requests to nginx to increase the load
            * Check that 4 log files have been created - one for each alert defined in the alert spec.
            * Send to clmc service a DELETE request with TOSCA alert spec. file
            * Check that the returned lists of deleted handlers and alerts are correct

        :param rspec_config: fixture from conftest.py
        :param set_up_tear_down_fixture: fixture ensuring a clean alert-log folder before and after the test
        """
        # find the IP addresses of the clmc service and the nginx service from the rspec configuration
        clmc_service_host, nginx_host = None, None
        for host in rspec_config:
            if host["name"] == "clmc-service":
                clmc_service_host = host["ip_address"]
            elif host["name"] == "nginx":
                nginx_host = host["ip_address"]

            if clmc_service_host is not None and nginx_host is not None:
                break

        # create the alerts with a POST request
        print("Sending alerts specification to clmc service...")
        alerts_spec = join(dirname(__file__), "alerts_test_config.yaml")
        resources_spec = join(dirname(__file__), "resources_test_config.yaml")

        with open(alerts_spec, 'rb') as alerts:
            with open(resources_spec, 'rb') as resources:
                files = {'alert-spec': alerts, 'resource-spec': resources}
                response = post("http://{0}/clmc-service/alerts".format(clmc_service_host), files=files)

        assert response.status_code == 200
        clmc_service_response = response.json()
        assert "triggers_specification_errors" not in clmc_service_response, "Unexpected error was returned for triggers specification"
        assert "triggers_action_errors" not in clmc_service_response, "Unexpected error was returned for handlers specification"

        sfc, sfc_instance = "MS_Template_1", "MS_Template_1_1"
        assert (sfc, sfc_instance) == (clmc_service_response["service_function_chain_id"], clmc_service_response["service_function_chain_instance_id"])
        print("Alert spec sent successfully")

        # check that the alerts can be fetched with a GET request
        print("Validate that the alerts were registered and can be fetched with a GET request.")
        response = get("http://{0}/clmc-service/alerts/{1}/{2}".format(clmc_service_host, sfc, sfc_instance))
        assert response.status_code == 200
        clmc_service_response = response.json()
        clmc_service_response = sorted(clmc_service_response, key=lambda x: x["trigger"])  # sort by trigger so that the response can be compared to what's expected

        # sort the handlers of returned alerts to ensure comparison order is correct
        for alert in clmc_service_response:
            alert["handlers"] = sorted(alert["handlers"])

        # compare actual response with expected response
        assert clmc_service_response == [
            {"policy": "scale_nginx_policy", "trigger": "high_requests", "task_identifier": "46fb8800c8a5eeeb04b090d838d475df574a2e6d854b5d678fc981c096eb6c1b",
             "handlers": ["http://172.40.231.200:9999/"],
             "topic_identifier": "46fb8800c8a5eeeb04b090d838d475df574a2e6d854b5d678fc981c096eb6c1b",
             "task_api_endpoint": "/kapacitor/v1/tasks/46fb8800c8a5eeeb04b090d838d475df574a2e6d854b5d678fc981c096eb6c1b",
             "topic_api_endpoint": "/kapacitor/v1/alerts/topics/46fb8800c8a5eeeb04b090d838d475df574a2e6d854b5d678fc981c096eb6c1b",
             "topic_handlers_api_endpoint": "/kapacitor/v1/alerts/topics/46fb8800c8a5eeeb04b090d838d475df574a2e6d854b5d678fc981c096eb6c1b/handlers"},
            {"policy": "scale_nginx_policy", "trigger": "increase_in_active_requests", "task_identifier": "7a9867f9270dba6635ac3760a3b70bc929f5bd0f3bf582e45d27fbd437f528ca",
             "handlers": [SFEMC, "http://172.40.231.200:9999/"],
             "topic_identifier": "7a9867f9270dba6635ac3760a3b70bc929f5bd0f3bf582e45d27fbd437f528ca",
             "task_api_endpoint": "/kapacitor/v1/tasks/7a9867f9270dba6635ac3760a3b70bc929f5bd0f3bf582e45d27fbd437f528ca",
             "topic_api_endpoint": "/kapacitor/v1/alerts/topics/7a9867f9270dba6635ac3760a3b70bc929f5bd0f3bf582e45d27fbd437f528ca",
             "topic_handlers_api_endpoint": "/kapacitor/v1/alerts/topics/7a9867f9270dba6635ac3760a3b70bc929f5bd0f3bf582e45d27fbd437f528ca/handlers"},
            {"policy": "scale_nginx_policy", "trigger": "increase_in_running_processes", "task_identifier": "f5edaeb27fb847116be749c3815d240cbf0d7ba79aee1959daf0b3445a70f2c8",
             "handlers": [SFEMC, "http://172.40.231.200:9999/"],
             "topic_identifier": "f5edaeb27fb847116be749c3815d240cbf0d7ba79aee1959daf0b3445a70f2c8",
             "task_api_endpoint": "/kapacitor/v1/tasks/f5edaeb27fb847116be749c3815d240cbf0d7ba79aee1959daf0b3445a70f2c8",
             "topic_api_endpoint": "/kapacitor/v1/alerts/topics/f5edaeb27fb847116be749c3815d240cbf0d7ba79aee1959daf0b3445a70f2c8",
             "topic_handlers_api_endpoint": "/kapacitor/v1/alerts/topics/f5edaeb27fb847116be749c3815d240cbf0d7ba79aee1959daf0b3445a70f2c8/handlers"},
            {"policy": "deadman_policy", "trigger": "no_measurements", "task_identifier": "f7dab6fd53001c812d44533d3bbb6ef45f0d1d39b9441bc3c60402ebda85d320",
             "handlers": [SFEMC, "http://172.40.231.200:9999/"],
             "topic_identifier": "f7dab6fd53001c812d44533d3bbb6ef45f0d1d39b9441bc3c60402ebda85d320",
             "task_api_endpoint": "/kapacitor/v1/tasks/f7dab6fd53001c812d44533d3bbb6ef45f0d1d39b9441bc3c60402ebda85d320",
             "topic_api_endpoint": "/kapacitor/v1/alerts/topics/f7dab6fd53001c812d44533d3bbb6ef45f0d1d39b9441bc3c60402ebda85d320",
             "topic_handlers_api_endpoint": "/kapacitor/v1/alerts/topics/f7dab6fd53001c812d44533d3bbb6ef45f0d1d39b9441bc3c60402ebda85d320/handlers"}
        ], "Incorrect response for GET alerts request"
        print("Alert spec validated successfully")

        print("Wait 10 seconds for Kapacitor stream/batch tasks to start working...")
        sleep(10)

        # generate some load on nginx so that the alert triggers fire
        print("Sending test requests to nginx...")
        response = get("http://{0}:{1}/".format(nginx_host, NGINX_PORT))
        assert response.status_code == 200

        print("Wait 15 seconds for Kapacitor to trigger alerts...")
        sleep(15)

        # each triggered alert writes one log file through the test alert handler server
        alert_logs = listdir(LOG_TEST_FOLDER_PATH)
        assert len(alert_logs) == 4, "4 log files must have been created - one for each alert defined in the specification."

        # validate the content of each alert log file against the expected JSON schema
        for alert_log in alert_logs:
            alert_log_path = join(LOG_TEST_FOLDER_PATH, alert_log)
            with open(alert_log_path) as fh:
                alert_json = load(fh)

            try:
                JSON_BODY_SCHEMA.validate(alert_json)
                valid = True
            except SchemaError:
                valid = False

            assert valid, "Alert log content is invalid - {0}".format(alert_log_path)

        # delete the alerts with a DELETE request
        with open(alerts_spec, 'rb') as alerts:
            files = {'alert-spec': alerts}
            response = delete("http://{0}/clmc-service/alerts".format(clmc_service_host), files=files)

        assert response.status_code == 200, "Incorrect status code returned after deleting the alert specification"
        json_response = response.json()

        # sort by trigger to ensure comparison order is correct
        assert sorted(json_response["deleted_alerts"], key=lambda x: x['trigger']) == [{"policy": "scale_nginx_policy", "trigger": "high_requests"}, {"policy": "scale_nginx_policy", "trigger": "increase_in_active_requests"},
                                                                                       {"policy": "scale_nginx_policy", "trigger": "increase_in_running_processes"}, {"policy": "deadman_policy", "trigger": "no_measurements"}], \
            "Incorrect list of deleted alerts"

        # sort by handler and trigger to ensure comparison order is correct
        assert sorted(json_response["deleted_handlers"], key=lambda x: (x['handler'], x['trigger'])) == [{"policy": "scale_nginx_policy", "trigger": "increase_in_active_requests", "handler": SFEMC},
                                                                                                         {"policy": "scale_nginx_policy", "trigger": "increase_in_running_processes", "handler": SFEMC},
                                                                                                         {"policy": "deadman_policy", "trigger": "no_measurements", "handler": SFEMC},
                                                                                                         {"policy": "scale_nginx_policy", "trigger": "high_requests", "handler": "http://172.40.231.200:9999/"},
                                                                                                         {"policy": "scale_nginx_policy", "trigger": "increase_in_active_requests", "handler": "http://172.40.231.200:9999/"},
                                                                                                         {"policy": "scale_nginx_policy", "trigger": "increase_in_running_processes", "handler": "http://172.40.231.200:9999/"},
                                                                                                         {"policy": "deadman_policy", "trigger": "no_measurements", "handler": "http://172.40.231.200:9999/"}], \
            "Incorrect list of deleted handlers"
def test_alerts_update_request(self, rspec_config):
"""
Test is implemented using the following steps:
* Send to clmc service a POST request with TOSCA alert spec. and resource spec. files
* Send to clmc service a PUT request with TOSCA alert spec. and resource spec. files
* Check that the alerts have a "created" timestamp that is later than the timestamp of the alerts during the POST request,
implying that the alerts were re-created during the PUT request
:param rspec_config: fixture from conftest.py
"""
clmc_service_host = None
for host in rspec_config:
if host["name"] == "clmc-service":
clmc_service_host = host["ip_address"]
break
# create the alerts with a POST request
print("Sending alerts specification to clmc service...")
alerts_spec = join(dirname(__file__), "alerts_test_config.yaml")
resources_spec = join(dirname(__file__), "resources_test_config.yaml")
with open(alerts_spec, 'rb') as alerts:
with open(resources_spec, 'rb') as resources:
files = {'alert-spec': alerts, 'resource-spec': resources}
response = post("http://{0}/clmc-service/alerts".format(clmc_service_host), files=files)
assert response.status_code == 200
clmc_service_response = response.json()
assert "triggers_specification_errors" not in clmc_service_response, "Unexpected error was returned for triggers specification"
assert "triggers_action_errors" not in clmc_service_response, "Unexpected error was returned for handlers specification"
sfc, sfc_instance = "MS_Template_1", "MS_Template_1_1"
assert (sfc, sfc_instance) == (clmc_service_response["service_function_chain_id"], clmc_service_response["service_function_chain_instance_id"])
print("Alert spec sent successfully")
# find the latest timestamp of the registered alerts
max_post_timestamp = 0
tasks = get("http://{0}/kapacitor/v1/tasks".format(clmc_service_host)).json()["tasks"]
for timestamp in tasks_timestamps(tasks, sfc, sfc_instance):
max_post_timestamp = max(max_post_timestamp, timestamp)
delay = 2 # seconds
print("Sleeping {0} seconds to ensure a difference between the timestamps when creating the alerts and when updating them...".format(delay))
sleep(delay)
# update the alerts with a PUT request and check that the "created" metadata is updated implying that the alerts were recreated
print("Sending alerts specification to clmc service for updating...")
with open(alerts_spec, 'rb') as alerts:
with open(resources_spec, 'rb') as resources:
files = {'alert-spec': alerts, 'resource-spec': resources}
response = put("http://{0}/clmc-service/alerts".format(clmc_service_host), files=files)
assert response.status_code == 200
clmc_service_response = response.json()
assert "triggers_specification_errors" not in clmc_service_response, "Unexpected error was returned for triggers specification"
assert "triggers_action_errors" not in clmc_service_response, "Unexpected error was returned for handlers specification"
sfc, sfc_instance = "MS_Template_1", "MS_Template_1_1"
assert (sfc, sfc_instance) == (clmc_service_response["service_function_chain_id"], clmc_service_response["service_function_chain_instance_id"])
print("Alert spec updated successfully")
# find the earliest timestamp of the updated alerts
min_put_timestamp = float("inf")
tasks = get("http://{0}/kapacitor/v1/tasks".format(clmc_service_host)).json()["tasks"]
for timestamp in tasks_timestamps(tasks, sfc, sfc_instance):
min_put_timestamp = min(min_put_timestamp, timestamp)
print("Latest timestamp during the POST request", max_post_timestamp, "Earliest timestamp during the PUT request", min_put_timestamp)
assert min_put_timestamp - max_post_timestamp >= delay, "There is an alert that wasn't updated properly with a PUT request"
# delete the alerts with a DELETE request
with open(alerts_spec, 'rb') as alerts:
files = {'alert-spec': alerts}
delete("http://{0}/clmc-service/alerts".format(clmc_service_host), files=files)
def tasks_timestamps(all_tasks, sfc_id, sfc_instance_id):
    """
    Generates the creation timestamps (POSIX seconds) of the Kapacitor tasks that
    belong to the given SFC and SFC instance.

    :param all_tasks: the full list of tasks from kapacitor
    :param sfc_id: SFC identifier
    :param sfc_instance_id: SFC instance identifier
    """
    for task in all_tasks:
        # the configured variables of this alert
        task_vars = task["vars"]

        # skip tasks that were configured for a different SFC instance
        if task_vars["sfc"]["value"] != sfc_id or task_vars["sfci"]["value"] != sfc_instance_id:
            continue

        # ignore the timezone suffix and only take the first 6 digits of the microseconds
        created_datestr = task["created"][:26]
        yield datetime.datetime.strptime(created_datestr, "%Y-%m-%dT%H:%M:%S.%f").timestamp()