Skip to content
Snippets Groups Projects
Commit d65368ef authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

[ Issue #56 ] - refactored the data generation process to utilise influxdb lib

parent cf0d7202
No related branches found
No related tags found
No related merge requests found
......@@ -4,110 +4,107 @@
# Method to create a full InfluxDB request statement (based on partial statement from client)
import uuid
from random import random, randint
from random import randint
# Reports TX and RX, scaling on requested quality
def generate_network_report(recieved_bytes, sent_bytes, time):
# Measurement
result = 'net_port_io'
# Tags
result += ',port_id=enps03 '
# Fields
result += 'RX_BYTES_PORT_M=' + str(recieved_bytes) + ","
result += 'TX_BYTES_PORT_M=' + str(sent_bytes)
# Timestamp
result += ' ' + str(_getNSTime(time))
result = [{"measurement": "net_port_io",
"tags": {
"port_id": "enps03"
},
"fields": {
"RX_BYTES_PORT_M": recieved_bytes,
"TX_BYTES_PORT_M": sent_bytes
},
"time": _getNSTime(time)
}]
# Measurement
#print(result)
return result
# Formats VM config
def generate_vm_config(state, cpu, mem, storage, time):
# metric
result = 'vm_res_alloc'
# Tags
result += ',vm_state=' + quote_wrap(state)
result += ' '
# Fields
result += 'cpu=' + str(cpu)
result += ',memory=' + quote_wrap(mem)
result += ',storage=' + quote_wrap(storage)
result = [{"measurement": "vm_res_alloc",
"tags": {
"vm_state": state
},
"fields": {
"cpu": cpu,
"memory": mem,
"storage": storage
},
"time": _getNSTime(time)
}]
# Time
result += ' ' + str(_getNSTime(time))
print(result)
return result
# Reports cpu usage, scaling on requests
def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time):
result = 'cpu_usage'
# Tag
result += ' '
# field
result += 'cpu_usage='+str(cpu_usage)
result += ',cpu_active_time='+str(cpu_active_time)
result += ',cpu_idle_time='+str(cpu_idle_time)
result += ' '
# Time
result += str(_getNSTime(time))
print(result)
result = [{"measurement": "cpu_usage",
"fields": {
"cpu_usage": cpu_usage,
"cpu_active_time": cpu_active_time,
"cpu_idle_time": cpu_idle_time
},
"time": _getNSTime(time)
}]
return result
# Reports response times, scaling on number of requests
def generate_mpegdash_report(resource, requests, avg_response_time, peak_response_time, time):
# Measurement
result = 'mpegdash_service'
# Tags
result += ',cont_nav=\"' + str(resource) + "\" "
# Fields
result = [{"measurement": "mpegdash_service",
"tags": {
"cont_nav": resource
},
"fields": {
"requests": requests,
"avg_response_time": avg_response_time,
"peak_response_time": peak_response_time
},
"time": _getNSTime(time)
}]
# result += 'cont_rep=' + str(quality) + ','
result += 'requests=' + str(requests) + ','
result += 'avg_response_time=' + str(avg_response_time) + ','
result += 'peak_response_time=' + str(peak_response_time)
# Timestamp
result += ' ' + str(_getNSTime(time))
print(result)
return result
#ipendpoint_route,ipendpoint_id,cont_nav=FQDN HTTP_REQUESTS_FQDN_M, NETWORK_FQDN_LATENCY timestamp
# ipendpoint_route,ipendpoint_id,cont_nav=FQDN HTTP_REQUESTS_FQDN_M, NETWORK_FQDN_LATENCY timestamp
def generate_ipendpoint_route(resource, requests, latency, time):
# Measurement
result = 'ipendpoint_route'
# Tags
result += ',cont_nav=\"' + str(resource) + "\" "
# Fields
result = [{"measurement": "ipendpoint_route",
"tags": {
"cont_nav": str(resource)
},
"fields": {
"http_requests_fqdn_m": requests,
"network_fqdn_latency": latency
},
"time": _getNSTime(time)
}]
# result += 'cont_rep=' + str(quality) + ','
result += 'http_requests_fqdn_m=' + str(requests) + ','
result += 'network_fqdn_latency=' + str(latency)
# Timestamp
result += ' ' + str(_getNSTime(time))
#print(result)
return result
# Influx needs strings to be quoted, this provides a utility interface to do this
def quote_wrap(str):
return "\"" + str + "\""
# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time):
# Convert to nano-seconds
timestamp = int(1000000000*time)
#print("timestamp", timestamp)
return timestamp
# DEPRICATED
# DEPRECATED
# ____________________________________________________________________________
# DEPRICATED: old structure, not part of new spec
# DEPRECATED: old structure, not part of new spec
# Influx needs strings to be quoted, this provides a utility interface to do this
def quote_wrap(string):
return "\"" + string + "\""
def _generateClientRequest(cReq, id, time):
# Tags first
result = 'sid="' + str(id) + '",' + cReq
......@@ -143,7 +140,6 @@ def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference):
return 'response' + result
# Formats server config
def _generateServerConfig(ID, location, cpu, mem, storage, time):
# metric
......@@ -164,7 +160,6 @@ def _generateServerConfig(ID, location, cpu, mem, storage, time):
return result
# Format port config
def _configure_port(port_id, state, rate, time):
# metric
......@@ -195,14 +190,13 @@ def _configure_service_function(state, max_connected_clients):
return result
# Reports memory usage, scaling on requests
def generate_mem_report(requests, total_mem, time):
# Measurement
result = 'mem'
result += ' '
# field
used = randint(0, min(100,5*requests))
used = randint(0, min(100, 5*requests))
available = 100-used
result += 'available_percent='+str(available)
result += ',used_percent='+str(used)
......@@ -304,6 +298,3 @@ def service_function_measurement(measurement, service_function_context):
result += ',sf_i'+quote_wrap(service_function_context.sf_i)
return result
......@@ -3,10 +3,10 @@
import LineProtocolGenerator as lp
import time
import urllib.parse
import urllib.request
import pytest
import random
import sys
from influxdb import InfluxDBClient
# Simulation parameters
TICK_TIME = 1
......@@ -16,7 +16,7 @@ SIMULATION_TIME_SEC = 60 * 60
# CLMC parameters
INFLUX_DB_URL = 'http://192.168.50.10:8086'
INFLUX_DB_NAME = 'CLMCMetrics'
INFLUX_DB_NAME = "CLMCMetrics"
AGENT_URL1 = 'http://192.168.50.11:8186'
AGENT_URL2 = 'http://192.168.50.12:8186'
......@@ -26,19 +26,28 @@ class Sim(object):
Simulator for services
"""
def __init__(self, influx_url):
def __init__(self, influx_url, influx_db_name):
"""
Sets up the simulator object
:param influx_url: the influx DB url
:param influx_db_name: the influx DB name
"""
self.influx_db = INFLUX_DB_NAME
self.influx_url = influx_url
self.influx_db_name = influx_db_name
# influx db client is created on initialisation, which will handle the influx DB queries
url_object = urllib.parse.urlparse(influx_url)
self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=self.influx_db_name, timeout=10)
def reset(self):
"""
Resets the influx db by deleting the old database and creating a new one
"""
# Teardown DB from previous sim and bring it back up
self._deleteDB()
self._createDB()
self.db_client.drop_database(self.influx_db_name)
self.db_client.create_database(self.influx_db_name)
def run(self, simulation_length_seconds):
"""
......@@ -64,10 +73,14 @@ class Sim(object):
# endpoint state->mu, sigma, secs normal distribution
config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]}
print("\nSimulation started. Generating data...")
# Place endpoints
max_delay = 0
for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['placing'][0],
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0],
config_delay_dist['placing'][0] * config_delay_dist['placing'][1],
'placing', 'placed')
max_delay = max(delay_time, max_delay)
......@@ -76,7 +89,9 @@ class Sim(object):
# Boot endpoints
max_delay = 0
for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['booting'][0],
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0],
config_delay_dist['booting'][0] * config_delay_dist['booting'][1],
'booting', 'booted')
max_delay = max(delay_time, max_delay)
......@@ -85,7 +100,9 @@ class Sim(object):
# Connect endpoints
max_delay = 0
for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['connecting'][0],
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0],
config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1],
'connecting', 'connected')
max_delay = max(delay_time, max_delay)
......@@ -95,6 +112,9 @@ class Sim(object):
inc_period_count = 0
for i in range(simulation_length_seconds):
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
# linear inc to arrival rate
if inc_period_count >= DEFAULT_REQUEST_RATE_INC_PERIOD:
ip_endpoint['request_arrival_rate'] += request_arrival_rate_inc
......@@ -125,37 +145,29 @@ class Sim(object):
cpu_active_time = int(requests_processed * request_processing_time)
cpu_idle_time = int(cpu_time_available - cpu_active_time)
cpu_usage = cpu_active_time / cpu_time_available
self._sendInfluxData(ip_endpoint['agent_url'],
lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time))
agent_db_client.write_points(lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time))
# calc network usage metrics
bytes_rx = 2048 * requests_processed
bytes_tx = int(
ip_endpoint['video_bit_rate'] / 8 * 1000000 * requests_processed * ip_endpoint['segment_size'])
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_network_report(bytes_rx, bytes_tx, sim_time))
agent_db_client.write_points(lp.generate_network_report(bytes_rx, bytes_tx, sim_time))
# time to process all of the requests in the queue
peak_response_time = ip_endpoint['request_queue'] * request_processing_time / ip_endpoint['cpu']
# mid-range
avg_response_time = (peak_response_time + request_processing_time) / 2
self._sendInfluxData(ip_endpoint['agent_url'],
lp.generate_mpegdash_report('http://localhost/server-status?auto',
ip_endpoint['request_arrival_rate'], avg_response_time,
peak_response_time, sim_time))
agent_db_client.write_points(lp.generate_mpegdash_report('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'],
avg_response_time, peak_response_time, sim_time))
# need to calculate this but sent at 5mS for now
network_request_delay = 0.005
# calculate network response delays (2km link, 100Mbps)
network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'],
ip_endpoint['video_bit_rate'])
network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'], ip_endpoint['video_bit_rate'])
e2e_delay = network_request_delay + (avg_response_time / 1000) + network_response_delay
self._sendInfluxData(ip_endpoint['agent_url'],
lp.generate_ipendpoint_route('http://localhost/server-status?auto',
ip_endpoint['request_arrival_rate'], e2e_delay,
sim_time))
agent_db_client.write_points(lp.generate_ipendpoint_route('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], e2e_delay, sim_time))
# remove requests processed off the queue
ip_endpoint['request_queue'] -= int(requests_processed)
......@@ -165,9 +177,10 @@ class Sim(object):
print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time,
end_time - start_time))
def _calcNetworkDelay(self, distance, bandwidth, packet_size, tx_video_bit_rate):
@staticmethod
def _calcNetworkDelay(distance, bandwidth, packet_size, tx_video_bit_rate):
"""
Calculates the network delay
Calculates the network delay. Declared as static method since it doesn't need access to any instance variables.
:param distance: distance metres
:param bandwidth: bandwidth Mbps
......@@ -187,9 +200,10 @@ class Sim(object):
return response_delay
def _changeVMState(self, sim_time, ip_endpoint, mu, sigma, transition_state, next_state):
@staticmethod
def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state):
"""
send influx data to chnage VM state
Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables.
:param sim_time:
:param ip_endpoint:
:param mu:
......@@ -198,57 +212,15 @@ class Sim(object):
:param next_state:
:return: the delay time
"""
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(transition_state, ip_endpoint['cpu'],
ip_endpoint['mem'], ip_endpoint['storage'], sim_time))
agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time))
delay_time = random.normalvariate(mu, sigma)
self._sendInfluxData(ip_endpoint['agent_url'],
lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'],
ip_endpoint['storage'], sim_time + delay_time))
agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time + delay_time))
return delay_time
def _createDB(self):
"""
Sends a create database influx query
"""
self._sendInfluxQuery(self.influx_url, 'CREATE DATABASE ' + self.influx_db)
def _deleteDB(self):
"""
Sends a delete database influx query
"""
self._sendInfluxQuery(self.influx_url, 'DROP DATABASE ' + self.influx_db)
def _sendInfluxData(self, url, data):
"""
writes data to influx DB
:param url: the url of the influx DB
:param data: the data to write
"""
data = data.encode()
header = {'Content-Type': 'application/octet-stream'}
req = urllib.request.Request(url + '/write?db=' + self.influx_db, data, header)
urllib.request.urlopen(req)
@staticmethod
def _sendInfluxQuery(url, query):
"""
Sends an influx DB query
:param url: url of db
:param query: the query to send
:return: the response received back from the query
"""
query = urllib.parse.urlencode({'q': query})
query = query.encode('ascii')
req = urllib.request.Request(url + '/query ', query)
return urllib.request.urlopen(req).read().decode("utf-8").strip()
@pytest.fixture(scope='module')
def run_simulation_fixture():
......@@ -260,10 +232,12 @@ def run_simulation_fixture():
global INFLUX_DB_NAME
global SIMULATION_TIME_SEC
dbs = Sim._sendInfluxQuery(INFLUX_DB_URL, "SHOW DATABASES")
simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME)
dbs = simulator.db_client.get_list_database()
dbs = [db.get("name") for db in dbs]
if INFLUX_DB_NAME not in dbs:
simulator = Sim(INFLUX_DB_URL)
simulator.reset()
simulator.run(SIMULATION_TIME_SEC)
print("10 seconds timeout is given so that the data could properly be inserted into the database.")
......@@ -277,15 +251,17 @@ def run_simulation(generate=True):
:param generate: True for generating data, False for deleting the DB (optional argument, if not given, default value True is used)
"""
global INFLUX_DB_NAME
global INFLUX_DB_URL
global SIMULATION_TIME_SEC
simulator = Sim(INFLUX_DB_URL)
simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME)
if generate:
simulator.reset()
simulator.run(SIMULATION_TIME_SEC)
else:
simulator._deleteDB()
simulator.db_client.drop_database(simulator.influx_db_name)
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment