# serviceSim.py
# Rowan Powell authored
# coding: utf-8
## ///////////////////////////////////////////////////////////////////////
##
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Simon Crowle
## Created Date : 03-01-2018
## Created for Project : FLAME
##
##///////////////////////////////////////////////////////////////////////
from random import random, randint
import math
import time
import datetime
import uuid
import urllib.parse
import urllib.request
import LineProtocolGenerator as lp
# DemoConfig is a configuration class used to set up the simulation
class DemoConfig(object):
    """Tunable constants for the MPEG-DASH streaming simulation.

    Durations are expressed in simulation iterations unless a unit is given;
    the server advances its clock by one second per iteration.
    """

    def __init__(self):
        # Print each data block before it is sent to InfluxDB if true
        self.LOG_DATA = False
        # Number of seconds of requests/responses sent to InfluxDB per batch of HTTP POSTs
        self.ITERATION_STRIDE = 10
        # Count-down a client waits after a request; counting 4..0 gives one
        # request every 5 iterations, i.e. 5 seconds per MPEG segment
        # (assume double-buffering)
        self.SEG_LENGTH = 4
        # Segments in a 30 minute video. Integer division keeps this a whole
        # segment count (int) rather than a float.
        self.MAX_SEG = (30 * 60) // (self.SEG_LENGTH + 1)
        # Minimum quality requested by a client
        self.MIN_QUALITY = 5
        # Maximum quality requested by a client
        self.MAX_QUALITY = 9
        # Minimum time taken for server to respond to a request (ms)
        self.MIN_SERV_RESP_TIME = 100
        # Randomly delay clients starting their stream by up to 360 seconds (6 minutes)
        self.CLIENT_START_DELAY_MAX = 360
# Shared configuration instance read by DemoClient and DemoServer
dc = DemoConfig()
# DemoClient is a class that simulates the behaviour of a single client requesting video from the server
class DemoClient(object):
    """A single simulated viewer requesting video segments from the server.

    iterateRequest() is meant to be called once per simulation iteration. The
    client first waits out a random start delay, then asks for one segment
    every dc.SEG_LENGTH + 1 iterations until its request budget is used up.
    """

    def __init__(self):
        # Random number of iterations to wait before requesting the 1st segment
        self.startRequestOffset = randint(0, dc.CLIENT_START_DELAY_MAX)
        # Request budget; the random reduction stops the client before the end of the video
        self.numSegRequests = dc.MAX_SEG - randint(0, 50)
        # Client's ID
        self.id = uuid.uuid4()
        # Segment the client will ask for next
        self.currSeg = 1
        # Iterations left before the next segment is requested
        self.nextSegCountDown = 0
        # Quality level this client always asks for
        self.qualityReq = randint(dc.MIN_QUALITY, dc.MAX_QUALITY)
        # ID of the most recent request made by this client (None until the first one)
        self.lastReqID = None

    def getQuality(self):
        """Return the quality level this client requests."""
        return self.qualityReq

    def getLastRequestID(self):
        """Return the ID of the client's latest request, or None if it has made none."""
        return self.lastReqID

    def iterateRequest(self):
        """Advance the client by one iteration.

        Returns a partial InfluxDB statement (tags, then fields) when a segment
        request is made on this iteration, otherwise None. The caller is
        expected to complete the statement.
        """
        # Still waiting for the initial start delay to elapse
        if self.startRequestOffset != 0:
            self.startRequestOffset -= 1
            return None

        # Nothing left to request
        if self.numSegRequests <= 0:
            return None

        # Still counting down to the next segment
        if self.nextSegCountDown != 0:
            self.nextSegCountDown -= 1
            return None

        # Time to request a segment: give the request its own ID
        self.lastReqID = uuid.uuid4()

        statement = ''.join([
            # tags first
            'cid="', str(self.id), '",',
            'segment=', str(self.currSeg), ' ',
            # then fields
            'quality=', str(self.qualityReq), ',',
            'index="', str(self.lastReqID), '"',
        ])

        # Update this client's segment tracking
        self.currSeg += 1
        self.numSegRequests -= 1
        self.nextSegCountDown = dc.SEG_LENGTH

        return statement
class DatabaseManager():
    """Creates and drops the simulation's database through the InfluxDB HTTP query endpoint."""

    def __init__(self, influx_url, db_name):
        """Remember the base URL of the InfluxDB endpoint and the database name."""
        self.influx_url = influx_url
        self.influxDB = db_name

    def database_up(self):
        """Create the database."""
        self._createDB()

    def database_teardown(self):
        """Drop the database."""
        self._deleteDB()

    def _createDB(self):
        self._sendInfluxQuery('CREATE DATABASE ' + self.influxDB)

    def _deleteDB(self):
        self._sendInfluxQuery('DROP DATABASE ' + self.influxDB)

    def _buildQueryRequest(self, query):
        """Return the POST request that submits `query` to <influx_url>/query."""
        payload = urllib.parse.urlencode({'q': query}).encode('ascii')
        # Use the attribute set in __init__ (influx_url) and a path without
        # trailing whitespace, which is not valid inside a URL
        return urllib.request.Request(self.influx_url + '/query', payload)

    def _sendInfluxQuery(self, query):
        """POST `query` to InfluxDB; urllib errors propagate to the caller."""
        # The context manager closes the HTTP response once the call returns
        with urllib.request.urlopen(self._buildQueryRequest(query)):
            pass
