From d74fd6859346d3458ec27569b6275bf980200a80 Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Wed, 23 May 2018 12:59:21 +0100 Subject: [PATCH] Adjustments of the API start/restart/stop calls plus test updates --- src/clmc-webservice/clmcservice/__init__.py | 7 +- src/clmc-webservice/clmcservice/aggregator.py | 2 +- src/clmc-webservice/clmcservice/tests.py | 82 +++++---- src/clmc-webservice/clmcservice/utilities.py | 4 +- src/clmc-webservice/clmcservice/views.py | 162 +++++++++++------- 5 files changed, 149 insertions(+), 108 deletions(-) diff --git a/src/clmc-webservice/clmcservice/__init__.py b/src/clmc-webservice/clmcservice/__init__.py index dde9f13..82db449 100644 --- a/src/clmc-webservice/clmcservice/__init__.py +++ b/src/clmc-webservice/clmcservice/__init__.py @@ -24,7 +24,7 @@ from pyramid.config import Configurator from pyramid.settings import asbool -from clmcservice.views import AggregatorConfig, aggregator_starter +from clmcservice.views import AggregatorConfig, AggregatorController from clmcservice.utilities import STATUS_ATTRIBUTE @@ -44,8 +44,9 @@ def main(global_config, **settings): config.add_view(AggregatorConfig, attr='get', request_method='GET') config.add_view(AggregatorConfig, attr='put', request_method='PUT') - config.add_route('aggregator_starter', '/aggregator/status', request_method='PUT') - config.add_view(aggregator_starter) + config.add_route('aggregator_starter', '/aggregator/control') + config.add_view(AggregatorController, attr='get', request_method='GET') + config.add_view(AggregatorController, attr='put', request_method='PUT') config.scan() return config.make_wsgi_app() diff --git a/src/clmc-webservice/clmcservice/aggregator.py b/src/clmc-webservice/clmcservice/aggregator.py index 7f23917..8df70bc 100644 --- a/src/clmc-webservice/clmcservice/aggregator.py +++ b/src/clmc-webservice/clmcservice/aggregator.py @@ -153,7 +153,7 @@ class Aggregator(object): old_timestamp = current_time # wait until {REPORT_PERIOD} seconds have passed - while current_time != old_timestamp + self.report_period: + while current_time < old_timestamp + self.report_period: sleep(1) current_time = int(time()) diff --git a/src/clmc-webservice/clmcservice/tests.py b/src/clmc-webservice/clmcservice/tests.py index 22ccf94..1e4342e 100644 --- a/src/clmc-webservice/clmcservice/tests.py +++ b/src/clmc-webservice/clmcservice/tests.py @@ -25,7 +25,7 @@ import pytest from pyramid import testing from pyramid.httpexceptions import HTTPBadRequest from time import sleep -from clmcservice.utilities import CONFIG_ATTRIBUTES +from clmcservice.utilities import CONFIG_ATTRIBUTES, PROCESS_ATTRIBUTE, STATUS_ATTRIBUTE import os import signal @@ -56,7 +56,7 @@ class TestAggregator(object): from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." + assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running." assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" @@ -64,29 +64,28 @@ class TestAggregator(object): request = testing.DummyRequest() response = AggregatorConfig(request).get() - assert response == {'aggregator_running': False, - 'aggregator_report_period': 5, + assert response == {'aggregator_report_period': 5, 'aggregator_database_name': 'E2EMetrics', 'aggregator_database_url': "http://172.40.231.51:8086"}, "Response must be a dictionary representing a JSON object with the correct configuration data of the aggregator." - assert not self.config.get_settings().get('aggregator_running'), "A GET request must not modify the aggregator configuration data." + assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "A GET request must not modify the aggregator configuration data." assert self.config.get_settings().get('aggregator_report_period') == 5, "A GET request must not modify the aggregator configuration data." assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "A GET request must not modify the aggregator configuration data." assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "A GET request must not modify the aggregator configuration data." @pytest.mark.parametrize("input_body, output_value", [ ('{"aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://171.40.231.51:8086"}', - {'aggregator_running': False, 'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://171.40.231.51:8086"}), + {'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://171.40.231.51:8086"}), ('{"aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', - {'aggregator_running': False, 'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), + {'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), ('{"aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', - {'aggregator_running': False, 'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + {'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), ('{"aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', - {'aggregator_running': False, 'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + {'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), ('{"aggregator_report_period": 200, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', - {'aggregator_running': False, 'aggregator_report_period': 200, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), + {'aggregator_report_period': 200, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), ('{"aggregator_report_period": 150, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}', - {'aggregator_running': False, 'aggregator_report_period': 150, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}), + {'aggregator_report_period': 150, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}), ("{aggregator_report_period: 2hb5, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None), ("{aggregator_report_period: 250-, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.52:8086}", None), ("{}", None), @@ -101,7 +100,7 @@ class TestAggregator(object): from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." + assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running." assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" @@ -130,76 +129,85 @@ class TestAggregator(object): Tests the case of starting the aggregator through an API call. """ - from clmcservice.views import aggregator_starter # nested import so that importing the class view is part of the test itself + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." + assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." request = testing.DummyRequest() input_body = '{"action": "start"}' request.body = input_body.encode(request.charset) - response = aggregator_starter(request) - assert response == {'aggregator_running': True}, "The aggregator should have been started." - assert self.config.get_settings().get('aggregator_running'), "The aggregator should have been started." + response = AggregatorController(request).put() + assert response == {STATUS_ATTRIBUTE: True}, "The aggregator should have been started." + assert self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been started." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." # kill the started process after the test is over - pid = request.registry.settings["aggregator_pid"] + pid = request.registry.settings[PROCESS_ATTRIBUTE].pid os.kill(pid, signal.SIGTERM) def test_stop(self): - from clmcservice.views import aggregator_starter # nested import so that importing the class view is part of the test itself + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." + assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." # send a start request to trigger the aggregator request = testing.DummyRequest() input_body = '{"action": "start"}' request.body = input_body.encode(request.charset) - aggregator_starter(request) + AggregatorController(request).put() + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." # test stopping the aggregator process when it is running request = testing.DummyRequest() input_body = '{"action": "stop"}' request.body = input_body.encode(request.charset) - response = aggregator_starter(request) - assert response == {'aggregator_running': False}, "The aggregator should have been started." - assert not self.config.get_settings().get('aggregator_running'), "The aggregator should have been started." + response = AggregatorController(request).put() + assert response == {STATUS_ATTRIBUTE: False}, "The aggregator should have been stopped." + assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been stopped." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." - sleep(2) + sleep(2) # put a 2 seconds timeout so that the aggregator process can terminate # test stopping the aggregator process when it is not running request = testing.DummyRequest() input_body = '{"action": "stop"}' request.body = input_body.encode(request.charset) - response = aggregator_starter(request) - assert response == {'aggregator_running': False}, "The aggregator should have been started." - assert not self.config.get_settings().get('aggregator_running'), "The aggregator should have been started." + response = AggregatorController(request).put() + assert response == {STATUS_ATTRIBUTE: False}, "The aggregator should have been stopped." + assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been stopped." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." def test_restart(self): - from clmcservice.views import aggregator_starter # nested import so that importing the class view is part of the test itself + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." + assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." # test restarting the aggregator process when it is stopped request = testing.DummyRequest() input_body = '{"action": "restart"}' request.body = input_body.encode(request.charset) - response = aggregator_starter(request) - assert response == {'aggregator_running': True}, "The aggregator should have been started." - assert self.config.get_settings().get('aggregator_running'), "The aggregator should have been started." + response = AggregatorController(request).put() + assert response == {STATUS_ATTRIBUTE: True}, "The aggregator should have been restarted." + assert self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been restarted." + assert self.config.get_settings().get('aggregator_process'), "The aggregator process should have been reinitialised." # test restarting the aggregator process when it is running request = testing.DummyRequest() input_body = '{"action": "restart"}' request.body = input_body.encode(request.charset) - response = aggregator_starter(request) - assert response == {'aggregator_running': True}, "The aggregator should have been started." - assert self.config.get_settings().get('aggregator_running'), "The aggregator should have been started." + response = AggregatorController(request).put() + assert response == {STATUS_ATTRIBUTE: True}, "The aggregator should have been restarted." + assert self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been restarted." + assert self.config.get_settings().get('aggregator_process'), "The aggregator process should have been reinitialised." # kill the started process after the test is over - pid = request.registry.settings["aggregator_pid"] + pid = request.registry.settings[PROCESS_ATTRIBUTE].pid os.kill(pid, signal.SIGTERM) diff --git a/src/clmc-webservice/clmcservice/utilities.py b/src/clmc-webservice/clmcservice/utilities.py index 3da70bb..4ed8e0f 100644 --- a/src/clmc-webservice/clmcservice/utilities.py +++ b/src/clmc-webservice/clmcservice/utilities.py @@ -28,6 +28,8 @@ CONFIG_ATTRIBUTES = ('aggregator_report_period', 'aggregator_database_name', 'ag STATUS_ATTRIBUTE = 'aggregator_running' +PROCESS_ATTRIBUTE = 'aggregator_process' + # # URL regular expression used in the django framework # url_regex = re.compile( # r'^(?:http|ftp)s?://' # http:// or https:// @@ -62,7 +64,7 @@ def validate_config_content(configuration): assert type(configuration['aggregator_report_period']) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period')) - # TODO maybe add a check for a valid URL ? + assert configuration['aggregator_report_period'] > 0, "Report period must be a positive integer, received {0} instead.".format(configuration.get('aggregator_report_period')) return configuration diff --git a/src/clmc-webservice/clmcservice/views.py b/src/clmc-webservice/clmcservice/views.py index b47380c..3d235ca 100644 --- a/src/clmc-webservice/clmcservice/views.py +++ b/src/clmc-webservice/clmcservice/views.py @@ -21,46 +21,17 @@ // Created for Project : FLAME """ -from pyramid.view import view_defaults, view_config +from pyramid.view import view_defaults from pyramid.httpexceptions import HTTPBadRequest from subprocess import Popen, DEVNULL -from clmcservice.utilities import validate_config_content, validate_action_content, CONFIG_ATTRIBUTES, STATUS_ATTRIBUTE +from clmcservice.utilities import validate_config_content, validate_action_content, CONFIG_ATTRIBUTES, STATUS_ATTRIBUTE, PROCESS_ATTRIBUTE import os.path -_process = None - - -def start_aggregator(config): - """ - An auxiliary method to start the aggregator. - - :param config: the configuration containing the arguments for the aggregator - :return: the process ID of the started aggregator script - """ - - global _process - - dir_path = os.path.dirname(os.path.realpath(__file__)) - command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', - config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')] - _process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL) - print("\nStarted aggregator process with PID: {0}\n".format(_process.pid)) - - return _process.pid - - -def stop_aggregator(): - """ - An auxiliary method to stop the aggregator. - """ - - global _process - - # check if the process is started before trying to terminate it - _process.poll() only returns something if the process has terminated, hence we check for a None value - if _process is not None and _process.poll() is None: - _process.terminate() - print("\nStopped aggregator process with PID: {0}\n".format(_process.pid)) +# TODO +# 1) Is authorization needed at this stage ? +# 2) Validating the URL address and the database name ? +# 3) Restart the aggregator when configuration is updated @view_defaults(route_name='aggregator_config', renderer='json') @@ -80,14 +51,13 @@ class AggregatorConfig(object): def get(self): """ - A GET API call for the status of the aggregator. + A GET API call for the configuration of the aggregator. - :return: A JSON response with the status of the aggregator. + :return: A JSON response with the configuration of the aggregator. """ aggregator_data = self.request.registry.settings config = {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES} - config[STATUS_ATTRIBUTE] = aggregator_data.get(STATUS_ATTRIBUTE) return config @@ -95,8 +65,8 @@ class AggregatorConfig(object): """ A PUT API call for the status of the aggregator. - :return: A JSON response to the PUT call (success or fail). - :raises HTTPBadRequest: if form argument cannot be converted to boolean + :return: A JSON response to the PUT call - essentially with the new configured data and comment of the state of the aggregator + :raises HTTPBadRequest: if request body is not a valid JSON for the configurator """ # old_config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} @@ -112,44 +82,104 @@ class AggregatorConfig(object): # stop_aggregator() # start_aggregator(new_config) - new_config[STATUS_ATTRIBUTE] = self.request.registry.settings.get(STATUS_ATTRIBUTE) return new_config except AssertionError: raise HTTPBadRequest("Bad request content - configuration format is incorrect.") -@view_config(route_name='aggregator_starter', renderer='json') -def aggregator_starter(request): - """ - A function-based view to start/stop/restart the aggregator. +@view_defaults(route_name='aggregator_starter', renderer='json') +class AggregatorController(object): - :param request: client's call request + """ + A class-based view for controlling the aggregator. """ - content = request.body.decode(request.charset) + def __init__(self, request): + """ + Initialises the instance of the view with the request argument. - try: - content = validate_action_content(content) + :param request: client's call request + """ - config = {attribute: request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} + self.request = request - action = content['action'] + def get(self): + """ + A GET API call for the status of the aggregator - running or not. - if action == 'start': - pid = start_aggregator(config) - request.registry.settings[STATUS_ATTRIBUTE] = True - request.registry.settings["aggregator_pid"] = pid - elif action == 'stop': - stop_aggregator() - request.registry.settings[STATUS_ATTRIBUTE] = False - elif action == 'restart': - stop_aggregator() - pid = start_aggregator(config) - request.registry.settings[STATUS_ATTRIBUTE] = True - request.registry.settings["aggregator_pid"] = pid + :return: A JSON response with the status of the aggregator. + """ + + aggregator_data = self.request.registry.settings + config = {STATUS_ATTRIBUTE: aggregator_data.get(STATUS_ATTRIBUTE)} - return {STATUS_ATTRIBUTE: request.registry.settings.get(STATUS_ATTRIBUTE)} + return config + + def put(self): + """ + A PUT API call for the status of the aggregator. + + :return: A JSON response to the PUT call - essentially saying whether the aggregator is running or not + :raises HTTPBadRequest: if request body is not a valid JSON for the starter + """ + + content = self.request.body.decode(self.request.charset) + + try: + content = validate_action_content(content) + + config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} + + action = content['action'] + + if action == 'start': + aggregator_started = self.request.registry.settings[STATUS_ATTRIBUTE] + if not aggregator_started: + process = self.start_aggregator(config) + self.request.registry.settings[STATUS_ATTRIBUTE] = True + self.request.registry.settings[PROCESS_ATTRIBUTE] = process + elif action == 'stop': + self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) + self.request.registry.settings[STATUS_ATTRIBUTE] = False + self.request.registry.settings[PROCESS_ATTRIBUTE] = None + elif action == 'restart': + self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) + process = self.start_aggregator(config) + self.request.registry.settings[STATUS_ATTRIBUTE] = True + self.request.registry.settings[PROCESS_ATTRIBUTE] = process + + return {STATUS_ATTRIBUTE: self.request.registry.settings.get(STATUS_ATTRIBUTE)} + + except AssertionError: + raise HTTPBadRequest("Bad request content - must be in JSON format: {action: value}, where value is 'start', 'stop' or 'restart'.") + + @staticmethod + def start_aggregator(config): + """ + An auxiliary method to start the aggregator. + + :param config: the configuration containing the arguments for the aggregator + :return: the process object of the started aggregator script + """ + + dir_path = os.path.dirname(os.path.realpath(__file__)) + command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', + config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')] + process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL) + print("\nStarted aggregator process with PID: {0}\n".format(process.pid)) + + return process + + @staticmethod + def stop_aggregator(process): + """ + An auxiliary method to stop the aggregator. + + :param process: the process to terminate + """ - except AssertionError: - raise HTTPBadRequest("Bad request content - must be in JSON format: {action: value}, where value is 'start', 'stop' or 'restart'.") + # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value + if process is not None and process.poll() is None: + process.terminate() + print("\nStopped aggregator process with PID: {0}\n".format(process.pid)) -- GitLab