# coding: utf-8
## ///////////////////////////////////////////////////////////////////////
##
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
##      Created By :            Simon Crowle
##      Created Date :          03-01-2018
##      Created for Project :   FLAME
##
##///////////////////////////////////////////////////////////////////////
"""Simulate an MPEG-DASH server and its clients, reporting to InfluxDB.

Each simulation iteration advances the simulated clock by one second.
Measurements are formatted by ``LineProtocolGenerator`` and POSTed to the
InfluxDB HTTP API in batches.
"""

import bisect
import datetime
import math
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
from random import random, randint

import LineProtocolGenerator as lp


# DemoConfig is a configuration class used to set up the simulation
class DemoConfig(object):
    """Tunable constants shared by the simulated clients and server."""

    def __init__(self):
        # Log data sent to INFLUX if true
        self.LOG_DATA = False
        # Number of seconds of requests/responses sent to INFLUXDB per HTTP POST
        self.ITERATION_STRIDE = 10
        # Iterations a client counts down between two segment requests.
        # Together with the requesting iteration itself, each MPEG segment
        # covers SEG_LENGTH + 1 = 5 seconds of frames (assume double-buffering)
        self.SEG_LENGTH = 4
        # Number of segments in 30 mins of video (integer division keeps
        # the segment count a whole number)
        self.MAX_SEG = (30 * 60) // (self.SEG_LENGTH + 1)
        # Minimum quality requested by a client
        self.MIN_QUALITY = 5
        # Maximum quality requested by a client
        self.MAX_QUALITY = 9
        # Minimum time taken for server to respond to a request (ms)
        self.MIN_SERV_RESP_TIME = 100
        # Randomly delay clients starting stream by up to 360 iterations
        # (one iteration is one simulated second, i.e. up to 6 minutes)
        self.CLIENT_START_DELAY_MAX = 360


dc = DemoConfig()


# DemoClient is a class that simulates the behaviour of a single client
# requesting video from the server
class DemoClient(object):
    """A single simulated client requesting video segments."""

    def __init__(self):
        # Random time offset before requesting 1st segment
        self.startRequestOffset = randint(0, dc.CLIENT_START_DELAY_MAX)
        # Randomly stop client watching all of video
        self.numSegRequests = dc.MAX_SEG - randint(0, 50)
        # Client's ID
        self.id = uuid.uuid4()
        # Client's current segment
        self.currSeg = 1
        # Count-down before asking for next segment
        self.nextSegCountDown = 0
        # Randomly assigned quality for this client
        self.qualityReq = randint(dc.MIN_QUALITY, dc.MAX_QUALITY)
        # ID used to track last request made by this client
        self.lastReqID = None

    def getQuality(self):
        """Return the quality level this client asks for."""
        return self.qualityReq

    def getLastRequestID(self):
        """Return the ID of the most recent request, or None if none yet."""
        return self.lastReqID

    def iterateRequest(self):
        """Advance this client by one iteration.

        Returns:
            A partial InfluxDB statement (tags, then fields) when the client
            requests a segment in this iteration, otherwise None. The server
            completes the statement.
        """
        # Still waiting out the initial start delay
        if self.startRequestOffset != 0:
            self.startRequestOffset -= 1
            return None

        # Nothing left to request
        if self.numSegRequests <= 0:
            return None

        # Not yet time to ask for the next segment
        if self.nextSegCountDown != 0:
            self.nextSegCountDown -= 1
            return None

        # Generate a request ID
        self.lastReqID = uuid.uuid4()

        # Build the InfluxDB statement: tags first, then fields
        result = (
            f'cid="{self.id}",segment={self.currSeg} '
            f'quality={self.qualityReq},index="{self.lastReqID}"'
        )

        # Update this client's segment tracking
        self.currSeg += 1
        self.numSegRequests -= 1
        self.nextSegCountDown = dc.SEG_LENGTH

        return result


class DatabaseManager():
    """Creates and drops the simulation database through the HTTP query API."""

    def __init__(self, influx_url, db_name):
        self.influx_url = influx_url  # Base URL of the HTTP endpoint
        self.influxDB = db_name       # Database name

    def database_up(self):
        """Create the database."""
        self._createDB()

    def database_teardown(self):
        """Drop the database."""
        self._deleteDB()

    def _createDB(self):
        self._sendInfluxQuery('CREATE DATABASE ' + self.influxDB)

    def _deleteDB(self):
        self._sendInfluxQuery('DROP DATABASE ' + self.influxDB)

    def _sendInfluxQuery(self, query):
        """POST a single query statement to the '/query' endpoint."""
        payload = urllib.parse.urlencode({'q': query}).encode('ascii')
        req = urllib.request.Request(self.influx_url + '/query', payload)
        # Close the response as soon as the request has completed
        with urllib.request.urlopen(req):
            pass


# DemoServer is the class that simulates the behaviour of the MPEG-DASH server
class DemoServer(object):
    """Simulated MPEG-DASH server that reports measurements to InfluxDB."""

    # Lookup tables keyed on the number of clients requesting in an iteration.
    # Each *_BOUNDS tuple holds exclusive upper bounds; the matching value
    # tuple has one extra entry for counts at or above the last bound.
    _CPU_BOUNDS = (20, 40, 60, 80, 110, 150, 200, 300)
    _CPU_LOAD = (5, 10, 15, 20, 30, 40, 55, 70, 90)
    _QUALITY_BOUNDS = (50, 100, 150, 200, 250, 300)
    _QUALITY_LEVELS = (8, 7, 6, 5, 4, 3, 2)
    _DELAY_BOUNDS = (50, 100, 200)
    _DELAY_MS = (150, 200, 500, 1000)

    def __init__(self, cc, si, db_url, db_name):
        self.influxDB = db_name      # InfluxDB database name
        self.id = uuid.uuid4()       # MPEG-DASH server ID
        self.clientCount = cc        # Number of clients to simulate
        self.simIterations = si      # Number of iterations to make for this simulation
        self.influxURL = db_url      # InfluxDB connection URL
        self.currentTime = int(round(time.time() * 1000))  # The current time (ms)
        self.clients = []            # Populated by generate_clients()

    def configure(self, server_id, server_location):
        """Send the VM and server configuration measurements."""
        print("Configuring")
        self.configure_VM()
        time.sleep(1)
        self.configure_server(server_id, server_location)

    def generate_clients(self):
        """Create the simulated clients (clientCount of them)."""
        self.clients = [DemoClient() for _ in range(self.clientCount)]
        print('Number of clients: ' + str(len(self.clients)))

    def reportStatus(self):
        """Print the number of simulated clients."""
        print('Number of clients: ' + str(len(self.clients)))

    def configure_server(self, server_id, server_location):
        """Send one server configuration measurement."""
        print("Configuring Servers")
        # Only a single server is configured
        server_conf_block = [
            lp._generateServerConfig(server_id, server_location, 8, '100G', '1T',
                                     self._selectDelay(0))
        ]
        self._sendInfluxDataBlock(server_conf_block)

    def configure_VM(self):
        """Send one 'starting' and one 'running' VM configuration measurement."""
        print("Configuring VM nodes")
        VM_conf_block = []
        self._generateVMS('starting', 1, VM_conf_block)
        self._generateVMS('running', 1, VM_conf_block)
        self._sendInfluxDataBlock(VM_conf_block)

    def configure_ports(self):
        """Send ten port configuration measurements."""
        print("Configuring Servers")
        server_conf_block = [lp._configure_port() for _ in range(10)]
        self._sendInfluxDataBlock(server_conf_block)

    def shutdown_VMs(self):
        """Send ten 'stopping' VM configuration measurements."""
        print("Shutting down VM nodes")
        VM_conf_block = []
        self._generateVMS('stopping', 10, VM_conf_block)
        self._sendInfluxDataBlock(VM_conf_block)

    def iterateService(self):
        """Run one stride of iterations and return the iterations remaining.

        Each call runs up to ITERATION_STRIDE iterations so that
        request/response messages are batched and sent to InfluxDB in
        sensibly sized blocks.
        """
        return self._executeServiceIteration(dc.ITERATION_STRIDE)

    def _executeServiceIteration(self, count):
        """Run up to `count` iterations, then send the collected statements.

        Returns:
            The number of simulation iterations still remaining (never
            negative).
        """
        requestBlock = []
        responseBlock = []
        networkBlock = []
        SFBlock = []
        # These totals accumulate over the whole stride
        totalDifference = sumOfclientQuality = percentageDifference = 0

        for _ in range(count):
            # Stop early once the simulation has run out of iterations, so
            # the remaining count never drops below zero
            if self.simIterations <= 0:
                break

            # Run through all clients and record those that make a request
            clientsRequesting = []
            for client in self.clients:
                cReq = client.iterateRequest()
                if cReq is not None:
                    clientsRequesting.append(client)
                    requestBlock.append(
                        lp._generateClientRequest(cReq, self.id, self.currentTime))

            # Now generate request statistics
            clientReqCount = len(clientsRequesting)

            # Create a single CPU usage metric for this iteration
            cpuUsagePercentage = self._cpuUsage(clientReqCount)

            # Now generate responses, based on stats
            for client in clientsRequesting:
                # Quality and delay depend on how many clients are requesting
                # in this iteration
                wantedQuality = client.getQuality()
                qualitySelect = self._selectQuality(wantedQuality, clientReqCount)
                delaySelect = self._selectDelay(clientReqCount) + self.currentTime

                totalDifference += wantedQuality - qualitySelect
                sumOfclientQuality += wantedQuality
                percentageDifference = int((totalDifference * 100) / sumOfclientQuality)

                responseBlock.append(
                    lp._generateServerResponse(client.getLastRequestID(), qualitySelect,
                                               delaySelect, cpuUsagePercentage,
                                               percentageDifference))
                SFBlock.append(
                    lp._generateMpegDashReport('https://netflix.com/scream',
                                               qualitySelect, delaySelect))
                networkBlock.append(
                    lp._generateNetworkReport(sumOfclientQuality, delaySelect))

            # Iterate the service simulation
            self.simIterations -= 1
            self.currentTime += 1000  # advance 1 second

        # If we have some requests/responses to send to InfluxDB, do it
        if requestBlock and responseBlock:
            self._sendInfluxDataBlock(requestBlock)
            self._sendInfluxDataBlock(responseBlock)
            self._sendInfluxDataBlock(networkBlock)
            self._sendInfluxDataBlock(SFBlock)
            print("Sending influx data blocks")

        return self.simIterations

    # 'Private' methods ________________________________________________________
    def _generateVMS(self, state, amount, datablock):
        """Append `amount` VM configuration statements for `state` to `datablock`."""
        for _ in range(amount):
            datablock.append(
                lp._generateVMConfig(state, 1, '100G', '1T', self._selectDelay(amount)))

    def _cpuUsage(self, clientCount):
        """Return a CPU usage figure: a load band plus 0-10 of random noise."""
        band = bisect.bisect_right(self._CPU_BOUNDS, clientCount)
        return randint(0, 10) + self._CPU_LOAD[band]

    # Rule to determine a response quality, based on the current number of
    # clients requesting
    def _selectQuality(self, expectedQuality, clientCount):
        """Return the quality served: the load-based level, capped at the request."""
        band = bisect.bisect_right(self._QUALITY_BOUNDS, clientCount)
        # Give the client what it wants if possible
        return min(self._QUALITY_LEVELS[band], expectedQuality)

    # Rule to determine a delay, based on the current number of clients requesting
    def _selectDelay(self, cCount):
        """Return a response delay in ms: a load band plus 0-20 of random noise."""
        band = bisect.bisect_right(self._DELAY_BOUNDS, cCount)
        # Perturb the delay a bit
        return self._DELAY_MS[band] + randint(0, 20)

    # InfluxDB data send methods
    # -----------------------------------------------------------------------------------------------
    def _sendInfluxData(self, data):
        """POST `data` (a str) to the '/write' endpoint of the configured database."""
        header = {'Content-Type': 'application/octet-stream'}
        req = urllib.request.Request(self.influxURL + '/write?db=' + self.influxDB,
                                     data.encode(), header)
        # Close the response as soon as the request has completed
        with urllib.request.urlopen(req):
            pass

    def _sendInfluxDataBlock(self, dataBlock):
        """Send the statements in `dataBlock`, one per line, in a single POST.

        HTTP error responses are reported on stdout rather than raised.
        """
        msg = ''.join(stmt + '\n' for stmt in dataBlock)
        if dc.LOG_DATA:
            print(msg)

        try:
            self._sendInfluxData(msg)
        except urllib.error.HTTPError as ex:
            print("Error calling: " + str(ex.url) + "..." + str(ex.msg))


# Entry point
# -----------------------------------------------------------------------------------------------
if __name__ == "__main__":
    print("Preparing simulation")
    clients = 10
    iterations = 3000
    # port 8086: Direct to DB specified
    # port 8186: To telegraf, telegraf specifies DB
    database_manager = DatabaseManager('http://localhost:8186', 'testDB')

    # Set up InfluxDB (need to wait a little while)
    database_manager.database_teardown()
    time.sleep(2)
    database_manager.database_up()
    time.sleep(2)

    # configure servers
    demoServer = DemoServer(clients, iterations, 'http://localhost:8186', 'testDB')
    demoServer.configure("Server1", "Southampton")
    # The clients must exist before the service starts iterating
    demoServer.generate_clients()

    # Start simulation
    print("Starting simulation")
    while True:
        itCount = demoServer.iterateService()
        pcDone = round((itCount / iterations) * 100)
        print("Simulation remaining (%): " + str(pcDone) + " \r", end='')
        if itCount <= 0:
            break

    demoServer.shutdown_VMs()
    print("\nFinished")