Skip to content
Snippets Groups Projects
Commit ceca9ab8 authored by Simon Crowle's avatar Simon Crowle
Browse files

Adds prototype MPEG-DASH simulator (now uses Telegraf)

NOTE: data schema has not yet changed
parent f74721c3
No related branches found
No related tags found
No related merge requests found
# coding: utf-8
## ///////////////////////////////////////////////////////////////////////
##
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Simon Crowle
## Created Date : 03-01-2018
## Created for Project : FLAME
##
##///////////////////////////////////////////////////////////////////////
from random import random, randint
import math
import time
import datetime
import uuid
import urllib.parse
import urllib.request
# DemoConfig is a configuration class used to set up the simulation
class DemoConfig(object):
def __init__( self ):
self.LOG_DATA = False # Log data sent to INFLUX if true
self.ITERATION_STRIDE = 10 # Number of seconds of requests/responses sent to INFLUXDB per HTTP POST
self.SEG_LENGTH = 4 # Each MPEG segment encodes 5 seconds worth of frames (assume double-buffering)
self.MAX_SEG = (30 * 60) / (self.SEG_LENGTH +1) # 30 mins
self.MIN_QUALITY = 5 # Minimum quality requested by a client
self.MAX_QUALITY = 9 # Maximum quality requested by a client
self.MIN_SERV_RESP_TIME = 100 # Mininum time taken for server to respond to a request (ms)
self.CLIENT_START_DELAY_MAX = 360 # Randomly delay clients starting stream up to 3 minutes
dc = DemoConfig()
# DemoClient is a class the simulations the behaviour of a single client requesting video from the server
class DemoClient(object):
def __init__( self ):
self.startRequestOffset = randint( 0, dc.CLIENT_START_DELAY_MAX ) # Random time offset before requesting 1st segment
self.numSegRequests = dc.MAX_SEG - randint( 0, 50 ) # Randomly stop client watching all of video
self.id = uuid.uuid4() # Client's ID
self.currSeg = 1 # Client's current segment
self.nextSegCountDown = 0 # Count-down before asking for next segment
self.qualityReq = randint( dc.MIN_QUALITY, dc.MAX_QUALITY ) # Randomly assigned quality for this client
self.lastReqID = None # ID used to track last request made by this client
def getQuality( self ):
return self.qualityReq
def getLastRequestID( self ):
return self.lastReqID
def iterateRequest( self ):
result = None
# If the time offset before asking for 1st segment is through and there are more segments to get
# and it is time to get one, then create a request for one!
if ( self.startRequestOffset == 0 ):
if ( self.numSegRequests > 0 ):
if ( self.nextSegCountDown == 0 ):
# Generate a request ID
self.lastReqID = uuid.uuid4()
# Start building the InfluxDB statement
# tags first
result = 'cid="' + str( self.id ) + '",'
result += 'segment=' + str( self.currSeg ) + ' '
# then fields
result += 'quality=' + str( self.qualityReq ) + ','
result += 'index="' + str( self.lastReqID ) + '"'
# Update this client's segment tracking
self.currSeg += 1
self.numSegRequests -= 1
self.nextSegCountDown = dc.SEG_LENGTH
else:
self.nextSegCountDown -= 1
else:
self.startRequestOffset -= 1
# Return the _partial_ InfluxDB statement (server will complete the rest)
return result
# DemoServer is the class that simulates the behaviour of the MPEG-DASH server
class DemoServer(object):
def __init__( self, cc, si, dbURL, dbName ):
self.influxDB = dbName # InfluxDB database name
self.id = uuid.uuid4() # MPEG-DASH server ID
self.clientCount = cc # Number of clients to simulate
self.simIterations = si # Number of iterations to make for this simulation
self.influxURL = dbURL # InfluxDB connection URL
self.currentTime = int( round( time.time() * 1000 ) ) # The current time
def prepareDatabase( self ):
self._createDB()
self.clients = []
for i in range( self.clientCount ):
self.clients.append( DemoClient() )
def destroyDatabase( self ):
self._deleteDB( self.influxDB )
def reportStatus( self ):
print ('Number of clients: ' + str(len(self.clients)) )
def iterateService( self ):
# The simulation will run through 'X' iterations of the simulation
# each time this method is called. This allows request/response messages to be
# batched and sent to the InfluxDB in sensible sized blocks
return self._executeServiceIteration( dc.ITERATION_STRIDE )
# 'Private' methods ________________________________________________________
def _executeServiceIteration( self, count ):
requestBlock = []
responseBlock = []
totalDifference = sumOfclientQuality = percentageDifference = 0
# Keep going until this stride (count) completes
while ( count > 0 ):
count -= 1
# Check we have some iterations to do
if ( self.simIterations > 0 ):
# First record clients that request segments
clientsRequesting = []
# Run through all clients and see if they make a request
for client in self.clients:
# Record request, if it was generated
cReq = client.iterateRequest()
if ( cReq != None ):
clientsRequesting.append( client )
requestBlock.append( self._generateClientRequest(cReq) )
# Now generate request statistics
clientReqCount = len( clientsRequesting )
# Create a single CPU usage metric for this iteration
cpuUsagePercentage =self._cpuUsage(clientReqCount)
# Now generate responses, based on stats
for client in clientsRequesting:
# Generate some quality and delays based on the number of clients requesting for this iteration
qualitySelect = self._selectQuality( client.getQuality(), clientReqCount )
delaySelect = self._selectDelay( clientReqCount )
qualityDifference = client.getQuality() - qualitySelect
totalDifference+=qualityDifference
# print('totalDifference = ' + str(totalDifference) +'\n')
sumOfclientQuality+=client.getQuality()
#print('sumOfclientQuality = ' + str(sumOfclientQuality) + '\n')
percentageDifference=int((totalDifference*100)/sumOfclientQuality)
# print('percentageOfQualityDifference = ' + str(percentageDifference) + '%')
responseBlock.append( self._generateServerResponse( client.getLastRequestID(), qualitySelect, delaySelect, cpuUsagePercentage, percentageDifference ) )
# Iterate the service simulation
self.simIterations -= 1
self.currentTime += 1000 # advance 1 second
# If we have some requests/responses to send to InfluxDB, do it
if ( len(requestBlock) > 0 and len(responseBlock) > 0 ):
self._sendInfluxDataBlock( requestBlock )
self._sendInfluxDataBlock( responseBlock )
return self.simIterations
def _cpuUsage(self, clientCount):
cpuUsage=randint(0, 10)
if ( clientCount < 20 ):
cpuUsage += 5
elif ( clientCount >= 20 and clientCount <40 ):
cpuUsage += 10
elif ( clientCount >= 40 and clientCount <60 ):
cpuUsage += 15
elif ( clientCount >= 60 and clientCount <80 ):
cpuUsage += 20
elif ( clientCount >= 80 and clientCount <110 ):
cpuUsage += 30
elif ( clientCount >= 110 and clientCount <150 ):
cpuUsage += 40
elif ( clientCount >= 150 and clientCount <200 ):
cpuUsage += 55
elif ( clientCount >= 200 and clientCount <300 ):
cpuUsage += 70
elif ( clientCount >= 300 ):
cpuUsage += 90
return cpuUsage
# Rule to determine a response quality, based on the current number of clients requesting
def _selectQuality( self, expectedQuality, clientCount ):
result = dc.MAX_QUALITY
if ( clientCount < 50 ):
result = 8
elif ( clientCount >= 50 and clientCount < 100 ):
result = 7
elif ( clientCount >= 100 and clientCount < 150 ):
result = 6
elif ( clientCount >= 150 and clientCount < 200 ):
result = 5
elif ( clientCount >= 200 and clientCount < 250 ):
result = 4
elif ( clientCount >= 250 and clientCount < 300 ):
result = 3
elif ( clientCount >= 300 ):
result = 2
# Give the client what it wants if possible
if ( result > expectedQuality ):
result = expectedQuality
return result
# Rule to determine a delay, based on the current number of clients requesting
def _selectDelay( self, cCount ):
result = dc.MIN_SERV_RESP_TIME
if ( cCount < 50 ):
result = 150
elif ( cCount >= 50 and cCount < 100 ):
result = 200
elif ( cCount > 100 and cCount < 200 ):
result = 500
elif ( cCount >= 200 ):
result = 1000
# Perturb the delay a bit
result += randint( 0, 20 )
return result
# Method to create a full InfluxDB request statement (based on partial statement from client)
def _generateClientRequest( self, cReq ):
# Tags first
result = 'sid="' + str(self.id) + '",' + cReq
# Fields
# No additional fields here yet
# Timestamp
result += ' ' + str( self._getNSTime() )
# Measurement
return 'request,' + result
# Method to create a full InfluxDB response statement
def _generateServerResponse( self, reqID, quality, delay, cpuUsage, qualityDifference ):
# Tags first
# None here yet
# Fields
result = 'quality=' + str( quality ) + ','
result += 'cpuUsage=' + str(cpuUsage) + ','
result += 'qualityDifference=' + str(qualityDifference) +','
result += 'requestID="' + str( reqID ) + '",'
result += 'index="' + str( uuid.uuid4() ) + '"'
# Timestamp
result += ' ' + str( self._getNSTime(delay) )
# Measurement
return 'response ' + result
# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime( self, offset = 0 ):
# Convert to nano-seconds
return 1000000 * ( self.currentTime + offset )
def _createDB( self ):
self._sendInfluxQuery( 'CREATE DATABASE '+ self.influxDB )
def _deleteDB( self, dbName ):
self._sendInfluxQuery( 'DROP DATABASE ' + self.influxDB )
# InfluxDB data send methods
# -----------------------------------------------------------------------------------------------
def _sendInfluxQuery( self, query ):
query = urllib.parse.urlencode( {'q' : query} )
query = query.encode('ascii')
req = urllib.request.Request( self.influxURL + '/query ', query )
urllib.request.urlopen( req )
def _sendInfluxData( self, data ):
data = data.encode()
header = {'Content-Type': 'application/octet-stream' }
req = urllib.request.Request( self.influxURL + '/write?db=' +self.influxDB, data, header )
urllib.request.urlopen( req )
def _sendInfluxDataBlock( self, dataBlock ):
msg = ''
for stmt in dataBlock:
msg += stmt + '\n'
try:
if ( dc.LOG_DATA == True ):
print( msg )
self._sendInfluxData( msg )
except urllib.error.HTTPError as ex:
print ( "Error calling: " + str( ex.url) + "..." + str(ex.msg) )
# Entry point
# -----------------------------------------------------------------------------------------------
print( "Preparing simulation" )
clients = 10
iterations = 3000
demoServer = DemoServer( clients, iterations, 'http://localhost:8186', 'testDB' )
# Set up InfluxDB (need to wait a little while)
demoServer.destroyDatabase()
time.sleep(2)
demoServer.prepareDatabase()
time.sleep(2)
demoServer.reportStatus()
# Start simulation
print( "Starting simulation" )
while ( True ):
itCount = demoServer.iterateService()
pcDone = round ( (itCount/ iterations) * 100 )
print( "Simulation remaining (%): " + str( pcDone ) +" \r", end='' )
if ( itCount == 0 ):
break;
print( "\nFinished" )
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment