Commit e131768c authored by Nikolay Stanchev

[Issue #67] - updated the new simulator with a continuous query

parent 5fe62951
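
The change replaces the simulator's one-off post-run aggregation query with an InfluxDB continuous query, which the database re-evaluates automatically at every GROUP BY time() interval and writes into a target measurement. As a minimal sketch of that mechanism (the database name, measurement name and address below come from the diff, but the query name and target measurement are illustrative, not the exact statement the commit registers):

    from influxdb import InfluxDBClient

    client = InfluxDBClient(host='203.0.113.100', port=8086, database='E2EMetrics', timeout=10)

    # A continuous query re-runs its SELECT once per GROUP BY time() interval and
    # writes the aggregated points INTO the target measurement.
    cq = ('CREATE CONTINUOUS QUERY "example_cq" ON "E2EMetrics" '
          'BEGIN SELECT mean("delay") AS "Dnet" '
          'INTO "E2EMetrics"."autogen"."example_aggregates" '
          'FROM "E2EMetrics"."autogen"."network_delays" '
          'GROUP BY time(5s) END')
    client.query(cq)

    # The registered continuous queries can be listed to verify the statement was accepted.
    print(client.query('SHOW CONTINUOUS QUERIES'))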
@@ -26,94 +26,116 @@
 from influxdb import InfluxDBClient
 import clmctest.monitoring.LineProtocolGenerator as lp
+import urllib.parse
+import time
+import random
 
 
 class Simulator(object):
     """
-    Simualator used to generate E2E measurements.
+    Simulator used to generate E2E measurements.
     """
 
     DATABASE = 'E2EMetrics'
-    MINUTE = 60000000000  # a minute in nanoseconds
-    SIMULATION_LENGTH = 22  # simulation time in minutes
+    DATABASE_URL = 'http://203.0.113.100:8086'
 
-    def __init__(self, host, port):
+    TICK = 1  # a simulation tick represents 1s
+    SIMULATION_LENGTH = 120  # simulation time in seconds
+
+    def __init__(self, database_url=DATABASE_URL, database=DATABASE):
         """
         Initialises the simulator by creating a db client object and resetting the database.
 
-        :param host: db host url
-        :param port: db port number
+        :param database_url: db url
+        :param database: db name
         """
 
-        self.db_client = InfluxDBClient(host=host, port=port, database=self.DATABASE, timeout=10)
+        url_object = urllib.parse.urlparse(database_url)
+        self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10)
+
+        self.db_url = database_url
+        self.db_name = database
 
-        self.reset_db()
+        self._reset_db()
 
-    def reset_db(self):
+    def _reset_db(self):
         """
         Reset the database using the already initialised db client object.
         """
 
-        self.db_client.drop_database(self.DATABASE)
-        self.db_client.create_database(self.DATABASE)
+        self.db_client.drop_database(self.db_name)
+        self.db_client.create_database(self.db_name)
+        self._set_continuous_query()
 
-    def aggregate_measurements(self, start_time, end_time, sample_period=2):
-        """
-        Executes a query to aggregate the measurements after the simulation is over.
-
-        :param start_time: start time of simulation (in nanoseconds)
-        :param end_time: end time of simulation (in nanoseconds)
-        :param sample_period: sample period for grouping (in minutes)
-        :return:
-        """
-
-        query = 'SELECT mean("delay") as "Dnet", mean("response_time") as "Dresponse_time" INTO "E2EMetrics"."autogen"."e2e" from "E2EMetrics"."autogen"."net", "E2EMetrics"."autogen"."service" WHERE time >= {0} and time <= {1} GROUP BY time({2}m) fill(previous)'.format(start_time, end_time, sample_period)
-        self.db_client.query(query=query)
+    def _set_continuous_query(self, sample_period=5, period='s'):
+        sample_period = "{0}{1}".format(sample_period, period)
+        query = 'CREATE CONTINUOUS QUERY aggregate_query ON {0} BEGIN SELECT mean("delay") as "Dnet", mean("response_time") as "Dresponse" INTO "E2EMetrics"."autogen"."e2e_delays" FROM "E2EMetrics"."autogen"."network_delays", "E2EMetrics"."autogen"."service_delays" GROUP BY time({1}) END'\
+            .format(self.db_name, sample_period)
+        self.db_client.query(query)
 
     def run(self):
         """
         Runs the simulation.
         """
 
-        start_time = 1523779200000000  # 15/04/2018, 09:00:00 in nanoseconds
+        # all network delays start from 1
+        ip_endpoints = [
+            {'agent_id': 'endpoint1.ms-A.ict-flame.eu',
+             'paths': [{
+                 'target': 'endpoint2.ms-A.ict-flame.eu',
+                 'path_id': 'endpoint1-endpoint2',
+                 'network_delay': 1
+             }]},
+            {'agent_id': 'endpoint2.ms-A.ict-flame.eu',
+             'paths': [{
+                 'target': 'endpoint1.ms-A.ict-flame.eu',
+                 'path_id': 'endpoint2-endpoint1',
+                 'network_delay': 1
+             }]}
+        ]
+
+        # current time in nanoseconds (to test the continuous query we write influx data points related to future time), so we start from the current time
+        start_time = int(time.time())
         sim_time = start_time
 
-        mean_delay_seconds_net = 1  # initial mean network delay
         mean_delay_seconds_media = 10  # initial mean media service delay
 
-        sample_period_net = 2  # sample period for reporting network delays (measured in minutes)
-        sample_period_media = 5  # sample period for reporting media service delays (measured in minutes)
+        sample_period_net = 2  # sample period for reporting network delays (measured in seconds) - net measurements reported every 2s
+        sample_period_media = 5  # sample period for reporting media service delays (measured in seconds) - service measurements reported every 5 seconds
 
         for i in range(0, self.SIMULATION_LENGTH):
-            # net delay
+            # measure net delay every 2 seconds for endpoint 1 (generates on tick 0, 2, 4, 6, 8, 10.. etc.)
             if i % sample_period_net == 0:
-                point = [{"measurement": "net",
-                          "fields": {
-                              "delay": mean_delay_seconds_net
-                          },
-                          "time": sim_time
-                          }]
-                self.db_client.write_points(point)
-
-                # increase the delay by 1 every sample
-                mean_delay_seconds_net += 1
-
-            # service response time
-            if i % sample_period_media == 0:
-                point = [{"measurement": "service",
-                          "fields": {
-                              "response_time": mean_delay_seconds_media
-                          },
-                          "time": sim_time
-                          }]
-                self.db_client.write_points(point)
-
-                # increase the delay by 20 every sample
-                mean_delay_seconds_media += 20
-
-            # increase the time by one minute
-            sim_time += self.MINUTE
-
-        end_time = sim_time - self.MINUTE  # decrement one minute to get the last time a measurement have been reported
-
-        self.aggregate_measurements(start_time, end_time)
+                endpoint = ip_endpoints[0]
+                paths = endpoint['paths']
+                for path in paths:
+                    self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], endpoint['agent_id'], path['target'], path['network_delay'], sim_time))
+
+                    # increase/decrease the delay in every sample report (min delay is 1)
+                    path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3))
+
+            # measure net delay every 2 seconds for endpoint 2 (generates on tick 1, 3, 5, 7, 9, 11.. etc.)
+            if (i+1) % sample_period_net == 0:
+                endpoint = ip_endpoints[1]
+                paths = endpoint['paths']
+                for path in paths:
+                    self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], endpoint['agent_id'], path['target'], path['network_delay'], sim_time))
+
+                    # increase/decrease the delay in every sample report (min delay is 1)
+                    path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3))
+
+            # measure service response time every 5 seconds
+            if i % sample_period_media == 0:
+                self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, sim_time))
+
+                # increase/decrease the delay in every sample report (min delay is 10)
+                mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)]))
+
+            # increase the time by one simulation tick
+            sim_time += self.TICK
 
 
 if __name__ == "__main__":
-    Simulator('203.0.113.100', 8086).run()
+    Simulator().run()
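
Once the simulation has run and the continuous query has had time to fire, the aggregated series can be read back through the same client. A minimal sketch, assuming an InfluxDB instance is reachable at the DATABASE_URL above and the simulator has already populated it:

    from influxdb import InfluxDBClient

    client = InfluxDBClient(host='203.0.113.100', port=8086, database='E2EMetrics', timeout=10)

    # e2e_delays is the measurement the continuous query in _set_continuous_query() writes into
    result = client.query('SELECT * FROM "E2EMetrics"."autogen"."e2e_delays"')
    for point in result.get_points():
        # each aggregated point carries the per-interval means computed by the continuous query
        print(point['time'], point.get('Dnet'), point.get('Dresponse'))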
@@ -29,6 +29,52 @@ import uuid
 from random import randint
 
 
+def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time):
+    """
+    Generates a platform measurement about the network delay between two specific endpoints.
+
+    :param path_id: the identifier of the path between the two endpoints (TODO can we derive source_endpoint and target_endpoint from this path)
+    :param source_endpoint: the source endpoint
+    :param target_endpoint: the target endpoint
+    :param e2e_delay: the e2e network delay for traversing the path (source_endpoint) -> (target_endpoint) (NOT round-trip-time)
+    :param time: the measurement time
+    :return: a list of dict-formatted reports to post on influx
+    """
+
+    result = [{"measurement": "network_delays",
+               "tags": {
+                   "path": path_id,
+                   "source_endpoint": source_endpoint,  # TODO source and target endpoint tags might have to be removed if they can be derived from path ID
+                   "target_endpoint": target_endpoint
+               },
+               "fields": {
+                   "delay": e2e_delay
+               },
+               "time": _getNSTime(time)
+               }]
+
+    return result
+
+
+def generate_service_delay_report(response_time, time):
+    """
+    Generates a service measurement about the media service response time.
+
+    :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service)
+    :param time: the measurement time
+    :return: a list of dict-formatted reports to post on influx
+    """
+
+    result = [{"measurement": "service_delays",
+               "fields": {
+                   "response_time": response_time,
+               },
+               "time": _getNSTime(time)
+               }]
+
+    return result
+
+
 # Reports TX and RX, scaling on requested quality
 def generate_network_report(recieved_bytes, sent_bytes, time):
     result = [{"measurement": "net_port_io",
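
Both new generator functions return a list containing a single point dictionary in the shape accepted by InfluxDBClient.write_points(), with the timestamp passed through the module's _getNSTime() helper. An illustrative call (the path and endpoint names come from the simulator above; the delay values and the timestamp are made up):

    import clmctest.monitoring.LineProtocolGenerator as lp
    from influxdb import InfluxDBClient

    client = InfluxDBClient(host='203.0.113.100', port=8086, database='E2EMetrics', timeout=10)

    # one network delay point for the endpoint1 -> endpoint2 path at a UNIX timestamp given in seconds
    report = lp.generate_network_delay_report('endpoint1-endpoint2',
                                              'endpoint1.ms-A.ict-flame.eu',
                                              'endpoint2.ms-A.ict-flame.eu',
                                              3, 1523779200)
    client.write_points(report)

    # one service delay point with a 10s processing time
    client.write_points(lp.generate_service_delay_report(10, 1523779200))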
@@ -77,6 +77,6 @@ def simulator(streaming_sim_config):
 
 @pytest.fixture(scope="module")
 def e2e_simulator(streaming_sim_config):
-    simulator = Simulator(streaming_sim_config['hosts'][0]['ip_address'], 8086)
+    influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"
 
-    return simulator
+    return Simulator(database_url=influx_url)
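
The fixture now hands the simulator a full URL instead of a host/port pair; the updated constructor splits it back apart with urllib.parse.urlparse, for example:

    import urllib.parse

    url_object = urllib.parse.urlparse("http://203.0.113.100:8086")

    # hostname and port are exactly what the InfluxDBClient constructor above expects
    assert url_object.hostname == "203.0.113.100"
    assert url_object.port == 8086  # the port is parsed as an int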
@@ -37,16 +37,16 @@ class TestE2ESimulation(object):
 
         e2e_simulator.run()
 
         print("Waiting for INFLUX to finish receiving simulation data...")
-        time.sleep(5)  # wait for data to finish arriving at the INFLUX database
+        time.sleep(e2e_simulator.SIMULATION_LENGTH)  # wait for data to finish arriving at the INFLUX database
         print("... simulation data fixture finished")
 
     @pytest.mark.parametrize("query, expected_result", [
-        ('SELECT count(*) FROM "E2EMetrics"."autogen"."net"',
-         {"time": "1970-01-01T00:00:00Z", "count_delay": 11}),
-        ('SELECT count(*) FROM "E2EMetrics"."autogen"."service"',
-         {"time": "1970-01-01T00:00:00Z", "count_response_time": 5}),
-        ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e"',
-         {"time": "1970-01-01T00:00:00Z", "count_Dnet": 11, "count_Dresponse_time": 11}),
+        ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"',
+         {"time": "1970-01-01T00:00:00Z", "count_delay": 120}),
+        ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"',
+         {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}),
+        ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
+         {"time": "1970-01-01T00:00:00Z", "count_Dnet": 24, "count_Dresponse": 24}),
     ])
     def test_simulation(self, influx_db, query, expected_result):
        """
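
The updated expected counts follow from the new simulator parameters: 120 one-second ticks with each of the two endpoints reporting on alternate ticks gives 120 network delay points, a service report every 5 ticks gives 24 points, and the test expects the 5-second continuous query to have produced one aggregated point per interval, i.e. 24. A quick sanity check of that arithmetic (the variable names mirror the simulator; the snippet itself is only illustrative):

    SIMULATION_LENGTH = 120  # ticks (seconds), as defined on the Simulator class
    sample_period_net = 2    # each endpoint reports on alternating ticks
    sample_period_media = 5

    network_delay_points = 2 * len(range(0, SIMULATION_LENGTH, sample_period_net))  # 2 * 60 = 120
    service_delay_points = len(range(0, SIMULATION_LENGTH, sample_period_media))    # 24
    e2e_delay_points = SIMULATION_LENGTH // sample_period_media                     # 24 five-second windows

    print(network_delay_points, service_delay_points, e2e_delay_points)  # 120 24 24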