aggregator.py
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
from threading import Event
from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
from clmcservice.utilities import generate_e2e_delay_report
import getopt
import sys
class Aggregator(object):
"""
A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process.
"""
REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
DATABASE = 'E2EMetrics' # default database the aggregator uses
DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses
RETRY_PERIOD = 5 # number of seconds to wait before retrying connection/posting data to Influx
def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
"""
Constructs an Aggregator instance.
:param database_name: database name to use
:param database_url: database url to use
:param report_period: the report period in seconds
"""
# initialise a database client using the database url and the database name
print("Creating InfluxDB Connection")
url_object = urlparse(database_url)
while True:
try:
self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
break
            except Exception as e:
                print("Exception connecting to InfluxDB")
                print(e)
                sleep(self.RETRY_PERIOD)
self.db_url = database_url
self.db_name = database_name
self.report_period = report_period
        # cache-like dictionaries used to store the last reported values, which can be used to fill in missing values
self.network_cache = {}
self.service_cache = {}
# a stop flag event object used to handle the stopping of the process
self._stop_flag = Event()
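        # Event.set() and Event.is_set() are thread-safe, so stop() may be called from a different thread than the one executing run()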
def stop(self):
"""
Stop the aggregator from running.
"""
self._stop_flag.set()
def run(self):
"""
        Performs the functionality of the aggregator - queries data from both measurements, merges it and posts an aggregated report back to InfluxDB every report period.
"""
print("Running aggregator")
current_time = int(time())
while not self._stop_flag.is_set():
boundary_time = current_time - self.report_period
boundary_time_nano = boundary_time * 1000000000
current_time_nano = current_time * 1000000000
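            # note: InfluxDB stores timestamps with nanosecond precision, hence the conversion of the query window boundaries from seconds to nanoseconds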
# query the network delays and group them by path ID
network_delays = {}
while True:
try:
print("Query for network delays")
                    result = self.db_client.query(
                        'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" '
                        'FROM "{0}"."autogen"."network_delays" '
                        'WHERE time >= {1} and time < {2} '
                        'GROUP BY path, source, target'.format(self.db_name, boundary_time_nano, current_time_nano))
break
except Exception as e:
print("Exception getting network delay")
print(e)
sleep(self.RETRY_PERIOD)
for item in result.items():
metadata, result_points = item
                # metadata is a tuple of (measurement name, tags dictionary); only the tags are needed here
                tags = metadata[1]
                point = next(result_points)
                network_delays[(tags['path'], tags['source'], tags['target'])] = point['net_latency'], point['net_bandwidth']
                self.network_cache[(tags['path'], tags['source'], tags['target'])] = point['net_latency'], point['net_bandwidth']
# query the service delays and group them by endpoint, service function instance and sfr
service_delays = {}
while True:
try:
print("Query for service delays")
                    result = self.db_client.query(
                        'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" '
                        'FROM "{0}"."autogen"."service_delays" '
                        'WHERE time >= {1} and time < {2} '
                        'GROUP BY endpoint, sf_instance, sfr'.format(self.db_name, boundary_time_nano, current_time_nano))
break
except Exception as e:
print("Exception getting service delay")
print(e)
sleep(self.RETRY_PERIOD)
for item in result.items():
metadata, result_points = item
                # metadata is a tuple of (measurement name, tags dictionary); only the tags are needed here
                tags = metadata[1]
                point = next(result_points)
                service_delays[tags['sfr']] = (point['response_time'], point['request_size'], point['response_size'], tags['endpoint'], tags['sf_instance'])
                self.service_cache[tags['sfr']] = (point['response_time'], point['request_size'], point['response_size'], tags['endpoint'], tags['sf_instance'])
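            # service delays (and the service cache) are keyed by the 'sfr' tag so that they can be matched against the target SFR of each network path below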
# for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
for path in network_delays:
                # check if the target sfr is reported in the service delays, in other words - whether there is a media service instance connected to the target sfr
path_id, source, target = path
if target not in service_delays and target not in self.service_cache:
                    # if not, continue with the other network path reports
continue
e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
"delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time}
e2e_arguments['path_ID'] = path_id
e2e_arguments['source_SFR'] = source
e2e_arguments['target_SFR'] = target
e2e_arguments['delay_forward'] = network_delays[path][0]
e2e_arguments['avg_bandwidth'] = network_delays[path][1]
                # swap source and target to look up the network delay of the reverse path
reversed_path = (path_id, target, source)
if reversed_path in network_delays or reversed_path in self.network_cache:
                    # get the reverse delay - use the latest reported value if available, otherwise fall back to the cached value
e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0]
else:
e2e_arguments['delay_reverse'] = None
# get the response time of the media component connected to the target SFR
service_delay = service_delays.get(target, self.service_cache.get(target))
response_time, request_size, response_size, endpoint, sf_instance = service_delay
# put these points in the e2e arguments dictionary
e2e_arguments['delay_service'] = response_time
e2e_arguments['avg_request_size'] = request_size
e2e_arguments['avg_response_size'] = response_size
e2e_arguments['endpoint'] = endpoint
e2e_arguments['sf_instance'] = sf_instance
# if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row
if None not in e2e_arguments.values():
while True:
try:
self.db_client.write_points(
generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'],
e2e_arguments['delay_service'],
e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'],
e2e_arguments['time']))
break
                        except Exception as e:
                            print("Exception writing e2e delay report")
                            print(e)
                            sleep(self.RETRY_PERIOD)
old_timestamp = current_time
            # wait until report_period seconds have passed before running the next aggregation iteration
while current_time < old_timestamp + self.report_period:
sleep(1)
current_time = int(time())
if __name__ == '__main__':
# Parse command line options
try:
opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])
arg_period = Aggregator.REPORT_PERIOD
arg_database_name = Aggregator.DATABASE
arg_database_url = Aggregator.DATABASE_URL
# Apply parameters if given
for opt, arg in opts:
if opt in ('-p', '--period'):
arg_period = int(arg)
elif opt in ('-d', '--database'):
arg_database_name = arg
elif opt in ('-u', '--url'):
arg_database_url = arg
Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()
# print the error messages in case of a parse error
except getopt.GetoptError as err:
print(err)
print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
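
# Example invocation (the URL is an assumption - point it at the InfluxDB instance in use):
#     python3 aggregator.py -p 5 -d E2EMetrics -u http://localhost:8086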