diff --git a/test/streaming-sim/StreamingSim.py b/test/streaming-sim/StreamingSim.py index a2e37375945cec6f56f09b7a74c72d19af04a071..48db4dcb703b52947bc7350b2a57b9adbfa7be39 100644 --- a/test/streaming-sim/StreamingSim.py +++ b/test/streaming-sim/StreamingSim.py @@ -9,83 +9,90 @@ import sys # Simulation parameters TICK_TIME = 1 DEFAULT_REQUEST_RATE_INC = 1 -DEFAULT_REQUEST_RATE_INC_PERIOD = 10 -SIMULATION_TIME_SEC = 60*60 +DEFAULT_REQUEST_RATE_INC_PERIOD = 10 +SIMULATION_TIME_SEC = 60 * 60 # CLMC parameters INFLUX_DB_URL = 'http://192.168.50.10:8086' +INFLUX_DB_NAME = 'CLMCMetrics' AGENT_URL1 = 'http://192.168.50.11:8186' AGENT_URL2 = 'http://192.168.50.12:8186' -# Simulator for services -class Sim: + +class Sim(object): + """ + Simulator for services + """ + def __init__(self, influx_url): - # We don't need this as the db is CLMC metrics - self.influx_db = 'CLMCMetrics' + """ + Sets up the simulator object + + :param influx_url: the influx DB url + """ + + self.influx_db = INFLUX_DB_NAME self.influx_url = influx_url + # Teardown DB from previous sim and bring it back up self._deleteDB() self._createDB() - def run(self, simulation_length_seconds): - start_time = time.time()-SIMULATION_TIME_SEC + """ + Runs the simulation + + :param simulation_length_seconds: length of simulation + """ + + start_time = time.time() - SIMULATION_TIME_SEC sim_time = start_time # segment_size : the length of video requested at a time # bit_rate: MPEG-2 High 1080p 25fps = 80Mbps ip_endpoints = [{'agent_url': AGENT_URL1, 'location': 'DC1', 'cpu': 16, - 'mem': '8GB', 'storage': '1TB', 'request_queue': 0, 'request_arrival_rate': 0, - 'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500}, - {'agent_url': AGENT_URL2, 'location': 'DC2', 'cpu': 4, - 'mem': '8GB', 'storage': '1TB', 'request_queue': 0, 'request_arrival_rate': 0, - 'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500} + 'mem': '8GB', 'storage': '1TB', 'request_queue': 0, 'request_arrival_rate': 0, + 'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500}, + {'agent_url': AGENT_URL2, 'location': 'DC2', 'cpu': 4, + 'mem': '8GB', 'storage': '1TB', 'request_queue': 0, 'request_arrival_rate': 0, + 'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500} ] # Simulate configuration of the ipendpoints # endpoint state->mu, sigma, secs normal distribution - config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68],"connecting": [10, 0.68]} + config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]} # Place endpoints - max_delay = 0 + max_delay = 0 for ip_endpoint in ip_endpoints: - delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['placing'][0], config_delay_dist['placing'][0]*config_delay_dist['placing'][1], 'placing', 'placed') - if delay_time > max_delay: - max_delay = delay_time - sim_time +=max_delay + delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['placing'][0], + config_delay_dist['placing'][0] * config_delay_dist['placing'][1], + 'placing', 'placed') + max_delay = max(delay_time, max_delay) + sim_time += max_delay # Boot endpoints - max_delay = 0 + max_delay = 0 for ip_endpoint in ip_endpoints: - delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['booting'][0], config_delay_dist['booting'][0]*config_delay_dist['booting'][1], 'booting', 'booted') - if delay_time > max_delay: - max_delay = delay_time - sim_time +=max_delay + delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['booting'][0], + config_delay_dist['booting'][0] * config_delay_dist['booting'][1], + 'booting', 'booted') + max_delay = max(delay_time, max_delay) + sim_time += max_delay # Connect endpoints - max_delay = 0 + max_delay = 0 for ip_endpoint in ip_endpoints: - delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['connecting'][0], config_delay_dist['connecting'][0]*config_delay_dist['connecting'][1], 'connecting', 'connected') - if delay_time > max_delay: - max_delay = delay_time - sim_time +=max_delay - + delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['connecting'][0], + config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], + 'connecting', 'connected') + max_delay = max(delay_time, max_delay) + sim_time += max_delay + request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC - request_queue = 0 inc_period_count = 0 - for i in range(simulation_length_seconds): + for i in range(simulation_length_seconds): for ip_endpoint in ip_endpoints: - request_processing_time = 0 - cpu_time_available = 0 - requests_processed = 0 - max_requests_processed = 0 - cpu_active_time = 0 - cpu_idle_time = 0 - cpu_usage = 0 - cpu_load_time = 0 - avg_response_time = 0 - peak_response_time = 0 - # linear inc to arrival rate if inc_period_count >= DEFAULT_REQUEST_RATE_INC_PERIOD: ip_endpoint['request_arrival_rate'] += request_arrival_rate_inc @@ -96,118 +103,164 @@ class Sim: ip_endpoint['request_queue'] += ip_endpoint['request_arrival_rate'] # time to process one second of video (mS) in the current second - request_processing_time = int(random.normalvariate(10, 10*0.68)) - if request_processing_time <= 10: - request_processing_time = 10 + request_processing_time = int(random.normalvariate(10, 10 * 0.68)) + request_processing_time = max(request_processing_time, 10) # time depends on the length of the segments in seconds request_processing_time *= ip_endpoint['segment_size'] # amount of cpu time (mS) per tick - cpu_time_available = ip_endpoint['cpu']*TICK_TIME*1000 - max_requests_processed = int(cpu_time_available/request_processing_time) + cpu_time_available = ip_endpoint['cpu'] * TICK_TIME * 1000 + max_requests_processed = int(cpu_time_available / request_processing_time) # calc how many requests processed if ip_endpoint['request_queue'] <= max_requests_processed: # processed all of the requests requests_processed = ip_endpoint['request_queue'] else: - # processed the maxmum number of requests + # processed the maximum number of requests requests_processed = max_requests_processed # calculate cpu usage - cpu_active_time = int(requests_processed*request_processing_time) - cpu_idle_time = int(cpu_time_available-cpu_active_time) - cpu_usage = cpu_active_time/cpu_time_available - self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time)) + cpu_active_time = int(requests_processed * request_processing_time) + cpu_idle_time = int(cpu_time_available - cpu_active_time) + cpu_usage = cpu_active_time / cpu_time_available + self._sendInfluxData(ip_endpoint['agent_url'], + lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time)) # calc network usage metrics - bytes_rx = 2048*requests_processed - bytes_tx = int(ip_endpoint['video_bit_rate']/8*1000000*requests_processed*ip_endpoint['segment_size']) - self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_network_report(bytes_rx, bytes_tx, sim_time)) + bytes_rx = 2048 * requests_processed + bytes_tx = int( + ip_endpoint['video_bit_rate'] / 8 * 1000000 * requests_processed * ip_endpoint['segment_size']) + self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_network_report(bytes_rx, bytes_tx, sim_time)) # time to process all of the requests in the queue - peak_response_time = ip_endpoint['request_queue']*request_processing_time/ip_endpoint['cpu'] + peak_response_time = ip_endpoint['request_queue'] * request_processing_time / ip_endpoint['cpu'] # mid-range - avg_response_time = (peak_response_time+request_processing_time)/2 - self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_mpegdash_report('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], avg_response_time, peak_response_time, sim_time)) + avg_response_time = (peak_response_time + request_processing_time) / 2 + self._sendInfluxData(ip_endpoint['agent_url'], + lp.generate_mpegdash_report('http://localhost/server-status?auto', + ip_endpoint['request_arrival_rate'], avg_response_time, + peak_response_time, sim_time)) # need to calculate this but sent at 5mS for now network_request_delay = 0.005 # calculate network response delays (2km link, 100Mbps) - network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'], ip_endpoint['video_bit_rate'], ip_endpoint['segment_size']) + network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'], + ip_endpoint['video_bit_rate']) - e2e_delay = network_request_delay + (avg_response_time/1000) + network_response_delay + e2e_delay = network_request_delay + (avg_response_time / 1000) + network_response_delay - self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_ipendpoint_route('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], e2e_delay, sim_time)) + self._sendInfluxData(ip_endpoint['agent_url'], + lp.generate_ipendpoint_route('http://localhost/server-status?auto', + ip_endpoint['request_arrival_rate'], e2e_delay, + sim_time)) # remove requests processed off the queue - ip_endpoint['request_queue'] -= int(requests_processed) + ip_endpoint['request_queue'] -= int(requests_processed) sim_time += TICK_TIME end_time = sim_time - print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time,end_time,end_time-start_time)) + print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time, + end_time - start_time)) - # distance metres - # bandwidth Mbps - # package size bytes - # tx_video_bit_rate bp/sec - # segment size sec - def _calcNetworkDelay(self, distance, bandwidth, packet_size, tx_video_bit_rate, segment_size): - response_delay = 0 + def _calcNetworkDelay(self, distance, bandwidth, packet_size, tx_video_bit_rate): + """ + Calculates the network delay + + :param distance: distance metres + :param bandwidth: bandwidth Mbps + :param packet_size: packet size bytes + :param tx_video_bit_rate: bp/sec + :return: the calculated network delay + """ # propogation delay = distance/speed () (e.g 2000 metres * 2*10^8 for optical fibre) - propogation_delay = distance/(2*100000000) + propogation_delay = distance / (2 * 100000000) # packetisation delay = ip packet size (bits)/tx rate (e.g. 100Mbp with 0% packet loss) - packetisation_delay = (packet_size*8)/(bandwidth*1000000) - # print('packetisation_delay:', packetisation_delay) + packetisation_delay = (packet_size * 8) / (bandwidth * 1000000) # total number of packets to be sent - packets = (tx_video_bit_rate*1000000)/(packet_size*8) - # print('packets:', packets) - response_delay = packets*(propogation_delay+packetisation_delay) - # print('response_delay:', response_delay) + packets = (tx_video_bit_rate * 1000000) / (packet_size * 8) + + response_delay = packets * (propogation_delay + packetisation_delay) - return response_delay + return response_delay def _changeVMState(self, sim_time, ip_endpoint, mu, sigma, transition_state, next_state): - delay_time = 0 - - self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time)) - - delay_time = random.normalvariate(mu, sigma) - - self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time+delay_time)) + """ + send influx data to chnage VM state + :param sim_time: + :param ip_endpoint: + :param mu: + :param sigma: + :param transition_state: + :param next_state: + :return: the delay time + """ + self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(transition_state, ip_endpoint['cpu'], + ip_endpoint['mem'], ip_endpoint['storage'], sim_time)) + + delay_time = random.normalvariate(mu, sigma) + + self._sendInfluxData(ip_endpoint['agent_url'], + lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], + ip_endpoint['storage'], sim_time + delay_time)) return delay_time def _createDB(self): - self._sendInfluxQuery(self.influx_url, 'CREATE DATABASE ' + self.influx_db) + """ + Sends a create database influx query + """ + self._sendInfluxQuery(self.influx_url, 'CREATE DATABASE ' + self.influx_db) def _deleteDB(self): + """ + Sends a delete database influx query + """ + self._sendInfluxQuery(self.influx_url, 'DROP DATABASE ' + self.influx_db) + def _sendInfluxData(self, url, data): + """ + writes data to influx DB + :param url: the url of the influx DB + :param data: the data to write + """ + + data = data.encode() + header = {'Content-Type': 'application/octet-stream'} + req = urllib.request.Request(url + '/write?db=' + self.influx_db, data, header) + urllib.request.urlopen(req) + @staticmethod def _sendInfluxQuery(url, query): + """ + Sends an influx DB query + :param url: url of db + :param query: the query to send + :return: the response received back from the query + """ + query = urllib.parse.urlencode({'q': query}) query = query.encode('ascii') req = urllib.request.Request(url + '/query ', query) return urllib.request.urlopen(req).read().decode("utf-8").strip() - def _sendInfluxData(self, url, data): - data = data.encode() - header = {'Content-Type': 'application/octet-stream'} - req = urllib.request.Request(url + '/write?db=' + self.influx_db, data, header) - urllib.request.urlopen(req) - @pytest.fixture def run_simulation_fixture(): + """ + A fixture, which checks if the the DB has been created, if not it runs the simulator with a 10 seconds timeout after that + """ + global INFLUX_DB_URL + global INFLUX_DB_NAME global SIMULATION_TIME_SEC dbs = Sim._sendInfluxQuery(INFLUX_DB_URL, "SHOW DATABASES") - if "CLMCMetrics" not in dbs: + if INFLUX_DB_NAME not in dbs: simulator = Sim(INFLUX_DB_URL) simulator.run(SIMULATION_TIME_SEC) @@ -215,7 +268,13 @@ def run_simulation_fixture(): import time time.sleep(10) + def run_simulation(generate=True): + """ + A method which runs the data generation simulator + :param generate: True for generating data, False for deleting the DB + """ + global INFLUX_DB_URL global SIMULATION_TIME_SEC @@ -228,8 +287,14 @@ def run_simulation(generate=True): if __name__ == "__main__": + """ + The main entry for this module. Code here is executed only if the StreamingSim.py file is executed, + but not when it's imported + """ + option = True if len(sys.argv) > 1: + # if CLI argument '-c' is set when executing the script the influx db will be deleted instead of generating data option = str(sys.argv[1]) != "-c" run_simulation(generate=option)