diff --git a/clmctest/monitoring/E2EAggregator.py b/clmctest/monitoring/E2EAggregator.py index c601d68fc4b1acbc242b2db8a070b67f9ac35e05..58ec908fa28e484e93e63866539db0b1df809851 100644 --- a/clmctest/monitoring/E2EAggregator.py +++ b/clmctest/monitoring/E2EAggregator.py @@ -59,6 +59,10 @@ class Aggregator(Thread): # a stop flag event object used to handle the killing of the thread self._stop_flag = Event() + # a cache-like dictionaries to store the last reported values, which can be used to fill in missing values + self.network_cache = {} + self.service_cache = {} + def stop(self): """ A method used to stop the thread. @@ -84,57 +88,69 @@ class Aggregator(Thread): # query the network delays and group them by path ID network_delays = {} result = self.db_client.query( - 'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( + 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] - network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay'] + result = next(result_points) + network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] + self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] # query the service delays and group them by endpoint, service function instance and sfr service_delays = {} - result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) + result = self.db_client.query('SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] - service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance']) + result = next(result_points) + service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) + self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement for path in network_delays: # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr path_id, source, target = path - if target not in service_delays: + if target not in service_delays and target not in self.service_cache: # if not continue with the other network path reports continue e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, - "delay_service": None, "time": boundary_time} + "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time} e2e_arguments['path_ID'] = path_id - e2e_arguments['delay_forward'] = network_delays[path] + e2e_arguments['source_SFR'] = source + e2e_arguments['target_SFR'] = target + e2e_arguments['delay_forward'] = network_delays[path][0] + e2e_arguments['avg_bandwidth'] = network_delays[path][1] # reverse the path ID to get the network delay for the reversed path reversed_path = (path_id, target, source) - assert reversed_path in network_delays # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A - e2e_arguments['delay_reverse'] = network_delays[reversed_path] + if reversed_path in network_delays or reversed_path in self.network_cache: + # get the reverse delay, use the latest value if reported or the cache value + e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0] + else: + e2e_arguments['delay_reverse'] = None # get the response time of the media component connected to the target SFR - service_delay = service_delays[target] - response_time, endpoint, sf_instance = service_delay + service_delay = service_delays.get(target, self.service_cache.get(target)) + response_time, request_size, response_size, endpoint, sf_instance = service_delay # put these points in the e2e arguments dictionary e2e_arguments['delay_service'] = response_time + e2e_arguments['avg_request_size'] = request_size + e2e_arguments['avg_response_size'] = response_size e2e_arguments['endpoint'] = endpoint e2e_arguments['sf_instance'] = sf_instance # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row - if None not in e2e_arguments.items(): + if None not in e2e_arguments.values(): self.db_client.write_points( lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'], - e2e_arguments['time'])) + e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time'])) old_timestamp = current_time # wait until {REPORT_PERIOD} seconds have passed diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index 2d2b7a0db3d79f8f9f7989718d799b6d17593919..b9bf9a6249234131506e9fddca2fa77af8450ede 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -29,18 +29,21 @@ import uuid from random import randint -def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, time): +def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time): """ Generates a combined averaged measurement about the e2e delay and its contributing parts - :param path_ID: The path identifier, which is a bidirectional path ID for the request and the response path - :param source_SFR: source service router - :param target_SFR: target service router + :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path + :param source_sfr: source service router + :param target_sfr: target service router :param endpoint: endpoint of the media component :param sf_instance: service function instance (media component) :param delay_forward: Path delay (Forward direction) :param delay_reverse: Path delay (Reverse direction) :param delay_service: the media service component response time + :param avg_request_size: averaged request size + :param avg_response_size: averaged response size + :param avg_bandwidth: averaged bandwidth :param time: measurement timestamp :return: a list of dict-formatted reports to post on influx """ @@ -56,7 +59,10 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst "fields": { "delay_forward": float(delay_forward), "delay_reverse": float(delay_reverse), - "delay_service": float(delay_service) + "delay_service": float(delay_service), + "avg_request_size": float(avg_request_size), + "avg_response_size": float(avg_response_size), + "avg_bandwidth": float(avg_bandwidth) }, "time": _getNSTime(time) }] diff --git a/clmctest/monitoring/test_e2eresults.py b/clmctest/monitoring/test_e2eresults.py index 2f469b72705f9d2b6e3ae0d21911a5523532e5e7..f68c666ee85ce9482cc6d77a1e203095f4b7abb4 100644 --- a/clmctest/monitoring/test_e2eresults.py +++ b/clmctest/monitoring/test_e2eresults.py @@ -56,17 +56,18 @@ class TestE2ESimulation(object): print("... stopping aggregator") e2e_aggregator.stop() - @pytest.mark.parametrize("query, expected_result", [ ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"', {"time": "1970-01-01T00:00:00Z", "count_latency": 120, "count_bandwidth": 120}), ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"', {"time": "1970-01-01T00:00:00Z", "count_response_time": 24, "count_request_size": 24, "count_response_size": 24}), ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"', - {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 24, "count_delay_reverse": 24, "count_delay_service": 24}), + {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 38, "count_delay_reverse": 38, "count_delay_service": 38, + "count_avg_request_size": 38, "count_avg_response_size": 38, "count_avg_bandwidth": 38}), ('SELECT mean(*) FROM "E2EMetrics"."autogen"."e2e_delays"', - {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 13.159722222222223, "mean_delay_reverse": 3.256944444444444, "mean_delay_service": 32.791666666666664}), + {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 8.010964912280702, "mean_delay_reverse": 12.881578947368423, "mean_delay_service": 23.42105263157895, + 'mean_avg_request_size': 10485760, 'mean_avg_response_size': 1024, 'mean_avg_bandwidth': 104857600}), ]) def test_simulation(self, influx_db, query, expected_result): """