diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index a420b6b9c2ef499679d15269d05b976e5463d300..213e84da0fff40208d621de5597bca3711ba4a7e 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -94,19 +94,54 @@ def _getNSTime(time): return timestamp -def generate_mc_serviceConfig( mcLabel, sStop, asStop, sStart, asStart, time ): - result = [{ "measurement" : "mediaComponentConfig", - "tags" : { "mediaComp" : mcLabel }, - "fields" : - { "serviceStopped" : sStop, - "avgServiceStopped" : asStop, - "serviceStarted" : sStart, - "avgServiceStarted" : asStart +def generate_mc_service_config( mcMeasurement, stateTimeStats, time ): + + validStats = validate_state_time_stats( stateTimeStats ) + + result = [{ "measurement" : mcMeasurement, + "fields" : + { "stopped" : validStats['stopped'], + "avg_stopped" : validStats['avg_stopped'], + "starting" : validStats['starting'], + "avg_starting" : validStats['avg_starting'], + "running" : validStats['running'], + "avg_running" : validStats['avg_running'], + "stopping" : validStats['stopping'], + "avg_stopping" : validStats['avg_stopping'] }, "time" : _getNSTime(time) }] return result +def validate_state_time_stats( stateTimeStats ): + + if ( not 'stopped' in stateTimeStats ): + stateTimeStats['stopped'] = 0.0 + + if ( not 'avg_stopped' in stateTimeStats ): + stateTimeStats['avg_stopped'] = 0.0 + + if ( not 'starting' in stateTimeStats ): + stateTimeStats['starting'] = 0.0 + + if ( not 'avg_starting' in stateTimeStats ): + stateTimeStats['avg_starting'] = 0.0 + + if ( not 'running' in stateTimeStats ): + stateTimeStats['running'] = 0.0 + + if ( not 'avg_running' in stateTimeStats ): + stateTimeStats['avg_running'] = 0.0 + + if ( not 'stopping' in stateTimeStats ): + stateTimeStats['stopping'] = 0.0 + + if ( not 'avg_stopping' in stateTimeStats ): + stateTimeStats['avg_stopping'] = 0.0 + + return stateTimeStats + + # DEPRECATED # ____________________________________________________________________________ diff --git a/clmctest/monitoring/StreamingSim.py b/clmctest/monitoring/StreamingSim.py index afc1f24e084bd4cff1f13de3514c5fa87b263cc5..a7d8d9ce3aa74c4ec5e8be46dd64e36d704e9c85 100644 --- a/clmctest/monitoring/StreamingSim.py +++ b/clmctest/monitoring/StreamingSim.py @@ -5,7 +5,7 @@ import time import urllib.parse import pytest import random -import sys +import sys, getopt from influxdb import InfluxDBClient # Simulation parameters @@ -23,6 +23,8 @@ AGENT2_URL = 'http://172.23.1.22:8186' class Sim(object): """ + + Simulator for services """ @@ -75,6 +77,10 @@ class Sim(object): # endpoint state->mu, sigma, secs normal distribution config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]} + # Simulation configuration of the media component (MC) state changes + # "MC state", [average (sec), stddev] + mc_config_delay_dist = { "stopped":[1, 0.68], "starting": [5, 0.68], "running":[1, 0.68], "stopping": [2, 0.68]} + print("\nSimulation started. Generating data...") # Place endpoints @@ -99,6 +105,32 @@ class Sim(object): max_delay = max(delay_time, max_delay) sim_time += max_delay + # move mpegdash_service media component state from 'stopped' to 'starting' + max_delay = 0 + for ip_endpoint in ip_endpoints: + agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + delay_avg = mc_config_delay_dist['stopped'][0] + delay_std = delay_avg * mc_config_delay_dist['stopped'][1] + + delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'stopped', 'starting') + max_delay = max(delay_time, max_delay) + + sim_time += max_delay + + # move mpegdash_service media component state from 'starting' to 'running' + max_delay = 0 + for ip_endpoint in ip_endpoints: + agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + delay_avg = mc_config_delay_dist['starting'][0] + delay_std = delay_avg * mc_config_delay_dist['starting'][1] + + delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'starting', 'running') + max_delay = max(delay_time, max_delay) + + sim_time += max_delay + # Connect endpoints max_delay = 0 for ip_endpoint in ip_endpoints: @@ -174,7 +206,42 @@ class Sim(object): # remove requests processed off the queue ip_endpoint['request_queue'] -= int(requests_processed) + # update mpegdash_service media component state (continuously 'running') + state_stats = {} + state_stats['running'] = float(TICK_TIME) + state_stats['avg_running'] = float(TICK_TIME) + agent_db_client.write_points(lp.generate_mc_service_config("mpegdash_service_config",state_stats,sim_time)) + sim_time += TICK_TIME + + # Simulate tear-down of media components + # move mpegdash_service media component state from 'running' to 'stopping' + max_delay = 0 + for ip_endpoint in ip_endpoints: + agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + delay_avg = mc_config_delay_dist['running'][0] + delay_std = delay_avg * mc_config_delay_dist['running'][1] + + delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'running', 'stopping') + max_delay = max(delay_time, max_delay) + + sim_time += max_delay + + # move mpegdash_service media component state from 'stopping' to 'stopped' + max_delay = 0 + for ip_endpoint in ip_endpoints: + agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + delay_avg = mc_config_delay_dist['stopping'][0] + delay_std = delay_avg * mc_config_delay_dist['stopping'][1] + + delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'stopping', 'stopped') + max_delay = max(delay_time, max_delay) + + sim_time += max_delay + + # End simulation end_time = sim_time print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time, end_time - start_time)) @@ -223,90 +290,35 @@ class Sim(object): return delay_time -## PYTEST FIXTURES START -## ------------------------------------------------------------------------------------------------------ - -@pytest.fixture(scope='module') -def run_simulation_fixture(streaming_sim_config): - """ - A fixture, which checks if the the DB has been created, if not it runs the simulator with a 10 seconds timeout after that - """ - - influx_db_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" - agent1_url = "http://" + streaming_sim_config['hosts'][1]['ip_address'] + ":8186" - agent2_url = "http://" + streaming_sim_config['hosts'][2]['ip_address'] + ":8186" - - global INFLUX_DB_URL - global INFLUX_DB_NAME - global SIMULATION_TIME_SEC - global AGENT1_URL - global AGENT2_URL - - simulator = Sim(influx_db_url, INFLUX_DB_NAME, agent1_url, agent2_url) - dbs = simulator.db_client.get_list_database() - dbs = [db.get("name") for db in dbs] - - # This check needed to be disabled as the CLMCMetrics database is always created when - # the test starts, irrespective of whether this is the 1st time or not -# if INFLUX_DB_NAME not in dbs: - simulator.reset() - simulator.run(SIMULATION_TIME_SEC) - - print("10 seconds timeout is given so that the data could properly be inserted into the database.") - import time - time.sleep(10) - -### Media Component Configuration fixtures -""" -Line Protocol report format: - -mediaComponentConfig <global tags>,<mediaComp> <configState1=milliseconds>,<configState2=milliseconds> time -""" - -@pytest.fixture(scope='module') -def reportMC_ServiceState(streaming_sim_config): - """ - Report to determine whether the simulated streaming servers on the endpoints is running. - """ - - # IPEndpoint 1 scenario - # 10 second period: Stopped --> started --> stopped --> started - # Stopped state: 1000 + 500 = 1500 [avg = 750] - # Started state: 3000 + 5500 = 8500 [avg = 4250] + @staticmethod + def _changeMCState(agent_db_client, sim_time, mc_measurement, mu, sigma, trans_ratio, transition_state, next_state): + """ + Send INFLUX data indicating the time taken to transition to a new state - epURL = streaming_sim_config['hosts'][1]['ip_address'] - idbc = InfluxDBClient( host=epURL, port=8186, database=INFLUX_DB_NAME, timeout=10 ) + Returns the total time delay for the state change + """ - idbc.write_points( lp.generate_mc_serviceConfig( "apache", 1500, 750, 8500, 4250, 0) ) + # Calculate a randomized total time for the transition (and calculate relative ratios of time in transition and next state) + total_delay_time = max( random.normalvariate(mu, sigma), 1 ) # minimum total delay is 1 second + transition_time = total_delay_time * trans_ratio + next_state_time = total_delay_time - transition_time - # IPEndpoint 2 scenario - # 10 second period: Stopped --> started --> stopped --> started - # Stopped state: 250 + 250 = 500 [avg = 250] - # Started state: 3500 + 6000 = 9500 [avg = 4750] + mc_states = {} + + # Report time in transition (and add the same as average) + mc_states[transition_state] = transition_time + mc_states["avg_" +transition_state] = transition_time - epURL = streaming_sim_config['hosts'][2]['ip_address'] - idbc = InfluxDBClient( host=epURL, port=8186, database=INFLUX_DB_NAME, timeout=10 ) + # Report time remaining in the next state (adding the same as the average) + mc_states[next_state] = next_state_time + mc_states["avg_" +next_state] = next_state_time - idbc.write_points( lp.generate_mc_serviceConfig( "apache", 250, 250, 9500, 4750, 0) ) + agent_db_client.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time)) -@pytest.fixture(scope='module') -def reportMC_ConfigFileState(streaming_sim_config): - """ - Report to determine whether the simulated streaming servers configuration on the endpoints is correct - """ + return total_delay_time -@pytest.fixture(scope='module') -def reportMC_APIStatus(streaming_sim_config): - """ - Report to determine whether the simulated streaming servers API status call returns good - """ - -## ------------------------------------------------------------------------------------------------------ -## PYTEST FIXTURES END - - -def run_simulation(generate=True): +def run_simulation(generate=True, sTime=3600): """ A method which runs the data generation simulator :param generate: True for generating data, False for deleting the DB (optional argument, if not given, default value True is used) @@ -318,6 +330,8 @@ def run_simulation(generate=True): global AGENT1_URL global AGENT2_URL + SIMULATION_TIME_SEC = sTime + simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME, AGENT1_URL, AGENT2_URL) if generate: @@ -326,18 +340,36 @@ def run_simulation(generate=True): else: simulator.db_client.drop_database(simulator.influx_db_name) - if __name__ == "__main__": """ The main entry for this module. Code here is executed only if the StreamingSim.py file is executed, but not when it's imported in another module """ - # check if there are any command line arguments given when executing the module - if len(sys.argv) > 1: - # if CLI argument '-c' is set when executing the script, the influx db will be deleted instead of generating data - option = str(sys.argv[1]) != "-c" - run_simulation(generate=option) + # Default options for simulation (generate data and simulation time of 3600 seconds) + genOpt = True + simTime = 60 * 60 + + # Try get some options + try: + opts, args = getopt.getopt( sys.argv[1:], "c:t:", ['clear','time=']) + + except getopt.GetoptError: + print( 'StreamingSim.py -c -t <seconds>' ) + sys.exit(2) + + # Apply options, if any + for opt, arg in opts: + if opt in ( '-c','--clear' ): + genOpt = False + + elif opt in ('-t','--time'): + simTime = arg + + if ( genOpt == True ): + print( "Running simulation to generate data" ) + print( "Time period for this simulation: " + str(simTime) + " seconds" ) else: - # no argument is given to the function call, hence the default value True is used - run_simulation() + print( "Clearing simulation data" ) + + run_simulation( genOpt, simTime ) diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py index 039615224e17aa35ef1cd331425ab9eae55036a5..9c33c8b035c652799b68a8595777aa98df8a8408 100644 --- a/clmctest/monitoring/conftest.py +++ b/clmctest/monitoring/conftest.py @@ -4,6 +4,7 @@ import pytest import yaml import pkg_resources from influxdb import InfluxDBClient +from clmctest.monitoring.StreamingSim import Sim @pytest.fixture(scope="module") @@ -21,9 +22,8 @@ def streaming_sim_config(): data_loaded = yaml.load(stream) return data_loaded - @pytest.fixture(params=[{'database': 'CLMCMetrics'}], scope='module') -def influxdb(streaming_sim_config, request): +def influx_db(streaming_sim_config, request): """ Creates an Influx DB client for the CLMC metrics database @@ -32,7 +32,21 @@ def influxdb(streaming_sim_config, request): :return: the created Influx DB client """ - db = InfluxDBClient(host=streaming_sim_config['hosts'][0]['ip_address'], port=8086, database=request.param['database'], timeout=10) - db.drop_database(request.param['database']) - - return db + return InfluxDBClient(host=streaming_sim_config['hosts'][0]['ip_address'], port='8086', database=request.param['database'], timeout=10) + +@pytest.fixture(scope="module") +def simulator( streaming_sim_config ): + + influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" + influx_db_name = streaming_sim_config['hosts'][1]['database_name'] + agent1_url = "http://" + streaming_sim_config['hosts'][1]['ip_address'] + ":8186" + agent2_url = "http://" + streaming_sim_config['hosts'][2]['ip_address'] + ":8186" + + simulator = Sim( influx_url, influx_db_name, agent1_url, agent2_url ) + + dbs = simulator.db_client.get_list_database() + dbs = [db.get("name") for db in dbs] + + simulator.reset() + + return simulator diff --git a/clmctest/monitoring/test_MC_Config.py b/clmctest/monitoring/test_MC_Config.py deleted file mode 100644 index e8e56df698ffc2d7e4d0c51b31cd7080a0ac4490..0000000000000000000000000000000000000000 --- a/clmctest/monitoring/test_MC_Config.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/python3 - -import pytest -from clmctest.monitoring.StreamingSim import reportMC_ServiceState - -class TestMediaComponentConfig( object ): - """ - Test class to check the reported output of a media component's configuration test - as it is reported to the CLMC - """ - - # For data sent to INFLUX, see reportMC_ServiceState function in StreamingSim.py - - @pytest.mark.parametrize( "query, expectedResult", [ - ('SELECT "serviceStopped", "avgServiceStopped", "serviceStarted", "avgServiceStarted" FROM "CLMCMetrics"."autogen"."mediaComponentConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', - {"time" : "1970-01-01T00:00:00Z", "serviceStopped" : 1500, "avgServiceStopped" : 750, "serviceStarted" : 8500, "avgServiceStarted" : 4250}), - ('SELECT "serviceStopped", "avgServiceStopped", "serviceStarted", "avgServiceStarted" FROM "CLMCMetrics"."autogen"."mediaComponentConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', - {"time" : "1970-01-01T00:00:00Z", "serviceStopped" : 250, "avgServiceStopped" : 250, "serviceStarted" : 9500, "avgServiceStarted" : 4750}) - ]) - - def test_serviceStateReport( self, query, expectedResult, get_db_client, reportMC_ServiceState ): - """ - :param query: the LineProtocol query to search for MC service state report - :param expectedResult: the JSON result obtained from the query - :param get_db_client: fixture from conftest.py returning INFLUX query client - :param reportMC_ServiceState: the fixture that makes the service state report - """ - print( "\n" ) # White space for output - - # Query for result and report problems with query if necessary - queryResult = get_db_client.query( query, raise_errors = False ) - assert queryResult.error is None, "An error occurred executing query {0}." - - # Get result and evaluate - actualResult = next( queryResult.get_points() ) - - assert expectedResult == actualResult, "FAIL" #self.outputComparison( expectedResult, actualResult ) - print ("Media Component service state test succeeded: {0}".format( query )) diff --git a/clmctest/monitoring/test_simresults.py b/clmctest/monitoring/test_simresults.py index 6940560481c30a6d145a52412b7907da4842a5ae..8953a58f5e66bf57ee496ab4d5be274668e0d915 100644 --- a/clmctest/monitoring/test_simresults.py +++ b/clmctest/monitoring/test_simresults.py @@ -1,13 +1,19 @@ #!/usr/bin/python3 import pytest -from clmctest.monitoring.StreamingSim import run_simulation_fixture +import time class TestSimulation(object): """ A testing class used to group all the tests related to the simulation data """ + @pytest.fixture(scope='class') + def run_simulator( self, simulator ): + simulator.run( 3600 ) + + print( "Waiting for INFLUX to finish receiving simulation data..." ) + time.sleep( 10 ) # wait for data to finish arriving at the INFLUX database @pytest.mark.parametrize("query, expected_result", [ ('SELECT count(*) FROM "CLMCMetrics"."autogen"."cpu_usage"', @@ -19,24 +25,28 @@ class TestSimulation(object): ('SELECT count(*) FROM "CLMCMetrics"."autogen"."net_port_io"', {"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."vm_res_alloc"', - {"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12}) + {"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12}), + + ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', + {"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3604, "count_avg_starting" : 3604, "count_avg_stopped" : 3604, "count_avg_stopping" : 3604, "count_running" : 3604, "count_starting" : 3604, "count_stopped" : 3604, "count_stopping" : 3604}), + ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', + {"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3604, "count_avg_starting" : 3604, "count_avg_stopped" : 3604, "count_avg_stopping" : 3604, "count_running" : 3604, "count_starting" : 3604, "count_stopped" : 3604, "count_stopping" : 3604}), ]) - def test_simulation(self, query, expected_result, influxdb, run_simulation_fixture): + def test_simulation( self, run_simulator, influx_db, query, expected_result ): """ This is the entry point of the test. This method will be found and executed when the module is ran using pytest :param query: the query to execute (value obtained from the pytest parameter decorator) :param expected_result: the result expected from executing the query (value obtained from the pytest parameter decorator) - :param influxdb the import db client fixture - imported from contest.py + :param influx_db the import db client fixture - imported from contest.py :param run_simulation_fixture: the imported fixture to use to generate the testing data - the return value of the fixture is not needed in this case """ # pytest automatically goes through all queries under test, declared in the parameters decorator - print("\n") # prints a blank line for formatting purposes # the raise_errors=False argument is given so that we could actually test that the DB didn't return any errors instead of raising an exception - query_result = influxdb.query(query, raise_errors=False) + query_result = influx_db.query(query, raise_errors=False) # test the error attribute of the result is None, that is no error is returned from executing the DB query assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)