From f1f96bfd2303dbb4ff9d526d6def976d9686a6fe Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Thu, 17 May 2018 10:56:47 +0100 Subject: [PATCH] POST request to /aggregator now configures all the settings including database name and url, report period and running status (on/off) --- src/clmcservice/clmcservice/__init__.py | 5 +- src/clmcservice/clmcservice/aggregator.py | 158 ++++++++++++++++++++++ src/clmcservice/clmcservice/tests.py | 72 ++++++---- src/clmcservice/clmcservice/utilities.py | 46 +++++-- src/clmcservice/clmcservice/views.py | 24 ++-- src/clmcservice/development.ini | 3 + src/clmcservice/production.ini | 3 + 7 files changed, 263 insertions(+), 48 deletions(-) create mode 100644 src/clmcservice/clmcservice/aggregator.py diff --git a/src/clmcservice/clmcservice/__init__.py b/src/clmcservice/clmcservice/__init__.py index c933337..2f9844e 100644 --- a/src/clmcservice/clmcservice/__init__.py +++ b/src/clmcservice/clmcservice/__init__.py @@ -23,15 +23,18 @@ from pyramid.config import Configurator from pyramid.settings import asbool + from clmcservice.views import AggregatorConfig def main(global_config, **settings): """ This function returns a Pyramid WSGI application.""" - # a conversion is necessary so that the configuration value of the aggregator is stored as bool and not as string + # a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings aggregator_running = asbool(settings.get('aggregator_running', 'false')) settings['aggregator_running'] = aggregator_running + aggregator_report_period = int(settings.get('aggregator_report_period', 5)) + settings['aggregator_report_period'] = aggregator_report_period config = Configurator(settings=settings) diff --git a/src/clmcservice/clmcservice/aggregator.py b/src/clmcservice/clmcservice/aggregator.py new file mode 100644 index 0000000..57c9cbd --- /dev/null +++ b/src/clmcservice/clmcservice/aggregator.py @@ -0,0 +1,158 @@ +# #!/usr/bin/python3 +# """ +# ## © University of Southampton IT Innovation Centre, 2018 +# ## +# ## Copyright in this software belongs to University of Southampton +# ## IT Innovation Centre of Gamma House, Enterprise Road, +# ## Chilworth Science Park, Southampton, SO16 7NS, UK. +# ## +# ## This software may not be used, sold, licensed, transferred, copied +# ## or reproduced in whole or in part in any manner or form or in or +# ## on any media by any person other than in accordance with the terms +# ## of the Licence Agreement supplied with the software, or otherwise +# ## without the prior written consent of the copyright owners. +# ## +# ## This software is distributed WITHOUT ANY WARRANTY, without even the +# ## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +# ## PURPOSE, except where stated in the Licence Agreement supplied with +# ## the software. +# ## +# ## Created By : Nikolay Stanchev +# ## Created Date : 15-05-2018 +# ## Created for Project : FLAME +# """ +# +# from influxdb import InfluxDBClient +# from time import time, sleep +# from urllib.parse import urlparse +# import getopt +# import sys +# import clmctest.monitoring.LineProtocolGenerator as lp +# +# +# class Aggregator(object): +# """ +# A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process +# """ +# +# REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated +# DATABASE = 'E2EMetrics' # default database the aggregator uses +# DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses +# +# def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): +# """ +# Constructs an Aggregator instance. +# +# :param database_name: database name to use +# :param database_url: database url to use +# :param report_period: the report period in seconds +# """ +# +# # initialise a database client using the database url and the database name +# url_object = urlparse(database_url) +# self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) +# +# self.db_url = database_url +# self.db_name = database_name +# self.report_period = report_period +# +# def run(self): +# """ +# Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. +# """ +# +# current_time = int(time()) +# while True: +# +# boundary_time = current_time - self.report_period +# +# boundary_time_nano = boundary_time * 1000000000 +# current_time_nano = current_time * 1000000000 +# +# # query the network delays and group them by path ID +# network_delays = {} +# result = self.db_client.query( +# 'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( +# boundary_time_nano, current_time_nano)) +# for item in result.items(): +# metadata, result_points = item +# # measurement = metadata[0] +# tags = metadata[1] +# +# network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay'] +# +# # query the service delays and group them by endpoint, service function instance and sfr +# service_delays = {} +# result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) +# for item in result.items(): +# metadata, result_points = item +# # measurement = metadata[0] +# tags = metadata[1] +# service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance']) +# +# # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement +# for path in network_delays: +# # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr +# path_id, source, target = path +# if target not in service_delays: +# # if not continue with the other network path reports +# continue +# +# e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, +# "delay_service": None, "time": boundary_time} +# +# e2e_arguments['path_ID'] = path_id +# e2e_arguments['delay_forward'] = network_delays[path] +# +# # reverse the path ID to get the network delay for the reversed path +# reversed_path = (path_id, target, source) +# assert reversed_path in network_delays # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A +# e2e_arguments['delay_reverse'] = network_delays[reversed_path] +# +# # get the response time of the media component connected to the target SFR +# service_delay = service_delays[target] +# response_time, endpoint, sf_instance = service_delay +# # put these points in the e2e arguments dictionary +# e2e_arguments['delay_service'] = response_time +# e2e_arguments['endpoint'] = endpoint +# e2e_arguments['sf_instance'] = sf_instance +# +# # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row +# if None not in e2e_arguments.items(): +# self.db_client.write_points( +# lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], +# e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'], +# e2e_arguments['time'])) +# +# old_timestamp = current_time +# # wait until {REPORT_PERIOD} seconds have passed +# while current_time != old_timestamp + self.report_period: +# sleep(1) +# current_time = int(time()) +# +# +# if __name__ == '__main__': +# +# # Parse command line options +# try: +# opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url=']) +# +# arg_period = Aggregator.REPORT_PERIOD +# arg_database_name = Aggregator.DATABASE +# arg_database_url = Aggregator.DATABASE_URL +# +# # Apply parameters if given +# for opt, arg in opts: +# if opt in ('-p', '--period'): +# arg_period = int(arg) +# elif opt in ('-d', '--database'): +# arg_database_name = arg +# elif opt in ('-u', '--url'): +# arg_database_url = arg +# +# Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run() +# +# # print the error messages in case of a parse error +# except getopt.GetoptError as err: +# print(err) +# print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>') diff --git a/src/clmcservice/clmcservice/tests.py b/src/clmcservice/clmcservice/tests.py index cd52c64..b620772 100644 --- a/src/clmcservice/clmcservice/tests.py +++ b/src/clmcservice/clmcservice/tests.py @@ -25,6 +25,8 @@ import pytest from pyramid import testing from pyramid.httpexceptions import HTTPBadRequest +from clmcservice.utilities import CONFIG_ATTRIBUTES + class TestAggregatorConfig(object): """ @@ -38,7 +40,8 @@ class TestAggregatorConfig(object): """ self.config = testing.setUp() - self.config.add_settings({'aggregator_running': False}) + self.config.add_settings({'aggregator_running': False, 'aggregator_report_period': 5, + 'aggregator_database_name': 'E2EMetrics', 'aggregator_database_url': "http://172.40.231.51:8086"}) yield @@ -50,44 +53,67 @@ class TestAggregatorConfig(object): """ from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself - + print(self.config.get_settings()) assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." + assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" request = testing.DummyRequest() response = AggregatorConfig(request).get() - assert type(response) == dict, "Response must be a dictionary representing a JSON object." - assert not response.get('aggregator_running'), "The response of the API call must return the aggregator status being set as False" - assert not self.config.get_settings().get('aggregator_running'), "A GET request must not modify the aggregator status." - - @pytest.mark.parametrize("input_val, output_val", [ - ("True", True), - ("true", True), - ("1", True), - ("False", False), - ("false", False), - ("0", False), - ("t", None), - ("f", None), + assert response == {'aggregator_running': False, + 'aggregator_report_period': 5, + 'aggregator_database_name': 'E2EMetrics', + 'aggregator_database_url': "http://172.40.231.51:8086"}, "Response must be a dictionary representing a JSON object with the correct configuration data of the aggregator." + + assert not self.config.get_settings().get('aggregator_running'), "A GET request must not modify the aggregator configuration data." + assert self.config.get_settings().get('aggregator_report_period') == 5, "A GET request must not modify the aggregator configuration data." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "A GET request must not modify the aggregator configuration data." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "A GET request must not modify the aggregator configuration data." + + @pytest.mark.parametrize("input_body, output_value", [ + ('{"aggregator_running": True, "aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}', + {'aggregator_running': True, 'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}), + ('{"aggregator_running": true, "aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', + {'aggregator_running': True, 'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), + ('{"aggregator_running":True, "aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', + {'aggregator_running': True, 'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + ('{"aggregator_running": False, "aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', + {'aggregator_running': False, 'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + ('{"aggregator_running": false, "aggregator_report_period": 20, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', + {'aggregator_running': False, 'aggregator_report_period': 20, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), + ('{"aggregator_running":False, "aggregator_report_period": 15, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}', + {'aggregator_running': False, 'aggregator_report_period': 15, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}), + ("{aggregator_running: t, aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None), + ("{aggregator_running: f, aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None), + ("{}", None), + ("{aggregator_running: true}", None), ]) - def test_POST(self, input_val, output_val): + def test_POST(self, input_body, output_value): """ - Tests the POST method for the status of the aggregator - :param input_val: the input form parameter - :param output_val: the expected output value, None for expecting an Exception + Tests the POST method for the configuration of the aggregator + :param input_body: the input form parameter + :param output_value: the expected output value, None for expecting an Exception """ from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." + assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" request = testing.DummyRequest() - request.params['running'] = input_val - if output_val is not None: + request.body = input_body.encode(request.charset) + + if output_value is not None: response = AggregatorConfig(request).post() - assert response == {'aggregator_running': output_val}, "Response of POST request must include the new status of the aggregator" - assert self.config.get_settings().get('aggregator_running') == output_val, "Aggregator status must be updated to running." + assert response == output_value, "Response of POST request must include the new status of the aggregator" + + for attribute in CONFIG_ATTRIBUTES: + assert self.config.get_settings().get(attribute) == output_value.get(attribute), "Aggregator settings configuration is not updated." else: error_raised = False try: diff --git a/src/clmcservice/clmcservice/utilities.py b/src/clmcservice/clmcservice/utilities.py index e3d3273..07495b4 100644 --- a/src/clmcservice/clmcservice/utilities.py +++ b/src/clmcservice/clmcservice/utilities.py @@ -21,20 +21,40 @@ // Created for Project : FLAME """ -def str_to_bool(value): +from json import loads + + +CONFIG_ATTRIBUTES = ('aggregator_running', 'aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url') + + +def validate_content(configuration): """ - A utility function to convert a string to boolean based on simple rules. - :param value: the value to convert - :return: True or False - :raises ValueError: if value cannot be converted to boolean + A utility function to validate a configuration string representing a JSON dictionary. + + :param configuration: the configuration string to validate + :return the validated configuration dictionary object with the values converted to their required type + :raise AssertionError: if the argument is not a valid configuration """ - if type(value) is not str: - raise ValueError("This method only converts string to booolean.") + global CONFIG_ATTRIBUTES + + configuration = configuration.replace('"aggregator_running": True', '"aggregator_running": true') + configuration = configuration.replace('"aggregator_running":True', '"aggregator_running": true') + configuration = configuration.replace('"aggregator_running": False', '"aggregator_running": false') + configuration = configuration.replace('"aggregator_running":False', '"aggregator_running": false') + + try: + configuration = loads(configuration) + except: + raise AssertionError("Configuration must be a JSON object.") + + assert len(configuration) == len(CONFIG_ATTRIBUTES), "Configuration mustn't contain more attributes than the required ones." + + for attribute in CONFIG_ATTRIBUTES: + assert attribute in configuration + + assert type(configuration['aggregator_running']) == bool, "Boolean value expected for the 'aggregator_running' attribute, received {0} instead.".format(configuration.get('aggregator_running')) + + assert type(configuration['aggregator_report_period']) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period')) - if value in ('False', 'false', '0'): - return False - elif value in ('True', 'true', '1'): - return True - else: - raise ValueError("Invalid argument for conversion") + return configuration diff --git a/src/clmcservice/clmcservice/views.py b/src/clmcservice/clmcservice/views.py index b079f5e..7b2a27b 100644 --- a/src/clmcservice/clmcservice/views.py +++ b/src/clmcservice/clmcservice/views.py @@ -23,8 +23,7 @@ from pyramid.view import view_defaults from pyramid.httpexceptions import HTTPBadRequest - -from clmcservice.utilities import str_to_bool +from clmcservice.utilities import validate_content, CONFIG_ATTRIBUTES @view_defaults(route_name='aggregator', renderer='json') @@ -47,8 +46,8 @@ class AggregatorConfig(object): :return: A JSON response with the status of the aggregator. """ - aggregator_running = self.request.registry.settings.get('aggregator_running') - return {'aggregator_running': aggregator_running} + aggregator_data = self.request.registry.settings + return {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES} def post(self): """ @@ -57,13 +56,16 @@ class AggregatorConfig(object): :raises HTTPBadRequest: if form argument cannot be converted to boolean """ - new_status = self.request.params.get('running') + new_config = self.request.body.decode(self.request.charset) try: - new_status = str_to_bool(new_status) - except ValueError: - raise HTTPBadRequest("Bad request parameter - expected a boolean, received {0}".format(self.request.params.get('running'))) + new_config = validate_content(new_config) + + for attribute in CONFIG_ATTRIBUTES: + self.request.registry.settings[attribute] = new_config.get(attribute) + + # TODO start/stop aggregator based on value of new status + return new_config - self.request.registry.settings['aggregator_running'] = new_status - # TODO start/stop aggregator based on value of new status - return {'aggregator_running': new_status} + except AssertionError: + raise HTTPBadRequest("Bad request content - configuration format is incorrect.") diff --git a/src/clmcservice/development.ini b/src/clmcservice/development.ini index dc8949d..9ec4dc5 100644 --- a/src/clmcservice/development.ini +++ b/src/clmcservice/development.ini @@ -13,6 +13,9 @@ pyramid.debug_routematch = false pyramid.default_locale_name = en pyramid.includes = pyramid_debugtoolbar aggregator_running = false +aggregator_report_period = 5 +aggregator_database_name = E2EMetrics +aggregator_database_url = http://172.40.231.51:8086 # By default, the toolbar only appears for clients from IP addresses # '127.0.0.1' and '::1'. diff --git a/src/clmcservice/production.ini b/src/clmcservice/production.ini index b3c6439..7f5e323 100644 --- a/src/clmcservice/production.ini +++ b/src/clmcservice/production.ini @@ -12,6 +12,9 @@ pyramid.debug_notfound = false pyramid.debug_routematch = false pyramid.default_locale_name = en aggregator_running = false +aggregator_report_period = 5 +aggregator_database_name = E2EMetrics +aggregator_database_url = http://172.40.231.51:8086 ### # wsgi server configuration -- GitLab