From fdc0505dc2ac7700345158da92c8d9576d89678e Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Mon, 9 Jul 2018 16:28:49 +0100 Subject: [PATCH] Graph API tests --- scripts/clmc-service/install.sh | 2 +- src/generate_network_measurements.py | 62 --- .../generate_network_measurements.py | 99 +++++ src/service/clmcservice/graphapi/conftest.py | 209 ++++++++++ src/service/clmcservice/graphapi/tests.py | 379 ++++++++++++++++++ src/service/clmcservice/graphapi/utilities.py | 22 +- 6 files changed, 690 insertions(+), 83 deletions(-) delete mode 100644 src/generate_network_measurements.py create mode 100644 src/service/clmcservice/generate_network_measurements.py create mode 100644 src/service/clmcservice/graphapi/conftest.py diff --git a/scripts/clmc-service/install.sh b/scripts/clmc-service/install.sh index 12467cd..16e86e3 100755 --- a/scripts/clmc-service/install.sh +++ b/scripts/clmc-service/install.sh @@ -33,5 +33,5 @@ cd `dirname $0` echo "Provisioning CLMC service" ./install-tick-stack.sh $@ -./install-clmc-service.sh $@ ./install-neo4j.sh $@ +./install-clmc-service.sh $@ \ No newline at end of file diff --git a/src/generate_network_measurements.py b/src/generate_network_measurements.py deleted file mode 100644 index 409be75..0000000 --- a/src/generate_network_measurements.py +++ /dev/null @@ -1,62 +0,0 @@ -import getopt -import sys - -from influxdb import InfluxDBClient -from json import load - - -def generate_network_measurements(influx_host, db_name, network_config_path): - """ - Generates network measurements which follow the telegraf ping plugin format. - - :param influx_host: influx DB host - :param db_name: name of database - :param network_config_path the path to the network configuration file - """ - - with open(network_config_path) as fh: - json_data = load(fh) - - # declares the data to push to influx - host, url, avg_response_ms, min_response_ms, max_response_ms - data = ((link["source"], link["target"], link["avg_response_time"], link["min_response_time"], link["max_response_time"]) for link in json_data["links"]) - - json_body = [ - {"measurement": "ping", - "tags": {"host": host, "url": url}, - "fields": {"packets_transmitted": 10, "reply_received": 10, "packets_received": 10, - "percent_reply_loss": 0, "percent_packets_loss": 0, "errors": 0, "average_response_ms": avg_ms, - "minimum_response_ms": min_ms, "maximum_response_ms": max_ms, "result_code": 0}, - "time": 1528385860 * 10**9 - } for host, url, avg_ms, min_ms, max_ms in data - ] - - print("Establishing connection with influx DB on {0} with database {1}".format(influx_host, db_name)) - db_client = InfluxDBClient(host=influx_host, timeout=10, database=db_name) - db_client.drop_measurement("ping") # clear data in the ping measurement from previous executions of this script - print("Writing network latency data to influx..\n") - assert db_client.write_points(json_body) # assert the write method returns True - successful write - - -if __name__ == "__main__": - try: - opts, args = getopt.getopt(sys.argv[1:], "h:d:p:", ['host=', 'database=', 'path=']) - except getopt.GetoptError: - print('generate_network_measurements.py -h <influx host> -d <influx database> -p <network configuration file path>') - sys.exit(1) - - if len(opts) != 3: - print('generate_network_measurements.py -h <influx host> -d <influx database> -p <network configuration file path>') - sys.exit(1) - - db_host, database, path = None, None, None - # Apply options, if any - for opt, arg in opts: - if opt in ('-h', '--host'): - db_host = arg - elif opt in ('-d', '--database'): - database = arg - elif opt in ('-p', '--path'): - path = arg - - if all([db_host is not None, database is not None, path is not None]): - generate_network_measurements(db_host, database, path) diff --git a/src/service/clmcservice/generate_network_measurements.py b/src/service/clmcservice/generate_network_measurements.py new file mode 100644 index 0000000..3b88cce --- /dev/null +++ b/src/service/clmcservice/generate_network_measurements.py @@ -0,0 +1,99 @@ +import getopt +import sys +from itertools import permutations +from influxdb import InfluxDBClient +from json import load +from py2neo import Graph, Node, Relationship + + +def report_network_measurements(influx_host, db_name, json_data, neo4j_host, neo4j_password): + """ + Generates network measurements which follow the telegraf ping plugin format. + + :param influx_host: influx DB host + :param db_name: name of database + :param json_data: the network configuration data + :param neo4j_host: the neo4j db host + :param neo4j_password: the neo4j db password + """ + + # declares the data to push to influx - host, url, avg_response_ms, min_response_ms, max_response_ms + data = tuple((link["source"], link["target"], link["avg_response_time"], link["min_response_time"], link["max_response_time"]) for link in json_data["links"]) + + json_body = [ + {"measurement": "ping", + "tags": {"host": host, "url": url}, + "fields": {"packets_transmitted": 10, "reply_received": 10, "packets_received": 10, + "percent_reply_loss": 0, "percent_packets_loss": 0, "errors": 0, "average_response_ms": avg_ms, + "minimum_response_ms": min_ms, "maximum_response_ms": max_ms, "result_code": 0}, + "time": 1528385860 * 10**9 + } for host, url, avg_ms, min_ms, max_ms in data + ] + + print("Establishing connection with influx DB on {0} with database {1}".format(influx_host, db_name)) + db_client = InfluxDBClient(host=influx_host, timeout=10, database=db_name) + db_client.drop_measurement("ping") # clear data in the ping measurement from previous executions of this script + print("Writing network latency data to influx..\n") + assert db_client.write_points(json_body) # assert the write method returns True - successful write + + graph = Graph(host=neo4j_host, password=neo4j_password, port=11005) + + print("Building network links from the ping telegraf plugin in influx") + compute_nodes = set([host for host, url, avg_ms, min_ms, max_ms in data]) + # retrieve all network latencies available from the influx ping table + for network_link in permutations(compute_nodes, 2): + from_node_name, to_node_name = network_link + from_node = graph.nodes.match("ComputeNode", name=from_node_name).first() + if from_node is None: + from_node = Node("ComputeNode", name=from_node_name) + graph.create(from_node) + + to_node = graph.nodes.match("ComputeNode", name=to_node_name).first() + if to_node is None: + to_node = Node("ComputeNode", name=to_node_name) + graph.create(to_node) + + # query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\' and time>={2} and time<{3}'.format(from_node['name'], to_node['name'], from_timestamp, to_timestamp) + # In future when latencies are reported continuously, we should put timestamp filtering in the query for network links + query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\''.format(from_node['name'], to_node['name']) + print("Executing query: {0}".format(query)) + + result = db_client.query(query) # execute the query + # get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator) + try: + actual_result = next(result.get_points()) + latency = actual_result.get("mean_average_response_ms") + if graph.relationships.match(nodes=(from_node, to_node), r_type="linkedTo").first() is None: + edge = Relationship(from_node, "linkedTo", to_node, latency=latency) + graph.create(edge) + except StopIteration: + # in this case there is no such link reported to Influx + print("There is no direct link between {0} and {1}".format(from_node, to_node)) + + +if __name__ == "__main__": + try: + opts, args = getopt.getopt(sys.argv[1:], "h:d:p:", ['host=', 'database=', 'path=']) + except getopt.GetoptError: + print('generate_network_measurements.py -h <influx host> -d <influx database> -p <network configuration file path>') + sys.exit(1) + + if len(opts) != 3: + print('generate_network_measurements.py -h <influx host> -d <influx database> -p <network configuration file path>') + sys.exit(1) + + db_host, database, path = None, None, None + # Apply options, if any + for opt, arg in opts: + if opt in ('-h', '--host'): + db_host = arg + elif opt in ('-d', '--database'): + database = arg + elif opt in ('-p', '--path'): + path = arg + + if all([db_host is not None, database is not None, path is not None]): + with open(path) as fh: + json_data = load(fh) + + report_network_measurements(db_host, database, json_data, db_host, "admin") diff --git a/src/service/clmcservice/graphapi/conftest.py b/src/service/clmcservice/graphapi/conftest.py new file mode 100644 index 0000000..5dfd3b7 --- /dev/null +++ b/src/service/clmcservice/graphapi/conftest.py @@ -0,0 +1,209 @@ +#!/usr/bin/python3 +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 09-07-2018 +// Created for Project : FLAME +""" + +import pytest +from influxdb import InfluxDBClient +from clmcservice.generate_network_measurements import report_network_measurements +from py2neo import Graph + +network_config = { + "links": [ + { + "source": "DC1", + "target": "DC2", + "min_response_time": 10, + "max_response_time": 20, + "avg_response_time": 15 + }, + { + "source": "DC2", + "target": "DC1", + "min_response_time": 16, + "max_response_time": 28, + "avg_response_time": 22 + }, + { + "source": "DC1", + "target": "DC3", + "min_response_time": 17, + "max_response_time": 19, + "avg_response_time": 18 + }, + { + "source": "DC3", + "target": "DC1", + "min_response_time": 15, + "max_response_time": 25, + "avg_response_time": 20 + }, + { + "source": "DC1", + "target": "DC5", + "min_response_time": 27, + "max_response_time": 33, + "avg_response_time": 30 + }, + { + "source": "DC5", + "target": "DC1", + "min_response_time": 10, + "max_response_time": 42, + "avg_response_time": 26 + }, + { + "source": "DC2", + "target": "DC4", + "min_response_time": 11, + "max_response_time": 29, + "avg_response_time": 20 + }, + { + "source": "DC4", + "target": "DC2", + "min_response_time": 12, + "max_response_time": 40, + "avg_response_time": 26 + }, + { + "source": "DC3", + "target": "DC4", + "min_response_time": 23, + "max_response_time": 27, + "avg_response_time": 25 + }, + { + "source": "DC4", + "target": "DC3", + "min_response_time": 12, + "max_response_time": 18, + "avg_response_time": 15 + }, + { + "source": "DC5", + "target": "DC6", + "min_response_time": 3, + "max_response_time": 15, + "avg_response_time": 9 + }, + { + "source": "DC6", + "target": "DC5", + "min_response_time": 11, + "max_response_time": 11, + "avg_response_time": 11 + }, + ] +} + + +@pytest.fixture(scope='module', autouse=True) +def db_testing_data(): + """ + This fixture generates some testing data in influx to be used for testing, after which it clears up the DB. + + :return: a pair of time stamps defining the from-to range for which the test data is reported + """ + + global network_config + + test_db_name = "TestInfluxDB" + + influx = InfluxDBClient(host="localhost", port=8086, timeout=10) + graph = Graph(host="localhost", password="admin", port="11005") + graph.delete_all() + + dbs = influx.get_list_database() + if "CLMCMetrics" not in dbs: + influx.create_database("CLMCMetrics") + report_network_measurements("localhost", "CLMCMetrics", network_config, "localhost", "admin") + + if test_db_name in dbs: + influx.drop_database(test_db_name) + influx.create_database(test_db_name) + influx.switch_database(test_db_name) + + from_timestamp = 1528385860 + to_timestamp = 1528685860 + + data = [ + ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 5, 20, 1528385860), + ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 8, 35, 1528385860), + ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 15, 1528389860), + ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 10, 23, 1528389860), + ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 17, 1528395860), + ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 15, 11, 1528395860), + ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 17, 23, 1528485860), + ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 19, 24, 1528485860), + ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 16, 1528545860), + ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 20, 18, 1528545860) + ] + influx.write_points([ + {"measurement": "nginx", + "tags": {"host": host, "ipendpoint": endpoint, "location": location, "sf": sf, "sf_i": sf_i, "sfc": sfc, "sfc_i": sfc_i, "sr": sr}, + "fields": {"requests": num_requests, "avg_processing_time": processing_time}, + "time": timestamp * 10 ** 9 + } for host, endpoint, location, sf, sf_i, sfc, sfc_i, sr, num_requests, processing_time, timestamp in data + ]) + + data = [ + ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 86, 1528386860), + ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 15, 75, 1528386860), + ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 105, 1528388860), + ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 60, 1528388860), + ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 121, 1528410860), + ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 154, 1528410860), + ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 14, 84, 1528412860), + ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 5, 45, 1528412860), + ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 63, 1528414860), + ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 16, 86, 1528414860) + ] + influx.write_points([ + {"measurement": "minio_http", + "tags": {"host": host, "ipendpoint": endpoint, "location": location, "sf": sf, "sf_i": sf_i, "sfc": sfc, "sfc_i": sfc_i, "sr": sr}, + "fields": {"total_requests_count": num_requests, "total_processing_time": processing_time}, + "time": timestamp * 10 ** 9 + } for host, endpoint, location, sf, sf_i, sfc, sfc_i, sr, num_requests, processing_time, timestamp in data + ]) + + data = [ + ("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 15, 1528386860), + ("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 17, 1528388860), + ("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 19, 1528410860), + ("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 24, 1528412860), + ("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 13, 1528414860), + ] + influx.write_points([ + {"measurement": "apache", + "tags": {"host": host, "ipendpoint": endpoint, "location": location, "sf": sf, "sf_i": sf_i, "sfc": sfc, "sfc_i": sfc_i, "sr": sr}, + "fields": {"avg_processing_time": processing_time}, + "time": timestamp * 10 ** 9 + } for host, endpoint, location, sf, sf_i, sfc, sfc_i, sr, processing_time, timestamp in data + ]) + + yield from_timestamp, to_timestamp, test_db_name, graph + + influx.switch_database("CLMCMetrics") + influx.drop_measurement("ping") + influx.drop_database("TestInfluxDB") + graph.delete_all() diff --git a/src/service/clmcservice/graphapi/tests.py b/src/service/clmcservice/graphapi/tests.py index e69de29..04c687e 100644 --- a/src/service/clmcservice/graphapi/tests.py +++ b/src/service/clmcservice/graphapi/tests.py @@ -0,0 +1,379 @@ +#!/usr/bin/python3 +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 09-07-2018 +// Created for Project : FLAME +""" + +from json import dumps +import pytest +from pyramid import testing +from clmcservice.graphapi.views import GraphAPI +from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound + + +graph_1_id = None +graph_2_id = None + + +class TestGraphAPI(object): + """ + A pytest-implementation test for the Graph API endpoints. + """ + + @pytest.fixture(autouse=True) + def app_config(self): + """ + A fixture to implement setUp/tearDown functionality for all tests by initializing configuration structure for the web service and db connection + """ + + self.registry = testing.setUp() + self.registry.add_settings({"neo4j_host": "localhost", "neo4j_password": "admin", "influx_host": "localhost", "influx_port": 8086}) + + yield + + testing.tearDown() + + def test_build(self, db_testing_data): + """ + Tests the graph build API endpoint + + :param db_testing_data: pair of time stamps - the from-to range of the generated influx test data, test database name and the graph db client + """ + + global graph_1_id, graph_2_id + + from_timestamp, to_timestamp, test_db_name, graph_db = db_testing_data + + dc_nodes = set([node["name"] for node in graph_db.nodes.match("ComputeNode")]) + assert dc_nodes == set("DC" + str(i) for i in range(1, 7)), "Compute nodes must have been created by the db_testing_data fixture" + + request = testing.DummyRequest() + request.body = request.body.encode(request.charset) + error_raised = False + try: + GraphAPI(request).build_temporal_graph() + except HTTPBadRequest: + error_raised = True + assert error_raised, "A bad request error must have been raised in case of missing request body." + + body = dumps(dict(database=test_db_name, retention_policy="autogen", service_function_chain_instance="sfc_i")) + request = testing.DummyRequest() + request.params["from"] = 12341412 + request.params["to"] = 12341412 + request.body = body.encode(request.charset) + error_raised = False + try: + GraphAPI(request).build_temporal_graph() + except HTTPBadRequest: + error_raised = True + assert error_raised, "A bad request error must have been raised in case of invalid request body." + + body = dumps(dict(database=test_db_name, retention_policy="autogen", service_function_chain_instance="sfc_1", service_functions="{invalid_json}")) + request = testing.DummyRequest() + request.params["from"] = 12341412 + request.params["to"] = 12341412 + request.body = body.encode(request.charset) + error_raised = False + try: + GraphAPI(request).build_temporal_graph() + except HTTPBadRequest: + error_raised = True + assert error_raised, "A bad request error must have been raised in case of invalid request body." + + service_functions = ["nginx", "minio"] + body = dumps(dict(database=test_db_name, retention_policy="autogen", service_function_chain_instance="sfc_1", service_functions=service_functions)) + request = testing.DummyRequest() + request.params["from"] = 12341412 + request.params["to"] = 12341412 + request.body = body.encode(request.charset) + error_raised = False + try: + GraphAPI(request).build_temporal_graph() + except HTTPBadRequest: + error_raised = True + assert error_raised, "A bad request error must have been raised in case of invalid request body." + + service_functions = dict(nginx={"measurement_name": "nginx", "response_time_field": "mean(avg_processing_time)"}, + minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)"}, + apache={"measurement_name": "apache", "response_time_field": "mean(avg_processing_time)"}) + body = dumps(dict(database=test_db_name, retention_policy="autogen", service_function_chain_instance="sfc_1", service_functions=service_functions)) + request = testing.DummyRequest() + request.params["from_timestamp"] = 12341412 + request.params["to_timestamp"] = 12341412 + request.body = body.encode(request.charset) + error_raised = False + try: + GraphAPI(request).build_temporal_graph() + except HTTPBadRequest: + error_raised = True + assert error_raised, "A bad request error must have been raised in case of invalid URL parameters." + + service_functions = dict(nginx={"measurement_name": "nginx", "response_time_field": "mean(avg_processing_time)"}, + minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)"}) + build_json_body = dict(database=test_db_name, retention_policy="autogen", service_function_chain_instance="test_sfc1_1", service_functions=service_functions) + body = dumps(build_json_body) + request = testing.DummyRequest() + request.params["from"] = from_timestamp + request.params["to"] = to_timestamp + request.body = body.encode(request.charset) + response = GraphAPI(request).build_temporal_graph() + graph_subresponse = response.pop("graph") + assert response == build_json_body, "Response must contain the request body" + assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." + request_id = graph_subresponse["uuid"] + graph_1_id = request_id + + # check that the appropriate nodes have been created + sf_names = set([node["name"] for node in graph_db.nodes.match("ServiceFunction")]) + assert sf_names == {"nginx", "minio"}, "The graph must contain 2 service functions - nginx and minio" + sf_i_names = set([node["name"] for node in graph_db.nodes.match("ServiceFunctionInstance")]) + assert sf_i_names == {"nginx_1", "minio_1"}, "The graph must contain 2 service function instances - nginx_1 and minio_1" + endpoints = set([node["name"] for node in graph_db.nodes.match("Endpoint", uuid=request_id)]) + assert endpoints == {"minio_1_ep1", "nginx_1_ep1", "nginx_1_ep2"} + + # check the appropriate edges have been created + self.check_exist_relationship( + ( + ("minio_1_ep1", "Endpoint", "DC4", "ComputeNode", "hostedBy"), + ("nginx_1_ep1", "Endpoint", "DC4", "ComputeNode", "hostedBy"), + ("nginx_1_ep2", "Endpoint", "DC6", "ComputeNode", "hostedBy"), + ("minio_1", "ServiceFunctionInstance", "minio_1_ep1", "Endpoint", "realisedBy"), + ("nginx_1", "ServiceFunctionInstance", "nginx_1_ep1", "Endpoint", "realisedBy"), + ("nginx_1", "ServiceFunctionInstance", "nginx_1_ep2", "Endpoint", "realisedBy"), + ("minio_1", "ServiceFunctionInstance", "minio", "ServiceFunction", "instanceOf"), + ("nginx_1", "ServiceFunctionInstance", "nginx", "ServiceFunction", "instanceOf") + ), graph_db, request_id + ) + + # check endpoint nodes response time property + for endpoint, response_time in (("minio_1_ep1", 9), ("nginx_1_ep1", 18.2), ("nginx_1_ep2", 22.2)): + endpoint_node = graph_db.nodes.match("Endpoint", name=endpoint, uuid=request_id).first() + assert endpoint_node["response_time"] == response_time, "Wrong response time property of endpoint node" + + # send a new request for a new service function chain + service_functions = dict(minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)"}, + apache={"measurement_name": "apache", "response_time_field": "mean(avg_processing_time)"}) + build_json_body = dict(database=test_db_name, retention_policy="autogen", service_function_chain_instance="test_sfc2_1", service_functions=service_functions) + body = dumps(build_json_body) + request = testing.DummyRequest() + request.params["from"] = from_timestamp + request.params["to"] = to_timestamp + request.body = body.encode(request.charset) + response = GraphAPI(request).build_temporal_graph() + graph_subresponse = response.pop("graph") + assert response == build_json_body, "Response must contain the request body" + assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." + request_id = graph_subresponse["uuid"] + graph_2_id = request_id + + assert graph_db.nodes.match("ServiceFunction", name="apache").first() is not None, "Service function apache must have been added to the graph" + + for sf_i in ("apache_1", "minio_2"): + assert graph_db.nodes.match("ServiceFunctionInstance", name=sf_i).first() is not None, "Service function instance {0} must have been added to the graph".format(sf_i) + + for ep in ("minio_2_ep1", "apache_1_ep1"): + assert graph_db.nodes.match("Endpoint", name=ep, uuid=request_id).first() is not None, "Endpoint {0} must have been added to the graph".format(ep) + + # check the appropriate edges have been created + self.check_exist_relationship( + ( + ("minio_2_ep1", "Endpoint", "DC5", "ComputeNode", "hostedBy"), + ("apache_1_ep1", "Endpoint", "DC5", "ComputeNode", "hostedBy"), + ("minio_2", "ServiceFunctionInstance", "minio_2_ep1", "Endpoint", "realisedBy"), + ("apache_1", "ServiceFunctionInstance", "apache_1_ep1", "Endpoint", "realisedBy"), + ("minio_2", "ServiceFunctionInstance", "minio", "ServiceFunction", "instanceOf"), + ("apache_1", "ServiceFunctionInstance", "apache", "ServiceFunction", "instanceOf") + ), graph_db, request_id + ) + + # check endpoint nodes response time property + for endpoint, response_time in (("minio_2_ep1", 7), ("apache_1_ep1", 17.6)): + endpoint_node = graph_db.nodes.match("Endpoint", name=endpoint, uuid=request_id).first() + assert endpoint_node["response_time"] == response_time, "Wrong response time property of endpoint node" + + def test_delete(self, db_testing_data): + """ + Tests the delete API endpoint of the Graph API + + :param db_testing_data: pair of time stamps - the from-to range of the generated influx test data, test database name and the graph db client + """ + + global graph_1_id, graph_2_id + + from_timestamp, to_timestamp, test_db_name, graph_db = db_testing_data + + request = testing.DummyRequest() + request.matchdict["graph_id"] = "invalid_graph_id" + error_raised = False + try: + GraphAPI(request).delete_temporal_graph() + except HTTPNotFound: + error_raised = True + assert error_raised, "HTTP Not Found error must be raised in case of unrecognized subgraph ID" + + request = testing.DummyRequest() + request.matchdict["graph_id"] = graph_1_id + response = GraphAPI(request).delete_temporal_graph() + assert response == {"uuid": graph_1_id, "deleted": 3}, "Incorrect response when deleting temporal graph" + + request = testing.DummyRequest() + request.matchdict["graph_id"] = graph_2_id + response = GraphAPI(request).delete_temporal_graph() + assert response == {"uuid": graph_2_id, "deleted": 2}, "Incorrect response when deleting temporal graph" + + assert len(graph_db.nodes.match("Endpoint")) == 0, "All endpoint nodes should have been deleted" + assert set([node["name"] for node in graph_db.nodes.match("ComputeNode")]) == set(["DC" + str(i) for i in range(1, 7)]), "Compute nodes must not be deleted" + assert set([node["name"] for node in graph_db.nodes.match("ServiceFunctionInstance")]) == {"nginx_1", "apache_1", "minio_1", "minio_2"}, "Service function instances must not be deleted." + assert set([node["name"] for node in graph_db.nodes.match("ServiceFunction")]) == {"nginx", "minio", "apache"}, "Service functions must not be deleted" + + def test_rtt(self, db_testing_data): + """ + Tests the rtt API endpoint of the Graph API + + :param db_testing_data: pair of time stamps - the from-to range of the generated influx test data, test database name and the graph db client + """ + + from_timestamp, to_timestamp, test_db_name, graph_db = db_testing_data + + service_functions = dict(nginx={"measurement_name": "nginx", "response_time_field": "mean(avg_processing_time)"}, + minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)"}) + build_json_body = dict(database=test_db_name, retention_policy="autogen", service_function_chain_instance="test_sfc1_1", service_functions=service_functions) + body = dumps(build_json_body) + request = testing.DummyRequest() + request.params["from"] = from_timestamp + request.params["to"] = to_timestamp + request.body = body.encode(request.charset) + response = GraphAPI(request).build_temporal_graph() + graph_subresponse = response.pop("graph") + assert response == build_json_body, "Response must contain the request body" + assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." + request_id = graph_subresponse["uuid"] + + request = testing.DummyRequest() + request.matchdict["graph_id"] = request_id + error_raised = False + try: + GraphAPI(request).run_rtt_query() + except HTTPBadRequest: + error_raised = True + assert error_raised, "HTTP Bad Request must be thrown in case of missing or invalid url parameters" + + request = testing.DummyRequest() + request.matchdict["graph_id"] = request_id + request.params["endpoint"] = "nginx_1_ep1" + request.params["compute"] = "DC1" + error_raised = False + try: + GraphAPI(request).run_rtt_query() + except HTTPBadRequest: + error_raised = True + assert error_raised, "HTTP Bad Request must be thrown in case of missing or invalid url parameters" + + request = testing.DummyRequest() + request.matchdict["graph_id"] = request_id + request.params["endpoint"] = "nginx_1_ep1" + request.params["compute_node"] = "DC0" + error_raised = False + try: + GraphAPI(request).run_rtt_query() + except HTTPNotFound: + error_raised = True + assert error_raised, "HTTP Not Found error must be thrown for non existing compute node" + + request = testing.DummyRequest() + request.matchdict["graph_id"] = "random-uuid" + request.params["endpoint"] = "nginx_1_ep1" + request.params["compute_node"] = "DC0" + error_raised = False + try: + GraphAPI(request).run_rtt_query() + except HTTPNotFound: + error_raised = True + assert error_raised, "HTTP Not Found error must be thrown for an endpoint node with incorrect request ID" + + request = testing.DummyRequest() + request.matchdict["graph_id"] = request_id + request.params["endpoint"] = "apache_1_ep1" + request.params["compute_node"] = "DC1" + error_raised = False + try: + GraphAPI(request).run_rtt_query() + except HTTPNotFound: + error_raised = True + assert error_raised, "HTTP Not Found error must be thrown for a non existing endpoint" + + for dc, endpoint, forward_latencies, reverse_latencies, response_time in ( + ("DC6", "nginx_1_ep2", [], [], 22.2), + ("DC2", "nginx_1_ep2", [22, 30, 9], [11, 26, 15], 22.2), + ("DC3", "nginx_1_ep1", [25], [15], 18.2) + ): + request = testing.DummyRequest() + request.matchdict["graph_id"] = request_id + request.params["endpoint"] = endpoint + request.params["compute_node"] = dc + response = GraphAPI(request).run_rtt_query() + assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time}, "Incorrect RTT response" + + # send a new request for a new service function chain + service_functions = dict(minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)"}, + apache={"measurement_name": "apache", "response_time_field": "mean(avg_processing_time)"}) + build_json_body = dict(database=test_db_name, retention_policy="autogen", service_function_chain_instance="test_sfc2_1", service_functions=service_functions) + body = dumps(build_json_body) + request = testing.DummyRequest() + request.params["from"] = from_timestamp + request.params["to"] = to_timestamp + request.body = body.encode(request.charset) + response = GraphAPI(request).build_temporal_graph() + graph_subresponse = response.pop("graph") + assert response == build_json_body, "Response must contain the request body" + assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." + request_id = graph_subresponse["uuid"] + + for dc, endpoint, forward_latencies, reverse_latencies, response_time in ( + ("DC5", "apache_1_ep1", [], [], 17.6), + ("DC5", "minio_2_ep1", [], [], 7), + ("DC3", "apache_1_ep1", [20, 30], [26, 18], 17.6), + ("DC2", "minio_2_ep1", [22, 30], [26, 15], 7) + ): + request = testing.DummyRequest() + request.matchdict["graph_id"] = request_id + request.params["endpoint"] = endpoint + request.params["compute_node"] = dc + response = GraphAPI(request).run_rtt_query() + assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time}, "Incorrect RTT response" + + @staticmethod + def check_exist_relationship(relationships_tuple, graph, uuid): + for relationship in relationships_tuple: + from_node_name, from_node_type, to_node_name, to_node_type, relationship_type = relationship + if from_node_type == "Endpoint": + from_node = graph.nodes.match(from_node_type, name=from_node_name, uuid=uuid).first() + else: + from_node = graph.nodes.match(from_node_type, name=from_node_name).first() + assert from_node is not None + + if to_node_type == "Endpoint": + to_node = graph.nodes.match(to_node_type, name=to_node_name, uuid=uuid).first() + else: + to_node = graph.nodes.match(to_node_type, name=to_node_name).first() + assert to_node is not None + + assert graph.relationships.match(nodes=(from_node, to_node), r_type=relationship_type).first() is not None, "Graph is missing a required relationship" diff --git a/src/service/clmcservice/graphapi/utilities.py b/src/service/clmcservice/graphapi/utilities.py index 766a831..c4ef5ed 100644 --- a/src/service/clmcservice/graphapi/utilities.py +++ b/src/service/clmcservice/graphapi/utilities.py @@ -22,7 +22,6 @@ // Created for Project : FLAME """ -from itertools import permutations from json import loads from py2neo import Node, Relationship import logging @@ -71,6 +70,8 @@ def validate_json_queries_body(body): assert GRAPH_BUILD_QUERY_PARAMS == set(body.keys()), "Invalid JSON query document." + assert type(body["service_functions"]) == dict, "The service function description should be represented with a dictionary." + for sf in body["service_functions"]: query_data = body["service_functions"][sf] assert type(query_data) == dict, "Each service function must be associated with a respective JSON object." @@ -231,25 +232,6 @@ def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, compute_nodes.add(compute_node) # add the compute node to the set of compute nodes - log.info("Building network links from the ping telegraf plugin in influx") - # retrieve all network latencies available from the influx ping table - for network_link in permutations(compute_nodes, 2): - from_node, to_node = network_link - # query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\' and time>={2} and time<{3}'.format(from_node['name'], to_node['name'], from_timestamp, to_timestamp) - # In future when latencies are reported continuously, we should put timestamp filtering in the query for network links - query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\''.format(from_node['name'], to_node['name']) - log.info("Executing query: {0}".format(query)) - - result = influx_client.query(query) # execute the query - # get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator) - try: - actual_result = next(result.get_points()) - latency = actual_result.get("mean_average_response_ms") - find_or_create_edge(graph, "linkedTo", from_node, to_node, latency=latency) - except StopIteration: - # in this case there is no such link reported to Influx - log.info("There is no direct link between {0} and {1}".format(from_node, to_node)) - log.info("Finished building graph for service function chain {0} from database {1} with retention policy {2}".format(sfc_i, db, rp)) -- GitLab