Skip to content
Snippets Groups Projects
serviceSim.py 14.69 KiB
# coding: utf-8
## ///////////////////////////////////////////////////////////////////////
##
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road, 
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
##      Created By :            Simon Crowle
##      Created Date :          03-01-2018
##      Created for Project :   FLAME
##
##///////////////////////////////////////////////////////////////////////

from random import random, randint

import math
import time
import datetime
import uuid
import urllib.parse
import urllib.request
import LineProtocolGenerator as lp


# DemoConfig is a configuration class used to set up the simulation
class DemoConfig(object):
    def __init__(self):
        self.LOG_DATA = False  # Log data sent to INFLUX if true
        self.ITERATION_STRIDE = 10  # Number of seconds of requests/responses sent to INFLUXDB per HTTP POST
        self.SEG_LENGTH = 4  # Each MPEG segment encodes 5 seconds worth of frames (assume double-buffering)
        self.MAX_SEG = (30 * 60) / (self.SEG_LENGTH + 1)  # 30 mins
        self.MIN_QUALITY = 5  # Minimum quality requested by a client
        self.MAX_QUALITY = 9  # Maximum quality requested by a client
        self.MIN_SERV_RESP_TIME = 100  # Mininum time taken for server to respond to a request (ms)
        self.CLIENT_START_DELAY_MAX = 360  # Randomly delay clients starting stream up to 3 minutes


dc = DemoConfig()


# DemoClient is a class the simulations the behaviour of a single client requesting video from the server
class DemoClient(object):
    def __init__(self):
        self.startRequestOffset = randint(0,
                                          dc.CLIENT_START_DELAY_MAX)  # Random time offset before requesting 1st segment
        self.numSegRequests = dc.MAX_SEG - randint(0, 50)  # Randomly stop client watching all of video
        self.id = uuid.uuid4()  # Client's ID
        self.currSeg = 1  # Client's current segment
        self.nextSegCountDown = 0  # Count-down before asking for next segment
        self.qualityReq = randint(dc.MIN_QUALITY, dc.MAX_QUALITY)  # Randomly assigned quality for this client
        self.lastReqID = None  # ID used to track last request made by this client

    def getQuality(self):
        return self.qualityReq

    def getLastRequestID(self):
        return self.lastReqID

    def iterateRequest(self):
        result = None

        # If the time offset before asking for 1st segment is through and there are more segments to get
        # and it is time to get one, then create a request for one!
        if (self.startRequestOffset == 0):
            if (self.numSegRequests > 0):
                if (self.nextSegCountDown == 0):

                    # Generate a request ID
                    self.lastReqID = uuid.uuid4()

                    # Start building the InfluxDB statement
                    # tags first
                    result = 'cid="' + str(self.id) + '",'
                    result += 'segment=' + str(self.currSeg) + ' '

                    # then fields
                    result += 'quality=' + str(self.qualityReq) + ','
                    result += 'index="' + str(self.lastReqID) + '"'

                    # Update this client's segment tracking
                    self.currSeg += 1
                    self.numSegRequests -= 1
                    self.nextSegCountDown = dc.SEG_LENGTH
                else:
                    self.nextSegCountDown -= 1
        else:
            self.startRequestOffset -= 1

        # Return the _partial_ InfluxDB statement (server will complete the rest)
        return result


class DatabaseManager():
    def __init__(self, influx_url, db_name):
        self.influx_url = influx_url
        self.influxDB = db_name

    def database_up(self):
        self._createDB()

    def database_teardown(self):
        self._deleteDB()

    def _createDB(self):
        self._sendInfluxQuery('CREATE DATABASE ' + self.influxDB)

    def _deleteDB(self):
        self._sendInfluxQuery('DROP DATABASE ' + self.influxDB)

    def _sendInfluxQuery(self, query):
        query = urllib.parse.urlencode({'q': query})
        query = query.encode('ascii')
        req = urllib.request.Request(self.influxURL + '/query ', query)
        urllib.request.urlopen(req)


