diff --git a/clmctest/monitoring/E2EAggregator.py b/clmctest/monitoring/E2EAggregator.py deleted file mode 100644 index 0d681b71b994778a57ab73edd991003c969d8329..0000000000000000000000000000000000000000 --- a/clmctest/monitoring/E2EAggregator.py +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/python3 -""" -## © University of Southampton IT Innovation Centre, 2018 -## -## Copyright in this software belongs to University of Southampton -## IT Innovation Centre of Gamma House, Enterprise Road, -## Chilworth Science Park, Southampton, SO16 7NS, UK. -## -## This software may not be used, sold, licensed, transferred, copied -## or reproduced in whole or in part in any manner or form or in or -## on any media by any person other than in accordance with the terms -## of the Licence Agreement supplied with the software, or otherwise -## without the prior written consent of the copyright owners. -## -## This software is distributed WITHOUT ANY WARRANTY, without even the -## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR -## PURPOSE, except where stated in the Licence Agreement supplied with -## the software. -## -## Created By : Nikolay Stanchev -## Created Date : 25-04-2018 -## Created for Project : FLAME -""" - -from influxdb import InfluxDBClient -from time import time, sleep -from urllib.parse import urlparse -from threading import Thread, Event -import clmctest.monitoring.LineProtocolGenerator as lp - - -class Aggregator(Thread): - """ - A class used to perform the aggregation feature of the CLMC - aggregation network and media service measurements. Currently, implemented as a thread, - so that the implementation can be tested using pytest. - """ - - REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated - DATABASE = 'E2EMetrics' # default database the aggregator uses - DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses - - def __init__(self, database=DATABASE, database_url=DATABASE_URL): - """ - Constructs an Aggregator instance. - - :param database: database name to use - :param database_url: database url to use - """ - - super(Aggregator, self).__init__() # call the constructor of the thread - - # initialise a database client using the database url and the database name - url_object = urlparse(database_url) - self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10) - - self.db_url = database_url - self.db_name = database - - # a stop flag event object used to handle the killing of the thread - self._stop_flag = Event() - - # a cache-like dictionaries to store the last reported values, which can be used to fill in missing values - self.network_cache = {} - self.service_cache = {} - - def stop(self): - """ - A method used to stop the thread. - """ - - self._stop_flag.set() - - def set_event_lock(self, event): - """ - Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing). - - :param event: the event lock object - """ - - setattr(self, 'event', event) - - def run(self): - """ - Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. - """ - - if hasattr(self, 'event'): - self.event.set() - - current_time = int(time()) - while True: - if self._stop_flag.is_set(): - break - - boundary_time = current_time - Aggregator.REPORT_PERIOD - - boundary_time_nano = boundary_time * 1000000000 - current_time_nano = current_time * 1000000000 - - # query the network delays and group them by path ID - network_delays = {} - result = self.db_client.query( - 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( - boundary_time_nano, current_time_nano)) - for item in result.items(): - metadata, result_points = item - # measurement = metadata[0] - tags = metadata[1] - - result = next(result_points) - network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] - self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] - - # query the service delays and group them by endpoint, service function instance and sfr - service_delays = {} - result = self.db_client.query('SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) - for item in result.items(): - metadata, result_points = item - # measurement = metadata[0] - tags = metadata[1] - result = next(result_points) - service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) - self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) - - # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement - for path in network_delays: - # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr - path_id, source, target = path - if target not in service_delays and target not in self.service_cache: - # if not continue with the other network path reports - continue - - e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, - "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time} - - e2e_arguments['path_ID'] = path_id - e2e_arguments['source_SFR'] = source - e2e_arguments['target_SFR'] = target - e2e_arguments['delay_forward'] = network_delays[path][0] - e2e_arguments['avg_bandwidth'] = network_delays[path][1] - - # reverse the path ID to get the network delay for the reversed path - reversed_path = (path_id, target, source) - if reversed_path in network_delays or reversed_path in self.network_cache: - # get the reverse delay, use the latest value if reported or the cache value - e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0] - else: - e2e_arguments['delay_reverse'] = None - - # get the response time of the media component connected to the target SFR - service_delay = service_delays.get(target, self.service_cache.get(target)) - response_time, request_size, response_size, endpoint, sf_instance = service_delay - # put these points in the e2e arguments dictionary - e2e_arguments['delay_service'] = response_time - e2e_arguments['avg_request_size'] = request_size - e2e_arguments['avg_response_size'] = response_size - e2e_arguments['endpoint'] = endpoint - e2e_arguments['sf_instance'] = sf_instance - - # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row - if None not in e2e_arguments.values(): - self.db_client.write_points( - lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], - e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'], - e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time'])) - - old_timestamp = current_time - # wait until {REPORT_PERIOD} seconds have passed - while current_time < old_timestamp + self.REPORT_PERIOD: - sleep(1) - current_time = int(time()) - - -if __name__ == '__main__': - Aggregator().start() diff --git a/clmctest/monitoring/E2ETestAggregatorThread.py b/clmctest/monitoring/E2ETestAggregatorThread.py new file mode 100644 index 0000000000000000000000000000000000000000..991c33db016871c77e64b6a28ca446a0ad541b3b --- /dev/null +++ b/clmctest/monitoring/E2ETestAggregatorThread.py @@ -0,0 +1,68 @@ +#!/usr/bin/python3 +""" +## © University of Southampton IT Innovation Centre, 2018 +## +## Copyright in this software belongs to University of Southampton +## IT Innovation Centre of Gamma House, Enterprise Road, +## Chilworth Science Park, Southampton, SO16 7NS, UK. +## +## This software may not be used, sold, licensed, transferred, copied +## or reproduced in whole or in part in any manner or form or in or +## on any media by any person other than in accordance with the terms +## of the Licence Agreement supplied with the software, or otherwise +## without the prior written consent of the copyright owners. +## +## This software is distributed WITHOUT ANY WARRANTY, without even the +## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +## PURPOSE, except where stated in the Licence Agreement supplied with +## the software. +## +## Created By : Nikolay Stanchev +## Created Date : 25-04-2018 +## Created for Project : FLAME +""" + + +from threading import Thread +from clmcservice.aggregator import Aggregator + + +class TestAggregator(Thread): + + REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated + DATABASE = 'E2EMetrics' # default database the aggregator uses + DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses + + def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): + """ + Constructs an Aggregator instance. + + :param database: database name to use + :param database_url: database url to use + """ + + super(TestAggregator, self).__init__() # call the constructor of the thread + + self.aggregator = Aggregator(database_name=database, database_url=database_url, report_period=report_period) + + def stop(self): + """ + A method used to stop the thread. + """ + + self.aggregator.stop() + + def set_event_lock(self, event): + """ + Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing). + + :param event: the event lock object + """ + + setattr(self, 'event', event) + + def run(self): + if hasattr(self, 'event'): + self.event.set() + + self.aggregator.run() diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py index 72791eeaaedc266ed8ea1593b05674db679e962a..8c85786de9a2e1ccce89ddfd9489aa9c278b6641 100644 --- a/clmctest/monitoring/conftest.py +++ b/clmctest/monitoring/conftest.py @@ -28,7 +28,7 @@ import pkg_resources from influxdb import InfluxDBClient from clmctest.monitoring.StreamingSim import Sim from clmctest.monitoring.E2ESim import Simulator -from clmctest.monitoring.E2EAggregator import Aggregator +from clmctest.monitoring.E2ETestAggregatorThread import TestAggregator @pytest.fixture(scope="module") @@ -105,4 +105,4 @@ def e2e_aggregator(streaming_sim_config): influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" - return Aggregator(database_url=influx_url) + return TestAggregator(database_url=influx_url) diff --git a/clmctest/monitoring/test_e2eresults.py b/clmctest/monitoring/test_e2eresults.py index 18f2f81f98fbd6f17d8b64adb1773f3d1c3eb989..0b8c8d1ece206104bcfdd0daf4b39e65f238cb2d 100644 --- a/clmctest/monitoring/test_e2eresults.py +++ b/clmctest/monitoring/test_e2eresults.py @@ -95,7 +95,13 @@ class TestE2ESimulation(object): # get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator) actual_result = next(query_result.get_points()) - print("expected_result == actual_result {0}, {1}".format(expected_result, actual_result)) + for key in expected_result: + print("expected_result == actual_result {0}, {1}".format(expected_result.get(key), actual_result.get(key))) + + if type(expected_result.get(key)) == float: + assert expected_result.get(key) == pytest.approx(actual_result.get(key), 0.3) # approximate only when comparing float values + else: + assert expected_result.get(key) == actual_result.get(key), "E2E Simulation test failure" assert expected_result == actual_result, "E2E Simulation test failure" diff --git a/src/clmc-webservice/.coveragerc b/src/clmcwebservice/.coveragerc similarity index 100% rename from src/clmc-webservice/.coveragerc rename to src/clmcwebservice/.coveragerc diff --git a/src/clmc-webservice/MANIFEST.in b/src/clmcwebservice/MANIFEST.in similarity index 100% rename from src/clmc-webservice/MANIFEST.in rename to src/clmcwebservice/MANIFEST.in diff --git a/src/clmc-webservice/clmcservice/__init__.py b/src/clmcwebservice/clmcservice/__init__.py similarity index 100% rename from src/clmc-webservice/clmcservice/__init__.py rename to src/clmcwebservice/clmcservice/__init__.py diff --git a/src/clmc-webservice/clmcservice/aggregator.py b/src/clmcwebservice/clmcservice/aggregator.py similarity index 97% rename from src/clmc-webservice/clmcservice/aggregator.py rename to src/clmcwebservice/clmcservice/aggregator.py index f6dd4073374e2fed4bf4f22393d9776e76f00351..2c0c7d975ee4c4dc3f1afdb1df5ff16986ef80c0 100644 --- a/src/clmc-webservice/clmcservice/aggregator.py +++ b/src/clmcwebservice/clmcservice/aggregator.py @@ -22,6 +22,7 @@ ## Created for Project : FLAME """ +from threading import Event from influxdb import InfluxDBClient from time import time, sleep from urllib.parse import urlparse @@ -66,13 +67,23 @@ class Aggregator(object): self.network_cache = {} self.service_cache = {} + # a stop flag event object used to handle the stopping of the process + self._stop_flag = Event() + + def stop(self): + """ + Stop the aggregator from running. + """ + + self._stop_flag.set() + def run(self): """ Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. """ current_time = int(time()) - while True: + while not self._stop_flag.is_set(): boundary_time = current_time - self.report_period diff --git a/src/clmc-webservice/clmcservice/tests.py b/src/clmcwebservice/clmcservice/tests.py similarity index 100% rename from src/clmc-webservice/clmcservice/tests.py rename to src/clmcwebservice/clmcservice/tests.py diff --git a/src/clmc-webservice/clmcservice/utilities.py b/src/clmcwebservice/clmcservice/utilities.py similarity index 100% rename from src/clmc-webservice/clmcservice/utilities.py rename to src/clmcwebservice/clmcservice/utilities.py diff --git a/src/clmc-webservice/clmcservice/views.py b/src/clmcwebservice/clmcservice/views.py similarity index 100% rename from src/clmc-webservice/clmcservice/views.py rename to src/clmcwebservice/clmcservice/views.py diff --git a/src/clmc-webservice/development.ini b/src/clmcwebservice/development.ini similarity index 100% rename from src/clmc-webservice/development.ini rename to src/clmcwebservice/development.ini diff --git a/src/clmc-webservice/production.ini b/src/clmcwebservice/production.ini similarity index 100% rename from src/clmc-webservice/production.ini rename to src/clmcwebservice/production.ini diff --git a/src/clmc-webservice/pytest.ini b/src/clmcwebservice/pytest.ini similarity index 100% rename from src/clmc-webservice/pytest.ini rename to src/clmcwebservice/pytest.ini diff --git a/src/clmc-webservice/setup.py b/src/clmcwebservice/setup.py similarity index 100% rename from src/clmc-webservice/setup.py rename to src/clmcwebservice/setup.py diff --git a/src/clmc-webservice/tox.ini b/src/clmcwebservice/tox.ini similarity index 100% rename from src/clmc-webservice/tox.ini rename to src/clmcwebservice/tox.ini