Skip to content
Snippets Groups Projects
Commit ae7d576e authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Issue #67 - updated the E2E simulator and implemented the first version of the aggregation script

parent e131768c
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
from threading import Thread, Event
import clmctest.monitoring.LineProtocolGenerator as lp
class Aggregator(Thread):
    """
    A class used to perform the aggregation feature of the CLMC - aggregation of network and media service measurements.

    Currently implemented as a thread, so that the implementation can be tested using pytest.
    """

    PATH_SEPARATOR = "---"  # currently, the path id is assumed to be in the format "SourceEndpointID---TargetEndpointID"
    REPORT_PERIOD = 5  # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'E2EMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://203.0.113.100:8086'  # default database URL the aggregator uses

    def __init__(self, database=DATABASE, database_url=DATABASE_URL):
        """
        Constructs an Aggregator instance.

        :param database: database name to use
        :param database_url: database url to use
        """

        super(Aggregator, self).__init__()  # call the constructor of the thread class

        # initialise a database client using the database url and the database name
        url_object = urlparse(database_url)
        self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10)

        self.db_url = database_url
        self.db_name = database

        # a stop flag event object used to handle the killing of the thread
        self._stop_flag = Event()

    def stop(self):
        """
        A method used to stop the thread.
        """

        self._stop_flag.set()

    def run(self):
        """
        Performs the functionality of the aggregator - query data from both measurements, merge that data and post it back to influx every REPORT_PERIOD seconds.
        """

        current_time = int(time())
        while True:
            if self._stop_flag.is_set():
                break

            boundary_time = current_time - Aggregator.REPORT_PERIOD

            # influx timestamps are in nanoseconds
            boundary_time_nano = boundary_time * 1000000000
            current_time_nano = current_time * 1000000000

            # query the network delays over the report period, grouped by path ID
            network_delays = {}
            result = self.db_client.query(
                'SELECT mean(delay) as "Dnet" FROM "E2EMetrics"."autogen".network_delays WHERE time >= {0} and time < {1} GROUP BY path'.format(
                    boundary_time_nano, current_time_nano))
            for item in result.items():
                metadata, result_points = item
                # metadata is a (measurement, tags) tuple; only the tags are needed here
                tags = metadata[1]
                network_delays[tags['path']] = next(result_points)['Dnet']

            # query the service delays over the report period, grouped by endpoint
            # (an endpoint may host more than one (FQDN, sf_instance) pair, hence a list per endpoint)
            service_delays = {}
            result = self.db_client.query(
                'SELECT mean(response_time) as "Dresponse" FROM "E2EMetrics"."autogen".service_delays WHERE time >= {0} and time < {1} GROUP BY FQDN, sf_instance, endpoint'.format(
                    boundary_time_nano, current_time_nano))
            for item in result.items():
                metadata, result_points = item
                tags = metadata[1]
                # FIX: the original append branch read tags['sf_instnace'] (misspelled key) and would raise KeyError
                delay_entry = (next(result_points)['Dresponse'], tags['FQDN'], tags['sf_instance'])
                service_delays.setdefault(tags['endpoint'], []).append(delay_entry)

            # combine the forward/reverse network delays of each path with the service delays measured at the path's target endpoint
            for path in network_delays:
                target_endpoint = self.get_target_endpoint(path)

                if target_endpoint not in service_delays:
                    # a network delay to this endpoint exists but no service delay was measured there - nothing to aggregate
                    continue

                e2e_arguments = {"path_id_f": None, "path_id_r": None, "fqdn": None, "sf_instance": None, "delay_path_f": None, "delay_path_r": None,
                                 "delay_service": None, "time": boundary_time}

                e2e_arguments['path_id_f'] = path
                e2e_arguments['delay_path_f'] = network_delays[path]

                # the reverse path is expected to have been measured in the same period
                # NOTE(review): this assumption may not hold if one direction's report is delayed - confirm
                reversed_path = self.reverse_path_id(path)
                assert reversed_path in network_delays
                e2e_arguments['path_id_r'] = reversed_path
                e2e_arguments['delay_path_r'] = network_delays[reversed_path]

                for service_delay in service_delays[target_endpoint]:
                    response_time, fqdn, sf_instance = service_delay
                    e2e_arguments['delay_service'] = response_time
                    e2e_arguments['fqdn'] = fqdn
                    # FIX: the original wrote the misspelled key 'sf_isntnace', leaving 'sf_instance' as None
                    e2e_arguments['sf_instance'] = sf_instance

                    # only report when every argument has been filled in
                    # FIX: the original checked `None not in e2e_arguments.items()` - items() yields
                    # (key, value) tuples, so the check could never fail; the values must be checked
                    if None not in e2e_arguments.values():
                        self.db_client.write_points(
                            lp.generate_e2e_delay_report(e2e_arguments['path_id_f'], e2e_arguments['path_id_r'], e2e_arguments['fqdn'], e2e_arguments['sf_instance'],
                                                         e2e_arguments['delay_path_f'], e2e_arguments['delay_path_r'], e2e_arguments['delay_service'], e2e_arguments['time']))

            # wait until a full report period has elapsed before aggregating again
            # FIX: use the REPORT_PERIOD constant instead of a hard-coded 5, and '<' instead of '!='
            # so that a clock jump past the target second cannot cause an infinite loop
            old_timestamp = current_time
            while current_time < old_timestamp + Aggregator.REPORT_PERIOD:
                sleep(1)
                current_time = int(time())

    @staticmethod
    def reverse_path_id(path):
        """
        Reverses a path identifier.

        :param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID"
        :return: the reversed path ID, e.g. "TargetEndpointID---SourceEndpointID"
        """

        source_endpoint, target_endpoint = path.split(Aggregator.PATH_SEPARATOR)
        return "{0}{1}{2}".format(target_endpoint, Aggregator.PATH_SEPARATOR, source_endpoint)

    @staticmethod
    def get_target_endpoint(path):
        """
        Gets the target endpoint by parsing a path identifier.

        :param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID"
        :return: the target endpoint retrieved from the path identifier
        """

        return path.split(Aggregator.PATH_SEPARATOR)[1]
if __name__ == '__main__':
    # when executed as a script, run the aggregator as a standalone thread
    aggregator_thread = Aggregator()
    aggregator_thread.start()
...@@ -37,8 +37,8 @@ class Simulator(object): ...@@ -37,8 +37,8 @@ class Simulator(object):
Simulator used to generate E2E measurements. Simulator used to generate E2E measurements.
""" """
DATABASE = 'E2EMetrics' DATABASE = 'E2EMetrics' # default database name
DATABASE_URL = 'http://203.0.113.100:8086' DATABASE_URL = 'http://203.0.113.100:8086' # default database url
TICK = 1 # a simulation tick represents 1s TICK = 1 # a simulation tick represents 1s
SIMULATION_LENGTH = 120 # simulation time in seconds SIMULATION_LENGTH = 120 # simulation time in seconds
...@@ -66,37 +66,29 @@ class Simulator(object): ...@@ -66,37 +66,29 @@ class Simulator(object):
self.db_client.drop_database(self.db_name) self.db_client.drop_database(self.db_name)
self.db_client.create_database(self.db_name) self.db_client.create_database(self.db_name)
self._set_continuous_query()
def _set_continuous_query(self, sample_period=5, period='s'):
sample_period = "{0}{1}".format(sample_period, period)
query = 'CREATE CONTINUOUS QUERY aggregate_query ON {0} BEGIN SELECT mean("delay") as "Dnet", mean("response_time") as "Dresponse" INTO "E2EMetrics"."autogen"."e2e_delays" FROM "E2EMetrics"."autogen"."network_delays", "E2EMetrics"."autogen"."service_delays" GROUP BY time({1}) END'\
.format(self.db_name, sample_period)
self.db_client.query(query)
def run(self): def run(self):
""" """
Runs the simulation. Runs the simulation.
""" """
# all network delays start from 1 # all network delays start from 1ms, the dictionary stores the information to report
ip_endpoints = [ ip_endpoints = [
{'agent_id': 'endpoint1.ms-A.ict-flame.eu', {'agent_id': 'endpoint1.ms-A.ict-flame.eu',
'paths': [{ 'paths': [{
'target': 'endpoint2.ms-A.ict-flame.eu', 'target': 'endpoint2.ms-A.ict-flame.eu',
'path_id': 'endpoint1-endpoint2', 'path_id': 'endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu',
'network_delay': 1 'network_delay': 1
}]}, }]},
{'agent_id': 'endpoint2.ms-A.ict-flame.eu', {'agent_id': 'endpoint2.ms-A.ict-flame.eu',
'paths': [{ 'paths': [{
'target': 'endpoint1.ms-A.ict-flame.eu', 'target': 'endpoint1.ms-A.ict-flame.eu',
'path_id': 'endpoint2-endpoint1', 'path_id': 'endpoint2.ms-A.ict-flame.eu---endpoint1.ms-A.ict-flame.eu',
'network_delay': 1 'network_delay': 1
}]} }]}
] ]
# current time in nanoseconds (to test the continuous query we write influx data points related to future time), so we start from the current time # current time in seconds (to test the aggregation we write influx data points related to future time), so we start from the current time
start_time = int(time.time()) start_time = int(time.time())
sim_time = start_time sim_time = start_time
...@@ -128,7 +120,8 @@ class Simulator(object): ...@@ -128,7 +120,8 @@ class Simulator(object):
# measure service response time every 5 seconds # measure service response time every 5 seconds
if i % sample_period_media == 0: if i % sample_period_media == 0:
self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, sim_time)) self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, "ms-A.ict-flame.eu",
"test-sf-clmc-agent-build_INSTANCE", "endpoint2.ms-A.ict-flame.eu", sim_time))
# increase/decrease the delay in every sample report (min delay is 10) # increase/decrease the delay in every sample report (min delay is 10)
mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)])) mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)]))
...@@ -136,6 +129,9 @@ class Simulator(object): ...@@ -136,6 +129,9 @@ class Simulator(object):
# increase the time by one simulation tick # increase the time by one simulation tick
sim_time += self.TICK sim_time += self.TICK
end_time = sim_time
print("Start time: {0}, End time: {1}".format(start_time, end_time))
if __name__ == "__main__": if __name__ == "__main__":
Simulator().run() Simulator().run()
...@@ -29,6 +29,39 @@ import uuid ...@@ -29,6 +29,39 @@ import uuid
from random import randint from random import randint
def generate_e2e_delay_report(path_id_f, path_id_r, fqdn, sf_instance, delay_path_f, delay_path_r, delay_service, time):
"""
Generates a combined averaged measurement about the e2e delay and its contributing parts
:param path_id_f: The forward path identifier, e.g. endpoint1---endpoint2
:param path_id_r: The reverse path identifier, e.g. endpoint2---endpoint1
:param fqdn: FQDN of the media service
:param sf_instance: service function instance
:param delay_path_f: Path delay (Forward direction)
:param delay_path_r: Path delay (Reverse direction)
:param delay_service: the media service response time
:param time: measurement time
:return: a list of dict-formatted reports to post on influx
"""
result = [{"measurement": "e2e_delays",
"tags": {
"pathID_F": path_id_f,
"pathID_R": path_id_r,
"FQDN": fqdn,
"sf_instance": sf_instance
},
"fields": {
"D_path_F": float(delay_path_f),
"D_path_R": float(delay_path_r),
"D_service": float(delay_service)
},
"time": _getNSTime(time)
}]
return result
def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time): def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time):
""" """
Generates a platform measurement about the network delay between two specific endpoints. Generates a platform measurement about the network delay between two specific endpoints.
...@@ -56,16 +89,24 @@ def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e ...@@ -56,16 +89,24 @@ def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e
return result return result
def generate_service_delay_report(response_time, time): def generate_service_delay_report(response_time, fqdn, sf_instance, endpoint, time):
""" """
Generates a service measurement about the media service response time. Generates a service measurement about the media service response time.
:param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service) :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service)
:param fqdn: FQDN of the media service
:param sf_instance: service function instance
:param endpoint: endpoint ID
:param time: the measurement time :param time: the measurement time
:return: a list of dict-formatted reports to post on influx :return: a list of dict-formatted reports to post on influx
""" """
result = [{"measurement": "service_delays", result = [{"measurement": "service_delays",
"tags": {
"FQDN": fqdn,
"sf_instance": sf_instance,
"endpoint": endpoint
},
"fields": { "fields": {
"response_time": response_time, "response_time": response_time,
}, },
...@@ -171,6 +212,7 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta ...@@ -171,6 +212,7 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta
return result return result
def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ): def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ):
""" """
generates a measurement line for a media component configuration state generates a measurement line for a media component configuration state
......
...@@ -28,7 +28,7 @@ import pkg_resources ...@@ -28,7 +28,7 @@ import pkg_resources
from influxdb import InfluxDBClient from influxdb import InfluxDBClient
from clmctest.monitoring.StreamingSim import Sim from clmctest.monitoring.StreamingSim import Sim
from clmctest.monitoring.E2ESim import Simulator from clmctest.monitoring.E2ESim import Simulator
from clmctest.monitoring.E2EAggregator import Aggregator
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
...@@ -61,6 +61,12 @@ def influx_db(streaming_sim_config, request): ...@@ -61,6 +61,12 @@ def influx_db(streaming_sim_config, request):
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def simulator(streaming_sim_config): def simulator(streaming_sim_config):
"""
A fixture to obtain a simulator instance with the configuration parameters.
:param streaming_sim_config: the configuration object
:return: an instance of the simulator
"""
influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"
influx_db_name = streaming_sim_config['hosts'][1]['database_name'] influx_db_name = streaming_sim_config['hosts'][1]['database_name']
...@@ -76,7 +82,27 @@ def simulator(streaming_sim_config): ...@@ -76,7 +82,27 @@ def simulator(streaming_sim_config):
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def e2e_simulator(streaming_sim_config): def e2e_simulator(streaming_sim_config):
"""
A fixture to obtain a simulator instance with the configuration parameters.
:param streaming_sim_config: the configuration object
:return: an instance of the E2E simulator
"""
influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"
return Simulator(database_url=influx_url) return Simulator(database_url=influx_url)
@pytest.fixture(scope="module")
def e2e_aggregator(streaming_sim_config):
"""
A fixture to obtain an instance of the Aggregator class with the configuration parameters.
:param streaming_sim_config: the configuration object
:return: an instance of the Aggregator class
"""
influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"
return Aggregator(database_url=influx_url)
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
""" """
import pytest import pytest
import random
import time import time
...@@ -32,7 +33,19 @@ class TestE2ESimulation(object): ...@@ -32,7 +33,19 @@ class TestE2ESimulation(object):
""" """
@pytest.fixture(scope='class', autouse=True) @pytest.fixture(scope='class', autouse=True)
def run_simulator(self, e2e_simulator): def run_simulator(self, e2e_simulator, e2e_aggregator):
"""
A fixture, which runs the simulation before running the tests.
:param e2e_simulator: the simulator for the end-to-end data
:param e2e_aggregator: the aggregator which merges the network and service measurements
"""
random.seed(0) # Seed random function so we can reliably test for average queries
print("Starting aggregator...")
e2e_aggregator.start()
print("Running simulation, please wait...") print("Running simulation, please wait...")
e2e_simulator.run() e2e_simulator.run()
...@@ -40,13 +53,17 @@ class TestE2ESimulation(object): ...@@ -40,13 +53,17 @@ class TestE2ESimulation(object):
time.sleep(e2e_simulator.SIMULATION_LENGTH) # wait for data to finish arriving at the INFLUX database time.sleep(e2e_simulator.SIMULATION_LENGTH) # wait for data to finish arriving at the INFLUX database
print("... simulation data fixture finished") print("... simulation data fixture finished")
print("... stopping aggregator")
e2e_aggregator.stop()
@pytest.mark.parametrize("query, expected_result", [ @pytest.mark.parametrize("query, expected_result", [
('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"', ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"',
{"time": "1970-01-01T00:00:00Z", "count_delay": 120}), {"time": "1970-01-01T00:00:00Z", "count_delay": 120}),
('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"', ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"',
{"time": "1970-01-01T00:00:00Z", "count_response_time": 24}), {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}),
('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"', ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
{"time": "1970-01-01T00:00:00Z", "count_Dnet": 24, "count_Dresponse": 24}), {"time": "1970-01-01T00:00:00Z", "count_D_path_F": 24, "count_D_path_R": 24, "count_D_service": 24}),
]) ])
def test_simulation(self, influx_db, query, expected_result): def test_simulation(self, influx_db, query, expected_result):
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment