Skip to content
Snippets Groups Projects
Commit 447300ac authored by Michael Boniface's avatar Michael Boniface
Browse files

Merge branch 'simulationrefactor' into 'integration'

Simulationrefactor

See merge request mjb/flame-clmc!4
parents 5f940c1a 06627965
No related branches found
No related tags found
No related merge requests found
...@@ -26,35 +26,55 @@ ...@@ -26,35 +26,55 @@
Vagrant.configure("2") do |config| Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/xenial64" config.vm.box = "ubuntu/xenial64"
config.vm.define "influx" do |my| config.vm.define "clmc" do |my|
config.vm.network :private_network, ip: "192.168.50.10", virtualbox__intnet: "clmc-net"
my.vm.provider "virtualbox" do |v| my.vm.provider "virtualbox" do |v|
v.customize ["modifyvm", :id, "--memory", 2048] v.customize ["modifyvm", :id, "--memory", 2048]
v.customize ["modifyvm", :id, "--cpus", 1] v.customize ["modifyvm", :id, "--cpus", 1]
end end
# copy resource files into VM
config.vm.provision "file", source: "./scripts/influx/telegraf.conf", destination: "$HOME/config/telegraf/telegraf.conf"
# open InfluxDB port # open InfluxDB port
config.vm.network "forwarded_port", guest: 8086, host: 8086 config.vm.network "forwarded_port", guest: 8086, host: 8086
# open Chronograf port # open Chronograf port
config.vm.network "forwarded_port", guest: 8888, host: 8888 config.vm.network "forwarded_port", guest: 8888, host: 8888
# open TICK Kapacitor port # open Kapacitor port
config.vm.network "forwarded_port", guest: 9092, host: 9092 config.vm.network "forwarded_port", guest: 9092, host: 9092
# open local Telegraf port # install the CLMC service
config.vm.network "forwarded_port", guest: 8186, host: 8186 config.vm.provision :shell, :path => 'scripts/influx/install-clmc-service.sh'
# install the TICK stack # start the CLMC service
config.vm.provision :shell, :path => 'scripts/influx/install-tick-stack-vm.sh' config.vm.provision :shell, :path => 'scripts/influx/start-clmc-service.sh'
end
# configure the TICK stack config.vm.define "ipendpoint1" do |my|
config.vm.provision :shell, :path => 'scripts/influx/configure-tick-stack-vm.sh'
# start the TICK stack config.vm.network :private_network, ip: "192.168.50.11", virtualbox__intnet: "clmc-net"
config.vm.provision :shell, :path => 'scripts/influx/start-tick-stack-services.sh'
my.vm.provider "virtualbox" do |v|
v.customize ["modifyvm", :id, "--memory", 512]
v.customize ["modifyvm", :id, "--cpus", 1]
end
# Install CLMC agent
config.vm.provision :shell, :path => 'scripts/influx/install-clmc-agent.sh', :args => "/vagrant/scripts/influx/telegraf_ipendpoint1.conf"
end end
config.vm.define "ipendpoint2" do |my|
config.vm.network :private_network, ip: "192.168.50.12", virtualbox__intnet: "clmc-net"
my.vm.provider "virtualbox" do |v|
v.customize ["modifyvm", :id, "--memory", 512]
v.customize ["modifyvm", :id, "--cpus", 1]
end
# Install CLMC agent
config.vm.provision :shell, :path => 'scripts/influx/install-clmc-agent.sh', :args => "/vagrant/scripts/influx/telegraf_ipendpoint2.conf"
end
end end
#!/bin/bash #!/bin/bash
#///////////////////////////////////////////////////////////////////////// #/////////////////////////////////////////////////////////////////////////
#// #//
#// (c) University of Southampton IT Innovation Centre, 2018 #// (c) University of Southampton IT Innovation Centre, 2017
#// #//
#// Copyright in this software belongs to University of Southampton #// Copyright in this software belongs to University of Southampton
#// IT Innovation Centre of Gamma House, Enterprise Road, #// IT Innovation Centre of Gamma House, Enterprise Road,
...@@ -18,17 +18,18 @@ ...@@ -18,17 +18,18 @@
#// PURPOSE, except where stated in the Licence Agreement supplied with #// PURPOSE, except where stated in the Licence Agreement supplied with
#// the software. #// the software.
#// #//
#// Created By : Simon Crowle #// Created By : Michael Boniface
#// Created Date : 03/11/2018 #// Created Date : 13/12/2017
#// Created for Project : FLAME #// Created for Project : FLAME
#// #//
#///////////////////////////////////////////////////////////////////////// #/////////////////////////////////////////////////////////////////////////
echo Configuring TICK stack services... # Install telegraf
wget https://dl.influxdata.com/telegraf/releases/telegraf_1.3.2-1_amd64.deb
dpkg -i telegraf_1.3.2-1_amd64.deb
# Configure Telegraf # Copy configuration
systemctl stop telegraf cp $1 /etc/telegraf/telegraf.conf
cp ./config/telegraf/telegraf.conf /etc/telegraf/
# Start telegraf
systemctl start telegraf systemctl start telegraf
\ No newline at end of file
...@@ -24,6 +24,10 @@ ...@@ -24,6 +24,10 @@
#// #//
#///////////////////////////////////////////////////////////////////////// #/////////////////////////////////////////////////////////////////////////
# install python for the simulator
sudo apt-get update
sudo apt-get install python
# install influx # install influx
wget https://dl.influxdata.com/influxdb/releases/influxdb_1.2.4_amd64.deb wget https://dl.influxdata.com/influxdb/releases/influxdb_1.2.4_amd64.deb
dpkg -i influxdb_1.2.4_amd64.deb dpkg -i influxdb_1.2.4_amd64.deb
...@@ -32,10 +36,6 @@ dpkg -i influxdb_1.2.4_amd64.deb ...@@ -32,10 +36,6 @@ dpkg -i influxdb_1.2.4_amd64.deb
wget https://dl.influxdata.com/kapacitor/releases/kapacitor_1.3.1_amd64.deb wget https://dl.influxdata.com/kapacitor/releases/kapacitor_1.3.1_amd64.deb
dpkg -i kapacitor_1.3.1_amd64.deb dpkg -i kapacitor_1.3.1_amd64.deb
# install Telegraf
wget https://dl.influxdata.com/telegraf/releases/telegraf_1.3.2-1_amd64.deb
dpkg -i telegraf_1.3.2-1_amd64.deb
# install Chronograf # install Chronograf
wget https://dl.influxdata.com/chronograf/releases/chronograf_1.3.3.0_amd64.deb wget https://dl.influxdata.com/chronograf/releases/chronograf_1.3.3.0_amd64.deb
dpkg -i chronograf_1.3.3.0_amd64.deb dpkg -i chronograf_1.3.3.0_amd64.deb
\ No newline at end of file
#!/bin/bash
#/////////////////////////////////////////////////////////////////////////
#//
#// (c) University of Southampton IT Innovation Centre, 2017
#//
#// Copyright in this software belongs to University of Southampton
#// IT Innovation Centre of Gamma House, Enterprise Road,
#// Chilworth Science Park, Southampton, SO16 7NS, UK.
#//
#// This software may not be used, sold, licensed, transferred, copied
#// or reproduced in whole or in part in any manner or form or in or
#// on any media by any person other than in accordance with the terms
#// of the Licence Agreement supplied with the software, or otherwise
#// without the prior written consent of the copyright owners.
#//
#// This software is distributed WITHOUT ANY WARRANTY, without even the
#// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
#// PURPOSE, except where stated in the Licence Agreement supplied with
#// the software.
#//
#// Created By : Michael Boniface
#// Created Date : 13/12/2017
#// Created for Project : FLAME
#//
#/////////////////////////////////////////////////////////////////////////
# Install Docker CE from the official Docker apt repository
apt-get -y update
apt-get -y install apt-transport-https ca-certificates curl software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
apt-get -y update
# -y added so an unattended vagrant provision does not hang on the prompt
apt-get -y install docker-ce
# to get a specific version look at the cache and run the install with that version
# apt-cache madison docker-ce
# apt-get install docker-ce=<VERSION>

# test docker
# docker run hello-world

# install docker compose (binary release pinned to 1.17.0)
curl -L https://github.com/docker/compose/releases/download/1.17.0/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
# test compose: should print "docker-compose version 1.17.0, build 1719ceb"
# (the previous line here was the pasted output itself, which is not a command)
docker-compose --version

# install the TICK stack via its official docker-compose definition.
# Clone into a subdirectory: cloning straight into the non-empty /opt
# directory makes git fail.
git clone https://github.com/influxdata/TICK-docker.git /opt/TICK-docker
cd /opt/TICK-docker/1.3
docker-compose up -d
...@@ -28,8 +28,4 @@ echo Starting TICK stack services... ...@@ -28,8 +28,4 @@ echo Starting TICK stack services...
systemctl start influxdb systemctl start influxdb
systemctl start kapacitor systemctl start kapacitor
systemctl start telegraf
systemctl start chronograf systemctl start chronograf
\ No newline at end of file
# test influx
#curl "http://localhost:8086/query?q=show+databases"
\ No newline at end of file
...@@ -11,9 +11,12 @@ ...@@ -11,9 +11,12 @@
# Global tags can be specified here in key="value" format. # Global tags can be specified here in key="value" format.
[global_tags] [global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1 location="DC1"
# rack = "1a" sfc="MS_Template_1"
auth = "IT-Inn" sfc_i="MS_I1"
sf="adaptive_streaming"
sf_i="adaptive_streaming_I1"
ipendpoint="adaptive_streaming_I1_ipendpoint1"
# Configuration for telegraf agent # Configuration for telegraf agent
[agent] [agent]
...@@ -65,7 +68,7 @@ ...@@ -65,7 +68,7 @@
# Multiple urls can be specified but it is assumed that they are part of the same # Multiple urls can be specified but it is assumed that they are part of the same
# cluster, this means that only ONE of the urls will be written to each interval. # cluster, this means that only ONE of the urls will be written to each interval.
# urls = ["udp://127.0.0.1:8089"] # UDP endpoint example # urls = ["udp://127.0.0.1:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required urls = ["http://192.168.50.10:8086"] # required
# The target database for metrics (telegraf will create it if not exists) # The target database for metrics (telegraf will create it if not exists)
database = "CLMCMetrics" # required database = "CLMCMetrics" # required
# Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". # Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h".
...@@ -95,7 +98,6 @@ ...@@ -95,7 +98,6 @@
############################################################################### ###############################################################################
# INPUTS # # INPUTS #
############################################################################### ###############################################################################
# # Influx HTTP write listener # # Influx HTTP write listener
[[inputs.http_listener]] [[inputs.http_listener]]
## Address and port to host HTTP listener on ## Address and port to host HTTP listener on
...@@ -111,3 +113,4 @@ ...@@ -111,3 +113,4 @@
## MTLS ## MTLS
#tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] #tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
\ No newline at end of file
# Telegraf configuration
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
# Global tags can be specified here in key="value" format.
[global_tags]
location="DC2"
sfc="MS_Template_1"
sfc_i="MS_I1"
sf="adaptive_streaming"
sf_i="adaptive_streaming_I1"
ipendpoint="adaptive_streaming_I1_ipendpoint2"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write.
metric_buffer_limit = 1000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## Logging configuration:
## Run telegraf in debug mode
debug = false
## Run telegraf in quiet mode
quiet = false
## Specify the log file name. The empty string means to log to stdout.
## NOTE(review): "G:/Telegraf/..." is a Windows drive path, but this config is
## provisioned into an Ubuntu VM — telegraf will likely fail to open this file.
## Consider "" (stdout) or a path under /var/log; confirm the intended target.
logfile = "G:/Telegraf/telegraf.log"
## Override default hostname, if empty use os.Hostname()
hostname = ""
###############################################################################
# OUTPUTS #
###############################################################################
# Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
# The full HTTP or UDP endpoint URL for your InfluxDB instance.
# Multiple urls can be specified but it is assumed that they are part of the same
# cluster, this means that only ONE of the urls will be written to each interval.
# urls = ["udp://127.0.0.1:8089"] # UDP endpoint example
urls = ["http://192.168.50.10:8086"] # required
# The target database for metrics (telegraf will create it if not exists)
database = "CLMCMetrics" # required
# Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# note: using second precision greatly helps InfluxDB compression
precision = "s"
## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
# Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
###############################################################################
# INPUTS #
###############################################################################
# # Influx HTTP write listener
[[inputs.http_listener]]
## Address and port to host HTTP listener on
service_address = ":8186"
## timeouts
read_timeout = "10s"
write_timeout = "10s"
## HTTPS
#tls_cert= "/etc/telegraf/cert.pem"
#tls_key = "/etc/telegraf/key.pem"
## MTLS
#tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
\ No newline at end of file
...@@ -4,6 +4,90 @@ ...@@ -4,6 +4,90 @@
import uuid import uuid
from random import random, randint from random import random, randint
# Builds a 'net_port_io' line-protocol entry reporting the received/sent byte
# counters for the (fixed) port enps03, stamped with a nanosecond timestamp.
def generate_network_report(recieved_bytes, sent_bytes, time):
    # Fields: RX first, then TX, comma-separated
    fields = 'RX_BYTES_PORT_M=' + str(recieved_bytes) + ','
    fields += 'TX_BYTES_PORT_M=' + str(sent_bytes)
    # measurement + tag, space, fields, space, nanosecond timestamp
    return 'net_port_io,port_id=enps03 ' + fields + ' ' + str(_getNSTime(time))
# Formats a 'vm_res_alloc' line-protocol entry describing a VM's state and
# its allocated cpu/memory/storage; prints the line and returns it.
def generate_vm_config(state, cpu, mem, storage, time):
    # Tag section (vm_state), then the field section
    tags = ',vm_state=' + quote_wrap(state)
    fields = 'cpu=' + str(cpu)
    fields += ',memory=' + quote_wrap(mem)
    fields += ',storage=' + quote_wrap(storage)
    line = 'vm_res_alloc' + tags + ' ' + fields + ' ' + str(_getNSTime(time))
    print(line)
    return line
# Formats a 'vm_host_cpu_usage' line-protocol entry with overall and
# system CPU usage fields; prints the line and returns it.
def generate_cpu_report(cpu_useage, cpu_usage_system, time):
    parts = [
        'vm_host_cpu_usage',  # measurement, no tags
        'cpu_usage=' + str(cpu_useage) + ',cpu_usage_system=' + str(cpu_usage_system),
        str(_getNSTime(time)),  # nanosecond timestamp
    ]
    line = ' '.join(parts)
    print(line)
    return line
# Formats an 'mpegdash_service' line-protocol entry: the requested resource
# as a tag, plus request-count and response-time fields.
def generate_mpegdash_report(resource, requests, avg_response_time, peak_response_time, time):
    # Tag: the content navigation resource, double-quoted
    tag = ',cont_nav="' + str(resource) + '" '
    # Fields (a cont_rep quality field existed previously and was dropped)
    fields = ('requests=' + str(requests)
              + ',avg_response_time=' + str(avg_response_time)
              + ',peak_response_time=' + str(peak_response_time))
    return 'mpegdash_service' + tag + fields + ' ' + str(_getNSTime(time))
# Influx line protocol requires string values to be double-quoted; this
# wraps the given text in double quotes.
def quote_wrap(text):
    # parameter renamed (was `str`) so the built-in str() is not shadowed
    return '"' + text + '"'
# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time):
# Convert to nano-seconds
return 1000000 * time
# DEPRECATED
# ____________________________________________________________________________
# DEPRECATED: old structure, not part of new spec
def _generateClientRequest(cReq, id, time): def _generateClientRequest(cReq, id, time):
# Tags first # Tags first
result = 'sid="' + str(id) + '",' + cReq result = 'sid="' + str(id) + '",' + cReq
...@@ -19,6 +103,7 @@ def _generateClientRequest(cReq, id, time): ...@@ -19,6 +103,7 @@ def _generateClientRequest(cReq, id, time):
# Method to create a full InfluxDB response statement # Method to create a full InfluxDB response statement
# DEPRECATED: old structure, not part of new spec
def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference): def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference):
# Tags first # Tags first
result = ' ' result = ' '
...@@ -38,47 +123,13 @@ def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference): ...@@ -38,47 +123,13 @@ def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference):
return 'response' + result return 'response' + result
def _generateNetworkReport(sum_of_client_quality, time):
# Measurement
result = 'net_port_io'
# Tags
result += ',port_id=enps03 '
# Fields
result += 'RX_BYTES_PORT_M=' + str(sum_of_client_quality * 32) + ","
result += 'TX_BYTES_PORT_M=' + str(sum_of_client_quality * 1024)
# Timestamp
result += ' ' + str(_getNSTime(time))
# Measurement
# print('network'+result)
return result
def _generateMpegDashReport(resource, quality, time):
# Measurement
result = 'mpegdash_service '
# Tags
#None
# Fields
requests = randint(10, 30)
avg_response_time = 50 + randint(0, 100) + randint(0, 10 * quality)
peak_response_time = avg_response_time + randint(30, 60) + randint(5, 10) * quality
result += 'cont_nav=\"' + str(resource) + "\","
result += 'cont_rep=' + str(quality) + ','
result += 'requests=' + str(requests) + ','
result += 'avg_response_time=' + str(avg_response_time) + ','
result += 'peak_response_time=' + str(peak_response_time)
# Timestamp
result += ' ' + str(_getNSTime(time))
#print(result)
return result
# Formats server config
def _generateServerConfig(ID, location, cpu, mem, storage, time): def _generateServerConfig(ID, location, cpu, mem, storage, time):
# metric # metric
result = 'host_resource' result = 'host_resource'
# Tags # Tags
result += ',slide_id=' + quote_wrap(ID) result += ',slice_id=' + quote_wrap(ID)
result += ',location=' + quote_wrap(location) result += ',location=' + quote_wrap(location)
result += ' ' result += ' '
# Fields # Fields
...@@ -93,31 +144,15 @@ def _generateServerConfig(ID, location, cpu, mem, storage, time): ...@@ -93,31 +144,15 @@ def _generateServerConfig(ID, location, cpu, mem, storage, time):
return result return result
# Formats a 'vm_res_alloc' line-protocol entry for a VM state change
# (same layout as generate_vm_config); prints the line and returns it.
def _generateVMConfig(state, cpu, mem, storage, time):
    line = 'vm_res_alloc,vm_state=' + quote_wrap(state)
    line += ' cpu=' + str(cpu)
    line += ',memory=' + quote_wrap(mem)
    line += ',storage=' + quote_wrap(storage)
    # nanosecond timestamp terminates the entry
    line += ' ' + str(_getNSTime(time))
    print(line)
    return line
