diff --git a/src/clmc-webservice/clmcservice/aggregator.py b/src/clmc-webservice/clmcservice/aggregator.py index 8df70bca515293aaba3fe169d09f0a150d586781..f6dd4073374e2fed4bf4f22393d9776e76f00351 100644 --- a/src/clmc-webservice/clmcservice/aggregator.py +++ b/src/clmc-webservice/clmcservice/aggregator.py @@ -18,7 +18,7 @@ ## the software. ## ## Created By : Nikolay Stanchev -## Created Date : 15-05-2018 +## Created Date : 25-04-2018 ## Created for Project : FLAME """ @@ -32,13 +32,13 @@ import sys class Aggregator(object): """ - A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process + A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process. """ REPORT_PERIOD = 5  # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated DATABASE = 'E2EMetrics'  # default database the aggregator uses - DATABASE_URL = 'http://172.40.231.51:8086'  # default database URL the aggregator uses - RETRY_PERIOD = 5 + DATABASE_URL = 'http://203.0.113.100:8086'  # default database URL the aggregator uses + RETRY_PERIOD = 5  # number of seconds to wait before retrying connection/posting data to Influx def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): """ @@ -51,7 +51,6 @@ class Aggregator(object): # initialise a database client using the database url and the database name url_object = urlparse(database_url) - while True: try: self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) @@ -63,6 +62,10 @@ class Aggregator(object): self.db_name = database_name self.report_period = report_period + # cache-like dictionaries to store the last reported values, which can be used to fill in missing values + self.network_cache = {} + self.service_cache = {} + def 
run(self): """ Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. @@ -82,7 +85,7 @@ class Aggregator(object): while True: try: result = self.db_client.query( - 'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( + 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( boundary_time_nano, current_time_nano)) break except: @@ -93,14 +96,18 @@ class Aggregator(object): # measurement = metadata[0] tags = metadata[1] - network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay'] + result = next(result_points) + network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] + self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] # query the service delays and group them by endpoint, service function instance and sfr service_delays = {} while True: try: - result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) + result = self.db_client.query( + 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format( + boundary_time_nano, current_time_nano)) break except: sleep(self.RETRY_PERIOD) @@ -109,50 +116,62 @@ class Aggregator(object): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] - service_delays[tags['sfr']] = 
(next(result_points)['response_time'], tags['endpoint'], tags['sf_instance']) + result = next(result_points) + service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) + self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement for path in network_delays: # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr path_id, source, target = path - if target not in service_delays: + if target not in service_delays and target not in self.service_cache: # if not continue with the other network path reports continue e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, - "delay_service": None, "time": boundary_time} + "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time} e2e_arguments['path_ID'] = path_id - e2e_arguments['delay_forward'] = network_delays[path] + e2e_arguments['source_SFR'] = source + e2e_arguments['target_SFR'] = target + e2e_arguments['delay_forward'] = network_delays[path][0] + e2e_arguments['avg_bandwidth'] = network_delays[path][1] # reverse the path ID to get the network delay for the reversed path reversed_path = (path_id, target, source) - assert reversed_path in network_delays # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A - e2e_arguments['delay_reverse'] = network_delays[reversed_path] + if reversed_path in network_delays or reversed_path in self.network_cache: + # get the reverse delay, use the latest value if reported 
or the cache value + e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0] + else: + e2e_arguments['delay_reverse'] = None # get the response time of the media component connected to the target SFR - service_delay = service_delays[target] - response_time, endpoint, sf_instance = service_delay + service_delay = service_delays.get(target, self.service_cache.get(target)) + response_time, request_size, response_size, endpoint, sf_instance = service_delay # put these points in the e2e arguments dictionary e2e_arguments['delay_service'] = response_time + e2e_arguments['avg_request_size'] = request_size + e2e_arguments['avg_response_size'] = response_size e2e_arguments['endpoint'] = endpoint e2e_arguments['sf_instance'] = sf_instance # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row - if None not in e2e_arguments.items(): + if None not in e2e_arguments.values(): while True: try: self.db_client.write_points( generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], - e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'], + e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], + e2e_arguments['delay_service'], + e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time'])) break except: sleep(self.RETRY_PERIOD) old_timestamp = current_time - # wait until {REPORT_PERIOD} seconds have passed + # wait until {report_period} seconds have passed while current_time < old_timestamp + self.report_period: sleep(1) current_time = int(time()) diff --git a/src/clmc-webservice/clmcservice/utilities.py b/src/clmc-webservice/clmcservice/utilities.py index 
7f7fa7522f6cf7c30a4dd2ba3326210bfd539b04..44023545c3a2ec9798a257978620ef4751f42a56 100644 --- a/src/clmc-webservice/clmcservice/utilities.py +++ b/src/clmc-webservice/clmcservice/utilities.py @@ -98,9 +98,9 @@ def validate_action_content(content): return content -def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, time): +def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time): """ - Generates a combined averaged measurement about the e2e delay and its contributing parts. + Generates a combined averaged measurement about the e2e delay and its contributing parts :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path :param source_sfr: source service router @@ -110,6 +110,9 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst :param delay_forward: Path delay (Forward direction) :param delay_reverse: Path delay (Reverse direction) :param delay_service: the media service component response time + :param avg_request_size: averaged request size + :param avg_response_size: averaged response size + :param avg_bandwidth: averaged bandwidth :param time: measurement timestamp :return: a list of dict-formatted reports to post on influx """ @@ -125,7 +128,10 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst "fields": { "delay_forward": float(delay_forward), "delay_reverse": float(delay_reverse), - "delay_service": float(delay_service) + "delay_service": float(delay_service), + "avg_request_size": float(avg_request_size), + "avg_response_size": float(avg_response_size), + "avg_bandwidth": float(avg_bandwidth) }, "time": int(1000000000*time) }]