diff --git a/src/mediaServiceSim/serviceSim.py b/src/mediaServiceSim/serviceSim.py new file mode 100644 index 0000000000000000000000000000000000000000..2c02546b4090c3def8f938f7d4894b3355214d03 --- /dev/null +++ b/src/mediaServiceSim/serviceSim.py @@ -0,0 +1,369 @@ +# coding: utf-8 +## /////////////////////////////////////////////////////////////////////// +## +## © University of Southampton IT Innovation Centre, 2018 +## +## Copyright in this software belongs to University of Southampton +## IT Innovation Centre of Gamma House, Enterprise Road, +## Chilworth Science Park, Southampton, SO16 7NS, UK. +## +## This software may not be used, sold, licensed, transferred, copied +## or reproduced in whole or in part in any manner or form or in or +## on any media by any person other than in accordance with the terms +## of the Licence Agreement supplied with the software, or otherwise +## without the prior written consent of the copyright owners. +## +## This software is distributed WITHOUT ANY WARRANTY, without even the +## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +## PURPOSE, except where stated in the Licence Agreement supplied with +## the software. +## +## Created By : Simon Crowle +## Created Date : 03-01-2018 +## Created for Project : FLAME +## +##/////////////////////////////////////////////////////////////////////// + +from random import random, randint + +import math +import time +import datetime +import uuid +import urllib.parse +import urllib.request + + +# DemoConfig is a configuration class used to set up the simulation +class DemoConfig(object): + def __init__( self ): + self.LOG_DATA = False # Log data sent to INFLUX if true + self.ITERATION_STRIDE = 10 # Number of seconds of requests/responses sent to INFLUXDB per HTTP POST + self.SEG_LENGTH = 4 # Each MPEG segment encodes 5 seconds worth of frames (assume double-buffering) + self.MAX_SEG = (30 * 60) / (self.SEG_LENGTH +1) # 30 mins + self.MIN_QUALITY = 5 # Minimum quality requested by a client + self.MAX_QUALITY = 9 # Maximum quality requested by a client + self.MIN_SERV_RESP_TIME = 100 # Mininum time taken for server to respond to a request (ms) + self.CLIENT_START_DELAY_MAX = 360 # Randomly delay clients starting stream up to 3 minutes + +dc = DemoConfig() + + +# DemoClient is a class the simulations the behaviour of a single client requesting video from the server +class DemoClient(object): + def __init__( self ): + self.startRequestOffset = randint( 0, dc.CLIENT_START_DELAY_MAX ) # Random time offset before requesting 1st segment + self.numSegRequests = dc.MAX_SEG - randint( 0, 50 ) # Randomly stop client watching all of video + self.id = uuid.uuid4() # Client's ID + self.currSeg = 1 # Client's current segment + self.nextSegCountDown = 0 # Count-down before asking for next segment + self.qualityReq = randint( dc.MIN_QUALITY, dc.MAX_QUALITY ) # Randomly assigned quality for this client + self.lastReqID = None # ID used to track last request made by this client + + + def getQuality( self ): + return self.qualityReq + + def getLastRequestID( self ): + return self.lastReqID + + def iterateRequest( self ): + result = None + + # If the time offset before asking for 1st segment is through and there are more segments to get + # and it is time to get one, then create a request for one! + if ( self.startRequestOffset == 0 ): + if ( self.numSegRequests > 0 ): + if ( self.nextSegCountDown == 0 ): + + # Generate a request ID + self.lastReqID = uuid.uuid4() + + # Start building the InfluxDB statement + # tags first + result = 'cid="' + str( self.id ) + '",' + result += 'segment=' + str( self.currSeg ) + ' ' + + # then fields + result += 'quality=' + str( self.qualityReq ) + ',' + result += 'index="' + str( self.lastReqID ) + '"' + + # Update this client's segment tracking + self.currSeg += 1 + self.numSegRequests -= 1 + self.nextSegCountDown = dc.SEG_LENGTH + else: + self.nextSegCountDown -= 1 + else: + self.startRequestOffset -= 1 + + # Return the _partial_ InfluxDB statement (server will complete the rest) + return result + + + + + +# DemoServer is the class that simulates the behaviour of the MPEG-DASH server +class DemoServer(object): + def __init__( self, cc, si, dbURL, dbName ): + self.influxDB = dbName # InfluxDB database name + self.id = uuid.uuid4() # MPEG-DASH server ID + self.clientCount = cc # Number of clients to simulate + self.simIterations = si # Number of iterations to make for this simulation + self.influxURL = dbURL # InfluxDB connection URL + self.currentTime = int( round( time.time() * 1000 ) ) # The current time + + + def prepareDatabase( self ): + self._createDB() + + self.clients = [] + for i in range( self.clientCount ): + self.clients.append( DemoClient() ) + + def destroyDatabase( self ): + self._deleteDB( self.influxDB ) + + def reportStatus( self ): + print ('Number of clients: ' + str(len(self.clients)) ) + + def iterateService( self ): + # The simulation will run through 'X' iterations of the simulation + # each time this method is called. This allows request/response messages to be + # batched and sent to the InfluxDB in sensible sized blocks + return self._executeServiceIteration( dc.ITERATION_STRIDE ) + +# 'Private' methods ________________________________________________________ + def _executeServiceIteration( self, count ): + + requestBlock = [] + responseBlock = [] + totalDifference = sumOfclientQuality = percentageDifference = 0 + + # Keep going until this stride (count) completes + while ( count > 0 ): + count -= 1 + + # Check we have some iterations to do + if ( self.simIterations > 0 ): + # First record clients that request segments + clientsRequesting = [] + + # Run through all clients and see if they make a request + for client in self.clients: + + # Record request, if it was generated + cReq = client.iterateRequest() + if ( cReq != None ): + + clientsRequesting.append( client ) + requestBlock.append( self._generateClientRequest(cReq) ) + + + + # Now generate request statistics + clientReqCount = len( clientsRequesting ) + + # Create a single CPU usage metric for this iteration + cpuUsagePercentage =self._cpuUsage(clientReqCount) + + # Now generate responses, based on stats + for client in clientsRequesting: + + # Generate some quality and delays based on the number of clients requesting for this iteration + qualitySelect = self._selectQuality( client.getQuality(), clientReqCount ) + delaySelect = self._selectDelay( clientReqCount ) + qualityDifference = client.getQuality() - qualitySelect + totalDifference+=qualityDifference + # print('totalDifference = ' + str(totalDifference) +'\n') + sumOfclientQuality+=client.getQuality() + #print('sumOfclientQuality = ' + str(sumOfclientQuality) + '\n') + percentageDifference=int((totalDifference*100)/sumOfclientQuality) + # print('percentageOfQualityDifference = ' + str(percentageDifference) + '%') + + responseBlock.append( self._generateServerResponse( client.getLastRequestID(), qualitySelect, delaySelect, cpuUsagePercentage, percentageDifference ) ) + + + # Iterate the service simulation + self.simIterations -= 1 + self.currentTime += 1000 # advance 1 second + + # If we have some requests/responses to send to InfluxDB, do it + if ( len(requestBlock) > 0 and len(responseBlock) > 0 ): + self._sendInfluxDataBlock( requestBlock ) + self._sendInfluxDataBlock( responseBlock ) + + return self.simIterations + + def _cpuUsage(self, clientCount): + cpuUsage=randint(0, 10) + + if ( clientCount < 20 ): + cpuUsage += 5 + elif ( clientCount >= 20 and clientCount <40 ): + cpuUsage += 10 + elif ( clientCount >= 40 and clientCount <60 ): + cpuUsage += 15 + elif ( clientCount >= 60 and clientCount <80 ): + cpuUsage += 20 + elif ( clientCount >= 80 and clientCount <110 ): + cpuUsage += 30 + elif ( clientCount >= 110 and clientCount <150 ): + cpuUsage += 40 + elif ( clientCount >= 150 and clientCount <200 ): + cpuUsage += 55 + elif ( clientCount >= 200 and clientCount <300 ): + cpuUsage += 70 + elif ( clientCount >= 300 ): + cpuUsage += 90 + + return cpuUsage + + + # Rule to determine a response quality, based on the current number of clients requesting + def _selectQuality( self, expectedQuality, clientCount ): + + result = dc.MAX_QUALITY + + if ( clientCount < 50 ): + result = 8 + elif ( clientCount >= 50 and clientCount < 100 ): + result = 7 + elif ( clientCount >= 100 and clientCount < 150 ): + result = 6 + elif ( clientCount >= 150 and clientCount < 200 ): + result = 5 + elif ( clientCount >= 200 and clientCount < 250 ): + result = 4 + elif ( clientCount >= 250 and clientCount < 300 ): + result = 3 + elif ( clientCount >= 300 ): + result = 2 + + # Give the client what it wants if possible + if ( result > expectedQuality ): + result = expectedQuality + + return result + + # Rule to determine a delay, based on the current number of clients requesting + def _selectDelay( self, cCount ): + + result = dc.MIN_SERV_RESP_TIME + + if ( cCount < 50 ): + result = 150 + elif ( cCount >= 50 and cCount < 100 ): + result = 200 + elif ( cCount > 100 and cCount < 200 ): + result = 500 + elif ( cCount >= 200 ): + result = 1000 + + # Perturb the delay a bit + result += randint( 0, 20 ) + + return result + + # Method to create a full InfluxDB request statement (based on partial statement from client) + def _generateClientRequest( self, cReq ): + + # Tags first + result = 'sid="' + str(self.id) + '",' + cReq + + # Fields + # No additional fields here yet + + # Timestamp + result += ' ' + str( self._getNSTime() ) + + # Measurement + return 'request,' + result + + # Method to create a full InfluxDB response statement + def _generateServerResponse( self, reqID, quality, delay, cpuUsage, qualityDifference ): + + # Tags first + # None here yet + + # Fields + result = 'quality=' + str( quality ) + ',' + result += 'cpuUsage=' + str(cpuUsage) + ',' + result += 'qualityDifference=' + str(qualityDifference) +',' + result += 'requestID="' + str( reqID ) + '",' + result += 'index="' + str( uuid.uuid4() ) + '"' + + # Timestamp + result += ' ' + str( self._getNSTime(delay) ) + + # Measurement + return 'response ' + result + + + # InfluxDB likes to have time-stamps in nanoseconds + def _getNSTime( self, offset = 0 ): + # Convert to nano-seconds + return 1000000 * ( self.currentTime + offset ) + + def _createDB( self ): + self._sendInfluxQuery( 'CREATE DATABASE '+ self.influxDB ) + + def _deleteDB( self, dbName ): + self._sendInfluxQuery( 'DROP DATABASE ' + self.influxDB ) + + # InfluxDB data send methods + # ----------------------------------------------------------------------------------------------- + def _sendInfluxQuery( self, query ): + query = urllib.parse.urlencode( {'q' : query} ) + query = query.encode('ascii') + req = urllib.request.Request( self.influxURL + '/query ', query ) + urllib.request.urlopen( req ) + + def _sendInfluxData( self, data ): + data = data.encode() + header = {'Content-Type': 'application/octet-stream' } + req = urllib.request.Request( self.influxURL + '/write?db=' +self.influxDB, data, header ) + urllib.request.urlopen( req ) + + def _sendInfluxDataBlock( self, dataBlock ): + msg = '' + for stmt in dataBlock: + msg += stmt + '\n' + + try: + if ( dc.LOG_DATA == True ): + print( msg ) + + self._sendInfluxData( msg ) + + except urllib.error.HTTPError as ex: + print ( "Error calling: " + str( ex.url) + "..." + str(ex.msg) ) + + +# Entry point +# ----------------------------------------------------------------------------------------------- +print( "Preparing simulation" ) +clients = 10 +iterations = 3000 +demoServer = DemoServer( clients, iterations, 'http://localhost:8186', 'testDB' ) + +# Set up InfluxDB (need to wait a little while) +demoServer.destroyDatabase() +time.sleep(2) +demoServer.prepareDatabase() +time.sleep(2) +demoServer.reportStatus() + +# Start simulation +print( "Starting simulation" ) +while ( True ): + itCount = demoServer.iterateService() + pcDone = round ( (itCount/ iterations) * 100 ) + + print( "Simulation remaining (%): " + str( pcDone ) +" \r", end='' ) + + if ( itCount == 0 ): + break; + +print( "\nFinished" ) \ No newline at end of file