From e131768c10ac7ba6b0e0c185ae12c63ffe1ec175 Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Fri, 20 Apr 2018 11:49:03 +0100 Subject: [PATCH] [Issue #67] - updated the new simulator with a continuous query --- clmctest/monitoring/E2ESim.py | 126 +++++++++++-------- clmctest/monitoring/LineProtocolGenerator.py | 46 +++++++ clmctest/monitoring/conftest.py | 4 +- clmctest/monitoring/test_e2eresults.py | 14 +-- 4 files changed, 129 insertions(+), 61 deletions(-) diff --git a/clmctest/monitoring/E2ESim.py b/clmctest/monitoring/E2ESim.py index 53ed774..b413f6b 100644 --- a/clmctest/monitoring/E2ESim.py +++ b/clmctest/monitoring/E2ESim.py @@ -26,94 +26,116 @@ from influxdb import InfluxDBClient +import clmctest.monitoring.LineProtocolGenerator as lp +import urllib.parse +import time +import random class Simulator(object): """ - Simualator used to generate E2E measurements. + Simulator used to generate E2E measurements. """ DATABASE = 'E2EMetrics' - MINUTE = 60000000000 # a minute in nanoseconds - SIMULATION_LENGTH = 22 # simulation time in minutes + DATABASE_URL = 'http://203.0.113.100:8086' - def __init__(self, host, port): + TICK = 1 # a simulation tick represents 1s + SIMULATION_LENGTH = 120 # simulation time in seconds + + def __init__(self, database_url=DATABASE_URL, database=DATABASE): """ Initialises the simulator by creating a db client object and resetting the database. - :param host: db host url - :param port: db port number + :param database_url: db url + :param database: db name """ - self.db_client = InfluxDBClient(host=host, port=port, database=self.DATABASE, timeout=10) + url_object = urllib.parse.urlparse(database_url) + self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10) + + self.db_url = database_url + self.db_name = database - self.reset_db() + self._reset_db() - def reset_db(self): + def _reset_db(self): """ Reset the database using the already initialised db client object. """ - self.db_client.drop_database(self.DATABASE) - self.db_client.create_database(self.DATABASE) + self.db_client.drop_database(self.db_name) + self.db_client.create_database(self.db_name) + self._set_continuous_query() - def aggregate_measurements(self, start_time, end_time, sample_period=2): - """ - Executes a query to aggregate the measurements after the simulation is over. - :param start_time: start time of simulation (in nanoseconds) - :param end_time: end time of simulation (in nanoseconds) - :param sample_period: sample period for grouping (in minutes) - :return: - """ + def _set_continuous_query(self, sample_period=5, period='s'): + sample_period = "{0}{1}".format(sample_period, period) + query = 'CREATE CONTINUOUS QUERY aggregate_query ON {0} BEGIN SELECT mean("delay") as "Dnet", mean("response_time") as "Dresponse" INTO "E2EMetrics"."autogen"."e2e_delays" FROM "E2EMetrics"."autogen"."network_delays", "E2EMetrics"."autogen"."service_delays" GROUP BY time({1}) END'\ + .format(self.db_name, sample_period) - query = 'SELECT mean("delay") as "Dnet", mean("response_time") as "Dresponse_time" INTO "E2EMetrics"."autogen"."e2e" from "E2EMetrics"."autogen"."net", "E2EMetrics"."autogen"."service" WHERE time >= {0} and time <= {1} GROUP BY time({2}m) fill(previous)'.format(start_time, end_time, sample_period) - self.db_client.query(query=query) + self.db_client.query(query) def run(self): """ Runs the simulation. """ - start_time = 1523779200000000 # 15/04/2018, 09:00:00 in nanoseconds + # all network delays start from 1 + ip_endpoints = [ + {'agent_id': 'endpoint1.ms-A.ict-flame.eu', + 'paths': [{ + 'target': 'endpoint2.ms-A.ict-flame.eu', + 'path_id': 'endpoint1-endpoint2', + 'network_delay': 1 + }]}, + {'agent_id': 'endpoint2.ms-A.ict-flame.eu', + 'paths': [{ + 'target': 'endpoint1.ms-A.ict-flame.eu', + 'path_id': 'endpoint2-endpoint1', + 'network_delay': 1 + }]} + ] + + # current time in nanoseconds (to test the continuous query we write influx data points related to future time), so we start from the current time + start_time = int(time.time()) + sim_time = start_time - mean_delay_seconds_net = 1 # initial mean network delay mean_delay_seconds_media = 10 # initial mean media service delay - sample_period_net = 2 # sample period for reporting network delays (measured in minutes) - sample_period_media = 5 # sample period for reporting media service delays (measured in minutes) + sample_period_net = 2 # sample period for reporting network delays (measured in seconds) - net measurements reported every 2s + sample_period_media = 5 # sample period for reporting media service delays (measured in seconds) - service measurements reported every 5 seconds for i in range(0, self.SIMULATION_LENGTH): - # net delay + # measure net delay every 2 seconds for endpoint 1 (generates on tick 0, 2, 4, 6, 8, 10.. etc.) if i % sample_period_net == 0: - point = [{"measurement": "net", - "fields": { - "delay": mean_delay_seconds_net - }, - "time": sim_time - }] - self.db_client.write_points(point) - # increase the delay by 1 every sample - mean_delay_seconds_net += 1 - - # service response time - if i % sample_period_media == 0: - point = [{"measurement": "service", - "fields": { - "response_time": mean_delay_seconds_media - }, - "time": sim_time - }] - self.db_client.write_points(point) + endpoint = ip_endpoints[0] + paths = endpoint['paths'] + for path in paths: + self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], endpoint['agent_id'], path['target'], path['network_delay'], sim_time)) + + # increase/decrease the delay in every sample report (min delay is 1) + path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) + + # measure net delay every 2 seconds for endpoint 2 (generates on tick 1, 3, 5, 7, 9, 11.. etc.) + if (i+1) % sample_period_net == 0: + endpoint = ip_endpoints[1] + paths = endpoint['paths'] + for path in paths: + self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], endpoint['agent_id'], path['target'], path['network_delay'], sim_time)) - # increase the delay by 20 every sample - mean_delay_seconds_media += 20 + # increase/decrease the delay in every sample report (min delay is 1) + path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) + + # measure service response time every 5 seconds + if i % sample_period_media == 0: + self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, sim_time)) - # increase the time by one minute - sim_time += self.MINUTE + # increase/decrease the delay in every sample report (min delay is 10) + mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)])) - end_time = sim_time - self.MINUTE # decrement one minute to get the last time a measurement have been reported - self.aggregate_measurements(start_time, end_time) + # increase the time by one simulation tick + sim_time += self.TICK if __name__ == "__main__": - Simulator('203.0.113.100', 8086).run() + Simulator().run() diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index 76ffd32..59f7610 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -29,6 +29,52 @@ import uuid from random import randint +def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time): + """ + Generates a platform measurement about the network delay between two specific endpoints. + + :param path_id: the identifier of the path between the two endpoints (TODO can we derive source_endpoint and target_endpoint from this path) + :param source_endpoint: the source endpoint + :param target_endpoint: the target endpoint + :param e2e_delay: the e2e network delay for traversing the path (source_endpoint) -> (target_endpoint) (NOT round-trip-time) + :param time: the measurement time + :return: a list of dict-formatted reports to post on influx + """ + + result = [{"measurement": "network_delays", + "tags": { + "path": path_id, + "source_endpoint": source_endpoint, # TODO source and target endpoint tags might have to be removed if they can be derived from path ID + "target_endpoint": target_endpoint + }, + "fields": { + "delay": e2e_delay + }, + "time": _getNSTime(time) + }] + + return result + + +def generate_service_delay_report(response_time, time): + """ + Generates a service measurement about the media service response time. + + :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service) + :param time: the measurement time + :return: a list of dict-formatted reports to post on influx + """ + + result = [{"measurement": "service_delays", + "fields": { + "response_time": response_time, + }, + "time": _getNSTime(time) + }] + + return result + + # Reports TX and RX, scaling on requested quality def generate_network_report(recieved_bytes, sent_bytes, time): result = [{"measurement": "net_port_io", diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py index 2d866f2..0af5663 100644 --- a/clmctest/monitoring/conftest.py +++ b/clmctest/monitoring/conftest.py @@ -77,6 +77,6 @@ def simulator(streaming_sim_config): @pytest.fixture(scope="module") def e2e_simulator(streaming_sim_config): - simulator = Simulator(streaming_sim_config['hosts'][0]['ip_address'], 8086) + influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" - return simulator + return Simulator(database_url=influx_url) diff --git a/clmctest/monitoring/test_e2eresults.py b/clmctest/monitoring/test_e2eresults.py index 2feb0e7..95a0e12 100644 --- a/clmctest/monitoring/test_e2eresults.py +++ b/clmctest/monitoring/test_e2eresults.py @@ -37,16 +37,16 @@ class TestE2ESimulation(object): e2e_simulator.run() print("Waiting for INFLUX to finish receiving simulation data...") - time.sleep(5) # wait for data to finish arriving at the INFLUX database + time.sleep(e2e_simulator.SIMULATION_LENGTH) # wait for data to finish arriving at the INFLUX database print("... simulation data fixture finished") @pytest.mark.parametrize("query, expected_result", [ - ('SELECT count(*) FROM "E2EMetrics"."autogen"."net"', - {"time": "1970-01-01T00:00:00Z", "count_delay": 11}), - ('SELECT count(*) FROM "E2EMetrics"."autogen"."service"', - {"time": "1970-01-01T00:00:00Z", "count_response_time": 5}), - ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e"', - {"time": "1970-01-01T00:00:00Z", "count_Dnet": 11, "count_Dresponse_time": 11}), + ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"', + {"time": "1970-01-01T00:00:00Z", "count_delay": 120}), + ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"', + {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}), + ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"', + {"time": "1970-01-01T00:00:00Z", "count_Dnet": 24, "count_Dresponse": 24}), ]) def test_simulation(self, influx_db, query, expected_result): """ -- GitLab