# DemoServer is the class that simulates the behaviour of the MPEG-DASH server
class DemoServer(object):
    """Simulated MPEG-DASH server that reports its activity to InfluxDB.

    Each simulation iteration represents one second: every client is asked
    whether it requests a segment, a response is generated for each request,
    and the resulting statements are batched and POSTed to the InfluxDB
    write endpoint.
    """

    def __init__(self, cc, si, db_url, db_name):
        """
        cc      -- number of clients to simulate
        si      -- number of iterations to make for this simulation
        db_url  -- InfluxDB connection URL
        db_name -- InfluxDB database name
        """
        self.influxDB = db_name  # InfluxDB database name
        self.id = uuid.uuid4()  # MPEG-DASH server ID
        self.clientCount = cc  # Number of clients to simulate
        self.simIterations = si  # Iterations still to run
        self.influxURL = db_url  # InfluxDB connection URL
        self.currentTime = int(round(time.time() * 1000))  # Simulation clock (ms)
        # Filled by generate_clients(); starts empty so that reporting or
        # iterating before clients exist does not raise AttributeError
        self.clients = []

    def configure(self, server_id, server_location):
        """Send the VM and server configuration records to InfluxDB."""
        print("Configuring")
        self.configure_VM()
        time.sleep(1)
        self.configure_server(server_id, server_location)

    def generate_clients(self):
        """(Re)create the list of simulated clients, one per configured client."""
        self.clients = [DemoClient() for _ in range(self.clientCount)]
        print('Number of clients: ' + str(len(self.clients)))

    def reportStatus(self):
        """Print how many clients are currently simulated."""
        print('Number of clients: ' + str(len(self.clients)))

    def configure_server(self, server_id, server_location):
        """Send a single server configuration record to InfluxDB."""
        print("Configuring Servers")
        server_conf_block = [
            lp._generateServerConfig(server_id, server_location, 8, '100G', '1T',
                                     self._selectDelay(0)),
        ]
        self._sendInfluxDataBlock(server_conf_block)

    def configure_VM(self):
        """Send one 'starting' and one 'running' VM record to InfluxDB."""
        print("Configuring VM nodes")
        VM_conf_block = []
        self._generateVMS('starting', 1, VM_conf_block)
        self._generateVMS('running', 1, VM_conf_block)
        self._sendInfluxDataBlock(VM_conf_block)

    def configure_ports(self):
        """Send ten port configuration records to InfluxDB."""
        print("Configuring ports")
        port_conf_block = [lp._configure_port() for _ in range(10)]
        self._sendInfluxDataBlock(port_conf_block)

    def shutdown_VMs(self):
        """Send ten 'stopping' VM records to InfluxDB."""
        print("Shutting down VM nodes")
        VM_conf_block = []
        self._generateVMS('stopping', 10, VM_conf_block)
        self._sendInfluxDataBlock(VM_conf_block)

    def iterateService(self):
        """Run one stride of the simulation and return the iterations remaining.

        Each call runs dc.ITERATION_STRIDE iterations, which allows
        request/response messages to be batched and sent to InfluxDB in
        sensibly sized blocks.
        """
        return self._executeServiceIteration(dc.ITERATION_STRIDE)

    def _executeServiceIteration(self, count):
        """Run up to `count` iterations, send the collected data, return iterations left."""
        requestBlock = []
        responseBlock = []
        networkBlock = []
        SFBlock = []
        # Running totals across the whole stride, not per iteration
        totalDifference = sumOfclientQuality = percentageDifference = 0

        # Keep going until this stride (count) completes
        while count > 0:
            count -= 1

            # Nothing to simulate once the iteration budget is exhausted
            if self.simIterations <= 0:
                continue

            # First record the clients that request a segment on this iteration
            clientsRequesting = []
            for client in self.clients:
                cReq = client.iterateRequest()
                if cReq is not None:
                    clientsRequesting.append(client)
                    requestBlock.append(lp._generateClientRequest(cReq, self.id, self.currentTime))

            # Request statistics for this iteration
            clientReqCount = len(clientsRequesting)

            # A single CPU usage metric is shared by all responses of this iteration
            cpuUsagePercentage = self._cpuUsage(clientReqCount)

            # Now generate responses, based on stats
            for client in clientsRequesting:
                # Quality and delay depend on how many clients are requesting right now
                qualitySelect = self._selectQuality(client.getQuality(), clientReqCount)
                delaySelect = self._selectDelay(clientReqCount) + self.currentTime

                totalDifference += client.getQuality() - qualitySelect
                sumOfclientQuality += client.getQuality()
                percentageDifference = int((totalDifference * 100) / sumOfclientQuality)

                responseBlock.append(lp._generateServerResponse(client.getLastRequestID(), qualitySelect,
                                                                delaySelect, cpuUsagePercentage,
                                                                percentageDifference))
                SFBlock.append(lp._generateMpegDashReport('https://netflix.com/scream', qualitySelect, delaySelect))
                networkBlock.append(lp._generateNetworkReport(sumOfclientQuality, delaySelect))

            # Iterate the service simulation
            self.simIterations -= 1
            self.currentTime += 1000  # advance 1 second

        # If we have some requests/responses to send to InfluxDB, do it
        if requestBlock and responseBlock:
            self._sendInfluxDataBlock(requestBlock)
            self._sendInfluxDataBlock(responseBlock)
            self._sendInfluxDataBlock(networkBlock)
            self._sendInfluxDataBlock(SFBlock)
            print("Sending influx data blocks")

        return self.simIterations

    # 'Private' methods ________________________________________________________
    def _generateVMS(self, state, amount, datablock):
        """Append `amount` VM configuration records in `state` to `datablock`."""
        for _ in range(amount):
            datablock.append(lp._generateVMConfig(state, 1, '100G', '1T', self._selectDelay(amount)))

    def _cpuUsage(self, clientCount):
        """Return a CPU usage figure: a load-dependent base plus a random 0-10."""
        cpuUsage = randint(0, 10)

        if clientCount < 20:
            cpuUsage += 5
        elif clientCount < 40:
            cpuUsage += 10
        elif clientCount < 60:
            cpuUsage += 15
        elif clientCount < 80:
            cpuUsage += 20
        elif clientCount < 110:
            cpuUsage += 30
        elif clientCount < 150:
            cpuUsage += 40
        elif clientCount < 200:
            cpuUsage += 55
        elif clientCount < 300:
            cpuUsage += 70
        else:
            cpuUsage += 90

        return cpuUsage

    # Rule to determine a response quality, based on the current number of clients requesting
    def _selectQuality(self, expectedQuality, clientCount):
        """Return the quality served: the load-based ceiling, capped at what the client asked for."""
        if clientCount < 50:
            result = 8
        elif clientCount < 100:
            result = 7
        elif clientCount < 150:
            result = 6
        elif clientCount < 200:
            result = 5
        elif clientCount < 250:
            result = 4
        elif clientCount < 300:
            result = 3
        else:
            result = 2

        # Give the client what it wants if possible
        return min(result, expectedQuality)

    # Rule to determine a delay, based on the current number of clients requesting
    def _selectDelay(self, cCount):
        """Return a response delay in ms: a load-dependent base plus a random 0-20."""
        # The bands are contiguous: exactly 100 clients belongs to the 500 ms
        # band rather than falling through to a faster default
        if cCount < 50:
            result = 150
        elif cCount < 100:
            result = 200
        elif cCount < 200:
            result = 500
        else:
            result = 1000

        # Perturb the delay a bit
        return result + randint(0, 20)

    # InfluxDB data send methods
    # -----------------------------------------------------------------------------------------------
    def _sendInfluxData(self, data):
        """POST `data` (a string) to the InfluxDB write endpoint of the configured database."""
        header = {'Content-Type': 'application/octet-stream'}
        req = urllib.request.Request(self.influxURL + '/write?db=' + self.influxDB, data.encode(), header)
        # The context manager closes the HTTP response once the call returns
        with urllib.request.urlopen(req):
            pass

    def _sendInfluxDataBlock(self, dataBlock):
        """Send the statements in `dataBlock`, one per line, in a single POST.

        HTTP error responses are reported on stdout and otherwise ignored.
        """
        msg = ''.join(stmt + '\n' for stmt in dataBlock)

        if dc.LOG_DATA:
            print(msg)

        try:
            self._sendInfluxData(msg)
        except urllib.error.HTTPError as ex:
            print("Error calling: " + str(ex.url) + "..." + str(ex.msg))
# Entry point
# -----------------------------------------------------------------------------------------------
print("Preparing simulation")
clients = 10
iterations = 3000

# port 8086: Direct to DB specified
# port 8186: To telegraf, telegraf specifies DB
influx_url = 'http://localhost:8186'
db_name = 'testDB'
database_manager = DatabaseManager(influx_url, db_name)

# Set up InfluxDB (need to wait a little while)
database_manager.database_teardown()
time.sleep(2)
database_manager.database_up()
time.sleep(2)

# configure servers
demoServer = DemoServer(clients, iterations, influx_url, db_name)
demoServer.configure("Server1", "Southampton")

# iterateService() walks demoServer.clients, which only generate_clients()
# creates; without this call the first iteration fails on the missing list
demoServer.generate_clients()

# Start simulation
print("Starting simulation")
while True:
    itCount = demoServer.iterateService()
    # itCount is the number of iterations still to run, so this is the share remaining
    pcDone = round((itCount / iterations) * 100)
    print("Simulation remaining (%): " + str(pcDone) + " \r", end='')
    if itCount == 0:
        break

demoServer.shutdown_VMs()
print("\nFinished")