diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index 692405ef04d261039d4c00d6cbf0367a4d286bf1..a2864129ca4688d28eb3ae4adf103a37229f74c8 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -23,23 +23,6 @@ def generate_network_report(recieved_bytes, sent_bytes, time): return result -# Formats VM config -def generate_vm_config(state, cpu, mem, storage, time): - result = [{"measurement": "vm_res_alloc", - "tags": { - "vm_state": state - }, - "fields": { - "cpu": cpu, - "memory": mem, - "storage": storage - }, - "time": _getNSTime(time) - }] - - return result - - # Reports cpu usage, scaling on requests def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time): result = [{"measurement": "cpu_usage", @@ -87,41 +70,33 @@ def generate_ipendpoint_route(resource, requests, latency, time): return result -def generate_endpoint_config(state, cpu, mem, storage, statePeriod, time): +def generate_endpoint_config(cpu, mem, storage, time, **kwargs): """ - generates a measurement + generates a measurement for a VM configuration states :param cpu: the number of CPUs of VM endpoint :param mem: memory of VM endpoint :param storage: storage capacity of VM endpoint - :param state: the state that was monitored (e.g. placing, booting or connecting) - :param statePeriod: the period of time, the IP endpoint was in this state :param time: time of measurement - :return: dictionary object representing the data to post to influx + :param kwargs: 'pythonic-style' keyword arguments used to store the state as a key and it's respective state period (in seconds) as value + :return: dictionary object representing the data to post on influx """ - result = [{"measurement": "endpointConfig", - "tags": - { - "state": state - }, - "fields": - {"cpus": cpu, - "memory": mem, - "storage": storage, - "statePeriod": statePeriod, - }, - "time": _getNSTime(time)}] + # lambda function to validate whether a state is given as a key in the keyword arguments dictionary + validate = lambda key: kwargs.get(key) if key in kwargs else 0.0 - return result + # generate and validate the state values + fields = {"cpus": cpu, "memory": mem, "storage": storage} # NOTE: Do we need these fields ? + for state in ("placing", "booting", "connecting"): + fields[state] = validate(state) + fields[("avg_{0}".format(state))] = validate("avg_{0}".format(state)) + result = [{"measurement": "endpoint_config", + "fields": fields, + "time": _getNSTime(time)}] -# InfluxDB likes to have time-stamps in nanoseconds -def _getNSTime(time): - # Convert to nano-seconds - timestamp = int(1000000000*time) + return result - return timestamp def generate_mc_service_config( mcMeasurement, stateTimeStats, time ): @@ -171,6 +146,13 @@ def validate_state_time_stats( stateTimeStats ): return stateTimeStats +# InfluxDB likes to have time-stamps in nanoseconds +def _getNSTime(time): + # Convert to nano-seconds + timestamp = int(1000000000*time) + + return timestamp + # DEPRECATED # ____________________________________________________________________________ @@ -182,6 +164,23 @@ def quote_wrap(string): return "\"" + string + "\"" +# Formats VM config +def generate_vm_config(state, cpu, mem, storage, time): + result = [{"measurement": "vm_res_alloc", + "tags": { + "vm_state": state + }, + "fields": { + "cpu": cpu, + "memory": mem, + "storage": storage + }, + "time": _getNSTime(time) + }] + + return result + + def _generateClientRequest(cReq, id, time): # Tags first result = 'sid="' + str(id) + '",' + cReq diff --git a/clmctest/monitoring/StreamingSim.py b/clmctest/monitoring/StreamingSim.py index 242fe83a5eace6e918702b09a80ba1d234cc3706..d7d8bd1d566f9df471092a1120f14a2673b4f94f 100644 --- a/clmctest/monitoring/StreamingSim.py +++ b/clmctest/monitoring/StreamingSim.py @@ -3,9 +3,9 @@ import clmctest.monitoring.LineProtocolGenerator as lp import time import urllib.parse -import pytest import random -import sys, getopt +import sys +import getopt from influxdb import InfluxDBClient # Simulation parameters @@ -89,8 +89,7 @@ class Sim(object): agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0], - config_delay_dist['placing'][0] * config_delay_dist['placing'][1], - 'placing', 'placed') + config_delay_dist['placing'][0] * config_delay_dist['placing'][1], 'placing') max_delay = max(delay_time, max_delay) sim_time += max_delay @@ -100,8 +99,7 @@ class Sim(object): agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0], - config_delay_dist['booting'][0] * config_delay_dist['booting'][1], - 'booting', 'booted') + config_delay_dist['booting'][0] * config_delay_dist['booting'][1], 'booting') max_delay = max(delay_time, max_delay) sim_time += max_delay @@ -124,8 +122,7 @@ class Sim(object): agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0], - config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], - 'connecting', 'connected') + config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], 'connecting') max_delay = max(delay_time, max_delay) sim_time += max_delay @@ -244,23 +241,22 @@ class Sim(object): return response_delay @staticmethod - def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state): + def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state): """ Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables. - :param sim_time: - :param ip_endpoint: - :param mu: - :param sigma: - :param transition_state: - :param next_state: + + :param sim_time: current simulation time + :param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.) + :param mu: mean value + :param sigma: standard deviation :return: the delay time """ - agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time)) - - delay_time = random.normalvariate(mu, sigma) - - agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time + delay_time)) + # assert transition time is at least 0 + delay_time = max(random.normalvariate(mu, sigma), 0.0) + # transition state and state period are passed to the generate report function as keyword arguments + agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, + **{transition_state: delay_time, 'avg_{0}'.format(transition_state): delay_time})) return delay_time @@ -314,6 +310,7 @@ def run_simulation(generate=True, sTime=3600): else: simulator.db_client.drop_database(simulator.influx_db_name) + if __name__ == "__main__": """ The main entry for this module. Code here is executed only if the StreamingSim.py file is executed, diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py index ad4d96b17c85d8bd8149f3adf84247156abcba9e..6d59641ce7aa70144118678ad6cf1f82587d0e3b 100644 --- a/clmctest/monitoring/conftest.py +++ b/clmctest/monitoring/conftest.py @@ -12,7 +12,6 @@ def streaming_sim_config(): """ Reads the service configuration deployed for the streaming simulation test. - :param request: access the parameters of the fixture :return: the python object representing the read YAML file """ rspec = pkg_resources.resource_filename('clmctest.monitoring', 'rspec.yml') @@ -22,6 +21,7 @@ def streaming_sim_config(): data_loaded = yaml.load(stream) return data_loaded + @pytest.fixture(scope="module") def streaming_sim_params(streaming_sim_config): """ @@ -51,6 +51,7 @@ def get_db_client(streaming_sim_config, request): return InfluxDBClient(host=streaming_sim_config['hosts'][0]['ip_address'], port=8086, database=request.param['database'], timeout=10) + @pytest.fixture(scope='module') def run_simulation_fixture(streaming_sim_params): """ diff --git a/clmctest/monitoring/test_endpoint_config.py b/clmctest/monitoring/test_endpoint_config.py deleted file mode 100644 index 0d0a09da8c6ba9ffb31826e0271a26ee886103a1..0000000000000000000000000000000000000000 --- a/clmctest/monitoring/test_endpoint_config.py +++ /dev/null @@ -1,143 +0,0 @@ -#!/usr/bin/python3 - -import pytest -from influxdb import InfluxDBClient -import clmctest.monitoring.LineProtocolGenerator as lp -import random -import time - - -""" -Tests the monitoring of endpoints' configuration states - currently based on model with three states (placing -> booting -> connecting) - -Line Protocol report format: - -endpointConfig <global tags>,<state> <statePeriod=milliseconds>,<cpus=numberOfCPUs>, <memory=memoryOfVM>, <storage=storageCapacityOfVM> time -""" - - -# Random initialization of state periods -state_delays = [{'placing': random.randint(7500, 12500), 'booting': random.randint(7500, 12500), 'connecting': random.randint(7500, 12500)}, - {'placing': random.randint(7500, 12500), 'booting': random.randint(7500, 12500), 'connecting': random.randint(7500, 12500)}] - -measurement_name = 'endpointConfig' # measurement name for configuration of media components' endpoints - - -@pytest.mark.parametrize("query, result", [ - ('SELECT "state", "statePeriod", "cpus", "memory", "storage" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'', - {'placing': {"state": "placing", "statePeriod": state_delays[0]['placing'], "cpus": 1, "memory": 2048, "storage": '10GB'}, - 'booting': {"state": "booting", "statePeriod": state_delays[0]['booting'], "cpus": 1, "memory": 2048, "storage": '10GB'}, - 'connecting': {"state": "connecting", "statePeriod": state_delays[0]['connecting'], "cpus": 1, "memory": 2048, "storage": '10GB'}}), - ('SELECT "state", "statePeriod", "cpus", "memory", "storage" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'' , - {'placing': {"state": "placing", "statePeriod": state_delays[1]['placing'], "cpus": 1, "memory": 2048, "storage": '10GB'}, - 'booting': {"state": "booting", "statePeriod": state_delays[1]['booting'], "cpus": 1, "memory": 2048, "storage": '10GB'}, - 'connecting': {"state": "connecting", "statePeriod": state_delays[1]['connecting'], "cpus": 1, "memory": 2048, "storage": '10GB'}}) -]) -def test_endpoint_config(query, result, get_db_client): - """ - Test endpoint configuration state measurements in general. - - :param query: query under test for the endpoint configuration state - :param result: expected result of executed query - :param get_db_client: the InfluxDB client fixture from conftest.py - """ - - print("\n") # blank line for formatting purposes - - query_result = get_db_client.query(query, raise_errors=False) - # test the error attribute of the result is None, that is no error is returned from executing the DB query - assert query_result.error is None, "An error was encountered while executing query {0}.".format(query) - measurements = query_result.get_points() - - assert all(map(lambda measurement: compare(measurement, result), measurements)), "Comparison failure for query:\n{0}".format(query) - - print("Successfully passed test for query:\n{0}".format(query)) - - -@pytest.mark.parametrize("query, result", [ - ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'placing\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'', - {"mean_statePeriod": state_delays[0]['placing']}), - ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'booting\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'', - {"mean_statePeriod": state_delays[0]['booting']}), - ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'connecting\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'', - {"mean_statePeriod": state_delays[0]['connecting']}), - ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'placing\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'', - {"mean_statePeriod": state_delays[1]['placing']}), - ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'booting\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'', - {"mean_statePeriod": state_delays[1]['booting']}), - ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'connecting\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'', - {"mean_statePeriod": state_delays[1]['connecting']}) -]) -def test_mean_config_periods(query, result, get_db_client): - """ - :param query: query under test for the endpoint configuration state - :param result: expected result of executed query - :param get_db_client: the InfluxDB client fixture from conftest.py - """ - - print("\n") # blank line for formatting purposes - - query_result = get_db_client.query(query, raise_errors=False) - # test the error attribute of the result is None, that is no error is returned from executing the DB query - assert query_result.error is None, "An error was encountered while executing query {0}.".format(query) - - measurement = next(query_result.get_points()) - measurement.pop('time') - - assert measurement == result, "Comparison failure for query of mean state period:\n{0}".format(query) - - print("Successfully passed test for query of mean state period:\n{0}".format(query)) - - -@pytest.fixture(scope='module', autouse=True) -def generate_states_data(streaming_sim_config, get_db_client): - """ - Sends configuration state data to influx for the two mc endpoints - - :param streaming_sim_config: the configuration fixture from conftest.py - :param get_db_client: the influx db client - """ - - global state_delays - global measurement_name - - if measurement_name in (measurement['name'] for measurement in get_db_client.get_list_measurements()): - get_db_client.drop_measurement(measurement_name) # clear DB measurement from previous test if exists - - time.sleep(2) - - for i in range(1, 3): - measurement_time = 0 - for state in ('placing', 'booting', 'connecting'): - epURL = streaming_sim_config['hosts'][i]['ip_address'] # endpoint URL - endpointID = streaming_sim_config['hosts'][i]['ipendpoint_id'] - cpu = streaming_sim_config['hosts'][i]['cpus'] - mem = streaming_sim_config['hosts'][i]['memory'] - disk = streaming_sim_config['hosts'][i]['disk'] - - db_name = streaming_sim_config['hosts'][i]['database_name'] - db_client = InfluxDBClient(host=epURL, port=8186, database=db_name, timeout=10) - - period = state_delays[i-1].get(state) - print("Endpoint - {0}, state - {1}, period - {2}ms".format(endpointID, state, period)) - - db_client.write_points(lp.generate_endpoint_config(state, cpu, mem, disk, period, measurement_time)) - measurement_time += period - - time.sleep(10) - - -def compare(actual_measurement, measurement_dict): - """ - Auxiliary function to check whether the measurements are the same, excluding the time stamp field - - :param actual_measurement: the actual measurement from Influx (the one with a time stamp - :param measurement_dict: the dictionary of expected measurements - :return: True for equality and False otherwise - """ - - actual_measurement.pop('time') - # get the measurement state we are comparing and fetch it from the dictionary of expected measurements - expected_measurement = measurement_dict.get(actual_measurement.get('state')) - - return actual_measurement == expected_measurement diff --git a/clmctest/monitoring/test_simresults.py b/clmctest/monitoring/test_simresults.py index 73d984a1a852ab988a782ce780b4b85d1993cfe6..0277167352449960ce288a49e6dc897d5bbfc3f1 100644 --- a/clmctest/monitoring/test_simresults.py +++ b/clmctest/monitoring/test_simresults.py @@ -17,13 +17,17 @@ class TestSimulation(object): {"time": "1970-01-01T00:00:00Z", "count_avg_response_time": 7200, "count_peak_response_time": 7200, "count_requests": 7200}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."net_port_io"', {"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}), - ('SELECT count(*) FROM "CLMCMetrics"."autogen"."vm_res_alloc"', - {"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12}), - + + ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', + {"time": "1970-01-01T00:00:00Z", "count_placing": 3, "count_avg_placing": 3, "count_booting": 3, "count_avg_booting": 3, "count_connecting": 3, "count_avg_connecting": 3, "count_cpus": 3, "count_memory": 3, "count_storage": 3}), + ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', + {"time": "1970-01-01T00:00:00Z", "count_placing": 3, "count_avg_placing": 3, "count_booting": 3, "count_avg_booting": 3, "count_connecting": 3, "count_avg_connecting": 3, + "count_cpus": 3, "count_memory": 3, "count_storage": 3}), + ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', - {"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3602, "count_avg_starting" : 3602, "count_avg_stopped" : 3602, "count_avg_stopping" : 3602, "count_running" : 3602, "count_starting" : 3602, "count_stopped" : 3602, "count_stopping" : 3602}), + {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3602, "count_avg_starting": 3602, "count_avg_stopped": 3602, "count_avg_stopping": 3602, "count_running": 3602, "count_starting": 3602, "count_stopped": 3602, "count_stopping": 3602}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', - {"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3602, "count_avg_starting" : 3602, "count_avg_stopped" : 3602, "count_avg_stopping" : 3602, "count_running" : 3602, "count_starting" : 3602, "count_stopped" : 3602, "count_stopping" : 3602}), + {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3602, "count_avg_starting": 3602, "count_avg_stopped": 3602, "count_avg_stopping": 3602, "count_running": 3602, "count_starting": 3602, "count_stopped": 3602, "count_stopping": 3602}), ]) def test_simulation(self, query, expected_result, get_db_client, run_simulation_fixture): """