Skip to content
Snippets Groups Projects
Commit fb33da19 authored by Michael Boniface's avatar Michael Boniface
Browse files

Merge branch 'pytest' into 'integration'

Merge pytest into integration

See merge request FLAME/flame-clmc!19
parents d4c818c8 de99fb0f
No related branches found
No related tags found
No related merge requests found
...@@ -69,7 +69,11 @@ using the following convention: ...@@ -69,7 +69,11 @@ using the following convention:
#### Creating a deployment for a test #### Creating a deployment for a test
To set up a simualtion of the adaptive streaming use case scenario To set up a simulation of the adaptive streaming use case scenario first install the vagrant-disksize plugin (if not already installed)
`vagrant plugin install vagrant-disksize`
and then execute the following command
`vagrant --fixture=streaming-sim -- up` `vagrant --fixture=streaming-sim -- up`
...@@ -83,12 +87,33 @@ The **clmc-service** vm includes influx, Kapacitor and Chronograf. The following ...@@ -83,12 +87,33 @@ The **clmc-service** vm includes influx, Kapacitor and Chronograf. The following
#### Running the streaming-sim test #### Running the streaming-sim test
**needs to be updated once we have this in pytest format**
SSH into the CLMC server SSH into the CLMC server
`vagrant --fixture=streaming-sim -- ssh clmc-service` `vagrant --fixture=streaming-sim -- ssh clmc-service`
Run a python script to generate the test data sets The next step is to generate the test data, which could be done in two ways.
First option is to run a python script to generate the test data sets
`python3 /vagrant/test/streaming-sim/StreamingSim.py` `python3 /vagrant/test/streaming-sim/StreamingSim.py`
This script could also be used to clear the generated data by using the '-c' option
`python3 /vagrant/test/streaming-sim/StreamingSim.py -c`
The second option is to directly run the testing module, which will detect if the data was generated, and if not, will automatically
generate the data before executing the tests. Keep in mind that if the test data is being generated using this way, a 10 seconds timeout
is given after the generation is finished so that the data could properly be inserted into the database. If the data was already generated
using the first option, only the tests would be executed.
The command for running the testing module is
`pytest -s /vagrant/test/streaming-sim/test_simresults.py`
The `-s` option in the command is used to output prints used in the test code and is, therefore, optional.
If pytest is not installed, an easy solution is to use the Python Package Index (PyPI)
`sudo apt-get install python3-pip`
`pip3 install pytest`
...@@ -4,110 +4,107 @@ ...@@ -4,110 +4,107 @@
# Method to create a full InfluxDB request statement (based on partial statement from client) # Method to create a full InfluxDB request statement (based on partial statement from client)
import uuid import uuid
from random import random, randint from random import randint
# Reports TX and RX, scaling on requested quality # Reports TX and RX, scaling on requested quality
def generate_network_report(recieved_bytes, sent_bytes, time): def generate_network_report(recieved_bytes, sent_bytes, time):
# Measurement result = [{"measurement": "net_port_io",
result = 'net_port_io' "tags": {
# Tags "port_id": "enps03"
result += ',port_id=enps03 ' },
# Fields "fields": {
result += 'RX_BYTES_PORT_M=' + str(recieved_bytes) + "," "RX_BYTES_PORT_M": recieved_bytes,
result += 'TX_BYTES_PORT_M=' + str(sent_bytes) "TX_BYTES_PORT_M": sent_bytes
# Timestamp },
result += ' ' + str(_getNSTime(time)) "time": _getNSTime(time)
}]
# Measurement
#print(result)
return result return result
# Formats VM config # Formats VM config
def generate_vm_config(state, cpu, mem, storage, time): def generate_vm_config(state, cpu, mem, storage, time):
# metric result = [{"measurement": "vm_res_alloc",
result = 'vm_res_alloc' "tags": {
# Tags "vm_state": state
result += ',vm_state=' + quote_wrap(state) },
result += ' ' "fields": {
# Fields "cpu": cpu,
result += 'cpu=' + str(cpu) "memory": mem,
result += ',memory=' + quote_wrap(mem) "storage": storage
result += ',storage=' + quote_wrap(storage) },
"time": _getNSTime(time)
}]
# Time
result += ' ' + str(_getNSTime(time))
print(result)
return result return result
# Reports cpu usage, scaling on requests # Reports cpu usage, scaling on requests
def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time): def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time):
result = 'cpu_usage' result = [{"measurement": "cpu_usage",
# Tag "fields": {
result += ' ' "cpu_usage": cpu_usage,
# field "cpu_active_time": cpu_active_time,
result += 'cpu_usage='+str(cpu_usage) "cpu_idle_time": cpu_idle_time
result += ',cpu_active_time='+str(cpu_active_time) },
result += ',cpu_idle_time='+str(cpu_idle_time) "time": _getNSTime(time)
result += ' ' }]
# Time
result += str(_getNSTime(time))
print(result)
return result return result
# Reports response times, scaling on number of requests # Reports response times, scaling on number of requests
def generate_mpegdash_report(resource, requests, avg_response_time, peak_response_time, time): def generate_mpegdash_report(resource, requests, avg_response_time, peak_response_time, time):
# Measurement result = [{"measurement": "mpegdash_service",
result = 'mpegdash_service' "tags": {
# Tags "cont_nav": resource
result += ',cont_nav=\"' + str(resource) + "\" " },
# Fields "fields": {
"requests": requests,
"avg_response_time": avg_response_time,
"peak_response_time": peak_response_time
},
"time": _getNSTime(time)
}]
# result += 'cont_rep=' + str(quality) + ','
result += 'requests=' + str(requests) + ','
result += 'avg_response_time=' + str(avg_response_time) + ','
result += 'peak_response_time=' + str(peak_response_time)
# Timestamp
result += ' ' + str(_getNSTime(time))
print(result)
return result return result
# ipendpoint_route,ipendpoint_id,cont_nav=FQDN HTTP_REQUESTS_FQDN_M, NETWORK_FQDN_LATENCY timestamp # ipendpoint_route,ipendpoint_id,cont_nav=FQDN HTTP_REQUESTS_FQDN_M, NETWORK_FQDN_LATENCY timestamp
def generate_ipendpoint_route(resource, requests, latency, time): def generate_ipendpoint_route(resource, requests, latency, time):
# Measurement result = [{"measurement": "ipendpoint_route",
result = 'ipendpoint_route' "tags": {
# Tags "cont_nav": str(resource)
result += ',cont_nav=\"' + str(resource) + "\" " },
# Fields "fields": {
"http_requests_fqdn_m": requests,
"network_fqdn_latency": latency
},
"time": _getNSTime(time)
}]
# result += 'cont_rep=' + str(quality) + ','
result += 'http_requests_fqdn_m=' + str(requests) + ','
result += 'network_fqdn_latency=' + str(latency)
# Timestamp
result += ' ' + str(_getNSTime(time))
#print(result)
return result return result
# Influx needs strings to be quoted, this provides a utility interface to do this
def quote_wrap(str):
return "\"" + str + "\""
# InfluxDB likes to have time-stamps in nanoseconds # InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time): def _getNSTime(time):
# Convert to nano-seconds # Convert to nano-seconds
timestamp = int(1000000000*time) timestamp = int(1000000000*time)
#print("timestamp", timestamp)
return timestamp return timestamp
# DEPRICATED
# DEPRECATED
# ____________________________________________________________________________ # ____________________________________________________________________________
# DEPRICATED: old structure, not part of new spec # DEPRECATED: old structure, not part of new spec
# Influx needs strings to be quoted, this provides a utility interface to do this
def quote_wrap(string):
return "\"" + string + "\""
def _generateClientRequest(cReq, id, time): def _generateClientRequest(cReq, id, time):
# Tags first # Tags first
result = 'sid="' + str(id) + '",' + cReq result = 'sid="' + str(id) + '",' + cReq
...@@ -143,7 +140,6 @@ def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference): ...@@ -143,7 +140,6 @@ def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference):
return 'response' + result return 'response' + result
# Formats server config # Formats server config
def _generateServerConfig(ID, location, cpu, mem, storage, time): def _generateServerConfig(ID, location, cpu, mem, storage, time):
# metric # metric
...@@ -164,7 +160,6 @@ def _generateServerConfig(ID, location, cpu, mem, storage, time): ...@@ -164,7 +160,6 @@ def _generateServerConfig(ID, location, cpu, mem, storage, time):
return result return result
# Format port config # Format port config
def _configure_port(port_id, state, rate, time): def _configure_port(port_id, state, rate, time):
# metric # metric
...@@ -195,7 +190,6 @@ def _configure_service_function(state, max_connected_clients): ...@@ -195,7 +190,6 @@ def _configure_service_function(state, max_connected_clients):
return result return result
# Reports memory usage, scaling on requests # Reports memory usage, scaling on requests
def generate_mem_report(requests, total_mem, time): def generate_mem_report(requests, total_mem, time):
# Measurement # Measurement
...@@ -304,6 +298,3 @@ def service_function_measurement(measurement, service_function_context): ...@@ -304,6 +298,3 @@ def service_function_measurement(measurement, service_function_context):
result += ',sf_i'+quote_wrap(service_function_context.sf_i) result += ',sf_i'+quote_wrap(service_function_context.sf_i)
return result return result
...@@ -3,9 +3,10 @@ ...@@ -3,9 +3,10 @@
import LineProtocolGenerator as lp import LineProtocolGenerator as lp
import time import time
import urllib.parse import urllib.parse
import urllib.request import pytest
import sys
import random import random
import sys
from influxdb import InfluxDBClient
# Simulation parameters # Simulation parameters
TICK_TIME = 1 TICK_TIME = 1
...@@ -15,21 +16,46 @@ SIMULATION_TIME_SEC = 60*60 ...@@ -15,21 +16,46 @@ SIMULATION_TIME_SEC = 60*60
# CLMC parameters # CLMC parameters
INFLUX_DB_URL = 'http://192.168.50.10:8086' INFLUX_DB_URL = 'http://192.168.50.10:8086'
INFLUX_DB_NAME = 'CLMCMetrics'
AGENT_URL1 = 'http://192.168.50.11:8186' AGENT_URL1 = 'http://192.168.50.11:8186'
AGENT_URL2 = 'http://192.168.50.12:8186' AGENT_URL2 = 'http://192.168.50.12:8186'
# Simulator for services
class sim:
def __init__(self, influx_url):
# We don't need this as the db is CLMC metrics
self.influx_db = 'CLMCMetrics'
self.influx_url = influx_url
# Teardown DB from previous sim and bring it back up
self._deleteDB()
self._createDB()
class Sim(object):
"""
Simulator for services
"""
def __init__(self, influx_url, influx_db_name):
"""
Sets up the simulator object
:param influx_url: the influx DB url
:param influx_db_name: the influx DB name
"""
self.influx_db_name = influx_db_name
# influx db client is created on initialisation, which will handle the influx DB queries
url_object = urllib.parse.urlparse(influx_url)
self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=self.influx_db_name, timeout=10)
def reset(self):
"""
Resets the influx db by deleting the old database and creating a new one
"""
# Teardown DB from previous sim and bring it back up
self.db_client.drop_database(self.influx_db_name)
self.db_client.create_database(self.influx_db_name)
def run(self, simulation_length_seconds): def run(self, simulation_length_seconds):
"""
Runs the simulation
:param simulation_length_seconds: length of simulation
"""
start_time = time.time() - SIMULATION_TIME_SEC start_time = time.time() - SIMULATION_TIME_SEC
sim_time = start_time sim_time = start_time
...@@ -47,45 +73,47 @@ class sim: ...@@ -47,45 +73,47 @@ class sim:
# endpoint state->mu, sigma, secs normal distribution # endpoint state->mu, sigma, secs normal distribution
config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]} config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]}
print("\nSimulation started. Generating data...")
# Place endpoints # Place endpoints
max_delay = 0 max_delay = 0
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['placing'][0], config_delay_dist['placing'][0]*config_delay_dist['placing'][1], 'placing', 'placed') agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
if delay_time > max_delay: agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
max_delay = delay_time delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0],
config_delay_dist['placing'][0] * config_delay_dist['placing'][1],
'placing', 'placed')
max_delay = max(delay_time, max_delay)
sim_time += max_delay sim_time += max_delay
# Boot endpoints # Boot endpoints
max_delay = 0 max_delay = 0
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['booting'][0], config_delay_dist['booting'][0]*config_delay_dist['booting'][1], 'booting', 'booted') agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
if delay_time > max_delay: agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
max_delay = delay_time delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0],
config_delay_dist['booting'][0] * config_delay_dist['booting'][1],
'booting', 'booted')
max_delay = max(delay_time, max_delay)
sim_time += max_delay sim_time += max_delay
# Connect endpoints # Connect endpoints
max_delay = 0 max_delay = 0
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
delay_time = self._changeVMState(sim_time, ip_endpoint, config_delay_dist['connecting'][0], config_delay_dist['connecting'][0]*config_delay_dist['connecting'][1], 'connecting', 'connected') agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
if delay_time > max_delay: agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
max_delay = delay_time delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0],
config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1],
'connecting', 'connected')
max_delay = max(delay_time, max_delay)
sim_time += max_delay sim_time += max_delay
request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC
request_queue = 0
inc_period_count = 0 inc_period_count = 0
for i in range(simulation_length_seconds): for i in range(simulation_length_seconds):
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
request_processing_time = 0 agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
cpu_time_available = 0 agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
requests_processed = 0
max_requests_processed = 0
cpu_active_time = 0
cpu_idle_time = 0
cpu_usage = 0
cpu_load_time = 0
avg_response_time = 0
peak_response_time = 0
# linear inc to arrival rate # linear inc to arrival rate
if inc_period_count >= DEFAULT_REQUEST_RATE_INC_PERIOD: if inc_period_count >= DEFAULT_REQUEST_RATE_INC_PERIOD:
...@@ -98,8 +126,7 @@ class sim: ...@@ -98,8 +126,7 @@ class sim:
# time to process one second of video (mS) in the current second # time to process one second of video (mS) in the current second
request_processing_time = int(random.normalvariate(10, 10 * 0.68)) request_processing_time = int(random.normalvariate(10, 10 * 0.68))
if request_processing_time <= 10: request_processing_time = max(request_processing_time, 10)
request_processing_time = 10
# time depends on the length of the segments in seconds # time depends on the length of the segments in seconds
request_processing_time *= ip_endpoint['segment_size'] request_processing_time *= ip_endpoint['segment_size']
...@@ -111,95 +138,143 @@ class sim: ...@@ -111,95 +138,143 @@ class sim:
# processed all of the requests # processed all of the requests
requests_processed = ip_endpoint['request_queue'] requests_processed = ip_endpoint['request_queue']
else: else:
# processed the maxmum number of requests # processed the maximum number of requests
requests_processed = max_requests_processed requests_processed = max_requests_processed
# calculate cpu usage # calculate cpu usage
cpu_active_time = int(requests_processed * request_processing_time) cpu_active_time = int(requests_processed * request_processing_time)
cpu_idle_time = int(cpu_time_available - cpu_active_time) cpu_idle_time = int(cpu_time_available - cpu_active_time)
cpu_usage = cpu_active_time / cpu_time_available cpu_usage = cpu_active_time / cpu_time_available
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time)) agent_db_client.write_points(lp.generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, sim_time))
# calc network usage metrics # calc network usage metrics
bytes_rx = 2048 * requests_processed bytes_rx = 2048 * requests_processed
bytes_tx = int(ip_endpoint['video_bit_rate']/8*1000000*requests_processed*ip_endpoint['segment_size']) bytes_tx = int(
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_network_report(bytes_rx, bytes_tx, sim_time)) ip_endpoint['video_bit_rate'] / 8 * 1000000 * requests_processed * ip_endpoint['segment_size'])
agent_db_client.write_points(lp.generate_network_report(bytes_rx, bytes_tx, sim_time))
# time to process all of the requests in the queue # time to process all of the requests in the queue
peak_response_time = ip_endpoint['request_queue'] * request_processing_time / ip_endpoint['cpu'] peak_response_time = ip_endpoint['request_queue'] * request_processing_time / ip_endpoint['cpu']
# mid-range # mid-range
avg_response_time = (peak_response_time + request_processing_time) / 2 avg_response_time = (peak_response_time + request_processing_time) / 2
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_mpegdash_report('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], avg_response_time, peak_response_time, sim_time)) agent_db_client.write_points(lp.generate_mpegdash_report('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'],
avg_response_time, peak_response_time, sim_time))
# need to calculate this but sent at 5mS for now # need to calculate this but sent at 5mS for now
network_request_delay = 0.005 network_request_delay = 0.005
# calculate network response delays (2km link, 100Mbps) # calculate network response delays (2km link, 100Mbps)
network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'], ip_endpoint['video_bit_rate'], ip_endpoint['segment_size']) network_response_delay = self._calcNetworkDelay(2000, 100, ip_endpoint['packet_size'], ip_endpoint['video_bit_rate'])
e2e_delay = network_request_delay + (avg_response_time / 1000) + network_response_delay e2e_delay = network_request_delay + (avg_response_time / 1000) + network_response_delay
agent_db_client.write_points(lp.generate_ipendpoint_route('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], e2e_delay, sim_time))
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_ipendpoint_route('http://localhost/server-status?auto', ip_endpoint['request_arrival_rate'], e2e_delay, sim_time))
# remove requests processed off the queue # remove requests processed off the queue
ip_endpoint['request_queue'] -= int(requests_processed) ip_endpoint['request_queue'] -= int(requests_processed)
sim_time += TICK_TIME sim_time += TICK_TIME
end_time = sim_time end_time = sim_time
print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time,end_time,end_time-start_time)) print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time,
end_time - start_time))
# distance metres @staticmethod
# bandwidth Mbps def _calcNetworkDelay(distance, bandwidth, packet_size, tx_video_bit_rate):
# package size bytes """
# tx_video_bit_rate bp/sec Calculates the network delay. Declared as static method since it doesn't need access to any instance variables.
# segment size sec
def _calcNetworkDelay(self, distance, bandwidth, packet_size, tx_video_bit_rate, segment_size): :param distance: distance metres
response_delay = 0 :param bandwidth: bandwidth Mbps
:param packet_size: packet size bytes
:param tx_video_bit_rate: bp/sec
:return: the calculated network delay
"""
# propogation delay = distance/speed () (e.g 2000 metres * 2*10^8 for optical fibre) # propogation delay = distance/speed () (e.g 2000 metres * 2*10^8 for optical fibre)
propogation_delay = distance / (2 * 100000000) propogation_delay = distance / (2 * 100000000)
# packetisation delay = ip packet size (bits)/tx rate (e.g. 100Mbp with 0% packet loss) # packetisation delay = ip packet size (bits)/tx rate (e.g. 100Mbp with 0% packet loss)
packetisation_delay = (packet_size * 8) / (bandwidth * 1000000) packetisation_delay = (packet_size * 8) / (bandwidth * 1000000)
# print('packetisation_delay:', packetisation_delay)
# total number of packets to be sent # total number of packets to be sent
packets = (tx_video_bit_rate * 1000000) / (packet_size * 8) packets = (tx_video_bit_rate * 1000000) / (packet_size * 8)
# print('packets:', packets)
response_delay = packets * (propogation_delay + packetisation_delay) response_delay = packets * (propogation_delay + packetisation_delay)
# print('response_delay:', response_delay)
return response_delay return response_delay
def _changeVMState(self, sim_time, ip_endpoint, mu, sigma, transition_state, next_state): @staticmethod
delay_time = 0 def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state):
"""
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time)) Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables.
:param sim_time:
:param ip_endpoint:
:param mu:
:param sigma:
:param transition_state:
:param next_state:
:return: the delay time
"""
agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time))
delay_time = random.normalvariate(mu, sigma) delay_time = random.normalvariate(mu, sigma)
self._sendInfluxData(ip_endpoint['agent_url'], lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time+delay_time)) agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time + delay_time))
return delay_time return delay_time
def _createDB(self):
self._sendInfluxQuery(self.influx_url, 'CREATE DATABASE ' + self.influx_db)
@pytest.fixture(scope='module')
def run_simulation_fixture():
"""
A fixture, which checks if the the DB has been created, if not it runs the simulator with a 10 seconds timeout after that
"""
global INFLUX_DB_URL
global INFLUX_DB_NAME
global SIMULATION_TIME_SEC
def _deleteDB(self): simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME)
self._sendInfluxQuery(self.influx_url, 'DROP DATABASE ' + self.influx_db) dbs = simulator.db_client.get_list_database()
dbs = [db.get("name") for db in dbs]
if INFLUX_DB_NAME not in dbs:
simulator.reset()
simulator.run(SIMULATION_TIME_SEC)
print("10 seconds timeout is given so that the data could properly be inserted into the database.")
import time
time.sleep(10)
def _sendInfluxQuery(self, url, query):
query = urllib.parse.urlencode({'q': query})
query = query.encode('ascii')
req = urllib.request.Request(url + '/query ', query)
urllib.request.urlopen(req)
def _sendInfluxData(self, url, data): def run_simulation(generate=True):
data = data.encode() """
header = {'Content-Type': 'application/octet-stream'} A method which runs the data generation simulator
req = urllib.request.Request(url + '/write?db=' + self.influx_db, data, header) :param generate: True for generating data, False for deleting the DB (optional argument, if not given, default value True is used)
urllib.request.urlopen(req) """
simulator = sim(INFLUX_DB_URL) global INFLUX_DB_NAME
global INFLUX_DB_URL
global SIMULATION_TIME_SEC
simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME)
if generate:
simulator.reset()
simulator.run(SIMULATION_TIME_SEC) simulator.run(SIMULATION_TIME_SEC)
else:
simulator.db_client.drop_database(simulator.influx_db_name)
if __name__ == "__main__":
"""
The main entry for this module. Code here is executed only if the StreamingSim.py file is executed,
but not when it's imported in another module
"""
# check if there are any command line arguments given when executing the module
if len(sys.argv) > 1:
# if CLI argument '-c' is set when executing the script, the influx db will be deleted instead of generating data
option = str(sys.argv[1]) != "-c"
run_simulation(generate=option)
else:
# no argument is given to the function call, hence the default value True is used
run_simulation()
#!/usr/bin/python3
import sys
import urllib.parse
import urllib.request
queryReference = {
"cpu_usage" : "SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"cpu_usage\"",
"ipendpoint_route" : "SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"ipendpoint_route\"",
"mpegdash_service" : "SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"mpegdash_service\"",
"net_port_io" : "SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"net_port_io\"",
"vm_res_alloc" : "SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"vm_res_alloc\""
}
resultReference = {
"cpu_usage" : "{\"results\":[{\"statement_id\":0,\"series\":[{\"name\":\"cpu_usage\",\"columns\":[\"time\",\"count_cpu_active_time\",\"count_cpu_idle_time\",\"count_cpu_usage\"],\"values\":[[\"1970-01-01T00:00:00Z\",7200,7200,7200]]}]}]}",
"ipendpoint_route" : "{\"results\":[{\"statement_id\":0,\"series\":[{\"name\":\"ipendpoint_route\",\"columns\":[\"time\",\"count_http_requests_fqdn_m\",\"count_network_fqdn_latency\"],\"values\":[[\"1970-01-01T00:00:00Z\",7200,7200]]}]}]}",
"mpegdash_service" : "{\"results\":[{\"statement_id\":0,\"series\":[{\"name\":\"mpegdash_service\",\"columns\":[\"time\",\"count_avg_response_time\",\"count_peak_response_time\",\"count_requests\"],\"values\":[[\"1970-01-01T00:00:00Z\",7200,7200,7200]]}]}]}",
"net_port_io" : "{\"results\":[{\"statement_id\":0,\"series\":[{\"name\":\"net_port_io\",\"columns\":[\"time\",\"count_RX_BYTES_PORT_M\",\"count_TX_BYTES_PORT_M\"],\"values\":[[\"1970-01-01T00:00:00Z\",7200,7200]]}]}]}",
"vm_res_alloc" : "{\"results\":[{\"statement_id\":0,\"series\":[{\"name\":\"vm_res_alloc\",\"columns\":[\"time\",\"count_cpu\",\"count_memory\",\"count_storage\"],\"values\":[[\"1970-01-01T00:00:00Z\",12,12,12]]}]}]}"
}
def checkResult( query, queryResult ):
result = False
if query != None and queryResult != None:
if ( query in resultReference ):
if ( resultReference[query] == queryResult ):
print ( "Result correct" )
result = True
else:
print ( "Incorrect result for query: " + query )
print ( "Expected = " + resultReference[query] )
print ( "Result = " + queryResult )
else:
print( "Could not find query result for: " + query )
else:
print( "Could not check result: invalid parameters" )
return result
def sendInfluxQuery( url, query ):
query = urllib.parse.urlencode( {'q': query} )
query = query.encode( 'ascii' )
req = urllib.request.Request( url + '/query ', query )
result = urllib.request.urlopen( req )
return result.read().decode("utf-8").strip()
# Entry point
# ---------------------------------------------------------------------------------------
testFailed = False
for key in list( queryReference ):
query = queryReference[key]
result = sendInfluxQuery( "http://localhost:8086", query )
if checkResult( key, result ) == False:
testFailed = True
break
if testFailed :
print( "Failed simulation result test" )
sys.exit( 1 )
else:
print( "Test succeeded" )
sys.exit( 0 )
\ No newline at end of file
#!/usr/bin/python3
from influxdb import InfluxDBClient
import pytest
from StreamingSim import run_simulation_fixture
class TestSimulation(object):
"""
A testing class used to group all the tests related to the simulation data
"""
@pytest.mark.parametrize("query, expected_result", [
("SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"cpu_usage\"",
{"time": "1970-01-01T00:00:00Z", "count_cpu_active_time": 7200, "count_cpu_idle_time": 7200, "count_cpu_usage": 7200}),
("SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"ipendpoint_route\"",
{"time": "1970-01-01T00:00:00Z", "count_http_requests_fqdn_m": 7200, "count_network_fqdn_latency": 7200}),
("SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"mpegdash_service\"",
{"time": "1970-01-01T00:00:00Z", "count_avg_response_time": 7200, "count_peak_response_time": 7200, "count_requests": 7200}),
("SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"net_port_io\"",
{"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}),
("SELECT count(*) FROM \"CLMCMetrics\".\"autogen\".\"vm_res_alloc\"",
{"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12})
])
def test_simulation(self, query, expected_result, get_db_client, run_simulation_fixture):
"""
This is the entry point of the test. This method will be found and executed when the module is ran using pytest
:param query: the query to execute (value obtained from the pytest parameter decorator)
:param expected_result: the result expected from executing the query (value obtained from the pytest parameter decorator)
:param run_simulation_fixture: the imported fixture to use to generate the testing data - the return value of the fixture is not needed in this case
"""
# pytest automatically goes through all queries under test, declared in the parameters decorator
print("\n") # prints a blank line for formatting purposes
# the raise_errors=False argument is given so that we could actually test that the DB didn't return any errors instead of raising an exception
query_result = get_db_client.query(query, raise_errors=False)
# test the error attribute of the result is None, that is no error is returned from executing the DB query
assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
# get the dictionary of result points; the next() function just gets the first element of the query results iterator (we only expect one item in the iterator)
actual_result = next(query_result.get_points())
assert expected_result == actual_result, "Simulation test failure"
print("Successfully passed test for the following query: {0}".format(query))
@pytest.fixture(params=[{'database': 'CLMCMetrics'}], scope='class')
def get_db_client(self, request):
return InfluxDBClient(host='localhost', port=8086, database=request.param['database'], timeout=10)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment