From 58275638d702de80b46b174365edc035cde8f00c Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Thu, 5 Apr 2018 08:21:24 +0100 Subject: [PATCH] [Issue #61] - Simulator update to reflect on the changes of the configuration model --- clmctest/monitoring/LineProtocolGenerator.py | 9 +- clmctest/monitoring/StreamingSim.py | 137 +++++++++++++------ 2 files changed, 105 insertions(+), 41 deletions(-) diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py index 47766fb..fd109cf 100644 --- a/clmctest/monitoring/LineProtocolGenerator.py +++ b/clmctest/monitoring/LineProtocolGenerator.py @@ -70,15 +70,18 @@ def generate_ipendpoint_route(resource, requests, latency, time): return result -def generate_endpoint_config(cpu, mem, storage, time, **kwargs): +def generate_endpoint_config(time, cpu, mem, storage, current_state, current_state_time, **kwargs): """ generates a measurement for a VM configuration states :param cpu: the number of CPUs of VM endpoint :param mem: memory of VM endpoint :param storage: storage capacity of VM endpoint + :param current_state: the current state the endpoint is in (TAG) + :param current_state_time: the part of the sampling period the endpoint was in the current state :param time: time of measurement :param kwargs: 'python-style' keyword arguments used to store the state as a key and it's respective state period (in seconds) as value + :return: dictionary object representing the data to post on influx """ @@ -92,6 +95,10 @@ def generate_endpoint_config(cpu, mem, storage, time, **kwargs): fields[("avg_{0}".format(state))] = validate("avg_{0}".format(state)) result = [{"measurement": "endpoint_config", + "tags": { + "current_state": current_state, + "current_state_time": current_state_time, + }, "fields": fields, "time": _getNSTime(time)}] diff --git a/clmctest/monitoring/StreamingSim.py b/clmctest/monitoring/StreamingSim.py index 88e85fb..5f8fd2f 100644 --- a/clmctest/monitoring/StreamingSim.py +++ b/clmctest/monitoring/StreamingSim.py @@ -73,48 +73,77 @@ class Sim(object): 'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500} ] + endpoint_states_config = {self.agent1_url: {"current_state": "unplaced", "current_state_time": 0}, + self.agent2_url: {"current_state": "unplaced", "current_state_time": 0}} + print("\nSimulation started. Generating data...") # Simulate configuration of the ip endpoints # Move endpoints from state unplaced to state placing - # reports: 1, unplaced: 0.7s, placing: 0.3s + # reports: 1, unplaced: 0.7s (completed), placing: 0.3s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - self._changeVMState(agent_db_client, sim_time, ip_endpoint, 10, 7, 'unplaced', 'placing') + self._changeVMState(agent_db_client, sim_time, ip_endpoint, (("unplaced", 0.7, 1),), 'placing', 0.3) + # report the change of the state in the config dictionary, to keep track of unreported seconds for the current state + endpoint_states_config[agent]["current_state"] = "placing" + endpoint_states_config[agent]["current_state_time"] = 0.3 sim_time += TICK_TIME # Place endpoints - # reports: 10, placing: 9.1s, placed: 0.9s + # reports: 10 (SAMPLE PERIOD is 1), placing: 9.1s (completed), placed: 0.9s + # addition to uncompleted states from previous report: placing += 0.3s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + # since the current state in the dictionary is still placing, only the current state time is incremented for i in range(9): self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "placing") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME - self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 9, 'placing', 'placed') + # here, the VM exits state placing, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (("placing", 0.1 + endpoint_states_config[agent]["current_state_time"], 1),), 'placed', 0.9) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "placed" + endpoint_states_config[agent]["current_state_time"] = 0.9 sim_time += 10 * TICK_TIME # Move endpoints from state placed to state booting # reports: 1, placed: 0.8s, booting: 0.2s + # addition to uncompleted states from previous report: placed += 0.9s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) - self._changeVMState(agent_db_client, sim_time, ip_endpoint, 10, 8, 'placed', 'booting') + + # here, the VM exits state placed, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time, ip_endpoint, (('placed', 0.8 + endpoint_states_config[agent]["current_state_time"], 1),), 'booting', 0.2) + endpoint_states_config[agent]["current_state"] = "booting" + endpoint_states_config[agent]["current_state_time"] = 0.2 sim_time += TICK_TIME # Boot endpoints - # reports: 10, booting: 9.4s, booted: 0.6s + # reports: 10 (SAMPLE PERIOD is 1), booting: 9.4s, booted: 0.6s + # addition to uncompleted states from previous report: booting += 0.2s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + # since the current state in the dictionary is still booting, only the current state time is incremented for i in range(9): self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "booting") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME - self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 4, 'booting', 'booted') + # here, the VM exits state booting, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (('booting', 0.4 + endpoint_states_config[agent]["current_state_time"], 1),), 'booted', 0.6) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "booted" + endpoint_states_config[agent]["current_state_time"] = 0.6 sim_time += 10*TICK_TIME # move mpegdash_service media component state from 'stopped' to 'starting' @@ -141,30 +170,49 @@ class Sim(object): # Move endpoints from state booted to state connecting # reports: 2, booted: 1.5s, connecting: 0.5s + # addition to uncompleted states from previous report: booted += 0.6s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + + # since the current state in the dictionary is still booted, only the current state time is incremented self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "booted") - self._changeVMState(agent_db_client, sim_time + (1*TICK_TIME), ip_endpoint, 10, 5, 'booted', 'connecting') + endpoint_states_config[agent]["current_state_time"] += TICK_TIME + + # here, the VM exits state booted, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (1*TICK_TIME), ip_endpoint, (('booted', 0.5 + endpoint_states_config[agent]["current_state_time"], 1),), 'connecting', 0.5) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "connecting" + endpoint_states_config[agent]["current_state_time"] = 0.5 sim_time += 2*TICK_TIME # Connect endpoints # reports: 10, connecting: 9.7s, connected: 0.3s + # addition to uncompleted states from previous report: connecting += 0.5s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + # since the current state in the dictionary is still connecting, only the current state time is incremented for i in range(9): self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connecting") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME - self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 7, 'connecting', 'connected') + # here, the VM exits state booted, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (('connecting', 0.7 + endpoint_states_config[agent]["current_state_time"], 1),), 'connected', 0.3) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "connected" + endpoint_states_config[agent]["current_state_time"] = 0.3 sim_time += 10*TICK_TIME request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC inc_period_count = 0 for i in range(simulation_length_seconds): for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) # linear inc to arrival rate @@ -226,6 +274,8 @@ class Sim(object): # update endpoint state (continuously 'connected') self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "connected") + # update current state time in the config dictionary + endpoint_states_config[agent]["current_state_time"] += 1 # update mpegdash_service media component state (continuously 'running') self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time) @@ -236,14 +286,22 @@ class Sim(object): # remove endpoints # reports: 5, connected: 4.7s, unplaced: 0.3s + # addition to uncompleted states from previous report: connected += 3600s + 0.3s for ip_endpoint in ip_endpoints: - agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent = ip_endpoint["agent_url"] + agent_url = urllib.parse.urlparse(agent) agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + # since the current state in the dictionary is still connected, only the current state time is incremented for i in range(4): self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connected") + endpoint_states_config[agent]["current_state_time"] += TICK_TIME - self._changeVMState(agent_db_client, sim_time + (4*TICK_TIME), ip_endpoint, 10, 7, 'connected', 'unplaced') + # here, the VM exits state connected, hence need to append the current state time + self._changeVMState(agent_db_client, sim_time + (4*TICK_TIME), ip_endpoint, (('connected', 0.7 + endpoint_states_config[agent]["current_state_time"], 1),), 'unplaced', 0.3) + # since VM changed its current state, readjust the config dictionary + endpoint_states_config[agent]["current_state"] = "unplaced" + endpoint_states_config[agent]["current_state_time"] = 0.3 sim_time += 5 * TICK_TIME # move mpegdash_service media component state from 'running' to 'stopping' @@ -305,39 +363,38 @@ class Sim(object): :param state: state that's being reported """ - state_stats = { - state: float(TICK_TIME), - 'avg_{0}'.format(state): float(TICK_TIME) - } - - agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, **state_stats)) + # since no state has been finished, the generate_endpoint_config is called without any keyword arguments (field:value pairs) and only the tags for the current state are set + data = lp.generate_endpoint_config(sim_time, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], state, TICK_TIME) + agent_db_client.write_points(data) @staticmethod - def _changeVMState(agent_db_client, sim_time, ip_endpoint, sample_count, trans_sample_count, transition_state, next_state): + def _changeVMState(agent_db_client, sim_time, ip_endpoint, completed_states, next_state, next_state_time): """ - Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables. + Send influx data to report change of VM state over the sample period. + :param agent_db_client: agent used to send metric data to CLMC :param sim_time: current simulation time :param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.) - :param sample_count: - the total number of samples in the reporting period (TICK_TIME) - :param trans_sample_count: - the number of samples in the transition state - :param transition_state: name of transition state - :param next_state: name of next state - :return: the delay time - """ - state_stats = {} + :param completed_states: states that have been completed + These are the state that have been completed over the sample period (could be more than 1). In terms of data format, a tuple of tuples. + Each completed state is a tuple if format (state, state_period, count), where state is the state's name, period is the total time the VM was in this state and count + is the number of times the VM was in this state (used to derive the average for the sample period. + E.g. ((booting, 5, 2), (booted, 2, 1)) - this is for transitions: booting(3s) -> booted(2s) -> booting(2s) -> booted(1s) + since booted is the last (current state), it is reported only once in the completed states tuple - # period of transition state is part of the total sample count multiplied by the reporting period (TICK_TIME) - state_stats[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count - state_stats["avg_" + transition_state] = state_stats[transition_state] / float(TICK_TIME) + :param next_state: name of next state (which is also the current state of the VM) - in the above example this would be booted. + :param next_state_time: the time the VM was in the next state (which is also the current state of the VM) - in the above example this would be 1 second. + """ - # the rest of the period is reported as time for the next state - state_stats[next_state] = float(TICK_TIME) - state_stats[transition_state] - state_stats["avg_" + next_state] = state_stats[next_state] / float(TICK_TIME) + state_stats = {} + for state, period, count in completed_states: + state_stats[state] = period + state_stats["avg_{0}".format(state)] = float(period/count) # transition state and state period are passed to the generate report function as keyword arguments - agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, **state_stats)) + agent_db_client.write_points(lp.generate_endpoint_config(sim_time, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], + next_state, next_state_time, **state_stats)) @staticmethod def _writeMCSingleState(influxClient, measurement, state, sim_time): -- GitLab