From d65368ef3477f8bf61a3521c4ccb7d4996522bcd Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Thu, 8 Mar 2018 13:23:02 +0000 Subject: [PATCH] [ Issue #56 ] - refactored the data generation process to utilise influxdb lib --- test/streaming-sim/LineProtocolGenerator.py | 139 +++++++++----------- test/streaming-sim/StreamingSim.py | 130 ++++++++---------- 2 files changed, 118 insertions(+), 151 deletions(-) diff --git a/test/streaming-sim/LineProtocolGenerator.py b/test/streaming-sim/LineProtocolGenerator.py index 5d7914f..1b19c3c 100644 --- a/test/streaming-sim/LineProtocolGenerator.py +++ b/test/streaming-sim/LineProtocolGenerator.py @@ -4,110 +4,107 @@ # Method to create a full InfluxDB request statement (based on partial statement from client) import uuid -from random import random, randint +from random import randint # Reports TX and RX, scaling on requested quality def generate_network_report(recieved_bytes, sent_bytes, time): - # Measurement - result = 'net_port_io' - # Tags - result += ',port_id=enps03 ' - # Fields - result += 'RX_BYTES_PORT_M=' + str(recieved_bytes) + "," - result += 'TX_BYTES_PORT_M=' + str(sent_bytes) - # Timestamp - result += ' ' + str(_getNSTime(time)) + result = [{"measurement": "net_port_io", + "tags": { + "port_id": "enps03" + }, + "fields": { + "RX_BYTES_PORT_M": recieved_bytes, + "TX_BYTES_PORT_M": sent_bytes + }, + "time": _getNSTime(time) + }] - # Measurement - #print(result) return result # Formats VM config def generate_vm_config(state, cpu, mem, storage, time): - # metric - result = 'vm_res_alloc' - # Tags - result += ',vm_state=' + quote_wrap(state) - result += ' ' - # Fields - result += 'cpu=' + str(cpu) - result += ',memory=' + quote_wrap(mem) - result += ',storage=' + quote_wrap(storage) + result = [{"measurement": "vm_res_alloc", + "tags": { + "vm_state": state + }, + "fields": { + "cpu": cpu, + "memory": mem, + "storage": storage + }, + "time": _getNSTime(time) + }] - # Time - result += ' ' + str(_getNSTime(time)) - - print(result) return result # Reports cpu usage, scaling on requests def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time): - result = 'cpu_usage' - # Tag - result += ' ' - # field - result += 'cpu_usage='+str(cpu_usage) - result += ',cpu_active_time='+str(cpu_active_time) - result += ',cpu_idle_time='+str(cpu_idle_time) - result += ' ' - # Time - result += str(_getNSTime(time)) - print(result) + result = [{"measurement": "cpu_usage", + "fields": { + "cpu_usage": cpu_usage, + "cpu_active_time": cpu_active_time, + "cpu_idle_time": cpu_idle_time + }, + "time": _getNSTime(time) + }] + return result # Reports response times, scaling on number of requests def generate_mpegdash_report(resource, requests, avg_response_time, peak_response_time, time): - # Measurement - result = 'mpegdash_service' - # Tags - result += ',cont_nav=\"' + str(resource) + "\" " - # Fields + result = [{"measurement": "mpegdash_service", + "tags": { + "cont_nav": resource + }, + "fields": { + "requests": requests, + "avg_response_time": avg_response_time, + "peak_response_time": peak_response_time + }, + "time": _getNSTime(time) + }] - # result += 'cont_rep=' + str(quality) + ',' - result += 'requests=' + str(requests) + ',' - result += 'avg_response_time=' + str(avg_response_time) + ',' - result += 'peak_response_time=' + str(peak_response_time) - # Timestamp - result += ' ' + str(_getNSTime(time)) - print(result) return result -#ipendpoint_route,ipendpoint_id,cont_nav=FQDN HTTP_REQUESTS_FQDN_M, NETWORK_FQDN_LATENCY timestamp + +# ipendpoint_route,ipendpoint_id,cont_nav=FQDN HTTP_REQUESTS_FQDN_M, NETWORK_FQDN_LATENCY timestamp def generate_ipendpoint_route(resource, requests, latency, time): - # Measurement - result = 'ipendpoint_route' - # Tags - result += ',cont_nav=\"' + str(resource) + "\" " - # Fields + result = [{"measurement": "ipendpoint_route", + "tags": { + "cont_nav": str(resource) + }, + "fields": { + "http_requests_fqdn_m": requests, + "network_fqdn_latency": latency + }, + "time": _getNSTime(time) + }] - # result += 'cont_rep=' + str(quality) + ',' - result += 'http_requests_fqdn_m=' + str(requests) + ',' - result += 'network_fqdn_latency=' + str(latency) - # Timestamp - result += ' ' + str(_getNSTime(time)) - #print(result) return result -# Influx needs strings to be quoted, this provides a utility interface to do this -def quote_wrap(str): - return "\"" + str + "\"" - # InfluxDB likes to have time-stamps in nanoseconds def _getNSTime(time): # Convert to nano-seconds timestamp = int(1000000000*time) - #print("timestamp", timestamp) + return timestamp -# DEPRICATED + +# DEPRECATED # ____________________________________________________________________________ -# DEPRICATED: old structure, not part of new spec +# DEPRECATED: old structure, not part of new spec + +# Influx needs strings to be quoted, this provides a utility interface to do this +def quote_wrap(string): + return "\"" + string + "\"" + + def _generateClientRequest(cReq, id, time): # Tags first result = 'sid="' + str(id) + '",' + cReq @@ -143,7 +140,6 @@ def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference): return 'response' + result - # Formats server config def _generateServerConfig(ID, location, cpu, mem, storage, time): # metric @@ -164,7 +160,6 @@ def _generateServerConfig(ID, location, cpu, mem, storage, time): return result - # Format port config def _configure_port(port_id, state, rate, time): # metric @@ -195,14 +190,13 @@ def _configure_service_function(state, max_connected_clients): return result - # Reports memory usage, scaling on requests def generate_mem_report(requests, total_mem, time): # Measurement result = 'mem' result += ' ' # field - used = randint(0, min(100,5*requests)) + used = randint(0, min(100, 5*requests)) available = 100-used result += 'available_percent='+str(available) result += ',used_percent='+str(used) @@ -304,6 +298,3 @@ def service_function_measurement(measurement, service_function_context): result += ',sf_i'+quote_wrap(service_function_context.sf_i) return result - - - diff --git a/test/streaming-sim/StreamingSim.py b/test/streaming-sim/StreamingSim.py index b5a9b21..0fc397c 100644 --- a/test/streaming-sim/StreamingSim.py +++ b/test/streaming-sim/StreamingSim.py @@ -3,10 +3,10 @@ import LineProtocolGenerator as lp import time import urllib.parse -import urllib.request import pytest import random import sys +from influxdb import InfluxDBClient # Simulation parameters TICK_TIME = 1 @@ -16,7 +16,7 @@ SIMULATION_TIME_SEC = 60 * 60 # CLMC parameters INFLUX_DB_URL = 'http://192.168.50.10:8086' -INFLUX_DB_NAME = 'CLMCMetrics' +INFLUX_DB_NAME = "CLMCMetrics" AGENT_URL1 = 'http://192.168.50.11:8186' AGENT_URL2 = 'http://192.168.50.12:8186' @@ -26,19 +26,28 @@ class Sim(object): Simulator for services """ - def __init__(self, influx_url): + def __init__(self, influx_url, influx_db_name): """ Sets up the simulator object :param influx_url: the influx DB url + :param influx_db_name: the influx DB name """ - self.influx_db = INFLUX_DB_NAME - self.influx_url = influx_url + self.influx_db_name = influx_db_name + + # influx db client is created on initialisation, which will handle the influx DB queries + url_object = urllib.parse.urlparse(influx_url) + self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=self.influx_db_name, timeout=10) + + def reset(self): + """ + Resets the influx db by deleting the old database and creating a new one + """ # Teardown DB from previous sim and bring it back up - self._deleteDB() - self._createDB() + self.db_client.drop_database(self.influx_db_name) + self.db_client.create_database(self.influx_db_name) def run(self, simulation_length_seconds): """ @@ -64,10 +73,14 @@ class Sim(object): # endpoint state->mu, sigma, secs normal distribution config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]} + print("\nSimulation started. Generating data...") + # Place endpoints max_delay = 0 for ip_endpoint in ip_endpoints: - delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['placing'][0], + agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0], config_delay_dist['placing'][0] * config_delay_dist['placing'][1], 'placing', 'placed') max_delay = max(delay_time, max_delay) @@ -76,7 +89,9 @@ class Sim(object): # Boot endpoints max_delay = 0 for ip_endpoint in ip_endpoints: - delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['booting'][0], + agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0], config_delay_dist['booting'][0] * config_delay_dist['booting'][1], 'booting', 'booted') max_delay = max(delay_time, max_delay) @@ -85,7 +100,9 @@ class Sim(object): # Connect endpoints max_delay = 0 for ip_endpoint in ip_endpoints: - delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['connecting'][0], + agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0], config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], 'connecting', 'connected') max_delay = max(delay_time, max_delay) @@ -95,6 +112,9 @@ class Sim(object): inc_period_count = 0 for i in range(simulation_length_seconds): for ip_endpoint in ip_endpoints: + agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) + agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) + # linear inc to arrival rate if inc_period_count >= DEFAULT_REQUEST_RATE_INC_PERIOD: ip_endpoint['request_arrival_rate'] += request_arrival_rate_inc @@ -125,37 +145,29 @@ class Sim(object): cpu_active_time = int(requests_processed * request_processing_time) cpu_idle_time = int(cpu_time_available - cpu_active_time) cpu_usage = cpu_active_time / cpu_time_available - self._sendInfluxData(ip_endpoint['agent_url'], - lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time)) + agent_db_client.write_points(lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time)) # calc network usage metrics bytes_rx = 2048 * requests_processed bytes_tx = int( ip_endpoint['video_bit_rate'] / 8 * 1000000 * requests_processed * ip_endpoint['segment_size']) - self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_network_report(bytes_rx, bytes_tx, sim_time)) + agent_db_client.write_points(lp.generate_network_report(bytes_rx, bytes_tx, sim_time)) # time to process all of the requests in the queue peak_response_time = ip_endpoint['request_queue'] * request_processing_time / ip_endpoint['cpu'] # mid-range avg_response_time = (peak_response_time + request_processing_time) / 2 - self._sendInfluxData(ip_endpoint['agent_url'], - lp.generate_mpegdash_report('http://localhost/server-status?auto', - ip_endpoint['request_arrival_rate'], avg_response_time, - peak_response_time, sim_time)) + agent_db_client.write_points(lp.generate_mpegdash_report('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], + avg_response_time, peak_response_time, sim_time)) # need to calculate this but sent at 5mS for now network_request_delay = 0.005 # calculate network response delays (2km link, 100Mbps) - network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'], - ip_endpoint['video_bit_rate']) + network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'], ip_endpoint['video_bit_rate']) e2e_delay = network_request_delay + (avg_response_time / 1000) + network_response_delay - - self._sendInfluxData(ip_endpoint['agent_url'], - lp.generate_ipendpoint_route('http://localhost/server-status?auto', - ip_endpoint['request_arrival_rate'], e2e_delay, - sim_time)) + agent_db_client.write_points(lp.generate_ipendpoint_route('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], e2e_delay, sim_time)) # remove requests processed off the queue ip_endpoint['request_queue'] -= int(requests_processed) @@ -165,9 +177,10 @@ class Sim(object): print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time, end_time - start_time)) - def _calcNetworkDelay(self, distance, bandwidth, packet_size, tx_video_bit_rate): + @staticmethod + def _calcNetworkDelay(distance, bandwidth, packet_size, tx_video_bit_rate): """ - Calculates the network delay + Calculates the network delay. Declared as static method since it doesn't need access to any instance variables. :param distance: distance metres :param bandwidth: bandwidth Mbps @@ -187,9 +200,10 @@ class Sim(object): return response_delay - def _changeVMState(self, sim_time, ip_endpoint, mu, sigma, transition_state, next_state): + @staticmethod + def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state): """ - send influx data to chnage VM state + Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables. :param sim_time: :param ip_endpoint: :param mu: @@ -198,57 +212,15 @@ class Sim(object): :param next_state: :return: the delay time """ - self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(transition_state, ip_endpoint['cpu'], - ip_endpoint['mem'], ip_endpoint['storage'], sim_time)) + + agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time)) delay_time = random.normalvariate(mu, sigma) - self._sendInfluxData(ip_endpoint['agent_url'], - lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], - ip_endpoint['storage'], sim_time + delay_time)) + agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time + delay_time)) return delay_time - def _createDB(self): - """ - Sends a create database influx query - """ - - self._sendInfluxQuery(self.influx_url, 'CREATE DATABASE ' + self.influx_db) - - def _deleteDB(self): - """ - Sends a delete database influx query - """ - - self._sendInfluxQuery(self.influx_url, 'DROP DATABASE ' + self.influx_db) - - def _sendInfluxData(self, url, data): - """ - writes data to influx DB - :param url: the url of the influx DB - :param data: the data to write - """ - - data = data.encode() - header = {'Content-Type': 'application/octet-stream'} - req = urllib.request.Request(url + '/write?db=' + self.influx_db, data, header) - urllib.request.urlopen(req) - - @staticmethod - def _sendInfluxQuery(url, query): - """ - Sends an influx DB query - :param url: url of db - :param query: the query to send - :return: the response received back from the query - """ - - query = urllib.parse.urlencode({'q': query}) - query = query.encode('ascii') - req = urllib.request.Request(url + '/query ', query) - return urllib.request.urlopen(req).read().decode("utf-8").strip() - @pytest.fixture(scope='module') def run_simulation_fixture(): @@ -260,10 +232,12 @@ def run_simulation_fixture(): global INFLUX_DB_NAME global SIMULATION_TIME_SEC - dbs = Sim._sendInfluxQuery(INFLUX_DB_URL, "SHOW DATABASES") + simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME) + dbs = simulator.db_client.get_list_database() + dbs = [db.get("name") for db in dbs] if INFLUX_DB_NAME not in dbs: - simulator = Sim(INFLUX_DB_URL) + simulator.reset() simulator.run(SIMULATION_TIME_SEC) print("10 seconds timeout is given so that the data could properly be inserted into the database.") @@ -277,15 +251,17 @@ def run_simulation(generate=True): :param generate: True for generating data, False for deleting the DB (optional argument, if not given, default value True is used) """ + global INFLUX_DB_NAME global INFLUX_DB_URL global SIMULATION_TIME_SEC - simulator = Sim(INFLUX_DB_URL) + simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME) if generate: + simulator.reset() simulator.run(SIMULATION_TIME_SEC) else: - simulator._deleteDB() + simulator.db_client.drop_database(simulator.influx_db_name) if __name__ == "__main__": -- GitLab