# Format port config
def _configure_port(port_id, state, rate, time): def _configure_port(port_id, state, rate, time):
# metric # metric
result = 'net_port_config ' result = 'net_port_config '
# Fields # Fields
result += 'port_id=' + quote_wrap('enps' + port_id) result += 'port_id=' + quote_wrap('enps' + port_id)
result += 'port_state=' + quote_wrap(state) result += ',port_state=' + quote_wrap(state)
result += 'tx_constraint=' + quote_wrap(rate) result += ',tx_constraint=' + quote_wrap(rate)
result += ' ' result += ' '
# Time # Time
...@@ -126,22 +161,129 @@ def _configure_port(port_id, state, rate, time): ...@@ -126,22 +161,129 @@ def _configure_port(port_id, state, rate, time):
print(result) print(result)
return result return result
# Format service function config
def _configure_service_function(state, max_connected_clients): def _configure_service_function(state, max_connected_clients):
# measurement # measurement
result = 'mpegdash_service_config' result = 'mpegdash_service_config'
# tags # tags
result += ',running='+quote_wrap(state) result += ',service_state='+quote_wrap(state)
result += ' ' result += ' '
# fields # fields
result += 'max_connected_clients='+max_connected_clients result += 'max_connected_clients='+str(max_connected_clients)
return result
# Formats a 'mem' line-protocol entry; used/available percentages are drawn
# at random, with usage growing with the request count (capped at 100%).
def generate_mem_report(requests, total_mem, time):
    used_pct = randint(0, min(100, 5 * requests))
    free_pct = 100 - used_pct
    fields = 'available_percent=' + str(free_pct)
    fields += ',used_percent=' + str(used_pct)
    fields += ',total=' + str(total_mem)
    line = 'mem ' + fields + ' ' + str(_getNSTime(time))
    print(line)
    return line
# Formats a 'compute_node_config' line-protocol entry describing the
# resources (cpus/memory/storage) of a compute node within a slice.
def generate_compute_node_config(slice_id, location, node_id, cpus, mem, storage, time):
    # Measurement
    result = 'compute_node_config'
    # CommonContext tag — fixed: was 'slide_id'; every other generator in
    # this module (and the corrected _generateServerConfig) tags 'slice_id'
    result += ',slice_id=' + quote_wrap(slice_id)
    # Tags
    result += ',location=' + quote_wrap(location)
    result += ',comp_node_id=' + quote_wrap(node_id)
    result += ' '
    # Fields
    result += 'cpus=' + str(cpus)
    result += ',memory=' + str(mem)
    result += ',storage=' + str(storage)
    # Nanosecond timestamp
    result += ' ' + str(_getNSTime(time))
    print(result)
    return result
# Formats a 'network_resource_config' line-protocol entry recording the
# bandwidth allocated to a network within a slice.
def generate_network_resource_config(slice_id, network_id, bandwidth, time):
    # Measurement
    result = 'network_resource_config'
    # Meta tag
    result += ',slice_id=' + quote_wrap(slice_id)
    # Tag — fixed: consecutive tags must be comma-separated in line protocol;
    # the comma before 'network_id' was missing, producing a malformed entry
    result += ',network_id=' + quote_wrap(network_id)
    result += ' '
    # Field
    result += 'bandwidth=' + str(bandwidth)
    # Nanosecond timestamp
    result += ' ' + str(_getNSTime(time))
    print(result)
    return result
# Formats a 'network_interface_config' line-protocol entry with the rx/tx
# constraints of a port on a compute node.
def generate_network_interface_config(slice_id, comp_node_id, port_id, rx_constraint, tx_constraint, time):
    # Measurement
    result = 'network_interface_config'
    # Meta tag — fixed: was ',slice_id' + value with no '=' separator,
    # producing an invalid tag
    result += ',slice_id=' + quote_wrap(slice_id)
    # Tags
    result += ',comp_node_id=' + quote_wrap(comp_node_id)
    result += ',port_id=' + quote_wrap(port_id)
    result += ' '
    # Fields
    result += 'rx_constraint=' + str(rx_constraint)
    result += ',tx_constraint=' + str(tx_constraint)
    # Nanosecond timestamp
    result += ' ' + str(_getNSTime(time))
    print(result)
    return result
# Formats an 'sf_instance_surrogate_config' line-protocol entry describing
# the resources of a service-function instance surrogate.
def generate_sf_instance_surrogate_config(loc, sfc, sfc_i, sf_package, sf_i, cpus, mem, storage, time):
    # Measurement
    result = 'sf_instance_surrogate_config'
    # Meta tags — fixed: every tag was missing its '=' separator
    # (e.g. ',location' + value), producing malformed line protocol
    result += ',location=' + quote_wrap(loc)
    result += ',sfc=' + quote_wrap(sfc)
    result += ',sfc_i=' + quote_wrap(sfc_i)
    result += ',sf_package=' + quote_wrap(sf_package)
    result += ',sf_i=' + quote_wrap(sf_i)
    result += ' '
    # Fields
    result += 'cpus=' + str(cpus)
    result += ',memory=' + str(mem)
    result += ',storage=' + str(storage)
    # Nanosecond timestamp
    result += ' ' + str(_getNSTime(time))
    print(result)
    return result
# Prefixes a measurement name with the common service-function context tags
# (sfc, sfc_i, sf_package, sf_i) taken from the given context object.
def service_function_measurement(measurement, service_function_context):
    result = measurement
    # fixed: each tag was missing its '=' separator (e.g. ',sfc' + value),
    # producing malformed line protocol
    result += ',sfc=' + quote_wrap(service_function_context.sfc)
    result += ',sfc_i=' + quote_wrap(service_function_context.sfc_i)
    result += ',sf_package=' + quote_wrap(service_function_context.sf_package)
    result += ',sf_i=' + quote_wrap(service_function_context.sf_i)
    return result
# Wraps the given text in double quotes for Influx line protocol.
# NOTE(review): this appears to duplicate the quote_wrap defined earlier in
# this module (the later definition wins at import time) — consider removing
# one copy.
def quote_wrap(text):
    # parameter renamed (was `str`) so the built-in str() is not shadowed
    return '"' + text + '"'
# InfluxDB likes to have time-stamps in nanoseconds
# NOTE(review): this appears to duplicate the _getNSTime defined earlier in
# this module — confirm and remove one copy.
def _getNSTime(time):
    # Convert to nano-seconds (callers pass millisecond timestamps,
    # e.g. time.time() * 1000 — TODO confirm)
    return 1000000 * time
File deleted
...@@ -34,6 +34,7 @@ import urllib.parse ...@@ -34,6 +34,7 @@ import urllib.parse
import urllib.request import urllib.request
import LineProtocolGenerator as lp import LineProtocolGenerator as lp
# DemoConfig is a configuration class used to set up the simulation # DemoConfig is a configuration class used to set up the simulation
class DemoConfig(object): class DemoConfig(object):
def __init__(self): def __init__(self):
...@@ -46,13 +47,15 @@ class DemoConfig(object): ...@@ -46,13 +47,15 @@ class DemoConfig(object):
self.MIN_SERV_RESP_TIME = 100 # Mininum time taken for server to respond to a request (ms) self.MIN_SERV_RESP_TIME = 100 # Mininum time taken for server to respond to a request (ms)
self.CLIENT_START_DELAY_MAX = 360 # Randomly delay clients starting stream up to 3 minutes self.CLIENT_START_DELAY_MAX = 360 # Randomly delay clients starting stream up to 3 minutes
dc = DemoConfig() dc = DemoConfig()
# DemoClient is a class the simulations the behaviour of a single client requesting video from the server # DemoClient is a class the simulations the behaviour of a single client requesting video from the server
class DemoClient(object): class DemoClient(object):
def __init__(self): def __init__(self):
self.startRequestOffset = randint( 0, dc.CLIENT_START_DELAY_MAX ) # Random time offset before requesting 1st segment self.startRequestOffset = randint(0,
dc.CLIENT_START_DELAY_MAX) # Random time offset before requesting 1st segment
self.numSegRequests = dc.MAX_SEG - randint(0, 50) # Randomly stop client watching all of video self.numSegRequests = dc.MAX_SEG - randint(0, 50) # Randomly stop client watching all of video
self.id = uuid.uuid4() # Client's ID self.id = uuid.uuid4() # Client's ID
self.currSeg = 1 # Client's current segment self.currSeg = 1 # Client's current segment
...@@ -60,7 +63,6 @@ class DemoClient(object): ...@@ -60,7 +63,6 @@ class DemoClient(object):
self.qualityReq = randint(dc.MIN_QUALITY, dc.MAX_QUALITY) # Randomly assigned quality for this client self.qualityReq = randint(dc.MIN_QUALITY, dc.MAX_QUALITY) # Randomly assigned quality for this client
self.lastReqID = None # ID used to track last request made by this client self.lastReqID = None # ID used to track last request made by this client
def getQuality(self): def getQuality(self):
return self.qualityReq return self.qualityReq
...@@ -101,48 +103,103 @@ class DemoClient(object): ...@@ -101,48 +103,103 @@ class DemoClient(object):
return result return result
# Creates or drops the simulation's InfluxDB database through the HTTP
# /query endpoint (the DB name may later be overwritten by telegraf).
class DatabaseManager:
    def __init__(self, influx_url, db_name):
        self.influx_url = influx_url
        self.influx_db = db_name

    def database_up(self):
        self._createDB()

    def database_teardown(self):
        self._deleteDB()

    def _createDB(self):
        self._sendInfluxQuery('CREATE DATABASE ' + self.influx_db)

    def _deleteDB(self):
        self._sendInfluxQuery('DROP DATABASE ' + self.influx_db)

    # POSTs a single InfluxQL statement to the /query endpoint.
    def _sendInfluxQuery(self, query):
        query = urllib.parse.urlencode({'q': query})
        query = query.encode('ascii')
        # fixed: the URL previously ended '/query ' (trailing space), which
        # yields an invalid request path and a failed request
        req = urllib.request.Request(self.influx_url + '/query', query)
        urllib.request.urlopen(req)
# Distributes newly created DemoClient instances across the known servers
# in round-robin order.
class ClientManager:
    def __init__(self, servers):
        self.servers = servers

    def generate_new_clients(self, amount):
        # Cycle over the servers, assigning one fresh client per visit,
        # until `amount` clients have been handed out.
        assigned = 0
        while assigned < amount:
            for server in self.servers:
                if assigned >= amount:
                    break
                server.assign_client(DemoClient())
                assigned += 1
