Skip to content
Snippets Groups Projects
Commit 4474d270 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

[ Issue #56 ] - Slight code refactoring of the streaming simulation module

parent 14b87d48
No related branches found
No related tags found
No related merge requests found
...@@ -9,83 +9,90 @@ import sys ...@@ -9,83 +9,90 @@ import sys
# Simulation parameters # Simulation parameters
TICK_TIME = 1 TICK_TIME = 1
DEFAULT_REQUEST_RATE_INC = 1 DEFAULT_REQUEST_RATE_INC = 1
DEFAULT_REQUEST_RATE_INC_PERIOD = 10 DEFAULT_REQUEST_RATE_INC_PERIOD = 10
SIMULATION_TIME_SEC = 60*60 SIMULATION_TIME_SEC = 60 * 60
# CLMC parameters # CLMC parameters
INFLUX_DB_URL = 'http://192.168.50.10:8086' INFLUX_DB_URL = 'http://192.168.50.10:8086'
INFLUX_DB_NAME = 'CLMCMetrics'
AGENT_URL1 = 'http://192.168.50.11:8186' AGENT_URL1 = 'http://192.168.50.11:8186'
AGENT_URL2 = 'http://192.168.50.12:8186' AGENT_URL2 = 'http://192.168.50.12:8186'
# Simulator for services
class Sim: class Sim(object):
"""
Simulator for services
"""
def __init__(self, influx_url): def __init__(self, influx_url):
# We don't need this as the db is CLMC metrics """
self.influx_db = 'CLMCMetrics' Sets up the simulator object
:param influx_url: the influx DB url
"""
self.influx_db = INFLUX_DB_NAME
self.influx_url = influx_url self.influx_url = influx_url
# Teardown DB from previous sim and bring it back up # Teardown DB from previous sim and bring it back up
self._deleteDB() self._deleteDB()
self._createDB() self._createDB()
def run(self, simulation_length_seconds): def run(self, simulation_length_seconds):
start_time = time.time()-SIMULATION_TIME_SEC """
Runs the simulation
:param simulation_length_seconds: length of simulation
"""
start_time = time.time() - SIMULATION_TIME_SEC
sim_time = start_time sim_time = start_time
# segment_size : the length of video requested at a time # segment_size : the length of video requested at a time
# bit_rate: MPEG-2 High 1080p 25fps = 80Mbps # bit_rate: MPEG-2 High 1080p 25fps = 80Mbps
ip_endpoints = [{'agent_url': AGENT_URL1, 'location': 'DC1', 'cpu': 16, ip_endpoints = [{'agent_url': AGENT_URL1, 'location': 'DC1', 'cpu': 16,
'mem': '8GB', 'storage': '1TB', 'request_queue': 0, 'request_arrival_rate': 0, 'mem': '8GB', 'storage': '1TB', 'request_queue': 0, 'request_arrival_rate': 0,
'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500}, 'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500},
{'agent_url': AGENT_URL2, 'location': 'DC2', 'cpu': 4, {'agent_url': AGENT_URL2, 'location': 'DC2', 'cpu': 4,
'mem': '8GB', 'storage': '1TB', 'request_queue': 0, 'request_arrival_rate': 0, 'mem': '8GB', 'storage': '1TB', 'request_queue': 0, 'request_arrival_rate': 0,
'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500} 'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500}
] ]
# Simulate configuration of the ipendpoints # Simulate configuration of the ipendpoints
# endpoint state->mu, sigma, secs normal distribution # endpoint state->mu, sigma, secs normal distribution
config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68],"connecting": [10, 0.68]} config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]}
# Place endpoints # Place endpoints
max_delay = 0 max_delay = 0
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['placing'][0], config_delay_dist['placing'][0]*config_delay_dist['placing'][1], 'placing', 'placed') delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['placing'][0],
if delay_time > max_delay: config_delay_dist['placing'][0] * config_delay_dist['placing'][1],
max_delay = delay_time 'placing', 'placed')
sim_time +=max_delay max_delay = max(delay_time, max_delay)
sim_time += max_delay
# Boot endpoints # Boot endpoints
max_delay = 0 max_delay = 0
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['booting'][0], config_delay_dist['booting'][0]*config_delay_dist['booting'][1], 'booting', 'booted') delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['booting'][0],
if delay_time > max_delay: config_delay_dist['booting'][0] * config_delay_dist['booting'][1],
max_delay = delay_time 'booting', 'booted')
sim_time +=max_delay max_delay = max(delay_time, max_delay)
sim_time += max_delay
# Connect endpoints # Connect endpoints
max_delay = 0 max_delay = 0
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['connecting'][0], config_delay_dist['connecting'][0]*config_delay_dist['connecting'][1], 'connecting', 'connected') delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['connecting'][0],
if delay_time > max_delay: config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1],
max_delay = delay_time 'connecting', 'connected')
sim_time +=max_delay max_delay = max(delay_time, max_delay)
sim_time += max_delay
request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC
request_queue = 0
inc_period_count = 0 inc_period_count = 0
for i in range(simulation_length_seconds): for i in range(simulation_length_seconds):
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
request_processing_time = 0
cpu_time_available = 0
requests_processed = 0
max_requests_processed = 0
cpu_active_time = 0
cpu_idle_time = 0
cpu_usage = 0
cpu_load_time = 0
avg_response_time = 0
peak_response_time = 0
# linear inc to arrival rate # linear inc to arrival rate
if inc_period_count >= DEFAULT_REQUEST_RATE_INC_PERIOD: if inc_period_count >= DEFAULT_REQUEST_RATE_INC_PERIOD:
ip_endpoint['request_arrival_rate'] += request_arrival_rate_inc ip_endpoint['request_arrival_rate'] += request_arrival_rate_inc
...@@ -96,118 +103,164 @@ class Sim: ...@@ -96,118 +103,164 @@ class Sim:
ip_endpoint['request_queue'] += ip_endpoint['request_arrival_rate'] ip_endpoint['request_queue'] += ip_endpoint['request_arrival_rate']
# time to process one second of video (mS) in the current second # time to process one second of video (mS) in the current second
request_processing_time = int(random.normalvariate(10, 10*0.68)) request_processing_time = int(random.normalvariate(10, 10 * 0.68))
if request_processing_time <= 10: request_processing_time = max(request_processing_time, 10)
request_processing_time = 10
# time depends on the length of the segments in seconds # time depends on the length of the segments in seconds
request_processing_time *= ip_endpoint['segment_size'] request_processing_time *= ip_endpoint['segment_size']
# amount of cpu time (mS) per tick # amount of cpu time (mS) per tick
cpu_time_available = ip_endpoint['cpu']*TICK_TIME*1000 cpu_time_available = ip_endpoint['cpu'] * TICK_TIME * 1000
max_requests_processed = int(cpu_time_available/request_processing_time) max_requests_processed = int(cpu_time_available / request_processing_time)
# calc how many requests processed # calc how many requests processed
if ip_endpoint['request_queue'] <= max_requests_processed: if ip_endpoint['request_queue'] <= max_requests_processed:
# processed all of the requests # processed all of the requests
requests_processed = ip_endpoint['request_queue'] requests_processed = ip_endpoint['request_queue']
else: else:
# processed the maxmum number of requests # processed the maximum number of requests
requests_processed = max_requests_processed requests_processed = max_requests_processed
# calculate cpu usage # calculate cpu usage
cpu_active_time = int(requests_processed*request_processing_time) cpu_active_time = int(requests_processed * request_processing_time)
cpu_idle_time = int(cpu_time_available-cpu_active_time) cpu_idle_time = int(cpu_time_available - cpu_active_time)
cpu_usage = cpu_active_time/cpu_time_available cpu_usage = cpu_active_time / cpu_time_available
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time)) self._sendInfluxData(ip_endpoint['agent_url'],
lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time))
# calc network usage metrics # calc network usage metrics
bytes_rx = 2048*requests_processed bytes_rx = 2048 * requests_processed
bytes_tx = int(ip_endpoint['video_bit_rate']/8*1000000*requests_processed*ip_endpoint['segment_size']) bytes_tx = int(
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_network_report(bytes_rx, bytes_tx, sim_time)) ip_endpoint['video_bit_rate'] / 8 * 1000000 * requests_processed * ip_endpoint['segment_size'])
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_network_report(bytes_rx, bytes_tx, sim_time))
# time to process all of the requests in the queue # time to process all of the requests in the queue
peak_response_time = ip_endpoint['request_queue']*request_processing_time/ip_endpoint['cpu'] peak_response_time = ip_endpoint['request_queue'] * request_processing_time / ip_endpoint['cpu']
# mid-range # mid-range
avg_response_time = (peak_response_time+request_processing_time)/2 avg_response_time = (peak_response_time + request_processing_time) / 2
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_mpegdash_report('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], avg_response_time, peak_response_time, sim_time)) self._sendInfluxData(ip_endpoint['agent_url'],
lp.generate_mpegdash_report('http://localhost/server-status?auto',
ip_endpoint['request_arrival_rate'], avg_response_time,
peak_response_time, sim_time))
# need to calculate this but sent at 5mS for now # need to calculate this but sent at 5mS for now
network_request_delay = 0.005 network_request_delay = 0.005
# calculate network response delays (2km link, 100Mbps) # calculate network response delays (2km link, 100Mbps)
network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'], ip_endpoint['video_bit_rate'], ip_endpoint['segment_size']) network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'],
ip_endpoint['video_bit_rate'])
e2e_delay = network_request_delay + (avg_response_time/1000) + network_response_delay e2e_delay = network_request_delay + (avg_response_time / 1000) + network_response_delay
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_ipendpoint_route('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], e2e_delay, sim_time)) self._sendInfluxData(ip_endpoint['agent_url'],
lp.generate_ipendpoint_route('http://localhost/server-status?auto',
ip_endpoint['request_arrival_rate'], e2e_delay,
sim_time))
# remove requests processed off the queue # remove requests processed off the queue
ip_endpoint['request_queue'] -= int(requests_processed) ip_endpoint['request_queue'] -= int(requests_processed)
sim_time += TICK_TIME sim_time += TICK_TIME
end_time = sim_time end_time = sim_time
print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time,end_time,end_time-start_time)) print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time,
end_time - start_time))
# distance metres def _calcNetworkDelay(self, distance, bandwidth, packet_size, tx_video_bit_rate):
# bandwidth Mbps """
# package size bytes Calculates the network delay
# tx_video_bit_rate bp/sec
# segment size sec :param distance: distance metres
def _calcNetworkDelay(self, distance, bandwidth, packet_size, tx_video_bit_rate, segment_size): :param bandwidth: bandwidth Mbps
response_delay = 0 :param packet_size: packet size bytes
:param tx_video_bit_rate: bp/sec
:return: the calculated network delay
"""
# propogation delay = distance/speed () (e.g 2000 metres * 2*10^8 for optical fibre) # propogation delay = distance/speed () (e.g 2000 metres * 2*10^8 for optical fibre)
propogation_delay = distance/(2*100000000) propogation_delay = distance / (2 * 100000000)
# packetisation delay = ip packet size (bits)/tx rate (e.g. 100Mbp with 0% packet loss) # packetisation delay = ip packet size (bits)/tx rate (e.g. 100Mbp with 0% packet loss)
packetisation_delay = (packet_size*8)/(bandwidth*1000000) packetisation_delay = (packet_size * 8) / (bandwidth * 1000000)
# print('packetisation_delay:', packetisation_delay)
# total number of packets to be sent # total number of packets to be sent
packets = (tx_video_bit_rate*1000000)/(packet_size*8) packets = (tx_video_bit_rate * 1000000) / (packet_size * 8)
# print('packets:', packets)
response_delay = packets*(propogation_delay+packetisation_delay) response_delay = packets * (propogation_delay + packetisation_delay)
# print('response_delay:', response_delay)
return response_delay return response_delay
def _changeVMState(self, sim_time, ip_endpoint, mu, sigma, transition_state, next_state): def _changeVMState(self, sim_time, ip_endpoint, mu, sigma, transition_state, next_state):
delay_time = 0 """
send influx data to chnage VM state
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time)) :param sim_time:
:param ip_endpoint:
delay_time = random.normalvariate(mu, sigma) :param mu:
:param sigma:
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time+delay_time)) :param transition_state:
:param next_state:
:return: the delay time
"""
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(transition_state, ip_endpoint['cpu'],
ip_endpoint['mem'], ip_endpoint['storage'], sim_time))
delay_time = random.normalvariate(mu, sigma)
self._sendInfluxData(ip_endpoint['agent_url'],
lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'],
ip_endpoint['storage'], sim_time + delay_time))
return delay_time return delay_time
def _createDB(self): def _createDB(self):
self._sendInfluxQuery(self.influx_url, 'CREATE DATABASE ' + self.influx_db) """
Sends a create database influx query
"""
self._sendInfluxQuery(self.influx_url, 'CREATE DATABASE ' + self.influx_db)
def _deleteDB(self): def _deleteDB(self):
"""
Sends a delete database influx query
"""
self._sendInfluxQuery(self.influx_url, 'DROP DATABASE ' + self.influx_db) self._sendInfluxQuery(self.influx_url, 'DROP DATABASE ' + self.influx_db)
def _sendInfluxData(self, url, data):
"""
writes data to influx DB
:param url: the url of the influx DB
:param data: the data to write
"""
data = data.encode()
header = {'Content-Type': 'application/octet-stream'}
req = urllib.request.Request(url + '/write?db=' + self.influx_db, data, header)
urllib.request.urlopen(req)
@staticmethod @staticmethod
def _sendInfluxQuery(url, query): def _sendInfluxQuery(url, query):
"""
Sends an influx DB query
:param url: url of db
:param query: the query to send
:return: the response received back from the query
"""
query = urllib.parse.urlencode({'q': query}) query = urllib.parse.urlencode({'q': query})
query = query.encode('ascii') query = query.encode('ascii')
req = urllib.request.Request(url + '/query ', query) req = urllib.request.Request(url + '/query ', query)
return urllib.request.urlopen(req).read().decode("utf-8").strip() return urllib.request.urlopen(req).read().decode("utf-8").strip()
def _sendInfluxData(self, url, data):
data = data.encode()
header = {'Content-Type': 'application/octet-stream'}
req = urllib.request.Request(url + '/write?db=' + self.influx_db, data, header)
urllib.request.urlopen(req)
@pytest.fixture @pytest.fixture
def run_simulation_fixture(): def run_simulation_fixture():
"""
A fixture, which checks if the the DB has been created, if not it runs the simulator with a 10 seconds timeout after that
"""
global INFLUX_DB_URL global INFLUX_DB_URL
global INFLUX_DB_NAME
global SIMULATION_TIME_SEC global SIMULATION_TIME_SEC
dbs = Sim._sendInfluxQuery(INFLUX_DB_URL, "SHOW DATABASES") dbs = Sim._sendInfluxQuery(INFLUX_DB_URL, "SHOW DATABASES")
if "CLMCMetrics" not in dbs: if INFLUX_DB_NAME not in dbs:
simulator = Sim(INFLUX_DB_URL) simulator = Sim(INFLUX_DB_URL)
simulator.run(SIMULATION_TIME_SEC) simulator.run(SIMULATION_TIME_SEC)
...@@ -215,7 +268,13 @@ def run_simulation_fixture(): ...@@ -215,7 +268,13 @@ def run_simulation_fixture():
import time import time
time.sleep(10) time.sleep(10)
def run_simulation(generate=True): def run_simulation(generate=True):
"""
A method which runs the data generation simulator
:param generate: True for generating data, False for deleting the DB
"""
global INFLUX_DB_URL global INFLUX_DB_URL
global SIMULATION_TIME_SEC global SIMULATION_TIME_SEC
...@@ -228,8 +287,14 @@ def run_simulation(generate=True): ...@@ -228,8 +287,14 @@ def run_simulation(generate=True):
if __name__ == "__main__": if __name__ == "__main__":
"""
The main entry for this module. Code here is executed only if the StreamingSim.py file is executed,
but not when it's imported
"""
option = True option = True
if len(sys.argv) > 1: if len(sys.argv) > 1:
# if CLI argument '-c' is set when executing the script the influx db will be deleted instead of generating data
option = str(sys.argv[1]) != "-c" option = str(sys.argv[1]) != "-c"
run_simulation(generate=option) run_simulation(generate=option)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment