diff --git a/.coveragerc b/.coveragerc deleted file mode 100644 index a5040abd159bdd7db4b7be8b9954163e2ca7a1f1..0000000000000000000000000000000000000000 --- a/.coveragerc +++ /dev/null @@ -1,3 +0,0 @@ -[run] -source = CLMCservice -omit = CLMCservice/tests.py diff --git a/.gitignore b/.gitignore index 9225ad6177e3a883536cf870c851dda742d4640c..cb69709cf8833a9ae16e813795f997ebe9c00207 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,6 @@ ubuntu-xenial-16.04-cloudimg-console.log .idea/ *.egg *.pyc -.pytest_cache .tox *$py.class +**/.pytest_cache/ diff --git a/CLMCservice/__init__.py b/CLMCservice/__init__.py deleted file mode 100644 index 80dfcc74f743b5e543ce9320d4e92541577b8f7b..0000000000000000000000000000000000000000 --- a/CLMCservice/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -from pyramid.config import Configurator -from pyramid.settings import asbool -from CLMCservice.views import AggregatorConfig - - -def main(global_config, **settings): - """ This function returns a Pyramid WSGI application.""" - - # a conversion is necessary so that the configuration value of the aggregator is stored as bool and not as string - aggregator_running = asbool(settings.get('aggregator_running', 'false')) - settings['aggregator_running'] = aggregator_running - - config = Configurator(settings=settings) - - config.add_route('aggregator', '/aggregator') - config.add_view(AggregatorConfig, attr='get', request_method='GET') - config.add_view(AggregatorConfig, attr='post', request_method='POST') - - config.scan() - return config.make_wsgi_app() diff --git a/CLMCservice/tests.py b/CLMCservice/tests.py deleted file mode 100644 index c475fa7fa0e49591d13aefbf718ae536c6ec5980..0000000000000000000000000000000000000000 --- a/CLMCservice/tests.py +++ /dev/null @@ -1,75 +0,0 @@ -import pytest -from pyramid import testing -from pyramid.httpexceptions import HTTPBadRequest - - -class TestAggregatorConfig(object): - """ - A pytest-implementation test for the aggregator configuration API calls - 
""" - - @pytest.fixture(autouse=True) - def app_config(self): - """ - A fixture to implement setUp/tearDown functionality for all tests by initializing configuration structure for the web service - """ - - self.config = testing.setUp() - self.config.add_settings({'aggregator_running': False}) - - yield - - testing.tearDown() - - def test_GET(self): - """ - Tests the GET method for the status of the aggregator. - """ - - from CLMCservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself - - assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." - - request = testing.DummyRequest() - response = AggregatorConfig(request).get() - - assert type(response) == dict, "Response must be a dictionary representing a JSON object." - assert not response.get('aggregator_running'), "The response of the API call must return the aggregator status being set as False" - assert not self.config.get_settings().get('aggregator_running'), "A GET request must not modify the aggregator status." - - @pytest.mark.parametrize("input_val, output_val", [ - ("True", True), - ("true", True), - ("1", True), - ("False", False), - ("false", False), - ("0", False), - ("t", None), - ("f", None), - ]) - def test_POST(self, input_val, output_val): - """ - Tests the POST method for the status of the aggregator - :param input_val: the input form parameter - :param output_val: the expected output value, None for expecting an Exception - """ - - from CLMCservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself - - assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running." 
- - request = testing.DummyRequest() - - request.params['running'] = input_val - if output_val is not None: - response = AggregatorConfig(request).post() - assert response == {'aggregator_running': output_val}, "Response of POST request must include the new status of the aggregator" - assert self.config.get_settings().get('aggregator_running') == output_val, "Aggregator status must be updated to running." - else: - error_raised = False - try: - AggregatorConfig(request).post() - except HTTPBadRequest: - error_raised = True - - assert error_raised, "Error must be raised in case of an invalid argument." diff --git a/CLMCservice/utilities.py b/CLMCservice/utilities.py deleted file mode 100644 index e17818ae15f7be180d85aebc79ec1574761a1840..0000000000000000000000000000000000000000 --- a/CLMCservice/utilities.py +++ /dev/null @@ -1,17 +0,0 @@ -def str_to_bool(value): - """ - A utility function to convert a string to boolean based on simple rules. - :param value: the value to convert - :return: True or False - :raises ValueError: if value cannot be converted to boolean - """ - - if type(value) is not str: - raise ValueError("This method only converts string to booolean.") - - if value in ('False', 'false', '0'): - return False - elif value in ('True', 'true', '1'): - return True - else: - raise ValueError("Invalid argument for conversion") diff --git a/CLMCservice/views.py b/CLMCservice/views.py deleted file mode 100644 index 4b92523509077c7b2319c3d0d625a880ebd5cdbb..0000000000000000000000000000000000000000 --- a/CLMCservice/views.py +++ /dev/null @@ -1,46 +0,0 @@ -from pyramid.view import view_defaults -from pyramid.httpexceptions import HTTPBadRequest - -from CLMCservice.utilities import str_to_bool - - -@view_defaults(route_name='aggregator', renderer='json') -class AggregatorConfig(object): - """ - A class-based view for accessing and mutating the status of the aggregator. 
- """ - - def __init__(self, request): - """ - Initialises the instance of the view with the request argument. - :param request: client's call request - """ - - self.request = request - - def get(self): - """ - A GET API call for the status of the aggregator. - :return: A JSON response with the status of the aggregator. - """ - - aggregator_running = self.request.registry.settings.get('aggregator_running') - return {'aggregator_running': aggregator_running} - - def post(self): - """ - A POST API call for the status of the aggregator. - :return: A JSON response to the POST call (success or fail). - :raises HTTPBadRequest: if form argument cannot be converted to boolean - """ - - new_status = self.request.params.get('running') - - try: - new_status = str_to_bool(new_status) - except ValueError: - raise HTTPBadRequest("Bad request parameter - expected a boolean, received {0}".format(self.request.params.get('running'))) - - self.request.registry.settings['aggregator_running'] = new_status - # TODO start/stop aggregator based on value of new status - return {'aggregator_running': new_status} diff --git a/README.md b/README.md index 56222b74a8e0afb4c9b66c0c83fd3fc29b7fac0b..05f8934137f2f05f9519ff0c6b02eb0cda64c205 100644 --- a/README.md +++ b/README.md @@ -104,45 +104,4 @@ Then the package is installed Then the tests are run -`sudo apt-get install python3-pip` - -`pip3 install pytest` - - -#### CLMC Service - -The CLMC service is implemented using the Pyramid framework. (currently under development) - -Before installing the CLMC service and its dependencies, it is recommended to use a virtual environment. To manage virtual -environments, **virtualenvwrapper** can be used. 
- -``` -pip install virtualenvwrapper -``` - -To create a virtual environment use the **mkvirtualenv** command: - -``` -mkvirtualenv CLMC -``` - -When created, you should already be set to use the new virtual environment, but to make sure of this use the **workon** command: - -``` -workon CLMC -``` - -Now, any installed libraries will be specificly installed in this environment only. To install and use the CLMC service -locally, the easiest thing to do is to use **pip** (make sure you are in the root folder of the project - ***flame-clmc***): - -``` -pip install -e . -``` - -Finally, start the service on localhost by using pyramid's **pserve**: - -``` -pserve development.ini --reload -``` - -You should now be able to see the 'Hello world' message when visiting **http://localhost:8080** in your browser. +`vagrant --fixture=scripts -- ssh test-runner -- -tt "pytest -s --pyargs clmctest.scripts"` diff --git a/clmctest/monitoring/E2EAggregator.py b/clmctest/monitoring/E2EAggregator.py index 8bef3d921ef0345fc3db6eb7ef6933b6c7101f34..0d681b71b994778a57ab73edd991003c969d8329 100644 --- a/clmctest/monitoring/E2EAggregator.py +++ b/clmctest/monitoring/E2EAggregator.py @@ -59,6 +59,10 @@ class Aggregator(Thread): # a stop flag event object used to handle the killing of the thread self._stop_flag = Event() + # a cache-like dictionaries to store the last reported values, which can be used to fill in missing values + self.network_cache = {} + self.service_cache = {} + def stop(self): """ A method used to stop the thread. @@ -66,11 +70,23 @@ class Aggregator(Thread): self._stop_flag.set() + def set_event_lock(self, event): + """ + Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing). + + :param event: the event lock object + """ + + setattr(self, 'event', event) + def run(self): """ Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. 
""" + if hasattr(self, 'event'): + self.event.set() + current_time = int(time()) while True: if self._stop_flag.is_set(): @@ -84,61 +100,73 @@ class Aggregator(Thread): # query the network delays and group them by path ID network_delays = {} result = self.db_client.query( - 'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( + 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] - network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay'] + result = next(result_points) + network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] + self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] # query the service delays and group them by endpoint, service function instance and sfr service_delays = {} - result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) + result = self.db_client.query('SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] - service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], 
tags['sf_instance']) + result = next(result_points) + service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) + self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement for path in network_delays: # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr path_id, source, target = path - if target not in service_delays: + if target not in service_delays and target not in self.service_cache: # if not continue with the other network path reports continue e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, - "delay_service": None, "time": boundary_time} + "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time} e2e_arguments['path_ID'] = path_id - e2e_arguments['delay_forward'] = network_delays[path] + e2e_arguments['source_SFR'] = source + e2e_arguments['target_SFR'] = target + e2e_arguments['delay_forward'] = network_delays[path][0] + e2e_arguments['avg_bandwidth'] = network_delays[path][1] # reverse the path ID to get the network delay for the reversed path reversed_path = (path_id, target, source) - assert reversed_path in network_delays # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A - e2e_arguments['delay_reverse'] = network_delays[reversed_path] + if reversed_path in network_delays or reversed_path in self.network_cache: + # get the reverse delay, use the latest value if reported or the cache value + e2e_arguments['delay_reverse'] = 
network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0] + else: + e2e_arguments['delay_reverse'] = None # get the response time of the media component connected to the target SFR - service_delay = service_delays[target] - response_time, endpoint, sf_instance = service_delay + service_delay = service_delays.get(target, self.service_cache.get(target)) + response_time, request_size, response_size, endpoint, sf_instance = service_delay # put these points in the e2e arguments dictionary e2e_arguments['delay_service'] = response_time + e2e_arguments['avg_request_size'] = request_size + e2e_arguments['avg_response_size'] = response_size e2e_arguments['endpoint'] = endpoint e2e_arguments['sf_instance'] = sf_instance # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row - if None not in e2e_arguments.items(): + if None not in e2e_arguments.values(): self.db_client.write_points( lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'], - e2e_arguments['time'])) + e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time'])) old_timestamp = current_time # wait until {REPORT_PERIOD} seconds have passed - while current_time != old_timestamp + self.REPORT_PERIOD: + while current_time < old_timestamp + self.REPORT_PERIOD: sleep(1) current_time = int(time()) diff --git a/clmctest/monitoring/E2ESim.py b/clmctest/monitoring/E2ESim.py index a8974f880916ff5b568b98a859e3161b39f17d03..a6520d280b52353ff7120746949a9127ecbed875 100644 --- a/clmctest/monitoring/E2ESim.py +++ b/clmctest/monitoring/E2ESim.py @@ -74,49 +74,79 @@ class Simulator(object): # all network delays start from 1ms, the dictionary stores the information to report 
paths = [ - {'target': 'SR3', - 'source': 'SR1', - 'path_id': 'SR1---SR3', - 'network_delay': 1}, - {'target': 'SR1', - 'source': 'SR3', - 'path_id': 'SR1---SR3', - 'network_delay': 1} + { + 'target': 'SR2', + 'source': 'SR1', + 'path_id': 'SR1---SR2', + 'latency': 5, + 'bandwidth': 100*1024*1024 + }, + { + 'target': 'SR1', + 'source': 'SR2', + 'path_id': 'SR1---SR2', + 'latency': 5, + 'bandwidth': 100*1024*1024 + }, + { + 'target': 'SR3', + 'source': 'SR1', + 'path_id': 'SR1---SR3', + 'latency': 5, + 'bandwidth': 100*1024*1024 + }, + { + 'target': 'SR1', + 'source': 'SR3', + 'path_id': 'SR1---SR3', + 'latency': 5, + 'bandwidth': 100*1024*1024 + } ] + service_function_instances = [ + { + 'endpoint': 'ms1.flame.org', + 'sf_instance': 'sr2.ms1.flame.org', # TODO: what did we decide the sf_instance would look like? + 'sfr': 'SR2', + 'service_delay': 40, + 'cpus': 1 + }, + { + 'endpoint': 'ms1.flame.org', + 'sf_instance': 'sr3.ms1.flame.org', # TODO: what did we decide the sf_instance would look like? 
+ 'sfr': 'SR3', + 'service_delay': 10, + 'cpus': 4 + } + ] + + av_request_size = 10 * 1024 * 1024 # average request size measured by service function / Bytes + av_response_size = 1 * 1024 # average request size measured by service function / Bytes + # current time in seconds (to test the aggregation we write influx data points related to future time), so we start from the current time start_time = int(time.time()) sim_time = start_time - mean_delay_seconds_media = 10 # initial mean media service delay - sample_period_net = 2 # sample period for reporting network delays (measured in seconds) - net measurements reported every 2s - sample_period_media = 5 # sample period for reporting media service delays (measured in seconds) - service measurements reported every 5 seconds + sample_period_net = 1 # sample period for reporting network delays (measured in seconds) + sample_period_media = 5 # sample period for reporting media service delays (measured in seconds) for i in range(0, self.SIMULATION_LENGTH): - # measure net delay every 2 seconds for path SR1-SR3 (generates on tick 0, 2, 4, 6, 8, 10.. etc.) + # report one of the network delays every sample_period_net seconds if i % sample_period_net == 0: - path = paths[0] - self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['network_delay'], sim_time)) + path = random.choice(paths) + self.db_client.write_points( + lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['latency'], path['bandwidth'], sim_time)) # increase/decrease the delay in every sample report (min delay is 1) - path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) - - # measure net delay every 2 seconds for path SR2-SR3 (generates on tick 1, 3, 5, 7, 9, 11.. etc.) 
- if (i+1) % sample_period_net == 0: - path = paths[1] - self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['network_delay'], sim_time)) + path['latency'] = max(1, path['latency'] + random.randint(-3, 3)) - # increase/decrease the delay in every sample report (min delay is 1) - path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) - - # measure service response time every 5 seconds + # report one of the service_function_instance response times every sample_period_media seconds if i % sample_period_media == 0: - self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, "endpoint-1", - "ms-A.ict-flame.eu", "SR3", sim_time)) - - # increase/decrease the delay in every sample report (min delay is 10) - mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)])) + service = random.choice(service_function_instances) + self.db_client.write_points(lp.generate_service_delay_report( + service['endpoint'], service['sf_instance'], service['sfr'], service['service_delay'], av_request_size, av_response_size, sim_time)) # increase the time by one simulation tick sim_time += self.TICK diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index 432f27d41769bcf68d776b91f719d9f4bcb8d122..b9bf9a6249234131506e9fddca2fa77af8450ede 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -29,18 +29,21 @@ import uuid from random import randint -def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, time): +def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time): """ Generates a combined averaged 
measurement about the e2e delay and its contributing parts - :param path_ID: The path identifier, which is a bidirectional path ID for the request and the response path - :param source_SFR: source service router - :param target_SFR: target service router + :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path + :param source_sfr: source service router + :param target_sfr: target service router :param endpoint: endpoint of the media component :param sf_instance: service function instance (media component) :param delay_forward: Path delay (Forward direction) :param delay_reverse: Path delay (Reverse direction) :param delay_service: the media service component response time + :param avg_request_size: averaged request size + :param avg_response_size: averaged response size + :param avg_bandwidth: averaged bandwidth :param time: measurement timestamp :return: a list of dict-formatted reports to post on influx """ @@ -56,7 +59,10 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst "fields": { "delay_forward": float(delay_forward), "delay_reverse": float(delay_reverse), - "delay_service": float(delay_service) + "delay_service": float(delay_service), + "avg_request_size": float(avg_request_size), + "avg_response_size": float(avg_response_size), + "avg_bandwidth": float(avg_bandwidth) }, "time": _getNSTime(time) }] @@ -64,14 +70,15 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst return result -def generate_network_delay_report(path_id, source_sfr, target_sfr, e2e_delay, time): +def generate_network_delay_report(path_id, source_sfr, target_sfr, latency, bandwidth, time): """ Generates a platform measurement about the network delay between two specific service routers. 
:param path_id: the identifier of the path between the two service routers :param source_sfr: the source service router :param target_sfr: the target service router - :param e2e_delay: the e2e network delay for traversing the path between the two service routers + :param latency: the e2e network delay for traversing the path between the two service routers + :param bandwidth: the bandwidth of the path (minimum of bandwidths of the links it is composed of) :param time: the measurement timestamp :return: a list of dict-formatted reports to post on influx """ @@ -83,7 +90,8 @@ def generate_network_delay_report(path_id, source_sfr, target_sfr, e2e_delay, ti "target": target_sfr }, "fields": { - "delay": e2e_delay + "latency": latency, + "bandwidth": bandwidth }, "time": _getNSTime(time) }] @@ -91,14 +99,16 @@ def generate_network_delay_report(path_id, source_sfr, target_sfr, e2e_delay, ti return result -def generate_service_delay_report(response_time, endpoint, sf_instance, sfr, time): +def generate_service_delay_report(endpoint, sf_instance, sfr, response_time, request_size, response_size, time): """ Generates a service measurement about the media service response time. 
- :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service component) :param endpoint: endpoint of the media component :param sf_instance: service function instance :param sfr: the service function router that connects the endpoint of the SF instance to the FLAME network + :param response_time: the media service response time (this is not the response time for the whole round-trip, but only for the processing part of the media service component) + :param request_size: the size of the request received by the service in Bytes + :param response_size: the size of the response received by the service in Bytes :param time: the measurement timestamp :return: a list of dict-formatted reports to post on influx """ @@ -111,6 +121,8 @@ def generate_service_delay_report(response_time, endpoint, sf_instance, sfr, tim }, "fields": { "response_time": response_time, + "request_size": request_size, + "response_size": response_size }, "time": _getNSTime(time) }] diff --git a/clmctest/monitoring/test_e2eresults.py b/clmctest/monitoring/test_e2eresults.py index 5486bcb618f645dbdbc69887adf6000c2c423e08..fea3a7769ad53b468bcb8d6a7b5a7784de333d56 100644 --- a/clmctest/monitoring/test_e2eresults.py +++ b/clmctest/monitoring/test_e2eresults.py @@ -25,6 +25,7 @@ import pytest import random import time +import threading class TestE2ESimulation(object): @@ -44,8 +45,12 @@ class TestE2ESimulation(object): random.seed(0) # Seed random function so we can reliably test for average queries print("Starting aggregator...") + event = threading.Event() + e2e_aggregator.set_event_lock(event) e2e_aggregator.start() + event.wait() # wait until the aggregator thread has set the event lock (it has reached its run method and is ready to start) + print("Running simulation, please wait...") e2e_simulator.run() @@ -56,17 +61,18 @@ class TestE2ESimulation(object): print("... 
stopping aggregator") e2e_aggregator.stop() - @pytest.mark.parametrize("query, expected_result", [ ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"', - {"time": "1970-01-01T00:00:00Z", "count_delay": 120}), + {"time": "1970-01-01T00:00:00Z", "count_latency": 120, "count_bandwidth": 120}), ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"', - {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}), + {"time": "1970-01-01T00:00:00Z", "count_response_time": 24, "count_request_size": 24, "count_response_size": 24}), ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"', - {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 24, "count_delay_reverse": 24, "count_delay_service": 24}), + {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 38, "count_delay_reverse": 38, "count_delay_service": 38, + "count_avg_request_size": 38, "count_avg_response_size": 38, "count_avg_bandwidth": 38}), ('SELECT mean(*) FROM "E2EMetrics"."autogen"."e2e_delays"', - {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 13.159722222222223, "mean_delay_reverse": 3.256944444444444, "mean_delay_service": 32.791666666666664}), + {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 8.010964912280702, "mean_delay_reverse": 12.881578947368423, "mean_delay_service": 23.42105263157895, + 'mean_avg_request_size': 10485760, 'mean_avg_response_size': 1024, 'mean_avg_bandwidth': 104857600}), ]) def test_simulation(self, influx_db, query, expected_result): """ diff --git a/docs/clmc-service.md b/docs/clmc-service.md new file mode 100644 index 0000000000000000000000000000000000000000..37a41bd8f61b9cf879f3c0c08d572e639363fdc1 --- /dev/null +++ b/docs/clmc-service.md @@ -0,0 +1,234 @@ +<!-- +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. 
+// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 02-05-2018 +// Created for Project : FLAME +--> + +# **Flame CLMC Service Documentation** + +#### **Authors** + +|Authors|Organisation| +|---|---| +|[Nikolay Stanchev](mailto:ns17@it-innovation.soton.ac.uk)|[University of Southampton, IT Innovation Centre](http://www.it-innovation.soton.ac.uk)| + +#### Description + +This document describes the CLMC service and its API endpoints. The CLMC service is implemented in the *Python* framework called **Pyramid**. +It offers different API endpoints to configure and control the aggregator, which is an essential part in the process of measuring the end-to-end performance. +All source code, tests and configuration files of the service can be found in the **src/clmc-webservice** folder. + +#### API Endpoints + +* **GET** ***/aggregator/config*** + + This API method retrieves information about the configuration of the aggregator. + + * Response: + + Returns a JSON-formatted response with the configuration data of the aggregator - *aggregator_report_period*, *aggregator_database_name*, + *aggregator_database_url*. + + * Response Body Example: + + ```json + { + "aggregator_report_period": 5, + "aggregator_database_name": "E2EMetrics", + "aggregator_database_url": "http://172.40.231.51:8086" + } + ``` + +* **PUT** ***/aggregator/config*** + + This API method updates the configuration of the aggregator. 
+ + * Request: + + Expects a JSON-formatted request body with the new configuration of the aggregator. The body should contain only + three key fields - *aggregator_report_period* (positive integer, seconds), *aggregator_database_name* and *aggregator_database_url* (a valid URL). + + * Request Body Example: + + ```json + { + "aggregator_report_period": 25, + "aggregator_database_name": "E2EMetrics", + "aggregator_database_url": "http://172.50.231.61:8086" + } + ``` + + * Response: + + The body of the request is first validated before updating the configuration. If validation is successful, returns + a JSON-formatted response with the new configuration data. Otherwise, an **HTTP Bad Request** response is returned. + + * Response Body Example: + + ```json + { + "aggregator_report_period": 25, + "aggregator_database_name": "E2EMetrics", + "aggregator_database_url": "http://172.50.231.61:8086" + } + ``` + + * Notes: + + If the configuration is updated, while the aggregator is running, it is not automatically restarted. An explicit API call + must be made with a *restart* request to apply the updated configuration. In the case of such PUT request as the one described + above, the response will contain more information indicating that the configuration of the aggregator is in a malformed state. + + * Response Body Example: + + ```json + { + "aggregator_report_period": 125, + "aggregator_database_name": "E2EMetrics", + "aggregator_database_url": "http://172.50.231.61:8086/", + "malformed": true, + "comment": "Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used." + } + ``` + +* **GET** ***/aggregator/control*** + + This API method retrieves information about the status of the aggregator - whether it is running or not. + + * Response: + + Returns a JSON-formatted response with the status data of the aggregator - *aggregator_running* field. 
If the aggregator + is running in a malformed state, the response will also indicate this with two additional fields - *malformed* and *comment*. + + * Response Body Example: + + ```json + { + "aggregator_running": true + } + ``` + + * Response Body Example - for malformed configuration: + + ```json + { + "aggregator_running": true, + "malformed": true, + "comment": "Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used." + } + ``` + +* **PUT** ***/aggregator/control*** + + This API method updates the status of the aggregator - a user can start, stop or restart it. + + * Request: + + Expects a JSON-formatted request body with the new status of the aggregator. The body should contain only one key + field - *action* (the action to undertake, which can be **start**, **restart** or **stop**) + + * Request Body Example: + + ```json + { + "action": "start" + } + ``` + + * Response: + + The body of the request is first validated before taking any actions. If the action is not one of the listed above, + then the validation will fail. If validation is successful, returns a JSON-formatted response with the new status of + the aggregator. Otherwise, an **HTTP Bad Request** response is returned. + + * Response Body Example: + + ```json + { + "aggregator_running": true + } + ``` + + * Notes: + + * If a **start** action is requested, while the aggregator is running, then the request will be ignored. To restart the + aggregator, a user should use a **restart** action. + + * If a **stop** action is requested, while the aggregator is not running, then the request will be ignored. + + * A request with a **restart** action, while the aggregator is not running, has the same functionality as a request + with a **start** action. + + * The functionality of a request with a **restart** action is the same as the functionlity of a **stop** action + followed by a **start** action. 
+ +#### Installing and running the CLMC service (development mode) + +Before installing the CLMC service and its dependencies, it is recommended to use a python virtual environment. To easily +manage virtual environments, **virtualenvwrapper** can be used. + +``` +pip install virtualenvwrapper +``` + +To create a virtual environment use the **mkvirtualenv** command: + +``` +mkvirtualenv CLMC +``` + +When created, you should already be set to use the new virtual environment, but to make sure of this use the **workon** command: + +``` +workon CLMC +``` + +Now, any installed libraries will be installed relative to this environment only. + +The easiest way to install and use the CLMC service locally is to use **pip**. Navigate to the clmc-webservice folder: +``` +cd src/clmc-webservice +``` + +Test the CLMC service using **tox** along with the ***tox.ini*** configuration file. If tox is not installed run: + +``` +pip install tox +``` + +After it is installed, simply use the **tox** command: + +``` +tox +``` + +Then install the service in development mode. + +``` +pip install -e . +``` + +Finally, start the service on localhost by using pyramid's **pserve**: + +``` +pserve development.ini --reload +``` + +You should now be able to make requests to the CLMC service on http://localhost:8080/aggregator/config and http://localhost:8080/aggregator/control. diff --git a/docs/total-service-request-delay.md b/docs/total-service-request-delay.md index 774c3616b10d852140934c67dc84fce568ca78e4..a27f47665f3f77341193feff8f74f51e89f5bb3b 100644 --- a/docs/total-service-request-delay.md +++ b/docs/total-service-request-delay.md @@ -16,6 +16,7 @@ If we ignore the OSI L6 protocol (e.g. 
HTTP, FTP, Tsunami) then we are modelling ``` network_delay = latency + (time difference from start of the data to the end of the data) + = latency + data_delay ``` ### Latency @@ -24,10 +25,10 @@ The latency (or propagation delay) of the network path is the time taken for a p latency = distance / speed -For optical fibre (or even an eletric wire), the speed naively would be the speed of light. In fact, the speed is slower than this (in optical fibre this is because of the internal refraction that occurs, which is different for different wavelengths). According to http://www.m2optics.com/blog/bid/70587/Calculating-Optical-Fiber-Latency the delay (1/speed) is approximately 5 microseconds / km +For optical fibre (or even an electric wire), the speed naively would be the speed of light. In fact, the speed is slower than this (in optical fibre this is because of the internal refraction that occurs, which is different for different wavelengths). According to [m2.optics.com](http://www.m2optics.com/blog/bid/70587/Calculating-Optical-Fiber-Latency) the delay (1/speed) is approximately 5 microseconds / km ``` -if +if distance is in m delay is in s/m latency is in s @@ -48,35 +49,179 @@ if data_size is in Bytes bandwidth is in Mb/s data_delay is in s -then +then data_delay = data_size * 8 / bandwidth * 1E6 ``` -The data_size naively is the size of the data you want to send over the network (call this the "file_size"). However, the data is split into packets and each packet has a header on it so the amount of data going over the network is actually more than the amount sent. +The data_size naively is the size of the data you want to send over the network (call this the "file_size"). However, the data is split into packets and each packet has a header on it so the amount of data going over the network is actually more than the amount sent. The header includes contributions from (at least) the L6 protocol (e.g. HTTP), L4 (e.g. TCP) and L3 (e.g. IP) layers. 
``` -let +let packet_size = packet_header_size + packet_payload_size then data_size = (packet_size / packet_payload_size) * file_size or - data_size = (packet_size / packet_size - packet_header_size) * file_size + data_size = [packet_size / (packet_size - packet_header_size)] * file_size + = file_size * packet_size / (packet_size - packet_header_size) ``` -### Total delay +### Measuring and Predicting + +Bringing the above parts together we have: ``` -delay = latency + data_delay - = (distance * 5 / 1E9) + {[(packet_size / packet_size - packet_header_size) * file_size] * 8 / bandwidth * 1E6} +network_delay = latency + data_delay + = (distance * 5 / 1E9) + {[file_size * packet_size / (packet_size - packet_header_size)] * 8 / (bandwidth * 1E6)} + = (distance * 5 / 1E9) + (8 / 1E6) * (file_size / bandwidth) * [packet_size / (packet_size - packet_header_size)] ``` -### Effect of Protocol +i.e. `file_size / bandwidth` with an adjustment to increase the size of the data transmitted because of the packet header and some unit factors. + +We want to be able to measure the `network_delay` and also want to be able to predict what the delay is likely to be for a given deployment. + +Parameter | Known / measured +----------|-------- +latency | measured by network probes +distance | sometimes known +packet_size | known (a property of the network) +packet_header_size | known (at least for L3 and L4) +file_size | measured at the service function +bandwidth | known (a property of the network), can also be measured + +Measuring the actual `latency` can be done in software. For a given `file_size`, the `network_delay` could then be predicted. + +*We are ignoring network congestion and the effect of the protocol (see below).* + +### Effect of protocol -The choice of protocol has a large effect in networks with a high bandwidth-delay product. +The analysis above ignores the network protocol. 
However, the choice of protocol has a large effect in networks with a high bandwidth-delay product. In data communications, bandwidth-delay product is the product of a data link's capacity (in bits per second) and its round-trip delay time (in seconds). The result, an amount of data measured in bits (or bytes), is equivalent to the maximum amount of data on the network circuit at any given time, i.e., data that has been transmitted but not yet acknowledged. TCP for instance expects acknowledgement of every packet sent and if the sender has not received an acknowledgement within a specified time period then the packet will be retransmitted. Furthermore, TCP uses a flow-control method whereby the receiver specifies how much data it is willing to buffer and the sending host must pause sending and wait for acknowledgement once that amount of data is sent. +### Effect of congestion + +The analysis above considers the best case where the whole bandwidth of the link is available for the data transfer. + ## Service Delay +A particular service function may have several operations (API calls) on it. A model of service function performance needs to consider the resource the service function is deployed upon (and its variability and reliability), the availability of the resource (i.e. whether the service function have the resource to itself), the workload (a statistical distribution of API calls and request sizes) and the speed at which the resource can compute the basic computations invoked by the requests. + +We must simplify sufficiently to make the problem tractable but not too much so that the result is of no practical use. + +To simplify we can: + +* assume that the resource is invariable, 100% available and 100% reliable; +* assume that the distribution of API calls is constant and that the workload can be represented sufficiently by the average request size. 
 + +To be concrete, if a service function has two API calls: `transcode_video(video_data)` and `get_status()` then we would like to model the average response time over "normal" usage which might be 10% of calls to `transcode_video` and 90% of calls to `get_status` and a variety of `video_data` sizes with a defined average size. + +### Measuring + +As an example, the `minio` service reports the average response time over all API calls already so, for that service at least, measuring the `service_delay` is easy. We expect to also be able to measure the average `file_size` which will do as a measure of workload. + +### Predicting + +As noted above, a simple model must consider: + +* the resource the service function is deployed upon (e.g. CPU, memory, disk); +* the workload (an average request size) +* the speed at which the resource can compute the basic computations invoked by the requests (dependent on the service function). + +We can therefore write that: + +``` +service_delay = f(resource, workload, service function characteristics) +``` + +For our simplified workload we could assume that this can be written as: + +``` +service_delay = workload * f(resource, service function characteristics) +``` + +The resource could be described in terms of the number of CPUs, amount of RAM and amount of disk. Even if the resource was a physical machine more detail would be required such as the CPU clock speed, CPU cache sizes, RAM speed, disk speed, etc. In a virtualised environment it is even more complicated as elements of the physical CPU may or may not be exposed to the virtual CPU (which may in fact be emulated). + +Benchmarks are often used to help measure the performance of a resource so that one resource may be compared to another without going into all the detail of the precise architecture. 
