#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
##      Created By :            Nikolay Stanchev
##      Created Date :          25-04-2018
##      Created for Project :   FLAME
"""

from threading import Event
from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
from clmcservice.utilities import generate_e2e_delay_report
import getopt
import sys


class Aggregator(object):
    """
    A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process.
    """

    REPORT_PERIOD = 5  # default report period in seconds - every 5 seconds the mean delay values for the previous 5 seconds are aggregated
    DATABASE = 'E2EMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://203.0.113.100:8086'  # default database URL the aggregator uses
    RETRY_PERIOD = 5  # number of seconds to wait before retrying connection/posting data to Influx

    def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
        """
        Constructs an Aggregator instance.

        :param database_name: database name to use
        :param database_url: database url to use
        :param report_period: the report period in seconds
        """

        # initialise a database client using the database url and the database name
        print("Creating InfluxDB Connection")
        url_object = urlparse(database_url)
        while True:
            try:
                self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
                break
            except Exception:
                # client creation failed - wait and retry rather than crashing on start-up
                sleep(self.RETRY_PERIOD)

        self.db_url = database_url
        self.db_name = database_name
        self.report_period = report_period

        # cache-like dictionaries to store the last reported values, which can be used to fill in missing values
        self.network_cache = {}
        self.service_cache = {}

        # a stop flag event object used to handle the stopping of the process
        self._stop_flag = Event()

    def stop(self):
        """
        Stop the aggregator from running.
        """

        self._stop_flag.set()

    def run(self):
        """
        Performs the functionality of the aggregator - queries data from both measurements, merges that data and posts it back to InfluxDB every report period.
        """

        print("Running aggregator")

        current_time = int(time())
        while not self._stop_flag.is_set():

            boundary_time = current_time - self.report_period

            boundary_time_nano = boundary_time * 1000000000
            current_time_nano = current_time * 1000000000
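            # the multiplications above convert seconds to nanoseconds - the default timestamp precision used in InfluxDB queries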

            # query the network delays and group them by path, source and target
            network_delays = {}

            while True:

                try:
                    print("Query for network delays")
                    result = self.db_client.query(
                        'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "{0}"."autogen"."network_delays" WHERE time >= {1} and time < {2} GROUP BY path, source, target'.format(self.db_name,
                            boundary_time_nano, current_time_nano))
                    break
                except Exception as e:
                    print("Exception getting network delay")
                    print(e)
                    sleep(self.RETRY_PERIOD)

            for metadata, result_points in result.items():
                # metadata is a tuple of (measurement name, tags dictionary)
                tags = metadata[1]

                point = next(result_points)
                network_delays[(tags['path'], tags['source'], tags['target'])] = point['net_latency'], point['net_bandwidth']
                self.network_cache[(tags['path'], tags['source'], tags['target'])] = point['net_latency'], point['net_bandwidth']
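            # network_delays now maps (path, source, target) -> (mean latency, mean bandwidth) for the last report period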

            # query the service delays and group them by endpoint, service function instance and sfr
            service_delays = {}

            while True:
                try:
                    print("Query for service delays")                    
                    result = self.db_client.query(
                        'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(self.db_name,
                            boundary_time_nano, current_time_nano))
                    break
                except Exception as e:
                    print("Exception getting service delay")
                    print(e)
                    sleep(self.RETRY_PERIOD)

            for metadata, result_points in result.items():
                # metadata is a tuple of (measurement name, tags dictionary)
                tags = metadata[1]
                point = next(result_points)
                service_delays[tags['sfr']] = (point['response_time'], point['request_size'], point['response_size'], tags['endpoint'], tags['sf_instance'])
                self.service_cache[tags['sfr']] = (point['response_time'], point['request_size'], point['response_size'], tags['endpoint'], tags['sf_instance'])
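            # service_delays now maps sfr -> (mean response time, mean request size, mean response size, endpoint, sf_instance)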

            # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
            for path in network_delays:
                # check whether the target sfr is reported in the service delays, that is, whether there is a media service instance connected to the target sfr
                path_id, source, target = path
                if target not in service_delays and target not in self.service_cache:
                    # if not, continue with the other network path reports
                    continue

                e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
                                 "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time}

                e2e_arguments['path_ID'] = path_id
                e2e_arguments['source_SFR'] = source
                e2e_arguments['target_SFR'] = target
                e2e_arguments['delay_forward'] = network_delays[path][0]
                e2e_arguments['avg_bandwidth'] = network_delays[path][1]

                # reverse the path ID to get the network delay for the reversed path
                reversed_path = (path_id, target, source)
                if reversed_path in network_delays or reversed_path in self.network_cache:
                    # get the reverse delay, use the latest value if reported or the cache value
                    e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0]
                else:
                    e2e_arguments['delay_reverse'] = None

                # get the response time of the media component connected to the target SFR
                service_delay = service_delays.get(target, self.service_cache.get(target))
                response_time, request_size, response_size, endpoint, sf_instance = service_delay
                # put these points in the e2e arguments dictionary
                e2e_arguments['delay_service'] = response_time
                e2e_arguments['avg_request_size'] = request_size
                e2e_arguments['avg_response_size'] = response_size
                e2e_arguments['endpoint'] = endpoint
                e2e_arguments['sf_instance'] = sf_instance

                # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row
                if None not in e2e_arguments.values():

                    while True:
                        try:
                            self.db_client.write_points(
                                generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
                                                          e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'],
                                                          e2e_arguments['delay_service'],
                                                          e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'],
                                                          e2e_arguments['time']))
                            break
                        except Exception as e:
                            print("Exception posting e2e measurement")
                            print(e)
                            sleep(self.RETRY_PERIOD)

            old_timestamp = current_time
            # wait until report_period seconds have passed
            while current_time < old_timestamp + self.report_period:
                sleep(1)
                current_time = int(time())


if __name__ == '__main__':

    # Parse command line options
    try:
        opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])

        arg_period = Aggregator.REPORT_PERIOD
        arg_database_name = Aggregator.DATABASE
        arg_database_url = Aggregator.DATABASE_URL

        # Apply parameters if given
        for opt, arg in opts:
            if opt in ('-p', '--period'):
                arg_period = int(arg)
            elif opt in ('-d', '--database'):
                arg_database_name = arg
            elif opt in ('-u', '--url'):
                arg_database_url = arg

        Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()

    # print the error messages in case of a parse error
    except getopt.GetoptError as err:
        print(err)
        print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
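
# Example invocation (illustrative - the values shown are the class defaults):
#   python aggregator.py -p 5 -d E2EMetrics -u http://203.0.113.100:8086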