Commit beac0662 authored by Simon Crowle

Merge branch 'endpointConfig' into 'mediaComponentConfig'

Endpoint config

See merge request FLAME/flame-clmc!28
parents 3c37fffd de3857ec
......@@ -23,23 +23,6 @@ def generate_network_report(recieved_bytes, sent_bytes, time):
return result
# Formats VM config
def generate_vm_config(state, cpu, mem, storage, time):
result = [{"measurement": "vm_res_alloc",
"tags": {
"vm_state": state
},
"fields": {
"cpu": cpu,
"memory": mem,
"storage": storage
},
"time": _getNSTime(time)
}]
return result
# Reports cpu usage, scaling on requests
def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time):
result = [{"measurement": "cpu_usage",
......@@ -87,12 +70,40 @@ def generate_ipendpoint_route(resource, requests, latency, time):
return result
# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time):
# Convert to nano-seconds
timestamp = int(1000000000*time)
def generate_endpoint_config(time, cpu, mem, storage, current_state, current_state_time, **kwargs):
"""
Generates a measurement for the VM configuration states.
:param time: time of measurement
:param cpu: the number of CPUs of the VM endpoint
:param mem: the memory of the VM endpoint
:param storage: the storage capacity of the VM endpoint
:param current_state: the current state the endpoint is in (TAG)
:param current_state_time: the part of the sampling period the endpoint was in the current state
:param kwargs: 'python-style' keyword arguments used to store each completed state as a key and its respective state period (in seconds) as value
:return: a list containing a single dictionary object representing the data to post to InfluxDB
"""
# lambda function that looks up a state in the keyword arguments and returns its period, defaulting to 0.0 if the state is not present
validate = lambda key: kwargs.get(key) if key in kwargs else 0.0
# generate and validate the state values
fields = {"cpus": cpu, "memory": mem, "storage": storage} # NOTE: Do we need these fields ?
for state in ("unplaced", "placing", "placed", "booting", "booted", "connecting", "connected"):
fields[state] = validate(state)
fields[("avg_{0}".format(state))] = validate("avg_{0}".format(state))
result = [{"measurement": "endpoint_config",
"tags": {
"current_state": current_state,
"current_state_time": current_state_time,
},
"fields": fields,
"time": _getNSTime(time)}]
return result
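For illustration, a minimal usage sketch of generate_endpoint_config as defined above; the timestamp, CPU, memory and storage values are hypothetical, and influx_client stands in for an InfluxDBClient instance:

# Hypothetical example: one sample in which the endpoint completed 9.2s of 'placing'
# (one completed spell) and spent the last 0.8s of the period in 'placed'
points = generate_endpoint_config(1515583926, 1, "100G", "1T",
                                  "placed", 0.8,
                                  placing=9.2, avg_placing=9.2)
# points[0]["fields"] has 'placing' and 'avg_placing' set to 9.2, every other
# state field set to 0.0, plus the 'cpus', 'memory' and 'storage' fields
# influx_client.write_points(points)  # influx_client is assumed to exist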
return timestamp
def generate_mc_service_config( mcMeasurement, stateTimeStats, time ):
......@@ -142,6 +153,13 @@ def validate_state_time_stats( stateTimeStats ):
return stateTimeStats
# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time):
# Convert to nano-seconds
timestamp = int(1000000000*time)
return timestamp
# DEPRECATED
# ____________________________________________________________________________
......@@ -153,6 +171,23 @@ def quote_wrap(string):
return "\"" + string + "\""
# Formats VM config
def generate_vm_config(state, cpu, mem, storage, time):
result = [{"measurement": "vm_res_alloc",
"tags": {
"vm_state": state
},
"fields": {
"cpu": cpu,
"memory": mem,
"storage": storage
},
"time": _getNSTime(time)
}]
return result
def _generateClientRequest(cReq, id, time):
# Tags first
result = 'sid="' + str(id) + '",' + cReq
......
......@@ -3,9 +3,9 @@
import clmctest.monitoring.LineProtocolGenerator as lp
import time
import urllib.parse
import pytest
import random
import sys, getopt
import sys
import getopt
from influxdb import InfluxDBClient
# Simulation parameters
......@@ -73,40 +73,85 @@ class Sim(object):
'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500}
]
# Simulate configuration of the ipendpoints
# endpoint state->mu, sigma, secs normal distribution
config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]}
endpoint_states_config = {self.agent1_url: {"current_state": "unplaced", "current_state_time": 0},
self.agent2_url: {"current_state": "unplaced", "current_state_time": 0}}
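The loops below all follow the same bookkeeping pattern for this dictionary, summarised here as a sketch:

# While an endpoint stays in its current state, one single-state sample is written
# per TICK_TIME and the tracked time grows:
#     endpoint_states_config[agent]["current_state_time"] += TICK_TIME
# When the endpoint changes state, the accumulated time (plus the fraction of the
# final sample spent in the old state) is reported as a completed state via
# _changeVMState, and the dictionary is reset to the new state and the time
# already spent in it.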
print("\nSimulation started. Generating data...")
# Simulate configuration of the ip endpoints
# Move endpoints from state unplaced to state placing
# reports: 1, unplaced: 0.7s (completed), placing: 0.3s
for ip_endpoint in ip_endpoints:
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeVMState(agent_db_client, sim_time, ip_endpoint, (("unplaced", 0.7, 1),), 'placing', 0.3)
# report the change of the state in the config dictionary, to keep track of unreported seconds for the current state
endpoint_states_config[agent]["current_state"] = "placing"
endpoint_states_config[agent]["current_state_time"] = 0.3
sim_time += TICK_TIME
# Place endpoints
max_delay = 0
# reports: 10 (SAMPLE PERIOD is 1), placing: 9.1s (completed), placed: 0.9s
# addition to uncompleted states from previous report: placing += 0.3s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0],
config_delay_dist['placing'][0] * config_delay_dist['placing'][1],
'placing', 'placed')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# since the current state in the dictionary is still placing, only the current state time is incremented
for i in range(9):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "placing")
endpoint_states_config[agent]["current_state_time"] += TICK_TIME
# here, the VM exits state placing, hence need to append the current state time
self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (("placing", 0.1 + endpoint_states_config[agent]["current_state_time"], 1),), 'placed', 0.9)
# since VM changed its current state, readjust the config dictionary
endpoint_states_config[agent]["current_state"] = "placed"
endpoint_states_config[agent]["current_state_time"] = 0.9
sim_time += 10 * TICK_TIME
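As a sanity check (a sketch only, assuming TICK_TIME == 1 as in the comments above), the total 'placing' time reported in the final sample of this block matches the avg_placing value asserted in the tests further down:

import math
placing_total = 0.3 + 9 * 1 + 0.1   # carried over + nine full samples + final fraction
assert math.isclose(placing_total, 9.4)  # equals the expected mean(avg_placing) of 9.4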
# Move endpoints from state placed to state booting
# reports: 1, placed: 0.8s, booting: 0.2s
# addition to uncompleted states from previous report: placed += 0.9s
for ip_endpoint in ip_endpoints:
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
# here, the VM exits state placed, hence need to append the current state time
self._changeVMState(agent_db_client, sim_time, ip_endpoint, (('placed', 0.8 + endpoint_states_config[agent]["current_state_time"], 1),), 'booting', 0.2)
endpoint_states_config[agent]["current_state"] = "booting"
endpoint_states_config[agent]["current_state_time"] = 0.2
sim_time += TICK_TIME
# Boot endpoints
max_delay = 0
# reports: 10 (SAMPLE PERIOD is 1), booting: 9.4s, booted: 0.6s
# addition to uncompleted states from previous report: booting += 0.2s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0],
config_delay_dist['booting'][0] * config_delay_dist['booting'][1],
'booting', 'booted')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# since the current state in the dictionary is still booting, only the current state time is incremented
for i in range(9):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "booting")
endpoint_states_config[agent]["current_state_time"] += TICK_TIME
# here, the VM exits state booting, hence need to append the current state time
self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (('booting', 0.4 + endpoint_states_config[agent]["current_state_time"], 1),), 'booted', 0.6)
# since VM changed its current state, readjust the config dictionary
endpoint_states_config[agent]["current_state"] = "booted"
endpoint_states_config[agent]["current_state_time"] = 0.6
sim_time += 10*TICK_TIME
# move mpegdash_service media component state from 'stopped' to 'starting'
# Total reports = 1, (0.2 seconds in 'stopped', 0.8 seconds in 'starting')
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeMCState( influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting' )
self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting')
sim_time += TICK_TIME
......@@ -116,29 +161,58 @@ class Sim(object):
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
for i in range( 0, 4 ):
self._writeMCSingleState( influxClient, 'mpegdash_service_config', 'starting', sim_time + (i * TICK_TIME) )
for i in range(0, 4):
self._writeMCSingleState(influxClient, 'mpegdash_service_config', 'starting', sim_time + (i * TICK_TIME))
self._changeMCState( influxClient, sim_time + (4 * TICK_TIME), 'mpegdash_service_config', 10, 7, 'starting', 'running' )
self._changeMCState(influxClient, sim_time + (4 * TICK_TIME), 'mpegdash_service_config', 10, 7, 'starting', 'running')
sim_time += 5 * TICK_TIME
# Move endpoints from state booted to state connecting
# reports: 2, booted: 1.5s, connecting: 0.5s
# addition to uncompleted states from previous report: booted += 0.6s
for ip_endpoint in ip_endpoints:
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
# since the current state in the dictionary is still booted, only the current state time is incremented
self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "booted")
endpoint_states_config[agent]["current_state_time"] += TICK_TIME
# here, the VM exits state booted, hence need to append the current state time
self._changeVMState(agent_db_client, sim_time + (1*TICK_TIME), ip_endpoint, (('booted', 0.5 + endpoint_states_config[agent]["current_state_time"], 1),), 'connecting', 0.5)
# since VM changed its current state, readjust the config dictionary
endpoint_states_config[agent]["current_state"] = "connecting"
endpoint_states_config[agent]["current_state_time"] = 0.5
sim_time += 2*TICK_TIME
# Connect endpoints
max_delay = 0
# reports: 10, connecting: 9.7s, connected: 0.3s
# addition to uncompleted states from previous report: connecting += 0.5s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0],
config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1],
'connecting', 'connected')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# since the current state in the dictionary is still connecting, only the current state time is incremented
for i in range(9):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connecting")
endpoint_states_config[agent]["current_state_time"] += TICK_TIME
# here, the VM exits state connecting, hence need to append the current state time
self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, (('connecting', 0.7 + endpoint_states_config[agent]["current_state_time"], 1),), 'connected', 0.3)
# since VM changed its current state, readjust the config dictionary
endpoint_states_config[agent]["current_state"] = "connected"
endpoint_states_config[agent]["current_state_time"] = 0.3
sim_time += 10*TICK_TIME
request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC
inc_period_count = 0
for i in range(simulation_length_seconds):
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
# linear inc to arrival rate
......@@ -198,12 +272,38 @@ class Sim(object):
# remove requests processed off the queue
ip_endpoint['request_queue'] -= int(requests_processed)
# update endpoint state (continuously 'connected')
self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "connected")
# update current state time in the config dictionary
endpoint_states_config[agent]["current_state_time"] += 1
# update mpegdash_service media component state (continuously 'running')
self._writeMCSingleState( agent_db_client, 'mpegdash_service_config', 'running', sim_time )
self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time)
sim_time += TICK_TIME
# Simulate tear-down of media components
# Simulate tear-down of media components and endpoints
# remove endpoints
# reports: 5, connected: 4.7s, unplaced: 0.3s
# addition to uncompleted states from previous report: connected += 3600s + 0.3s
for ip_endpoint in ip_endpoints:
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
# since the current state in the dictionary is still connected, only the current state time is incremented
for i in range(4):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connected")
endpoint_states_config[agent]["current_state_time"] += TICK_TIME
# here, the VM exits state connected, hence need to append the current state time
self._changeVMState(agent_db_client, sim_time + (4*TICK_TIME), ip_endpoint, (('connected', 0.7 + endpoint_states_config[agent]["current_state_time"], 1),), 'unplaced', 0.3)
# since VM changed its current state, readjust the config dictionary
endpoint_states_config[agent]["current_state"] = "unplaced"
endpoint_states_config[agent]["current_state_time"] = 0.3
sim_time += 5 * TICK_TIME
# move mpegdash_service media component state from 'running' to 'stopping'
# Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping')
for ip_endpoint in ip_endpoints:
......@@ -220,7 +320,7 @@ class Sim(object):
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeMCState( influxClient, sim_time, 'mpegdash_service_config', 10, 9, 'stopping', 'stopped' )
self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 9, 'stopping', 'stopped')
sim_time += TICK_TIME
......@@ -253,25 +353,48 @@ class Sim(object):
return response_delay
@staticmethod
def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state):
def _writeVMSingleState(agent_db_client, sim_time, ip_endpoint, state):
"""
Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables.
:param sim_time:
:param ip_endpoint:
:param mu:
:param sigma:
:param transition_state:
:param next_state:
:return: the delay time
Write a single state as a sample over TICK_TIME.
:param agent_db_client: agent used to send metric data to CLMC
:param sim_time: current simulation time
:param ip_endpoint: dict with info for ip endpoint
:param state: state that's being reported
"""
# since no state has been completed, generate_endpoint_config is called without any keyword arguments (field/value pairs); only the tags for the current state are set
data = lp.generate_endpoint_config(sim_time, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], state, TICK_TIME)
agent_db_client.write_points(data)
@staticmethod
def _changeVMState(agent_db_client, sim_time, ip_endpoint, completed_states, next_state, next_state_time):
"""
Send influx data to report change of VM state over the sample period.
agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time))
:param agent_db_client: agent used to send metric data to CLMC
:param sim_time: current simulation time
:param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.)
delay_time = random.normalvariate(mu, sigma)
:param completed_states: states that have been completed
These are the states that have been completed over the sample period (there could be more than one). In terms of data format, a tuple of tuples.
Each completed state is a tuple of format (state, state_period, count), where state is the state's name, state_period is the total time the VM was in this state and count
is the number of times the VM was in this state (used to derive the average for the sample period).
E.g. ((booting, 5, 2), (booted, 2, 1)) - this is for transitions: booting(3s) -> booted(2s) -> booting(2s) -> booted(1s);
since booted is the last (current) state, it is reported only once in the completed states tuple
agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time + delay_time))
:param next_state: name of next state (which is also the current state of the VM) - in the above example this would be booted.
:param next_state_time: the time the VM was in the next state (which is also the current state of the VM) - in the above example this would be 1 second.
"""
state_stats = {}
for state, period, count in completed_states:
state_stats[state] = period
state_stats["avg_{0}".format(state)] = float(period/count)
return delay_time
# the completed states and their periods/averages are passed to the report-generating function as keyword arguments
agent_db_client.write_points(lp.generate_endpoint_config(sim_time, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'],
next_state, next_state_time, **state_stats))
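Mirroring the docstring example above, a hypothetical call for the booting/booted transitions would look like this (agent_db_client, sim_time and ip_endpoint are assumed from the surrounding scope):

# booting(3s) -> booted(2s) -> booting(2s) -> booted(1s):
# 'booting' completed twice for a total of 5s, 'booted' completed once for 2s,
# and the VM is currently 'booted' for the last 1s of the sample period
Sim._changeVMState(agent_db_client, sim_time, ip_endpoint,
                   (("booting", 5, 2), ("booted", 2, 1)),
                   "booted", 1)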
@staticmethod
def _writeMCSingleState(influxClient, measurement, state, sim_time):
......@@ -287,10 +410,9 @@ class Sim(object):
"""
state_stats = {}
state_stats[state] = float( TICK_TIME )
state_stats['avg_' +state] = float( TICK_TIME )
influxClient.write_points( lp.generate_mc_service_config(measurement,state_stats,sim_time) )
state_stats[state] = float(TICK_TIME)
state_stats['avg_' + state] = float(TICK_TIME)
influxClient.write_points(lp.generate_mc_service_config(measurement, state_stats, sim_time))
@staticmethod
def _changeMCState(influxClient, sim_time, mc_measurement, sample_count, trans_sample_count, transition_state, next_state):
......@@ -308,22 +430,24 @@ class Sim(object):
"""
mc_states = {}
# Report the total time in the transition state and its average over the reporting period
mc_states[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count
mc_states["avg_" +transition_state] = mc_states[transition_state] / float(TICK_TIME)
mc_states["avg_" + transition_state] = mc_states[transition_state] / float(TICK_TIME)
# Use the time remaining as the length for the time in the next state
mc_states[next_state] = float(TICK_TIME) - mc_states[transition_state]
mc_states["avg_" +next_state] = mc_states[next_state] / float(TICK_TIME)
mc_states["avg_" + next_state] = mc_states[next_state] / float(TICK_TIME)
influxClient.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time))
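For clarity, the arithmetic for the first 'stopped' -> 'starting' call made earlier in the simulation works out as follows (a sketch, assuming TICK_TIME == 1):

# _changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting')
# mc_states['stopped']      == (1.0 / 10) * 2  == 0.2
# mc_states['avg_stopped']  == 0.2 / 1.0       == 0.2
# mc_states['starting']     == 1.0 - 0.2       == 0.8
# mc_states['avg_starting'] == 0.8 / 1.0       == 0.8
# i.e. "0.2 seconds in 'stopped', 0.8 seconds in 'starting'" as stated in the simulation comment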
def run_simulation(generate=True, sTime=3600):
"""
A method which runs the data generation simulator
A method which runs the data generation simulator.
:param generate: True for generating data, False for deleting the DB (optional argument, if not given, default value True is used)
:param sTime: the number of 'seconds' the simulation will run
"""
global INFLUX_DB_NAME
......@@ -342,6 +466,7 @@ def run_simulation(generate=True, sTime=3600):
else:
simulator.db_client.drop_database(simulator.influx_db_name)
if __name__ == "__main__":
"""
The main entry for this module. Code here is executed only if the StreamingSim.py file is executed,
......@@ -354,24 +479,24 @@ if __name__ == "__main__":
# Try get some options
try:
opts, args = getopt.getopt( sys.argv[1:], "c:t:", ['clear','time='])
opts, args = getopt.getopt(sys.argv[1:], "c:t:", ['clear', 'time='])
except getopt.GetoptError:
print( 'StreamingSim.py -c -t <seconds>' )
print('StreamingSim.py -c -t <seconds>')
sys.exit(2)
# Apply options, if any
for opt, arg in opts:
if opt in ( '-c','--clear' ):
if opt in ('-c', '--clear'):
genOpt = False
elif opt in ('-t','--time'):
elif opt in ('-t', '--time'):
simTime = arg
if ( genOpt == True ):
print( "Running simulation to generate data" )
print( "Time period for this simulation: " + str(simTime) + " seconds" )
if genOpt:
print("Running simulation to generate data")
print("Time period for this simulation: " + str(simTime) + " seconds")
else:
print( "Clearing simulation data" )
print("Clearing simulation data")
run_simulation( genOpt, simTime )
run_simulation(genOpt, simTime)
......@@ -12,16 +12,16 @@ def streaming_sim_config():
"""
Reads the service configuration deployed for the streaming simulation test.
:param request: access the parameters of the fixture
:return: the python object representing the read YAML file
"""
rspec = pkg_resources.resource_filename('clmctest.monitoring', 'rspec.yml')
print("rspec file: {0}".format(rspec))
print("\nrspec file: {0}".format(rspec))
with open(rspec, 'r') as stream:
data_loaded = yaml.load(stream)
return data_loaded
@pytest.fixture(params=[{'database': 'CLMCMetrics'}], scope='module')
def influx_db(streaming_sim_config, request):
"""
......@@ -34,6 +34,7 @@ def influx_db(streaming_sim_config, request):
return InfluxDBClient(host=streaming_sim_config['hosts'][0]['ip_address'], port='8086', database=request.param['database'], timeout=10)
@pytest.fixture(scope="module")
def simulator(streaming_sim_config):
......@@ -42,10 +43,7 @@ def simulator(streaming_sim_config):
agent1_url = "http://" + streaming_sim_config['hosts'][1]['ip_address'] + ":8186"
agent2_url = "http://" + streaming_sim_config['hosts'][2]['ip_address'] + ":8186"
simulator = Sim( influx_url, influx_db_name, agent1_url, agent2_url )
dbs = simulator.db_client.get_list_database()
dbs = [db.get("name") for db in dbs]
simulator = Sim(influx_url, influx_db_name, agent1_url, agent2_url)
simulator.reset()
......
......@@ -9,17 +9,16 @@ class TestSimulation(object):
"""
A testing class used to group all the tests related to the simulation data
"""
@pytest.fixture(scope='class')
def run_simulator( self, simulator ):
random.seed( 0 ) # Seed random function so we can reliably test for average queries
@pytest.fixture(scope='class', autouse=True)
def run_simulator(self, simulator):
random.seed(0) # Seed random function so we can reliably test for average queries
print( "Running simulation, please wait..." )
simulator.run( 3600 )
print( "Waiting for INFLUX to finish receiving simulation data..." )
time.sleep( 10 ) # wait for data to finish arriving at the INFLUX database
print("Running simulation, please wait...")
simulator.run(3600)
print("Waiting for INFLUX to finish receiving simulation data...")
time.sleep(10) # wait for data to finish arriving at the INFLUX database
print( "... simulation data fixture finished" )
@pytest.mark.parametrize("query, expected_result", [
......@@ -31,33 +30,52 @@ class TestSimulation(object):
{"time": "1970-01-01T00:00:00Z", "count_avg_response_time": 7200, "count_peak_response_time": 7200, "count_requests": 7200}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."net_port_io"',
{"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."vm_res_alloc"',
{"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12}),
# Media component state tests
('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639,
"count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639,
"count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3609, "count_avg_starting" : 3609, "count_avg_stopped" : 3609, "count_avg_stopping" : 3609, "count_running" : 3609, "count_starting" : 3609, "count_stopped" : 3609, "count_stopping" : 3609}),
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3609, "count_avg_starting" : 3609, "count_avg_stopped" : 3609, "count_avg_stopping" : 3609, "count_running" : 3609, "count_starting" : 3609, "count_stopped" : 3609, "count_stopping" : 3609}),
('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <>0',
{"time" : "1970-01-01T00:00:00Z", "avg_stopped" : 0.15}),
('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <>0',
{"time" : "1970-01-01T00:00:00Z", "avg_starting" : 0.9166666666666666}),
('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <>0',
{"time" : "1970-01-01T00:00:00Z", "avg_running" : 0.9997502081598669}),
('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <>0',
{"time" : "1970-01-01T00:00:00Z", "avg_stopping" : 0.55})
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "avg_placing": 9.4}),
('SELECT mean(avg_booting) as "avg_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_booting <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "avg_booting": 9.6}),
('SELECT mean(avg_connecting) as "avg_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connecting <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "avg_connecting": 10.2}),
('SELECT mean(avg_connected) as "avg_connected" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connected <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "avg_connected": 3605.0}),
('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "avg_placing": 9.4}),
('SELECT mean(avg_booting) as "avg_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_booting <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "avg_booting": 9.6}),
('SELECT mean(avg_connecting) as "avg_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connecting <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "avg_connecting": 10.2}),
('SELECT mean(avg_connected) as "avg_connected" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connected <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "avg_connected": 3605.0}),
('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}),
('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}),
('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}),
('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55})
])
def test_simulation( self, run_simulator, influx_db, query, expected_result ):
def test_simulation(self, influx_db, query, expected_result):
"""
This is the entry point of the test. This method will be found and executed when the module is run using pytest
:param query: the query to execute (value obtained from the pytest parameter decorator)
:param expected_result: the result expected from executing the query (value obtained from the pytest parameter decorator)
:param influx_db: the influx db client fixture - imported from conftest.py
:param run_simulation_fixture: the imported fixture to use to generate the testing data - the return value of the fixture is not needed in this case
"""
# pytest automatically goes through all queries under test, declared in the parameters decorator
......@@ -69,7 +87,7 @@ class TestSimulation(object):
# test the error attribute of the result is None, that is no error is returned from executing the DB query
assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
# get the dictionary of result points; the next() function just gets the first element of the query results iterator (we only expect one item in the iterator)
# get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator)
actual_result = next(query_result.get_points())
assert expected_result == actual_result, "Simulation test failure"
......
......@@ -2,8 +2,11 @@
import pytest
import yaml
import requests
import time
import pkg_resources
@pytest.fixture(scope="module")
def streaming_config():
"""
......@@ -18,3 +21,16 @@ def streaming_config():
with open(rspec, 'r') as stream:
data_loaded = yaml.load(stream)
return data_loaded
@pytest.fixture(scope="module", autouse=True,
params=[{'config': {'kapacitor_url': 'http://localhost:8888/chronograf/v1/sources/1/kapacitors', 'kapacitor_file': '/vagrant/test/streaming/kapacitor.json'}}])
def kapacitor_config(request):
kapacitor_configuration = request.param['config']['kapacitor_file']
with open(kapacitor_configuration, "r") as rule_file:
data = "".join(line.strip() for line in rule_file.readlines())
kapacitor_url = request.param['config']['kapacitor_url']
requests.post(url=kapacitor_url, data=data, headers={"Content-Type": "application/json"})
time.sleep(1)
......@@ -33,18 +33,8 @@ echo $TEST_DIR"/kapacitor.conf"
cp $TEST_DIR/kapacitor.conf /etc/kapacitor/kapacitor.conf
systemctl start kapacitor
# wait for kapacitor to restart
# TODO: do this better
sleep 5
# Set up Influx data source
curl -i -X POST -H "Content-Type: application/json" http://localhost:8888/chronograf/v1/sources -d @$TEST_DIR/influx.json
# Set up Kapacitor
curl -i -X POST -H "Content-Type: application/json" http://localhost:8888/chronograf/v1/sources/1/kapacitors -d @$TEST_DIR/kapacitor.json
# Set up rules
curl -i -X POST -H "Content-Type: application/json" http://localhost:9092/kapacitor/v1/tasks -d @$TEST_DIR/rules.json
# Set up dashboard
curl -i -X POST -H "Content-Type: application/json" http://localhost:8888/chronograf/v1/dashboards -d @$TEST_DIR/dashboard.json
{
"id" : "TestRule1",
"type" : "batch",
"dbrps" : [{"db": "CLMCMetrics", "rp" : "autogen"}],
"script" : "var ruleData = batch\r\n |query(''' SELECT mean(\"handled\") AS \"mean_handled\" FROM \"CLMCMetrics\".\"autogen\".\"nginx\" WHERE \"ipendpoint\"='adaptive_streaming_I1_nginx1' ''')\r\n .period(5s)\r\n .every(5s)\r\n\r\nvar ruleAlert = ruleData\r\n |alert()\r\n .id('{{ .Name }}\/adaptive_streaming_I1_nginx1')\r\n .message('{{ .ID }} is {{ .Level }} Mean handled connections: {{ index .Fields \"mean_handled\" }}')\r\n .warn(lambda: \"mean_handled\" > 10)\r\n .log( '\/tmp\/TestRule1.log' )",
"status" : "enabled"
}
\ No newline at end of file
{
"id" : "TestRule2",
"type" : "batch",
"dbrps" : [{"db": "CLMCMetrics", "rp" : "autogen"}],
"script" : "var ruleData = batch\r\n |query(''' SELECT mean(\"waiting\") AS \"mean_waiting\" FROM \"CLMCMetrics\".\"autogen\".\"nginx\" WHERE \"ipendpoint\"='adaptive_streaming_I1_nginx1' ''')\r\n .period(5s)\r\n .every(5s)\r\n\r\nvar ruleAlert = ruleData\r\n |alert()\r\n .id('{{ .Name }}\/adaptive_streaming_I1_nginx1')\r\n .message('{{ .ID }} is {{ .Level }} Mean waiting connections: {{ index .Fields \"mean_waiting\" }}')\r\n .warn(lambda: \"mean_waiting\" > 10)\r\n .log( '\/tmp\/TestRule2.log' )",
"status" : "enabled"
}
\ No newline at end of file
......@@ -21,6 +21,8 @@ class TestStreamingAlerts(object):
@pytest.mark.parametrize("rule, log", [
("rules.json", "/tmp/RPSLoad.log"),
("test_rule1.json", "/tmp/TestRule1.log"),
("test_rule2.json", "/tmp/TestRule2.log"),
])
def test_alerts(self, rule, log, streaming_url, streaming_manifest):
"""
......@@ -38,14 +40,10 @@ class TestStreamingAlerts(object):
:param streaming_manifest: the fixture providing the root of the XML streaming manifest
"""
kapacitor_setter = self.kapacitor_setting(rule)
kapacitor_setter = self.kapacitor_setting(rule, log)
next(kapacitor_setter) # Setup the test rule
try:
if isfile(log):
remove(log) # delete log file if existing from previous tests
except PermissionError:
system("sudo rm {0}".format(log)) # handles the case for running on linux where permission will be required to delete the old log file
print("Testing alert creation for rule: {0}".format(rule))
segments = streaming_manifest.findall(".//{urn:mpeg:DASH:schema:MPD:2011}SegmentURL")
......@@ -56,9 +54,11 @@ class TestStreamingAlerts(object):
t.start()
alert_created = False
counter = 0
time_delay = 2.5
while True:
# loop while the threads are executing and do a check every 2.5 seconds to see whether either the alert log has been created or the threads have finished execution
sleep(2.5)
sleep(time_delay)
if isfile(log):
for t in threads: # kill all running threads in case log file is created beforehand
t.stop()
......@@ -67,32 +67,55 @@ class TestStreamingAlerts(object):
if threads_queue.full():
break
assert alert_created, "Alerts test failed: no log file is created indicating a triggered alert."
counter += time_delay # the counter tracks the time taken; for the rules under test a 30-second time frame is usually enough to trigger the alert
if counter >= 12*time_delay:
for t in threads: # kill all running threads in case of test failure
t.stop()
break
assert alert_created, "Alerts test failed: no log file is created indicating a triggered alert for rule {0}.".format(rule)
print("\nSuccessfully passed alert creation test.\n")
print("Successfully passed alert creation test for rule: {0}.".format(rule))
next(kapacitor_setter) # Teardown the test rule
def kapacitor_setting(self, rule):
def kapacitor_setting(self, rule, log):
"""
A generator function used to provide setUp/tearDown actions for a particular kapacitor rule.
On setUp the rule is initialized; on tearDown the rule is deleted. Interleaving is achieved using the generator pattern.
:param rule: the name of the json file for the rule under test
:param log: the absolute path of the log file that's being tested
"""
# check if the log file is already created due to a previous test
try:
if isfile(log):
remove(log) # delete log file if existing from previous tests
except PermissionError:
system("sudo rm {0}".format(log)) # handles the case for running on linux where permission will be required to delete the old log file
# Initialization of the kapacitor rule - Test setUp (UnitTest style)
with open(join(dirname(__file__), rule), "r") as rule_file:
data = "".join(line.strip() for line in rule_file.readlines())
rule_data = json.loads(data)
requests.delete(url=urljoin(self.kapacitor_url + "/", rule_data.get("id"))) # delete in case of a task with the same ID already set in the kapacitor
requests.post(url=self.kapacitor_url, data=data, headers={"Content-Type": "application/json"})
r = requests.post(url=self.kapacitor_url, data=data, headers={"Content-Type": "application/json"})
assert r.status_code == 200, "Couldn't create alert rule {0}".format(rule)
print("\nSuccessfully created test rule {0}".format(rule))
yield
# Deleting the kapacitor rule used for testing - Test tearDown (UnitTest style)
requests.delete(url=urljoin(self.kapacitor_url + "/", rule_data.get("id")))
# check if the log file is created and clean it up
try:
if isfile(log):
remove(log) # delete the log file created during the test
except PermissionError:
system("sudo rm {0}".format(log)) # handles the case for running on linux where permission will be required to delete the old log file
yield
@staticmethod
......