From 0910e0a19028ca0da2c997f871d31bf4b7dfe825 Mon Sep 17 00:00:00 2001 From: Rowan Powell <rp17@it-innovation.soton.ac.uk> Date: Wed, 10 Jan 2018 15:19:25 +0000 Subject: [PATCH] Moved line protocol generation to new file, database management to new class --- src/mediaServiceSim/serviceSim.py | 290 +++++++++++++++--------------- 1 file changed, 146 insertions(+), 144 deletions(-) diff --git a/src/mediaServiceSim/serviceSim.py b/src/mediaServiceSim/serviceSim.py index 576773e..4d56b38 100644 --- a/src/mediaServiceSim/serviceSim.py +++ b/src/mediaServiceSim/serviceSim.py @@ -34,59 +34,61 @@ import urllib.parse import urllib.request import LineProtocolGenerator as lp + # DemoConfig is a configuration class used to set up the simulation class DemoConfig(object): - def __init__( self ): - self.LOG_DATA = False # Log data sent to INFLUX if true - self.ITERATION_STRIDE = 10 # Number of seconds of requests/responses sent to INFLUXDB per HTTP POST - self.SEG_LENGTH = 4 # Each MPEG segment encodes 5 seconds worth of frames (assume double-buffering) - self.MAX_SEG = (30 * 60) / (self.SEG_LENGTH +1) # 30 mins - self.MIN_QUALITY = 5 # Minimum quality requested by a client - self.MAX_QUALITY = 9 # Maximum quality requested by a client - self.MIN_SERV_RESP_TIME = 100 # Mininum time taken for server to respond to a request (ms) - self.CLIENT_START_DELAY_MAX = 360 # Randomly delay clients starting stream up to 3 minutes + def __init__(self): + self.LOG_DATA = False # Log data sent to INFLUX if true + self.ITERATION_STRIDE = 10 # Number of seconds of requests/responses sent to INFLUXDB per HTTP POST + self.SEG_LENGTH = 4 # Each MPEG segment encodes 5 seconds worth of frames (assume double-buffering) + self.MAX_SEG = (30 * 60) / (self.SEG_LENGTH + 1) # 30 mins + self.MIN_QUALITY = 5 # Minimum quality requested by a client + self.MAX_QUALITY = 9 # Maximum quality requested by a client + self.MIN_SERV_RESP_TIME = 100 # Mininum time taken for server to respond to a request (ms) + self.CLIENT_START_DELAY_MAX = 360 # Randomly delay clients starting stream up to 3 minutes + dc = DemoConfig() # DemoClient is a class the simulations the behaviour of a single client requesting video from the server class DemoClient(object): - def __init__( self ): - self.startRequestOffset = randint( 0, dc.CLIENT_START_DELAY_MAX ) # Random time offset before requesting 1st segment - self.numSegRequests = dc.MAX_SEG - randint( 0, 50 ) # Randomly stop client watching all of video - self.id = uuid.uuid4() # Client's ID - self.currSeg = 1 # Client's current segment - self.nextSegCountDown = 0 # Count-down before asking for next segment - self.qualityReq = randint( dc.MIN_QUALITY, dc.MAX_QUALITY ) # Randomly assigned quality for this client - self.lastReqID = None # ID used to track last request made by this client - - - def getQuality( self ): + def __init__(self): + self.startRequestOffset = randint(0, + dc.CLIENT_START_DELAY_MAX) # Random time offset before requesting 1st segment + self.numSegRequests = dc.MAX_SEG - randint(0, 50) # Randomly stop client watching all of video + self.id = uuid.uuid4() # Client's ID + self.currSeg = 1 # Client's current segment + self.nextSegCountDown = 0 # Count-down before asking for next segment + self.qualityReq = randint(dc.MIN_QUALITY, dc.MAX_QUALITY) # Randomly assigned quality for this client + self.lastReqID = None # ID used to track last request made by this client + + def getQuality(self): return self.qualityReq - def getLastRequestID( self ): + def getLastRequestID(self): return self.lastReqID - def iterateRequest( self ): + def iterateRequest(self): result = None # If the time offset before asking for 1st segment is through and there are more segments to get # and it is time to get one, then create a request for one! - if ( self.startRequestOffset == 0 ): - if ( self.numSegRequests > 0 ): - if ( self.nextSegCountDown == 0 ): + if (self.startRequestOffset == 0): + if (self.numSegRequests > 0): + if (self.nextSegCountDown == 0): # Generate a request ID self.lastReqID = uuid.uuid4() # Start building the InfluxDB statement # tags first - result = 'cid="' + str( self.id ) + '",' - result += 'segment=' + str( self.currSeg ) + ' ' + result = 'cid="' + str(self.id) + '",' + result += 'segment=' + str(self.currSeg) + ' ' # then fields - result += 'quality=' + str( self.qualityReq ) + ',' - result += 'index="' + str( self.lastReqID ) + '"' + result += 'quality=' + str(self.qualityReq) + ',' + result += 'index="' + str(self.lastReqID) + '"' # Update this client's segment tracking self.currSeg += 1 @@ -101,32 +103,48 @@ class DemoClient(object): return result +class DatabaseManager(): + def __init__(self, influx_url, db_name): + self.influx_url = influx_url + self.influxDB = db_name + def database_up(self): + self._createDB() + + def database_teardown(self): + self._deleteDB() + + def _createDB(self): + self._sendInfluxQuery('CREATE DATABASE ' + self.influxDB) + + def _deleteDB(self): + self._sendInfluxQuery('DROP DATABASE ' + self.influxDB) + + def _sendInfluxQuery(self, query): + query = urllib.parse.urlencode({'q': query}) + query = query.encode('ascii') + req = urllib.request.Request(self.influxURL + '/query ', query) + urllib.request.urlopen(req) # DemoServer is the class that simulates the behaviour of the MPEG-DASH server class DemoServer(object): - def __init__( self, cc, si, dbURL, dbName ): - self.influxDB = dbName # InfluxDB database name - self.id = uuid.uuid4() # MPEG-DASH server ID - self.clientCount = cc # Number of clients to simulate - self.simIterations = si # Number of iterations to make for this simulation - self.influxURL = dbURL # InfluxDB connection URL - self.currentTime = int( round( time.time() * 1000 ) ) # The current time - - - def prepareDatabase( self ): - self._createDB() - + def __init__(self, cc, si, db_url, db_name): + self.influxDB = db_name # InfluxDB database name + self.id = uuid.uuid4() # MPEG-DASH server ID + self.clientCount = cc # Number of clients to simulate + self.simIterations = si # Number of iterations to make for this simulation + self.influxURL = db_url # InfluxDB connection URL + self.currentTime = int(round(time.time() * 1000)) # The current time + + def generate_clients(self): self.clients = [] - for i in range( self.clientCount ): - self.clients.append( DemoClient() ) + for i in range(self.clientCount): + self.clients.append(DemoClient()) + print('Number of clients: ' + str(len(self.clients))) - def destroyDatabase( self ): - self._deleteDB( self.influxDB ) - - def reportStatus( self ): - print ('Number of clients: ' + str(len(self.clients)) ) + def reportStatus(self): + print('Number of clients: ' + str(len(self.clients))) def configure_servers(self): print("Configuring Servers") @@ -134,43 +152,44 @@ class DemoServer(object): ids = ['A', 'B', 'C'] locations = ['locA', 'locB', 'locC'] for i, id in enumerate(ids): - server_conf_block.append(lp._generateServerConfig(id,locations[i],8,'100G','1T', self._selectDelay(len(ids)))) + server_conf_block.append( + lp._generateServerConfig(id, locations[i], 8, '100G', '1T', self._selectDelay(len(ids)))) self._sendInfluxDataBlock(server_conf_block) + def configure_VMs(self): print("Configuring VM nodes") VM_conf_block = [] - self._generateVMS('starting',10,VM_conf_block) - self._generateVMS('running',10,VM_conf_block) + self._generateVMS('starting', 10, VM_conf_block) + self._generateVMS('running', 10, VM_conf_block) self._sendInfluxDataBlock(VM_conf_block) def configure_ports(self): print("Configuring Servers") server_conf_block = [] - for i in range(0,10): + for i in range(0, 10): server_conf_block.append(lp._configure_port()) self._sendInfluxDataBlock(server_conf_block) - def shutdown_VMs(self): print("Shutting down VM nodes") VM_conf_block = [] - self._generateVMS('stopping',10,VM_conf_block) + self._generateVMS('stopping', 10, VM_conf_block) self._sendInfluxDataBlock(VM_conf_block) - def _generateVMS(self,state, amount, datablock): + def _generateVMS(self, state, amount, datablock): for i in range(0, amount): datablock.append(lp._generateVMConfig(state, 1, '100G', '1T', self._selectDelay(amount))) - def iterateService( self ): + def iterateService(self): # The simulation will run through 'X' iterations of the simulation # each time this method is called. This allows request/response messages to be # batched and sent to the InfluxDB in sensible sized blocks - return self._executeServiceIteration( dc.ITERATION_STRIDE ) + return self._executeServiceIteration(dc.ITERATION_STRIDE) -# 'Private' methods ________________________________________________________ - def _executeServiceIteration( self, count ): + # 'Private' methods ________________________________________________________ + def _executeServiceIteration(self, count): requestBlock = [] responseBlock = [] @@ -179,11 +198,11 @@ class DemoServer(object): totalDifference = sumOfclientQuality = percentageDifference = 0 # Keep going until this stride (count) completes - while ( count > 0 ): + while (count > 0): count -= 1 # Check we have some iterations to do - if ( self.simIterations > 0 ): + if (self.simIterations > 0): # First record clients that request segments clientsRequesting = [] @@ -192,186 +211,169 @@ class DemoServer(object): # Record request, if it was generated cReq = client.iterateRequest() - if ( cReq != None ): - - clientsRequesting.append( client ) - requestBlock.append( lp._generateClientRequest(cReq, self.id, self.currentTime) ) - - + if (cReq != None): + clientsRequesting.append(client) + requestBlock.append(lp._generateClientRequest(cReq, self.id, self.currentTime)) # Now generate request statistics - clientReqCount = len( clientsRequesting ) + clientReqCount = len(clientsRequesting) # Create a single CPU usage metric for this iteration - cpuUsagePercentage =self._cpuUsage(clientReqCount) + cpuUsagePercentage = self._cpuUsage(clientReqCount) # Now generate responses, based on stats for client in clientsRequesting: - # Generate some quality and delays based on the number of clients requesting for this iteration - qualitySelect = self._selectQuality( client.getQuality(), clientReqCount ) - delaySelect = self._selectDelay( clientReqCount ) + self.currentTime + qualitySelect = self._selectQuality(client.getQuality(), clientReqCount) + delaySelect = self._selectDelay(clientReqCount) + self.currentTime qualityDifference = client.getQuality() - qualitySelect - totalDifference+=qualityDifference + totalDifference += qualityDifference # print('totalDifference = ' + str(totalDifference) +'\n') - sumOfclientQuality+=client.getQuality() + sumOfclientQuality += client.getQuality() # print('sumOfclientQuality = ' + str(sumOfclientQuality) + '\n') - percentageDifference=int((totalDifference*100)/sumOfclientQuality) + percentageDifference = int((totalDifference * 100) / sumOfclientQuality) # print('percentageOfQualityDifference = ' + str(percentageDifference) + '%') - + responseBlock.append(lp._generateServerResponse(client.getLastRequestID(), qualitySelect, - delaySelect, cpuUsagePercentage, - percentageDifference)) + delaySelect, cpuUsagePercentage, + percentageDifference)) SFBlock.append(lp._generateMpegDashReport('https://netflix.com/scream', qualitySelect, delaySelect)) - networkBlock.append(lp._generateNetworkReport(sumOfclientQuality, delaySelect)) # Iterate the service simulation self.simIterations -= 1 - self.currentTime += 1000 # advance 1 second + self.currentTime += 1000 # advance 1 second # If we have some requests/responses to send to InfluxDB, do it - if ( len(requestBlock) > 0 and len(responseBlock) > 0 ): - self._sendInfluxDataBlock( requestBlock ) - self._sendInfluxDataBlock( responseBlock ) - self._sendInfluxDataBlock( networkBlock ) - self._sendInfluxDataBlock( SFBlock ) + if (len(requestBlock) > 0 and len(responseBlock) > 0): + self._sendInfluxDataBlock(requestBlock) + self._sendInfluxDataBlock(responseBlock) + self._sendInfluxDataBlock(networkBlock) + self._sendInfluxDataBlock(SFBlock) print("Sending influx data blocks") return self.simIterations def _cpuUsage(self, clientCount): - cpuUsage=randint(0, 10) + cpuUsage = randint(0, 10) - if ( clientCount < 20 ): + if (clientCount < 20): cpuUsage += 5 - elif ( clientCount >= 20 and clientCount <40 ): + elif (clientCount >= 20 and clientCount < 40): cpuUsage += 10 - elif ( clientCount >= 40 and clientCount <60 ): + elif (clientCount >= 40 and clientCount < 60): cpuUsage += 15 - elif ( clientCount >= 60 and clientCount <80 ): + elif (clientCount >= 60 and clientCount < 80): cpuUsage += 20 - elif ( clientCount >= 80 and clientCount <110 ): + elif (clientCount >= 80 and clientCount < 110): cpuUsage += 30 - elif ( clientCount >= 110 and clientCount <150 ): + elif (clientCount >= 110 and clientCount < 150): cpuUsage += 40 - elif ( clientCount >= 150 and clientCount <200 ): + elif (clientCount >= 150 and clientCount < 200): cpuUsage += 55 - elif ( clientCount >= 200 and clientCount <300 ): + elif (clientCount >= 200 and clientCount < 300): cpuUsage += 70 - elif ( clientCount >= 300 ): + elif (clientCount >= 300): cpuUsage += 90 return cpuUsage - # Rule to determine a response quality, based on the current number of clients requesting - def _selectQuality( self, expectedQuality, clientCount ): + def _selectQuality(self, expectedQuality, clientCount): result = dc.MAX_QUALITY - if ( clientCount < 50 ): + if (clientCount < 50): result = 8 - elif ( clientCount >= 50 and clientCount < 100 ): + elif (clientCount >= 50 and clientCount < 100): result = 7 - elif ( clientCount >= 100 and clientCount < 150 ): + elif (clientCount >= 100 and clientCount < 150): result = 6 - elif ( clientCount >= 150 and clientCount < 200 ): + elif (clientCount >= 150 and clientCount < 200): result = 5 - elif ( clientCount >= 200 and clientCount < 250 ): + elif (clientCount >= 200 and clientCount < 250): result = 4 - elif ( clientCount >= 250 and clientCount < 300 ): + elif (clientCount >= 250 and clientCount < 300): result = 3 - elif ( clientCount >= 300 ): + elif (clientCount >= 300): result = 2 # Give the client what it wants if possible - if ( result > expectedQuality ): + if (result > expectedQuality): result = expectedQuality return result # Rule to determine a delay, based on the current number of clients requesting - def _selectDelay( self, cCount ): + def _selectDelay(self, cCount): result = dc.MIN_SERV_RESP_TIME - if ( cCount < 50 ): + if (cCount < 50): result = 150 - elif ( cCount >= 50 and cCount < 100 ): + elif (cCount >= 50 and cCount < 100): result = 200 - elif ( cCount > 100 and cCount < 200 ): + elif (cCount > 100 and cCount < 200): result = 500 - elif ( cCount >= 200 ): + elif (cCount >= 200): result = 1000 # Perturb the delay a bit - result += randint( 0, 20 ) + result += randint(0, 20) return result - - def _createDB( self ): - self._sendInfluxQuery( 'CREATE DATABASE '+ self.influxDB ) - - def _deleteDB( self, dbName ): - self._sendInfluxQuery( 'DROP DATABASE ' + self.influxDB ) - # InfluxDB data send methods # ----------------------------------------------------------------------------------------------- - def _sendInfluxQuery( self, query ): - query = urllib.parse.urlencode( {'q' : query} ) - query = query.encode('ascii') - req = urllib.request.Request( self.influxURL + '/query ', query ) - urllib.request.urlopen( req ) - def _sendInfluxData( self, data ): + def _sendInfluxData(self, data): data = data.encode() - header = {'Content-Type': 'application/octet-stream' } - req = urllib.request.Request( self.influxURL + '/write?db=' +self.influxDB, data, header ) - urllib.request.urlopen( req ) + header = {'Content-Type': 'application/octet-stream'} + req = urllib.request.Request(self.influxURL + '/write?db=' + self.influxDB, data, header) + urllib.request.urlopen(req) - def _sendInfluxDataBlock( self, dataBlock ): + def _sendInfluxDataBlock(self, dataBlock): msg = '' for stmt in dataBlock: msg += stmt + '\n' try: - if ( dc.LOG_DATA == True ): - print( msg ) + if (dc.LOG_DATA == True): + print(msg) - self._sendInfluxData( msg ) + self._sendInfluxData(msg) except urllib.error.HTTPError as ex: - print ( "Error calling: " + str( ex.url) + "..." + str(ex.msg) ) + print("Error calling: " + str(ex.url) + "..." + str(ex.msg)) # Entry point # ----------------------------------------------------------------------------------------------- -print( "Preparing simulation" ) -clients = 10 +print("Preparing simulation") +clients = 10 iterations = 3000 # port 8086: Direct to DB specified # port 8186: To telegraf, telegraf specifies DB -demoServer = DemoServer(clients, iterations, 'http://localhost:8186', 'testDB') - +database_manager = DatabaseManager('http://localhost:8186', 'testDB') # Set up InfluxDB (need to wait a little while) -demoServer.destroyDatabase() +database_manager.database_teardown() time.sleep(2) -demoServer.prepareDatabase() +database_manager.database_up() time.sleep(2) +# configure servers +demoServer = DemoServer(clients, iterations, 'http://localhost:8186', 'testDB') demoServer.reportStatus() demoServer.configure_servers() demoServer.configure_VMs() # Start simulation -print( "Starting simulation" ) -while ( True ): +print("Starting simulation") +while True: itCount = demoServer.iterateService() - pcDone = round ( (itCount/ iterations) * 100 ) + pcDone = round((itCount / iterations) * 100) - print( "Simulation remaining (%): " + str( pcDone ) +" \r", end='' ) + print("Simulation remaining (%): " + str(pcDone) + " \r", end='') - if ( itCount == 0 ): + if itCount == 0: break demoServer.shutdown_VMs() -print("\nFinished") \ No newline at end of file +print("\nFinished") -- GitLab