Skip to content
Snippets Groups Projects
Commit 5fe62951 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

converted aggregation scenario to pytest - [Issue #67]

parent e9b17e32
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Michael Boniface
## Created Date : 15-04-2018
## Updated By : Nikolay Stanchev
## Updated Date : 16-04-2018
## Created for Project : FLAME
"""
from influxdb import InfluxDBClient
class Simulator(object):
"""
Simualator used to generate E2E measurements.
"""
DATABASE = 'E2EMetrics'
MINUTE = 60000000000 # a minute in nanoseconds
SIMULATION_LENGTH = 22 # simulation time in minutes
def __init__(self, host, port):
"""
Initialises the simulator by creating a db client object and resetting the database.
:param host: db host url
:param port: db port number
"""
self.db_client = InfluxDBClient(host=host, port=port, database=self.DATABASE, timeout=10)
self.reset_db()
def reset_db(self):
"""
Reset the database using the already initialised db client object.
"""
self.db_client.drop_database(self.DATABASE)
self.db_client.create_database(self.DATABASE)
def aggregate_measurements(self, start_time, end_time, sample_period=2):
"""
Executes a query to aggregate the measurements after the simulation is over.
:param start_time: start time of simulation (in nanoseconds)
:param end_time: end time of simulation (in nanoseconds)
:param sample_period: sample period for grouping (in minutes)
:return:
"""
query = 'SELECT mean("delay") as "Dnet", mean("response_time") as "Dresponse_time" INTO "E2EMetrics"."autogen"."e2e" from "E2EMetrics"."autogen"."net", "E2EMetrics"."autogen"."service" WHERE time >= {0} and time <= {1} GROUP BY time({2}m) fill(previous)'.format(start_time, end_time, sample_period)
self.db_client.query(query=query)
def run(self):
"""
Runs the simulation.
"""
start_time = 1523779200000000 # 15/04/2018, 09:00:00 in nanoseconds
sim_time = start_time
mean_delay_seconds_net = 1 # initial mean network delay
mean_delay_seconds_media = 10 # initial mean media service delay
sample_period_net = 2 # sample period for reporting network delays (measured in minutes)
sample_period_media = 5 # sample period for reporting media service delays (measured in minutes)
for i in range(0, self.SIMULATION_LENGTH):
# net delay
if i % sample_period_net == 0:
point = [{"measurement": "net",
"fields": {
"delay": mean_delay_seconds_net
},
"time": sim_time
}]
self.db_client.write_points(point)
# increase the delay by 1 every sample
mean_delay_seconds_net += 1
# service response time
if i % sample_period_media == 0:
point = [{"measurement": "service",
"fields": {
"response_time": mean_delay_seconds_media
},
"time": sim_time
}]
self.db_client.write_points(point)
# increase the delay by 20 every sample
mean_delay_seconds_media += 20
# increase the time by one minute
sim_time += self.MINUTE
end_time = sim_time - self.MINUTE # decrement one minute to get the last time a measurement have been reported
self.aggregate_measurements(start_time, end_time)
if __name__ == "__main__":
Simulator('203.0.113.100', 8086).run()
...@@ -27,6 +27,8 @@ import yaml ...@@ -27,6 +27,8 @@ import yaml
import pkg_resources import pkg_resources
from influxdb import InfluxDBClient from influxdb import InfluxDBClient
from clmctest.monitoring.StreamingSim import Sim from clmctest.monitoring.StreamingSim import Sim
from clmctest.monitoring.E2ESim import Simulator
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
...@@ -70,3 +72,11 @@ def simulator(streaming_sim_config): ...@@ -70,3 +72,11 @@ def simulator(streaming_sim_config):
simulator.reset() simulator.reset()
return simulator return simulator
@pytest.fixture(scope="module")
def e2e_simulator(streaming_sim_config):
simulator = Simulator(streaming_sim_config['hosts'][0]['ip_address'], 8086)
return simulator
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 17-04-2018
## Created for Project : FLAME
"""
import pytest
import time
class TestE2ESimulation(object):
"""
A testing class used to group all the tests related to the E2E simulation data
"""
@pytest.fixture(scope='class', autouse=True)
def run_simulator(self, e2e_simulator):
print("Running simulation, please wait...")
e2e_simulator.run()
print("Waiting for INFLUX to finish receiving simulation data...")
time.sleep(5) # wait for data to finish arriving at the INFLUX database
print("... simulation data fixture finished")
@pytest.mark.parametrize("query, expected_result", [
('SELECT count(*) FROM "E2EMetrics"."autogen"."net"',
{"time": "1970-01-01T00:00:00Z", "count_delay": 11}),
('SELECT count(*) FROM "E2EMetrics"."autogen"."service"',
{"time": "1970-01-01T00:00:00Z", "count_response_time": 5}),
('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e"',
{"time": "1970-01-01T00:00:00Z", "count_Dnet": 11, "count_Dresponse_time": 11}),
])
def test_simulation(self, influx_db, query, expected_result):
"""
This is the entry point of the test. This method will be found and executed when the module is ran using pytest
:param query: the query to execute (value obtained from the pytest parameter decorator)
:param expected_result: the result expected from executing the query (value obtained from the pytest parameter decorator)
:param influx_db the import db client fixture - imported from contest.py
"""
# pytest automatically goes through all queries under test, declared in the parameters decorator
print("\n") # prints a blank line for formatting purposes
# the raise_errors=False argument is given so that we could actually test that the DB didn't return any errors instead of raising an exception
query_result = influx_db.query(query, raise_errors=False)
# test the error attribute of the result is None, that is no error is returned from executing the DB query
assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
# get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator)
actual_result = next(query_result.get_points())
assert expected_result == actual_result, "E2E Simulation test failure"
print("Successfully passed test for the following query: {0}".format(query))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment