diff --git a/src/clmc-webservice/clmcservice/__init__.py b/src/clmc-webservice/clmcservice/__init__.py index 2f9844ed2a313e084bf4d964c9e46da2d6283566..0dea774a58d72dcb7a7ede5404f4fb158a2de0c8 100644 --- a/src/clmc-webservice/clmcservice/__init__.py +++ b/src/clmc-webservice/clmcservice/__init__.py @@ -24,23 +24,28 @@ from pyramid.config import Configurator from pyramid.settings import asbool -from clmcservice.views import AggregatorConfig +from clmcservice.views import AggregatorConfig, aggregator_starter +from clmcservice.utilities import STATUS_ATTRIBUTE def main(global_config, **settings): """ This function returns a Pyramid WSGI application.""" # a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings - aggregator_running = asbool(settings.get('aggregator_running', 'false')) - settings['aggregator_running'] = aggregator_running + aggregator_running = asbool(settings.get('aggregator_running', False)) + settings[STATUS_ATTRIBUTE] = asbool(aggregator_running) + aggregator_report_period = int(settings.get('aggregator_report_period', 5)) settings['aggregator_report_period'] = aggregator_report_period config = Configurator(settings=settings) - config.add_route('aggregator', '/aggregator') + config.add_route('aggregator_config', '/aggregator/config') config.add_view(AggregatorConfig, attr='get', request_method='GET') config.add_view(AggregatorConfig, attr='post', request_method='POST') + config.add_route('aggregator_starter', '/aggregator/status') + config.add_view(aggregator_starter, request_method='POST') + config.scan() return config.make_wsgi_app() diff --git a/src/clmc-webservice/clmcservice/aggregator.py b/src/clmc-webservice/clmcservice/aggregator.py index 3c7974f0ed7306a1436a872041106a102d344fd0..7f239179612d79375d378c7332093b82a8f51e2d 100644 --- a/src/clmc-webservice/clmcservice/aggregator.py +++ b/src/clmc-webservice/clmcservice/aggregator.py @@ -38,6 +38,7 @@ class Aggregator(object): REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated DATABASE = 'E2EMetrics' # default database the aggregator uses DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses + RETRY_PERIOD = 5 def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): """ @@ -56,7 +57,7 @@ class Aggregator(object): self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) break except: - sleep(2) + sleep(self.RETRY_PERIOD) self.db_url = database_url self.db_name = database_name @@ -85,7 +86,7 @@ class Aggregator(object): boundary_time_nano, current_time_nano)) break except: - sleep(2) + sleep(self.RETRY_PERIOD) for item in result.items(): metadata, result_points = item @@ -102,7 +103,7 @@ class Aggregator(object): result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) break except: - sleep(2) + sleep(self.RETRY_PERIOD) for item in result.items(): metadata, result_points = item @@ -148,7 +149,7 @@ class Aggregator(object): e2e_arguments['time'])) break except: - sleep(2) + sleep(self.RETRY_PERIOD) old_timestamp = current_time # wait until {REPORT_PERIOD} seconds have passed diff --git a/src/clmc-webservice/clmcservice/tests.py b/src/clmc-webservice/clmcservice/tests.py index b620772242ab79551a5498508eaefbabcaa8be81..a0212fbb92c60b0f1ca52d1aabba1f32d0656e2a 100644 --- a/src/clmc-webservice/clmcservice/tests.py +++ b/src/clmc-webservice/clmcservice/tests.py @@ -53,7 +53,7 @@ class TestAggregatorConfig(object): """ from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself - print(self.config.get_settings()) + assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." @@ -73,20 +73,20 @@ class TestAggregatorConfig(object): assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "A GET request must not modify the aggregator configuration data." @pytest.mark.parametrize("input_body, output_value", [ - ('{"aggregator_running": True, "aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}', - {'aggregator_running': True, 'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}), - ('{"aggregator_running": true, "aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', - {'aggregator_running': True, 'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), - ('{"aggregator_running":True, "aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', - {'aggregator_running': True, 'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), - ('{"aggregator_running": False, "aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', + ('{"aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://171.40.231.51:8086"}', + {'aggregator_running': False, 'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://171.40.231.51:8086"}), + ('{"aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', + {'aggregator_running': False, 'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), + ('{"aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', + {'aggregator_running': False, 'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + ('{"aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', {'aggregator_running': False, 'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), - ('{"aggregator_running": false, "aggregator_report_period": 20, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', - {'aggregator_running': False, 'aggregator_report_period': 20, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), - ('{"aggregator_running":False, "aggregator_report_period": 15, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}', - {'aggregator_running': False, 'aggregator_report_period': 15, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}), - ("{aggregator_running: t, aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None), - ("{aggregator_running: f, aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None), + ('{"aggregator_report_period": 200, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', + {'aggregator_running': False, 'aggregator_report_period': 200, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), + ('{"aggregator_report_period": 150, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}', + {'aggregator_running': False, 'aggregator_report_period': 150, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}), + ("{aggregator_report_period: 2hb5, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None), + ("{aggregator_report_period: 250-, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.52:8086}", None), ("{}", None), ("{aggregator_running: true}", None), ]) @@ -122,3 +122,21 @@ class TestAggregatorConfig(object): error_raised = True assert error_raised, "Error must be raised in case of an invalid argument." + + +class TestAggregatorStarter(object): + """ + A pytest-implementation test for the aggregator starter API calls + """ + + def test_start(self): + #TODO + pass + + def test_stop(self): + #TODO + pass + + def test_restart(self): + #TODO + pass diff --git a/src/clmc-webservice/clmcservice/utilities.py b/src/clmc-webservice/clmcservice/utilities.py index 07495b43304f1a316e3ed58f5990856fc1cd8a6b..3da70bbcd8c6e4d08612a23e13e575178d2cabc1 100644 --- a/src/clmc-webservice/clmcservice/utilities.py +++ b/src/clmc-webservice/clmcservice/utilities.py @@ -24,10 +24,22 @@ from json import loads -CONFIG_ATTRIBUTES = ('aggregator_running', 'aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url') +CONFIG_ATTRIBUTES = ('aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url') +STATUS_ATTRIBUTE = 'aggregator_running' -def validate_content(configuration): +# # URL regular expression used in the django framework +# url_regex = re.compile( +# r'^(?:http|ftp)s?://' # http:// or https:// +# r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... +# r'localhost|' # localhost... +# r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4 +# r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 +# r'(?::\d+)?' # optional port +# r'(?:/?|[/?]\S+)$', re.IGNORECASE) + + +def validate_config_content(configuration): """ A utility function to validate a configuration string representing a JSON dictionary. @@ -38,11 +50,6 @@ def validate_content(configuration): global CONFIG_ATTRIBUTES - configuration = configuration.replace('"aggregator_running": True', '"aggregator_running": true') - configuration = configuration.replace('"aggregator_running":True', '"aggregator_running": true') - configuration = configuration.replace('"aggregator_running": False', '"aggregator_running": false') - configuration = configuration.replace('"aggregator_running":False', '"aggregator_running": false') - try: configuration = loads(configuration) except: @@ -53,8 +60,64 @@ def validate_content(configuration): for attribute in CONFIG_ATTRIBUTES: assert attribute in configuration - assert type(configuration['aggregator_running']) == bool, "Boolean value expected for the 'aggregator_running' attribute, received {0} instead.".format(configuration.get('aggregator_running')) - assert type(configuration['aggregator_report_period']) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period')) + # TODO maybe add a check for a valid URL ? + return configuration + + +def validate_action_content(content): + """ + A utility function to validate a content string representing a JSON dictionary. + + :param content: the content string to validate + :return: the validated content dictionary + :raise AssertionError: if the argument is not a valid json content + """ + + try: + content = loads(content) + except: + raise AssertionError("Content must be a JSON object.") + + assert len(content) == 1, "Content mustn't contain more attributes than the required one." + + assert content['action'] in ('start', 'stop', 'restart') + + return content + + +def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, time): + """ + Generates a combined averaged measurement about the e2e delay and its contributing parts. + + :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path + :param source_sfr: source service router + :param target_sfr: target service router + :param endpoint: endpoint of the media component + :param sf_instance: service function instance (media component) + :param delay_forward: Path delay (Forward direction) + :param delay_reverse: Path delay (Reverse direction) + :param delay_service: the media service component response time + :param time: measurement timestamp + :return: a list of dict-formatted reports to post on influx + """ + + result = [{"measurement": "e2e_delays", + "tags": { + "path_ID": path_id, + "source_SFR": source_sfr, + "target_SFR": target_sfr, + "endpoint": endpoint, + "sf_instance": sf_instance + }, + "fields": { + "delay_forward": float(delay_forward), + "delay_reverse": float(delay_reverse), + "delay_service": float(delay_service) + }, + "time": int(1000000000*time) + }] + + return result diff --git a/src/clmc-webservice/clmcservice/views.py b/src/clmc-webservice/clmcservice/views.py index 7b2a27b9239703f1de776eb1d637ce5cebab5f6b..70e48384e490c7f0efe8cd628a0042f83a8dc3db 100644 --- a/src/clmc-webservice/clmcservice/views.py +++ b/src/clmc-webservice/clmcservice/views.py @@ -21,20 +21,54 @@ // Created for Project : FLAME """ -from pyramid.view import view_defaults +from pyramid.view import view_defaults, view_config from pyramid.httpexceptions import HTTPBadRequest -from clmcservice.utilities import validate_content, CONFIG_ATTRIBUTES +from subprocess import Popen, DEVNULL +from clmcservice.utilities import validate_config_content, validate_action_content, CONFIG_ATTRIBUTES, STATUS_ATTRIBUTE +import os.path -@view_defaults(route_name='aggregator', renderer='json') +_process = None + + +def start_aggregator(config): + """ + An auxiliary method to start the aggregator. + + :param config: the configuration containing the arguments for the aggregator + """ + + global _process + + dir_path = os.path.dirname(os.path.realpath(__file__)) + command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', + config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')] + _process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL) + print("Started aggregator process with ID: {0}".format(_process.pid)) + + +def stop_aggregator(): + """ + An auxiliary method to stop the aggregator. + """ + + global _process + + # check if the process is started before trying to terminate it - _process.poll() only returns something if the process has terminated, hence we check for a None value + if _process is not None and _process.poll() is None: + _process.terminate() + + +@view_defaults(route_name='aggregator_config', renderer='json') class AggregatorConfig(object): """ - A class-based view for accessing and mutating the status of the aggregator. + A class-based view for accessing and mutating the configuration of the aggregator. """ def __init__(self, request): """ Initialises the instance of the view with the request argument. + :param request: client's call request """ @@ -43,29 +77,73 @@ class AggregatorConfig(object): def get(self): """ A GET API call for the status of the aggregator. + :return: A JSON response with the status of the aggregator. """ aggregator_data = self.request.registry.settings - return {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES} + config = {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES} + config[STATUS_ATTRIBUTE] = aggregator_data.get(STATUS_ATTRIBUTE) + + return config def post(self): """ A POST API call for the status of the aggregator. + :return: A JSON response to the POST call (success or fail). :raises HTTPBadRequest: if form argument cannot be converted to boolean """ + # old_config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} new_config = self.request.body.decode(self.request.charset) try: - new_config = validate_content(new_config) + new_config = validate_config_content(new_config) for attribute in CONFIG_ATTRIBUTES: self.request.registry.settings[attribute] = new_config.get(attribute) - # TODO start/stop aggregator based on value of new status + # if not equal_configurations(old_config, new_config): + # stop_aggregator() + # start_aggregator(new_config) + + new_config[STATUS_ATTRIBUTE] = self.request.registry.settings.get(STATUS_ATTRIBUTE) return new_config except AssertionError: raise HTTPBadRequest("Bad request content - configuration format is incorrect.") + + +@view_config(route_name='aggregator_starter', renderer='json') +def aggregator_starter(request): + """ + A function-based view to start/stop/restart the aggregator. + + :param request: client's call request + """ + + content = request.body.decode(request.charset) + + try: + content = validate_action_content(content) + + config = {attribute: request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} + + action = content['action'] + + if action == 'start': + start_aggregator(config) + request.registry.settings[STATUS_ATTRIBUTE] = True + elif action == 'stop': + stop_aggregator() + request.registry.settings[STATUS_ATTRIBUTE] = False + elif action == 'restart': + stop_aggregator() + start_aggregator(config) + request.registry.settings[STATUS_ATTRIBUTE] = True + + return {STATUS_ATTRIBUTE: request.registry.settings.get(STATUS_ATTRIBUTE)} + + except AssertionError: + raise HTTPBadRequest("Bad request content - must be in JSON format: {action: value}, where value is 'start', 'stop' or 'restart'.")