From ae7d576e518ad26664b17d2579af5147cc05aa7f Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Thu, 26 Apr 2018 11:13:10 +0100 Subject: [PATCH] Issue #67 - updated the E2E simulator and implemented the first version of the aggregation script --- clmctest/monitoring/E2EAggregator.py | 166 +++++++++++++++++++ clmctest/monitoring/E2ESim.py | 26 ++- clmctest/monitoring/LineProtocolGenerator.py | 44 ++++- clmctest/monitoring/conftest.py | 28 +++- clmctest/monitoring/test_e2eresults.py | 21 ++- 5 files changed, 266 insertions(+), 19 deletions(-) create mode 100644 clmctest/monitoring/E2EAggregator.py diff --git a/clmctest/monitoring/E2EAggregator.py b/clmctest/monitoring/E2EAggregator.py new file mode 100644 index 0000000..d0ee8cb --- /dev/null +++ b/clmctest/monitoring/E2EAggregator.py @@ -0,0 +1,166 @@ +#!/usr/bin/python3 +""" +## © University of Southampton IT Innovation Centre, 2018 +## +## Copyright in this software belongs to University of Southampton +## IT Innovation Centre of Gamma House, Enterprise Road, +## Chilworth Science Park, Southampton, SO16 7NS, UK. +## +## This software may not be used, sold, licensed, transferred, copied +## or reproduced in whole or in part in any manner or form or in or +## on any media by any person other than in accordance with the terms +## of the Licence Agreement supplied with the software, or otherwise +## without the prior written consent of the copyright owners. +## +## This software is distributed WITHOUT ANY WARRANTY, without even the +## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +## PURPOSE, except where stated in the Licence Agreement supplied with +## the software. +## +## Created By : Nikolay Stanchev +## Created Date : 25-04-2018 +## Created for Project : FLAME +""" + +from influxdb import InfluxDBClient +from time import time, sleep +from urllib.parse import urlparse +from threading import Thread, Event +import clmctest.monitoring.LineProtocolGenerator as lp + + +class Aggregator(Thread): + """ + A class used to perform the aggregation feature of the CLMC - aggregation network and media service measurements. Currently, implemented as a thread, + so that the implementation can be tested using pytest. + """ + + PATH_SEPARATOR = "---" # currently, the path id is assumed to be in the format "SourceEndpointID---TargetEndpointID" + REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated + DATABASE = 'E2EMetrics' # default database the aggregator uses + DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses + + def __init__(self, database=DATABASE, database_url=DATABASE_URL): + """ + Constructs an Aggregator instance. + + :param database: database name to use + :param database_url: database url to use + """ + + super(Aggregator, self).__init__() # call the constructor of the tread + + # initialise a database client using the database url and the database name + url_object = urlparse(database_url) + self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10) + + self.db_url = database_url + self.db_name = database + + # a stop flag event object used to handle the killing of the thread + self._stop_flag = Event() + + def stop(self): + """ + A method used to stop the thread. + """ + + self._stop_flag.set() + + def run(self): + """ + Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. + """ + + current_time = int(time()) + while True: + if self._stop_flag.is_set(): + break + + boundary_time = current_time - Aggregator.REPORT_PERIOD + + boundary_time_nano = boundary_time * 1000000000 + current_time_nano = current_time * 1000000000 + + network_delays = {} + result = self.db_client.query( + 'SELECT mean(delay) as "Dnet" FROM "E2EMetrics"."autogen".network_delays WHERE time >= {0} and time < {1} GROUP BY path'.format( + boundary_time_nano, current_time_nano)) + for item in result.items(): + metadata, result_points = item + # measurement = metadata[0] + tags = metadata[1] + + network_delays[tags['path']] = next(result_points)['Dnet'] + + service_delays = {} + result = self.db_client.query( + 'SELECT mean(response_time) as "Dresponse" FROM "E2EMetrics"."autogen".service_delays WHERE time >= {0} and time < {1} GROUP BY FQDN, sf_instance, endpoint'.format( + boundary_time_nano, current_time_nano)) + for item in result.items(): + metadata, result_points = item + # measurement = metadata[0] + tags = metadata[1] + + if tags['endpoint'] not in service_delays: + service_delays[tags['endpoint']] = [(next(result_points)['Dresponse'], tags['FQDN'], tags['sf_instance'])] + else: + service_delays[tags['endpoint']].append((next(result_points)['Dresponse'], tags['FQDN'], tags['sf_instnace'])) + + for path in network_delays: + target_endpoint = self.get_target_endpoint(path) + if target_endpoint not in service_delays: + continue + + e2e_arguments = {"path_id_f": None, "path_id_r": None, "fqdn": None, "sf_instance": None, "delay_path_f": None, "delay_path_r": None, + "delay_service": None, "time": boundary_time} + + e2e_arguments['path_id_f'] = path + e2e_arguments['delay_path_f'] = network_delays[path] + + reversed_path = self.reverse_path_id(path) + assert reversed_path in network_delays + e2e_arguments['path_id_r'] = reversed_path + e2e_arguments['delay_path_r'] = network_delays[reversed_path] + + for service_delay in service_delays[target_endpoint]: + response_time, fqdn, sf_instance = service_delay + e2e_arguments['delay_service'] = response_time + e2e_arguments['fqdn'] = fqdn + e2e_arguments['sf_isntnace'] = sf_instance + + if None not in e2e_arguments.items(): + self.db_client.write_points( + lp.generate_e2e_delay_report(e2e_arguments['path_id_f'], e2e_arguments['path_id_r'], e2e_arguments['fqdn'], e2e_arguments['sf_isntnace'], + e2e_arguments['delay_path_f'], e2e_arguments['delay_path_r'], e2e_arguments['delay_service'], e2e_arguments['time'])) + + old_timestamp = current_time + while current_time != old_timestamp + 5: + sleep(1) + current_time = int(time()) + + @staticmethod + def reverse_path_id(path): + """ + Reverses a path identifier. + :param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID" + :return: the reversed path ID, e.g. "TargetEndpointID---SourceEndpointID" + """ + + source_endpoint, target_endpoint = path.split(Aggregator.PATH_SEPARATOR) + return "{0}{1}{2}".format(target_endpoint, Aggregator.PATH_SEPARATOR, source_endpoint) + + @staticmethod + def get_target_endpoint(path): + """ + Get a target endpoint by parsing a path identifier. + + :param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID" + :return: the target endpoint retrieved from the path identifier. + """ + + return path.split(Aggregator.PATH_SEPARATOR)[1] + + +if __name__ == '__main__': + Aggregator().start() diff --git a/clmctest/monitoring/E2ESim.py b/clmctest/monitoring/E2ESim.py index b413f6b..86f9d6e 100644 --- a/clmctest/monitoring/E2ESim.py +++ b/clmctest/monitoring/E2ESim.py @@ -37,8 +37,8 @@ class Simulator(object): Simulator used to generate E2E measurements. """ - DATABASE = 'E2EMetrics' - DATABASE_URL = 'http://203.0.113.100:8086' + DATABASE = 'E2EMetrics' # default database name + DATABASE_URL = 'http://203.0.113.100:8086' # default database url TICK = 1 # a simulation tick represents 1s SIMULATION_LENGTH = 120 # simulation time in seconds @@ -66,37 +66,29 @@ class Simulator(object): self.db_client.drop_database(self.db_name) self.db_client.create_database(self.db_name) - self._set_continuous_query() - - def _set_continuous_query(self, sample_period=5, period='s'): - sample_period = "{0}{1}".format(sample_period, period) - query = 'CREATE CONTINUOUS QUERY aggregate_query ON {0} BEGIN SELECT mean("delay") as "Dnet", mean("response_time") as "Dresponse" INTO "E2EMetrics"."autogen"."e2e_delays" FROM "E2EMetrics"."autogen"."network_delays", "E2EMetrics"."autogen"."service_delays" GROUP BY time({1}) END'\ - .format(self.db_name, sample_period) - - self.db_client.query(query) def run(self): """ Runs the simulation. """ - # all network delays start from 1 + # all network delays start from 1ms, the dictionary stores the information to report ip_endpoints = [ {'agent_id': 'endpoint1.ms-A.ict-flame.eu', 'paths': [{ 'target': 'endpoint2.ms-A.ict-flame.eu', - 'path_id': 'endpoint1-endpoint2', + 'path_id': 'endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu', 'network_delay': 1 }]}, {'agent_id': 'endpoint2.ms-A.ict-flame.eu', 'paths': [{ 'target': 'endpoint1.ms-A.ict-flame.eu', - 'path_id': 'endpoint2-endpoint1', + 'path_id': 'endpoint2.ms-A.ict-flame.eu---endpoint1.ms-A.ict-flame.eu', 'network_delay': 1 }]} ] - # current time in nanoseconds (to test the continuous query we write influx data points related to future time), so we start from the current time + # current time in seconds (to test the aggregation we write influx data points related to future time), so we start from the current time start_time = int(time.time()) sim_time = start_time @@ -128,7 +120,8 @@ class Simulator(object): # measure service response time every 5 seconds if i % sample_period_media == 0: - self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, sim_time)) + self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, "ms-A.ict-flame.eu", + "test-sf-clmc-agent-build_INSTANCE", "endpoint2.ms-A.ict-flame.eu", sim_time)) # increase/decrease the delay in every sample report (min delay is 10) mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)])) @@ -136,6 +129,9 @@ class Simulator(object): # increase the time by one simulation tick sim_time += self.TICK + end_time = sim_time + print("Start time: {0}, End time: {1}".format(start_time, end_time)) + if __name__ == "__main__": Simulator().run() diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index 59f7610..7757b25 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -29,6 +29,39 @@ import uuid from random import randint +def generate_e2e_delay_report(path_id_f, path_id_r, fqdn, sf_instance, delay_path_f, delay_path_r, delay_service, time): + """ + Generates a combined averaged measurement about the e2e delay and its contributing parts + + :param path_id_f: The forward path identifier, e.g. endpoint1---endpoint2 + :param path_id_r: The reverse path identifier, e.g. endpoint2---endpoint1 + :param fqdn: FQDN of the media service + :param sf_instance: service function instance + :param delay_path_f: Path delay (Forward direction) + :param delay_path_r: Path delay (Reverse direction) + :param delay_service: the media service response time + :param time: measurement time + :return: a list of dict-formatted reports to post on influx + """ + + result = [{"measurement": "e2e_delays", + "tags": { + "pathID_F": path_id_f, + "pathID_R": path_id_r, + "FQDN": fqdn, + "sf_instance": sf_instance + }, + "fields": { + "D_path_F": float(delay_path_f), + "D_path_R": float(delay_path_r), + "D_service": float(delay_service) + }, + "time": _getNSTime(time) + }] + + return result + + def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time): """ Generates a platform measurement about the network delay between two specific endpoints. @@ -56,16 +89,24 @@ def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e return result -def generate_service_delay_report(response_time, time): +def generate_service_delay_report(response_time, fqdn, sf_instance, endpoint, time): """ Generates a service measurement about the media service response time. :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service) + :param fqdn: FQDN of the media service + :param sf_instance: service function instance + :param endpoint: endpoint ID :param time: the measurement time :return: a list of dict-formatted reports to post on influx """ result = [{"measurement": "service_delays", + "tags": { + "FQDN": fqdn, + "sf_instance": sf_instance, + "endpoint": endpoint + }, "fields": { "response_time": response_time, }, @@ -171,6 +212,7 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta return result + def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ): """ generates a measurement line for a media component configuration state diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py index 0af5663..72791ee 100644 --- a/clmctest/monitoring/conftest.py +++ b/clmctest/monitoring/conftest.py @@ -28,7 +28,7 @@ import pkg_resources from influxdb import InfluxDBClient from clmctest.monitoring.StreamingSim import Sim from clmctest.monitoring.E2ESim import Simulator - +from clmctest.monitoring.E2EAggregator import Aggregator @pytest.fixture(scope="module") @@ -61,6 +61,12 @@ def influx_db(streaming_sim_config, request): @pytest.fixture(scope="module") def simulator(streaming_sim_config): + """ + A fixture to obtain a simulator instance with the configuration parameters. + + :param streaming_sim_config: the configuration object + :return: an instance of the simulator + """ influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" influx_db_name = streaming_sim_config['hosts'][1]['database_name'] @@ -76,7 +82,27 @@ def simulator(streaming_sim_config): @pytest.fixture(scope="module") def e2e_simulator(streaming_sim_config): + """ + A fixture to obtain a simulator instance with the configuration parameters. + + :param streaming_sim_config: the configuration object + :return: an instance of the E2E simulator + """ influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" return Simulator(database_url=influx_url) + + +@pytest.fixture(scope="module") +def e2e_aggregator(streaming_sim_config): + """ + A fixture to obtain an instance of the Aggregator class with the configuration parameters. + + :param streaming_sim_config: the configuration object + :return: an instance of the Aggregator class + """ + + influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" + + return Aggregator(database_url=influx_url) diff --git a/clmctest/monitoring/test_e2eresults.py b/clmctest/monitoring/test_e2eresults.py index 95a0e12..7a5a864 100644 --- a/clmctest/monitoring/test_e2eresults.py +++ b/clmctest/monitoring/test_e2eresults.py @@ -23,6 +23,7 @@ """ import pytest +import random import time @@ -32,7 +33,19 @@ class TestE2ESimulation(object): """ @pytest.fixture(scope='class', autouse=True) - def run_simulator(self, e2e_simulator): + def run_simulator(self, e2e_simulator, e2e_aggregator): + """ + A fixture, which runs the simulation before running the tests. + + :param e2e_simulator: the simulator for the end-to-end data + :param e2e_aggregator: the aggregator which merges the network and service measurements + """ + + random.seed(0) # Seed random function so we can reliably test for average queries + + print("Starting aggregator...") + e2e_aggregator.start() + print("Running simulation, please wait...") e2e_simulator.run() @@ -40,13 +53,17 @@ class TestE2ESimulation(object): time.sleep(e2e_simulator.SIMULATION_LENGTH) # wait for data to finish arriving at the INFLUX database print("... simulation data fixture finished") + print("... stopping aggregator") + e2e_aggregator.stop() + + @pytest.mark.parametrize("query, expected_result", [ ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"', {"time": "1970-01-01T00:00:00Z", "count_delay": 120}), ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"', {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}), ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"', - {"time": "1970-01-01T00:00:00Z", "count_Dnet": 24, "count_Dresponse": 24}), + {"time": "1970-01-01T00:00:00Z", "count_D_path_F": 24, "count_D_path_R": 24, "count_D_service": 24}), ]) def test_simulation(self, influx_db, query, expected_result): """ -- GitLab