From e426f20249fdb897d5cbc997e0a194c325a549dc Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Mon, 4 Mar 2019 12:59:26 +0000 Subject: [PATCH] Modifies graph API to not start graph monitoring process if no UserEquipment nodes are found in the graph --- src/service/clmcservice/graphapi/tests.py | 57 ++++--- src/service/clmcservice/graphapi/utilities.py | 61 ++++---- src/service/clmcservice/graphapi/views.py | 148 ++++++++---------- 3 files changed, 135 insertions(+), 131 deletions(-) diff --git a/src/service/clmcservice/graphapi/tests.py b/src/service/clmcservice/graphapi/tests.py index 0b07a87..48512ab 100644 --- a/src/service/clmcservice/graphapi/tests.py +++ b/src/service/clmcservice/graphapi/tests.py @@ -24,7 +24,7 @@ from json import dumps, loads from signal import SIGKILL -from unittest.mock import patch, Mock, MagicMock, PropertyMock +from unittest.mock import patch, Mock, PropertyMock import pytest from pyramid import testing from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPInternalServerError @@ -518,18 +518,16 @@ class TestGraphAPI(object): error_raised = True assert error_raised, error_msg - @patch('clmcservice.graphapi.views.load') - @patch('clmcservice.graphapi.views.open') + @patch('clmcservice.graphapi.views.Graph') @patch('clmcservice.graphapi.views.Popen') @patch('clmcservice.graphapi.views.uuid4') - def test_execute_graph_pipeline(self, uuid_mock, popen_mock, fileopen_mock, jsonload_mock): + def test_execute_graph_pipeline(self, uuid_mock, popen_mock, graph_mock): """ Tests the functionality to start a pipeline script executing the graph API workflow - build, query, delete. :param uuid_mock: mock object for the uuid generator function :param popen_mock: mock object for the process creation function - :param fileopen_mock: mock object the mimic the behaviour of opening a file - :param jsonload_mock: mock object to mimic the behaviour of the JSON load function + :param graph_mock: mock object for the graph DB client """ # mock the behaviour of the uuid4 function @@ -543,12 +541,11 @@ class TestGraphAPI(object): type(popen_intance_mock).returncode = returncode_property_mock # a property mock cannot be attached directly to the mock object, hence use its type object popen_mock.return_value = popen_intance_mock - # mock the behaviur of the open() and load() function - fileopen_mock.return_value = MagicMock() # a magic mock is needed so that the dunder methods __enter__ and __exit__ are generated - ues_dict = {"127.0.0.1": "ue1", "127.0.0.2": "ue2", "127.0.0.3": "ue3"} - jsonload_mock.return_value = ues_dict + # mock the behaviour of the graph nodes match function + nodes_matcher_mock = Mock(return_value=[{"name": "ue1"}, {"name": "ue2"}, {"name": "ue3"}]) # the API method expects node objects with 'name' key (node['name']) + graph_mock.return_value.nodes.match = nodes_matcher_mock - # check proper behaviour + # check proper behaviour of the API endpoint service_functions = dict(nginx={"measurement_name": "nginx", "response_time_field": "mean(avg_processing_time)", "request_size_field": "mean(avg_request_size)", "response_size_field": "mean(avg_response_size)"}, minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)", @@ -562,14 +559,19 @@ class TestGraphAPI(object): response = GraphAPI(request).execute_graph_pipeline() assert response == {"uuid": uuid_mock.return_value, "database": "test_sfc"} - monitor_json_body["ues"] = list(ues_dict.values()) + # the API endpoint should have done this - appending the list of UEs to the JSON config of the pipeline script + monitor_json_body["ues"] = [node["name"] for node in nodes_matcher_mock.return_value] # assert that the graph pipeline script is ran with the JSON config that was received in the request along with the UEs - actual_call_arguments = popen_mock.call_args[0][0] # we expect exactly one call to Popen() with one argument which is a list - assert actual_call_arguments[0] == "graph-pipeline.sh", "Incorrect graph pipeline script name" - assert loads(actual_call_arguments[1]) == monitor_json_body, "Incorrect JSON configuration passed to pipeline script" + # we expect one call to the Popen() mock (first 0 index), we take the proper arguments, not kwargs (second 0 index) + actual_call_arguments = popen_mock.call_args_list[0][0] + # expecting one argument which is a list + assert len(actual_call_arguments) == 1 and type(actual_call_arguments[0]) == list, "Incorrect call to Popen" + assert actual_call_arguments[0][0] == "graph-pipeline.sh", "Incorrect graph pipeline script name" + assert loads(actual_call_arguments[0][1]) == monitor_json_body, "Incorrect JSON configuration passed to pipeline script" pid_property_mock.assert_called_once_with() # assert that the process ID attribute was called and saved returncode_property_mock.assert_called_once_with() # assert that the process return code attribute was called to check if the process has started successfully + nodes_matcher_mock.assert_called_once_with("UserEquipment") # assert that the graph nodes match function has been called with "UserEquipment" as argument # check that the process ID was saved assert MonitoringProcess.exists(uuid_mock.return_value), "Request identifier was not saved during the request processing" @@ -586,12 +588,29 @@ class TestGraphAPI(object): error_raised = True assert error_raised, "Expecting a 500 HTTP error if the process terminated immediately after it was started" - # assert that the graph pipeline script is ran with the JSON config that was received in the request along with the UEs - actual_call_arguments = popen_mock.call_args[0][0] # we expect exactly one call to Popen() with one argument which is a list - assert actual_call_arguments[0] == "graph-pipeline.sh", "Incorrect graph pipeline script name" - assert loads(actual_call_arguments[1]) == monitor_json_body, "Incorrect JSON configuration passed to pipeline script" + # assert that the graph pipeline script was still started with the JSON config that was received in the request along with the UEs + # we expect a second call to the Popen() mock (index 1), we take the proper arguments, not kwargs (first 0 index) + actual_call_arguments = popen_mock.call_args_list[1][0] + # expecting one argument which is a list + assert len(actual_call_arguments) == 1 and type(actual_call_arguments[0]) == list, "Incorrect call to Popen" + assert actual_call_arguments[0][0] == "graph-pipeline.sh", "Incorrect graph pipeline script name" + assert loads(actual_call_arguments[0][1]) == monitor_json_body, "Incorrect JSON configuration passed to pipeline script" pid_property_mock.assert_called_with() # assert that the process ID attribute was called and saved returncode_property_mock.assert_called_with() # assert that the process return code attribute was called to check if the process has started successfully + nodes_matcher_mock.assert_called_with("UserEquipment") # assert that the graph nodes match function has been called with "UserEquipment" as argument + + # check erroneous behaviour - no UE nodes are found, expecting bad request + nodes_matcher_mock.return_value = [] + request = testing.DummyRequest() + request.body = body.encode(request.charset) + error_raised = False + try: + GraphAPI(request).execute_graph_pipeline() + except HTTPBadRequest: + error_raised = True + assert error_raised, "Expecting a 400 HTTP Bad Request error if no UE nodes were found (network topology not built)." + assert len(popen_mock.call_args_list) == 2, "No subprocess should be started if the UE nodes list is empty (network topology not built)" + nodes_matcher_mock.assert_called_with("UserEquipment") # assert that the graph nodes match function has been called with "UserEquipment" as argument @patch('clmcservice.graphapi.views.kill') def test_stop_graph_pipeline(self, mock_kill): diff --git a/src/service/clmcservice/graphapi/utilities.py b/src/service/clmcservice/graphapi/utilities.py index 10b0a31..b8e736d 100644 --- a/src/service/clmcservice/graphapi/utilities.py +++ b/src/service/clmcservice/graphapi/utilities.py @@ -165,7 +165,7 @@ def find_or_create_node(graph, node_type, return_created=False, **properties): """ This function checks if a node of the given type with the given properties exists, and if not - creates it. - :param graph: the graph object + :param graph: the graph DB object :param node_type: the type of the node to find or create :param return_created: if True the result will contain both the node and a boolean flag if the node was created now :param properties: the properties of the node to find or create @@ -194,7 +194,7 @@ def find_or_create_edge(graph, edge_type, from_node, to_node, **properties): """ This function checks if an edge of the given type with the given properties exists, and if not - creates it. - :param graph: the graph object + :param graph: the graph DB object :param edge_type: the type of the edge to find or create :param from_node: the source of the edge :param to_node: the target of the edge @@ -211,6 +211,28 @@ def find_or_create_edge(graph, edge_type, from_node, to_node, **properties): return edge +def delete_nodes_with_type(graph, node_type): + """ + This function deletes all nodes of a given type from the graph. + + :param graph: the graph DB object + :param node_type: the type of the nodes to delete, e.g. Switch + + :return: the number of deleted nodes + """ + + log.info("Deleting {0} nodes.".format(node_type)) + + subgraph = graph.nodes.match(node_type) + deleted_nodes = len(subgraph) + for node in subgraph: + graph.delete(node) + + log.info("Deleted {0} {1} nodes.".format(deleted_nodes, node_type)) + + return deleted_nodes + + def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client): """ A function used to generate a temporal graph in the neo4j db. @@ -232,7 +254,7 @@ def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queri db = sfc rp = "autogen" - log.info("Building graph for service function chain {0}/{1} from database {2} with retention policy {3}".format(sfc, sfci, db, rp)) + log.info("Building graph for service function chain {0}/{1} from database {2} with retention policy {3} and request ID {4}".format(sfc, sfci, db, rp, request_id)) # create a UUID reference node reference_node = Node("Reference", **{"uuid": request_id, "sfc": sfc, "sfci": sfci, "from": from_timestamp, "to": to_timestamp}) @@ -408,38 +430,15 @@ def create_node_from_mapping(graph, node, node_ip, mapping, new_node_type): def delete_network_graph(graph): """ - A function used to delete all nodes of type Switch and Cluster in the neo4j graph. + A function used to delete all nodes of type Switch, Cluster and UserEquipment in the neo4j graph. :param graph: the neo4j graph - :return: the number of deleted switches and clusters + :return: the number of deleted switches, clusters and UEs """ - log.info("Deleting Switch nodes.".format()) - - subgraph = graph.nodes.match("Switch") - deleted_switches = len(subgraph) - for node in subgraph: - graph.delete(node) - - log.info("Deleted {0} Switch nodes.".format(deleted_switches)) - - log.info("Deleting Cluster nodes.") - - subgraph = graph.nodes.match("Cluster") - deleted_clusters = len(subgraph) - for node in subgraph: - graph.delete(node) - - log.info("Deleted {0} Cluster nodes.".format(deleted_clusters)) - - log.info("Deleting UserEquipment nodes.") - - subgraph = graph.nodes.match("UserEquipment") - deleted_ues = len(subgraph) - for node in subgraph: - graph.delete(node) - - log.info("Deleted {0} UserEquipment nodes.".format(deleted_clusters)) + deleted_switches = delete_nodes_with_type(graph, "Switch") + deleted_clusters = delete_nodes_with_type(graph, "Cluster") + deleted_ues = delete_nodes_with_type(graph, "UserEquipment") return deleted_switches, deleted_clusters, deleted_ues diff --git a/src/service/clmcservice/graphapi/views.py b/src/service/clmcservice/graphapi/views.py index 582b731..8aff5fa 100644 --- a/src/service/clmcservice/graphapi/views.py +++ b/src/service/clmcservice/graphapi/views.py @@ -263,78 +263,28 @@ class GraphAPI(object): sdn_controller_ip = self.request.registry.settings['sdn_controller_ip'] sdn_controller_port = self.request.registry.settings['sdn_controller_port'] - # retrieve all switches - if SDN controller is unavailable on the given IP address return 503 Service Unavailable - try: - url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/core/controller/switches/json") - response = get(url) - except exceptions.ConnectionError: - msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port) - log.error("Unexpected error: {0}".format(msg)) - raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.") - - # check if the SDN controller returned the expected response - if response.status_code != 200: - msg = "The SDN controller returned a response with status code different than 200." - log.error("Unexpected error: {0}".format(msg)) - raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for the list of switches.") - - try: - content = response.json() - except ValueError: # response not in JSON - msg = "The SDN controller returned a response which couldn't be converted to JSON." - log.error("Unexpected error: {0}".format(msg)) - raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for the list of switches.") + # retrieve all switches + url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/core/controller/switches/json") + log.info("Fetching the list of switches from the SDN controller - {0}".format(url)) + switches_json_response = self.get_sdn_controller_response(url) # map the DPID of each switch to its IP address switches = {} - for switch in content: + for switch in switches_json_response: # map the dpid to the switch IP address, the IP address is in the format '/172.168.23.54:1234' switches[switch["switchDPID"]] = switch["inetAddress"][1:].split(":")[0] - # retrieve all external links (gathered through BDDP) - if SDN controller is unavailable on the given IP address return 503 Service Unavailable - try: - url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/topology/external-links/json") - response = get(url) - except exceptions.ConnectionError: - msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port) - log.error("Unexpected error: {0}".format(msg)) - raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.") - - # check if the SDN controller returned the expected response - if response.status_code != 200: - msg = "The SDN controller returned a response with status code different than 200." - log.error("Unexpected error: {0}".format(msg)) - raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for the network topology.") - - try: - external_links = response.json() - except ValueError: # response not in JSON - msg = "The SDN controller returned a response which couldn't be converted to JSON." - log.error("Unexpected error: {0}".format(msg)) - raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for the network topology.") + # retrieve all external links (gathered through BDDP) + url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/topology/external-links/json") + log.info("Fetching all external links from the SDN controller - {0}".format(url)) + external_links = self.get_sdn_controller_response(url) - # retrieve all local links (gathered through LLDP) - if SDN controller is unavailable on the given IP address return 503 Service Unavailable - try: - url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/topology/links/json") - response = get(url) - except exceptions.ConnectionError: - msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port) - log.error("Unexpected error: {0}".format(msg)) - raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.") - - if response.status_code != 200: - msg = "The SDN controller returned a response with status code different than 200." - log.error("Unexpected error: {0}".format(msg)) - raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for the network topology.") - - try: - local_links = response.json() - except ValueError: # response not in JSON - msg = "The SDN controller returned a response which couldn't be converted to JSON." - log.error("Unexpected error: {0}".format(msg)) - raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for the network topology.") + # retrieve all local links (gathered through LLDP) + url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/topology/links/json") + log.info("Fetching all local links from the SDN controller - {0}".format(url)) + local_links = self.get_sdn_controller_response(url) - # TODO this is a temporary solution - currently the service router to clusters mapping is read from a file (which must be manually prepared beforehand) + # TODO this is a temporary solution - currently the service router to cluster mapping is read from a file (which must be manually prepared beforehand) clusters_file = self.request.registry.settings["network_clusters_path"] try: with open(clusters_file) as fh: @@ -344,7 +294,7 @@ class GraphAPI(object): log.error("No service-router-to-cluster mapping was found while building the network topology.") clusters = {} - # TODO this is a temporary solution - currently the service router to ues mapping is read from a file (which must be manually prepared beforehand) + # TODO this is a temporary solution - currently the service router to ue mapping is read from a file (which must be manually prepared beforehand) ues_file = self.request.registry.settings["network_ues_path"] try: with open(ues_file) as fh: @@ -363,6 +313,43 @@ class GraphAPI(object): return {"new_switches_count": switch_count, "new_clusters_count": clusters_count, "new_ues_count": ues_count} + @staticmethod + def get_sdn_controller_response(url): + """ + Send a GET request to the SDN controller and validate the response - expecting a JSON response (currently only applicable for Floodlight) + + :param url: the full url - SDN controller IP and port + API endpoint path + + :return: the response in JSON + + :raises HTTPServiceUnavailable: if the SDN controller couldn't be reached on the given IP and port + :raises HTTPNotImplemented: if the SDN controller returned status code different than 200 or the response couldn't be converted to JSON + """ + + # try getting any response from the controller + try: + response = get(url) + except exceptions.ConnectionError: + msg = "The SDN controller is not available - request was sent to {0}.".format(url) + log.error("Unexpected error: {0}".format(msg)) + raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.") + + # check if the SDN controller returned the expected response + if response.status_code != 200: + msg = "The SDN controller returned a response with status code different than 200 - request was sent to {0}.".format(url) + log.error("Unexpected error: {0}".format(msg)) + raise HTTPNotImplemented("The SDN controller failed to return a successful response status code.") + + # check if the SDN controller returned a JSON response + try: + response_json = response.json() + except ValueError: # response not in JSON + msg = "The SDN controller returned a response which couldn't be converted to JSON - request was sent to {0}.".format(url) + log.error("Unexpected error: {0}".format(msg)) + raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response.") + + return response_json + @view_config(route_name='graph_network_topology', request_method='DELETE') def delete_network_topology(self): """ @@ -393,32 +380,31 @@ class GraphAPI(object): influx_client = InfluxDBClient(host=self.request.registry.settings['influx_host'], port=self.request.registry.settings['influx_port'], timeout=10) - database_name = json_queries["service_function_chain"] + sfc = json_queries["service_function_chain"] + database_name = json_queries["service_function_chain"] # currently DB is named after the SFC if database_name not in [db["name"] for db in influx_client.get_list_database()]: raise HTTPBadRequest("Database for service function chain {0} not found.".format(database_name)) - request_uuid = str(uuid4()) - sfc = json_queries["service_function_chain"] - - # get the list of ues - ues_file = self.request.registry.settings["network_ues_path"] - try: - with open(ues_file) as fh: - ues = load(fh) - except Exception as e: - log.error("Unexpected error: {0}".format(e)) - log.error("No service-router-to-ue mapping was found while building the network topology.") - ues = {} - - ues_list = list(ues.values()) + # get the list of UEs from the Neo4j graph - if no UEs are found, the network topology has not been built yet, so return bad request + graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password']) + ue_nodes = graph.nodes.match("UserEquipment") + ues_list = [ue_node["name"] for ue_node in ue_nodes] + # return bad request for empty list of UEs + if len(ues_list) == 0: + raise HTTPBadRequest("Graph pipeline process for SFC {0} cannot be started - no UserEquipment nodes found, the graph network topology has not been built.".format(sfc)) + # add the list of UEs to the JSON configuration passed to the pipeline script json_queries["ues"] = ues_list + # start a graph pipeline subprocess process = Popen(["graph-pipeline.sh", dumps(json_queries)]) process_pid = process.pid process_return_code = process.returncode + # check if for some reason the process returned immediately (could be if the graph-pipeline script is not found if process_return_code is None: # process has started running - log.info("Started a graph pipeline process for SFC {0} with PID {1}".format(sfc, process_pid)) + request_uuid = str(uuid4()) # generate a request UUID used to map to the process ID of the graph pipeline + + log.info("Started a graph pipeline process for SFC {0} with PID {1} and request UUID {2}".format(sfc, process_pid, request_uuid)) MonitoringProcess.add({"request_id": request_uuid, "process_id": process_pid}) -- GitLab