# Simulates nodes not connected directly to clients (e.g. telegraf)
class Node:
    def __init__(self, influxurl, influxdb, input_cpu):
        self.influx_url = influxurl
        self.influx_db = influxdb
        # when truthy, iterateService() also emits a CPU usage report
        self.report_cpu = input_cpu

    # Emits one round of reports (CPU if enabled, then memory) to Influx.
    def iterateService(self):
        if self.report_cpu:
            # NOTE(review): LineProtocolGenerator defines
            # generate_cpu_report(cpu_useage, cpu_usage_system, time) — this
            # call uses a different name (generate_CPU_report) and arity, and
            # looks like it would raise AttributeError at runtime; confirm
            # the intended generator and signature.
            self._sendInfluxData(lp.generate_CPU_report(0))
        # NOTE(review): generate_mem_report takes (requests, total_mem, time)
        # but only two arguments are passed here — verify against the
        # generator's signature.
        self._sendInfluxData(lp.generate_mem_report(10, 0))

    # Private Methods
    # ________________________________________________________________
    # This is duplicated from DemoServer, should probably be refactored
    def _sendInfluxData(self, data):
        # Line-protocol payload is POSTed raw to Influx's /write endpoint
        data = data.encode()
        header = {'Content-Type': 'application/octet-stream'}
        req = urllib.request.Request(self.influx_url + '/write?db=' + self.influx_db, data, header)
        urllib.request.urlopen(req)
