Skip to content
Snippets Groups Projects
Commit 0910e0a1 authored by Rowan Powell's avatar Rowan Powell
Browse files

Moved line protocol generation to new file, database management to new class

parent e054e810
No related branches found
No related tags found
No related merge requests found
......@@ -34,59 +34,61 @@ import urllib.parse
import urllib.request
import LineProtocolGenerator as lp
# DemoConfig is a configuration class used to set up the simulation
class DemoConfig(object):
def __init__( self ):
self.LOG_DATA = False # Log data sent to INFLUX if true
self.ITERATION_STRIDE = 10 # Number of seconds of requests/responses sent to INFLUXDB per HTTP POST
self.SEG_LENGTH = 4 # Each MPEG segment encodes 5 seconds worth of frames (assume double-buffering)
self.MAX_SEG = (30 * 60) / (self.SEG_LENGTH +1) # 30 mins
self.MIN_QUALITY = 5 # Minimum quality requested by a client
self.MAX_QUALITY = 9 # Maximum quality requested by a client
self.MIN_SERV_RESP_TIME = 100 # Mininum time taken for server to respond to a request (ms)
self.CLIENT_START_DELAY_MAX = 360 # Randomly delay clients starting stream up to 3 minutes
def __init__(self):
self.LOG_DATA = False # Log data sent to INFLUX if true
self.ITERATION_STRIDE = 10 # Number of seconds of requests/responses sent to INFLUXDB per HTTP POST
self.SEG_LENGTH = 4 # Each MPEG segment encodes 5 seconds worth of frames (assume double-buffering)
self.MAX_SEG = (30 * 60) / (self.SEG_LENGTH + 1) # 30 mins
self.MIN_QUALITY = 5 # Minimum quality requested by a client
self.MAX_QUALITY = 9 # Maximum quality requested by a client
self.MIN_SERV_RESP_TIME = 100 # Mininum time taken for server to respond to a request (ms)
self.CLIENT_START_DELAY_MAX = 360 # Randomly delay clients starting stream up to 3 minutes
dc = DemoConfig()
# DemoClient is a class the simulations the behaviour of a single client requesting video from the server
class DemoClient(object):
def __init__( self ):
self.startRequestOffset = randint( 0, dc.CLIENT_START_DELAY_MAX ) # Random time offset before requesting 1st segment
self.numSegRequests = dc.MAX_SEG - randint( 0, 50 ) # Randomly stop client watching all of video
self.id = uuid.uuid4() # Client's ID
self.currSeg = 1 # Client's current segment
self.nextSegCountDown = 0 # Count-down before asking for next segment
self.qualityReq = randint( dc.MIN_QUALITY, dc.MAX_QUALITY ) # Randomly assigned quality for this client
self.lastReqID = None # ID used to track last request made by this client
def getQuality( self ):
def __init__(self):
self.startRequestOffset = randint(0,
dc.CLIENT_START_DELAY_MAX) # Random time offset before requesting 1st segment
self.numSegRequests = dc.MAX_SEG - randint(0, 50) # Randomly stop client watching all of video
self.id = uuid.uuid4() # Client's ID
self.currSeg = 1 # Client's current segment
self.nextSegCountDown = 0 # Count-down before asking for next segment
self.qualityReq = randint(dc.MIN_QUALITY, dc.MAX_QUALITY) # Randomly assigned quality for this client
self.lastReqID = None # ID used to track last request made by this client
def getQuality(self):
return self.qualityReq
def getLastRequestID( self ):
def getLastRequestID(self):
return self.lastReqID
def iterateRequest( self ):
def iterateRequest(self):
result = None
# If the time offset before asking for 1st segment is through and there are more segments to get
# and it is time to get one, then create a request for one!
if ( self.startRequestOffset == 0 ):
if ( self.numSegRequests > 0 ):
if ( self.nextSegCountDown == 0 ):
if (self.startRequestOffset == 0):
if (self.numSegRequests > 0):
if (self.nextSegCountDown == 0):
# Generate a request ID
self.lastReqID = uuid.uuid4()
# Start building the InfluxDB statement
# tags first
result = 'cid="' + str( self.id ) + '",'
result += 'segment=' + str( self.currSeg ) + ' '
result = 'cid="' + str(self.id) + '",'
result += 'segment=' + str(self.currSeg) + ' '
# then fields
result += 'quality=' + str( self.qualityReq ) + ','
result += 'index="' + str( self.lastReqID ) + '"'
result += 'quality=' + str(self.qualityReq) + ','
result += 'index="' + str(self.lastReqID) + '"'
# Update this client's segment tracking
self.currSeg += 1
......@@ -101,32 +103,48 @@ class DemoClient(object):
return result
class DatabaseManager():
def __init__(self, influx_url, db_name):
self.influx_url = influx_url
self.influxDB = db_name
def database_up(self):
self._createDB()
def database_teardown(self):
self._deleteDB()
def _createDB(self):
self._sendInfluxQuery('CREATE DATABASE ' + self.influxDB)
def _deleteDB(self):
self._sendInfluxQuery('DROP DATABASE ' + self.influxDB)
def _sendInfluxQuery(self, query):
query = urllib.parse.urlencode({'q': query})
query = query.encode('ascii')
req = urllib.request.Request(self.influxURL + '/query ', query)
urllib.request.urlopen(req)
# DemoServer is the class that simulates the behaviour of the MPEG-DASH server
class DemoServer(object):
def __init__( self, cc, si, dbURL, dbName ):
self.influxDB = dbName # InfluxDB database name
self.id = uuid.uuid4() # MPEG-DASH server ID
self.clientCount = cc # Number of clients to simulate
self.simIterations = si # Number of iterations to make for this simulation
self.influxURL = dbURL # InfluxDB connection URL
self.currentTime = int( round( time.time() * 1000 ) ) # The current time
def prepareDatabase( self ):
self._createDB()
def __init__(self, cc, si, db_url, db_name):
self.influxDB = db_name # InfluxDB database name
self.id = uuid.uuid4() # MPEG-DASH server ID
self.clientCount = cc # Number of clients to simulate
self.simIterations = si # Number of iterations to make for this simulation
self.influxURL = db_url # InfluxDB connection URL
self.currentTime = int(round(time.time() * 1000)) # The current time
def generate_clients(self):
self.clients = []
for i in range( self.clientCount ):
self.clients.append( DemoClient() )
for i in range(self.clientCount):
self.clients.append(DemoClient())
print('Number of clients: ' + str(len(self.clients)))
def destroyDatabase( self ):
self._deleteDB( self.influxDB )
def reportStatus( self ):
print ('Number of clients: ' + str(len(self.clients)) )
def reportStatus(self):
print('Number of clients: ' + str(len(self.clients)))
def configure_servers(self):
print("Configuring Servers")
......@@ -134,43 +152,44 @@ class DemoServer(object):
ids = ['A', 'B', 'C']
locations = ['locA', 'locB', 'locC']
for i, id in enumerate(ids):
server_conf_block.append(lp._generateServerConfig(id,locations[i],8,'100G','1T', self._selectDelay(len(ids))))
server_conf_block.append(
lp._generateServerConfig(id, locations[i], 8, '100G', '1T', self._selectDelay(len(ids))))
self._sendInfluxDataBlock(server_conf_block)
def configure_VMs(self):
print("Configuring VM nodes")
VM_conf_block = []
self._generateVMS('starting',10,VM_conf_block)
self._generateVMS('running',10,VM_conf_block)
self._generateVMS('starting', 10, VM_conf_block)
self._generateVMS('running', 10, VM_conf_block)
self._sendInfluxDataBlock(VM_conf_block)
def configure_ports(self):
print("Configuring Servers")
server_conf_block = []
for i in range(0,10):
for i in range(0, 10):
server_conf_block.append(lp._configure_port())
self._sendInfluxDataBlock(server_conf_block)
def shutdown_VMs(self):
print("Shutting down VM nodes")
VM_conf_block = []
self._generateVMS('stopping',10,VM_conf_block)
self._generateVMS('stopping', 10, VM_conf_block)
self._sendInfluxDataBlock(VM_conf_block)
def _generateVMS(self,state, amount, datablock):
def _generateVMS(self, state, amount, datablock):
for i in range(0, amount):
datablock.append(lp._generateVMConfig(state, 1, '100G', '1T', self._selectDelay(amount)))
def iterateService( self ):
def iterateService(self):
# The simulation will run through 'X' iterations of the simulation
# each time this method is called. This allows request/response messages to be
# batched and sent to the InfluxDB in sensible sized blocks
return self._executeServiceIteration( dc.ITERATION_STRIDE )
return self._executeServiceIteration(dc.ITERATION_STRIDE)
# 'Private' methods ________________________________________________________
def _executeServiceIteration( self, count ):
# 'Private' methods ________________________________________________________
def _executeServiceIteration(self, count):
requestBlock = []
responseBlock = []
......@@ -179,11 +198,11 @@ class DemoServer(object):
totalDifference = sumOfclientQuality = percentageDifference = 0
# Keep going until this stride (count) completes
while ( count > 0 ):
while (count > 0):
count -= 1
# Check we have some iterations to do
if ( self.simIterations > 0 ):
if (self.simIterations > 0):
# First record clients that request segments
clientsRequesting = []
......@@ -192,186 +211,169 @@ class DemoServer(object):
# Record request, if it was generated
cReq = client.iterateRequest()
if ( cReq != None ):
clientsRequesting.append( client )
requestBlock.append( lp._generateClientRequest(cReq, self.id, self.currentTime) )
if (cReq != None):
clientsRequesting.append(client)
requestBlock.append(lp._generateClientRequest(cReq, self.id, self.currentTime))
# Now generate request statistics
clientReqCount = len( clientsRequesting )
clientReqCount = len(clientsRequesting)
# Create a single CPU usage metric for this iteration
cpuUsagePercentage =self._cpuUsage(clientReqCount)
cpuUsagePercentage = self._cpuUsage(clientReqCount)
# Now generate responses, based on stats
for client in clientsRequesting:
# Generate some quality and delays based on the number of clients requesting for this iteration
qualitySelect = self._selectQuality( client.getQuality(), clientReqCount )
delaySelect = self._selectDelay( clientReqCount ) + self.currentTime
qualitySelect = self._selectQuality(client.getQuality(), clientReqCount)
delaySelect = self._selectDelay(clientReqCount) + self.currentTime
qualityDifference = client.getQuality() - qualitySelect
totalDifference+=qualityDifference
totalDifference += qualityDifference
# print('totalDifference = ' + str(totalDifference) +'\n')
sumOfclientQuality+=client.getQuality()
sumOfclientQuality += client.getQuality()
# print('sumOfclientQuality = ' + str(sumOfclientQuality) + '\n')
percentageDifference=int((totalDifference*100)/sumOfclientQuality)
percentageDifference = int((totalDifference * 100) / sumOfclientQuality)
# print('percentageOfQualityDifference = ' + str(percentageDifference) + '%')
responseBlock.append(lp._generateServerResponse(client.getLastRequestID(), qualitySelect,
delaySelect, cpuUsagePercentage,
percentageDifference))
delaySelect, cpuUsagePercentage,
percentageDifference))
SFBlock.append(lp._generateMpegDashReport('https://netflix.com/scream', qualitySelect, delaySelect))
networkBlock.append(lp._generateNetworkReport(sumOfclientQuality, delaySelect))
# Iterate the service simulation
self.simIterations -= 1
self.currentTime += 1000 # advance 1 second
self.currentTime += 1000 # advance 1 second
# If we have some requests/responses to send to InfluxDB, do it
if ( len(requestBlock) > 0 and len(responseBlock) > 0 ):
self._sendInfluxDataBlock( requestBlock )
self._sendInfluxDataBlock( responseBlock )
self._sendInfluxDataBlock( networkBlock )
self._sendInfluxDataBlock( SFBlock )
if (len(requestBlock) > 0 and len(responseBlock) > 0):
self._sendInfluxDataBlock(requestBlock)
self._sendInfluxDataBlock(responseBlock)
self._sendInfluxDataBlock(networkBlock)
self._sendInfluxDataBlock(SFBlock)
print("Sending influx data blocks")
return self.simIterations
def _cpuUsage(self, clientCount):
cpuUsage=randint(0, 10)
cpuUsage = randint(0, 10)
if ( clientCount < 20 ):
if (clientCount < 20):
cpuUsage += 5
elif ( clientCount >= 20 and clientCount <40 ):
elif (clientCount >= 20 and clientCount < 40):
cpuUsage += 10
elif ( clientCount >= 40 and clientCount <60 ):
elif (clientCount >= 40 and clientCount < 60):
cpuUsage += 15
elif ( clientCount >= 60 and clientCount <80 ):
elif (clientCount >= 60 and clientCount < 80):
cpuUsage += 20
elif ( clientCount >= 80 and clientCount <110 ):
elif (clientCount >= 80 and clientCount < 110):
cpuUsage += 30
elif ( clientCount >= 110 and clientCount <150 ):
elif (clientCount >= 110 and clientCount < 150):
cpuUsage += 40
elif ( clientCount >= 150 and clientCount <200 ):
elif (clientCount >= 150 and clientCount < 200):
cpuUsage += 55
elif ( clientCount >= 200 and clientCount <300 ):
elif (clientCount >= 200 and clientCount < 300):
cpuUsage += 70
elif ( clientCount >= 300 ):
elif (clientCount >= 300):
cpuUsage += 90
return cpuUsage
# Rule to determine a response quality, based on the current number of clients requesting
def _selectQuality( self, expectedQuality, clientCount ):
def _selectQuality(self, expectedQuality, clientCount):
result = dc.MAX_QUALITY
if ( clientCount < 50 ):
if (clientCount < 50):
result = 8
elif ( clientCount >= 50 and clientCount < 100 ):
elif (clientCount >= 50 and clientCount < 100):
result = 7
elif ( clientCount >= 100 and clientCount < 150 ):
elif (clientCount >= 100 and clientCount < 150):
result = 6
elif ( clientCount >= 150 and clientCount < 200 ):
elif (clientCount >= 150 and clientCount < 200):
result = 5
elif ( clientCount >= 200 and clientCount < 250 ):
elif (clientCount >= 200 and clientCount < 250):
result = 4
elif ( clientCount >= 250 and clientCount < 300 ):
elif (clientCount >= 250 and clientCount < 300):
result = 3
elif ( clientCount >= 300 ):
elif (clientCount >= 300):
result = 2
# Give the client what it wants if possible
if ( result > expectedQuality ):
if (result > expectedQuality):
result = expectedQuality
return result
# Rule to determine a delay, based on the current number of clients requesting
def _selectDelay( self, cCount ):
def _selectDelay(self, cCount):
result = dc.MIN_SERV_RESP_TIME
if ( cCount < 50 ):
if (cCount < 50):
result = 150
elif ( cCount >= 50 and cCount < 100 ):
elif (cCount >= 50 and cCount < 100):
result = 200
elif ( cCount > 100 and cCount < 200 ):
elif (cCount > 100 and cCount < 200):
result = 500
elif ( cCount >= 200 ):
elif (cCount >= 200):
result = 1000
# Perturb the delay a bit
result += randint( 0, 20 )
result += randint(0, 20)
return result
def _createDB( self ):
self._sendInfluxQuery( 'CREATE DATABASE '+ self.influxDB )
def _deleteDB( self, dbName ):
self._sendInfluxQuery( 'DROP DATABASE ' + self.influxDB )
# InfluxDB data send methods
# -----------------------------------------------------------------------------------------------
def _sendInfluxQuery( self, query ):
query = urllib.parse.urlencode( {'q' : query} )
query = query.encode('ascii')
req = urllib.request.Request( self.influxURL + '/query ', query )
urllib.request.urlopen( req )
def _sendInfluxData( self, data ):
def _sendInfluxData(self, data):
data = data.encode()
header = {'Content-Type': 'application/octet-stream' }
req = urllib.request.Request( self.influxURL + '/write?db=' +self.influxDB, data, header )
urllib.request.urlopen( req )
header = {'Content-Type': 'application/octet-stream'}
req = urllib.request.Request(self.influxURL + '/write?db=' + self.influxDB, data, header)
urllib.request.urlopen(req)
def _sendInfluxDataBlock( self, dataBlock ):
def _sendInfluxDataBlock(self, dataBlock):
msg = ''
for stmt in dataBlock:
msg += stmt + '\n'
try:
if ( dc.LOG_DATA == True ):
print( msg )
if (dc.LOG_DATA == True):
print(msg)
self._sendInfluxData( msg )
self._sendInfluxData(msg)
except urllib.error.HTTPError as ex:
print ( "Error calling: " + str( ex.url) + "..." + str(ex.msg) )
print("Error calling: " + str(ex.url) + "..." + str(ex.msg))
# Entry point
# -----------------------------------------------------------------------------------------------
print( "Preparing simulation" )
clients = 10
print("Preparing simulation")
clients = 10
iterations = 3000
# port 8086: Direct to DB specified
# port 8186: To telegraf, telegraf specifies DB
demoServer = DemoServer(clients, iterations, 'http://localhost:8186', 'testDB')
database_manager = DatabaseManager('http://localhost:8186', 'testDB')
# Set up InfluxDB (need to wait a little while)
demoServer.destroyDatabase()
database_manager.database_teardown()
time.sleep(2)
demoServer.prepareDatabase()
database_manager.database_up()
time.sleep(2)
# configure servers
demoServer = DemoServer(clients, iterations, 'http://localhost:8186', 'testDB')
demoServer.reportStatus()
demoServer.configure_servers()
demoServer.configure_VMs()
# Start simulation
print( "Starting simulation" )
while ( True ):
print("Starting simulation")
while True:
itCount = demoServer.iterateService()
pcDone = round ( (itCount/ iterations) * 100 )
pcDone = round((itCount / iterations) * 100)
print( "Simulation remaining (%): " + str( pcDone ) +" \r", end='' )
print("Simulation remaining (%): " + str(pcDone) + " \r", end='')
if ( itCount == 0 ):
if itCount == 0:
break
demoServer.shutdown_VMs()
print("\nFinished")
\ No newline at end of file
print("\nFinished")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment