Skip to content
Snippets Groups Projects
Commit d0b659b6 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Slight updates to graph API

parent 55d5a848
No related branches found
No related tags found
No related merge requests found
......@@ -119,7 +119,7 @@ class TestGraphAPI(object):
body = dumps(dict(service_function_chain="sfc", service_function_chain_instance="sfc_1", service_functions=service_functions))
request = testing.DummyRequest()
request.body = body.encode(request.charset)
with pytest.raises(HTTPBadRequest, message="A bad request error must have been raised in case of missing timestamp parameters."):
with pytest.raises(HTTPBadRequest):
GraphAPI(request).build_temporal_graph()
# Create a valid build request and send it to the API endpoint
......@@ -135,8 +135,13 @@ class TestGraphAPI(object):
request.body = body.encode(request.charset)
response = GraphAPI(request).build_temporal_graph()
graph_subresponse = response.pop("graph")
# remove the "from" and "to" keys, these will be returned in the graph_subresponse
build_json_body.pop("from")
build_json_body.pop("to")
assert response == build_json_body, "Response must contain the request body"
assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response."
assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds
assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds
request_id = graph_subresponse["uuid"]
graph_1_id = request_id
......@@ -194,8 +199,13 @@ class TestGraphAPI(object):
request.body = body.encode(request.charset)
response = GraphAPI(request).build_temporal_graph()
graph_subresponse = response.pop("graph")
# remove the "from" and "to" keys, these will be returned in the graph_subresponse
build_json_body.pop("from")
build_json_body.pop("to")
assert response == build_json_body, "Response must contain the request body"
assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response."
assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds
assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds
request_id = graph_subresponse["uuid"]
graph_2_id = request_id
......@@ -331,8 +341,13 @@ class TestGraphAPI(object):
request.body = body.encode(request.charset)
response = GraphAPI(request).build_temporal_graph()
graph_subresponse = response.pop("graph")
# remove the "from" and "to" keys, these will be returned in the graph_subresponse
build_json_body.pop("from")
build_json_body.pop("to")
assert response == build_json_body, "Response must contain the request body"
assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response."
assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds
assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds
request_id = graph_subresponse["uuid"]
# test some more error case handling of the RTT API endpoint
......@@ -402,8 +417,13 @@ class TestGraphAPI(object):
request.body = body.encode(request.charset)
response = GraphAPI(request).build_temporal_graph()
graph_subresponse = response.pop("graph")
# remove the "from" and "to" keys, these will be returned in the graph_subresponse
build_json_body.pop("from")
build_json_body.pop("to")
assert response == build_json_body, "Response must contain the request body"
assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response."
assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds
assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds
request_id = graph_subresponse["uuid"]
# go through the set of input/output (expected) parameters and assert actual results match with expected ones
......
......@@ -32,7 +32,7 @@ GRAPH_ROUND_TRIP_TIME_URL_PARAMS = ("startpoint", "endpoint")
GRAPH_BUILD_QUERY_PARAMS = {"from", "to", "service_function_chain", "service_function_chain_instance", "service_functions"}
GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "request_size_field", "response_size_field", "measurement_name"}
INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time, {1} AS mean_request_size, {2} AS mean_response_size FROM "{3}"."{4}".{5} WHERE "flame_sfc"=\'{6}\' and "flame_sfci"=\'{7}\' and time>={8} and time<{9} GROUP BY "flame_sfe", "flame_location", "flame_sf"'
INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time, {1} AS mean_request_size, {2} AS mean_response_size FROM "{3}"."{4}".{5} WHERE "flame_sfc"=\'{6}\' and "flame_sfci"=\'{7}\' and "flame_sfp"=\'{8}\' and time>={9} and time<{10} GROUP BY "flame_sfe", "flame_location", "flame_sf"'
# in cypher the syntax is {name: 'value'}, here we use {{name: 'value'}} to escape the curly braces when applying the python format function
......@@ -211,8 +211,8 @@ def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queri
compute_nodes = set() # a set is used to keep track of all compute nodes that are found while building the graph, which is then used to retrieve the network latencies
# traverse the list of service functions
for service_function in json_queries["service_functions"]:
query_data = json_queries["service_functions"][service_function]
for service_function_package in json_queries["service_functions"]:
query_data = json_queries["service_functions"][service_function_package]
response_time_field = query_data["response_time_field"]
request_size_field = query_data["request_size_field"]
......@@ -220,47 +220,49 @@ def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queri
measurement = query_data["measurement_name"]
# build up the query by setting the placeholders in the query template
query_to_execute = INFLUX_QUERY_TEMPLATE.format(response_time_field, request_size_field, response_size_field, db, rp, measurement, sfc, sfci, from_timestamp, to_timestamp)
# create a node for the service function if it doesn't exist
service_function_package_node = find_or_create_node(graph, "ServiceFunctionPackage", name=service_function)
# crate a utilizedBy edge between the service function and the service function chain
find_or_create_edge(graph, "utilizedBy", service_function_package_node, service_function_chain_node)
query_to_execute = INFLUX_QUERY_TEMPLATE.format(response_time_field, request_size_field, response_size_field, db, rp, measurement, sfc, sfci, service_function_package, from_timestamp, to_timestamp)
log.info("Executing query: {0}".format(query_to_execute))
result = influx_client.query(query_to_execute) # execute the query
# iterate through each result item
for item in result.items():
metadata, result_points = item # each result item is a tuple of two elements
# metadata consists of the result tags and the measurement name
# measurement = metadata[0]
tags = metadata[1]
result_point = next(result_points) # get the result point dictionary
response_time = result_point["mean_response_time"] # extract the response time of the SF from the result
request_size = result_point["mean_request_size"] # extract the avg request size of the SF from the result
response_size = result_point["mean_response_size"] # extract the avg response size of the SF from the result
# create a ServiceFunction node from the tag value (if it is not already created)
service_function_node = find_or_create_node(graph, "ServiceFunction", name=tags["flame_sf"])
# create an edge between the the service function and the package (if it is not already created)
find_or_create_edge(graph, "instanceOf", service_function_node, service_function_package_node)
# crate a utilizedBy edge between the service function and the service function chain instance
find_or_create_edge(graph, "utilizedBy", service_function_node, service_function_chain_instance_node)
# create an Endpoint node from the tag value (if it is not already created)
ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["flame_sfe"], response_time=response_time, request_size=request_size, response_size=response_size, uuid=request_id)
# create an edge between the service function and the endpoint (if it is not already created)
find_or_create_edge(graph, "realisedBy", service_function_node, ipendpoint_node)
# create a Cluster node from the tag value (if it is not already created)
compute_node = find_or_create_node(graph, "Cluster", name=tags["flame_location"])
# create an edge between the endpoint and the compute node (if it is not already created)
find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node)
compute_nodes.add(compute_node) # add the compute node to the set of compute nodes
result_items = result.items()
if len(result_items) > 0:
# create a node for the service function if it doesn't exist
service_function_package_node = find_or_create_node(graph, "ServiceFunctionPackage", name=service_function_package)
# crate a utilizedBy edge between the service function and the service function chain
find_or_create_edge(graph, "utilizedBy", service_function_package_node, service_function_chain_node)
# iterate through each result item
for item in result_items:
metadata, result_points = item # each result item is a tuple of two elements
# metadata consists of the result tags and the measurement name
# measurement = metadata[0]
tags = metadata[1]
result_point = next(result_points) # get the result point dictionary
response_time = result_point["mean_response_time"] # extract the response time of the SF from the result
request_size = result_point["mean_request_size"] # extract the avg request size of the SF from the result
response_size = result_point["mean_response_size"] # extract the avg response size of the SF from the result
# create a ServiceFunction node from the tag value (if it is not already created)
service_function_node = find_or_create_node(graph, "ServiceFunction", name=tags["flame_sf"])
# create an edge between the the service function and the package (if it is not already created)
find_or_create_edge(graph, "instanceOf", service_function_node, service_function_package_node)
# crate a utilizedBy edge between the service function and the service function chain instance
find_or_create_edge(graph, "utilizedBy", service_function_node, service_function_chain_instance_node)
# create an Endpoint node from the tag value (if it is not already created)
ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["flame_sfe"], response_time=response_time, request_size=request_size, response_size=response_size, uuid=request_id)
# create an edge between the service function and the endpoint (if it is not already created)
find_or_create_edge(graph, "realisedBy", service_function_node, ipendpoint_node)
# create a Cluster node from the tag value (if it is not already created)
compute_node = find_or_create_node(graph, "Cluster", name=tags["flame_location"])
# create an edge between the endpoint and the compute node (if it is not already created)
find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node)
compute_nodes.add(compute_node) # add the compute node to the set of compute nodes
log.info("Finished building graph for service function chain {0} from database {1} with retention policy {2}".format(sfci, db, rp))
......
......@@ -82,6 +82,9 @@ class GraphAPI(object):
build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client)
json_queries['graph'] = {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}}
json_queries.pop("from")
json_queries.pop("to")
return json_queries
@view_config(route_name='graph_manage', request_method='DELETE')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment