Skip to content
Snippets Groups Projects
Commit 6793cc30 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Updates graph api to return rtt results with global tag values

parent d62880f4
No related branches found
No related tags found
No related merge requests found
......@@ -147,16 +147,16 @@ def db_testing_data():
to_timestamp = 1528685860
data = [
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 5, 20, 1528385860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 8, 35, 1528385860),
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 15, 1528389860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 10, 23, 1528389860),
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 17, 1528395860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 15, 11, 1528395860),
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 17, 23, 1528485860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 19, 24, 1528485860),
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 16, 1528545860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 20, 18, 1528545860)
("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 5, 20, 1528385860),
("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 8, 35, 1528385860),
("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 15, 1528389860),
("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 10, 23, 1528389860),
("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 17, 1528395860),
("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 15, 11, 1528395860),
("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 17, 23, 1528485860),
("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 19, 24, 1528485860),
("host1", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 16, 1528545860),
("host2", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 20, 18, 1528545860)
]
influx.write_points([
{"measurement": "nginx",
......@@ -167,16 +167,16 @@ def db_testing_data():
])
data = [
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 86, 1528386860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 15, 75, 1528386860),
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 105, 1528388860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 60, 1528388860),
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 121, 1528410860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 154, 1528410860),
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 14, 84, 1528412860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 5, 45, 1528412860),
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 63, 1528414860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 16, 86, 1528414860)
("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 86, 1528386860),
("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 15, 75, 1528386860),
("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 105, 1528388860),
("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 60, 1528388860),
("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 121, 1528410860),
("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 154, 1528410860),
("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 14, 84, 1528412860),
("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 5, 45, 1528412860),
("host3", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 63, 1528414860),
("host4", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 16, 86, 1528414860)
]
influx.write_points([
{"measurement": "minio_http",
......
......@@ -149,11 +149,14 @@ class TestGraphAPI(object):
sf_i_names = set([node["name"] for node in graph_db.nodes.match("ServiceFunctionInstance")])
assert sf_i_names == {"nginx_1", "minio_1"}, "The graph must contain 2 service function instances - nginx_1 and minio_1"
endpoints = set([node["name"] for node in graph_db.nodes.match("Endpoint", uuid=request_id)])
assert endpoints == {"minio_1_ep1", "nginx_1_ep1", "nginx_1_ep2"}
assert endpoints == {"minio_1_ep1", "nginx_1_ep1", "nginx_1_ep2"}, "The graph must contain 3 endpoints - minio_1_ep1, nginx_1_ep1, nginx_1_ep2"
sfc_i_names = set([node["name"] for node in graph_db.nodes.match("ServiceFunctionChainInstance")])
assert sfc_i_names == {"test_sfc1_1"}
assert sfc_i_names == {"test_sfc1_1"}, "The graph must contain 1 service function chain instance - test_sfc1_1"
sfc_names = set([node["name"] for node in graph_db.nodes.match("ServiceFunctionChain")])
assert sfc_names == {"test_sfc1"}
assert sfc_names == {"test_sfc1"}, "The graph must contain 1 service function chain - test_sfc1"
reference_node = graph_db.nodes.match("Reference", uuid=request_id, sfc_i="test_sfc1_1", sfc="test_sfc1").first()
assert reference_node is not None and reference_node["from"] == from_timestamp * 10**9 and reference_node["to"] == to_timestamp * 10**9, "Reference node must have been created"
# check the appropriate edges have been created
self.check_exist_relationship(
......@@ -205,6 +208,9 @@ class TestGraphAPI(object):
assert graph_db.nodes.match("ServiceFunctionChainInstance", name="test_sfc2_1").first() is not None, "Service function chain instance test_sfc2_1 must have been added to the graph"
assert graph_db.nodes.match("ServiceFunctionChain", name="test_sfc2").first() is not None, "Service function chain test_sfc2 must have been added to the graph"
reference_node = graph_db.nodes.match("Reference", uuid=request_id, sfc_i="test_sfc2_1", sfc="test_sfc2").first()
assert reference_node is not None and reference_node["from"] == from_timestamp * 10**9 and reference_node["to"] == to_timestamp * 10**9, "Reference node must have been created"
# check the appropriate edges have been created
self.check_exist_relationship(
(
......@@ -250,12 +256,12 @@ class TestGraphAPI(object):
request = testing.DummyRequest()
request.matchdict["graph_id"] = graph_1_id
response = GraphAPI(request).delete_temporal_graph()
assert response == {"uuid": graph_1_id, "deleted": 3}, "Incorrect response when deleting temporal graph"
assert response == {"uuid": graph_1_id, "deleted": 4}, "Incorrect response when deleting temporal graph"
request = testing.DummyRequest()
request.matchdict["graph_id"] = graph_2_id
response = GraphAPI(request).delete_temporal_graph()
assert response == {"uuid": graph_2_id, "deleted": 2}, "Incorrect response when deleting temporal graph"
assert response == {"uuid": graph_2_id, "deleted": 3}, "Incorrect response when deleting temporal graph"
assert len(graph_db.nodes.match("Endpoint")) == 0, "All endpoint nodes should have been deleted"
assert set([node["name"] for node in graph_db.nodes.match("ComputeNode")]) == set(["DC" + str(i) for i in range(1, 7)]), "Compute nodes must not be deleted"
......@@ -349,17 +355,17 @@ class TestGraphAPI(object):
error_raised = True
assert error_raised, "HTTP Not Found error must be thrown for a non existing endpoint"
for dc, endpoint, forward_latencies, reverse_latencies, response_time in (
("DC6", "nginx_1_ep2", [], [], 22.2),
("DC2", "nginx_1_ep2", [22, 30, 9], [11, 26, 15], 22.2),
("DC3", "nginx_1_ep1", [25], [15], 18.2)
for dc, endpoint, forward_latencies, reverse_latencies, response_time, global_tags in (
("DC6", "nginx_1_ep2", [], [], 22.2, {"location": "DC6", "sr": "sr6", "ipendpoint": "nginx_1_ep2", "host": "host2", "sfc": "test_sfc1", "sfc_i": "test_sfc1_1", "sf": "nginx", "sf_i": "nginx_1"}),
("DC2", "nginx_1_ep2", [22, 30, 9], [11, 26, 15], 22.2, {"location": "DC6", "sr": "sr6", "ipendpoint": "nginx_1_ep2", "host": "host2", "sfc": "test_sfc1", "sfc_i": "test_sfc1_1", "sf": "nginx", "sf_i": "nginx_1"}),
("DC3", "nginx_1_ep1", [25], [15], 18.2, {"location": "DC4", "sr": "sr4", "ipendpoint": "nginx_1_ep1", "host": "host1", "sfc": "test_sfc1", "sfc_i": "test_sfc1_1", "sf": "nginx", "sf_i": "nginx_1"})
):
request = testing.DummyRequest()
request.matchdict["graph_id"] = request_id
request.params["endpoint"] = endpoint
request.params["compute_node"] = dc
response = GraphAPI(request).run_rtt_query()
assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time}, "Incorrect RTT response"
assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time, "global_tags": global_tags}, "Incorrect RTT response"
# send a new request for a new service function chain
service_functions = dict(minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)"},
......@@ -376,18 +382,18 @@ class TestGraphAPI(object):
assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response."
request_id = graph_subresponse["uuid"]
for dc, endpoint, forward_latencies, reverse_latencies, response_time in (
("DC5", "apache_1_ep1", [], [], 17.6),
("DC5", "minio_2_ep1", [], [], 7),
("DC3", "apache_1_ep1", [20, 30], [26, 18], 17.6),
("DC2", "minio_2_ep1", [22, 30], [26, 15], 7)
for dc, endpoint, forward_latencies, reverse_latencies, response_time, global_tags in (
("DC5", "apache_1_ep1", [], [], 17.6, {"location": "DC5", "sr": "sr5", "ipendpoint": "apache_1_ep1", "host": "host5", "sfc": "test_sfc2", "sfc_i": "test_sfc2_1", "sf": "apache", "sf_i": "apache_1"}),
("DC5", "minio_2_ep1", [], [], 7, {"location": "DC5", "sr": "sr5", "ipendpoint": "minio_2_ep1", "host": "host4", "sfc": "test_sfc2", "sfc_i": "test_sfc2_1", "sf": "minio", "sf_i": "minio_2"}),
("DC3", "apache_1_ep1", [20, 30], [26, 18], 17.6, {"location": "DC5", "sr": "sr5", "ipendpoint": "apache_1_ep1", "host": "host5", "sfc": "test_sfc2", "sfc_i": "test_sfc2_1", "sf": "apache", "sf_i": "apache_1"}),
("DC2", "minio_2_ep1", [22, 30], [26, 15], 7, {"location": "DC5", "sr": "sr5", "ipendpoint": "minio_2_ep1", "host": "host4", "sfc": "test_sfc2", "sfc_i": "test_sfc2_1", "sf": "minio", "sf_i": "minio_2"})
):
request = testing.DummyRequest()
request.matchdict["graph_id"] = request_id
request.params["endpoint"] = endpoint
request.params["compute_node"] = dc
response = GraphAPI(request).run_rtt_query()
assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time}, "Incorrect RTT response"
assert response == {"forward_latencies": forward_latencies, "reverse_latencies": reverse_latencies, "response_time": response_time, "global_tags": global_tags}, "Incorrect RTT response"
@staticmethod
def check_exist_relationship(relationships_tuple, graph, uuid):
......
......@@ -33,7 +33,7 @@ GRAPH_BUILD_URL_PARAMS = ("from", "to")
GRAPH_BUILD_QUERY_PARAMS = {"database", "retention_policy", "service_function_chain_instance", "service_functions"}
GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "measurement_name"}
INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time FROM "{1}"."{2}".{3} WHERE sfc_i=\'{4}\' and time>={5} and time<{6} GROUP BY ipendpoint, location, sf_i'
INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time FROM "{1}"."{2}".{3} WHERE sfc_i=\'{4}\' and time>={5} and time<{6} GROUP BY ipendpoint, location, sf_i, host, sr'
RTT_CYPHER_QUERY_TEMPLATE = """
......@@ -200,6 +200,11 @@ def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries,
log.info("Building graph for service function chain {0} from database {1} with retention policy {2}".format(sfc_i, db, rp))
sfc = "_".join(sfc_i.split('_')[: -1]) # assumes sfc_i is always in the form <sfc>_<num>
# create a UUID reference node
reference_node = Node("Reference", **{"uuid": request_id, "sfc": sfc, "sfc_i": sfc_i, "from": from_timestamp, "to": to_timestamp})
graph.create(reference_node)
# create a node for the service function chain if it doesn't exist
service_function_chain_node = find_or_create_node(graph, "ServiceFunctionChain", name=sfc)
# create a node for the service function chain instance if it doesn't exist
......@@ -246,7 +251,7 @@ def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries,
find_or_create_edge(graph, "utilizedBy", service_function_instance_node, service_function_chain_instance_node)
# create an Endpoint node from the tag value (if it is not already created)
ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["ipendpoint"], response_time=response_time, uuid=request_id)
ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["ipendpoint"], response_time=response_time, uuid=request_id, host=tags["host"], sr=tags["sr"])
# create an edge between the instance and the endpoint (if it is not already created)
find_or_create_edge(graph, "realisedBy", service_function_instance_node, ipendpoint_node)
......
......@@ -102,13 +102,13 @@ class GraphAPI(object):
graph_id = self.request.matchdict['graph_id'] # get the UUID of the subgraph from the URL
graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password']) # connect to the neo4j graph db
number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id)
if number_of_deleted_nodes > 0:
return {"uuid": graph_id, "deleted": number_of_deleted_nodes}
else:
if graph.nodes.match("Reference", uuid=graph_id).first() is None:
raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))
number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id)
return {"uuid": graph_id, "deleted": number_of_deleted_nodes}
@view_config(route_name='graph_algorithms_rtt', request_method='GET')
def run_rtt_query(self):
"""
......@@ -133,6 +133,10 @@ class GraphAPI(object):
all_nodes = graph.nodes
reference_node = all_nodes.match("Reference", uuid=graph_id).first()
if reference_node is None:
raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))
compute_node = all_nodes.match("ComputeNode", name=compute_node_label).first()
if compute_node is None:
raise HTTPNotFound("Compute node {0} doesn't exist.".format(compute_node_label))
......@@ -142,12 +146,27 @@ class GraphAPI(object):
raise HTTPNotFound("Endpoint node {0} doesn't exist.".format(endpoint_node_label))
# check if the endpoint is hosted by the compute node before running the RTT cypher query
hosted_by_rel = graph.relationships.match(nodes=(endpoint_node, compute_node), r_type="hostedBy").first()
if hosted_by_rel is not None:
return {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"]}
query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(compute_node_label, endpoint_node_label, graph_id)
log.info("Executing cypher query: {0}".format(query_to_execute))
data = graph.run(query_to_execute).data() # returns a list of dictionaries, each dictionary represents a row in the result
return data[0] # we only expect one result from the query - a dictionary with the following fields: forward_latencies, reverse_latencies, response_time
hosted_by_node = graph.relationships.match(nodes=(endpoint_node, None), r_type="hostedBy").first().end_node
if hosted_by_node["name"] == compute_node["name"]:
result = {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"]}
else:
query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(compute_node_label, endpoint_node_label, graph_id)
log.info("Executing cypher query: {0}".format(query_to_execute))
data = graph.run(query_to_execute).data() # returns a list of dictionaries, each dictionary represents a row in the result
result = data[0]
sf_i_node = graph.match(nodes=(None, endpoint_node), r_type="realisedBy").first().start_node
if sf_i_node is None:
msg = "No service function instance found associated with endpoint {0}".format(endpoint_node["name"])
log.error("Unexpected error: {0}".format(msg))
raise HTTPBadRequest(msg)
sf_node = graph.match(nodes=(sf_i_node, None), r_type="instanceOf").first().end_node
if sf_node is None:
msg = "No service function found associated with service function instance {0}".format(sf_i_node["name"])
log.error("Unexpected error: {0}".format(msg))
raise HTTPBadRequest(msg)
result["global_tags"] = {"ipendpoint": endpoint_node["name"], "host": endpoint_node["host"], "location": hosted_by_node["name"], "sr": endpoint_node["sr"],
"sfc": reference_node["sfc"], "sfc_i": reference_node["sfc_i"], "sf": sf_node["name"], "sf_i": sf_i_node["name"]}
return result # we only expect one result from the query - a dictionary with the following fields: forward_latencies, reverse_latencies, response_time
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment