diff --git a/src/service/clmcservice/graphapi/conftest.py b/src/service/clmcservice/graphapi/conftest.py index 03e3f6d72df618f05cdeb3578c1fa155a1a090eb..63e241deb7ef366d9b40b9d2e80129ecb878d46b 100644 --- a/src/service/clmcservice/graphapi/conftest.py +++ b/src/service/clmcservice/graphapi/conftest.py @@ -24,99 +24,62 @@ import pytest from influxdb import InfluxDBClient -from clmcservice.generate_network_measurements import report_network_measurements from py2neo import Graph +from clmcservice.graphapi.utilities import build_network_graph # static network configuration data used for testing cases -network_config = { - "bandwidth": 104857600, - "links": [ - { - "source": "DC1", - "target": "DC2", - "min_response_time": 10, - "max_response_time": 20, - "avg_response_time": 15 - }, - { - "source": "DC2", - "target": "DC1", - "min_response_time": 16, - "max_response_time": 28, - "avg_response_time": 22 - }, - { - "source": "DC1", - "target": "DC3", - "min_response_time": 17, - "max_response_time": 19, - "avg_response_time": 18 - }, - { - "source": "DC3", - "target": "DC1", - "min_response_time": 15, - "max_response_time": 25, - "avg_response_time": 20 - }, - { - "source": "DC1", - "target": "DC5", - "min_response_time": 27, - "max_response_time": 33, - "avg_response_time": 30 - }, - { - "source": "DC5", - "target": "DC1", - "min_response_time": 10, - "max_response_time": 42, - "avg_response_time": 26 - }, - { - "source": "DC2", - "target": "DC4", - "min_response_time": 11, - "max_response_time": 29, - "avg_response_time": 20 - }, - { - "source": "DC4", - "target": "DC2", - "min_response_time": 12, - "max_response_time": 40, - "avg_response_time": 26 - }, - { - "source": "DC3", - "target": "DC4", - "min_response_time": 23, - "max_response_time": 27, - "avg_response_time": 25 - }, - { - "source": "DC4", - "target": "DC3", - "min_response_time": 12, - "max_response_time": 18, - "avg_response_time": 15 - }, - { - "source": "DC5", - "target": "DC6", - "min_response_time": 3, - "max_response_time": 15, - "avg_response_time": 9 - }, - { - "source": "DC6", - "target": "DC5", - "min_response_time": 11, - "max_response_time": 11, - "avg_response_time": 11 - }, - ] +links = [ + { + "src-switch": "dpid1", + "dst-switch": "dpid2", + "latency": 7.5 + }, + { + "src-switch": "dpid1", + "dst-switch": "dpid3", + "latency": 9 + }, + { + "src-switch": "dpid1", + "dst-switch": "dpid5", + "latency": 15 + }, + { + "src-switch": "dpid2", + "dst-switch": "dpid4", + "latency": 10 + }, + { + "src-switch": "dpid3", + "dst-switch": "dpid4", + "latency": 12.5 + }, + { + "src-switch": "dpid5", + "dst-switch": "dpid6", + "latency": 4.5 + } +] + + +switches = { + "dpid1": "127.0.0.1", + "dpid2": "127.0.0.2", + "dpid3": "127.0.0.3", + "dpid4": "127.0.0.4", + "dpid5": "127.0.0.5", + "dpid6": "127.0.0.6" +} + + +clusters = { + "127.0.0.1": "DC1", + "127.0.0.2": "DC2", + "127.0.0.3": "DC3", + "127.0.0.4": "DC4", + "127.0.0.5": "DC5", + "127.0.0.6": "DC6" } @@ -128,7 +91,7 @@ def db_testing_data(): :return: pair of time stamps - the from-to range of the generated influx test data, test database name and the graph db client object """ - global network_config + global links, switches, clusters test_sfc_name = "test_sfc" test_sfc_instance_1_name = "test_sfc_premium" @@ -142,9 +105,7 @@ def db_testing_data(): # create the physical infrastructure subgraph dbs = influx.get_list_database() - if "CLMCMetrics" not in dbs: - influx.create_database("CLMCMetrics") - report_network_measurements("localhost", "CLMCMetrics", network_config, "localhost", "admin") + build_network_graph(graph, switches, links, clusters) # check if exists ( if so, clear ) or create the test DB in influx if test_db_name in dbs: diff --git a/src/service/clmcservice/graphapi/tests.py b/src/service/clmcservice/graphapi/tests.py index 60ee4c1809b165a8166dc64cada16071d704b0e8..1615b3c0b34fd7c1f240caf6f14e1ecfe31c8d57 100644 --- a/src/service/clmcservice/graphapi/tests.py +++ b/src/service/clmcservice/graphapi/tests.py @@ -106,7 +106,7 @@ class TestGraphAPI(object): from_timestamp, to_timestamp, graph_db = db_testing_data - dc_nodes = set([node["name"] for node in graph_db.nodes.match("ComputeNode")]) + dc_nodes = set([node["name"] for node in graph_db.nodes.match("Cluster")]) assert dc_nodes == set("DC" + str(i) for i in range(1, 7)), "Compute nodes must have been created by the db_testing_data fixture" # test with invalid URL parameters naming @@ -164,9 +164,9 @@ class TestGraphAPI(object): # check the appropriate edges have been created self.check_exist_relationship( ( - ("minio_1_ep1", "Endpoint", "DC4", "ComputeNode", "hostedBy"), - ("nginx_1_ep1", "Endpoint", "DC4", "ComputeNode", "hostedBy"), - ("nginx_1_ep2", "Endpoint", "DC6", "ComputeNode", "hostedBy"), + ("minio_1_ep1", "Endpoint", "DC4", "Cluster", "hostedBy"), + ("nginx_1_ep1", "Endpoint", "DC4", "Cluster", "hostedBy"), + ("nginx_1_ep2", "Endpoint", "DC6", "Cluster", "hostedBy"), ("minio_1", "ServiceFunction", "minio_1_ep1", "Endpoint", "realisedBy"), ("nginx_1", "ServiceFunction", "nginx_1_ep1", "Endpoint", "realisedBy"), ("nginx_1", "ServiceFunction", "nginx_1_ep2", "Endpoint", "realisedBy"), @@ -223,8 +223,8 @@ class TestGraphAPI(object): # check the appropriate edges have been created self.check_exist_relationship( ( - ("minio_2_ep1", "Endpoint", "DC5", "ComputeNode", "hostedBy"), - ("apache_1_ep1", "Endpoint", "DC5", "ComputeNode", "hostedBy"), + ("minio_2_ep1", "Endpoint", "DC5", "Cluster", "hostedBy"), + ("apache_1_ep1", "Endpoint", "DC5", "Cluster", "hostedBy"), ("minio_2", "ServiceFunction", "minio_2_ep1", "Endpoint", "realisedBy"), ("apache_1", "ServiceFunction", "apache_1_ep1", "Endpoint", "realisedBy"), ("minio_2", "ServiceFunction", "minio", "ServiceFunctionPackage", "instanceOf"), @@ -278,7 +278,7 @@ class TestGraphAPI(object): assert response == {"uuid": graph_2_id, "deleted": 3}, "Incorrect response when deleting temporal graph" assert len(graph_db.nodes.match("Endpoint")) == 0, "All endpoint nodes should have been deleted" - assert set([node["name"] for node in graph_db.nodes.match("ComputeNode")]) == set(["DC" + str(i) for i in range(1, 7)]), "Compute nodes must not be deleted" + assert set([node["name"] for node in graph_db.nodes.match("Cluster")]) == set(["DC" + str(i) for i in range(1, 7)]), "Compute nodes must not be deleted" assert set([node["name"] for node in graph_db.nodes.match("ServiceFunction")]) == {"nginx_1", "apache_1", "minio_1", "minio_2"}, "Service functions must not be deleted." assert set([node["name"] for node in graph_db.nodes.match("ServiceFunctionPackage")]) == {"nginx", "minio", "apache"}, "Service function packages must not be deleted" assert set([node["name"] for node in graph_db.nodes.match("ServiceFunctionChainInstance")]) == {"test_sfc_premium", "test_sfc_non_premium"}, "Service function chain instances must not be deleted" @@ -378,8 +378,8 @@ class TestGraphAPI(object): # go through the set of input/output (expected) parameters and assert actual results match with expected ones for dc, endpoint, forward_latencies, reverse_latencies, response_time, request_size, response_size, rtt, global_tags in ( ("DC6", "nginx_1_ep2", [], [], 22.2, 35600, 6420, 22.2, {"flame_location": "DC6", "flame_sfe": "nginx_1_ep2", "flame_server": "DC6", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_premium", "flame_sfp": "nginx", "flame_sf": "nginx_1"}), - ("DC2", "nginx_1_ep2", [11, 15, 4.5], [5.5, 13, 7.5], 22.2, 35600, 6420, 78, {"flame_location": "DC6", "flame_sfe": "nginx_1_ep2", "flame_server": "DC6", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_premium", "flame_sfp": "nginx", "flame_sf": "nginx_1"}), - ("DC3", "nginx_1_ep1", [12.5], [7.5], 18.2, 2260, 9660, 38, {"flame_location": "DC4", "flame_sfe": "nginx_1_ep1", "flame_server": "DC4", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_premium", "flame_sfp": "nginx", "flame_sf": "nginx_1"}) + ("DC2", "nginx_1_ep2", [0, 7.5, 15, 4.5, 0], [0, 4.5, 15, 7.5, 0], 22.2, 35600, 6420, 78, {"flame_location": "DC6", "flame_sfe": "nginx_1_ep2", "flame_server": "DC6", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_premium", "flame_sfp": "nginx", "flame_sf": "nginx_1"}), + ("DC3", "nginx_1_ep1", [0, 12.5, 0], [0, 12.5, 0], 18.2, 2260, 9660, 38, {"flame_location": "DC4", "flame_sfe": "nginx_1_ep1", "flame_server": "DC4", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_premium", "flame_sfp": "nginx", "flame_sf": "nginx_1"}) ): request = testing.DummyRequest() request.matchdict["graph_id"] = request_id @@ -413,8 +413,8 @@ class TestGraphAPI(object): for dc, endpoint, forward_latencies, reverse_latencies, response_time, request_size, response_size, rtt, global_tags in ( ("DC5", "apache_1_ep1", [], [], 17.6, 1480, 7860, 17.6, {"flame_location": "DC5", "flame_sfe": "apache_1_ep1", "flame_server": "DC5", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_non_premium", "flame_sfp": "apache", "flame_sf": "apache_1"}), ("DC5", "minio_2_ep1", [], [], 7, 2998, 3610, 7, {"flame_location": "DC5", "flame_sfe": "minio_2_ep1", "flame_server": "DC5", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_non_premium", "flame_sfp": "minio", "flame_sf": "minio_2"}), - ("DC3", "apache_1_ep1", [10, 15], [13, 9], 17.6, 1480, 7860, 64, {"flame_location": "DC5", "flame_sfe": "apache_1_ep1", "flame_server": "DC5", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_non_premium", "flame_sfp": "apache", "flame_sf": "apache_1"}), - ("DC2", "minio_2_ep1", [11, 15], [13, 7.5], 7, 2998, 3610, 53, {"flame_location": "DC5", "flame_sfe": "minio_2_ep1", "flame_server": "DC5", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_non_premium", "flame_sfp": "minio", "flame_sf": "minio_2"}) + ("DC3", "apache_1_ep1", [0, 9, 15, 0], [0, 15, 9, 0], 17.6, 1480, 7860, 64, {"flame_location": "DC5", "flame_sfe": "apache_1_ep1", "flame_server": "DC5", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_non_premium", "flame_sfp": "apache", "flame_sf": "apache_1"}), + ("DC2", "minio_2_ep1", [0, 7.5, 15, 0], [0, 15, 7.5, 0], 7, 2998, 3610, 53, {"flame_location": "DC5", "flame_sfe": "minio_2_ep1", "flame_server": "DC5", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_non_premium", "flame_sfp": "minio", "flame_sf": "minio_2"}) ): request = testing.DummyRequest() request.matchdict["graph_id"] = request_id diff --git a/src/service/clmcservice/graphapi/utilities.py b/src/service/clmcservice/graphapi/utilities.py index 18efc199408c727c9bdbb42c81de68da67214934..1459e6905c622da41bf23220cf6c6088bc429a4c 100644 --- a/src/service/clmcservice/graphapi/utilities.py +++ b/src/service/clmcservice/graphapi/utilities.py @@ -36,18 +36,28 @@ GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "request_size_field", "res INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time, {1} AS mean_request_size, {2} AS mean_response_size FROM "{3}"."{4}".{5} WHERE "flame_sfc"=\'{6}\' and "flame_sfci"=\'{7}\' and time>={8} and time<{9} GROUP BY "flame_sfe", "flame_location", "flame_sf"' +# in cypher the syntax is {name: 'value'}, here we use {{name: 'value'}} to escape the curly braces when applying the python format function RTT_CYPHER_QUERY_TEMPLATE = """ -MATCH (dc:ComputeNode {{ name: '{0}' }}),(endpoint:Endpoint {{ name: '{1}', uuid: '{2}'}}), +MATCH (dc:Cluster {{ name: '{0}' }}),(endpoint:Endpoint {{ name: '{1}', uuid: '{2}'}}), path = shortestPath((dc)-[*]-(endpoint)) WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' ) -WITH nodes(path) as all_nodes, endpoint as endpoint - WITH all_nodes[0..size(all_nodes)-1] as network_nodes, endpoint as endpoint - UNWIND RANGE(0, size(network_nodes) - 2) as id - WITH network_nodes[id] as source, network_nodes[id+1] as target, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size - MATCH (source) -[r1]-> (target), (target) -[r2]-> (source) - RETURN collect(r1.latency) as forward_latencies, reverse(collect(r2.latency)) as reverse_latencies, response_time, request_size, response_size +WITH extract(y in filter(x in relationships(path) WHERE type(x) = 'linkedTo') | y.latency) as latencies, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size +RETURN latencies as forward_latencies, reverse(latencies) as reverse_latencies, response_time, request_size, response_size """ +# DEPRECATED QUERY - use this if we have to switch back to using two directed edges between a given pair of nodes +# RTT_CYPHER_QUERY_TEMPLATE = """ +# MATCH (dc:Cluster {{ name: '{0}' }}),(endpoint:Endpoint {{ name: '{1}', uuid: '{2}'}}), +# path = shortestPath((dc)-[*]-(endpoint)) +# WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' ) +# WITH nodes(path) as all_nodes, endpoint as endpoint +# WITH all_nodes[0..size(all_nodes)-1] as network_nodes, endpoint as endpoint +# UNWIND RANGE(0, size(network_nodes) - 2) as id +# WITH network_nodes[id] as source, network_nodes[id+1] as target, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size +# MATCH (source) -[r1]-> (target), (target) -[r2]-> (source) +# RETURN collect(r1.latency) as forward_latencies, reverse(collect(r2.latency)) as reverse_latencies, response_time, request_size, response_size +# """ + log = logging.getLogger('service_logger') @@ -182,7 +192,7 @@ def find_or_create_edge(graph, edge_type, from_node, to_node, **properties): return edge -def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client): +def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client): """ A function used to generate a temporal graph in the neo4j db. @@ -261,8 +271,8 @@ def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, # create an edge between the service function and the endpoint (if it is not already created) find_or_create_edge(graph, "realisedBy", service_function_node, ipendpoint_node) - # create a ComputeNode node from the tag value (if it is not already created) - compute_node = find_or_create_node(graph, "ComputeNode", name=tags["flame_location"]) + # create a Cluster node from the tag value (if it is not already created) + compute_node = find_or_create_node(graph, "Cluster", name=tags["flame_location"]) # create an edge between the endpoint and the compute node (if it is not already created) find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node) @@ -292,3 +302,44 @@ def delete_temporal_subgraph(graph, subgraph_id): log.info("Deleted {0} nodes associated with ID {1}".format(nodes_matched, subgraph_id)) return nodes_matched + + +def build_network_graph(graph, switches, links, clusters): + """ + A function used to build the network topology in the neo4j graph given the collection of switches, links and clusters. + + :param graph: the neo4j graph database client + :param switches: a collection of all switches in the topology - mapping between the DPID of the switch and its IP address + :param links: a collection of all switch-to-switch links in the network topology - JSON format, list of objects, each object must have "src-switch", "dst-switch" and "latency" as keys + :param clusters: a collection of all clusters and the IP address of the service router that they are connected to - mapping between an IP address of a service router and a cluster identifier + """ + + for link in links: + # get the DPID of the source switch + source = link["src-switch"] + # get the IP address of the source switch + source = switches[source] + + # get the DPID of the destination switch + destination = link["dst-switch"] + # get the IP address of the destination switch + destination = switches[destination] + + # retrieve the latency for this link + latency = link["latency"] + + from_node = find_or_create_node(graph, "Switch", name=source) + to_node = find_or_create_node(graph, "Switch", name=destination) + + # create the link between the two nodes + find_or_create_edge(graph, "linkedTo", from_node, to_node, latency=latency) + + if source in clusters: + cluster_name = clusters[source] + cluster_node = find_or_create_node(graph, "Cluster", name=cluster_name) + find_or_create_edge(graph, "linkedTo", cluster_node, from_node, latency=0) + + if destination in clusters: + cluster_name = clusters[destination] + cluster_node = find_or_create_node(graph, "Cluster", name=cluster_name) + find_or_create_edge(graph, "linkedTo", cluster_node, to_node, latency=0) diff --git a/src/service/clmcservice/graphapi/views.py b/src/service/clmcservice/graphapi/views.py index 0223aea5b0d7b54967d10ad91832be8592ad10f4..a208e7ae710b7a67fbe2652cfadf5f9c3763ef05 100644 --- a/src/service/clmcservice/graphapi/views.py +++ b/src/service/clmcservice/graphapi/views.py @@ -23,7 +23,7 @@ """ -from clmcservice.graphapi.utilities import validate_json_queries_body, validate_graph_url_params, build_temporal_graph, delete_temporal_subgraph, validate_graph_rtt_params, RTT_CYPHER_QUERY_TEMPLATE +from clmcservice.graphapi.utilities import validate_json_queries_body, validate_graph_url_params, build_temporal_subgraph, delete_temporal_subgraph, validate_graph_rtt_params, RTT_CYPHER_QUERY_TEMPLATE from uuid import uuid4 from influxdb import InfluxDBClient from py2neo import Graph @@ -83,7 +83,7 @@ class GraphAPI(object): request_id = str(uuid4()) - build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client) + build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client) json_queries['graph'] = {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}} return json_queries @@ -133,7 +133,7 @@ class GraphAPI(object): if reference_node is None: raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id)) - compute_node = all_nodes.match("ComputeNode", name=compute_node_label).first() + compute_node = all_nodes.match("Cluster", name=compute_node_label).first() if compute_node is None: raise HTTPNotFound("Compute node {0} doesn't exist.".format(compute_node_label))