Skip to content
Snippets Groups Projects
Commit f1f96bfd authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

POST request to /aggregator now configures all the settings including database...

POST request to /aggregator now configures all the settings including database name and url, report period and running status (on/off)
parent 002bd106
No related branches found
No related tags found
No related merge requests found
......@@ -23,15 +23,18 @@
from pyramid.config import Configurator
from pyramid.settings import asbool
from clmcservice.views import AggregatorConfig
def main(global_config, **settings):
""" This function returns a Pyramid WSGI application."""
# a conversion is necessary so that the configuration value of the aggregator is stored as bool and not as string
# a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings
aggregator_running = asbool(settings.get('aggregator_running', 'false'))
settings['aggregator_running'] = aggregator_running
aggregator_report_period = int(settings.get('aggregator_report_period', 5))
settings['aggregator_report_period'] = aggregator_report_period
config = Configurator(settings=settings)
......
# #!/usr/bin/python3
# """
# ## © University of Southampton IT Innovation Centre, 2018
# ##
# ## Copyright in this software belongs to University of Southampton
# ## IT Innovation Centre of Gamma House, Enterprise Road,
# ## Chilworth Science Park, Southampton, SO16 7NS, UK.
# ##
# ## This software may not be used, sold, licensed, transferred, copied
# ## or reproduced in whole or in part in any manner or form or in or
# ## on any media by any person other than in accordance with the terms
# ## of the Licence Agreement supplied with the software, or otherwise
# ## without the prior written consent of the copyright owners.
# ##
# ## This software is distributed WITHOUT ANY WARRANTY, without even the
# ## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# ## PURPOSE, except where stated in the Licence Agreement supplied with
# ## the software.
# ##
# ## Created By : Nikolay Stanchev
# ## Created Date : 15-05-2018
# ## Created for Project : FLAME
# """
#
# from influxdb import InfluxDBClient
# from time import time, sleep
# from urllib.parse import urlparse
# import getopt
# import sys
# import clmctest.monitoring.LineProtocolGenerator as lp
#
#
# class Aggregator(object):
# """
# A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process
# """
#
# REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
# DATABASE = 'E2EMetrics' # default database the aggregator uses
# DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses
#
# def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
# """
# Constructs an Aggregator instance.
#
# :param database_name: database name to use
# :param database_url: database url to use
# :param report_period: the report period in seconds
# """
#
# # initialise a database client using the database url and the database name
# url_object = urlparse(database_url)
# self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
#
# self.db_url = database_url
# self.db_name = database_name
# self.report_period = report_period
#
# def run(self):
# """
# Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds.
# """
#
# current_time = int(time())
# while True:
#
# boundary_time = current_time - self.report_period
#
# boundary_time_nano = boundary_time * 1000000000
# current_time_nano = current_time * 1000000000
#
# # query the network delays and group them by path ID
# network_delays = {}
# result = self.db_client.query(
# 'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
# boundary_time_nano, current_time_nano))
# for item in result.items():
# metadata, result_points = item
# # measurement = metadata[0]
# tags = metadata[1]
#
# network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay']
#
# # query the service delays and group them by endpoint, service function instance and sfr
# service_delays = {}
# result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
# for item in result.items():
# metadata, result_points = item
# # measurement = metadata[0]
# tags = metadata[1]
# service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance'])
#
# # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
# for path in network_delays:
# # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr
# path_id, source, target = path
# if target not in service_delays:
# # if not continue with the other network path reports
# continue
#
# e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
# "delay_service": None, "time": boundary_time}
#
# e2e_arguments['path_ID'] = path_id
# e2e_arguments['delay_forward'] = network_delays[path]
#
# # reverse the path ID to get the network delay for the reversed path
# reversed_path = (path_id, target, source)
# assert reversed_path in network_delays # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A
# e2e_arguments['delay_reverse'] = network_delays[reversed_path]
#
# # get the response time of the media component connected to the target SFR
# service_delay = service_delays[target]
# response_time, endpoint, sf_instance = service_delay
# # put these points in the e2e arguments dictionary
# e2e_arguments['delay_service'] = response_time
# e2e_arguments['endpoint'] = endpoint
# e2e_arguments['sf_instance'] = sf_instance
#
# # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row
# if None not in e2e_arguments.items():
# self.db_client.write_points(
# lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
# e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
# e2e_arguments['time']))
#
# old_timestamp = current_time
# # wait until {REPORT_PERIOD} seconds have passed
# while current_time != old_timestamp + self.report_period:
# sleep(1)
# current_time = int(time())
#
#
# if __name__ == '__main__':
#
# # Parse command line options
# try:
# opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])
#
# arg_period = Aggregator.REPORT_PERIOD
# arg_database_name = Aggregator.DATABASE
# arg_database_url = Aggregator.DATABASE_URL
#
# # Apply parameters if given
# for opt, arg in opts:
# if opt in ('-p', '--period'):
# arg_period = int(arg)
# elif opt in ('-d', '--database'):
# arg_database_name = arg
# elif opt in ('-u', '--url'):
# arg_database_url = arg
#
# Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()
#
# # print the error messages in case of a parse error
# except getopt.GetoptError as err:
# print(err)
# print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
......@@ -25,6 +25,8 @@ import pytest
from pyramid import testing
from pyramid.httpexceptions import HTTPBadRequest
from clmcservice.utilities import CONFIG_ATTRIBUTES
class TestAggregatorConfig(object):
"""
......@@ -38,7 +40,8 @@ class TestAggregatorConfig(object):
"""
self.config = testing.setUp()
self.config.add_settings({'aggregator_running': False})
self.config.add_settings({'aggregator_running': False, 'aggregator_report_period': 5,
'aggregator_database_name': 'E2EMetrics', 'aggregator_database_url': "http://172.40.231.51:8086"})
yield
......@@ -50,44 +53,67 @@ class TestAggregatorConfig(object):
"""
from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself
print(self.config.get_settings())
assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."
assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds."
assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics."
assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086"
request = testing.DummyRequest()
response = AggregatorConfig(request).get()
assert type(response) == dict, "Response must be a dictionary representing a JSON object."
assert not response.get('aggregator_running'), "The response of the API call must return the aggregator status being set as False"
assert not self.config.get_settings().get('aggregator_running'), "A GET request must not modify the aggregator status."
@pytest.mark.parametrize("input_val, output_val", [
("True", True),
("true", True),
("1", True),
("False", False),
("false", False),
("0", False),
("t", None),
("f", None),
assert response == {'aggregator_running': False,
'aggregator_report_period': 5,
'aggregator_database_name': 'E2EMetrics',
'aggregator_database_url': "http://172.40.231.51:8086"}, "Response must be a dictionary representing a JSON object with the correct configuration data of the aggregator."
assert not self.config.get_settings().get('aggregator_running'), "A GET request must not modify the aggregator configuration data."
assert self.config.get_settings().get('aggregator_report_period') == 5, "A GET request must not modify the aggregator configuration data."
assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "A GET request must not modify the aggregator configuration data."
assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "A GET request must not modify the aggregator configuration data."
@pytest.mark.parametrize("input_body, output_value", [
('{"aggregator_running": True, "aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}',
{'aggregator_running': True, 'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}),
('{"aggregator_running": true, "aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}',
{'aggregator_running': True, 'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
('{"aggregator_running":True, "aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}',
{'aggregator_running': True, 'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
('{"aggregator_running": False, "aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
('{"aggregator_running": false, "aggregator_report_period": 20, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 20, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
('{"aggregator_running":False, "aggregator_report_period": 15, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 15, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}),
("{aggregator_running: t, aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None),
("{aggregator_running: f, aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None),
("{}", None),
("{aggregator_running: true}", None),
])
def test_POST(self, input_val, output_val):
def test_POST(self, input_body, output_value):
"""
Tests the POST method for the status of the aggregator
:param input_val: the input form parameter
:param output_val: the expected output value, None for expecting an Exception
Tests the POST method for the configuration of the aggregator
:param input_body: the input form parameter
:param output_value: the expected output value, None for expecting an Exception
"""
from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself
assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."
assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds."
assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics."
assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086"
request = testing.DummyRequest()
request.params['running'] = input_val
if output_val is not None:
request.body = input_body.encode(request.charset)
if output_value is not None:
response = AggregatorConfig(request).post()
assert response == {'aggregator_running': output_val}, "Response of POST request must include the new status of the aggregator"
assert self.config.get_settings().get('aggregator_running') == output_val, "Aggregator status must be updated to running."
assert response == output_value, "Response of POST request must include the new status of the aggregator"
for attribute in CONFIG_ATTRIBUTES:
assert self.config.get_settings().get(attribute) == output_value.get(attribute), "Aggregator settings configuration is not updated."
else:
error_raised = False
try:
......
......@@ -21,20 +21,40 @@
// Created for Project : FLAME
"""
def str_to_bool(value):
from json import loads
CONFIG_ATTRIBUTES = ('aggregator_running', 'aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url')
def validate_content(configuration):
"""
A utility function to convert a string to boolean based on simple rules.
:param value: the value to convert
:return: True or False
:raises ValueError: if value cannot be converted to boolean
A utility function to validate a configuration string representing a JSON dictionary.
:param configuration: the configuration string to validate
:return the validated configuration dictionary object with the values converted to their required type
:raise AssertionError: if the argument is not a valid configuration
"""
if type(value) is not str:
raise ValueError("This method only converts string to booolean.")
global CONFIG_ATTRIBUTES
configuration = configuration.replace('"aggregator_running": True', '"aggregator_running": true')
configuration = configuration.replace('"aggregator_running":True', '"aggregator_running": true')
configuration = configuration.replace('"aggregator_running": False', '"aggregator_running": false')
configuration = configuration.replace('"aggregator_running":False', '"aggregator_running": false')
try:
configuration = loads(configuration)
except:
raise AssertionError("Configuration must be a JSON object.")
assert len(configuration) == len(CONFIG_ATTRIBUTES), "Configuration mustn't contain more attributes than the required ones."
for attribute in CONFIG_ATTRIBUTES:
assert attribute in configuration
assert type(configuration['aggregator_running']) == bool, "Boolean value expected for the 'aggregator_running' attribute, received {0} instead.".format(configuration.get('aggregator_running'))
assert type(configuration['aggregator_report_period']) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period'))
if value in ('False', 'false', '0'):
return False
elif value in ('True', 'true', '1'):
return True
else:
raise ValueError("Invalid argument for conversion")
return configuration
......@@ -23,8 +23,7 @@
from pyramid.view import view_defaults
from pyramid.httpexceptions import HTTPBadRequest
from clmcservice.utilities import str_to_bool
from clmcservice.utilities import validate_content, CONFIG_ATTRIBUTES
@view_defaults(route_name='aggregator', renderer='json')
......@@ -47,8 +46,8 @@ class AggregatorConfig(object):
:return: A JSON response with the status of the aggregator.
"""
aggregator_running = self.request.registry.settings.get('aggregator_running')
return {'aggregator_running': aggregator_running}
aggregator_data = self.request.registry.settings
return {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES}
def post(self):
"""
......@@ -57,13 +56,16 @@ class AggregatorConfig(object):
:raises HTTPBadRequest: if form argument cannot be converted to boolean
"""
new_status = self.request.params.get('running')
new_config = self.request.body.decode(self.request.charset)
try:
new_status = str_to_bool(new_status)
except ValueError:
raise HTTPBadRequest("Bad request parameter - expected a boolean, received {0}".format(self.request.params.get('running')))
new_config = validate_content(new_config)
for attribute in CONFIG_ATTRIBUTES:
self.request.registry.settings[attribute] = new_config.get(attribute)
# TODO start/stop aggregator based on value of new status
return new_config
self.request.registry.settings['aggregator_running'] = new_status
# TODO start/stop aggregator based on value of new status
return {'aggregator_running': new_status}
except AssertionError:
raise HTTPBadRequest("Bad request content - configuration format is incorrect.")
......@@ -13,6 +13,9 @@ pyramid.debug_routematch = false
pyramid.default_locale_name = en
pyramid.includes = pyramid_debugtoolbar
aggregator_running = false
aggregator_report_period = 5
aggregator_database_name = E2EMetrics
aggregator_database_url = http://172.40.231.51:8086
# By default, the toolbar only appears for clients from IP addresses
# '127.0.0.1' and '::1'.
......
......@@ -12,6 +12,9 @@ pyramid.debug_notfound = false
pyramid.debug_routematch = false
pyramid.default_locale_name = en
aggregator_running = false
aggregator_report_period = 5
aggregator_database_name = E2EMetrics
aggregator_database_url = http://172.40.231.51:8086
###
# wsgi server configuration
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment