Commit b3784177 authored by Nikolay Stanchev

Aggregator update to test the clmc service

parent 7a8542bc
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 15-05-2018
## Created for Project : FLAME
"""

from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
from clmcservice.utilities import generate_e2e_delay_report
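# generate_e2e_delay_report is assumed to build the e2e_delay measurement points in the format accepted by InfluxDBClient.write_points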
import getopt
import sys


class Aggregator(object):
"""
    A class used to perform the aggregation feature of the CLMC - merging network and media service measurements into end-to-end delay reports. Implemented as a separate process.
"""

    REPORT_PERIOD = 5  # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'E2EMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://172.40.231.51:8086'  # default database URL the aggregator uses

    def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
"""
Constructs an Aggregator instance.

        :param database_name: database name to use
        :param database_url: database URL to use
        :param report_period: the report period in seconds
        """

        # initialise a database client using the database URL and the database name
url_object = urlparse(database_url)
while True:
try:
self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
break
            except Exception:
                # back off and try again if the client could not be initialised
                sleep(2)

self.db_url = database_url
self.db_name = database_name
self.report_period = report_period

    def run(self):
"""
        Performs the functionality of the aggregator - queries data from both measurements, merges it and posts the result back to InfluxDB every report period.
"""

        current_time = int(time())
        while True:

            boundary_time = current_time - self.report_period

            boundary_time_nano = boundary_time * 1000000000
            current_time_nano = current_time * 1000000000
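            # note: the multiplications above convert the epoch boundaries to nanosecond timestamps, InfluxDB's default timestamp precision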

            # query the network delays and group them by path ID
            network_delays = {}
while True:
try:
result = self.db_client.query(
'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
boundary_time_nano, current_time_nano))
break
                except Exception:
                    # back off and retry the query if the request to InfluxDB fails
                    sleep(2)

for item in result.items():
metadata, result_points = item
# measurement = metadata[0]
tags = metadata[1]
network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay']
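            # hypothetical illustration of the grouping above: network_delays = {("path-A-B", "SR-A", "SR-B"): 5.2, ("path-A-B", "SR-B", "SR-A"): 5.4}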

            # query the service delays and group them by endpoint, service function instance and sfr
            service_delays = {}
while True:
try:
result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
break
                except Exception:
                    # back off and retry the query if the request to InfluxDB fails
                    sleep(2)

for item in result.items():
metadata, result_points = item
# measurement = metadata[0]
tags = metadata[1]
service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance'])
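            # hypothetical illustration of the result: service_delays = {"SR-B": (12.5, "endpoint-1", "ms-instance-1")}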

            # for each network path, check if there is a media service delay report for the target SFR - if so, generate an e2e_delay measurement
            for path in network_delays:
                # check whether the target SFR is reported in the service delays, in other words - whether there is a media service instance connected to the target SFR
                path_id, source, target = path
                if target not in service_delays:
                    # if not, continue with the other network path reports
                    continue

e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
"delay_service": None, "time": boundary_time}
e2e_arguments['path_ID'] = path_id
e2e_arguments['delay_forward'] = network_delays[path]
                # reverse the path ID to get the network delay for the reversed path
                reversed_path = (path_id, target, source)
                assert reversed_path in network_delays  # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A
                e2e_arguments['delay_reverse'] = network_delays[reversed_path]

# get the response time of the media component connected to the target SFR
service_delay = service_delays[target]
response_time, endpoint, sf_instance = service_delay
# put these points in the e2e arguments dictionary
e2e_arguments['delay_service'] = response_time
e2e_arguments['endpoint'] = endpoint
e2e_arguments['sf_instance'] = sf_instance

                # if all the arguments of the e2e delay measurement were reported, then generate an E2E measurement row and post it to InfluxDB
                if None not in e2e_arguments.values():
                    while True:
                        try:
                            self.db_client.write_points(
                                generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
                                                          e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
                                                          e2e_arguments['time']))
                            break
                        except Exception:
                            # back off and retry the write if the request to InfluxDB fails
                            sleep(2)

            old_timestamp = current_time
            # wait until {REPORT_PERIOD} seconds have passed
            while current_time < old_timestamp + self.report_period:
                sleep(1)
                current_time = int(time())


if __name__ == '__main__':

    # Parse command line options
    try:
        opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])

arg_period = Aggregator.REPORT_PERIOD
arg_database_name = Aggregator.DATABASE
        arg_database_url = Aggregator.DATABASE_URL

        # Apply parameters if given
for opt, arg in opts:
if opt in ('-p', '--period'):
arg_period = int(arg)
elif opt in ('-d', '--database'):
arg_database_name = arg
elif opt in ('-u', '--url'):
                arg_database_url = arg

        Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()

# print the error messages in case of a parse error
except getopt.GetoptError as err:
print(err)
print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
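
A minimal usage sketch (the URL below is a placeholder rather than a real deployment endpoint): running the aggregator with a 5-second report period against the E2EMetrics database of a local InfluxDB instance would look like

    python3 aggregator.py -p 5 -d E2EMetrics -u http://localhost:8086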