diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index 213e84da0fff40208d621de5597bca3711ba4a7e..fd109cf0c9548237fc2ec37c63e14c8a7ca0be1f 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -23,23 +23,6 @@ def generate_network_report(recieved_bytes, sent_bytes, time): return result -# Formats VM config -def generate_vm_config(state, cpu, mem, storage, time): - result = [{"measurement": "vm_res_alloc", - "tags": { - "vm_state": state - }, - "fields": { - "cpu": cpu, - "memory": mem, - "storage": storage - }, - "time": _getNSTime(time) - }] - - return result - - # Reports cpu usage, scaling on requests def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time): result = [{"measurement": "cpu_usage", @@ -87,12 +70,40 @@ def generate_ipendpoint_route(resource, requests, latency, time): return result -# InfluxDB likes to have time-stamps in nanoseconds -def _getNSTime(time): - # Convert to nano-seconds - timestamp = int(1000000000*time) +def generate_endpoint_config(time, cpu, mem, storage, current_state, current_state_time, **kwargs): + """ + generates a measurement for a VM configuration states + + :param cpu: the number of CPUs of VM endpoint + :param mem: memory of VM endpoint + :param storage: storage capacity of VM endpoint + :param current_state: the current state the endpoint is in (TAG) + :param current_state_time: the part of the sampling period the endpoint was in the current state + :param time: time of measurement + :param kwargs: 'python-style' keyword arguments used to store the state as a key and it's respective state period (in seconds) as value + + :return: dictionary object representing the data to post on influx + """ + + # lambda function to validate whether a state is given as a key in the keyword arguments dictionary + validate = lambda key: kwargs.get(key) if key in kwargs else 0.0 + + # generate and validate the state 
values + fields = {"cpus": cpu, "memory": mem, "storage": storage} # NOTE: Do we need these fields ? + for state in ("unplaced", "placing", "placed", "booting", "booted", "connecting", "connected"): + fields[state] = validate(state) + fields[("avg_{0}".format(state))] = validate("avg_{0}".format(state)) + + result = [{"measurement": "endpoint_config", + "tags": { + "current_state": current_state, + "current_state_time": current_state_time, + }, + "fields": fields, + "time": _getNSTime(time)}] + + return result - return timestamp def generate_mc_service_config( mcMeasurement, stateTimeStats, time ): @@ -142,6 +153,13 @@ def validate_state_time_stats( stateTimeStats ): return stateTimeStats +# InfluxDB likes to have time-stamps in nanoseconds +def _getNSTime(time): + # Convert to nano-seconds + timestamp = int(1000000000*time) + + return timestamp + # DEPRECATED # ____________________________________________________________________________ @@ -153,6 +171,23 @@ def quote_wrap(string): return "\"" + string + "\"" +# Formats VM config +def generate_vm_config(state, cpu, mem, storage, time): + result = [{"measurement": "vm_res_alloc", + "tags": { + "vm_state": state + }, + "fields": { + "cpu": cpu, + "memory": mem, + "storage": storage + }, + "time": _getNSTime(time) + }] + + return result + + def _generateClientRequest(cReq, id, time): # Tags first result = 'sid="' + str(id) + '",' + cReq diff --git a/clmctest/monitoring/StreamingSim.py b/clmctest/monitoring/StreamingSim.py index d0b97f4178da058d1d8ec9c532fc094e75ba21be..5f8fd2fda6af221867bc229fe3400ab4e9c02b3a 100644 --- a/clmctest/monitoring/StreamingSim.py +++ b/clmctest/monitoring/StreamingSim.py @@ -3,9 +3,9 @@ import clmctest.monitoring.LineProtocolGenerator as lp import time import urllib.parse -import pytest import random -import sys, getopt +import sys +import getopt from influxdb import InfluxDBClient # Simulation parameters @@ -73,40 +73,85 @@ class Sim(object): 'segment_size': 2, 'video_bit_rate': 80, 
'packet_size': 1500} ] - # Simulate configuration of the ipendpoints - # endpoint state->mu, sigma, secs normal distribution - config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]} + endpoint_states_config = {self.agent1_url: {"current_state": "unplaced", "current_state_time": 0}, + self.agent2_url: {"current_state": "unplaced", "current_state_time": 0}} print("\nSimulation started. Generating data...") + # Simulate configuration of the ip endpoints + + # Move endpoints from state unplaced to state placing + # reports: 1, unplaced: 0.7s (completed), placing: 0.3s + for ip_endpoint in ip_endpoints: + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + self._changeVMState(agent_db_client, sim_time, ip_endpoint, (("unplaced", 0.7, 1),), 'placing', 0.3) + # report the change of the state in the config dictionary, to keep track of unreported seconds for the current state + endpoint_states_config[agent]["current_state"] = "placing" + endpoint_states_config[agent]["current_state_time"] = 0.3 + sim_time += TICK_TIME + # Place endpoints - max_delay = 0 + # reports: 10 (SAMPLE PERIOD is 1), placing: 9.1s (completed), placed: 0.9s + # addition to uncompleted states from previous report: placing += 0.3s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0], - config_delay_dist['placing'][0] * config_delay_dist['placing'][1], - 'placing', 'placed') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + + # since the current state in the 
dictionary is still placing, only the current state time is incremented + for i in range(9): + self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "placing") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME + + # here, the VM exits state placing, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (("placing", 0.1 + endpoint_states_config[agent]["current_state_time"], 1),), 'placed', 0.9) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "placed" + endpoint_states_config[agent]["current_state_time"] = 0.9 + sim_time += 10 * TICK_TIME + + # Move endpoints from state placed to state booting + # reports: 1, placed: 0.8s, booting: 0.2s + # addition to uncompleted states from previous report: placed += 0.9s + for ip_endpoint in ip_endpoints: + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + + # here, the VM exits state placed, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time, ip_endpoint, (('placed', 0.8 + endpoint_states_config[agent]["current_state_time"], 1),), 'booting', 0.2) + endpoint_states_config[agent]["current_state"] = "booting" + endpoint_states_config[agent]["current_state_time"] = 0.2 + sim_time += TICK_TIME # Boot endpoints - max_delay = 0 + # reports: 10 (SAMPLE PERIOD is 1), booting: 9.4s, booted: 0.6s + # addition to uncompleted states from previous report: booting += 0.2s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - 
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0], - config_delay_dist['booting'][0] * config_delay_dist['booting'][1], - 'booting', 'booted') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + + # since the current state in the dictionary is still booting, only the current state time is incremented + for i in range(9): + self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "booting") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME + + # here, the VM exits state booting, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (('booting', 0.4 + endpoint_states_config[agent]["current_state_time"], 1),), 'booted', 0.6) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "booted" + endpoint_states_config[agent]["current_state_time"] = 0.6 + sim_time += 10*TICK_TIME # move mpegdash_service media component state from 'stopped' to 'starting' # Total reports = 1, (0.2 seconds in 'stopped', 0.8 seconds in 'starting') for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - self._changeMCState( influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting' ) + self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting') sim_time += TICK_TIME @@ -116,29 +161,58 @@ class Sim(object): agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - for i in range( 0, 4 ): - self._writeMCSingleState( influxClient, 'mpegdash_service_config', 'starting', sim_time + (i * TICK_TIME) ) + for i in range(0, 4): 
+ self._writeMCSingleState(influxClient, 'mpegdash_service_config', 'starting', sim_time + (i * TICK_TIME)) - self._changeMCState( influxClient, sim_time + (4 * TICK_TIME), 'mpegdash_service_config', 10, 7, 'starting', 'running' ) + self._changeMCState(influxClient, sim_time + (4 * TICK_TIME), 'mpegdash_service_config', 10, 7, 'starting', 'running') sim_time += 5 * TICK_TIME + # Move endpoints from state booted to state connecting + # reports: 2, booted: 1.5s, connecting: 0.5s + # addition to uncompleted states from previous report: booted += 0.6s + for ip_endpoint in ip_endpoints: + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + + # since the current state in the dictionary is still booted, only the current state time is incremented + self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "booted") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME + + # here, the VM exits state booted, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (1*TICK_TIME), ip_endpoint, (('booted', 0.5 + endpoint_states_config[agent]["current_state_time"], 1),), 'connecting', 0.5) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "connecting" + endpoint_states_config[agent]["current_state_time"] = 0.5 + sim_time += 2*TICK_TIME + # Connect endpoints - max_delay = 0 + # reports: 10, connecting: 9.7s, connected: 0.3s + # addition to uncompleted states from previous report: connecting += 0.5s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - delay_time = 
self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0], - config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], - 'connecting', 'connected') - max_delay = max(delay_time, max_delay) - sim_time += max_delay + + # since the current state in the dictionary is still connecting, only the current state time is incremented + for i in range(9): + self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connecting") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME + + # here, the VM exits state booted, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (('connecting', 0.7 + endpoint_states_config[agent]["current_state_time"], 1),), 'connected', 0.3) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "connected" + endpoint_states_config[agent]["current_state_time"] = 0.3 + sim_time += 10*TICK_TIME request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC inc_period_count = 0 for i in range(simulation_length_seconds): for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) # linear inc to arrival rate @@ -198,12 +272,38 @@ class Sim(object): # remove requests processed off the queue ip_endpoint['request_queue'] -= int(requests_processed) + # update endpoint state (continuously 'connected') + self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "connected") + # update current state time in the config dictionary + endpoint_states_config[agent]["current_state_time"] += 1 + # update mpegdash_service media component state (continuously 'running') - self._writeMCSingleState( agent_db_client, 
'mpegdash_service_config', 'running', sim_time ) + self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time) sim_time += TICK_TIME - # Simulate tear-down of media components + # Simulate tear-down of media components and endpoints + + # remove endpoints + # reports: 5, connected: 4.7s, unplaced: 0.3s + # addition to uncompleted states from previous report: connected += 3600s + 0.3s + for ip_endpoint in ip_endpoints: + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + + # since the current state in the dictionary is still connected, only the current state time is incremented + for i in range(4): + self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connected") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME + + # here, the VM exits state connected, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (4*TICK_TIME), ip_endpoint, (('connected', 0.7 + endpoint_states_config[agent]["current_state_time"], 1),), 'unplaced', 0.3) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "unplaced" + endpoint_states_config[agent]["current_state_time"] = 0.3 + sim_time += 5 * TICK_TIME + # move mpegdash_service media component state from 'running' to 'stopping' # Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping') for ip_endpoint in ip_endpoints: @@ -220,7 +320,7 @@ class Sim(object): for ip_endpoint in ip_endpoints: agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - self._changeMCState( influxClient, sim_time, 'mpegdash_service_config', 10, 9, 'stopping', 'stopped' ) + 
self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 9, 'stopping', 'stopped') sim_time += TICK_TIME @@ -253,25 +353,48 @@ class Sim(object): return response_delay @staticmethod - def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state): + def _writeVMSingleState(agent_db_client, sim_time, ip_endpoint, state): """ - Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables. - :param sim_time: - :param ip_endpoint: - :param mu: - :param sigma: - :param transition_state: - :param next_state: - :return: the delay time + Write a single state as a sample over TICK_TIME. + + :param agent_db_client: agent used to send metric data to CLMC + :param sim_time: current simulation time + :param ip_endpoint: dict with info for ip endpoint + :param state: state that's being reported + """ + + # since no state has been finished, the generate_endpoint_config is called without any keyword arguments (field:value pairs) and only the tags for the current state are set + data = lp.generate_endpoint_config(sim_time, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], state, TICK_TIME) + agent_db_client.write_points(data) + + @staticmethod + def _changeVMState(agent_db_client, sim_time, ip_endpoint, completed_states, next_state, next_state_time): """ + Send influx data to report change of VM state over the sample period. - agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time)) + :param agent_db_client: agent used to send metric data to CLMC + :param sim_time: current simulation time + :param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.) - delay_time = random.normalvariate(mu, sigma) + :param completed_states: states that have been completed + These are the state that have been completed over the sample period (could be more than 1). 
In terms of data format, a tuple of tuples. + Each completed state is a tuple if format (state, state_period, count), where state is the state's name, period is the total time the VM was in this state and count + is the number of times the VM was in this state (used to derive the average for the sample period. + E.g. ((booting, 5, 2), (booted, 2, 1)) - this is for transitions: booting(3s) -> booted(2s) -> booting(2s) -> booted(1s) + since booted is the last (current state), it is reported only once in the completed states tuple - agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time + delay_time)) + :param next_state: name of next state (which is also the current state of the VM) - in the above example this would be booted. + :param next_state_time: the time the VM was in the next state (which is also the current state of the VM) - in the above example this would be 1 second. + """ + + state_stats = {} + for state, period, count in completed_states: + state_stats[state] = period + state_stats["avg_{0}".format(state)] = float(period/count) - return delay_time + # transition state and state period are passed to the generate report function as keyword arguments + agent_db_client.write_points(lp.generate_endpoint_config(sim_time, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], + next_state, next_state_time, **state_stats)) @staticmethod def _writeMCSingleState(influxClient, measurement, state, sim_time): @@ -287,10 +410,9 @@ class Sim(object): """ state_stats = {} - state_stats[state] = float( TICK_TIME ) - state_stats['avg_' +state] = float( TICK_TIME ) - influxClient.write_points( lp.generate_mc_service_config(measurement,state_stats,sim_time) ) - + state_stats[state] = float(TICK_TIME) + state_stats['avg_' + state] = float(TICK_TIME) + influxClient.write_points(lp.generate_mc_service_config(measurement, state_stats, sim_time)) @staticmethod def 
_changeMCState(influxClient, sim_time, mc_measurement, sample_count, trans_sample_count, transition_state, next_state): @@ -308,22 +430,24 @@ class Sim(object): """ mc_states = {} - + # Report total time in transition and its average of the reporting period mc_states[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count - mc_states["avg_" +transition_state] = mc_states[transition_state] / float(TICK_TIME) + mc_states["avg_" + transition_state] = mc_states[transition_state] / float(TICK_TIME) # Use the time remaining as the length for the time in the next state mc_states[next_state] = float(TICK_TIME) - mc_states[transition_state] - mc_states["avg_" +next_state] = mc_states[next_state] / float(TICK_TIME) + mc_states["avg_" + next_state] = mc_states[next_state] / float(TICK_TIME) influxClient.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time)) def run_simulation(generate=True, sTime=3600): """ - A method which runs the data generation simulator + A method which runs the data generation simulator. + :param generate: True for generating data, False for deleting the DB (optional argument, if not given, default value True is used) + :param sTime: the number of 'seconds' the simulation will run """ global INFLUX_DB_NAME @@ -342,6 +466,7 @@ def run_simulation(generate=True, sTime=3600): else: simulator.db_client.drop_database(simulator.influx_db_name) + if __name__ == "__main__": """ The main entry for this module. 
Code here is executed only if the StreamingSim.py file is executed, @@ -354,24 +479,24 @@ if __name__ == "__main__": # Try get some options try: - opts, args = getopt.getopt( sys.argv[1:], "c:t:", ['clear','time=']) + opts, args = getopt.getopt(sys.argv[1:], "c:t:", ['clear', 'time=']) except getopt.GetoptError: - print( 'StreamingSim.py -c -t <seconds>' ) + print('StreamingSim.py -c -t <seconds>') sys.exit(2) # Apply options, if any for opt, arg in opts: - if opt in ( '-c','--clear' ): + if opt in ('-c', '--clear'): genOpt = False - elif opt in ('-t','--time'): + elif opt in ('-t', '--time'): simTime = arg - if ( genOpt == True ): - print( "Running simulation to generate data" ) - print( "Time period for this simulation: " + str(simTime) + " seconds" ) + if genOpt: + print("Running simulation to generate data") + print("Time period for this simulation: " + str(simTime) + " seconds") else: - print( "Clearing simulation data" ) + print("Clearing simulation data") - run_simulation( genOpt, simTime ) + run_simulation(genOpt, simTime) diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py index b6af171be569615762b0c1a94079ac4c1d441d7b..9f3c42cd62743c24c2939063ce1dacbfcb5b1d4d 100644 --- a/clmctest/monitoring/conftest.py +++ b/clmctest/monitoring/conftest.py @@ -12,16 +12,16 @@ def streaming_sim_config(): """ Reads the service configuration deployed for the streaming simulation test. 
- :param request: access the parameters of the fixture :return: the python object representing the read YAML file """ rspec = pkg_resources.resource_filename('clmctest.monitoring', 'rspec.yml') - print("rspec file: {0}".format(rspec)) + print("\nrspec file: {0}".format(rspec)) with open(rspec, 'r') as stream: data_loaded = yaml.load(stream) return data_loaded + @pytest.fixture(params=[{'database': 'CLMCMetrics'}], scope='module') def influx_db(streaming_sim_config, request): """ @@ -34,6 +34,7 @@ def influx_db(streaming_sim_config, request): return InfluxDBClient(host=streaming_sim_config['hosts'][0]['ip_address'], port='8086', database=request.param['database'], timeout=10) + @pytest.fixture(scope="module") def simulator(streaming_sim_config): @@ -42,10 +43,7 @@ def simulator(streaming_sim_config): agent1_url = "http://" + streaming_sim_config['hosts'][1]['ip_address'] + ":8186" agent2_url = "http://" + streaming_sim_config['hosts'][2]['ip_address'] + ":8186" - simulator = Sim( influx_url, influx_db_name, agent1_url, agent2_url ) - - dbs = simulator.db_client.get_list_database() - dbs = [db.get("name") for db in dbs] + simulator = Sim(influx_url, influx_db_name, agent1_url, agent2_url) simulator.reset() diff --git a/clmctest/monitoring/test_simresults.py b/clmctest/monitoring/test_simresults.py index 0d2e2893eb3bd1ce24f50dfde7482d8414cf187f..a18f282c68a12c3cb6927d84395de13a310f7398 100644 --- a/clmctest/monitoring/test_simresults.py +++ b/clmctest/monitoring/test_simresults.py @@ -9,17 +9,16 @@ class TestSimulation(object): """ A testing class used to group all the tests related to the simulation data """ - @pytest.fixture(scope='class') - def run_simulator( self, simulator ): - random.seed( 0 ) # Seed random function so we can reliably test for average queries + @pytest.fixture(scope='class', autouse=True) + def run_simulator(self, simulator): + random.seed(0) # Seed random function so we can reliably test for average queries - print( "Running simulation, please 
wait..." ) - simulator.run( 3600 ) - - print( "Waiting for INFLUX to finish receiving simulation data..." ) - time.sleep( 10 ) # wait for data to finish arriving at the INFLUX database + print("Running simulation, please wait...") + simulator.run(3600) + print("Waiting for INFLUX to finish receiving simulation data...") + time.sleep(10) # wait for data to finish arriving at the INFLUX database print( "... simulation data fixture finished" ) @pytest.mark.parametrize("query, expected_result", [ @@ -31,33 +30,52 @@ class TestSimulation(object): {"time": "1970-01-01T00:00:00Z", "count_avg_response_time": 7200, "count_peak_response_time": 7200, "count_requests": 7200}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."net_port_io"', {"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}), - ('SELECT count(*) FROM "CLMCMetrics"."autogen"."vm_res_alloc"', - {"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12}), - # Media component state tests + ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', + {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639, + "count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}), + ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', + {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639, + 
"count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}), + ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', - {"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3609, "count_avg_starting" : 3609, "count_avg_stopped" : 3609, "count_avg_stopping" : 3609, "count_running" : 3609, "count_starting" : 3609, "count_stopped" : 3609, "count_stopping" : 3609}), + {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}), ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', - {"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3609, "count_avg_starting" : 3609, "count_avg_stopped" : 3609, "count_avg_stopping" : 3609, "count_running" : 3609, "count_starting" : 3609, "count_stopped" : 3609, "count_stopping" : 3609}), - - ('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <>0', - {"time" : "1970-01-01T00:00:00Z", "avg_stopped" : 0.15}), - ('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <>0', - {"time" : "1970-01-01T00:00:00Z", "avg_starting" : 0.9166666666666666}), - ('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <>0', - {"time" : "1970-01-01T00:00:00Z", "avg_running" : 0.9997502081598669}), - ('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <>0', - {"time" : "1970-01-01T00:00:00Z", "avg_stopping" : 0.55}) + {"time": 
"1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}), + + ('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'', + {"time": "1970-01-01T00:00:00Z", "avg_placing": 9.4}), + ('SELECT mean(avg_booting) as "avg_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_booting <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'', + {"time": "1970-01-01T00:00:00Z", "avg_booting": 9.6}), + ('SELECT mean(avg_connecting) as "avg_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connecting <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'', + {"time": "1970-01-01T00:00:00Z", "avg_connecting": 10.2}), + ('SELECT mean(avg_connected) as "avg_connected" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connected <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'', + {"time": "1970-01-01T00:00:00Z", "avg_connected": 3605.0}), + ('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'', + {"time": "1970-01-01T00:00:00Z", "avg_placing": 9.4}), + ('SELECT mean(avg_booting) as "avg_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_booting <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'', + {"time": "1970-01-01T00:00:00Z", "avg_booting": 9.6}), + ('SELECT mean(avg_connecting) as "avg_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connecting <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'', + {"time": "1970-01-01T00:00:00Z", "avg_connecting": 10.2}), + ('SELECT mean(avg_connected) as "avg_connected" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connected <> 0 and 
ipendpoint=\'adaptive_streaming_I1_apache2\'', + {"time": "1970-01-01T00:00:00Z", "avg_connected": 3605.0}), + + ('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <> 0', + {"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}), + ('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <> 0', + {"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}), + ('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <> 0', + {"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}), + ('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <> 0', + {"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55}) ]) - - def test_simulation( self, run_simulator, influx_db, query, expected_result ): + def test_simulation(self, influx_db, query, expected_result): """ This is the entry point of the test. 
This method will be found and executed when the module is ran using pytest :param query: the query to execute (value obtained from the pytest parameter decorator) :param expected_result: the result expected from executing the query (value obtained from the pytest parameter decorator) :param influx_db the import db client fixture - imported from contest.py - :param run_simulation_fixture: the imported fixture to use to generate the testing data - the return value of the fixture is not needed in this case """ # pytest automatically goes through all queries under test, declared in the parameters decorator @@ -69,7 +87,7 @@ class TestSimulation(object): # test the error attribute of the result is None, that is no error is returned from executing the DB query assert query_result.error is None, "An error was encountered while executing query {0}.".format(query) - # get the dictionary of result points; the next() function just gets the first element of the query results iterator (we only expect one item in the iterator) + # get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator) actual_result = next(query_result.get_points()) assert expected_result == actual_result, "Simulation test failure" diff --git a/clmctest/streaming/conftest.py b/clmctest/streaming/conftest.py index 8aa64e50f91b5dda288d9fb05a8241320b36b46c..71a8ff81cb1baba9f57f59253f06b535b1717500 100644 --- a/clmctest/streaming/conftest.py +++ b/clmctest/streaming/conftest.py @@ -2,8 +2,11 @@ import pytest import yaml +import requests +import time import pkg_resources + @pytest.fixture(scope="module") def streaming_config(): """ @@ -18,3 +21,16 @@ def streaming_config(): with open(rspec, 'r') as stream: data_loaded = yaml.load(stream) return data_loaded + + +@pytest.fixture(scope="module", autouse=True, + params=[{'config': {'kapacitor_url': 'http://localhost:8888/chronograf/v1/sources/1/kapacitors', 
'kapacitor_file': '/vagrant/test/streaming/kapacitor.json'}}]) +def kapacitor_config(request): + + kapacitor_configuration = request.param['config']['kapacitor_file'] + with open(kapacitor_configuration, "r") as rule_file: + data = "".join(line.strip() for line in rule_file.readlines()) + + kapacitor_url = request.param['config']['kapacitor_url'] + requests.post(url=kapacitor_url, data=data, headers={"Content-Type": "application/json"}) + time.sleep(1) diff --git a/clmctest/streaming/setupCLMC.sh b/clmctest/streaming/setupCLMC.sh index 6d2bd38390aca17ad2ad89a2debb6d5f89eab794..e7e2fc91b89cef38d690781cac3f1c92ba39a901 100644 --- a/clmctest/streaming/setupCLMC.sh +++ b/clmctest/streaming/setupCLMC.sh @@ -33,18 +33,8 @@ echo $TEST_DIR"/kapacitor.conf" cp $TEST_DIR/kapacitor.conf /etc/kapacitor/kapacitor.conf systemctl start kapacitor -# wait for kapacitor to restart -# TODO: do this better -sleep 5 - # Set up Influx data source curl -i -X POST -H "Content-Type: application/json" http://localhost:8888/chronograf/v1/sources -d @$TEST_DIR/influx.json -# Set up Kapacitor -curl -i -X POST -H "Content-Type: application/json" http://localhost:8888/chronograf/v1/sources/1/kapacitors -d @$TEST_DIR/kapacitor.json - -# Set up rules -curl -i -X POST -H "Content-Type: application/json" http://localhost:9092/kapacitor/v1/tasks -d @$TEST_DIR/rules.json - # Set up dashboard curl -i -X POST -H "Content-Type: application/json" http://localhost:8888/chronograf/v1/dashboards -d @$TEST_DIR/dashboard.json diff --git a/clmctest/streaming/test_rule1.json b/clmctest/streaming/test_rule1.json new file mode 100644 index 0000000000000000000000000000000000000000..17d4cdec27f49226968e54cb0f5cb3e794eddd33 --- /dev/null +++ b/clmctest/streaming/test_rule1.json @@ -0,0 +1,9 @@ +{ + "id" : "TestRule1", + "type" : "batch", + "dbrps" : [{"db": "CLMCMetrics", "rp" : "autogen"}], + + "script" : "var ruleData = batch\r\n |query(''' SELECT mean(\"handled\") AS \"mean_handled\" FROM 
\"CLMCMetrics\".\"autogen\".\"nginx\" WHERE \"ipendpoint\"='adaptive_streaming_I1_nginx1' ''')\r\n .period(5s)\r\n .every(5s)\r\n\r\nvar ruleAlert = ruleData\r\n |alert()\r\n .id('{{ .Name }}\/adaptive_streaming_I1_nginx1')\r\n .message('{{ .ID }} is {{ .Level }} Mean handled connections: {{ index .Fields \"mean_handled\" }}')\r\n .warn(lambda: \"mean_handled\" > 10)\r\n .log( '\/tmp\/TestRule1.log' )", + + "status" : "enabled" +} \ No newline at end of file diff --git a/clmctest/streaming/test_rule2.json b/clmctest/streaming/test_rule2.json new file mode 100644 index 0000000000000000000000000000000000000000..c9adb8401df6662ba14882f3316c84b27e9aa50c --- /dev/null +++ b/clmctest/streaming/test_rule2.json @@ -0,0 +1,9 @@ +{ + "id" : "TestRule2", + "type" : "batch", + "dbrps" : [{"db": "CLMCMetrics", "rp" : "autogen"}], + + "script" : "var ruleData = batch\r\n |query(''' SELECT mean(\"waiting\") AS \"mean_waiting\" FROM \"CLMCMetrics\".\"autogen\".\"nginx\" WHERE \"ipendpoint\"='adaptive_streaming_I1_nginx1' ''')\r\n .period(5s)\r\n .every(5s)\r\n\r\nvar ruleAlert = ruleData\r\n |alert()\r\n .id('{{ .Name }}\/adaptive_streaming_I1_nginx1')\r\n .message('{{ .ID }} is {{ .Level }} Mean waiting connections: {{ index .Fields \"mean_waiting\" }}')\r\n .warn(lambda: \"mean_waiting\" > 10)\r\n .log( '\/tmp\/TestRule2.log' )", + + "status" : "enabled" +} \ No newline at end of file diff --git a/clmctest/streaming/test_streaming.py b/clmctest/streaming/test_streaming.py index c1b7f77190e144a8c77a63c9f83d63e62d1fd1ad..15ecb680b068ea83dce525020aae905985159ccf 100644 --- a/clmctest/streaming/test_streaming.py +++ b/clmctest/streaming/test_streaming.py @@ -21,6 +21,8 @@ class TestStreamingAlerts(object): @pytest.mark.parametrize("rule, log", [ ("rules.json", "/tmp/RPSLoad.log"), + ("test_rule1.json", "/tmp/TestRule1.log"), + ("test_rule2.json", "/tmp/TestRule2.log"), ]) def test_alerts(self, rule, log, streaming_url, streaming_manifest): """ @@ -38,14 +40,10 @@ class 
TestStreamingAlerts(object): :param streaming_manifest: the fixture providing the root of the XML streaming manifest """ - kapacitor_setter = self.kapacitor_setting(rule) + kapacitor_setter = self.kapacitor_setting(rule, log) next(kapacitor_setter) # Setup the test rule - try: - if isfile(log): - remove(log) # delete log file if existing from previous tests - except PermissionError: - system("sudo rm {0}".format(log)) # handles the case for running on linux where permission will be required to delete the old log file + print("Testing alert creation for rule: {0}".format(rule)) segments = streaming_manifest.findall(".//{urn:mpeg:DASH:schema:MPD:2011}SegmentURL") @@ -56,9 +54,11 @@ class TestStreamingAlerts(object): t.start() alert_created = False + counter = 0 + time_delay = 2.5 while True: # loop while threads are execution and do a check every 2.5 seconds to check if either alert log has been created or threads have finished execution - sleep(2.5) + sleep(time_delay) if isfile(log): for t in threads: # kill all running threads in case log file is created beforehand t.stop() @@ -67,32 +67,55 @@ class TestStreamingAlerts(object): if threads_queue.full(): break - assert alert_created, "Alerts test failed: no log file is created indicating a triggered alert." 
+ counter += time_delay # the counter tracks the time taken; for the rules under test usually a 30-second time frame is enough to trigger the alert + if counter >= 12*time_delay: + for t in threads: # kill all running threads in case of test failure + t.stop() + break + + assert alert_created, "Alerts test failed: no log file is created indicating a triggered alert for rule {0}.".format(rule) - print("\nSuccessfully passed alert creation test.\n") + print("Successfully passed alert creation test for rule: {0}.".format(rule)) next(kapacitor_setter) # Teardown the test rule - def kapacitor_setting(self, rule): + def kapacitor_setting(self, rule, log): """ A generator function used to provide setUp/tearDown actions for a particular kapacitor rule. On setUp rule is initialized, on tearDown rule is deleted. Interleaving is achieved using the generator pattern. :param rule: the name of the json file for the rule under test + :param log: the absolute path of the log file that's being tested """ + # check if the log file is already created due to a previous test + try: + if isfile(log): + remove(log) # delete log file if existing from previous tests + except PermissionError: + system("sudo rm {0}".format(log)) # handles the case for running on linux where permission will be required to delete the old log file + # Initialization of the kapacitor rule - Test setUp (UnitTest style) with open(join(dirname(__file__), rule), "r") as rule_file: data = "".join(line.strip() for line in rule_file.readlines()) rule_data = json.loads(data) requests.delete(url=urljoin(self.kapacitor_url + "/", rule_data.get("id"))) # delete in case of a task with the same ID already set in the kapacitor - requests.post(url=self.kapacitor_url, data=data, headers={"Content-Type": "application/json"}) + r = requests.post(url=self.kapacitor_url, data=data, headers={"Content-Type": "application/json"}) + assert r.status_code == 200, "Couldn't create alert rule {0}".format(rule) + print("\nSuccessfully
created test rule {0}".format(rule)) yield # Deleting the kapacitor rule used for testing - Test tearDown (UnitTest style) requests.delete(url=urljoin(self.kapacitor_url + "/", rule_data.get("id"))) + + # check if the log file is created and clean it up + try: + if isfile(log): + remove(log) # delete the log file generated by the current test run + except PermissionError: + system("sudo rm {0}".format(log)) # handles the case for running on linux where permission will be required to delete the old log file yield @staticmethod