#!/usr/bin/python3
"""
Streaming simulator: generates synthetic CLMC monitoring data (VM configuration
states, CPU, network, MPEG-DASH and end-to-end delay reports) and writes it to
InfluxDB through the agent endpoints configured below.
"""

import LineProtocolGenerator as lp
import time
import urllib.parse
import pytest
import random
import sys
from influxdb import InfluxDBClient

# Simulation parameters
TICK_TIME = 1
DEFAULT_REQUEST_RATE_INC = 1
DEFAULT_REQUEST_RATE_INC_PERIOD = 10
SIMULATION_TIME_SEC = 60 * 60

# CLMC parameters
INFLUX_DB_URL = 'http://192.168.50.10:8086'
INFLUX_DB_NAME = "CLMCMetrics"
AGENT_URL1 = 'http://192.168.50.11:8186'
AGENT_URL2 = 'http://192.168.50.12:8186'


class Sim(object):
    """
    Simulator for services
    """

    def __init__(self, influx_url, influx_db_name):
        """
        Sets up the simulator object

        :param influx_url: the influx DB url
        :param influx_db_name: the influx DB name
        """

        self.influx_db_name = influx_db_name

        # influx db client is created on initialisation, which will handle the influx DB queries
        self.db_client = self._connect(influx_url, influx_db_name)

    @staticmethod
    def _connect(url, db_name):
        """
        Builds an InfluxDB client for the host and port found in the given url.

        :param url: url of the influx DB (or agent) endpoint, e.g. 'http://host:8086'
        :param db_name: name of the database the client writes to
        :return: an InfluxDBClient configured with a 10 seconds timeout
        """

        url_object = urllib.parse.urlparse(url)
        return InfluxDBClient(host=url_object.hostname, port=url_object.port, database=db_name, timeout=10)

    def reset(self):
        """
        Resets the influx db by deleting the old database and creating a new one
        """

        # Teardown DB from previous sim and bring it back up
        self.db_client.drop_database(self.influx_db_name)
        self.db_client.create_database(self.influx_db_name)

    def run(self, simulation_length_seconds):
        """
        Runs the simulation: first moves every endpoint through the placing, booting and
        connecting states, then generates one set of reports per endpoint per tick.

        :param simulation_length_seconds: length of simulation (number of ticks generated)
        """

        # NOTE(review): the start is back-dated by the module constant SIMULATION_TIME_SEC, not by
        # simulation_length_seconds - the two only coincide because callers pass the constant.
        start_time = time.time() - SIMULATION_TIME_SEC
        sim_time = start_time

        # segment_size : the length of video requested at a time
        # bit_rate: MPEG-2 High 1080p 25fps = 80Mbps
        ip_endpoints = [
            {'agent_url': AGENT_URL1, 'location': 'DC1', 'cpu': 16, 'mem': '8GB', 'storage': '1TB',
             'request_queue': 0, 'request_arrival_rate': 0,
             'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500},
            {'agent_url': AGENT_URL2, 'location': 'DC2', 'cpu': 4, 'mem': '8GB', 'storage': '1TB',
             'request_queue': 0, 'request_arrival_rate': 0,
             'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500},
        ]

        # Simulate configuration of the ip endpoints
        # transitional state -> [mu (secs), sigma as a fraction of mu] of a normal distribution
        config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]}

        print("\nSimulation started. Generating data...")

        # One client per endpoint, created once and reused for every write, instead of building
        # a new client (and HTTP session) for each endpoint on each tick.
        agent_db_clients = [self._connect(ip_endpoint['agent_url'], self.influx_db_name)
                            for ip_endpoint in ip_endpoints]

        # Place, boot and connect the endpoints; each phase starts once the slowest endpoint
        # has finished the previous one.
        state_transitions = (('placing', 'placed'), ('booting', 'booted'), ('connecting', 'connected'))
        for transition_state, next_state in state_transitions:
            mu, sigma_ratio = config_delay_dist[transition_state]
            max_delay = 0
            for agent_db_client, ip_endpoint in zip(agent_db_clients, ip_endpoints):
                delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, mu, mu * sigma_ratio,
                                                 transition_state, next_state)
                max_delay = max(delay_time, max_delay)
            sim_time += max_delay

        request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC
        inc_period_count = 0
        for _ in range(simulation_length_seconds):
            for agent_db_client, ip_endpoint in zip(agent_db_clients, ip_endpoints):
                # linear inc to arrival rate
                # NOTE(review): the counter is shared by all endpoints and advanced once per endpoint
                # per tick, so increments alternate between endpoints rather than hitting each one
                # every DEFAULT_REQUEST_RATE_INC_PERIOD ticks. Kept as-is to preserve the generated data.
                if inc_period_count >= DEFAULT_REQUEST_RATE_INC_PERIOD:
                    ip_endpoint['request_arrival_rate'] += request_arrival_rate_inc
                    inc_period_count = 0
                else:
                    inc_period_count += 1

                self._simulate_tick(agent_db_client, ip_endpoint, sim_time)

            sim_time += TICK_TIME

        end_time = sim_time
        print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time,
                                                                                       end_time - start_time))

    def _simulate_tick(self, agent_db_client, ip_endpoint, sim_time):
        """
        Simulates one tick for one endpoint: queues the arriving requests, processes as many as the
        cpu allows and writes the cpu, network, mpegdash and route reports for that tick.

        :param agent_db_client: influx client of the endpoint's agent
        :param ip_endpoint: endpoint configuration/state dictionary; 'request_queue' is updated in place
        :param sim_time: simulated timestamp of this tick
        """

        # add new requests to the queue
        ip_endpoint['request_queue'] += ip_endpoint['request_arrival_rate']

        # time to process one second of video (mS) in the current second, never below 10 mS
        request_processing_time = max(int(random.normalvariate(10, 10 * 0.68)), 10)
        # time depends on the length of the segments in seconds
        request_processing_time *= ip_endpoint['segment_size']

        # amount of cpu time (mS) per tick
        cpu_time_available = ip_endpoint['cpu'] * TICK_TIME * 1000
        max_requests_processed = int(cpu_time_available / request_processing_time)

        # either the whole queue is processed, or as many requests as the cpu time allows
        requests_processed = min(ip_endpoint['request_queue'], max_requests_processed)

        # calculate cpu usage
        cpu_active_time = int(requests_processed * request_processing_time)
        cpu_idle_time = int(cpu_time_available - cpu_active_time)
        cpu_usage = cpu_active_time / cpu_time_available
        agent_db_client.write_points(lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time))

        # calc network usage metrics
        bytes_rx = 2048 * requests_processed
        bytes_tx = int(ip_endpoint['video_bit_rate'] / 8 * 1000000 * requests_processed * ip_endpoint['segment_size'])
        agent_db_client.write_points(lp.generate_network_report(bytes_rx, bytes_tx, sim_time))

        # time to process all of the requests in the queue
        peak_response_time = ip_endpoint['request_queue'] * request_processing_time / ip_endpoint['cpu']
        # mid-range
        avg_response_time = (peak_response_time + request_processing_time) / 2
        agent_db_client.write_points(lp.generate_mpegdash_report('http://localhost/server-status?auto',
                                                                 ip_endpoint['request_arrival_rate'],
                                                                 avg_response_time, peak_response_time, sim_time))

        # need to calculate this but sent at 5mS for now
        network_request_delay = 0.005

        # calculate network response delays (2km link, 100Mbps)
        network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'],
                                                        ip_endpoint['video_bit_rate'])

        # avg_response_time is in mS, the two network delays are in seconds
        e2e_delay = network_request_delay + (avg_response_time / 1000) + network_response_delay
        agent_db_client.write_points(lp.generate_ipendpoint_route('http://localhost/server-status?auto',
                                                                  ip_endpoint['request_arrival_rate'],
                                                                  e2e_delay, sim_time))

        # remove requests processed off the queue
        ip_endpoint['request_queue'] -= int(requests_processed)

    @staticmethod
    def _calcNetworkDelay(distance, bandwidth, packet_size, tx_video_bit_rate):
        """
        Calculates the network delay. Declared as static method since it doesn't need access to any instance variables.

        :param distance: distance metres
        :param bandwidth: bandwidth Mbps
        :param packet_size: packet size bytes
        :param tx_video_bit_rate: video bit rate; scaled by 10^6 below, so presumably Mbps
        :return: the calculated network delay
        """

        # propagation delay = distance / speed (e.g. 2000 metres / 2*10^8 m/s for optical fibre)
        propagation_delay = distance / (2 * 100000000)
        # packetisation delay = ip packet size (bits) / tx rate (e.g. 100Mbps with 0% packet loss)
        packetisation_delay = (packet_size * 8) / (bandwidth * 1000000)
        # total number of packets to be sent
        packets = (tx_video_bit_rate * 1000000) / (packet_size * 8)

        return packets * (propagation_delay + packetisation_delay)

    @staticmethod
    def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state):
        """
        Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables.

        :param agent_db_client: influx client of the endpoint's agent, used to write both state reports
        :param sim_time: simulated timestamp at which the transitional state is entered
        :param ip_endpoint: endpoint dictionary; its 'cpu', 'mem' and 'storage' values are reported
        :param mu: mean of the normally distributed transition delay
        :param sigma: standard deviation of the normally distributed transition delay
        :param transition_state: name of the transitional state (e.g. 'placing')
        :param next_state: name of the state reached after the delay (e.g. 'placed')
        :return: the delay time (never negative)
        """

        agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'],
                                                           ip_endpoint['storage'], sim_time))

        # The normal distribution is unbounded, so a draw can be negative; clamp it so that the
        # completed state is never timestamped before the transitional state it follows.
        delay_time = max(random.normalvariate(mu, sigma), 0.0)

        agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'],
                                                           ip_endpoint['storage'], sim_time + delay_time))

        return delay_time


@pytest.fixture(scope='module')
def run_simulation_fixture():
    """
    A fixture, which checks if the DB has been created, if not it runs the simulator with a 10 seconds timeout after that
    """

    simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME)
    existing_dbs = [db.get("name") for db in simulator.db_client.get_list_database()]

    if INFLUX_DB_NAME not in existing_dbs:
        simulator.reset()
        simulator.run(SIMULATION_TIME_SEC)

        print("10 seconds timeout is given so that the data could properly be inserted into the database.")
        time.sleep(10)


def run_simulation(generate=True):
    """
    A method which runs the data generation simulator

    :param generate: True for generating data, False for deleting the DB (optional argument, if not given, default value True is used)
    """

    simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME)

    if generate:
        simulator.reset()
        simulator.run(SIMULATION_TIME_SEC)
    else:
        simulator.db_client.drop_database(simulator.influx_db_name)


if __name__ == "__main__":
    # The main entry for this module. Code here is executed only if the StreamingSim.py file is
    # executed, but not when it's imported in another module.

    # check if there are any command line arguments given when executing the module
    if len(sys.argv) > 1:
        # if CLI argument '-c' is set when executing the script, the influx db will be deleted instead of generating data
        run_simulation(generate=sys.argv[1] != "-c")
    else:
        # no argument is given to the function call, hence the default value True is used
        run_simulation()