# Container for common SF tags, used as part of generating SF usage reports
# DemoServer is the class that simulates the behaviour of the MPEG-DASH server # DemoServer is the class that simulates the behaviour of the MPEG-DASH server
class DemoServer(object): class DemoServer(object):
def __init__( self, cc, si, dbURL, dbName ): def __init__(self, si, db_url, db_name, server_id, server_location):
self.influxDB = dbName # InfluxDB database name self.influxDB = db_name # InfluxDB database name
self.id = uuid.uuid4() # MPEG-DASH server ID self.id = uuid.uuid4() # MPEG-DASH server ID
self.clientCount = cc # Number of clients to simulate
self.simIterations = si # Number of iterations to make for this simulation self.simIterations = si # Number of iterations to make for this simulation
self.influxURL = dbURL # InfluxDB connection URL self.influxURL = db_url # InfluxDB connection URL
self.currentTime = int(round(time.time() * 1000)) # The current time self.currentTime = int(round(time.time() * 1000)) # The current time
self._configure(server_id, server_location)
def prepareDatabase( self ):
self._createDB()
self.clients = [] self.clients = []
for i in range( self.clientCount ):
self.clients.append( DemoClient() )
def destroyDatabase( self ): def shutdown(self):
self._deleteDB( self.influxDB ) print("Shutting down")
self.configure_VM('stopping')
def reportStatus( self ): def assign_client(self, new_client):
self.clients.append(new_client)
print('Number of clients: ' + str(len(self.clients))) print('Number of clients: ' + str(len(self.clients)))
def configure_servers(self): def configure_server(self, server_id, server_location):
print("Configuring Servers") print("Configuring Servers")
server_conf_block = [] server_conf_block = []
ids = ['A', 'B', 'C'] server_conf_block.append(lp._generateServerConfig(server_id, server_location, 8, '100G', '1T',
locations = ['locA', 'locB', 'locC'] self._selectDelay(0)))
for i, id in enumerate(ids):
server_conf_block.append(lp._generateServerConfig(id,locations[i],8,'100G','1T', self._selectDelay(len(ids)))) #ids = ['A', 'B', 'C']
#locations = ['locA', 'locB', 'locC']
#for i, id in enumerate(ids):
# server_conf_block.append(
# lp._generateServerConfig(id, locations[i], 8, '100G', '1T', self._selectDelay(len(ids))))
self._sendInfluxDataBlock(server_conf_block) self._sendInfluxDataBlock(server_conf_block)
def configure_VMs(self):
print("Configuring VM nodes")
VM_conf_block = []
self._generateVMS('starting',10,VM_conf_block)
self._generateVMS('running',10,VM_conf_block)
self._sendInfluxDataBlock(VM_conf_block) def configure_VM(self, state):
print("Configuring VM node")
self._sendInfluxData(self._generateVM(state, 1))
def configure_ports(self): def configure_ports(self):
print("Configuring Servers") print("Configuring Servers")
...@@ -151,25 +208,19 @@ class DemoServer(object): ...@@ -151,25 +208,19 @@ class DemoServer(object):
server_conf_block.append(lp._configure_port()) server_conf_block.append(lp._configure_port())
self._sendInfluxDataBlock(server_conf_block) self._sendInfluxDataBlock(server_conf_block)
def shutdown_VM(self):
def shutdown_VMs(self):
print("Shutting down VM nodes") print("Shutting down VM nodes")
VM_conf_block = [] VM_conf_block = []
self._generateVMS('stopping', 10, VM_conf_block) self._generateVMS('stopping', 10, VM_conf_block)
self._sendInfluxDataBlock(VM_conf_block) self._sendInfluxDataBlock(VM_conf_block)
def _generateVMS(self,state, amount, datablock):
for i in range(0, amount):
datablock.append(lp._generateVMConfig(state, 1, '100G', '1T', self._selectDelay(amount)))
def iterateService(self): def iterateService(self):
# The simulation will run through 'X' iterations of the simulation # The simulation will run through 'X' iterations of the simulation
# each time this method is called. This allows request/response messages to be # each time this method is called. This allows request/response messages to be
# batched and sent to the InfluxDB in sensible sized blocks # batched and sent to the InfluxDB in sensible sized blocks
return self._executeServiceIteration(dc.ITERATION_STRIDE) return self._executeServiceIteration(dc.ITERATION_STRIDE)
# 'Private' methods ________________________________________________________
def _executeServiceIteration(self, count): def _executeServiceIteration(self, count):
requestBlock = [] requestBlock = []
...@@ -192,13 +243,10 @@ class DemoServer(object): ...@@ -192,13 +243,10 @@ class DemoServer(object):
# Record request, if it was generated # Record request, if it was generated
cReq = client.iterateRequest() cReq = client.iterateRequest()
if ( cReq != None ): if cReq is not None:
clientsRequesting.append(client) clientsRequesting.append(client)
requestBlock.append(lp._generateClientRequest(cReq, self.id, self.currentTime)) requestBlock.append(lp._generateClientRequest(cReq, self.id, self.currentTime))
# Now generate request statistics # Now generate request statistics
clientReqCount = len(clientsRequesting) clientReqCount = len(clientsRequesting)
...@@ -207,7 +255,6 @@ class DemoServer(object): ...@@ -207,7 +255,6 @@ class DemoServer(object):
# Now generate responses, based on stats # Now generate responses, based on stats
for client in clientsRequesting: for client in clientsRequesting:
# Generate some quality and delays based on the number of clients requesting for this iteration # Generate some quality and delays based on the number of clients requesting for this iteration
qualitySelect = self._selectQuality(client.getQuality(), clientReqCount) qualitySelect = self._selectQuality(client.getQuality(), clientReqCount)
delaySelect = self._selectDelay(clientReqCount) + self.currentTime delaySelect = self._selectDelay(clientReqCount) + self.currentTime
...@@ -224,7 +271,6 @@ class DemoServer(object): ...@@ -224,7 +271,6 @@ class DemoServer(object):
percentageDifference)) percentageDifference))
SFBlock.append(lp._generateMpegDashReport('https://netflix.com/scream', qualitySelect, delaySelect)) SFBlock.append(lp._generateMpegDashReport('https://netflix.com/scream', qualitySelect, delaySelect))
networkBlock.append(lp._generateNetworkReport(sumOfclientQuality, delaySelect)) networkBlock.append(lp._generateNetworkReport(sumOfclientQuality, delaySelect))
# Iterate the service simulation # Iterate the service simulation
self.simIterations -= 1 self.simIterations -= 1
...@@ -240,6 +286,21 @@ class DemoServer(object): ...@@ -240,6 +286,21 @@ class DemoServer(object):
return self.simIterations return self.simIterations
def _generateVM(self, state, delay):
return lp._generateVMConfig(state, 1, '100G', '1T', self._selectDelay(delay))
# 'Private' methods ________________________________________________________
def _configure(self, server_id, server_location):
print("Configuring")
self.configure_VM('starting')
self.configure_VM('running')
#time.sleep(0.1)
self.configure_server(server_id, server_location)
self._sendInfluxData(lp._configure_port('01', 'running', '1GB/s', self.currentTime))
self._sendInfluxData(lp._configure_service_function('starting', 100))
#time.sleep(0.1)
self._sendInfluxData(lp._configure_service_function('running', 100))
def _cpuUsage(self, clientCount): def _cpuUsage(self, clientCount):
cpuUsage = randint(0, 10) cpuUsage = randint(0, 10)
...@@ -264,7 +325,6 @@ class DemoServer(object): ...@@ -264,7 +325,6 @@ class DemoServer(object):
return cpuUsage return cpuUsage
# Rule to determine a response quality, based on the current number of clients requesting # Rule to determine a response quality, based on the current number of clients requesting
def _selectQuality(self, expectedQuality, clientCount): def _selectQuality(self, expectedQuality, clientCount):
...@@ -310,20 +370,8 @@ class DemoServer(object): ...@@ -310,20 +370,8 @@ class DemoServer(object):
return result return result
def _createDB( self ):
self._sendInfluxQuery( 'CREATE DATABASE '+ self.influxDB )
def _deleteDB( self, dbName ):
self._sendInfluxQuery( 'DROP DATABASE ' + self.influxDB )
# InfluxDB data send methods # InfluxDB data send methods
# ----------------------------------------------------------------------------------------------- # -----------------------------------------------------------------------------------------------
def _sendInfluxQuery( self, query ):
query = urllib.parse.urlencode( {'q' : query} )
query = query.encode('ascii')
req = urllib.request.Request( self.influxURL + '/query ', query )
urllib.request.urlopen( req )
def _sendInfluxData(self, data): def _sendInfluxData(self, data):
data = data.encode() data = data.encode()
...@@ -349,29 +397,41 @@ class DemoServer(object): ...@@ -349,29 +397,41 @@ class DemoServer(object):
# Entry point # Entry point
# ----------------------------------------------------------------------------------------------- # -----------------------------------------------------------------------------------------------
print("Preparing simulation") print("Preparing simulation")
clients = 10 # Iterations is time in seconds for each server to simulate
iterations = 3000 iterations = 3000
# port 8086: Direct to DB specified # port 8086: Direct to DB specified
# port 8186: To telegraf, telegraf specifies DB # port 8186: To telegraf, telegraf specifies DB
demoServer = DemoServer(clients, iterations, 'http://localhost:8186', 'testDB') start_time = time.localtime()
database_manager = DatabaseManager('http://localhost:8186', 'testDB')
# Set up InfluxDB (need to wait a little while) # Set up InfluxDB (need to wait a little while)
demoServer.destroyDatabase() database_manager.database_teardown()
time.sleep(2) time.sleep(2)
demoServer.prepareDatabase() database_manager.database_up()
time.sleep(2) time.sleep(2)
demoServer.reportStatus() # configure servers
demoServer.configure_servers() demoServer_southampton = DemoServer(iterations, 'http://localhost:8186', 'testDB', "Server1", "Southampton")
demoServer.configure_VMs() demoServer_bristol = DemoServer(iterations, 'http://localhost:8186', 'testDB', "Server2", "Bristol")
telegraf_node = Node('http://localhost:8186', 'testDB', True)
server_list = [demoServer_southampton, demoServer_bristol]
client_manager = ClientManager(server_list)
client_manager.generate_new_clients(20)
# Start simulation # Start simulation
print("Starting simulation") print("Starting simulation")
while ( True ): while True:
itCount = demoServer.iterateService() for server in server_list:
itCount = server.iterateService()
telegraf_node.iterateService()
pcDone = round((itCount / iterations) * 100) pcDone = round((itCount / iterations) * 100)
print("Simulation remaining (%): " + str(pcDone) + " \r", end='') print("Simulation remaining (%): " + str(pcDone) + " \r", end='')
if ( itCount == 0 ): if itCount == 0:
break break
demoServer.shutdown_VMs()
for server in server_list:
server.shutdown()
print("\nFinished") print("\nFinished")
end_time = time.localtime()
print("Started at {0} ended at {1}, total run time {2}".format(start_time,end_time,(end_time-start_time)))
import LineProtocolGenerator as lp
import time
import urllib.parse
import urllib.request
from random import random, randint
# Simulator for services
class sim:
    """Drives a synthetic metrics simulation for surrogate media services.

    On construction the previous simulation database is dropped and a fresh
    one created. run() then emits VM-config, CPU, network and MPEG-DASH
    reports for each surrogate service's Telegraf agent, one batch per
    simulated second.
    """

    def __init__(self, influx_url):
        # Requests per second at each of the three quality levels,
        # keyed by data-centre location.
        self.quality_request_rate = {"DC1": [10, 20, 10], "DC2": [5, 30, 5]}

        # We don't need this as the db is CLMC metrics
        self.influx_db = 'CLMCMetrics'
        self.influx_url = influx_url
        # Teardown DB from previous sim and bring it back up
        self._deleteDB()
        self._createDB()

    def run(self, simulation_length_seconds):
        """Run the simulation for the given number of simulated seconds.

        Emits one batch of reports per service per iteration; wall-clock
        start/end times are printed when the run completes.
        """
        start_time = time.time()
        current_time = int(time.time())
        # Static description of the two surrogate services being simulated.
        surrogate_services = [{'agent_url': 'http://192.168.50.11:8186', 'location': 'DC1', 'cpu': 2,
                               'mem': '8GB', 'storage': '1TB'},
                              {'agent_url': 'http://192.168.50.12:8186', 'location': 'DC2', 'cpu': 4,
                               'mem': '8GB', 'storage': '1TB'}
                              ]
        # Simulate surrogate services being asserted: 'starting' then 'running'
        for service in surrogate_services:
            self._sendInfluxData(service['agent_url'], lp.generate_vm_config('starting', service['cpu'], service['mem'], service['storage'], current_time))
        for service in surrogate_services:
            self._sendInfluxData(service['agent_url'], lp.generate_vm_config('running', service['cpu'], service['mem'], service['storage'], current_time))

        # Run simulation
        for i in range(simulation_length_seconds):
            for service in surrogate_services:
                rates = self.quality_request_rate[service['location']]

                # Scale CPU usage on number of requests, quality and cpu allocation
                cpu_usage = rates[0] + rates[1] * 2 + rates[2] * 4
                cpu_usage = cpu_usage / service['cpu']
                cpu_usage = cpu_usage / 100  # Transform into %

                self._sendInfluxData(service['agent_url'], lp.generate_cpu_report(cpu_usage, cpu_usage, current_time))

                # Scale SENT/REC bytes on requests and quality.
                # ('traffic_units' replaces a local named 'bytes', which
                # shadowed the builtin of the same name.)
                traffic_units = rates[0] + rates[1] * 2 + rates[2] * 4
                bytes_sent = 1024 * traffic_units
                bytes_rec = 32 * traffic_units
                self._sendInfluxData(service['agent_url'], lp.generate_network_report(bytes_rec, bytes_sent, current_time))

                # Scale MPEG Dash on requests, quality, cpu usage
                avg_response_time = randint(0, 5 * rates[0])
                avg_response_time += randint(0, 10 * rates[1])
                avg_response_time += randint(0, 15 * rates[2])
                avg_response_time *= cpu_usage
                peak_response_time = avg_response_time + randint(30, 60)
                requests = sum(rates)
                self._sendInfluxData(service['agent_url'], lp.generate_mpegdash_report('https://Netflix.com/scream', requests, avg_response_time, peak_response_time, current_time))

            # Advance the simulated clock.
            # NOTE(review): current_time starts as int(time.time()) (seconds),
            # so += 1000 advances 1000 seconds, not the single second the
            # original comment claimed. Left unchanged pending confirmation of
            # the timestamp unit expected by LineProtocolGenerator.
            current_time += 1000
        end_time = time.time()
        print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time, end_time - start_time))

    def _createDB(self):
        # Create the metrics database via the InfluxDB query endpoint.
        self._sendInfluxQuery(self.influx_url, 'CREATE DATABASE ' + self.influx_db)

    def _deleteDB(self):
        # Drop any database left over from a previous simulation run.
        self._sendInfluxQuery(self.influx_url, 'DROP DATABASE ' + self.influx_db)

    def _sendInfluxQuery(self, url, query):
        """POST a management query to InfluxDB's /query endpoint."""
        query = urllib.parse.urlencode({'q': query})
        query = query.encode('ascii')
        # BUG FIX: the original appended '/query ' (trailing space),
        # producing a malformed request path.
        req = urllib.request.Request(url + '/query', query)
        urllib.request.urlopen(req)

    def _sendInfluxData(self, url, data):
        """POST a line-protocol payload to the agent's /write endpoint."""
        data = data.encode()
        header = {'Content-Type': 'application/octet-stream'}
        req = urllib.request.Request(url + '/write?db=' + self.influx_db, data, header)
        urllib.request.urlopen(req)
# Entry point: point the simulator at the InfluxDB instance on the CLMC VM
# (see the Vagrant private network, 192.168.50.10) and run for 180 iterations.
simulator = sim('http://192.168.50.10:8086')
simulator.run(180)
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment