diff --git a/docs/clmc-service.md b/docs/clmc-service.md index eacb13a44957d545c6ccdf2e9bf5726da3e72e6c..e52635aa8d1b72ac616850bbc32f90c38c8ae148 100644 --- a/docs/clmc-service.md +++ b/docs/clmc-service.md @@ -239,6 +239,25 @@ with **/clmc-service** so that the nginx reverse proxy server (listening on port } ``` +* **DELETE** ***/graph/monitor/{request_id}*** + + This API methods instructs the CLMC service to stop running a graph monitoring pipeline script associated with the request identifier in the URL. + (retrieved from the response of a POST request for /graph/monitor), e.g. request sent to */graph/monitor/75df6f8d-3829-4fd8-a3e6-b3e917010141* + + * Response: + + The response of this request is a JSON content, which contains a single output message related to the state of the monitoring process before it gets killed. + + Returns a 404 Not Found error if the request ID is not associated with any graph monitoring process. + + * Response Body Example: + + ```json + { + "msg": "Monitoring process has been successfully stopped." + } + ``` + * **POST** ***/graph/temporal*** This API method sends a request to the CLMC service to build a graph snapshot in the time range between the *from* and *to* timestamps. diff --git a/src/service/clmcservice/__init__.py b/src/service/clmcservice/__init__.py index b603568ed2fedd877178748bd2fe7b401cc893cb..1f40aaca0173ec291b97faefaa1e626f8789fe77 100644 --- a/src/service/clmcservice/__init__.py +++ b/src/service/clmcservice/__init__.py @@ -71,6 +71,7 @@ def main(global_config, **settings): config.add_route('graph_algorithms_rtt', '/graph/temporal/{graph_id}/round-trip-time') config.add_route('graph_network_topology', '/graph/network') config.add_route('graph_execute_pipeline', '/graph/monitor') + config.add_route('graph_manage_pipeline', '/graph/monitor/{request_id}') # add routes of the Alerts Configuration API config.add_route('alerts_configuration', '/alerts') diff --git a/src/service/clmcservice/graphapi/tests.py b/src/service/clmcservice/graphapi/tests.py index 521bbe9e034866b6754deac9b2fe4a3ae8561d0e..0e72a83e71d5a3b97ce840e9a08cfd49eb30e7db 100644 --- a/src/service/clmcservice/graphapi/tests.py +++ b/src/service/clmcservice/graphapi/tests.py @@ -23,6 +23,7 @@ """ from json import dumps +from signal import SIGKILL from unittest.mock import patch, Mock, PropertyMock import pytest from pyramid import testing @@ -36,10 +37,6 @@ class TestGraphAPI(object): A pytest-implementation test for the Graph API endpoints. """ - # used to store graph UUIDs in the build test and reuse these in the delete test - graph_1_test_id = "graph_mock_uuid1" - graph_2_test_id = "graph_mock_uuid2" - @pytest.fixture(autouse=True) def app_config(self): """ @@ -125,24 +122,15 @@ class TestGraphAPI(object): assert switch_nodes == set("127.0.0." + str(i) for i in range(1, 7)), "Switch nodes must have been created by the db_testing_data fixture" # Create a valid build request and send it to the API endpoint - uuid_mock.return_value = self.graph_1_test_id - service_functions = dict(nginx={"measurement_name": "nginx", "response_time_field": "mean(avg_processing_time)", - "request_size_field": "mean(avg_request_size)", "response_size_field": "mean(avg_response_size)"}, - minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)", - "request_size_field": "mean(total_requests_size)/mean(total_requests_count)", "response_size_field": "mean(total_response_size)/mean(total_requests_count)"}) - build_json_body = dict(service_function_chain="test_sfc", service_function_chain_instance="test_sfc_premium", service_functions=service_functions) - build_json_body["from"] = from_timestamp - build_json_body["to"] = to_timestamp - body = dumps(build_json_body) - request = testing.DummyRequest() - request.body = body.encode(request.charset) - response = GraphAPI(request).build_temporal_graph() + uuid_mock.return_value = "graph_test_build_uuid1" + responses = graph_generator(from_timestamp, to_timestamp) + response = next(responses) graph_subresponse = response.pop("graph") assert response == {"database": "test_sfc"}, "Response must contain the database name" - assert graph_subresponse["uuid"] == self.graph_1_test_id, "Request UUID must be attached to the response." + assert graph_subresponse["uuid"] == uuid_mock.return_value, "Request UUID must be attached to the response." assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds assert set(graph_subresponse["endpoints"]) == {"minio_1_ep1", "nginx_1_ep1", "nginx_1_ep2"}, "Wrong list of new endpoints was returned by the build request" @@ -190,24 +178,14 @@ class TestGraphAPI(object): assert endpoint_node["response_size"] == pytest.approx(response_size, 1), "Wrong response size attribute of endpoint node" # send a new request for a new service function chain instance and check the new subgraph has been created - uuid_mock.return_value = self.graph_2_test_id - service_functions = dict(minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)", - "request_size_field": "mean(total_requests_size)/mean(total_requests_count)", "response_size_field": "mean(total_response_size)/mean(total_requests_count)"}, - apache={"measurement_name": "apache", "response_time_field": "mean(avg_processing_time)", - "request_size_field": "mean(avg_request_size)", "response_size_field": "mean(avg_response_size)"}) - build_json_body = dict(service_function_chain="test_sfc", service_function_chain_instance="test_sfc_non_premium", service_functions=service_functions) - build_json_body["from"] = from_timestamp - build_json_body["to"] = to_timestamp - body = dumps(build_json_body) - request = testing.DummyRequest() - request.body = body.encode(request.charset) - response = GraphAPI(request).build_temporal_graph() + uuid_mock.return_value = "graph_test_build_uuid2" + response = next(responses) graph_subresponse = response.pop("graph") assert response == {"database": "test_sfc"}, "Response must contain the database name" - assert graph_subresponse["uuid"] == self.graph_2_test_id, "Request UUID must be attached to the response." + assert graph_subresponse["uuid"] == uuid_mock.return_value, "Request UUID must be attached to the response." assert graph_subresponse["time_range"]["from"] == from_timestamp * 10**9 # timestamp returned in nanoseconds assert graph_subresponse["time_range"]["to"] == to_timestamp * 10**9 # timestamp returned in nanoseconds assert set(graph_subresponse["endpoints"]) == {"minio_2_ep1", "apache_1_ep1"}, "Wrong list of new endpoints was returned by the build request" @@ -253,7 +231,8 @@ class TestGraphAPI(object): assert endpoint_node["request_size"] == pytest.approx(request_size, 1), "Wrong request size attribute of endpoint node" assert endpoint_node["response_size"] == pytest.approx(response_size, 1), "Wrong response size attribute of endpoint node" - def test_delete(self, db_testing_data): + @patch('clmcservice.graphapi.views.uuid4') + def test_delete(self, uuid_mock, db_testing_data): """ Tests the delete API endpoint of the Graph API - the test depends on the build test to have been passed successfully so that graph_1_id and graph_2_id have been set @@ -262,6 +241,18 @@ class TestGraphAPI(object): from_timestamp, to_timestamp, graph_db = db_testing_data + # build test graphs + responses = graph_generator(from_timestamp, to_timestamp) + + uuid_mock.return_value = "graph_test_delete_uuid1" + graph_1_test_id = uuid_mock.return_value + next(responses) # return value is the response, but we don't need it for this test + + uuid_mock.return_value = "graph_test_delete_uuid2" + graph_2_test_id = uuid_mock.return_value + next(responses) # return value is the response, but we don't need it for this test + + # test erroneous behaviour request = testing.DummyRequest() request.matchdict["graph_id"] = "invalid_graph_id" error_raised = False @@ -271,19 +262,23 @@ class TestGraphAPI(object): error_raised = True assert error_raised, "HTTP Not Found error must be raised in case of unrecognized subgraph ID" + assert len(graph_db.nodes.match(uuid=graph_1_test_id)) == 4, "Graph build generator function is broken" + assert len(graph_db.nodes.match(uuid=graph_2_test_id)) == 3, "Graph build generator function is broken" + # delete the graph associated with graph_1_id request = testing.DummyRequest() - request.matchdict["graph_id"] = self.graph_1_test_id + request.matchdict["graph_id"] = graph_1_test_id response = GraphAPI(request).delete_temporal_graph() assert response == {"deleted": 4}, "Incorrect response when deleting temporal graph" + assert len(graph_db.nodes.match(uuid=graph_1_test_id)) == 0, "Delete request is broken" # delete the graph associated with graph_2_id request = testing.DummyRequest() - request.matchdict["graph_id"] = self.graph_2_test_id + request.matchdict["graph_id"] = graph_2_test_id response = GraphAPI(request).delete_temporal_graph() assert response == {"deleted": 3}, "Incorrect response when deleting temporal graph" + assert len(graph_db.nodes.match(uuid=graph_2_test_id)) == 0, "Delete request is broken" - assert len(graph_db.nodes.match("Endpoint")) == 0, "All endpoint nodes should have been deleted" assert set([node["name"] for node in graph_db.nodes.match("Cluster")]) == set(["DC" + str(i) for i in range(1, 7)]), "Cluster nodes must not be deleted" assert set([node["name"] for node in graph_db.nodes.match("Switch")]) == set(["127.0.0." + str(i) for i in range(1, 7)]), "Switch nodes must not be deleted" assert set([node["name"] for node in graph_db.nodes.match("UserEquipment")]) == set(["ue" + str(i) for i in (2, 3, 6)]), "UE nodes must not be deleted" @@ -333,20 +328,11 @@ class TestGraphAPI(object): from_timestamp, to_timestamp, graph_db = db_testing_data # create a graph to use for RTT test by using the build API endpoint - service_functions = dict(nginx={"measurement_name": "nginx", "response_time_field": "mean(avg_processing_time)", - "request_size_field": "mean(avg_request_size)", "response_size_field": "mean(avg_response_size)"}, - minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)", - "request_size_field": "mean(total_requests_size)/mean(total_requests_count)", "response_size_field": "mean(total_response_size)/mean(total_requests_count)"}) - build_json_body = dict(service_function_chain="test_sfc", service_function_chain_instance="test_sfc_premium", service_functions=service_functions) - build_json_body["from"] = from_timestamp - build_json_body["to"] = to_timestamp - body = dumps(build_json_body) - request = testing.DummyRequest() - request.body = body.encode(request.charset) - response = GraphAPI(request).build_temporal_graph() + responses = graph_generator(from_timestamp, to_timestamp) + response = next(responses) request_id = response["graph"]["uuid"] - # test some more error case handling of the RTT API endpoint + # test more error case handling of the RTT API endpoint request = testing.DummyRequest() request.matchdict["graph_id"] = request_id request.params["endpoint"] = "nginx_1_ep1" @@ -380,6 +366,7 @@ class TestGraphAPI(object): error_raised = True assert error_raised, "HTTP Not Found error must be thrown for a non existing endpoint" + # test valid requests # go through the set of input/output (expected) parameters and assert actual results match with expected ones for startpoint, endpoint, forward_latencies, reverse_latencies, response_time, request_size, response_size, rtt, global_tags in ( ("DC6", "nginx_1_ep2", [], [], 22.2, 35600, 6420, 22.2, {"flame_location": "DC6", "flame_sfe": "nginx_1_ep2", "flame_server": "DC6", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_premium", "flame_sfp": "nginx", "flame_sf": "nginx_1"}), @@ -403,19 +390,10 @@ class TestGraphAPI(object): "request_size": request_size, "response_size": response_size}, "Incorrect RTT response" # send a new request for a new service function chain to create a second subgraph to test - service_functions = dict(minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)", - "request_size_field": "mean(total_requests_size)/mean(total_requests_count)", "response_size_field": "mean(total_response_size)/mean(total_requests_count)"}, - apache={"measurement_name": "apache", "response_time_field": "mean(avg_processing_time)", - "request_size_field": "mean(avg_request_size)", "response_size_field": "mean(avg_response_size)"}) - build_json_body = dict(service_function_chain="test_sfc", service_function_chain_instance="test_sfc_non_premium", service_functions=service_functions) - build_json_body["from"] = from_timestamp - build_json_body["to"] = to_timestamp - body = dumps(build_json_body) - request = testing.DummyRequest() - request.body = body.encode(request.charset) - response = GraphAPI(request).build_temporal_graph() + response = next(responses) request_id = response["graph"]["uuid"] + # test valid requests # go through the set of input/output (expected) parameters and assert actual results match with expected ones for startpoint, endpoint, forward_latencies, reverse_latencies, response_time, request_size, response_size, rtt, global_tags in ( ("DC5", "apache_1_ep1", [], [], 17.6, 1480, 7860, 17.6, {"flame_location": "DC5", "flame_sfe": "apache_1_ep1", "flame_server": "DC5", "flame_sfc": "test_sfc", "flame_sfci": "test_sfc_non_premium", "flame_sfp": "apache", "flame_sf": "apache_1"}), @@ -502,7 +480,10 @@ class TestGraphAPI(object): @patch('clmcservice.graphapi.views.uuid4') def test_execute_graph_pipeline(self, uuid_mock, popen_mock): """ - Tests the functionality to start a pipeline script executing the graph API workflow - build, query, delete + Tests the functionality to start a pipeline script executing the graph API workflow - build, query, delete. + + :param uuid_mock: mock object for the uuid generator function + :param popen_mock: mock object for the process creation function """ # mock the behaviour of the uuid function @@ -538,7 +519,7 @@ class TestGraphAPI(object): assert MonitoringProcess.exists(uuid_mock.return_value), "Request identifier was not saved during the request processing" assert MonitoringProcess.get(uuid_mock.return_value) == pid_property_mock.return_value, "Incorrect PID was saved during the request processing" - # check erroneous behaviour + # check erroneous behaviour - process returns a value immediately returncode_property_mock.return_value = -1 request = testing.DummyRequest() request.body = body.encode(request.charset) @@ -553,6 +534,47 @@ class TestGraphAPI(object): pid_property_mock.assert_called_with() # assert that the process ID attribute was called and saved returncode_property_mock.assert_called_with() # assert that the process return code attribute was called to check if the process has started successfully + @patch('clmcservice.graphapi.views.kill') + def test_stop_graph_pipeline(self, mock_kill): + """ + Tests the funcitonality to stop a graph monitoring script. + + :param mock_kill: mock object to mimic the behavior of the os.kill functionality + """ + + # mock a monitoring process + pid = 111 + reqid = "test_request_id" + MonitoringProcess.add({"request_id": reqid, "process_id": pid}) + + # test behaviour with not-existing request UUID + request = testing.DummyRequest() + request.matchdict["request_id"] = "unknown-request-uuid" + error_raised = False + try: + GraphAPI(request).stop_graph_pipeline() + except HTTPNotFound: + error_raised = True + assert error_raised, "Error must have been raised for unrecognised request UUID." + + # test the behaviour when the PID doesn't exist or another OSError is thrown + mock_kill.side_effect = OSError("error") + request = testing.DummyRequest() + request.matchdict["request_id"] = reqid + response = GraphAPI(request).stop_graph_pipeline() + assert response == {"msg": "Monitoring process has been stopped before this request was executed."} + + # test behaviour with existing request UUID and existing PID + MonitoringProcess.add({"request_id": reqid, "process_id": pid}) + assert MonitoringProcess.exists(reqid) + mock_kill.side_effect = None + request = testing.DummyRequest() + request.matchdict["request_id"] = reqid + response = GraphAPI(request).stop_graph_pipeline() + assert response == {"msg": "Monitoring process has been successfully stopped."} + mock_kill.assert_called_with(pid, SIGKILL) # assert that os.kill was called with termination signal + assert not MonitoringProcess.exists(reqid), "Request ID must be removed when the process is killed." + @staticmethod def check_exist_relationship(relationships_tuple, graph, uuid): """ @@ -578,3 +600,40 @@ class TestGraphAPI(object): assert to_node is not None # IMPORTANT, assert the from_node exists, otherwise the py2neo RelationshipMatcher object assumes you are looking for any node (instead of raising an error) assert graph.relationships.match(nodes=(from_node, to_node), r_type=relationship_type).first() is not None, "Graph is missing a required relationship" + + +def graph_generator(from_timestamp, to_timestamp): + """ + Utility function (generator) used to send 2 valid graph build requests to the graph API - yields the response of each. + + :param from_timestamp: 'from' timestamp + :param to_timestamp: 'to' timestamp + """ + + service_functions = dict(nginx={"measurement_name": "nginx", "response_time_field": "mean(avg_processing_time)", + "request_size_field": "mean(avg_request_size)", "response_size_field": "mean(avg_response_size)"}, + minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)", + "request_size_field": "mean(total_requests_size)/mean(total_requests_count)", "response_size_field": "mean(total_response_size)/mean(total_requests_count)"}) + build_json_body = dict(service_function_chain="test_sfc", service_function_chain_instance="test_sfc_premium", service_functions=service_functions) + build_json_body["from"] = from_timestamp + build_json_body["to"] = to_timestamp + body = dumps(build_json_body) + request = testing.DummyRequest() + request.body = body.encode(request.charset) + response = GraphAPI(request).build_temporal_graph() + + yield response + + service_functions = dict(minio={"measurement_name": "minio_http", "response_time_field": "mean(total_processing_time)/mean(total_requests_count)", + "request_size_field": "mean(total_requests_size)/mean(total_requests_count)", "response_size_field": "mean(total_response_size)/mean(total_requests_count)"}, + apache={"measurement_name": "apache", "response_time_field": "mean(avg_processing_time)", + "request_size_field": "mean(avg_request_size)", "response_size_field": "mean(avg_response_size)"}) + build_json_body = dict(service_function_chain="test_sfc", service_function_chain_instance="test_sfc_non_premium", service_functions=service_functions) + build_json_body["from"] = from_timestamp + build_json_body["to"] = to_timestamp + body = dumps(build_json_body) + request = testing.DummyRequest() + request.body = body.encode(request.charset) + response = GraphAPI(request).build_temporal_graph() + + yield response diff --git a/src/service/clmcservice/graphapi/views.py b/src/service/clmcservice/graphapi/views.py index fd953275e3b86d4f68ce729f227c508869e0491b..4c442a3e5dd128b3ab7cf66b4618bf95d0fff3b3 100644 --- a/src/service/clmcservice/graphapi/views.py +++ b/src/service/clmcservice/graphapi/views.py @@ -34,10 +34,12 @@ from requests import exceptions, get from uuid import uuid4 from json import load from subprocess import Popen -import logging +from os import kill +from signal import SIGKILL +from logging import getLogger -log = logging.getLogger('service_logger') +log = getLogger('service_logger') @view_defaults(renderer='json') @@ -380,3 +382,29 @@ class GraphAPI(object): else: # a valid returned code was returned, hence the process has terminated one way or another - we do not expect this since the pipeline script must be continuously running log.warning("Graph pipeline process for SFC {0} with PID {1} has finished executing unexpectedly with return code {2}".format(sfc, process_pid, process_return_code)) raise HTTPInternalServerError("An unexpected error occurred while trying to start monitoring graph measurements for service function chain {0}".format(sfc)) + + @view_config(route_name='graph_manage_pipeline', request_method='DELETE') + def stop_graph_pipeline(self): + """ + An API endpoint to stop a running in the background graph pipeline script. + + :return: A JSON response with a simple info message for the process + """ + + request_id = self.request.matchdict['request_id'] # get the UUID of the request from the URL + process_id = MonitoringProcess.get(request_id) + + if process_id is None: + raise HTTPNotFound("A monitoring process with ID {0} couldn't be found.".format(request_id)) + + try: + kill(process_id, SIGKILL) + log.info("Successfully stopped process with request ID {0} and process ID {1}".format(request_id, process_id)) + response = {"msg": "Monitoring process has been successfully stopped."} + except OSError as e: + log.warning("Couldn't stop monitoring process with request ID {0} and process ID {1} due to error {2}".format(request_id, process_id, e)) + response = {"msg": "Monitoring process has been stopped before this request was executed."} + + MonitoringProcess.delete(request_id) + + return response diff --git a/src/service/clmcservice/models/graphapi_models.py b/src/service/clmcservice/models/graphapi_models.py index 33e1b5c5cafa406ca7d1f51afa2a6a2f3cabdb75..53217fae02800bd991ba2b857715058ec356f625 100644 --- a/src/service/clmcservice/models/graphapi_models.py +++ b/src/service/clmcservice/models/graphapi_models.py @@ -27,6 +27,7 @@ class _MonitoringProcess: """ Container-like class used to store key value pairs between graph pipelines request identifiers and process identifiers. The class is declared as "pseudo-private" and is ought to be initialised only in this module and exposed as a singleton. + This class mimics the interfaces of the other model classes. """ def __init__(self): @@ -42,6 +43,15 @@ class _MonitoringProcess: self.__pipelines[instance["request_id"]] = instance["process_id"] + def delete(self, request_id): + """ + Deletes a key-value pair from the container representing a request ID mapped to a PID. + + :param request_id: the request UUID + """ + + self.__pipelines.pop(request_id) + def get(self, request_id): """ Gets the data associated with a request identifier - currently the data is just the process identifier.