#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
##      Created By :            Nikolay Stanchev
##      Created Date :          25-04-2018
##      Created for Project :   FLAME
"""

from threading import Event
from time import time, sleep
from urllib.parse import urlparse
import getopt
import sys

from influxdb import InfluxDBClient

from clmcservice.utilities import generate_e2e_delay_report


class Aggregator(object):
    """
    A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements.
    Implemented as a separate process.
    """

    REPORT_PERIOD = 5  # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'E2EMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://203.0.113.100:8086'  # default database URL the aggregator uses
    RETRY_PERIOD = 5  # number of seconds to wait before retrying a connection/posting data to InfluxDB

    def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
        """
        Constructs an Aggregator instance.

        :param database_name: database name to use
        :param database_url: database url to use
        :param report_period: the report period in seconds
        """

        # initialise a database client using the database url and the database name
        print("Creating InfluxDB Connection")
        url_object = urlparse(database_url)
        while True:
            try:
                self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
                break
            except Exception:
                sleep(self.RETRY_PERIOD)

        self.db_url = database_url
        self.db_name = database_name
        self.report_period = report_period

        # cache-like dictionaries to store the last reported values, which can be used to fill in missing values
        self.network_cache = {}
        self.service_cache = {}

        # a stop flag event object used to handle the stopping of the process
        self._stop_flag = Event()

    def stop(self):
        """
        Stop the aggregator from running.
        """

        self._stop_flag.set()

    def run(self):
        """
        Performs the functionality of the aggregator - queries data from both measurements, merges that data and posts it back to InfluxDB every report period.
""" print("Running aggregator") current_time = int(time()) while not self._stop_flag.is_set(): boundary_time = current_time - self.report_period boundary_time_nano = boundary_time * 1000000000 current_time_nano = current_time * 1000000000 # query the network delays and group them by path ID network_delays = {} while True: try: print("Query for network delays") result = self.db_client.query( 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "{0}"."autogen"."network_delays" WHERE time >= {1} and time < {2} GROUP BY path, source, target'.format(self.db_name, boundary_time_nano, current_time_nano)) break except Exception as e: print("Exception getting network delay") print(e) sleep(self.RETRY_PERIOD) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] result = next(result_points) network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth'] # query the service delays and group them by endpoint, service function instance and sfr service_delays = {} while True: try: print("Query for service delays") result = self.db_client.query( 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(self.db_name, boundary_time_nano, current_time_nano)) break except Exception as e: print("Exception getting service delay") print(e) sleep(self.RETRY_PERIOD) for item in result.items(): metadata, result_points = item # measurement = metadata[0] tags = metadata[1] result = next(result_points) service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance']) # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement for path in network_delays: # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr path_id, source, target = path if target not in service_delays and target not in self.service_cache: # if not continue with the other network path reports continue e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time} e2e_arguments['path_ID'] = path_id e2e_arguments['source_SFR'] = source e2e_arguments['target_SFR'] = target e2e_arguments['delay_forward'] = network_delays[path][0] e2e_arguments['avg_bandwidth'] = network_delays[path][1] # reverse the path ID to get the network delay for the reversed path reversed_path = (path_id, target, source) if reversed_path in network_delays or reversed_path in self.network_cache: # get the reverse delay, use the latest value if reported or the cache value e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0] else: e2e_arguments['delay_reverse'] = None # get the response time of the media component connected 
                service_delay = service_delays.get(target, self.service_cache.get(target))
                response_time, request_size, response_size, endpoint, sf_instance = service_delay

                # put these points in the e2e arguments dictionary
                e2e_arguments['delay_service'] = response_time
                e2e_arguments['avg_request_size'] = request_size
                e2e_arguments['avg_response_size'] = response_size
                e2e_arguments['endpoint'] = endpoint
                e2e_arguments['sf_instance'] = sf_instance

                # if all the arguments of the e2e delay measurement were reported, generate an E2E measurement row and post it to InfluxDB
                if None not in e2e_arguments.values():
                    while True:
                        try:
                            self.db_client.write_points(
                                generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
                                                          e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'],
                                                          e2e_arguments['delay_service'], e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'],
                                                          e2e_arguments['avg_bandwidth'], e2e_arguments['time']))
                            break
                        except Exception:
                            sleep(self.RETRY_PERIOD)

            old_timestamp = current_time
            # wait until report_period seconds have passed
            while current_time < old_timestamp + self.report_period:
                sleep(1)
                current_time = int(time())


if __name__ == '__main__':
    # parse the command line options
    try:
        opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])

        arg_period = Aggregator.REPORT_PERIOD
        arg_database_name = Aggregator.DATABASE
        arg_database_url = Aggregator.DATABASE_URL

        # apply the parameters if given
        for opt, arg in opts:
            if opt in ('-p', '--period'):
                arg_period = int(arg)
            elif opt in ('-d', '--database'):
                arg_database_name = arg
            elif opt in ('-u', '--url'):
                arg_database_url = arg

        Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()

    # print the error messages in case of a parse error
    except getopt.GetoptError as err:
        print(err)
        print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
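
# Example invocation - a minimal sketch only; the period, database name and URL below are simply the class
# defaults shown above and assume a reachable InfluxDB instance at that address:
#
#   python aggregator.py -p 5 -d E2EMetrics -u http://203.0.113.100:8086
#
# The aggregator can also be driven programmatically, e.g. from a supervising process:
#
#   aggregator = Aggregator(database_name='E2EMetrics', database_url='http://203.0.113.100:8086', report_period=5)
#   aggregator.run()   # blocks, aggregating every report_period seconds, until stop() is called from another thread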