From e8ecb471e4dfc2741b95993070a527feed18e4c3 Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Fri, 4 May 2018 11:30:54 +0100 Subject: [PATCH] aggregation script and simulator - updated --- clmctest/monitoring/E2EAggregator.py | 72 +++++++------------- clmctest/monitoring/E2ESim.py | 48 ++++++------- clmctest/monitoring/LineProtocolGenerator.py | 56 +++++++-------- clmctest/monitoring/test_e2eresults.py | 4 +- 4 files changed, 75 insertions(+), 105 deletions(-) diff --git a/clmctest/monitoring/E2EAggregator.py b/clmctest/monitoring/E2EAggregator.py index 372ec3c..a53bd3e 100644 --- a/clmctest/monitoring/E2EAggregator.py +++ b/clmctest/monitoring/E2EAggregator.py @@ -34,8 +34,7 @@ class Aggregator(Thread): A class used to perform the aggregation feature of the CLMC - aggregation network and media service measurements. Currently, implemented as a thread, so that the implementation can be tested using pytest. """ - - PATH_SEPARATOR = "---" # currently, the path id is assumed to be in the format "SourceEndpointID---TargetEndpointID" + REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated DATABASE = 'E2EMetrics' # default database the aggregator uses DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses @@ -48,7 +47,7 @@ class Aggregator(Thread): :param database_url: database url to use """ - super(Aggregator, self).__init__() # call the constructor of the tread + super(Aggregator, self).__init__() # call the constructor of the thread # initialise a database client using the database url and the database name url_object = urlparse(database_url) @@ -85,83 +84,60 @@ class Aggregator(Thread): # query the network delays and group them by path ID network_delays = {} result = self.db_client.query( - 'SELECT mean(delay) as "Dnet" FROM "E2EMetrics"."autogen".network_delays WHERE time >= {0} and time < {1} GROUP BY path'.format( + 'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] - network_delays[tags['path']] = next(result_points)['Dnet'] + network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay'] - # query the service delays and group them by FQDN, service function instance and endpoint + # query the service delays and group them by FQDN, service function instance and sfr service_delays = {} - result = self.db_client.query( - 'SELECT mean(response_time) as "Dresponse" FROM "E2EMetrics"."autogen".service_delays WHERE time >= {0} and time < {1} GROUP BY FQDN, sf_instance, endpoint'.format( - boundary_time_nano, current_time_nano)) + result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY FQDN, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] - service_delays[tags['endpoint']] = (next(result_points)['Dresponse'], tags['FQDN'], tags['sf_instance']) + service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['FQDN'], tags['sf_instance']) - # for each path identifier check if there is a media service delay report for the target endpoint - if so, generate an e2e_delay measurement + # for each path identifier check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement for path in network_delays: - # check if target endpoint is reported in service delays, that is there is a media service instance running on target endpoint - target_endpoint = self.get_target_endpoint(path) - if target_endpoint not in service_delays: - # if not continue with the other path IDs + # check if target sfr is reported in service delays, that is there is a media service instance being connected to target sfr + path_id, source, target = path + if target not in service_delays: + # if not continue with the other path reports continue - e2e_arguments = {"path_id_f": None, "path_id_r": None, "fqdn": None, "sf_instance": None, "delay_path_f": None, "delay_path_r": None, + e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "fqdn": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, "delay_service": None, "time": boundary_time} - e2e_arguments['path_id_f'] = path - e2e_arguments['delay_path_f'] = network_delays[path] + e2e_arguments['path_ID'] = path_id + e2e_arguments['delay_forward'] = network_delays[path] # reverse the path ID to get the network delay for the reversed path - reversed_path = self.reverse_path_id(path) - assert reversed_path in network_delays # an assertion is made here, since reversed path should always be reported as well - e2e_arguments['path_id_r'] = reversed_path - e2e_arguments['delay_path_r'] = network_delays[reversed_path] + reversed_path = (path_id, target, source) + assert reversed_path in network_delays # an assertion is made here, since reversed path must always be reported as well + e2e_arguments['delay_reverse'] = network_delays[reversed_path] - service_delay = service_delays[target_endpoint] + # get the response time of the media component connected to the target SFR + service_delay = service_delays[target] response_time, fqdn, sf_instance = service_delay e2e_arguments['delay_service'] = response_time e2e_arguments['fqdn'] = fqdn - e2e_arguments['sf_isntnace'] = sf_instance + e2e_arguments['sf_instance'] = sf_instance if None not in e2e_arguments.items(): self.db_client.write_points( - lp.generate_e2e_delay_report(e2e_arguments['path_id_f'], e2e_arguments['path_id_r'], e2e_arguments['fqdn'], e2e_arguments['sf_isntnace'], - e2e_arguments['delay_path_f'], e2e_arguments['delay_path_r'], e2e_arguments['delay_service'], e2e_arguments['time'])) + lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['fqdn'], + e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'], + e2e_arguments['time'])) old_timestamp = current_time while current_time != old_timestamp + 5: sleep(1) current_time = int(time()) - - @staticmethod - def reverse_path_id(path): - """ - Reverses a path identifier. - :param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID" - :return: the reversed path ID, e.g. "TargetEndpointID---SourceEndpointID" - """ - - source_endpoint, target_endpoint = path.split(Aggregator.PATH_SEPARATOR) - return "{0}{1}{2}".format(target_endpoint, Aggregator.PATH_SEPARATOR, source_endpoint) - - @staticmethod - def get_target_endpoint(path): - """ - Get a target endpoint by parsing a path identifier. - - :param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID" - :return: the target endpoint retrieved from the path identifier. - """ - - return path.split(Aggregator.PATH_SEPARATOR)[1] if __name__ == '__main__': diff --git a/clmctest/monitoring/E2ESim.py b/clmctest/monitoring/E2ESim.py index 12a4401..6297ad8 100644 --- a/clmctest/monitoring/E2ESim.py +++ b/clmctest/monitoring/E2ESim.py @@ -73,19 +73,15 @@ class Simulator(object): """ # all network delays start from 1ms, the dictionary stores the information to report - ip_endpoints = [ - {'agent_id': 'endpoint1.ms-A.ict-flame.eu', - 'paths': [{ - 'target': 'endpoint2.ms-A.ict-flame.eu', - 'path_id': 'endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu', - 'network_delay': 1 - }]}, - {'agent_id': 'endpoint2.ms-A.ict-flame.eu', - 'paths': [{ - 'target': 'endpoint1.ms-A.ict-flame.eu', - 'path_id': 'endpoint2.ms-A.ict-flame.eu---endpoint1.ms-A.ict-flame.eu', - 'network_delay': 1 - }]} + paths = [ + {'target': 'SR3', + 'source': 'SR1', + 'path_id': 'SR1---SR3', + 'network_delay': 1}, + {'target': 'SR1', + 'source': 'SR3', + 'path_id': 'SR1---SR3', + 'network_delay': 1} ] # current time in seconds (to test the aggregation we write influx data points related to future time), so we start from the current time @@ -98,30 +94,26 @@ class Simulator(object): sample_period_media = 5 # sample period for reporting media service delays (measured in seconds) - service measurements reported every 5 seconds for i in range(0, self.SIMULATION_LENGTH): - # measure net delay every 2 seconds for endpoint 1 (generates on tick 0, 2, 4, 6, 8, 10.. etc.) + # measure net delay every 2 seconds for path SR1-SR3 (generates on tick 0, 2, 4, 6, 8, 10.. etc.) if i % sample_period_net == 0: - endpoint = ip_endpoints[0] - paths = endpoint['paths'] - for path in paths: - self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], endpoint['agent_id'], path['target'], path['network_delay'], sim_time)) + path = paths[0] + self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['network_delay'], sim_time)) - # increase/decrease the delay in every sample report (min delay is 1) - path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) + # increase/decrease the delay in every sample report (min delay is 1) + path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) - # measure net delay every 2 seconds for endpoint 2 (generates on tick 1, 3, 5, 7, 9, 11.. etc.) + # measure net delay every 2 seconds for path SR2-SR3 (generates on tick 1, 3, 5, 7, 9, 11.. etc.) if (i+1) % sample_period_net == 0: - endpoint = ip_endpoints[1] - paths = endpoint['paths'] - for path in paths: - self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], endpoint['agent_id'], path['target'], path['network_delay'], sim_time)) + path = paths[1] + self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['network_delay'], sim_time)) - # increase/decrease the delay in every sample report (min delay is 1) - path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) + # increase/decrease the delay in every sample report (min delay is 1) + path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) # measure service response time every 5 seconds if i % sample_period_media == 0: self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, "ms-A.ict-flame.eu", - "test-sf-clmc-agent-build_INSTANCE", "endpoint2.ms-A.ict-flame.eu", sim_time)) + "test-sf-clmc-agent-build_INSTANCE", "SR3", sim_time)) # increase/decrease the delay in every sample report (min delay is 10) mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)])) diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index 7757b25..fc0677c 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -29,32 +29,34 @@ import uuid from random import randint -def generate_e2e_delay_report(path_id_f, path_id_r, fqdn, sf_instance, delay_path_f, delay_path_r, delay_service, time): +def generate_e2e_delay_report(path_id, source_sfr, target_sfr, fqdn, sf_instance, delay_forward, delay_reverse, delay_service, time): """ Generates a combined averaged measurement about the e2e delay and its contributing parts - :param path_id_f: The forward path identifier, e.g. endpoint1---endpoint2 - :param path_id_r: The reverse path identifier, e.g. endpoint2---endpoint1 + :param path_ID: The path identifier, which is a bidirectional path ID for the request and the response path + :param source_SFR: source service router + :param target_SFR: target service router :param fqdn: FQDN of the media service - :param sf_instance: service function instance - :param delay_path_f: Path delay (Forward direction) - :param delay_path_r: Path delay (Reverse direction) - :param delay_service: the media service response time - :param time: measurement time + :param sf_instance: service function instance (media component) + :param delay_forward: Path delay (Forward direction) + :param delay_reverse: Path delay (Reverse direction) + :param delay_service: the media service component response time + :param time: measurement timestamp :return: a list of dict-formatted reports to post on influx """ result = [{"measurement": "e2e_delays", "tags": { - "pathID_F": path_id_f, - "pathID_R": path_id_r, + "path_ID": path_id, + "source_SFR": source_sfr, + "target_SFR": target_sfr, "FQDN": fqdn, "sf_instance": sf_instance }, "fields": { - "D_path_F": float(delay_path_f), - "D_path_R": float(delay_path_r), - "D_service": float(delay_service) + "delay_forward": float(delay_forward), + "delay_reverse": float(delay_reverse), + "delay_service": float(delay_service) }, "time": _getNSTime(time) }] @@ -62,23 +64,23 @@ def generate_e2e_delay_report(path_id_f, path_id_r, fqdn, sf_instance, delay_pat return result -def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time): +def generate_network_delay_report(path_id, source_sfr, target_sfr, e2e_delay, time): """ - Generates a platform measurement about the network delay between two specific endpoints. + Generates a platform measurement about the network delay between two specific service routers. - :param path_id: the identifier of the path between the two endpoints (TODO can we derive source_endpoint and target_endpoint from this path) - :param source_endpoint: the source endpoint - :param target_endpoint: the target endpoint - :param e2e_delay: the e2e network delay for traversing the path (source_endpoint) -> (target_endpoint) (NOT round-trip-time) - :param time: the measurement time + :param path_id: the identifier of the path between the two service routers + :param source_sfr: the source service router + :param target_sfr: the target service router + :param e2e_delay: the e2e network delay for traversing the path between the two service routers + :param time: the measurement timestamp :return: a list of dict-formatted reports to post on influx """ result = [{"measurement": "network_delays", "tags": { "path": path_id, - "source_endpoint": source_endpoint, # TODO source and target endpoint tags might have to be removed if they can be derived from path ID - "target_endpoint": target_endpoint + "source": source_sfr, + "target": target_sfr }, "fields": { "delay": e2e_delay @@ -89,15 +91,15 @@ def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e return result -def generate_service_delay_report(response_time, fqdn, sf_instance, endpoint, time): +def generate_service_delay_report(response_time, fqdn, sf_instance, sfr, time): """ Generates a service measurement about the media service response time. - :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service) + :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service component) :param fqdn: FQDN of the media service :param sf_instance: service function instance - :param endpoint: endpoint ID - :param time: the measurement time + :param sfr: the service function router that connects the endpoint of the SF instance to the FLAME network + :param time: the measurement timestamp :return: a list of dict-formatted reports to post on influx """ @@ -105,7 +107,7 @@ def generate_service_delay_report(response_time, fqdn, sf_instance, endpoint, ti "tags": { "FQDN": fqdn, "sf_instance": sf_instance, - "endpoint": endpoint + "sfr": sfr }, "fields": { "response_time": response_time, diff --git a/clmctest/monitoring/test_e2eresults.py b/clmctest/monitoring/test_e2eresults.py index fe73623..5486bcb 100644 --- a/clmctest/monitoring/test_e2eresults.py +++ b/clmctest/monitoring/test_e2eresults.py @@ -63,10 +63,10 @@ class TestE2ESimulation(object): ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"', {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}), ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"', - {"time": "1970-01-01T00:00:00Z", "count_D_path_F": 24, "count_D_path_R": 24, "count_D_service": 24}), + {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 24, "count_delay_reverse": 24, "count_delay_service": 24}), ('SELECT mean(*) FROM "E2EMetrics"."autogen"."e2e_delays"', - {"time": "1970-01-01T00:00:00Z", "mean_D_path_F": 13.159722222222223, "mean_D_path_R": 3.256944444444444, "mean_D_service": 32.791666666666664}), + {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 13.159722222222223, "mean_delay_reverse": 3.256944444444444, "mean_delay_service": 32.791666666666664}), ]) def test_simulation(self, influx_db, query, expected_result): """ -- GitLab