From 6793cc307a166bb07e32b988ad24bf3e16ef2573 Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Thu, 12 Jul 2018 14:12:48 +0100 Subject: [PATCH] Updates graph api to return rtt results with global tag values --- src/service/clmcservice/graphapi/conftest.py | 40 ++++++++--------- src/service/clmcservice/graphapi/tests.py | 38 +++++++++------- src/service/clmcservice/graphapi/utilities.py | 9 +++- src/service/clmcservice/graphapi/views.py | 45 +++++++++++++------ 4 files changed, 81 insertions(+), 51 deletions(-) diff --git a/src/service/clmcservice/graphapi/conftest.py b/src/service/clmcservice/graphapi/conftest.py index 8111634..df21ef5 100644 --- a/src/service/clmcservice/graphapi/conftest.py +++ b/src/service/clmcservice/graphapi/conftest.py @@ -147,16 +147,16 @@ def db_testing_data(): to_timestamp = 1528685860 data = [ - ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 5, 20, 1528385860), - ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 8, 35, 1528385860), - ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 15, 1528389860), - ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 10, 23, 1528389860), - ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 17, 1528395860), - ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 15, 11, 1528395860), - ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 17, 23, 1528485860), - ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 19, 24, 1528485860), - ("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 16, 1528545860), - ("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 20, 18, 1528545860) + ("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 5, 20, 1528385860), + ("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 8, 35, 1528385860), + ("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 15, 1528389860), + ("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 10, 23, 1528389860), + ("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 17, 1528395860), + ("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 15, 11, 1528395860), + ("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 17, 23, 1528485860), + ("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 19, 24, 1528485860), + ("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 16, 1528545860), + ("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 20, 18, 1528545860) ] influx.write_points([ {"measurement": "nginx", @@ -167,16 +167,16 @@ def db_testing_data(): ]) data = [ - ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 86, 1528386860), - ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 15, 75, 1528386860), - ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 105, 1528388860), - ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 60, 1528388860), - ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 121, 1528410860), - ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 154, 1528410860), - ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 14, 84, 1528412860), - ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 5, 45, 1528412860), - ("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 63, 1528414860), - ("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 16, 86, 1528414860) + ("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 86, 1528386860), + ("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 15, 75, 1528386860), + ("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 105, 1528388860), + ("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 60, 1528388860), + ("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 121, 1528410860), + ("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 154, 1528410860), + ("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 14, 84, 1528412860), + ("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 5, 45, 1528412860), + ("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 63, 1528414860), + ("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 16, 86, 1528414860) ] influx.write_points([ {"measurement": "minio_http", diff --git a/src/service/clmcservice/graphapi/tests.py b/src/service/clmcservice/graphapi/tests.py index ce56154..d5c4554 100644 --- a/src/service/clmcservice/graphapi/tests.py +++ b/src/service/clmcservice/graphapi/tests.py @@ -149,11 +149,14 @@ class TestGraphAPI(object): sf_i_names = set([node["name"] for node in graph_db.nodes.match("ServiceFunctionInstance")]) assert sf_i_names == {"nginx_1", "minio_1"}, "The graph must contain 2 service function instances - nginx_1 and minio_1" endpoints = set([node["name"] for node in graph_db.nodes.match("Endpoint", uuid=request_id)]) - assert endpoints == {"minio_1_ep1", "nginx_1_ep1", "nginx_1_ep2"} + assert endpoints == {"minio_1_ep1", "nginx_1_ep1", "nginx_1_ep2"}, "The graph must contain 3 endpoints - minio_1_ep1, nginx_1_ep1, nginx_1_ep2" sfc_i_names = set([node["name"] for node in graph_db.nodes.match("ServiceFunctionChainInstance")]) - assert sfc_i_names == {"test_sfc1_1"} + assert sfc_i_names == {"test_sfc1_1"}, "The graph must contain 1 service function chain instance - test_sfc1_1" sfc_names = set([node["name"] for node in graph_db.nodes.match("ServiceFunctionChain")]) - assert sfc_names == {"test_sfc1"} + assert sfc_names == {"test_sfc1"}, "The graph must contain 1 service function chain - test_sfc1" + + reference_node = graph_db.nodes.match("Reference", uuid=request_id, sfc_i="test_sfc1_1", sfc="test_sfc1").first() + assert reference_node is not None and reference_node["from"] == from_timestamp * 10**9 and reference_node["to"] == to_timestamp * 10**9, "Reference node must have been created" # check the appropriate edges have been created self.check_exist_relationship( @@ -205,6 +208,9 @@ class TestGraphAPI(object): assert graph_db.nodes.match("ServiceFunctionChainInstance", name="test_sfc2_1").first() is not None, "Service function chain instance test_sfc2_1 must have been added to the graph" assert graph_db.nodes.match("ServiceFunctionChain", name="test_sfc2").first() is not None, "Service function chain test_sfc2 must have been added to the graph" + reference_node = graph_db.nodes.match("Reference", uuid=request_id, sfc_i="test_sfc2_1", sfc="test_sfc2").first() + assert reference_node is not None and reference_node["from"] == from_timestamp * 10**9 and reference_node["to"] == to_timestamp * 10**9, "Reference node must have been created" + # check the appropriate edges have been created self.check_exist_relationship( ( @@ -250,12 +256,12 @@ class TestGraphAPI(object): request = testing.DummyRequest() request.matchdict["graph_id"] = graph_1_id response = GraphAPI(request).delete_temporal_graph() - assert response == {"uuid": graph_1_id, "deleted": 3}, "Incorrect response when deleting temporal graph" + assert response == {"uuid": graph_1_id, "deleted": 4}, "Incorrect response when deleting temporal graph" request = testing.DummyRequest() request.matchdict["graph_id"] = graph_2_id response = GraphAPI(request).delete_temporal_graph() - assert response == {"uuid": graph_2_id, "deleted": 2}, "Incorrect response when deleting temporal graph" + assert response == {"uuid": graph_2_id, "deleted": 3}, "Incorrect response when deleting temporal graph" assert len(graph_db.nodes.match("Endpoint")) == 0, "All endpoint nodes should have been deleted" assert set([node["name"] for node in graph_db.nodes.match("ComputeNode")]) == set(["DC" + str(i) for i in range(1, 7)]), "Compute nodes must not be deleted" @@ -349,17 +355,17 @@ class TestGraphAPI(object): error_raised = True assert error_raised, "HTTP Not Found error must be thrown for a non existing endpoint" - for dc, endpoint, forward_latencies, reverse_latencies, response_time in ( - ("DC6", "nginx_1_ep2", [], [], 22.2), - ("DC2", "nginx_1_ep2", [22, 30, 9], [11, 26, 15], 22.2), - ("DC3", "nginx_1_ep1", [25], [15], 18.2) + for dc, endpoint, forward_latencies, reverse_latencies, response_time, global_tags in ( + ("DC6", "nginx_1_ep2", [], [], 22.2, {"location": "DC6", "sr": "sr6", "ipendpoint": "nginx_1_ep2", "host": "host2", "sfc": "test_sfc1", "sfc_i": "test_sfc1_1", "sf": "nginx", "sf_i": "nginx_1"}), + ("DC2", "nginx_1_ep2", [22, 30, 9], [11, 26, 15], 22.2, {"location": "DC6", "sr": "sr6", "ipendpoint": "nginx_1_ep2", "host": "host2", "sfc": "test_sfc1", "sfc_i": "test_sfc1_1", "sf": "nginx", "sf_i": "nginx_1"}), + ("DC3", "nginx_1_ep1", [25], [15], 18.2, {"location": "DC4", "sr": "sr4", "ipendpoint": "nginx_1_ep1", "host": "host1", "sfc": "test_sfc1", "sfc_i": "test_sfc1_1", "sf": "nginx", "sf_i": "nginx_1"}) ): request = testing.DummyRequest() request.matchdict["graph_id"] = request_id request.params["endpoint"] = endpoint request.params["compute_node"] = dc response = GraphAPI(request).run_rtt_query() - assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time}, "Incorrect RTT response" + assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time, "global_tags": global_tags}, "Incorrect RTT response" # send a new request for a new service function chain service_functions = dict(minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)"}, @@ -376,18 +382,18 @@ class TestGraphAPI(object): assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." request_id = graph_subresponse["uuid"] - for dc, endpoint, forward_latencies, reverse_latencies, response_time in ( - ("DC5", "apache_1_ep1", [], [], 17.6), - ("DC5", "minio_2_ep1", [], [], 7), - ("DC3", "apache_1_ep1", [20, 30], [26, 18], 17.6), - ("DC2", "minio_2_ep1", [22, 30], [26, 15], 7) + for dc, endpoint, forward_latencies, reverse_latencies, response_time, global_tags in ( + ("DC5", "apache_1_ep1", [], [], 17.6, {"location": "DC5", "sr": "sr5", "ipendpoint": "apache_1_ep1", "host": "host5", "sfc": "test_sfc2", "sfc_i": "test_sfc2_1", "sf": "apache", "sf_i": "apache_1"}), + ("DC5", "minio_2_ep1", [], [], 7, {"location": "DC5", "sr": "sr5", "ipendpoint": "minio_2_ep1", "host": "host4", "sfc": "test_sfc2", "sfc_i": "test_sfc2_1", "sf": "minio", "sf_i": "minio_2"}), + ("DC3", "apache_1_ep1", [20, 30], [26, 18], 17.6, {"location": "DC5", "sr": "sr5", "ipendpoint": "apache_1_ep1", "host": "host5", "sfc": "test_sfc2", "sfc_i": "test_sfc2_1", "sf": "apache", "sf_i": "apache_1"}), + ("DC2", "minio_2_ep1", [22, 30], [26, 15], 7, {"location": "DC5", "sr": "sr5", "ipendpoint": "minio_2_ep1", "host": "host4", "sfc": "test_sfc2", "sfc_i": "test_sfc2_1", "sf": "minio", "sf_i": "minio_2"}) ): request = testing.DummyRequest() request.matchdict["graph_id"] = request_id request.params["endpoint"] = endpoint request.params["compute_node"] = dc response = GraphAPI(request).run_rtt_query() - assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time}, "Incorrect RTT response" + assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time, "global_tags": global_tags}, "Incorrect RTT response" @staticmethod def check_exist_relationship(relationships_tuple, graph, uuid): diff --git a/src/service/clmcservice/graphapi/utilities.py b/src/service/clmcservice/graphapi/utilities.py index 9680c17..2841152 100644 --- a/src/service/clmcservice/graphapi/utilities.py +++ b/src/service/clmcservice/graphapi/utilities.py @@ -33,7 +33,7 @@ GRAPH_BUILD_URL_PARAMS = ("from", "to") GRAPH_BUILD_QUERY_PARAMS = {"database", "retention_policy", "service_function_chain_instance", "service_functions"} GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "measurement_name"} -INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time FROM "{1}"."{2}".{3} WHERE sfc_i=\'{4}\' and time>={5} and time<{6} GROUP BY ipendpoint, location, sf_i' +INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time FROM "{1}"."{2}".{3} WHERE sfc_i=\'{4}\' and time>={5} and time<{6} GROUP BY ipendpoint, location, sf_i, host, sr' RTT_CYPHER_QUERY_TEMPLATE = """ @@ -200,6 +200,11 @@ def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, log.info("Building graph for service function chain {0} from database {1} with retention policy {2}".format(sfc_i, db, rp)) sfc = "_".join(sfc_i.split('_')[: -1]) # assumes sfc_i is always in the form <sfc>_<num> + + # create a UUID reference node + reference_node = Node("Reference", **{"uuid": request_id, "sfc": sfc, "sfc_i": sfc_i, "from": from_timestamp, "to": to_timestamp}) + graph.create(reference_node) + # create a node for the service function chain if it doesn't exist service_function_chain_node = find_or_create_node(graph, "ServiceFunctionChain", name=sfc) # create a node for the service function chain instance if it doesn't exist @@ -246,7 +251,7 @@ def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, find_or_create_edge(graph, "utilizedBy", service_function_instance_node, service_function_chain_instance_node) # create an Endpoint node from the tag value (if it is not already created) - ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["ipendpoint"], response_time=response_time, uuid=request_id) + ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["ipendpoint"], response_time=response_time, uuid=request_id, host=tags["host"], sr=tags["sr"]) # create an edge between the instance and the endpoint (if it is not already created) find_or_create_edge(graph, "realisedBy", service_function_instance_node, ipendpoint_node) diff --git a/src/service/clmcservice/graphapi/views.py b/src/service/clmcservice/graphapi/views.py index 62c312e..4d1a94f 100644 --- a/src/service/clmcservice/graphapi/views.py +++ b/src/service/clmcservice/graphapi/views.py @@ -102,13 +102,13 @@ class GraphAPI(object): graph_id = self.request.matchdict['graph_id'] # get the UUID of the subgraph from the URL graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password']) # connect to the neo4j graph db - number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id) - if number_of_deleted_nodes > 0: - return {"uuid": graph_id, "deleted": number_of_deleted_nodes} - else: + if graph.nodes.match("Reference", uuid=graph_id).first() is None: raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id)) + number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id) + return {"uuid": graph_id, "deleted": number_of_deleted_nodes} + @view_config(route_name='graph_algorithms_rtt', request_method='GET') def run_rtt_query(self): """ @@ -133,6 +133,10 @@ class GraphAPI(object): all_nodes = graph.nodes + reference_node = all_nodes.match("Reference", uuid=graph_id).first() + if reference_node is None: + raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id)) + compute_node = all_nodes.match("ComputeNode", name=compute_node_label).first() if compute_node is None: raise HTTPNotFound("Compute node {0} doesn't exist.".format(compute_node_label)) @@ -142,12 +146,27 @@ class GraphAPI(object): raise HTTPNotFound("Endpoint node {0} doesn't exist.".format(endpoint_node_label)) # check if the endpoint is hosted by the compute node before running the RTT cypher query - hosted_by_rel = graph.relationships.match(nodes=(endpoint_node, compute_node), r_type="hostedBy").first() - if hosted_by_rel is not None: - return {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"]} - - query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(compute_node_label, endpoint_node_label, graph_id) - log.info("Executing cypher query: {0}".format(query_to_execute)) - data = graph.run(query_to_execute).data() # returns a list of dictionaries, each dictionary represents a row in the result - - return data[0] # we only expect one result from the query - a dictionary with the following fields: forward_latencies, reverse_latencies, response_time + hosted_by_node = graph.relationships.match(nodes=(endpoint_node, None), r_type="hostedBy").first().end_node + if hosted_by_node["name"] == compute_node["name"]: + result = {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"]} + else: + query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(compute_node_label, endpoint_node_label, graph_id) + log.info("Executing cypher query: {0}".format(query_to_execute)) + data = graph.run(query_to_execute).data() # returns a list of dictionaries, each dictionary represents a row in the result + result = data[0] + + sf_i_node = graph.match(nodes=(None, endpoint_node), r_type="realisedBy").first().start_node + if sf_i_node is None: + msg = "No service function instance found associated with endpoint {0}".format(endpoint_node["name"]) + log.error("Unexpected error: {0}".format(msg)) + raise HTTPBadRequest(msg) + + sf_node = graph.match(nodes=(sf_i_node, None), r_type="instanceOf").first().end_node + if sf_node is None: + msg = "No service function found associated with service function instance {0}".format(sf_i_node["name"]) + log.error("Unexpected error: {0}".format(msg)) + raise HTTPBadRequest(msg) + + result["global_tags"] = {"ipendpoint": endpoint_node["name"], "host": endpoint_node["host"], "location": hosted_by_node["name"], "sr": endpoint_node["sr"], + "sfc": reference_node["sfc"], "sfc_i": reference_node["sfc_i"], "sf": sf_node["name"], "sf_i": sf_i_node["name"]} + return result # we only expect one result from the query - a dictionary with the following fields: forward_latencies, reverse_latencies, response_time -- GitLab