From b378417766d577d4f0db129e32559c5aa20b8717 Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Mon, 21 May 2018 09:09:15 +0100 Subject: [PATCH] Aggregator update to test the clmc service --- src/clmc-webservice/clmcservice/aggregator.py | 342 ++++++++++-------- 1 file changed, 184 insertions(+), 158 deletions(-) diff --git a/src/clmc-webservice/clmcservice/aggregator.py b/src/clmc-webservice/clmcservice/aggregator.py index 57c9cbd..3c7974f 100644 --- a/src/clmc-webservice/clmcservice/aggregator.py +++ b/src/clmc-webservice/clmcservice/aggregator.py @@ -1,158 +1,184 @@ -# #!/usr/bin/python3 -# """ -# ## © University of Southampton IT Innovation Centre, 2018 -# ## -# ## Copyright in this software belongs to University of Southampton -# ## IT Innovation Centre of Gamma House, Enterprise Road, -# ## Chilworth Science Park, Southampton, SO16 7NS, UK. -# ## -# ## This software may not be used, sold, licensed, transferred, copied -# ## or reproduced in whole or in part in any manner or form or in or -# ## on any media by any person other than in accordance with the terms -# ## of the Licence Agreement supplied with the software, or otherwise -# ## without the prior written consent of the copyright owners. -# ## -# ## This software is distributed WITHOUT ANY WARRANTY, without even the -# ## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR -# ## PURPOSE, except where stated in the Licence Agreement supplied with -# ## the software. -# ## -# ## Created By : Nikolay Stanchev -# ## Created Date : 15-05-2018 -# ## Created for Project : FLAME -# """ -# -# from influxdb import InfluxDBClient -# from time import time, sleep -# from urllib.parse import urlparse -# import getopt -# import sys -# import clmctest.monitoring.LineProtocolGenerator as lp -# -# -# class Aggregator(object): -# """ -# A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process -# """ -# -# REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated -# DATABASE = 'E2EMetrics' # default database the aggregator uses -# DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses -# -# def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): -# """ -# Constructs an Aggregator instance. -# -# :param database_name: database name to use -# :param database_url: database url to use -# :param report_period: the report period in seconds -# """ -# -# # initialise a database client using the database url and the database name -# url_object = urlparse(database_url) -# self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) -# -# self.db_url = database_url -# self.db_name = database_name -# self.report_period = report_period -# -# def run(self): -# """ -# Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. 
-# """ -# -# current_time = int(time()) -# while True: -# -# boundary_time = current_time - self.report_period -# -# boundary_time_nano = boundary_time * 1000000000 -# current_time_nano = current_time * 1000000000 -# -# # query the network delays and group them by path ID -# network_delays = {} -# result = self.db_client.query( -# 'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format( -# boundary_time_nano, current_time_nano)) -# for item in result.items(): -# metadata, result_points = item -# # measurement = metadata[0] -# tags = metadata[1] -# -# network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay'] -# -# # query the service delays and group them by endpoint, service function instance and sfr -# service_delays = {} -# result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano)) -# for item in result.items(): -# metadata, result_points = item -# # measurement = metadata[0] -# tags = metadata[1] -# service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance']) -# -# # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement -# for path in network_delays: -# # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr -# path_id, source, target = path -# if target not in service_delays: -# # if not continue with the other network path reports -# continue -# -# e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None, -# "delay_service": None, "time": boundary_time} -# -# e2e_arguments['path_ID'] = path_id -# e2e_arguments['delay_forward'] = network_delays[path] -# -# # reverse the path ID to get the network delay for the reversed path -# reversed_path = (path_id, target, source) -# assert reversed_path in network_delays # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A -# e2e_arguments['delay_reverse'] = network_delays[reversed_path] -# -# # get the response time of the media component connected to the target SFR -# service_delay = service_delays[target] -# response_time, endpoint, sf_instance = service_delay -# # put these points in the e2e arguments dictionary -# e2e_arguments['delay_service'] = response_time -# e2e_arguments['endpoint'] = endpoint -# e2e_arguments['sf_instance'] = sf_instance -# -# # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row -# if None not in e2e_arguments.items(): -# self.db_client.write_points( -# lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], -# e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'], -# e2e_arguments['time'])) -# -# old_timestamp = current_time -# # wait until {REPORT_PERIOD} seconds have passed -# while current_time != old_timestamp + self.report_period: -# sleep(1) -# current_time = int(time()) -# -# -# if __name__ == '__main__': -# -# 
# Parse command line options
-#     try:
-#         opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])
-#
-#         arg_period = Aggregator.REPORT_PERIOD
-#         arg_database_name = Aggregator.DATABASE
-#         arg_database_url = Aggregator.DATABASE_URL
-#
-#         # Apply parameters if given
-#         for opt, arg in opts:
-#             if opt in ('-p', '--period'):
-#                 arg_period = int(arg)
-#             elif opt in ('-d', '--database'):
-#                 arg_database_name = arg
-#             elif opt in ('-u', '--url'):
-#                 arg_database_url = arg
-#
-#         Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()
-#
-#     # print the error messages in case of a parse error
-#     except getopt.GetoptError as err:
-#         print(err)
-#         print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
+#!/usr/bin/python3
+"""
+## © University of Southampton IT Innovation Centre, 2018
+##
+## Copyright in this software belongs to University of Southampton
+## IT Innovation Centre of Gamma House, Enterprise Road,
+## Chilworth Science Park, Southampton, SO16 7NS, UK.
+##
+## This software may not be used, sold, licensed, transferred, copied
+## or reproduced in whole or in part in any manner or form or in or
+## on any media by any person other than in accordance with the terms
+## of the Licence Agreement supplied with the software, or otherwise
+## without the prior written consent of the copyright owners.
+##
+## This software is distributed WITHOUT ANY WARRANTY, without even the
+## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+## PURPOSE, except where stated in the Licence Agreement supplied with
+## the software.
+##
+## Created By : Nikolay Stanchev
+## Created Date : 15-05-2018
+## Created for Project : FLAME
+"""
+
+from influxdb import InfluxDBClient
+from time import time, sleep
+from urllib.parse import urlparse
+from clmcservice.utilities import generate_e2e_delay_report
+import getopt
+import sys
+
+
+class Aggregator(object):
+    """
+    A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process.
+    """
+
+    REPORT_PERIOD = 5  # default report period is 5s, i.e. every 5 seconds the mean delay values for the last 5 seconds are aggregated
+    DATABASE = 'E2EMetrics'  # default database the aggregator uses
+    DATABASE_URL = 'http://172.40.231.51:8086'  # default database URL the aggregator uses
+
+    def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
+        """
+        Constructs an Aggregator instance.
+
+        :param database_name: database name to use
+        :param database_url: database url to use
+        :param report_period: the report period in seconds
+        """
+
+        # initialise a database client using the database url and the database name, retrying until the client is successfully created
+        url_object = urlparse(database_url)
+
+        while True:
+            try:
+                self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
+                break
+            except Exception:
+                sleep(2)  # back off briefly before retrying
+
+        self.db_url = database_url
+        self.db_name = database_name
+        self.report_period = report_period
+
+    def run(self):
+        """
+        Performs the functionality of the aggregator - queries data from both measurements, merges it and posts the results back to InfluxDB every report period.
+        """
+
+        current_time = int(time())
+        while True:
+
+            boundary_time = current_time - self.report_period
+
+            boundary_time_nano = boundary_time * 1000000000
+            current_time_nano = current_time * 1000000000
+
+            # query the network delays and group them by path ID
+            network_delays = {}
+
+            while True:
+                try:
+                    result = self.db_client.query(
+                        'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
+                            boundary_time_nano, current_time_nano))
+                    break
+                except Exception:
+                    sleep(2)
+
+            for item in result.items():
+                metadata, result_points = item
+                # measurement = metadata[0]
+                tags = metadata[1]
+
+                network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay']
+
+            # query the service delays and group them by endpoint, service function instance and sfr
+            service_delays = {}
+
+            while True:
+                try:
+                    result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
+                    break
+                except Exception:
+                    sleep(2)
+
+            for item in result.items():
+                metadata, result_points = item
+                # measurement = metadata[0]
+                tags = metadata[1]
+                service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance'])
+
+            # for each network path, check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
+            for path in network_delays:
+                # check if the target sfr is reported in service delays, in other words - if there is a media service instance connected to the target sfr
+                path_id, source, target = path
+                if target not in service_delays:
+                    # if not, continue with the other network path reports
+                    continue
+
+                e2e_arguments = {"path_ID": None, "source_SFR": source, "target_SFR": target, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
+                                 "delay_service": None, "time": boundary_time}
+
+                e2e_arguments['path_ID'] = path_id
+                e2e_arguments['delay_forward'] = network_delays[path]
+
+                # reverse the path ID to get the network delay for the reversed path
+                reversed_path = (path_id, target, source)
+                assert reversed_path in network_delays  # the reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A
+                e2e_arguments['delay_reverse'] = network_delays[reversed_path]
+
+                # get the response time of the media component connected to the target SFR
+                service_delay = service_delays[target]
+                response_time, endpoint, sf_instance = service_delay
+                # put these points in the e2e arguments dictionary
+                e2e_arguments['delay_service'] = response_time
+                e2e_arguments['endpoint'] = endpoint
+                e2e_arguments['sf_instance'] = sf_instance
+
+                # if all the arguments of the e2e delay measurement were reported, then generate and post to Influx an E2E measurement row
+                if None not in e2e_arguments.values():
+
+                    while True:
+                        try:
+                            self.db_client.write_points(
+                                generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
+                                                          e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
+                                                          e2e_arguments['time']))
+                            break
+                        except Exception:
+                            sleep(2)
+
+            old_timestamp = current_time
+            # wait until at least {report_period} seconds have passed, without assuming the wake-up lands on the exact second
+            while current_time < old_timestamp + self.report_period:
+                sleep(1)
+                current_time = int(time())
+
+
+if __name__ == '__main__':
+
+    # Parse command line options
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])
+
+        arg_period = Aggregator.REPORT_PERIOD
+        arg_database_name = Aggregator.DATABASE
+        arg_database_url = Aggregator.DATABASE_URL
+
+        # Apply parameters if given
+        for opt, arg in opts:
+            if opt in ('-p', '--period'):
+                arg_period = int(arg)
+            elif opt in ('-d', '--database'):
+                arg_database_name = arg
+            elif opt in ('-u', '--url'):
+                arg_database_url = arg
+
+        Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()
+
+    # print the error messages in case of a parse error
+    except getopt.GetoptError as err:
+        print(err)
+        print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
--
GitLab
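
Usage note: with this patch the aggregator retries its InfluxDB client creation, queries and writes instead of crashing while the database is temporarily unreachable, so it can be started before InfluxDB is up. A minimal sketch of driving the class programmatically, assuming the module path implied by this patch and a local InfluxDB instance (the localhost URL and the 10-second period are illustrative values, not defaults from the code):

    from clmcservice.aggregator import Aggregator

    # Blocks forever: every 10 seconds, aggregate the mean network and service
    # delays reported over the previous 10 seconds and write an e2e_delay
    # measurement row back to the E2EMetrics database.
    Aggregator(database_name='E2EMetrics',
               database_url='http://localhost:8086',
               report_period=10).run()

The equivalent command-line invocation, following the usage format printed by the script itself: python aggregator.py -p 10 -d E2EMetrics -u http://localhost:8086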