From cac1853d17f70dc3550b83a77ef6b23fcd1fda7e Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Tue, 5 Jun 2018 10:13:12 +0100 Subject: [PATCH] Implements unit test for aggregator --- scripts/clmc-service/install.sh | 1 + .../clmcservice/aggregation/__init__.py | 0 .../{ => aggregation}/aggregator.py | 83 ++++++- .../aggregation/influx_data_interface.py | 216 ++++++++++++++++++ .../aggregation/test_aggregator.py | 168 ++++++++++++++ src/service/clmcservice/views.py | 2 +- .../monitoring/E2ETestAggregatorThread.py | 68 ------ src/test/clmctest/monitoring/conftest.py | 4 +- 8 files changed, 461 insertions(+), 81 deletions(-) create mode 100644 src/service/clmcservice/aggregation/__init__.py rename src/service/clmcservice/{ => aggregation}/aggregator.py (75%) create mode 100644 src/service/clmcservice/aggregation/influx_data_interface.py create mode 100644 src/service/clmcservice/aggregation/test_aggregator.py delete mode 100644 src/test/clmctest/monitoring/E2ETestAggregatorThread.py diff --git a/scripts/clmc-service/install.sh b/scripts/clmc-service/install.sh index 4928337..6349501 100755 --- a/scripts/clmc-service/install.sh +++ b/scripts/clmc-service/install.sh @@ -144,6 +144,7 @@ echo "----> Running tox" TOX_OUTPUT="$(tox)" # check if tox output contains the 'congratulations :)' bit for tests passed if [[ $TOX_OUTPUT != *"congratulations :)"* ]]; then + echo $TOX_OUTPUT echo "CLMC service unit tests failed." exit 1 fi diff --git a/src/service/clmcservice/aggregation/__init__.py b/src/service/clmcservice/aggregation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/service/clmcservice/aggregator.py b/src/service/clmcservice/aggregation/aggregator.py similarity index 75% rename from src/service/clmcservice/aggregator.py rename to src/service/clmcservice/aggregation/aggregator.py index 1f796dd..727bd82 100644 --- a/src/service/clmcservice/aggregator.py +++ b/src/service/clmcservice/aggregation/aggregator.py @@ -22,7 +22,7 @@ ## Created for Project : FLAME """ -from threading import Event +from threading import Thread, Event from influxdb import InfluxDBClient from time import time, sleep from urllib.parse import urlparse @@ -106,28 +106,43 @@ class Aggregator(object): # measurement = metadata[0] tags = metadata[1] - result = next(result_points) - network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] - self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] + result_point = next(result_points) + network_delays[(tags['path'], tags['source'], tags['target'])] = result_point['net_latency'], result_point['net_bandwidth'] + self.network_cache[(tags['path'], tags['source'], tags['target'])] = result_point['net_latency'], result_point['net_bandwidth'] # query the service delays and group them by endpoint, service function instance and sfr service_delays = {} result = self.db_client.query( - 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(self.db_name, - boundary_time_nano, current_time_nano)) + 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format( + self.db_name, boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] - result = next(result_points) - service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) - self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) + result_point = next(result_points) + service_delays[tags['sfr']] = (result_point['response_time'], result_point['request_size'], result_point['response_size'], tags['endpoint'], tags['sf_instance']) + self.service_cache[tags['sfr']] = (result_point['response_time'], result_point['request_size'], result_point['response_size'], tags['endpoint'], tags['sf_instance']) # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement for path in network_delays: - # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr path_id, source, target = path + + # check if we have a reverse path without a forward path for a potential aggregated row - e.g. SR3 to SR1 network row with service on SR3 and no row from SR1 to SR3 + if (source in service_delays or source in self.service_cache) and (path_id, target, source) not in network_delays and (path_id, target, source) in self.network_cache: + # hence search for the forward path in the cache + forward_path = self.network_cache.get((path_id, target, source)) + reverse_path = network_delays.get((path_id, source, target)) + forward_delay = forward_path[0] + avg_bandwidth = forward_path[1] + reverse_delay = reverse_path[0] + service_delay = service_delays.get(source, self.service_cache.get(source)) + response_time, request_size, response_size, endpoint, sf_instance = service_delay + self.db_client.write_points( + generate_e2e_delay_report(path_id, target, source, endpoint, sf_instance, forward_delay, reverse_delay, response_time, + request_size, response_size, avg_bandwidth, boundary_time)) + + # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr if target not in service_delays and target not in self.service_cache: # if not continue with the other network path reports continue @@ -180,6 +195,54 @@ class Aggregator(object): self.log.info("Aggregator stopped running.") +class AggregatorThread(Thread): + """ + A utility class used to wrap around the Aggregator class and return a Thread instance, which can then be used for testing (provides start and stop methods) + """ + + REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated + DATABASE = 'CLMCMetrics' # default database the aggregator uses + DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses + + def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): + """ + Constructs an Aggregator instance. + + :param database: database name to use + :param database_url: database url to use + """ + + super(AggregatorThread, self).__init__() # call the constructor of the thread + + self.aggregator = Aggregator(database_name=database, database_url=database_url, report_period=report_period) + + def stop(self): + """ + A method used to stop the thread. + """ + + self.aggregator.stop() + + def set_event_lock(self, event): + """ + Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing). + + :param event: the event lock object + """ + + setattr(self, 'event', event) + + def run(self): + """ + The method to execute when the thread starts. + """ + + if hasattr(self, 'event'): + self.event.set() + + self.aggregator.run() + + if __name__ == '__main__': # initialise a file logger, only when module's main method is run (NOT when aggregator class is imported somewhere else) log = logging.getLogger('aggregator') diff --git a/src/service/clmcservice/aggregation/influx_data_interface.py b/src/service/clmcservice/aggregation/influx_data_interface.py new file mode 100644 index 0000000..41f6115 --- /dev/null +++ b/src/service/clmcservice/aggregation/influx_data_interface.py @@ -0,0 +1,216 @@ +from clmcservice.utilities import generate_e2e_delay_report + +""" +A python module which provides auxiliary functions to mimic the behaviour of an InfluxDBClient when unit testing the aggregator. +""" + + +class MockResultSet(object): + """ + A mock object used to mimic the behaviour of a ResultSet in the influx library (we only need the functionality of an object that collects + a group of points and has an items() method to get the collected points. + """ + + def __init__(self, points): + """ + Initialise the mock result set. + + :param points: the collected points + """ + + self.points = points + + def items(self): + """ + Get the data points in the result set. + + :return: the collected data points + """ + + return self.points + + +# The following are network-related auxiliary functions to generate test data. + +def _network_result_point(net_latency, net_bandwidth): + """ + Returns a generator, which yields one data point representing a network measurement (fields only) + + :param net_latency: the reported network latency + :param net_bandwidth: the reported network bandwidth. + + :return: a generator object with one element (same behaviour is used in the influxdb library even when only one point is returned from the query) + """ + + yield {"net_latency": net_latency, "net_bandwidth": net_bandwidth} + + +def _network_tags(path, source, target): + """ + Returns a dictionary representing a network measurement (tags only) + + :param path: the path identifier + :param source: the source service router + :param target: the target service router + + :return: a dictionary with those values + """ + + return {"path": path, "source": source, "target": target} + + +def _network_metadata(measurement, path, source, target): + """ + Returns an influxdb-styled metadata about a network measurement. + + :param measurement: the measurement table name + :param path: the path identifier + :param source: the source service router + :param target: the target service router + + :return: a tuple with the first element being the measurement name and the second element being a dictionary with the network measurement tag values + """ + + return measurement, _network_tags(path, source, target) + + +def network_result_item(measurement, path, source, target, net_latency, net_bandwidth): + """ + Returns a full influxdb-styled network measurement item - with tag and field values. + + :param measurement: the measurement table name + :param path: the path identifier + :param source: the source service router + :param target: the target service router + :param net_latency: the reported network latency + :param net_bandwidth: the reported network bandwidth. + + :return: a tuple with the first element being the result metadata (measurement name and tags) and the second element being the data field points + """ + + return _network_metadata(measurement, path, source, target), _network_result_point(net_latency, net_bandwidth) + + +# The following are service-related auxiliary functions to generate test data. + +def _service_result_point(response_time, request_size, response_size): + """ + Returns a generator, which yields one data point representing a service measurement (fields only) + + :param response_time: the response time of the service + :param request_size: the averaged request size of the service + :param response_size: the averaged response size of the service + + :return: a generator object with one element (same behaviour is used in the influxdb library even when only one point is returned from the query) + """ + + yield {"response_time": response_time, "request_size": request_size, "response_size": response_size} + + +def _service_tags(sfr, endpoint, sf_instance): + """ + Returns a dictionary representing a service measurement (tags only) + + :param sfr: the service router to which the service's endpoint is connected to + :param endpoint: the endpoint the service is being deployed on + :param sf_instance: the service function instance (FQDN) + + :return: a dictionary with those values + """ + + return {"sfr": sfr, "endpoint": endpoint, "sf_instance": sf_instance} + + +def _service_metadata(measurement, sfr, endpoint, sf_instance): + """ + Returns an influxdb-styled metadata about a service measurement. + + :param measurement: the measurement table name + :param sfr: the service router to which the service's endpoint is connected to + :param endpoint: the endpoint the service is being deployed on + :param sf_instance: the service function instance (FQDN) + + :return: a tuple with the first element being the measurement name and the second element being a dictionary with the service measurement tag values + """ + + return measurement, _service_tags(sfr, endpoint, sf_instance) + + +def service_result_item(measurement, sfr, endpoint, sf_instance, response_time, request_size, response_size): + """ + Returns a full influxdb-styled service measurement item - with tag and field values. + + :param measurement: the measurement table name + :param sfr: the service router to which the service's endpoint is connected to + :param endpoint: the endpoint the service is being deployed on + :param sf_instance: the service function instance (FQDN) + :param response_time: the response time of the service + :param request_size: the averaged request size of the service + :param response_size: the averaged response size of the service + + :return: a tuple with the first element being the result metadata (measurement name and tags) and the second element being the data field points + """ + + return _service_metadata(measurement, sfr, endpoint, sf_instance), _service_result_point(response_time, request_size, response_size) + + +# The following are auxiliary functions for generating an e2e row used in the unit testing of the aggregator. + +def drop_timestamp(d): + """ + Drops the time stamp from a dictionary-represented influx result item object + + :param d: the dictionary object representing a measurement row from influx + + :return: the same dictionary with no timestamp + """ + + d.pop('time') + return d + + +def _generate_e2e_row(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth): + """ + Generates a combined averaged measurement about the e2e delay and its contributing parts with default timestamp (set as 0) + + :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path + :param source_sfr: source service router + :param target_sfr: target service router + :param endpoint: endpoint of the media component + :param sf_instance: service function instance (media component) + :param delay_forward: Path delay (Forward direction) + :param delay_reverse: Path delay (Reverse direction) + :param delay_service: the media service component response time + :param avg_request_size: averaged request size + :param avg_response_size: averaged response size + :param avg_bandwidth: averaged bandwidth + + :return: a list of dict-formatted reports to post on influx + """ + + return generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, + delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, 0)[0] + + +def generate_e2e_no_timestamp_row(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, + delay_service, avg_request_size, avg_response_size, avg_bandwidth): + """ + Generates a combined averaged measurement about the e2e delay and its contributing parts (with no timestamp, used for testing) + + :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path + :param source_sfr: source service router + :param target_sfr: target service router + :param endpoint: endpoint of the media component + :param sf_instance: service function instance (media component) + :param delay_forward: Path delay (Forward direction) + :param delay_reverse: Path delay (Reverse direction) + :param delay_service: the media service component response time + :param avg_request_size: averaged request size + :param avg_response_size: averaged response size + :param avg_bandwidth: averaged bandwidth + + :return: a list of dict-formatted reports to post on influx + """ + + return drop_timestamp(_generate_e2e_row(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, + avg_request_size, avg_response_size, avg_bandwidth)) diff --git a/src/service/clmcservice/aggregation/test_aggregator.py b/src/service/clmcservice/aggregation/test_aggregator.py new file mode 100644 index 0000000..14a9e94 --- /dev/null +++ b/src/service/clmcservice/aggregation/test_aggregator.py @@ -0,0 +1,168 @@ +#!/usr/bin/python3 +""" +## © University of Southampton IT Innovation Centre, 2018 +## +## Copyright in this software belongs to University of Southampton +## IT Innovation Centre of Gamma House, Enterprise Road, +## Chilworth Science Park, Southampton, SO16 7NS, UK. +## +## This software may not be used, sold, licensed, transferred, copied +## or reproduced in whole or in part in any manner or form or in or +## on any media by any person other than in accordance with the terms +## of the Licence Agreement supplied with the software, or otherwise +## without the prior written consent of the copyright owners. +## +## This software is distributed WITHOUT ANY WARRANTY, without even the +## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +## PURPOSE, except where stated in the Licence Agreement supplied with +## the software. +## +## Created By : Nikolay Stanchev +## Created Date : 04-06-2018 +## Created for Project : FLAME +""" +from collections import OrderedDict +from threading import Event +from unittest import mock +from time import sleep +from clmcservice.aggregation.aggregator import AggregatorThread +from clmcservice.aggregation.influx_data_interface import MockResultSet, network_result_item, service_result_item, drop_timestamp, generate_e2e_no_timestamp_row + + +class TestAggregation(object): + + ACTUAL_RESULTS = "actual_aggregated_results" + EXPECTED_RESULTS = "expected_aggregated_results" + FINISHED = "finished_event" + + def points_generator(self, network_items, service_items): + assert len(network_items) == len(service_items), "The data points generator must receive the same number of network items as the number of service items" + index = 0 + + while not getattr(self, self.FINISHED).is_set(): + items = network_items[index] + yield MockResultSet(items) + + items = service_items[index] + + # before yielding the service data points, check if both sets of data points are enumerated + if index == len(network_items)-1: + getattr(self, self.FINISHED).set() + + yield MockResultSet(items) + + index += 1 + + def setup_mock_db_client(self, mock_class): + setattr(self, self.ACTUAL_RESULTS, []) + setattr(self, self.EXPECTED_RESULTS, [ + generate_e2e_no_timestamp_row(path_id="SR1-SR3", source_sfr="SR1", target_sfr="SR3", endpoint="endpoint1", sf_instance="ms1.flame.org", delay_forward=10, + delay_reverse=15, delay_service=10, avg_request_size=1024, avg_response_size=8, avg_bandwidth=1200), + generate_e2e_no_timestamp_row(path_id="SR1-SR3", source_sfr="SR1", target_sfr="SR3", endpoint="endpoint1", sf_instance="ms1.flame.org", delay_forward=5, + delay_reverse=25, delay_service=40, avg_request_size=16, avg_response_size=2048, avg_bandwidth=1200), + generate_e2e_no_timestamp_row(path_id="SR1-SR2", source_sfr="SR1", target_sfr="SR2", endpoint="endpoint2", sf_instance="ms2.flame.org", delay_forward=15, + delay_reverse=35, delay_service=60, avg_request_size=32, avg_response_size=1024, avg_bandwidth=1200), + generate_e2e_no_timestamp_row(path_id="SR4-SR5", source_sfr="SR4", target_sfr="SR5", endpoint="endpoint5", sf_instance="ms5.flame.org", delay_forward=11, + delay_reverse=25, delay_service=50, avg_request_size=2048, avg_response_size=32, avg_bandwidth=1200), + generate_e2e_no_timestamp_row(path_id="SR1-SR2", source_sfr="SR1", target_sfr="SR2", endpoint="endpoint2", sf_instance="ms2.flame.org", delay_forward=12, + delay_reverse=5, delay_service=60, avg_request_size=32, avg_response_size=1024, avg_bandwidth=1200), + generate_e2e_no_timestamp_row(path_id="SR1-SR3", source_sfr="SR1", target_sfr="SR3", endpoint="endpoint1", sf_instance="ms1.flame.org", delay_forward=16, + delay_reverse=25, delay_service=40, avg_request_size=16, avg_response_size=2048, avg_bandwidth=1200), + generate_e2e_no_timestamp_row(path_id="SR10-SR12", source_sfr="SR12", target_sfr="SR10", endpoint="endpoint10", sf_instance="ms4.flame.org", delay_forward=22, + delay_reverse=3, delay_service=75, avg_request_size=1024, avg_response_size=64, avg_bandwidth=1200), + generate_e2e_no_timestamp_row(path_id="SR14-SR15", source_sfr="SR14", target_sfr="SR15", endpoint="endpoint15", sf_instance="ms2.flame.org", delay_forward=24, + delay_reverse=27, delay_service=105, avg_request_size=1024, avg_response_size=128, avg_bandwidth=1200), + generate_e2e_no_timestamp_row(path_id="SR14-SR15", source_sfr="SR15", target_sfr="SR14", endpoint="endpoint14", sf_instance="ms1.flame.org", delay_forward=27, + delay_reverse=24, delay_service=85, avg_request_size=32, avg_response_size=64, avg_bandwidth=1200), + ]) + setattr(self, self.FINISHED, Event()) + + mock_points = self.points_generator( + network_items=[ + ( + network_result_item("network_delays", "SR1-SR3", "SR1", "SR3", 10, 1200), + network_result_item("network_delays", "SR1-SR3", "SR3", "SR1", 15, 1200), + network_result_item("network_delays", "SR1-SR33", "SR1", "SR33", 15, 1200), + network_result_item("network_delays", "SR2-SR11", "SR11", "SR2", 15, 1200) + ), + ( + network_result_item("network_delays", "SR1-SR3", "SR1", "SR3", 5, 1200), + network_result_item("network_delays", "SR1-SR3", "SR3", "SR1", 25, 1200), + network_result_item("network_delays", "SR1-SR2", "SR1", "SR2", 15, 1200), + network_result_item("network_delays", "SR1-SR2", "SR2", "SR1", 35, 1200) + ), + ( + network_result_item("network_delays", "SR4-SR5", "SR4", "SR5", 5, 1200), + network_result_item("network_delays", "SR4-SR5", "SR5", "SR4", 25, 1200), + ), + (), + ( + network_result_item("network_delays", "SR4-SR5", "SR4", "SR5", 5, 1200), + network_result_item("network_delays", "SR4-SR5", "SR5", "SR4", 25, 1200), + network_result_item("network_delays", "SR0-SR1", "SR0", "SR1", 5, 1200), + network_result_item("network_delays", "SR0-SR1", "SR1", "SR0", 25, 1200), + network_result_item("network_delays", "SR10-SR12", "SR10", "SR12", 11, 1200), + network_result_item("network_delays", "SR10-SR12", "SR12", "SR10", 22, 1200), + network_result_item("network_delays", "SR14-SR15", "SR14", "SR15", 24, 1200), + network_result_item("network_delays", "SR14-SR15", "SR15", "SR14", 26, 1200), + ), + ( + network_result_item("network_delays", "SR4-SR5", "SR4", "SR5", 11, 1200), + network_result_item("network_delays", "SR1-SR2", "SR1", "SR2", 12, 1200), + network_result_item("network_delays", "SR1-SR2", "SR2", "SR1", 5, 1200), + network_result_item("network_delays", "SR1-SR3", "SR1", "SR3", 16, 1200), + network_result_item("network_delays", "SR10-SR12", "SR10", "SR12", 3, 1200), + network_result_item("network_delays", "SR14-SR15", "SR15", "SR14", 27, 1200), + ) + ], + service_items=[ + ( + service_result_item("service_delays", "SR3", "endpoint1", "ms1.flame.org", 10, 1024, 8), + service_result_item("service_delays", "SR33", "endpoint33", "ms2.flame.org", 20, 4096, 8), + service_result_item("service_delays", "SR11", "endpoint11", "ms3.flame.org", 30, 1024, 8), + ), + ( + service_result_item("service_delays", "SR3", "endpoint1", "ms1.flame.org", 40, 16, 2048), + service_result_item("service_delays", "SR2", "endpoint2", "ms2.flame.org", 60, 32, 1024) + ), + ( + service_result_item("service_delays", "SR6", "endpoint6", "ms1.flame.org", 60, 1024, 8), + service_result_item("service_delays", "SR7", "endpoint7", "ms1.flame.org", 70, 1024, 8), + ), + ( + service_result_item("service_delays", "SR6", "endpoint6", "ms1.flame.org", 65, 2048, 16), + service_result_item("service_delays", "SR8", "endpoint8", "ms2.flame.org", 75, 2048, 16), + service_result_item("service_delays", "SR9", "endpoint9", "ms3.flame.org", 25, 2048, 16), + ), + (), + ( + service_result_item("service_delays", "SR5", "endpoint5", "ms5.flame.org", 50, 2048, 32), + service_result_item("service_delays", "SR10", "endpoint10", "ms4.flame.org", 75, 1024, 64), + service_result_item("service_delays", "SR15", "endpoint15", "ms2.flame.org", 105, 1024, 128), + service_result_item("service_delays", "SR14", "endpoint14", "ms1.flame.org", 85, 32, 64), + ) + ] + ) + + mock_class.query = lambda query: next(mock_points) + mock_class.write_points = lambda points: getattr(self, self.ACTUAL_RESULTS).append(drop_timestamp(points[0])) + + @mock.patch('clmcservice.aggregation.aggregator.InfluxDBClient', autospec=True) + def test_aggregator(self, MockDBClient): + self.setup_mock_db_client(MockDBClient.return_value) + + t = AggregatorThread(report_period=2) + t.start() + + while not getattr(self, self.FINISHED).is_set(): + sleep(1) + + t.stop() + + expected_results = getattr(self, self.EXPECTED_RESULTS) + actual_results = getattr(self, self.ACTUAL_RESULTS) + assert type(actual_results) is list + assert type(expected_results) is list + assert len(actual_results) == len(expected_results), "Actual and expected result differ in length." + assert sorted(actual_results, key=lambda k: k['tags']['path_ID']) == sorted(expected_results, key=lambda k: k['tags']['path_ID']), \ + "Test failure - aggregation process returns incorrect results." diff --git a/src/service/clmcservice/views.py b/src/service/clmcservice/views.py index 2671064..b033675 100644 --- a/src/service/clmcservice/views.py +++ b/src/service/clmcservice/views.py @@ -179,7 +179,7 @@ class AggregatorController(object): :return: the process object of the started aggregator script """ - dir_path = os.path.dirname(os.path.realpath(__file__)) + dir_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'aggregation') python_interpreter = sys.executable command = [python_interpreter, 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')] diff --git a/src/test/clmctest/monitoring/E2ETestAggregatorThread.py b/src/test/clmctest/monitoring/E2ETestAggregatorThread.py deleted file mode 100644 index 2b97976..0000000 --- a/src/test/clmctest/monitoring/E2ETestAggregatorThread.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/python3 -""" -## © University of Southampton IT Innovation Centre, 2018 -## -## Copyright in this software belongs to University of Southampton -## IT Innovation Centre of Gamma House, Enterprise Road, -## Chilworth Science Park, Southampton, SO16 7NS, UK. -## -## This software may not be used, sold, licensed, transferred, copied -## or reproduced in whole or in part in any manner or form or in or -## on any media by any person other than in accordance with the terms -## of the Licence Agreement supplied with the software, or otherwise -## without the prior written consent of the copyright owners. -## -## This software is distributed WITHOUT ANY WARRANTY, without even the -## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR -## PURPOSE, except where stated in the Licence Agreement supplied with -## the software. -## -## Created By : Nikolay Stanchev -## Created Date : 25-04-2018 -## Created for Project : FLAME -""" - - -from threading import Thread -from clmcservice.aggregator import Aggregator - - -class TestAggregator(Thread): - - REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated - DATABASE = 'CLMCMetrics' # default database the aggregator uses - DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses - - def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): - """ - Constructs an Aggregator instance. - - :param database: database name to use - :param database_url: database url to use - """ - - super(TestAggregator, self).__init__() # call the constructor of the thread - - self.aggregator = Aggregator(database_name=database, database_url=database_url, report_period=report_period) - - def stop(self): - """ - A method used to stop the thread. - """ - - self.aggregator.stop() - - def set_event_lock(self, event): - """ - Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing). - - :param event: the event lock object - """ - - setattr(self, 'event', event) - - def run(self): - if hasattr(self, 'event'): - self.event.set() - - self.aggregator.run() diff --git a/src/test/clmctest/monitoring/conftest.py b/src/test/clmctest/monitoring/conftest.py index 819909c..1c9d4ad 100644 --- a/src/test/clmctest/monitoring/conftest.py +++ b/src/test/clmctest/monitoring/conftest.py @@ -28,7 +28,7 @@ import pkg_resources from influxdb import InfluxDBClient from clmctest.monitoring.StreamingSim import Sim from clmctest.monitoring.E2ESim import Simulator -from clmctest.monitoring.E2ETestAggregatorThread import TestAggregator +from clmcservice.aggregation.aggregator import AggregatorThread @pytest.fixture(scope="module") @@ -109,4 +109,4 @@ def e2e_aggregator(streaming_sim_config): influx_url = "http://" + streaming_sim_config[0]['ip_address'] + ":8086" - return TestAggregator(database_url=influx_url) + return AggregatorThread(database_url=influx_url) -- GitLab