diff --git a/clmctest/monitoring/E2ETestAggregatorThread.py b/clmctest/monitoring/E2ETestAggregatorThread.py new file mode 100644 index 0000000000000000000000000000000000000000..991c33db016871c77e64b6a28ca446a0ad541b3b --- /dev/null +++ b/clmctest/monitoring/E2ETestAggregatorThread.py @@ -0,0 +1,68 @@ +#!/usr/bin/python3 +""" +## © University of Southampton IT Innovation Centre, 2018 +## +## Copyright in this software belongs to University of Southampton +## IT Innovation Centre of Gamma House, Enterprise Road, +## Chilworth Science Park, Southampton, SO16 7NS, UK. +## +## This software may not be used, sold, licensed, transferred, copied +## or reproduced in whole or in part in any manner or form or in or +## on any media by any person other than in accordance with the terms +## of the Licence Agreement supplied with the software, or otherwise +## without the prior written consent of the copyright owners. +## +## This software is distributed WITHOUT ANY WARRANTY, without even the +## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +## PURPOSE, except where stated in the Licence Agreement supplied with +## the software. +## +## Created By : Nikolay Stanchev +## Created Date : 25-04-2018 +## Created for Project : FLAME +""" + + +from threading import Thread +from clmcservice.aggregator import Aggregator + + +class TestAggregator(Thread): + + REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated + DATABASE = 'E2EMetrics' # default database the aggregator uses + DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses + + def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): + """ + Constructs an Aggregator instance. + + :param database: database name to use + :param database_url: database url to use + """ + + super(TestAggregator, self).__init__() # call the constructor of the thread + + self.aggregator = Aggregator(database_name=database, database_url=database_url, report_period=report_period) + + def stop(self): + """ + A method used to stop the thread. + """ + + self.aggregator.stop() + + def set_event_lock(self, event): + """ + Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing). + + :param event: the event lock object + """ + + setattr(self, 'event', event) + + def run(self): + if hasattr(self, 'event'): + self.event.set() + + self.aggregator.run() diff --git a/src/clmcwebservice/.coveragerc b/src/clmcwebservice/.coveragerc new file mode 100644 index 0000000000000000000000000000000000000000..a3edc11a103c310fff7b5b2dc3ef5937f1dcfa0d --- /dev/null +++ b/src/clmcwebservice/.coveragerc @@ -0,0 +1,3 @@ +[run] +source = clmcservice +omit = clmcservice/tests.py diff --git a/src/clmcwebservice/MANIFEST.in b/src/clmcwebservice/MANIFEST.in new file mode 100644 index 0000000000000000000000000000000000000000..eaf16db0e28d54b95ae8f085c1d4c3cfa1b8278d --- /dev/null +++ b/src/clmcwebservice/MANIFEST.in @@ -0,0 +1,2 @@ +include MANIFEST.in +recursive-include clmcservice \ No newline at end of file diff --git a/src/clmcwebservice/clmcservice/__init__.py b/src/clmcwebservice/clmcservice/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..bd560622e940ec78c2e6b7f5daaa340c325170e1 --- /dev/null +++ b/src/clmcwebservice/clmcservice/__init__.py @@ -0,0 +1,58 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + +from pyramid.config import Configurator +from pyramid.settings import asbool + +from clmcservice.utilities import RUNNING_FLAG, MALFORMED_FLAG + + +def main(global_config, **settings): + """ + This function returns a Pyramid WSGI application. + """ + + # a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings + aggregator_running = asbool(settings.get(RUNNING_FLAG, False)) + settings[RUNNING_FLAG] = asbool(aggregator_running) + + aggregator_report_period = int(settings.get('aggregator_report_period', 5)) + settings['aggregator_report_period'] = aggregator_report_period + + settings[MALFORMED_FLAG] = False + + config = Configurator(settings=settings) + + config.add_route('aggregator_config', '/aggregator/config') + config.add_view('clmcservice.views.AggregatorConfig', attr='get', request_method='GET') + config.add_view('clmcservice.views.AggregatorConfig', attr='put', request_method='PUT') + + config.add_route('aggregator_controller', '/aggregator/control') + config.add_view('clmcservice.views.AggregatorController', attr='get', request_method='GET') + config.add_view('clmcservice.views.AggregatorController', attr='put', request_method='PUT') + + config.add_route('round_trip_time_query', '/query/round-trip-time') + config.add_view('clmcservice.views.RoundTripTimeQuery', attr='get', request_method='GET') + + config.scan() + return config.make_wsgi_app() diff --git a/clmctest/monitoring/E2EAggregator.py b/src/clmcwebservice/clmcservice/aggregator.py similarity index 59% rename from clmctest/monitoring/E2EAggregator.py rename to src/clmcwebservice/clmcservice/aggregator.py index 0d681b71b994778a57ab73edd991003c969d8329..2c0c7d975ee4c4dc3f1afdb1df5ff16986ef80c0 100644 --- a/clmctest/monitoring/E2EAggregator.py +++ b/src/clmcwebservice/clmcservice/aggregator.py @@ -22,86 +22,86 @@ ## Created for Project : FLAME """ +from threading import Event from influxdb import InfluxDBClient from time import time, sleep from urllib.parse import urlparse -from threading import Thread, Event -import clmctest.monitoring.LineProtocolGenerator as lp +from clmcservice.utilities import generate_e2e_delay_report +import getopt +import sys -class Aggregator(Thread): +class Aggregator(object): """ - A class used to perform the aggregation feature of the CLMC - aggregation network and media service measurements. Currently, implemented as a thread, - so that the implementation can be tested using pytest. + A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process. """ - REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated + REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated DATABASE = 'E2EMetrics' # default database the aggregator uses DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses - - def __init__(self, database=DATABASE, database_url=DATABASE_URL): + RETRY_PERIOD = 5 # number of seconds to wait before retrying connection/posting data to Influx + + def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): """ Constructs an Aggregator instance. - :param database: database name to use + :param database_name: database name to use :param database_url: database url to use + :param report_period: the report period in seconds """ - super(Aggregator, self).__init__() # call the constructor of the thread - # initialise a database client using the database url and the database name url_object = urlparse(database_url) - self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10) + while True: + try: + self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) + break + except: + sleep(self.RETRY_PERIOD) self.db_url = database_url - self.db_name = database - - # a stop flag event object used to handle the killing of the thread - self._stop_flag = Event() + self.db_name = database_name + self.report_period = report_period # a cache-like dictionaries to store the last reported values, which can be used to fill in missing values self.network_cache = {} self.service_cache = {} + # a stop flag event object used to handle the stopping of the process + self._stop_flag = Event() + def stop(self): """ - A method used to stop the thread. + Stop the aggregator from running. """ self._stop_flag.set() - def set_event_lock(self, event): - """ - Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing). - - :param event: the event lock object - """ - - setattr(self, 'event', event) - def run(self): """ Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. """ - if hasattr(self, 'event'): - self.event.set() - current_time = int(time()) - while True: - if self._stop_flag.is_set(): - break + while not self._stop_flag.is_set(): - boundary_time = current_time - Aggregator.REPORT_PERIOD + boundary_time = current_time - self.report_period boundary_time_nano = boundary_time * 1000000000 current_time_nano = current_time * 1000000000 # query the network delays and group them by path ID network_delays = {} - result = self.db_client.query( - 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( - boundary_time_nano, current_time_nano)) + + while True: + try: + result = self.db_client.query( + 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( + boundary_time_nano, current_time_nano)) + break + except: + sleep(self.RETRY_PERIOD) + for item in result.items(): metadata, result_points = item # measurement = metadata[0] @@ -113,7 +113,16 @@ class Aggregator(Thread): # query the service delays and group them by endpoint, service function instance and sfr service_delays = {} - result = self.db_client.query('SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) + + while True: + try: + result = self.db_client.query( + 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format( + boundary_time_nano, current_time_nano)) + break + except: + sleep(self.RETRY_PERIOD) + for item in result.items(): metadata, result_points = item # measurement = metadata[0] @@ -159,17 +168,48 @@ class Aggregator(Thread): # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row if None not in e2e_arguments.values(): - self.db_client.write_points( - lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], - e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'], - e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time'])) + + while True: + try: + self.db_client.write_points( + generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], + e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], + e2e_arguments['delay_service'], + e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], + e2e_arguments['time'])) + break + except: + sleep(self.RETRY_PERIOD) old_timestamp = current_time - # wait until {REPORT_PERIOD} seconds have passed - while current_time < old_timestamp + self.REPORT_PERIOD: + # wait until {report_period) seconds have passed + while current_time < old_timestamp + self.report_period: sleep(1) current_time = int(time()) if __name__ == '__main__': - Aggregator().start() + + # Parse command line options + try: + opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url=']) + + arg_period = Aggregator.REPORT_PERIOD + arg_database_name = Aggregator.DATABASE + arg_database_url = Aggregator.DATABASE_URL + + # Apply parameters if given + for opt, arg in opts: + if opt in ('-p', '--period'): + arg_period = int(arg) + elif opt in ('-d', '--database'): + arg_database_name = arg + elif opt in ('-u', '--url'): + arg_database_url = arg + + Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run() + + # print the error messages in case of a parse error + except getopt.GetoptError as err: + print(err) + print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>') diff --git a/src/clmcwebservice/clmcservice/tests.py b/src/clmcwebservice/clmcservice/tests.py new file mode 100644 index 0000000000000000000000000000000000000000..eee634b7d0e1f65a443e8b4f7d74222e9be3905e --- /dev/null +++ b/src/clmcwebservice/clmcservice/tests.py @@ -0,0 +1,419 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + +import pytest +from pyramid import testing +from pyramid.httpexceptions import HTTPBadRequest +from time import sleep +from clmcservice.utilities import CONFIG_ATTRIBUTES, PROCESS_ATTRIBUTE, RUNNING_FLAG, MALFORMED_FLAG, URL_REGEX +import os +import signal + + +class TestAggregatorAPI(object): + """ + A pytest-implementation test for the aggregator API calls + """ + + @pytest.fixture(autouse=True) + def app_config(self): + """ + A fixture to implement setUp/tearDown functionality for all tests by initializing configuration structure for the web service + """ + + self.config = testing.setUp() + self.config.add_settings({'aggregator_running': False, 'malformed': False, 'aggregator_report_period': 5, + 'aggregator_database_name': 'E2EMetrics', 'aggregator_database_url': "http://172.40.231.51:8086"}) + + yield + + testing.tearDown() + + def test_GET_config(self): + """ + Tests the GET method for the configuration of the aggregator. + """ + + from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself + + assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" + + request = testing.DummyRequest() + response = AggregatorConfig(request).get() + + assert response == {'aggregator_report_period': 5, + 'aggregator_database_name': 'E2EMetrics', + 'aggregator_database_url': "http://172.40.231.51:8086"}, "Response must be a dictionary representing a JSON object with the correct configuration data of the aggregator." + + assert self.config.get_settings().get('aggregator_report_period') == 5, "A GET request must not modify the aggregator configuration data." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "A GET request must not modify the aggregator configuration data." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "A GET request must not modify the aggregator configuration data." + + @pytest.mark.parametrize("input_body, output_value", [ + ('{"aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://171.40.231.51:8086"}', + {'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://171.40.231.51:8086"}), + ('{"aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', + {'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), + ('{"aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', + {'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + ('{"aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', + {'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + ('{"aggregator_report_period": 200, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "https://172.50.231.51:8086"}', + {'aggregator_report_period': 200, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "https://172.50.231.51:8086"}), + ('{"aggregator_report_period": 150, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "https://localhost:8086"}', + {'aggregator_report_period': 150, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "https://localhost:8086"}), + ("{aggregator_report_period: 2hb5, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None), + ("{aggregator_report_period: 250-, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.52:8086}", None), + ("{aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: ftp://172.60.231.51:8086}", None), + ("{aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086/query param}", None), + ("{aggregator_report_period: 250, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.52:808686}", None), + ("{}", None), + ("{aggregator_running: true}", None), + ]) + def test_PUT_config(self, input_body, output_value): + """ + Tests the PUT method for the configuration of the aggregator + :param input_body: the input body parameter + :param output_value: the expected output value, None for expecting an Exception + """ + + from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" + + request = testing.DummyRequest() + request.body = input_body.encode(request.charset) + + if output_value is not None: + response = AggregatorConfig(request).put() + assert response == output_value, "Response of PUT request must include the new configuration of the aggregator" + + for attribute in CONFIG_ATTRIBUTES: + assert self.config.get_settings().get(attribute) == output_value.get(attribute), "Aggregator settings configuration is not updated." + + assert not self.config.get_settings().get(RUNNING_FLAG), "Aggregator running status should not be updated after a configuration update." + else: + error_raised = False + try: + AggregatorConfig(request).put() + except HTTPBadRequest: + error_raised = True + + assert error_raised, "Error must be raised in case of an invalid argument." + + def test_start(self): + """ + Tests starting the aggregator through an API call. + """ + + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + + request = testing.DummyRequest() + input_body = '{"action": "start"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been started." + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been started." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." + + # kill the started process after the test is over + pid = request.registry.settings[PROCESS_ATTRIBUTE].pid + os.kill(pid, signal.SIGTERM) + + def test_stop(self): + """ + Tests stopping the aggregator through an API call. + """ + + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + + # send a start request to trigger the aggregator + request = testing.DummyRequest() + input_body = '{"action": "start"}' + request.body = input_body.encode(request.charset) + AggregatorController(request).put() + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." + + # test stopping the aggregator process when it is running + request = testing.DummyRequest() + input_body = '{"action": "stop"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." + assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." + + sleep(2) # put a 2 seconds timeout so that the aggregator process can terminate + + # test stopping the aggregator process when it is not running + request = testing.DummyRequest() + input_body = '{"action": "stop"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." + assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." + + def test_restart(self): + """ + Tests restarting the aggregator through an API call. + """ + + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + + # test restarting the aggregator process when it is stopped + request = testing.DummyRequest() + input_body = '{"action": "restart"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE), "The aggregator process should have been reinitialised." + + # test restarting the aggregator process when it is running + request = testing.DummyRequest() + input_body = '{"action": "restart"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE), "The aggregator process should have been reinitialised." + + # kill the started process after the test is over + pid = request.registry.settings[PROCESS_ATTRIBUTE].pid + os.kill(pid, signal.SIGTERM) + + @pytest.mark.parametrize("input_body", [ + '{"action": "malformed"}', + '{"action": true}', + '{"action": false}', + '{"action": 1}', + '{invalid-json}', + '{"action": "start", "unneeded_argument": false}', + '{}' + ]) + def test_malformed_actions(self, input_body): + """ + Tests sending a malformed type of action to the aggregator through an API call. + """ + + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + + # test restarting the aggregator process when it is running + request = testing.DummyRequest() + input_body = input_body + request.body = input_body.encode(request.charset) + + error_raised = False + try: + AggregatorController(request).put() + except HTTPBadRequest: + error_raised = True + + assert error_raised + + def test_GET_status(self): + """ + Tests the GET method for the status of the aggregator. + """ + + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + + request = testing.DummyRequest() + response = AggregatorController(request).get() + + assert response == {'aggregator_running': False}, "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." + + assert not self.config.get_settings().get(RUNNING_FLAG), "A GET request must not modify the aggregator status flag." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "A GET request must not start the aggregator process." + + # test status with malformed configuration + self.config.get_settings()[MALFORMED_FLAG] = True + self.config.get_settings()[RUNNING_FLAG] = True + request = testing.DummyRequest() + response = AggregatorController(request).get() + + assert response == {'aggregator_running': True, + 'malformed': True, + 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'}, \ + "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." + + assert self.config.get_settings().get(RUNNING_FLAG), "A GET request must not modify the aggregator status flag." + assert self.config.get_settings().get(MALFORMED_FLAG), "A GET request must not modify the aggregator malformed flag." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "A GET request must not start the aggregator process." + + def test_malformed_flag_behaviour(self): + """ + Tests the behaviour of the malformed configuration flag of the aggregator when doing a sequence of API calls. + """ + + from clmcservice.views import AggregatorController, AggregatorConfig # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not self.config.get_settings().get(MALFORMED_FLAG), "Initially aggregator is not in a malformed state" + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" + + # start the aggregator with the default configuration + request = testing.DummyRequest() + input_body = '{"action": "start"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been started." + + # update the configuration of the aggregator while it is running + config_body = '{"aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}' + output_body = {'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086", 'malformed': True, + 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'} + request = testing.DummyRequest() + request.body = config_body.encode(request.charset) + response = AggregatorConfig(request).put() + assert response == output_body, "Response of PUT request must include the new configuration of the aggregator" + + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator shouldn't be stopped when the configuration is updated." + assert self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should be set when the configuration is updated while the process is running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator shouldn't be stopped when the configuration is updated." + + # check that the malformed flag has been updated through a GET call + request = testing.DummyRequest() + response = AggregatorController(request).get() + assert response == {'aggregator_running': True, + 'malformed': True, + 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'}, \ + "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." + + # restart the aggregator with the new configuration + request = testing.DummyRequest() + input_body = '{"action": "restart"}' + request.body = input_body.encode(request.charset) + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert not self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should have been reset to False." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator should have been restarted." + + # update the configuration again while the aggregator is running + config_body = '{"aggregator_report_period": 30, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}' + output_body = {'aggregator_report_period': 30, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086", 'malformed': True, + 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'} + request = testing.DummyRequest() + request.body = config_body.encode(request.charset) + response = AggregatorConfig(request).put() + assert response == output_body, "Response of PUT request must include the new configuration of the aggregator" + + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator shouldn't be stopped when the configuration is updated." + assert self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should be set when the configuration is updated while the process is running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator shouldn't be stopped when the configuration is updated." + + # stop the aggregator - this should also reset the malformed status flag + # restart the aggregator with the new configuration + request = testing.DummyRequest() + input_body = '{"action": "stop"}' + request.body = input_body.encode(request.charset) + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." + assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert not self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should have been reset to False." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "The aggregator should have been stopped." + + +class TestRegexURL(object): + """ + A pytest-implementation test for the regular expression the service uses to validate the database URL + """ + + @pytest.mark.parametrize("valid_url", [ + "http://localhost:8080/", + "https://localhost:80/url/path", + "https://192.168.20.20/?query=param", + "http://custom.domain.com", + "http://domain.net:8888/", + "https://10.160.150.4:21", + "http://localhost:12345", + "http://domain.com:21/path", + "http://domain.com:32?path", + "http://domain.com:43#path" + ]) + def test_valid_urls(self, valid_url): + """ + Tests that the regular expression can detect valid URLs. + + :param valid_url: a string representing a valid URL + """ + + matched_object = URL_REGEX.match(valid_url) + + assert matched_object is not None, "The regular expression fails in validating a correct URL." + + assert matched_object.group() is not None, "The matched object should return the full-match string" + + @pytest.mark.parametrize("invalid_url", [ + "ftp://localhost:80/url/path", + "tcp://192.168.20.20/?query=param", + "http:/localhost:80/", + "https//localhost:8080/", + "https://domain:1234/url/path", + "http://domain.com:808080/", + "http://localhost:8-080/", + "http://localhost:port80/", + "http://domain.com:8080url/path", + "http://domain.com:8080/?url path", + ]) + def test_invalid_urls(self, invalid_url): + """ + Tests that the regular expression can detect invalid URLs. + + :param invalid_url: a string representing an invalid URL + """ + + matched_object = URL_REGEX.match(invalid_url) + + assert matched_object is None, "The regular expression fails in detecting an invalid URL." diff --git a/src/clmcwebservice/clmcservice/utilities.py b/src/clmcwebservice/clmcservice/utilities.py new file mode 100644 index 0000000000000000000000000000000000000000..44ccffed7ce4120b22c31b55a7b81cde33344f9b --- /dev/null +++ b/src/clmcwebservice/clmcservice/utilities.py @@ -0,0 +1,162 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + +from json import loads +from re import compile, IGNORECASE + +CONFIG_ATTRIBUTES = ('aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url') # all of the configuration attributes - to be used as dictionary keys + +RUNNING_FLAG = 'aggregator_running' # Attribute for storing the flag, which shows whether the aggregator is running or not - to be used as a dictionary key + +PROCESS_ATTRIBUTE = 'aggregator_process' # Attribute for storing the process object of the aggregator - to be used as a dictionary key + +# a 'malformed' running state of the aggregator is when the configuration is updated, but the aggregator is not restarted so it is running with an old version of the conf. +MALFORMED_FLAG = 'malformed' # Attribute for storing the flag, which shows whether the aggregator is running in an malformed state or not - to be used as a dictionary key + +# used to indicate a malformed configuration message +COMMENT_ATTRIBUTE = 'comment' +COMMENT_VALUE = 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.' + +# the attributes of the JSON response body that are expected when querying round trip time +ROUND_TRIP_ATTRIBUTES = ('media_service', 'start_timestamp', 'end_timestamp') + + +URL_REGEX = compile( + r'^https?://' # http:// or https:// + r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain, e.g. example.domain.com + r'localhost|' # or localhost... + r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # or IP address (IPv4 format) + r'(?::\d{2,5})?' # optional port number + r'(?:[/?#][^\s]*)?$', # URL path or query parameters + IGNORECASE) + + +def validate_config_content(configuration): + """ + A utility function to validate a configuration string representing a JSON dictionary. + + :param configuration: the configuration string to validate + :return the validated configuration dictionary object with the values converted to their required type + :raise AssertionError: if the argument is not a valid configuration + """ + + global CONFIG_ATTRIBUTES + + try: + configuration = loads(configuration) + except: + raise AssertionError("Configuration must be a JSON object.") + + assert len(configuration) == len(CONFIG_ATTRIBUTES), "Configuration mustn't contain more attributes than the required ones." + + for attribute in CONFIG_ATTRIBUTES: + assert attribute in configuration, "Required attribute not found in the request content." + + assert type(configuration.get('aggregator_report_period')) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period')) + + assert configuration.get('aggregator_report_period') > 0, "Report period must be a positive integer, received {0} instead.".format(configuration.get('aggregator_report_period')) + + assert URL_REGEX.match(configuration.get('aggregator_database_url')) is not None, "The aggregator must have a valid database URL in its configuration, received {0} instead.".format(configuration.get('aggregator_database_url')) + + return configuration + + +def validate_action_content(content): + """ + A utility function to validate a content string representing a JSON dictionary. + + :param content: the content string to validate + :return: the validated content dictionary + :raise AssertionError: if the argument is not a valid json content + """ + + try: + content = loads(content) + except: + raise AssertionError("Content must be a JSON object.") + + assert len(content) == 1, "Content mustn't contain more attributes than the required one." + + assert content['action'] in ('start', 'stop', 'restart') + + return content + + +def validate_round_trip_query_params(params): + """ + A utility function to validate a dictionary of parameters. + + :param params: the params dict to validate + :return: the validated parameters dictionary + :raise AssertionError: if the argument is not a valid json content + """ + + global ROUND_TRIP_ATTRIBUTES + + assert len(params) == len(ROUND_TRIP_ATTRIBUTES), "Content mustn't contain more attributes than the required ones." + + for attribute in ROUND_TRIP_ATTRIBUTES: + assert attribute in params, "Required attribute not found in the request content." + + return params + + +def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time): + """ + Generates a combined averaged measurement about the e2e delay and its contributing parts + + :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path + :param source_sfr: source service router + :param target_sfr: target service router + :param endpoint: endpoint of the media component + :param sf_instance: service function instance (media component) + :param delay_forward: Path delay (Forward direction) + :param delay_reverse: Path delay (Reverse direction) + :param delay_service: the media service component response time + :param avg_request_size: averaged request size + :param avg_response_size: averaged response size + :param avg_bandwidth: averaged bandwidth + :param time: measurement timestamp + :return: a list of dict-formatted reports to post on influx + """ + + result = [{"measurement": "e2e_delays", + "tags": { + "path_ID": path_id, + "source_SFR": source_sfr, + "target_SFR": target_sfr, + "endpoint": endpoint, + "sf_instance": sf_instance + }, + "fields": { + "delay_forward": float(delay_forward), + "delay_reverse": float(delay_reverse), + "delay_service": float(delay_service), + "avg_request_size": float(avg_request_size), + "avg_response_size": float(avg_response_size), + "avg_bandwidth": float(avg_bandwidth) + }, + "time": int(1000000000*time) + }] + + return result diff --git a/src/clmcwebservice/clmcservice/views.py b/src/clmcwebservice/clmcservice/views.py new file mode 100644 index 0000000000000000000000000000000000000000..049be9daacb187ea3ecfb837f9c86a75a3660804 --- /dev/null +++ b/src/clmcwebservice/clmcservice/views.py @@ -0,0 +1,279 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + +from pyramid.view import view_defaults +from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError +from influxdb import InfluxDBClient +from urllib.parse import urlparse +from subprocess import Popen, DEVNULL +from clmcservice.utilities import validate_config_content, validate_action_content, validate_round_trip_query_params, \ + CONFIG_ATTRIBUTES, ROUND_TRIP_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE +import os.path + + +@view_defaults(route_name='aggregator_config', renderer='json') +class AggregatorConfig(object): + """ + A class-based view for accessing and mutating the configuration of the aggregator. + """ + + def __init__(self, request): + """ + Initialises the instance of the view with the request argument. + + :param request: client's call request + """ + + self.request = request + + def get(self): + """ + A GET API call for the configuration of the aggregator. + + :return: A JSON response with the configuration of the aggregator. + """ + + aggregator_data = self.request.registry.settings + config = {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES} + + return config + + def put(self): + """ + A PUT API call for the status of the aggregator. + + :return: A JSON response to the PUT call - essentially with the new configured data and comment of the state of the aggregator + :raises HTTPBadRequest: if request body is not a valid JSON for the configurator + """ + + old_config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} + new_config = self.request.body.decode(self.request.charset) + + try: + new_config = validate_config_content(new_config) + + for attribute in CONFIG_ATTRIBUTES: + self.request.registry.settings[attribute] = new_config.get(attribute) + + # if configuration is not already malformed, check whether the configuration is updated + if not self.request.registry.settings[MALFORMED_FLAG]: + malformed = old_config != new_config and self.request.registry.settings[RUNNING_FLAG] + self.request.registry.settings[MALFORMED_FLAG] = malformed + if malformed: + new_config[MALFORMED_FLAG] = True + new_config[COMMENT_ATTRIBUTE] = COMMENT_VALUE + + return new_config + + except AssertionError: + raise HTTPBadRequest("Bad request content - configuration format is incorrect.") + + +@view_defaults(route_name='aggregator_controller', renderer='json') +class AggregatorController(object): + + """ + A class-based view for controlling the aggregator. + """ + + def __init__(self, request): + """ + Initialises the instance of the view with the request argument. + + :param request: client's call request + """ + + self.request = request + + def get(self): + """ + A GET API call for the status of the aggregator - running or not. + + :return: A JSON response with the status of the aggregator. + """ + + aggregator_data = self.request.registry.settings + config = {RUNNING_FLAG: aggregator_data.get(RUNNING_FLAG)} + + if aggregator_data[MALFORMED_FLAG] and aggregator_data[RUNNING_FLAG]: + config[MALFORMED_FLAG] = True + config[COMMENT_ATTRIBUTE] = COMMENT_VALUE + + return config + + def put(self): + """ + A PUT API call for the status of the aggregator. + + :return: A JSON response to the PUT call - essentially saying whether the aggregator is running or not + :raises HTTPBadRequest: if request body is not a valid JSON for the controller + """ + + content = self.request.body.decode(self.request.charset) + + try: + content = validate_action_content(content) + + config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} + + action = content['action'] + + if action == 'start': + aggregator_started = self.request.registry.settings[RUNNING_FLAG] + if not aggregator_started: + process = self.start_aggregator(config) + self.request.registry.settings[RUNNING_FLAG] = True + self.request.registry.settings[PROCESS_ATTRIBUTE] = process + elif action == 'stop': + self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) + self.request.registry.settings[RUNNING_FLAG] = False + self.request.registry.settings[PROCESS_ATTRIBUTE] = None + self.request.registry.settings[MALFORMED_FLAG] = False + elif action == 'restart': + self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) + process = self.start_aggregator(config) + self.request.registry.settings[RUNNING_FLAG] = True + self.request.registry.settings[PROCESS_ATTRIBUTE] = process + self.request.registry.settings[MALFORMED_FLAG] = False + + return {RUNNING_FLAG: self.request.registry.settings.get(RUNNING_FLAG)} + + except AssertionError: + raise HTTPBadRequest('Bad request content - must be in JSON format: {"action": value}, where value is "start", "stop" or "restart".') + + @staticmethod + def start_aggregator(config): + """ + An auxiliary method to start the aggregator. + + :param config: the configuration containing the arguments for the aggregator + :return: the process object of the started aggregator script + """ + + dir_path = os.path.dirname(os.path.realpath(__file__)) + command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', + config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')] + process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL) + print("\nStarted aggregator process with PID: {0}\n".format(process.pid)) + + return process + + @staticmethod + def stop_aggregator(process): + """ + An auxiliary method to stop the aggregator. + + :param process: the process to terminate + """ + + # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value + if process is not None and process.poll() is None: + process.terminate() + print("\nStopped aggregator process with PID: {0}\n".format(process.pid)) + + +@view_defaults(route_name='round_trip_time_query', renderer='json') +class RoundTripTimeQuery(object): + + """ + A class-based view for querying the round trip time in a given range. + """ + + def __init__(self, request): + """ + Initialises the instance of the view with the request argument. + + :param request: client's call request + """ + + self.request = request + + def get(self): + """ + A GET API call for the averaged round trip time of a specific media service over a given time range. + + :return: A JSON response with the round trip time and its contributing parts. + """ + + params = {} + for attribute in ROUND_TRIP_ATTRIBUTES: + if attribute in self.request.params: + params[attribute] = self.request.params.get(attribute) + + try: + params = validate_round_trip_query_params(params) + config_data = {config_attribute: self.request.registry.settings.get(config_attribute) for config_attribute in CONFIG_ATTRIBUTES} + + media_service = params.get(ROUND_TRIP_ATTRIBUTES[0]) + start_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[1]) + end_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[2]) + influx_db_name = config_data.get(CONFIG_ATTRIBUTES[1]) + influx_db_url = config_data.get(CONFIG_ATTRIBUTES[2]) + + url_object = urlparse(influx_db_url) + try: + db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=influx_db_name, timeout=10) + query = 'SELECT mean(*) FROM "{0}"."autogen"."e2e_delays" WHERE time >= {1} and time < {2} and sf_instance = \'{3}\''.format( + influx_db_name, start_timestamp, end_timestamp, media_service) + print(query) + result = db_client.query(query) + + actual_result = next(result.get_points(), None) + if actual_result is None: + return {"result": None} + else: + forward_latency = actual_result.get("mean_delay_forward") + reverse_latency = actual_result.get("mean_delay_reverse") + service_delay = actual_result.get("mean_delay_service") + request_size = actual_result.get("mean_avg_request_size") + response_size = actual_result.get("mean_avg_response_size") + bandwidth = actual_result.get("mean_avg_bandwidth") + + rtt = self.calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth) + return {"result": rtt} + except: + raise HTTPInternalServerError("Cannot instantiate connection with database {0} on url {1}.".format(influx_db_name, influx_db_url)) + + except AssertionError: + raise HTTPBadRequest('Bad request content - must be in JSON format: {"media_service": value, "start_timestamp": value, "end_timestamp": value}.') + + @staticmethod + def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50): + """ + Calculates the round trip time given the list of arguments. + + :param forward_latency: network latency in forward direction (s) + :param reverse_latency: network latency in reverse direction (s) + :param service_delay: media service delay (s) + :param request_size: request size (bytes) + :param response_size: response size (bytes) + :param bandwidth: network bandwidth (Mb/s) + :param packet_size: size of packet (bytes) + :param packet_header_size: size of the header of the packet (bytes) + :return: the calculated round trip time + """ + + forward_data_delay = (8/10**6) * (request_size / bandwidth) * (packet_size / (packet_size - packet_header_size)) + reverse_data_delay = (8/10**6) * (response_size / bandwidth) * (packet_size / (packet_size - packet_header_size)) + + return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay diff --git a/src/clmcwebservice/development.ini b/src/clmcwebservice/development.ini new file mode 100644 index 0000000000000000000000000000000000000000..9ec4dc522937ff2c1606dd19287c5c2f23ce5598 --- /dev/null +++ b/src/clmcwebservice/development.ini @@ -0,0 +1,62 @@ +### +# app configuration +# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/environment.html +### + +[app:main] +use = egg:clmcservice + +pyramid.reload_templates = true +pyramid.debug_authorization = false +pyramid.debug_notfound = false +pyramid.debug_routematch = false +pyramid.default_locale_name = en +pyramid.includes = pyramid_debugtoolbar +aggregator_running = false +aggregator_report_period = 5 +aggregator_database_name = E2EMetrics +aggregator_database_url = http://172.40.231.51:8086 + +# By default, the toolbar only appears for clients from IP addresses +# '127.0.0.1' and '::1'. +# debugtoolbar.hosts = 127.0.0.1 ::1 + +### +# wsgi server configuration +### + +[server:main] +use = egg:waitress#main +listen = localhost:8080 + +### +# logging configuration +# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/logging.html +### + +[loggers] +keys = root, clmcservice + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = INFO +handlers = console + +[logger_clmcservice] +level = DEBUG +handlers = +qualname = clmcservice + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s diff --git a/src/clmcwebservice/production.ini b/src/clmcwebservice/production.ini new file mode 100644 index 0000000000000000000000000000000000000000..7f5e32331a58c686de4361e6d12a011e56a340b1 --- /dev/null +++ b/src/clmcwebservice/production.ini @@ -0,0 +1,57 @@ +### +# app configuration +# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/environment.html +### + +[app:main] +use = egg:clmcservice + +pyramid.reload_templates = false +pyramid.debug_authorization = false +pyramid.debug_notfound = false +pyramid.debug_routematch = false +pyramid.default_locale_name = en +aggregator_running = false +aggregator_report_period = 5 +aggregator_database_name = E2EMetrics +aggregator_database_url = http://172.40.231.51:8086 + +### +# wsgi server configuration +### + +[server:main] +use = egg:waitress#main +listen = *:8080 + +### +# logging configuration +# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/logging.html +### + +[loggers] +keys = root, clmcservice + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console + +[logger_clmcservice] +level = WARN +handlers = +qualname = clmcservice + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s diff --git a/src/clmcwebservice/pytest.ini b/src/clmcwebservice/pytest.ini new file mode 100644 index 0000000000000000000000000000000000000000..2fb94a6c8131efcc6dcc9357a5b56f68af607de3 --- /dev/null +++ b/src/clmcwebservice/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +testpaths = clmcservice +python_files = *.py diff --git a/src/clmcwebservice/setup.py b/src/clmcwebservice/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..817d1bdbebd1b4d58b61144a3edf692832d2ea60 --- /dev/null +++ b/src/clmcwebservice/setup.py @@ -0,0 +1,84 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + + +import os +import os.path +from setuptools import setup, find_packages + + +def read(fname): + return open(os.path.join(os.path.dirname(__file__), fname)).read() + + +def get_version(fname): + if os.path.isfile(fname): + git_revision = read(fname) + else: + git_revision = "SNAPSHOT" + + return git_revision + + +requires = [ + 'plaster_pastedeploy', + 'pyramid', + 'pyramid_debugtoolbar', + 'waitress', + 'influxdb', + 'pytest', +] + +tests_require = [ + 'WebTest >= 1.3.1', # py3 compat + 'pytest-cov', +] + +setup( + name = "clmcservice", + version = get_version("_version.py"), + author = "Michael Boniface", + author_email = "mjb@it-innovation.soton.ac.uk", + description = "FLAME CLMC Service Module", + long_description="FLAME CLMC Service", + license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE", + keywords = "FLAME CLMC service", + url = 'https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc', + packages=find_packages(), + include_package_data=True, + install_requires=requires, + extras_require={ + 'testing': tests_require, + }, + package_data={'': ['_version.py']}, + classifiers=[ + "Development Status :: Alpha", + "Topic :: FLAME CLMC Service", + "License :: ", + ], + entry_points={ + 'paste.app_factory': [ + 'main = clmcservice:main', + ], + }, +) \ No newline at end of file diff --git a/src/clmcwebservice/tox.ini b/src/clmcwebservice/tox.ini new file mode 100644 index 0000000000000000000000000000000000000000..b7fe1eac855e39b717787d62b2a9c2144d715b7e --- /dev/null +++ b/src/clmcwebservice/tox.ini @@ -0,0 +1,5 @@ +[tox] +envlist = py36 +[testenv] +deps=pytest +commands=pytest \ No newline at end of file