Application benchmarks (those executing realistic workloads such as matrix operations or fast Fourier transforms) can be more useful than general benchmark scores (such as SPECint or SPECfp). For more information on this, see [Snow White Clouds and the Seven Dwarfs](https://eprints.soton.ac.uk/273157/1/23157.pdf). + +The best benchmark for a service function is the service function itself combined with a representative workload. That is, to predict the performance of a service function on a given resource, it is best to just run it and find out. In the absence of that, the next best would be to execute Dwarf benchmarks on each resource type and correlate them with the service functions, but that is beyond what we can do now. + +We might execute a single benchmark such as the [Livermore Loops](http://www.netlib.org/benchmark/livermorec) benchmark which stresses a variety of CPU operations. Livermore Loops provides a single benchmark figure in Megaflops/sec. + +Our service_delay equation would then just reduce to: + +``` +service_delay = workload * f(benchmark, service function characteristics) + = workload * service_function_scaling_factor / benchmark +``` + +The `service_function_scaling_factor` essentially scales the `workload` number into a number of Megaflops. So for a `workload` in bytes the `service_function_scaling_factor` would be representing Megaflops/byte. + +If we don't have a benchmark then the best we can do is approximate the benchmark by the number of CPUs: + +``` +service_delay = workload * f(benchmark, service function characteristics) + = workload * service_function_scaling_factor / cpus +``` + +Is this a simplification too far? It ignores the size of RAM for instance which cannot normally be included as a linear factor (i.e. twice as much RAM does not always give twice the performance). Not having sufficient RAM results in disk swapping or complete failure. Once you have enough for a workload, adding more makes no difference. 
+ +## Conclusion + +The total delay is: + +``` +total_delay = forward_network_delay + service_delay + reverse_network_delay +``` + +To *measure* the `total_delay` we need: + +``` +total_delay = forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay + = forward_latency + + {(8 / 1E6) * (request_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]} + + service_delay + + reverse_latency + + {(8 / 1E6) * (response_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]} +``` + +With: + +* forward_latency / s (measured by network probe) +* reverse_latency / s (measured by network probe) +* request_size / Bytes (measured at service) +* response_size / Bytes (measured at service) +* bandwidth / Mb/s (b = bit) (assumed constant and known or measured) +* packet_size / Bytes (constant and known) +* packet_header_size / Bytes (constant and known) + +This calculation assumes: + +* there is no network congestion, i.e. the whole bandwidth is available +* that the protocol (such as TCP) has no effect (see discussion of flow control above) +* there is no data loss on the network +* that the service delay is proportional to the `request_size`, i.e. 
that the service is processing the data in the request +* that the service does not start processing until the complete request is received +* that the amount of memory and disk on the compute resource is irrelevant +* that the service delay is inversely proportional to the number of CPUs (and all CPUs are equal) +* that the compute resource is invariable, 100% available and 100% reliable +* that the distribution of API calls is constant and that the workload can be represented sufficiently by the average request size + +To *predict* the `total_delay` we need: + +``` +total_delay = forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay + = forward_latency + + {(8 / 1E6) * (request_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]} + + request_size * service_function_scaling_factor / cpus + + reverse_latency + + {(8 / 1E6) * (response_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]} +``` + +With: + +* service_function_scaling_factor / Mflops/Byte (known, somehow) +* cpus (unitless) (known) + +As discussed above, you could also predict the latency if you know the length of a link but this seems a bit too theoretical. 
diff --git a/setup.py b/setup.py index 93a649e3c05aa1067bc48493036d8ccaffb4c0c4..3c77424ab152637ae34d4220bbf181a596f9f536 100644 --- a/setup.py +++ b/setup.py @@ -21,21 +21,15 @@ // Created for Project : FLAME """ + +import os +import os.path from setuptools import setup, find_packages -requires = [ - 'plaster_pastedeploy', - 'pyramid', - 'pyramid_debugtoolbar', - 'waitress', - 'influxdb', -] -tests_require = [ - 'WebTest >= 1.3.1', # py3 compat - 'pytest', - 'pytest-cov', -] +def read(fname): + return open(os.path.join(os.path.dirname(__file__), fname)).read() + def get_version(fname): if os.path.isfile(fname): @@ -45,30 +42,23 @@ def get_version(fname): return git_revision + setup( - name = "CLMCservice", - version = "SNAPSHOT", + name = "clmc", + version = get_version("clmctest/_version.py"), author = "Michael Boniface", author_email = "mjb@it-innovation.soton.ac.uk", - description = "FLAME CLMC Testing Module", - long_description="long description", - license = "license", - keywords = "FLAME CLMC service test", + description = "FLAME CLMC Test Module", + license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE", + keywords = "FLAME CLMC", + url='https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc', packages=find_packages(exclude=["services"]), include_package_data=True, - install_requires=requires, - extras_require={ - 'testing': tests_require, - }, - package_data={'': ['git-commit-ref', '*.yml', '*.sh', '*.json', '*.conf']}, + package_data={'': ['_version.py', '*.yml', '*.sh', '*.json', '*.conf']}, + long_description="FLAME CLMC", classifiers=[ "Development Status :: Alpha", "Topic :: FLAME Tests", "License :: ", ], - entry_points={ - 'paste.app_factory': [ - 'main = CLMCservice:main', - ], - }, -) \ No newline at end of file +) diff --git a/src/clmc-webservice/.coveragerc b/src/clmc-webservice/.coveragerc new file mode 100644 index 
0000000000000000000000000000000000000000..a3edc11a103c310fff7b5b2dc3ef5937f1dcfa0d --- /dev/null +++ b/src/clmc-webservice/.coveragerc @@ -0,0 +1,3 @@ +[run] +source = clmcservice +omit = clmcservice/tests.py diff --git a/src/clmc-webservice/MANIFEST.in b/src/clmc-webservice/MANIFEST.in new file mode 100644 index 0000000000000000000000000000000000000000..eaf16db0e28d54b95ae8f085c1d4c3cfa1b8278d --- /dev/null +++ b/src/clmc-webservice/MANIFEST.in @@ -0,0 +1,2 @@ +include MANIFEST.in +recursive-include clmcservice \ No newline at end of file diff --git a/src/clmc-webservice/clmcservice/__init__.py b/src/clmc-webservice/clmcservice/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..bd560622e940ec78c2e6b7f5daaa340c325170e1 --- /dev/null +++ b/src/clmc-webservice/clmcservice/__init__.py @@ -0,0 +1,58 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. 
+// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + +from pyramid.config import Configurator +from pyramid.settings import asbool + +from clmcservice.utilities import RUNNING_FLAG, MALFORMED_FLAG + + +def main(global_config, **settings): + """ + This function returns a Pyramid WSGI application. + """ + + # a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings + aggregator_running = asbool(settings.get(RUNNING_FLAG, False)) + settings[RUNNING_FLAG] = asbool(aggregator_running) + + aggregator_report_period = int(settings.get('aggregator_report_period', 5)) + settings['aggregator_report_period'] = aggregator_report_period + + settings[MALFORMED_FLAG] = False + + config = Configurator(settings=settings) + + config.add_route('aggregator_config', '/aggregator/config') + config.add_view('clmcservice.views.AggregatorConfig', attr='get', request_method='GET') + config.add_view('clmcservice.views.AggregatorConfig', attr='put', request_method='PUT') + + config.add_route('aggregator_controller', '/aggregator/control') + config.add_view('clmcservice.views.AggregatorController', attr='get', request_method='GET') + config.add_view('clmcservice.views.AggregatorController', attr='put', request_method='PUT') + + config.add_route('round_trip_time_query', '/query/round-trip-time') + config.add_view('clmcservice.views.RoundTripTimeQuery', attr='get', request_method='GET') + + config.scan() + return config.make_wsgi_app() diff --git a/src/clmc-webservice/clmcservice/aggregator.py b/src/clmc-webservice/clmcservice/aggregator.py new file mode 100644 index 0000000000000000000000000000000000000000..f6dd4073374e2fed4bf4f22393d9776e76f00351 --- /dev/null +++ b/src/clmc-webservice/clmcservice/aggregator.py @@ -0,0 +1,204 @@ +#!/usr/bin/python3 +""" +## © University of Southampton IT Innovation Centre, 2018 +## +## Copyright in this software belongs to University of 
Southampton +## IT Innovation Centre of Gamma House, Enterprise Road, +## Chilworth Science Park, Southampton, SO16 7NS, UK. +## +## This software may not be used, sold, licensed, transferred, copied +## or reproduced in whole or in part in any manner or form or in or +## on any media by any person other than in accordance with the terms +## of the Licence Agreement supplied with the software, or otherwise +## without the prior written consent of the copyright owners. +## +## This software is distributed WITHOUT ANY WARRANTY, without even the +## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +## PURPOSE, except where stated in the Licence Agreement supplied with +## the software. +## +## Created By : Nikolay Stanchev +## Created Date : 25-04-2018 +## Created for Project : FLAME +""" + +from influxdb import InfluxDBClient +from time import time, sleep +from urllib.parse import urlparse +from clmcservice.utilities import generate_e2e_delay_report +import getopt +import sys + + +class Aggregator(object): + """ + A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process. + """ + + REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated + DATABASE = 'E2EMetrics' # default database the aggregator uses + DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses + RETRY_PERIOD = 5 # number of seconds to wait before retrying connection/posting data to Influx + + def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): + """ + Constructs an Aggregator instance. 
+ + :param database_name: database name to use + :param database_url: database url to use + :param report_period: the report period in seconds + """ + + # initialise a database client using the database url and the database name + url_object = urlparse(database_url) + while True: + try: + self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) + break + except: + sleep(self.RETRY_PERIOD) + + self.db_url = database_url + self.db_name = database_name + self.report_period = report_period + + # a cache-like dictionaries to store the last reported values, which can be used to fill in missing values + self.network_cache = {} + self.service_cache = {} + + def run(self): + """ + Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. + """ + + current_time = int(time()) + while True: + + boundary_time = current_time - self.report_period + + boundary_time_nano = boundary_time * 1000000000 + current_time_nano = current_time * 1000000000 + + # query the network delays and group them by path ID + network_delays = {} + + while True: + try: + result = self.db_client.query( + 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( + boundary_time_nano, current_time_nano)) + break + except: + sleep(self.RETRY_PERIOD) + + for item in result.items(): + metadata, result_points = item + # measurement = metadata[0] + tags = metadata[1] + + result = next(result_points) + network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] + self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] + + # query the service delays and group them by endpoint, service function instance and sfr + service_delays = {} + 
+ while True: + try: + result = self.db_client.query( + 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format( + boundary_time_nano, current_time_nano)) + break + except: + sleep(self.RETRY_PERIOD) + + for item in result.items(): + metadata, result_points = item + # measurement = metadata[0] + tags = metadata[1] + result = next(result_points) + service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) + self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) + + # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement + for path in network_delays: + # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr + path_id, source, target = path + if target not in service_delays and target not in self.service_cache: + # if not continue with the other network path reports + continue + + e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, + "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time} + + e2e_arguments['path_ID'] = path_id + e2e_arguments['source_SFR'] = source + e2e_arguments['target_SFR'] = target + e2e_arguments['delay_forward'] = network_delays[path][0] + e2e_arguments['avg_bandwidth'] = network_delays[path][1] + + # reverse the path ID to get the network delay for the reversed path + reversed_path = (path_id, target, source) + if reversed_path in network_delays or 
reversed_path in self.network_cache: + # get the reverse delay, use the latest value if reported or the cache value + e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0] + else: + e2e_arguments['delay_reverse'] = None + + # get the response time of the media component connected to the target SFR + service_delay = service_delays.get(target, self.service_cache.get(target)) + response_time, request_size, response_size, endpoint, sf_instance = service_delay + # put these points in the e2e arguments dictionary + e2e_arguments['delay_service'] = response_time + e2e_arguments['avg_request_size'] = request_size + e2e_arguments['avg_response_size'] = response_size + e2e_arguments['endpoint'] = endpoint + e2e_arguments['sf_instance'] = sf_instance + + # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row + if None not in e2e_arguments.values(): + + while True: + try: + self.db_client.write_points( + generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], + e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], + e2e_arguments['delay_service'], + e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], + e2e_arguments['time'])) + break + except: + sleep(self.RETRY_PERIOD) + + old_timestamp = current_time + # wait until {report_period) seconds have passed + while current_time < old_timestamp + self.report_period: + sleep(1) + current_time = int(time()) + + +if __name__ == '__main__': + + # Parse command line options + try: + opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url=']) + + arg_period = Aggregator.REPORT_PERIOD + arg_database_name = Aggregator.DATABASE + arg_database_url = Aggregator.DATABASE_URL + + # Apply parameters if given + for opt, arg in opts: 
+ if opt in ('-p', '--period'): + arg_period = int(arg) + elif opt in ('-d', '--database'): + arg_database_name = arg + elif opt in ('-u', '--url'): + arg_database_url = arg + + Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run() + + # print the error messages in case of a parse error + except getopt.GetoptError as err: + print(err) + print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>') diff --git a/src/clmc-webservice/clmcservice/tests.py b/src/clmc-webservice/clmcservice/tests.py new file mode 100644 index 0000000000000000000000000000000000000000..eee634b7d0e1f65a443e8b4f7d74222e9be3905e --- /dev/null +++ b/src/clmc-webservice/clmcservice/tests.py @@ -0,0 +1,419 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. 
+// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + +import pytest +from pyramid import testing +from pyramid.httpexceptions import HTTPBadRequest +from time import sleep +from clmcservice.utilities import CONFIG_ATTRIBUTES, PROCESS_ATTRIBUTE, RUNNING_FLAG, MALFORMED_FLAG, URL_REGEX +import os +import signal + + +class TestAggregatorAPI(object): + """ + A pytest-implementation test for the aggregator API calls + """ + + @pytest.fixture(autouse=True) + def app_config(self): + """ + A fixture to implement setUp/tearDown functionality for all tests by initializing configuration structure for the web service + """ + + self.config = testing.setUp() + self.config.add_settings({'aggregator_running': False, 'malformed': False, 'aggregator_report_period': 5, + 'aggregator_database_name': 'E2EMetrics', 'aggregator_database_url': "http://172.40.231.51:8086"}) + + yield + + testing.tearDown() + + def test_GET_config(self): + """ + Tests the GET method for the configuration of the aggregator. + """ + + from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself + + assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" + + request = testing.DummyRequest() + response = AggregatorConfig(request).get() + + assert response == {'aggregator_report_period': 5, + 'aggregator_database_name': 'E2EMetrics', + 'aggregator_database_url': "http://172.40.231.51:8086"}, "Response must be a dictionary representing a JSON object with the correct configuration data of the aggregator." 
+ + assert self.config.get_settings().get('aggregator_report_period') == 5, "A GET request must not modify the aggregator configuration data." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "A GET request must not modify the aggregator configuration data." + assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "A GET request must not modify the aggregator configuration data." + + @pytest.mark.parametrize("input_body, output_value", [ + ('{"aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://171.40.231.51:8086"}', + {'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://171.40.231.51:8086"}), + ('{"aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}', + {'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}), + ('{"aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', + {'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + ('{"aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}', + {'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}), + ('{"aggregator_report_period": 200, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "https://172.50.231.51:8086"}', + {'aggregator_report_period': 200, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "https://172.50.231.51:8086"}), + ('{"aggregator_report_period": 150, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": 
"https://localhost:8086"}', + {'aggregator_report_period': 150, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "https://localhost:8086"}), + ("{aggregator_report_period: 2hb5, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None), + ("{aggregator_report_period: 250-, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.52:8086}", None), + ("{aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: ftp://172.60.231.51:8086}", None), + ("{aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086/query param}", None), + ("{aggregator_report_period: 250, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.52:808686}", None), + ("{}", None), + ("{aggregator_running: true}", None), + ]) + def test_PUT_config(self, input_body, output_value): + """ + Tests the PUT method for the configuration of the aggregator + :param input_body: the input body parameter + :param output_value: the expected output value, None for expecting an Exception + """ + + from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." 
+ assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" + + request = testing.DummyRequest() + request.body = input_body.encode(request.charset) + + if output_value is not None: + response = AggregatorConfig(request).put() + assert response == output_value, "Response of PUT request must include the new configuration of the aggregator" + + for attribute in CONFIG_ATTRIBUTES: + assert self.config.get_settings().get(attribute) == output_value.get(attribute), "Aggregator settings configuration is not updated." + + assert not self.config.get_settings().get(RUNNING_FLAG), "Aggregator running status should not be updated after a configuration update." + else: + error_raised = False + try: + AggregatorConfig(request).put() + except HTTPBadRequest: + error_raised = True + + assert error_raised, "Error must be raised in case of an invalid argument." + + def test_start(self): + """ + Tests starting the aggregator through an API call. + """ + + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + + request = testing.DummyRequest() + input_body = '{"action": "start"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been started." + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been started." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." 
+ + # kill the started process after the test is over + pid = request.registry.settings[PROCESS_ATTRIBUTE].pid + os.kill(pid, signal.SIGTERM) + + def test_stop(self): + """ + Tests stopping the aggregator through an API call. + """ + + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + + # send a start request to trigger the aggregator + request = testing.DummyRequest() + input_body = '{"action": "start"}' + request.body = input_body.encode(request.charset) + AggregatorController(request).put() + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." + + # test stopping the aggregator process when it is running + request = testing.DummyRequest() + input_body = '{"action": "stop"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." + assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." + + sleep(2) # put a 2 seconds timeout so that the aggregator process can terminate + + # test stopping the aggregator process when it is not running + request = testing.DummyRequest() + input_body = '{"action": "stop"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." + assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." 
+ assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." + + def test_restart(self): + """ + Tests restarting the aggregator through an API call. + """ + + from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + + # test restarting the aggregator process when it is stopped + request = testing.DummyRequest() + input_body = '{"action": "restart"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE), "The aggregator process should have been reinitialised." + + # test restarting the aggregator process when it is running + request = testing.DummyRequest() + input_body = '{"action": "restart"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE), "The aggregator process should have been reinitialised." 
+
+        # kill the started process after the test is over
+        pid = request.registry.settings[PROCESS_ATTRIBUTE].pid
+        os.kill(pid, signal.SIGTERM)
+
+    @pytest.mark.parametrize("input_body", [
+        '{"action": "malformed"}',
+        '{"action": true}',
+        '{"action": false}',
+        '{"action": 1}',
+        '{invalid-json}',
+        '{"action": "start", "unneeded_argument": false}',
+        '{}'
+    ])
+    def test_malformed_actions(self, input_body):
+        """
+        Tests sending a malformed type of action to the aggregator through an API call.
+        """
+
+        from clmcservice.views import AggregatorController  # nested import so that importing the class view is part of the test itself
+
+        assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running."
+        assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running."
+
+        # test sending a malformed action to the aggregator
+        request = testing.DummyRequest()
+        input_body = input_body
+        request.body = input_body.encode(request.charset)
+
+        error_raised = False
+        try:
+            AggregatorController(request).put()
+        except HTTPBadRequest:
+            error_raised = True
+
+        assert error_raised
+
+    def test_GET_status(self):
+        """
+        Tests the GET method for the status of the aggregator.
+        """
+
+        from clmcservice.views import AggregatorController  # nested import so that importing the class view is part of the test itself
+
+        assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running."
+        assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running."
+
+        request = testing.DummyRequest()
+        response = AggregatorController(request).get()
+
+        assert response == {'aggregator_running': False}, "Response must be a dictionary representing a JSON object with the correct status data of the aggregator."
+
+        assert not self.config.get_settings().get(RUNNING_FLAG), "A GET request must not modify the aggregator status flag."
+ assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "A GET request must not start the aggregator process." + + # test status with malformed configuration + self.config.get_settings()[MALFORMED_FLAG] = True + self.config.get_settings()[RUNNING_FLAG] = True + request = testing.DummyRequest() + response = AggregatorController(request).get() + + assert response == {'aggregator_running': True, + 'malformed': True, + 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'}, \ + "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." + + assert self.config.get_settings().get(RUNNING_FLAG), "A GET request must not modify the aggregator status flag." + assert self.config.get_settings().get(MALFORMED_FLAG), "A GET request must not modify the aggregator malformed flag." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "A GET request must not start the aggregator process." + + def test_malformed_flag_behaviour(self): + """ + Tests the behaviour of the malformed configuration flag of the aggregator when doing a sequence of API calls. + """ + + from clmcservice.views import AggregatorController, AggregatorConfig # nested import so that importing the class view is part of the test itself + + assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not self.config.get_settings().get(MALFORMED_FLAG), "Initially aggregator is not in a malformed state" + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." + assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." + assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics." 
+ assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" + + # start the aggregator with the default configuration + request = testing.DummyRequest() + input_body = '{"action": "start"}' + request.body = input_body.encode(request.charset) + + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been started." + + # update the configuration of the aggregator while it is running + config_body = '{"aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}' + output_body = {'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086", 'malformed': True, + 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'} + request = testing.DummyRequest() + request.body = config_body.encode(request.charset) + response = AggregatorConfig(request).put() + assert response == output_body, "Response of PUT request must include the new configuration of the aggregator" + + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator shouldn't be stopped when the configuration is updated." + assert self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should be set when the configuration is updated while the process is running." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator shouldn't be stopped when the configuration is updated." 
+ + # check that the malformed flag has been updated through a GET call + request = testing.DummyRequest() + response = AggregatorController(request).get() + assert response == {'aggregator_running': True, + 'malformed': True, + 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'}, \ + "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." + + # restart the aggregator with the new configuration + request = testing.DummyRequest() + input_body = '{"action": "restart"}' + request.body = input_body.encode(request.charset) + response = AggregatorController(request).put() + assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." + assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert not self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should have been reset to False." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator should have been restarted." + + # update the configuration again while the aggregator is running + config_body = '{"aggregator_report_period": 30, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}' + output_body = {'aggregator_report_period': 30, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086", 'malformed': True, + 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. 
 Please, restart it so that the updated configuration is used.'}
+        request = testing.DummyRequest()
+        request.body = config_body.encode(request.charset)
+        response = AggregatorConfig(request).put()
+        assert response == output_body, "Response of PUT request must include the new configuration of the aggregator"
+
+        assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator shouldn't be stopped when the configuration is updated."
+        assert self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should be set when the configuration is updated while the process is running."
+        assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator shouldn't be stopped when the configuration is updated."
+
+        # stop the aggregator - this should also reset the malformed status flag
+        request = testing.DummyRequest()
+        input_body = '{"action": "stop"}'
+        request.body = input_body.encode(request.charset)
+        response = AggregatorController(request).put()
+        assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped."
+        assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped."
+        assert not self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should have been reset to False."
+        assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "The aggregator should have been stopped."
+ + +class TestRegexURL(object): + """ + A pytest-implementation test for the regular expression the service uses to validate the database URL + """ + + @pytest.mark.parametrize("valid_url", [ + "http://localhost:8080/", + "https://localhost:80/url/path", + "https://192.168.20.20/?query=param", + "http://custom.domain.com", + "http://domain.net:8888/", + "https://10.160.150.4:21", + "http://localhost:12345", + "http://domain.com:21/path", + "http://domain.com:32?path", + "http://domain.com:43#path" + ]) + def test_valid_urls(self, valid_url): + """ + Tests that the regular expression can detect valid URLs. + + :param valid_url: a string representing a valid URL + """ + + matched_object = URL_REGEX.match(valid_url) + + assert matched_object is not None, "The regular expression fails in validating a correct URL." + + assert matched_object.group() is not None, "The matched object should return the full-match string" + + @pytest.mark.parametrize("invalid_url", [ + "ftp://localhost:80/url/path", + "tcp://192.168.20.20/?query=param", + "http:/localhost:80/", + "https//localhost:8080/", + "https://domain:1234/url/path", + "http://domain.com:808080/", + "http://localhost:8-080/", + "http://localhost:port80/", + "http://domain.com:8080url/path", + "http://domain.com:8080/?url path", + ]) + def test_invalid_urls(self, invalid_url): + """ + Tests that the regular expression can detect invalid URLs. + + :param invalid_url: a string representing an invalid URL + """ + + matched_object = URL_REGEX.match(invalid_url) + + assert matched_object is None, "The regular expression fails in detecting an invalid URL." 
diff --git a/src/clmc-webservice/clmcservice/utilities.py b/src/clmc-webservice/clmcservice/utilities.py new file mode 100644 index 0000000000000000000000000000000000000000..44ccffed7ce4120b22c31b55a7b81cde33344f9b --- /dev/null +++ b/src/clmc-webservice/clmcservice/utilities.py @@ -0,0 +1,162 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + +from json import loads +from re import compile, IGNORECASE + +CONFIG_ATTRIBUTES = ('aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url') # all of the configuration attributes - to be used as dictionary keys + +RUNNING_FLAG = 'aggregator_running' # Attribute for storing the flag, which shows whether the aggregator is running or not - to be used as a dictionary key + +PROCESS_ATTRIBUTE = 'aggregator_process' # Attribute for storing the process object of the aggregator - to be used as a dictionary key + +# a 'malformed' running state of the aggregator is when the configuration is updated, but the aggregator is not restarted so it is running with an old version of the conf. 
+MALFORMED_FLAG = 'malformed'  # Attribute for storing the flag, which shows whether the aggregator is running in a malformed state or not - to be used as a dictionary key
+
+# used to indicate a malformed configuration message
+COMMENT_ATTRIBUTE = 'comment'
+COMMENT_VALUE = 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'
+
+# the attributes of the JSON response body that are expected when querying round trip time
+ROUND_TRIP_ATTRIBUTES = ('media_service', 'start_timestamp', 'end_timestamp')
+
+
+URL_REGEX = compile(
+    r'^https?://'  # http:// or https://
+    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain, e.g. example.domain.com
+    r'localhost|'  # or localhost...
+    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # or IP address (IPv4 format)
+    r'(?::\d{2,5})?'  # optional port number
+    r'(?:[/?#][^\s]*)?$',  # URL path or query parameters
+    IGNORECASE)
+
+
+def validate_config_content(configuration):
+    """
+    A utility function to validate a configuration string representing a JSON dictionary.
+
+    :param configuration: the configuration string to validate
+    :return: the validated configuration dictionary object with the values converted to their required type
+    :raise AssertionError: if the argument is not a valid configuration
+    """
+
+    global CONFIG_ATTRIBUTES
+
+    try:
+        configuration = loads(configuration)
+    except:
+        raise AssertionError("Configuration must be a JSON object.")
+
+    assert len(configuration) == len(CONFIG_ATTRIBUTES), "Configuration mustn't contain more attributes than the required ones."
+
+    for attribute in CONFIG_ATTRIBUTES:
+        assert attribute in configuration, "Required attribute not found in the request content."
+ + assert type(configuration.get('aggregator_report_period')) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period')) + + assert configuration.get('aggregator_report_period') > 0, "Report period must be a positive integer, received {0} instead.".format(configuration.get('aggregator_report_period')) + + assert URL_REGEX.match(configuration.get('aggregator_database_url')) is not None, "The aggregator must have a valid database URL in its configuration, received {0} instead.".format(configuration.get('aggregator_database_url')) + + return configuration + + +def validate_action_content(content): + """ + A utility function to validate a content string representing a JSON dictionary. + + :param content: the content string to validate + :return: the validated content dictionary + :raise AssertionError: if the argument is not a valid json content + """ + + try: + content = loads(content) + except: + raise AssertionError("Content must be a JSON object.") + + assert len(content) == 1, "Content mustn't contain more attributes than the required one." + + assert content['action'] in ('start', 'stop', 'restart') + + return content + + +def validate_round_trip_query_params(params): + """ + A utility function to validate a dictionary of parameters. + + :param params: the params dict to validate + :return: the validated parameters dictionary + :raise AssertionError: if the argument is not a valid json content + """ + + global ROUND_TRIP_ATTRIBUTES + + assert len(params) == len(ROUND_TRIP_ATTRIBUTES), "Content mustn't contain more attributes than the required ones." + + for attribute in ROUND_TRIP_ATTRIBUTES: + assert attribute in params, "Required attribute not found in the request content." 
+ + return params + + +def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time): + """ + Generates a combined averaged measurement about the e2e delay and its contributing parts + + :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path + :param source_sfr: source service router + :param target_sfr: target service router + :param endpoint: endpoint of the media component + :param sf_instance: service function instance (media component) + :param delay_forward: Path delay (Forward direction) + :param delay_reverse: Path delay (Reverse direction) + :param delay_service: the media service component response time + :param avg_request_size: averaged request size + :param avg_response_size: averaged response size + :param avg_bandwidth: averaged bandwidth + :param time: measurement timestamp + :return: a list of dict-formatted reports to post on influx + """ + + result = [{"measurement": "e2e_delays", + "tags": { + "path_ID": path_id, + "source_SFR": source_sfr, + "target_SFR": target_sfr, + "endpoint": endpoint, + "sf_instance": sf_instance + }, + "fields": { + "delay_forward": float(delay_forward), + "delay_reverse": float(delay_reverse), + "delay_service": float(delay_service), + "avg_request_size": float(avg_request_size), + "avg_response_size": float(avg_response_size), + "avg_bandwidth": float(avg_bandwidth) + }, + "time": int(1000000000*time) + }] + + return result diff --git a/src/clmc-webservice/clmcservice/views.py b/src/clmc-webservice/clmcservice/views.py new file mode 100644 index 0000000000000000000000000000000000000000..049be9daacb187ea3ecfb837f9c86a75a3660804 --- /dev/null +++ b/src/clmc-webservice/clmcservice/views.py @@ -0,0 +1,279 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// 
IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + +from pyramid.view import view_defaults +from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError +from influxdb import InfluxDBClient +from urllib.parse import urlparse +from subprocess import Popen, DEVNULL +from clmcservice.utilities import validate_config_content, validate_action_content, validate_round_trip_query_params, \ + CONFIG_ATTRIBUTES, ROUND_TRIP_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE +import os.path + + +@view_defaults(route_name='aggregator_config', renderer='json') +class AggregatorConfig(object): + """ + A class-based view for accessing and mutating the configuration of the aggregator. + """ + + def __init__(self, request): + """ + Initialises the instance of the view with the request argument. + + :param request: client's call request + """ + + self.request = request + + def get(self): + """ + A GET API call for the configuration of the aggregator. + + :return: A JSON response with the configuration of the aggregator. 
+ """ + + aggregator_data = self.request.registry.settings + config = {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES} + + return config + + def put(self): + """ + A PUT API call for the status of the aggregator. + + :return: A JSON response to the PUT call - essentially with the new configured data and comment of the state of the aggregator + :raises HTTPBadRequest: if request body is not a valid JSON for the configurator + """ + + old_config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} + new_config = self.request.body.decode(self.request.charset) + + try: + new_config = validate_config_content(new_config) + + for attribute in CONFIG_ATTRIBUTES: + self.request.registry.settings[attribute] = new_config.get(attribute) + + # if configuration is not already malformed, check whether the configuration is updated + if not self.request.registry.settings[MALFORMED_FLAG]: + malformed = old_config != new_config and self.request.registry.settings[RUNNING_FLAG] + self.request.registry.settings[MALFORMED_FLAG] = malformed + if malformed: + new_config[MALFORMED_FLAG] = True + new_config[COMMENT_ATTRIBUTE] = COMMENT_VALUE + + return new_config + + except AssertionError: + raise HTTPBadRequest("Bad request content - configuration format is incorrect.") + + +@view_defaults(route_name='aggregator_controller', renderer='json') +class AggregatorController(object): + + """ + A class-based view for controlling the aggregator. + """ + + def __init__(self, request): + """ + Initialises the instance of the view with the request argument. + + :param request: client's call request + """ + + self.request = request + + def get(self): + """ + A GET API call for the status of the aggregator - running or not. + + :return: A JSON response with the status of the aggregator. 
+ """ + + aggregator_data = self.request.registry.settings + config = {RUNNING_FLAG: aggregator_data.get(RUNNING_FLAG)} + + if aggregator_data[MALFORMED_FLAG] and aggregator_data[RUNNING_FLAG]: + config[MALFORMED_FLAG] = True + config[COMMENT_ATTRIBUTE] = COMMENT_VALUE + + return config + + def put(self): + """ + A PUT API call for the status of the aggregator. + + :return: A JSON response to the PUT call - essentially saying whether the aggregator is running or not + :raises HTTPBadRequest: if request body is not a valid JSON for the controller + """ + + content = self.request.body.decode(self.request.charset) + + try: + content = validate_action_content(content) + + config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES} + + action = content['action'] + + if action == 'start': + aggregator_started = self.request.registry.settings[RUNNING_FLAG] + if not aggregator_started: + process = self.start_aggregator(config) + self.request.registry.settings[RUNNING_FLAG] = True + self.request.registry.settings[PROCESS_ATTRIBUTE] = process + elif action == 'stop': + self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) + self.request.registry.settings[RUNNING_FLAG] = False + self.request.registry.settings[PROCESS_ATTRIBUTE] = None + self.request.registry.settings[MALFORMED_FLAG] = False + elif action == 'restart': + self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) + process = self.start_aggregator(config) + self.request.registry.settings[RUNNING_FLAG] = True + self.request.registry.settings[PROCESS_ATTRIBUTE] = process + self.request.registry.settings[MALFORMED_FLAG] = False + + return {RUNNING_FLAG: self.request.registry.settings.get(RUNNING_FLAG)} + + except AssertionError: + raise HTTPBadRequest('Bad request content - must be in JSON format: {"action": value}, where value is "start", "stop" or "restart".') + + @staticmethod + def start_aggregator(config): + """ + An auxiliary 
method to start the aggregator. + + :param config: the configuration containing the arguments for the aggregator + :return: the process object of the started aggregator script + """ + + dir_path = os.path.dirname(os.path.realpath(__file__)) + command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', + config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')] + process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL) + print("\nStarted aggregator process with PID: {0}\n".format(process.pid)) + + return process + + @staticmethod + def stop_aggregator(process): + """ + An auxiliary method to stop the aggregator. + + :param process: the process to terminate + """ + + # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value + if process is not None and process.poll() is None: + process.terminate() + print("\nStopped aggregator process with PID: {0}\n".format(process.pid)) + + +@view_defaults(route_name='round_trip_time_query', renderer='json') +class RoundTripTimeQuery(object): + + """ + A class-based view for querying the round trip time in a given range. + """ + + def __init__(self, request): + """ + Initialises the instance of the view with the request argument. + + :param request: client's call request + """ + + self.request = request + + def get(self): + """ + A GET API call for the averaged round trip time of a specific media service over a given time range. + + :return: A JSON response with the round trip time and its contributing parts. 
+ """ + + params = {} + for attribute in ROUND_TRIP_ATTRIBUTES: + if attribute in self.request.params: + params[attribute] = self.request.params.get(attribute) + + try: + params = validate_round_trip_query_params(params) + config_data = {config_attribute: self.request.registry.settings.get(config_attribute) for config_attribute in CONFIG_ATTRIBUTES} + + media_service = params.get(ROUND_TRIP_ATTRIBUTES[0]) + start_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[1]) + end_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[2]) + influx_db_name = config_data.get(CONFIG_ATTRIBUTES[1]) + influx_db_url = config_data.get(CONFIG_ATTRIBUTES[2]) + + url_object = urlparse(influx_db_url) + try: + db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=influx_db_name, timeout=10) + query = 'SELECT mean(*) FROM "{0}"."autogen"."e2e_delays" WHERE time >= {1} and time < {2} and sf_instance = \'{3}\''.format( + influx_db_name, start_timestamp, end_timestamp, media_service) + print(query) + result = db_client.query(query) + + actual_result = next(result.get_points(), None) + if actual_result is None: + return {"result": None} + else: + forward_latency = actual_result.get("mean_delay_forward") + reverse_latency = actual_result.get("mean_delay_reverse") + service_delay = actual_result.get("mean_delay_service") + request_size = actual_result.get("mean_avg_request_size") + response_size = actual_result.get("mean_avg_response_size") + bandwidth = actual_result.get("mean_avg_bandwidth") + + rtt = self.calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth) + return {"result": rtt} + except: + raise HTTPInternalServerError("Cannot instantiate connection with database {0} on url {1}.".format(influx_db_name, influx_db_url)) + + except AssertionError: + raise HTTPBadRequest('Bad request content - must be in JSON format: {"media_service": value, "start_timestamp": value, "end_timestamp": value}.') + + @staticmethod + 
def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
    """
    Calculates the round trip time given the list of arguments.

    :param forward_latency: network latency in forward direction (s)
    :param reverse_latency: network latency in reverse direction (s)
    :param service_delay: media service delay (s)
    :param request_size: request size (bytes)
    :param response_size: response size (bytes)
    :param bandwidth: network bandwidth (Mb/s)
    :param packet_size: size of packet (bytes)
    :param packet_header_size: size of the header of the packet (bytes)
    :return: the calculated round trip time (s)
    :raises ValueError: if bandwidth is not positive, or if the packet header does not
        fit inside the packet - either case would otherwise surface as an
        uninformative ZeroDivisionError or a nonsensical (negative) delay
    """

    # Guard the two divisions below with informative errors instead of letting
    # a bad query result (e.g. zero mean bandwidth) raise ZeroDivisionError.
    if bandwidth <= 0:
        raise ValueError("bandwidth must be positive, got {0}".format(bandwidth))
    if packet_size <= packet_header_size:
        raise ValueError("packet_size ({0}) must be greater than packet_header_size ({1})".format(packet_size, packet_header_size))

    # Overhead factor: inflates the payload transfer time to account for the
    # non-payload (header) portion of every packet sent.
    packet_overhead = packet_size / (packet_size - packet_header_size)
    # 8 / 10**6 converts bytes to megabits, so dividing by bandwidth (Mb/s) yields seconds.
    bytes_to_megabits = 8 / 10**6

    # Transmission delay of the request (forward) and response (reverse) payloads.
    forward_data_delay = bytes_to_megabits * (request_size / bandwidth) * packet_overhead
    reverse_data_delay = bytes_to_megabits * (response_size / bandwidth) * packet_overhead

    # Round trip = both network latencies + both data-transfer delays + service processing time.
    return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay
@@ -32,7 +35,7 @@ listen = localhost:8080 ### [loggers] -keys = root, CLMCservice +keys = root, clmcservice [handlers] keys = console @@ -44,10 +47,10 @@ keys = generic level = INFO handlers = console -[logger_CLMCservice] +[logger_clmcservice] level = DEBUG handlers = -qualname = CLMCservice +qualname = clmcservice [handler_console] class = StreamHandler diff --git a/production.ini b/src/clmc-webservice/production.ini similarity index 80% rename from production.ini rename to src/clmc-webservice/production.ini index d127f3a092030526e856386b5498c6a23bfa5ade..7f5e32331a58c686de4361e6d12a011e56a340b1 100644 --- a/production.ini +++ b/src/clmc-webservice/production.ini @@ -4,7 +4,7 @@ ### [app:main] -use = egg:CLMCservice +use = egg:clmcservice pyramid.reload_templates = false pyramid.debug_authorization = false @@ -12,6 +12,9 @@ pyramid.debug_notfound = false pyramid.debug_routematch = false pyramid.default_locale_name = en aggregator_running = false +aggregator_report_period = 5 +aggregator_database_name = E2EMetrics +aggregator_database_url = http://172.40.231.51:8086 ### # wsgi server configuration @@ -27,7 +30,7 @@ listen = *:8080 ### [loggers] -keys = root, CLMCservice +keys = root, clmcservice [handlers] keys = console @@ -39,10 +42,10 @@ keys = generic level = WARN handlers = console -[logger_CLMCservice] +[logger_clmcservice] level = WARN handlers = -qualname = CLMCservice +qualname = clmcservice [handler_console] class = StreamHandler diff --git a/pytest.ini b/src/clmc-webservice/pytest.ini similarity index 54% rename from pytest.ini rename to src/clmc-webservice/pytest.ini index d30f667ec5a99c0f58df21f2c653592982fa731c..2fb94a6c8131efcc6dcc9357a5b56f68af607de3 100644 --- a/pytest.ini +++ b/src/clmc-webservice/pytest.ini @@ -1,3 +1,3 @@ [pytest] -testpaths = CLMCservice +testpaths = clmcservice python_files = *.py diff --git a/src/clmc-webservice/setup.py b/src/clmc-webservice/setup.py new file mode 100644 index 
0000000000000000000000000000000000000000..817d1bdbebd1b4d58b61144a3edf692832d2ea60 --- /dev/null +++ b/src/clmc-webservice/setup.py @@ -0,0 +1,84 @@ +""" +// © University of Southampton IT Innovation Centre, 2018 +// +// Copyright in this software belongs to University of Southampton +// IT Innovation Centre of Gamma House, Enterprise Road, +// Chilworth Science Park, Southampton, SO16 7NS, UK. +// +// This software may not be used, sold, licensed, transferred, copied +// or reproduced in whole or in part in any manner or form or in or +// on any media by any person other than in accordance with the terms +// of the Licence Agreement supplied with the software, or otherwise +// without the prior written consent of the copyright owners. +// +// This software is distributed WITHOUT ANY WARRANTY, without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE, except where stated in the Licence Agreement supplied with +// the software. +// +// Created By : Nikolay Stanchev +// Created Date : 15-05-2018 +// Created for Project : FLAME +""" + + +import os +import os.path +from setuptools import setup, find_packages + + +def read(fname): + return open(os.path.join(os.path.dirname(__file__), fname)).read() + + +def get_version(fname): + if os.path.isfile(fname): + git_revision = read(fname) + else: + git_revision = "SNAPSHOT" + + return git_revision + + +requires = [ + 'plaster_pastedeploy', + 'pyramid', + 'pyramid_debugtoolbar', + 'waitress', + 'influxdb', + 'pytest', +] + +tests_require = [ + 'WebTest >= 1.3.1', # py3 compat + 'pytest-cov', +] + +setup( + name = "clmcservice", + version = get_version("_version.py"), + author = "Michael Boniface", + author_email = "mjb@it-innovation.soton.ac.uk", + description = "FLAME CLMC Service Module", + long_description="FLAME CLMC Service", + license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE", + keywords = "FLAME CLMC service", + url = 
'https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc', + packages=find_packages(), + include_package_data=True, + install_requires=requires, + extras_require={ + 'testing': tests_require, + }, + package_data={'': ['_version.py']}, + classifiers=[ + "Development Status :: Alpha", + "Topic :: FLAME CLMC Service", + "License :: ", + ], + entry_points={ + 'paste.app_factory': [ + 'main = clmcservice:main', + ], + }, +) \ No newline at end of file diff --git a/src/clmc-webservice/tox.ini b/src/clmc-webservice/tox.ini new file mode 100644 index 0000000000000000000000000000000000000000..b7fe1eac855e39b717787d62b2a9c2144d715b7e --- /dev/null +++ b/src/clmc-webservice/tox.ini @@ -0,0 +1,5 @@ +[tox] +envlist = py36 +[testenv] +deps=pytest +commands=pytest \ No newline at end of file