# DemoServer is the class that simulates the behaviour of the MPEG-DASH server
class DemoServer(object):
    def __init__(self, cc, si, db_url, db_name):
        self.influxDB = db_name  # InfluxDB database name
        self.id = uuid.uuid4()  # MPEG-DASH server ID
        self.clientCount = cc  # Number of clients to simulate
        self.simIterations = si  # Number of iterations to make for this simulation
        self.influxURL = db_url  # InfluxDB connection URL
        self.currentTime = int(round(time.time() * 1000))  # The current time

    def configure(self, server_id, server_location):
        print("Configuring")
        self.configure_VM()
        time.sleep(1)
        self.configure_server(server_id, server_location)

    def generate_clients(self):
        self.clients = []
        for i in range(self.clientCount):
            self.clients.append(DemoClient())
        print('Number of clients: ' + str(len(self.clients)))

    def reportStatus(self):
        print('Number of clients: ' + str(len(self.clients)))

    def configure_server(self, server_id, server_location):
        print("Configuring Servers")
        server_conf_block = []
        server_conf_block.append(lp._generateServerConfig(server_id, server_location, 8, '100G', '1T',
                                                          self._selectDelay(0)))

        #ids = ['A', 'B', 'C']
        #locations = ['locA', 'locB', 'locC']
        #for i, id in enumerate(ids):
        #    server_conf_block.append(
        #        lp._generateServerConfig(id, locations[i], 8, '100G', '1T', self._selectDelay(len(ids))))
        self._sendInfluxDataBlock(server_conf_block)

    def configure_VM(self):
        print("Configuring VM nodes")
        VM_conf_block = []
        self._generateVMS('starting', 1, VM_conf_block)
        self._generateVMS('running', 1, VM_conf_block)

        self._sendInfluxDataBlock(VM_conf_block)

    def configure_ports(self):
        print("Configuring Servers")
        server_conf_block = []
        for i in range(0, 10):
            server_conf_block.append(lp._configure_port())
        self._sendInfluxDataBlock(server_conf_block)

    def shutdown_VMs(self):
        print("Shutting down VM nodes")
        VM_conf_block = []
        self._generateVMS('stopping', 10, VM_conf_block)

        self._sendInfluxDataBlock(VM_conf_block)

    def iterateService(self):
        # The simulation will run through 'X' iterations of the simulation
        # each time this method is called. This allows request/response messages to be
        # batched and sent to the InfluxDB in sensible sized blocks
        return self._executeServiceIteration(dc.ITERATION_STRIDE)

    def _executeServiceIteration(self, count):

        requestBlock = []
        responseBlock = []
        networkBlock = []
        SFBlock = []
        totalDifference = sumOfclientQuality = percentageDifference = 0

        # Keep going until this stride (count) completes
        while (count > 0):
            count -= 1

            # Check we have some iterations to do
            if (self.simIterations > 0):
                # First record clients that request segments
                clientsRequesting = []

                # Run through all clients and see if they make a request
                for client in self.clients:

                    # Record request, if it was generated
                    cReq = client.iterateRequest()
                    if (cReq != None):
                        clientsRequesting.append(client)
                        requestBlock.append(lp._generateClientRequest(cReq, self.id, self.currentTime))

                # Now generate request statistics
                clientReqCount = len(clientsRequesting)

                # Create a single CPU usage metric for this iteration
                cpuUsagePercentage = self._cpuUsage(clientReqCount)

                # Now generate responses, based on stats
                for client in clientsRequesting:
                    # Generate some quality and delays based on the number of clients requesting for this iteration
                    qualitySelect = self._selectQuality(client.getQuality(), clientReqCount)
                    delaySelect = self._selectDelay(clientReqCount) + self.currentTime
                    qualityDifference = client.getQuality() - qualitySelect
                    totalDifference += qualityDifference
                    # print('totalDifference = ' + str(totalDifference) +'\n')
                    sumOfclientQuality += client.getQuality()
                    # print('sumOfclientQuality = ' + str(sumOfclientQuality) + '\n')
                    percentageDifference = int((totalDifference * 100) / sumOfclientQuality)
                    # print('percentageOfQualityDifference = ' + str(percentageDifference) + '%')

                    responseBlock.append(lp._generateServerResponse(client.getLastRequestID(), qualitySelect,
                                                                    delaySelect, cpuUsagePercentage,
                                                                    percentageDifference))
                    SFBlock.append(lp._generateMpegDashReport('https://netflix.com/scream', qualitySelect, delaySelect))

                    networkBlock.append(lp._generateNetworkReport(sumOfclientQuality, delaySelect))
                # Iterate the service simulation
                self.simIterations -= 1
                self.currentTime += 1000  # advance 1 second

        # If we have some requests/responses to send to InfluxDB, do it
        if (len(requestBlock) > 0 and len(responseBlock) > 0):
            self._sendInfluxDataBlock(requestBlock)
            self._sendInfluxDataBlock(responseBlock)
            self._sendInfluxDataBlock(networkBlock)
            self._sendInfluxDataBlock(SFBlock)
            print("Sending influx data blocks")

        return self.simIterations

    # 'Private' methods ________________________________________________________
    def _generateVMS(self, state, amount, datablock):
        for i in range(0, amount):
            datablock.append(lp._generateVMConfig(state, 1, '100G', '1T', self._selectDelay(amount)))

    def _cpuUsage(self, clientCount):
        cpuUsage = randint(0, 10)

        if (clientCount < 20):
            cpuUsage += 5
        elif (clientCount >= 20 and clientCount < 40):
            cpuUsage += 10
        elif (clientCount >= 40 and clientCount < 60):
            cpuUsage += 15
        elif (clientCount >= 60 and clientCount < 80):
            cpuUsage += 20
        elif (clientCount >= 80 and clientCount < 110):
            cpuUsage += 30
        elif (clientCount >= 110 and clientCount < 150):
            cpuUsage += 40
        elif (clientCount >= 150 and clientCount < 200):
            cpuUsage += 55
        elif (clientCount >= 200 and clientCount < 300):
            cpuUsage += 70
        elif (clientCount >= 300):
            cpuUsage += 90

        return cpuUsage

    # Rule to determine a response quality, based on the current number of clients requesting
    def _selectQuality(self, expectedQuality, clientCount):

        result = dc.MAX_QUALITY

        if (clientCount < 50):
            result = 8
        elif (clientCount >= 50 and clientCount < 100):
            result = 7
        elif (clientCount >= 100 and clientCount < 150):
            result = 6
        elif (clientCount >= 150 and clientCount < 200):
            result = 5
        elif (clientCount >= 200 and clientCount < 250):
            result = 4
        elif (clientCount >= 250 and clientCount < 300):
            result = 3
        elif (clientCount >= 300):
            result = 2

        # Give the client what it wants if possible
        if (result > expectedQuality):
            result = expectedQuality

        return result

    # Rule to determine a delay, based on the current number of clients requesting
    def _selectDelay(self, cCount):

        result = dc.MIN_SERV_RESP_TIME

        if (cCount < 50):
            result = 150
        elif (cCount >= 50 and cCount < 100):
            result = 200
        elif (cCount > 100 and cCount < 200):
            result = 500
        elif (cCount >= 200):
            result = 1000

        # Perturb the delay a bit
        result += randint(0, 20)

        return result

    # InfluxDB data send methods
    # -----------------------------------------------------------------------------------------------

    def _sendInfluxData(self, data):
        data = data.encode()
        header = {'Content-Type': 'application/octet-stream'}
        req = urllib.request.Request(self.influxURL + '/write?db=' + self.influxDB, data, header)
        urllib.request.urlopen(req)

    def _sendInfluxDataBlock(self, dataBlock):
        msg = ''
        for stmt in dataBlock:
            msg += stmt + '\n'

        try:
            if (dc.LOG_DATA == True):
                print(msg)

            self._sendInfluxData(msg)

        except urllib.error.HTTPError as ex:
            print("Error calling: " + str(ex.url) + "..." + str(ex.msg))


# Entry point
# -----------------------------------------------------------------------------------------------
print("Preparing simulation")
clients = 10
iterations = 3000
# port 8086: Direct to DB specified
# port 8186: To telegraf, telegraf specifies DB
database_manager = DatabaseManager('http://localhost:8186', 'testDB')
# Set up InfluxDB (need to wait a little while)
database_manager.database_teardown()
time.sleep(2)
database_manager.database_up()
time.sleep(2)
# configure servers
demoServer = DemoServer(clients, iterations, 'http://localhost:8186', 'testDB')
demoServer.configure("Server1", "Southampton")
# Start simulation
print("Starting simulation")
while True:
    itCount = demoServer.iterateService()
    pcDone = round((itCount / iterations) * 100)

    print("Simulation remaining (%): " + str(pcDone) + " \r", end='')

    if itCount == 0:
        break
demoServer.shutdown_VMs()
print("\nFinished")