diff --git a/src/service/clmcservice/graphapi/tests.py b/src/service/clmcservice/graphapi/tests.py index d55d002983ee9d8b0855d4f68d7281a9c3f9fc74..021877ef215899e6a13a1debb66808e6a853fc67 100644 --- a/src/service/clmcservice/graphapi/tests.py +++ b/src/service/clmcservice/graphapi/tests.py @@ -119,7 +119,7 @@ class TestGraphAPI(object): body = dumps(dict(service_function_chain="sfc", service_function_chain_instance="sfc_1", service_functions=service_functions)) request = testing.DummyRequest() request.body = body.encode(request.charset) - with pytest.raises(HTTPBadRequest, message="A bad request error must have been raised in case of missing timestamp parameters."): + with pytest.raises(HTTPBadRequest): GraphAPI(request).build_temporal_graph() # Create a valid build request and send it to the API endpoint @@ -135,8 +135,13 @@ class TestGraphAPI(object): request.body = body.encode(request.charset) response = GraphAPI(request).build_temporal_graph() graph_subresponse = response.pop("graph") + # remove the "from" and "to" keys, these will be returned in the graph_subresponse + build_json_body.pop("from") + build_json_body.pop("to") assert response == build_json_body, "Response must contain the request body" assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." + assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds + assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds request_id = graph_subresponse["uuid"] graph_1_id = request_id @@ -194,8 +199,13 @@ class TestGraphAPI(object): request.body = body.encode(request.charset) response = GraphAPI(request).build_temporal_graph() graph_subresponse = response.pop("graph") + # remove the "from" and "to" keys, these will be returned in the graph_subresponse + build_json_body.pop("from") + build_json_body.pop("to") assert response == build_json_body, "Response must contain the request body" assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." + assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds + assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds request_id = graph_subresponse["uuid"] graph_2_id = request_id @@ -331,8 +341,13 @@ class TestGraphAPI(object): request.body = body.encode(request.charset) response = GraphAPI(request).build_temporal_graph() graph_subresponse = response.pop("graph") + # remove the "from" and "to" keys, these will be returned in the graph_subresponse + build_json_body.pop("from") + build_json_body.pop("to") assert response == build_json_body, "Response must contain the request body" assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." + assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds + assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds request_id = graph_subresponse["uuid"] # test some more error case handling of the RTT API endpoint @@ -402,8 +417,13 @@ class TestGraphAPI(object): request.body = body.encode(request.charset) response = GraphAPI(request).build_temporal_graph() graph_subresponse = response.pop("graph") + # remove the "from" and "to" keys, these will be returned in the graph_subresponse + build_json_body.pop("from") + build_json_body.pop("to") assert response == build_json_body, "Response must contain the request body" assert graph_subresponse.get("uuid") is not None, "Request UUID must be attached to the response." + assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds + assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds request_id = graph_subresponse["uuid"] # go through the set of input/output (expected) parameters and assert actual results match with expected ones diff --git a/src/service/clmcservice/graphapi/utilities.py b/src/service/clmcservice/graphapi/utilities.py index d10b97d12b0115bfa1ba52f9028d23b0c34620fd..090f7c31a80aa0be65ec1bcf680595e0567cc605 100644 --- a/src/service/clmcservice/graphapi/utilities.py +++ b/src/service/clmcservice/graphapi/utilities.py @@ -32,7 +32,7 @@ GRAPH_ROUND_TRIP_TIME_URL_PARAMS = ("startpoint", "endpoint") GRAPH_BUILD_QUERY_PARAMS = {"from", "to", "service_function_chain", "service_function_chain_instance", "service_functions"} GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "request_size_field", "response_size_field", "measurement_name"} -INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time, {1} AS mean_request_size, {2} AS mean_response_size FROM "{3}"."{4}".{5} WHERE "flame_sfc"=\'{6}\' and "flame_sfci"=\'{7}\' and time>={8} and time<{9} GROUP BY "flame_sfe", "flame_location", "flame_sf"' +INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time, {1} AS mean_request_size, {2} AS mean_response_size FROM "{3}"."{4}".{5} WHERE "flame_sfc"=\'{6}\' and "flame_sfci"=\'{7}\' and "flame_sfp"=\'{8}\' and time>={9} and time<{10} GROUP BY "flame_sfe", "flame_location", "flame_sf"' # in cypher the syntax is {name: 'value'}, here we use {{name: 'value'}} to escape the curly braces when applying the python format function @@ -211,8 +211,8 @@ def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queri compute_nodes = set() # a set is used to keep track of all compute nodes that are found while building the graph, which is then used to retrieve the network latencies # traverse the list of service functions - for service_function in json_queries["service_functions"]: - query_data = json_queries["service_functions"][service_function] + for service_function_package in json_queries["service_functions"]: + query_data = json_queries["service_functions"][service_function_package] response_time_field = query_data["response_time_field"] request_size_field = query_data["request_size_field"] @@ -220,47 +220,49 @@ def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queri measurement = query_data["measurement_name"] # build up the query by setting the placeholders in the query template - query_to_execute = INFLUX_QUERY_TEMPLATE.format(response_time_field, request_size_field, response_size_field, db, rp, measurement, sfc, sfci, from_timestamp, to_timestamp) - - # create a node for the service function if it doesn't exist - service_function_package_node = find_or_create_node(graph, "ServiceFunctionPackage", name=service_function) - # crate a utilizedBy edge between the service function and the service function chain - find_or_create_edge(graph, "utilizedBy", service_function_package_node, service_function_chain_node) + query_to_execute = INFLUX_QUERY_TEMPLATE.format(response_time_field, request_size_field, response_size_field, db, rp, measurement, sfc, sfci, service_function_package, from_timestamp, to_timestamp) log.info("Executing query: {0}".format(query_to_execute)) result = influx_client.query(query_to_execute) # execute the query - - # iterate through each result item - for item in result.items(): - metadata, result_points = item # each result item is a tuple of two elements - - # metadata consists of the result tags and the measurement name - # measurement = metadata[0] - tags = metadata[1] - - result_point = next(result_points) # get the result point dictionary - response_time = result_point["mean_response_time"] # extract the response time of the SF from the result - request_size = result_point["mean_request_size"] # extract the avg request size of the SF from the result - response_size = result_point["mean_response_size"] # extract the avg response size of the SF from the result - - # create a ServiceFunction node from the tag value (if it is not already created) - service_function_node = find_or_create_node(graph, "ServiceFunction", name=tags["flame_sf"]) - # create an edge between the the service function and the package (if it is not already created) - find_or_create_edge(graph, "instanceOf", service_function_node, service_function_package_node) - # crate a utilizedBy edge between the service function and the service function chain instance - find_or_create_edge(graph, "utilizedBy", service_function_node, service_function_chain_instance_node) - - # create an Endpoint node from the tag value (if it is not already created) - ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["flame_sfe"], response_time=response_time, request_size=request_size, response_size=response_size, uuid=request_id) - # create an edge between the service function and the endpoint (if it is not already created) - find_or_create_edge(graph, "realisedBy", service_function_node, ipendpoint_node) - - # create a Cluster node from the tag value (if it is not already created) - compute_node = find_or_create_node(graph, "Cluster", name=tags["flame_location"]) - # create an edge between the endpoint and the compute node (if it is not already created) - find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node) - - compute_nodes.add(compute_node) # add the compute node to the set of compute nodes + result_items = result.items() + + if len(result_items) > 0: + # create a node for the service function if it doesn't exist + service_function_package_node = find_or_create_node(graph, "ServiceFunctionPackage", name=service_function_package) + # crate a utilizedBy edge between the service function and the service function chain + find_or_create_edge(graph, "utilizedBy", service_function_package_node, service_function_chain_node) + + # iterate through each result item + for item in result_items: + metadata, result_points = item # each result item is a tuple of two elements + + # metadata consists of the result tags and the measurement name + # measurement = metadata[0] + tags = metadata[1] + + result_point = next(result_points) # get the result point dictionary + response_time = result_point["mean_response_time"] # extract the response time of the SF from the result + request_size = result_point["mean_request_size"] # extract the avg request size of the SF from the result + response_size = result_point["mean_response_size"] # extract the avg response size of the SF from the result + + # create a ServiceFunction node from the tag value (if it is not already created) + service_function_node = find_or_create_node(graph, "ServiceFunction", name=tags["flame_sf"]) + # create an edge between the the service function and the package (if it is not already created) + find_or_create_edge(graph, "instanceOf", service_function_node, service_function_package_node) + # crate a utilizedBy edge between the service function and the service function chain instance + find_or_create_edge(graph, "utilizedBy", service_function_node, service_function_chain_instance_node) + + # create an Endpoint node from the tag value (if it is not already created) + ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["flame_sfe"], response_time=response_time, request_size=request_size, response_size=response_size, uuid=request_id) + # create an edge between the service function and the endpoint (if it is not already created) + find_or_create_edge(graph, "realisedBy", service_function_node, ipendpoint_node) + + # create a Cluster node from the tag value (if it is not already created) + compute_node = find_or_create_node(graph, "Cluster", name=tags["flame_location"]) + # create an edge between the endpoint and the compute node (if it is not already created) + find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node) + + compute_nodes.add(compute_node) # add the compute node to the set of compute nodes log.info("Finished building graph for service function chain {0} from database {1} with retention policy {2}".format(sfci, db, rp)) diff --git a/src/service/clmcservice/graphapi/views.py b/src/service/clmcservice/graphapi/views.py index 9bac0e146306b5e86d6dcd935c81f58d219befc5..21f218c00e914bd4136b5b7f601bcca6b2ce417a 100644 --- a/src/service/clmcservice/graphapi/views.py +++ b/src/service/clmcservice/graphapi/views.py @@ -82,6 +82,9 @@ class GraphAPI(object): build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client) json_queries['graph'] = {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}} + json_queries.pop("from") + json_queries.pop("to") + return json_queries @view_config(route_name='graph_manage', request_method='DELETE')