diff --git a/clmctest/monitoring/StreamingSim.py b/clmctest/monitoring/StreamingSim.py index ba9dbbed2ebc921fce8c8f08afaa18dd6a957c51..88e85fb9783bba2df231427d77cdcffb64790aee 100644 --- a/clmctest/monitoring/StreamingSim.py +++ b/clmctest/monitoring/StreamingSim.py @@ -73,63 +73,56 @@ class Sim(object): 'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500} ] - # Simulate configuration of the ip endpoints - # endpoint state->mu, sigma, secs normal distribution - config_delay_dist = {"unplaced": [1, 0.68], "placing": [10, 0.68], "placed": [1, 0.68], "booting": [10, 0.68], "booted": [2, 0.68], - "connecting": [10, 0.68], "connected": [8, 0.68]} - print("\nSimulation started. Generating data...") + # Simulate configuration of the ip endpoints + # Move endpoints from state unplaced to state placing - max_delay = 0 + # reports: 1, unplaced: 0.7s, placing: 0.3s for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['unplaced'][0], - config_delay_dist['unplaced'][0] * config_delay_dist['unplaced'][1], 0.7, - 'unplaced', 'placing') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + self._changeVMState(agent_db_client, sim_time, ip_endpoint, 10, 7, 'unplaced', 'placing') + sim_time += TICK_TIME # Place endpoints - max_delay = 0 + # reports: 10, placing: 9.1s, placed: 0.9s for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0], - config_delay_dist['placing'][0] * config_delay_dist['placing'][1], 0.7, - 'placing', 'placed') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + + for i in range(9): + self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "placing") + + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 9, 'placing', 'placed') + sim_time += 10 * TICK_TIME # Move endpoints from state placed to state booting - max_delay = 0 + # reports: 1, placed: 0.8s, booting: 0.2s for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placed'][0], - config_delay_dist['placed'][0] * config_delay_dist['placed'][1], 0.7, - 'placed', 'booting') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + self._changeVMState(agent_db_client, sim_time, ip_endpoint, 10, 8, 'placed', 'booting') + sim_time += TICK_TIME # Boot endpoints - max_delay = 0 + # reports: 10, booting: 9.4s, booted: 0.6s for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0], - config_delay_dist['booting'][0] * config_delay_dist['booting'][1], 0.7, - 'booting', 'booted') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + + for i in range(9): + self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "booting") + + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 4, 'booting', 'booted') + sim_time += 10*TICK_TIME # move mpegdash_service media component state from 'stopped' to 'starting' # Total reports = 1, (0.2 seconds in 'stopped', 0.8 seconds in 'starting') for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - self._changeMCState( influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting' ) + self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting') sim_time += TICK_TIME @@ -147,26 +140,25 @@ class Sim(object): sim_time += 5 * TICK_TIME # Move endpoints from state booted to state connecting - max_delay = 0 + # reports: 2, booted: 1.5s, connecting: 0.5s for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booted'][0], - config_delay_dist['booted'][0] * config_delay_dist['booted'][1], 0.7, - 'booted', 'connecting') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "booted") + self._changeVMState(agent_db_client, sim_time + (1*TICK_TIME), ip_endpoint, 10, 5, 'booted', 'connecting') + sim_time += 2*TICK_TIME # Connect endpoints - max_delay = 0 + # reports: 10, connecting: 9.7s, connected: 0.3s for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0], - config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], 0.7, - 'connecting', 'connected') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + + for i in range(9): + self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connecting") + + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 7, 'connecting', 'connected') + sim_time += 10*TICK_TIME request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC inc_period_count = 0 @@ -233,8 +225,7 @@ class Sim(object): ip_endpoint['request_queue'] -= int(requests_processed) # update endpoint state (continuously 'connected') - agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, - **{'connected': float(TICK_TIME), 'avg_connected': float(TICK_TIME)})) + self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "connected") # update mpegdash_service media component state (continuously 'running') self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time) @@ -244,15 +235,16 @@ class Sim(object): # Simulate tear-down of media components and endpoints # remove endpoints - max_delay = 0 + # reports: 5, connected: 4.7s, unplaced: 0.3s for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connected'][0], - config_delay_dist['connected'][0] * config_delay_dist['connected'][1], 0.7, - 'connected', 'unplaced') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + + for i in range(4): + self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connected") + + self._changeVMState(agent_db_client, sim_time + (4*TICK_TIME), ip_endpoint, 10, 7, 'connected', 'unplaced') + sim_time += 5 * TICK_TIME # move mpegdash_service media component state from 'running' to 'stopping' # Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping') @@ -303,34 +295,49 @@ class Sim(object): return response_delay @staticmethod - def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, trans_ratio, transition_state, next_state): + def _writeVMSingleState(agent_db_client, sim_time, ip_endpoint, state): + """ + Write a single state as a sample over TICK_TIME. + + :param agent_db_client: agent used to send metric data to CLMC + :param sim_time: current simulation time + :param ip_endpoint: dict with info for ip endpoint + :param state: state that's being reported + """ + + state_stats = { + state: float(TICK_TIME), + 'avg_{0}'.format(state): float(TICK_TIME) + } + + agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, **state_stats)) + + @staticmethod + def _changeVMState(agent_db_client, sim_time, ip_endpoint, sample_count, trans_sample_count, transition_state, next_state): """ Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables. :param sim_time: current simulation time :param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.) - :param mu: mean value - :param sigma: standard deviation - :param trans_ratio: the part of the total delay time for transition state + :param sample_count: - the total number of samples in the reporting period (TICK_TIME) + :param trans_sample_count: - the number of samples in the transition state :param transition_state: name of transition state :param next_state: name of next state :return: the delay time """ - # assert total delay time is at least 1 second - total_delay_time = max(random.normalvariate(mu, sigma), 1.0) + state_stats = {} - # part of the total delay time is the transition state period - transition_time = trans_ratio*total_delay_time - # the rest of the delay is the next state period - next_state_time = total_delay_time - transition_time + # period of transition state is part of the total sample count multiplied by the reporting period (TICK_TIME) + state_stats[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count + state_stats["avg_" + transition_state] = state_stats[transition_state] / float(TICK_TIME) - # transition state and state period are passed to the generate report function as keyword arguments - agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, - **{transition_state: transition_time, 'avg_{0}'.format(transition_state): transition_time, - next_state: next_state_time, 'avg_{0}'.format(next_state): next_state_time})) + # the rest of the period is reported as time for the next state + state_stats[next_state] = float(TICK_TIME) - state_stats[transition_state] + state_stats["avg_" + next_state] = state_stats[next_state] / float(TICK_TIME) - return total_delay_time + # transition state and state period are passed to the generate report function as keyword arguments + agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, **state_stats)) @staticmethod def _writeMCSingleState(influxClient, measurement, state, sim_time): diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py index 8d1ed6202174af4730e1618b30678408cb2ff925..9f3c42cd62743c24c2939063ce1dacbfcb5b1d4d 100644 --- a/clmctest/monitoring/conftest.py +++ b/clmctest/monitoring/conftest.py @@ -15,7 +15,7 @@ def streaming_sim_config(): :return: the python object representing the read YAML file """ rspec = pkg_resources.resource_filename('clmctest.monitoring', 'rspec.yml') - print("rspec file: {0}".format(rspec)) + print("\nrspec file: {0}".format(rspec)) with open(rspec, 'r') as stream: data_loaded = yaml.load(stream) diff --git a/clmctest/monitoring/test_simresults.py b/clmctest/monitoring/test_simresults.py index cde8c04dfacf0df80f7d33f3f7389c02a4989a29..02d3913e9ce719d4841ee2fb12019315c5206e43 100644 --- a/clmctest/monitoring/test_simresults.py +++ b/clmctest/monitoring/test_simresults.py @@ -32,24 +32,31 @@ class TestSimulation(object): {"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', - {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3607, "count_avg_unplaced": 3607, "count_placing": 3607, "count_avg_placing": 3607, "count_placed": 3607, "count_avg_placed": 3607, "count_booting": 3607, "count_avg_booting": 3607, "count_booted": 3607, - "count_avg_booted": 3607, "count_connecting": 3607, "count_avg_connecting": 3607, "count_connected": 3607, "count_avg_connected": 3607, "count_cpus": 3607, "count_memory": 3607, "count_storage": 3607}), + {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639, + "count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', - {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3607, "count_avg_unplaced": 3607, "count_placing": 3607, "count_avg_placing": 3607, "count_placed": 3607, "count_avg_placed": 3607, "count_booting": 3607, "count_avg_booting": 3607, "count_booted": 3607, - "count_avg_booted": 3607, "count_connecting": 3607, "count_avg_connecting": 3607, "count_connected": 3607, "count_avg_connected": 3607, "count_cpus": 3607, "count_memory": 3607, "count_storage": 3607}), + {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639, + "count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}), - ('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <>0', + ('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0', + {"time": "1970-01-01T00:00:00Z", "avg_placing": 0.9272727272727274}), + ('SELECT mean(avg_booting) as "avg_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_booting <> 0', + {"time": "1970-01-01T00:00:00Z", "avg_booting": 0.8727272727272727}), + ('SELECT mean(avg_connecting) as "avg_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connecting <> 0', + {"time": "1970-01-01T00:00:00Z", "avg_connecting": 0.9272727272727272}), + + ('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <> 0', {"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}), - ('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <>0', + ('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <> 0', {"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}), - ('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <>0', + ('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <> 0', {"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}), - ('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <>0', + ('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <> 0', {"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55}) ]) def test_simulation(self, influx_db, query, expected_result): @@ -76,47 +83,3 @@ class TestSimulation(object): assert expected_result == actual_result, "Simulation test failure" print("Successfully passed test for the following query: {0}".format(query)) - - @pytest.mark.parametrize("query, field", [ - ('SELECT mean("placing") as "mean_transition_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" where "placing" <> 0 and "placed" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'', - 'mean_transition_placing'), - ('SELECT mean("placing") as "mean_target_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" where "placing" <> 0 and "unplaced" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'', - 'mean_target_placing'), - ('SELECT mean("booting") as "mean_transition_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "booting" <> 0 and "booted" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'', - 'mean_transition_booting'), - ('SELECT mean("booting") as "mean_target_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "booting" <> 0 and "placed" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'', - 'mean_target_booting'), - ('SELECT mean("connecting") as "mean_transition_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "connecting" <> 0 and "connected" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'', - 'mean_transition_connecting'), - ('SELECT mean("connecting") as "mean_target_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "connecting" <> 0 and "booted" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'', - 'mean_target_connecting'), - ]) - def test_mean_config_queries(self, influx_db, query, field): - """ - Test queries for mean values in the configuration states model. - - - 'mean_transition_{state}' - we want to know the mean time spent on the given state in cases where this was the actual transition state. - e.g. 'mean_transition_placing' - refers to the mean time spent on state 'placing' in transitions such as 'placing' -> 'placed' - - - 'mean_target_{state}' - we want to know the mean time spent on the given state in cases where this was the actual target state - e.g. 'mean_target_placing' - refers to the mean time spent on state 'placing' in transitions such as 'unplaced' -> 'placing' - - :param influx_db: influx db client - :param query: query under test - :param field: the field id to fetch - """ - - # pytest automatically goes through all queries under test, declared in the parameters decorator - print("\n") # prints a blank line for formatting purposes - - # the raise_errors=False argument is given so that we could actually test that the DB didn't return any errors instead of raising an exception - query_result = influx_db.query(query, raise_errors=False) - - # test the error attribute of the result is None, that is no error is returned from executing the DB query - assert query_result.error is None, "An error was encountered while executing query {0}.".format(query) - - # get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator) - result = next(query_result.get_points()).get(field) - assert float(result) >= 0.0, "Test failure. Reported mean values cannot be negative." - - print("Successfully passed test for the following query: {0}".format(query))