Skip to content
Snippets Groups Projects
Commit 8a46aba7 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

[ Issue #61 ] - Endpoint configuration monitoring update

parent a881c846
Branches
Tags
No related merge requests found
......@@ -23,23 +23,6 @@ def generate_network_report(recieved_bytes, sent_bytes, time):
return result
# Formats VM config
def generate_vm_config(state, cpu, mem, storage, time):
result = [{"measurement": "vm_res_alloc",
"tags": {
"vm_state": state
},
"fields": {
"cpu": cpu,
"memory": mem,
"storage": storage
},
"time": _getNSTime(time)
}]
return result
# Reports cpu usage, scaling on requests
def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time):
result = [{"measurement": "cpu_usage",
......@@ -87,41 +70,33 @@ def generate_ipendpoint_route(resource, requests, latency, time):
return result
def generate_endpoint_config(state, cpu, mem, storage, statePeriod, time):
def generate_endpoint_config(cpu, mem, storage, time, **kwargs):
"""
generates a measurement
generates a measurement for a VM configuration states
:param cpu: the number of CPUs of VM endpoint
:param mem: memory of VM endpoint
:param storage: storage capacity of VM endpoint
:param state: the state that was monitored (e.g. placing, booting or connecting)
:param statePeriod: the period of time, the IP endpoint was in this state
:param time: time of measurement
:return: dictionary object representing the data to post to influx
:param kwargs: 'pythonic-style' keyword arguments used to store the state as a key and it's respective state period (in seconds) as value
:return: dictionary object representing the data to post on influx
"""
result = [{"measurement": "endpointConfig",
"tags":
{
"state": state
},
"fields":
{"cpus": cpu,
"memory": mem,
"storage": storage,
"statePeriod": statePeriod,
},
"time": _getNSTime(time)}]
# lambda function to validate whether a state is given as a key in the keyword arguments dictionary
validate = lambda key: kwargs.get(key) if key in kwargs else 0.0
return result
# generate and validate the state values
fields = {"cpus": cpu, "memory": mem, "storage": storage} # NOTE: Do we need these fields ?
for state in ("placing", "booting", "connecting"):
fields[state] = validate(state)
fields[("avg_{0}".format(state))] = validate("avg_{0}".format(state))
result = [{"measurement": "endpoint_config",
"fields": fields,
"time": _getNSTime(time)}]
# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time):
# Convert to nano-seconds
timestamp = int(1000000000*time)
return result
return timestamp
def generate_mc_service_config( mcMeasurement, stateTimeStats, time ):
......@@ -171,6 +146,13 @@ def validate_state_time_stats( stateTimeStats ):
return stateTimeStats
# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time):
# Convert to nano-seconds
timestamp = int(1000000000*time)
return timestamp
# DEPRECATED
# ____________________________________________________________________________
......@@ -182,6 +164,23 @@ def quote_wrap(string):
return "\"" + string + "\""
# Formats VM config
def generate_vm_config(state, cpu, mem, storage, time):
result = [{"measurement": "vm_res_alloc",
"tags": {
"vm_state": state
},
"fields": {
"cpu": cpu,
"memory": mem,
"storage": storage
},
"time": _getNSTime(time)
}]
return result
def _generateClientRequest(cReq, id, time):
# Tags first
result = 'sid="' + str(id) + '",' + cReq
......
......@@ -3,9 +3,9 @@
import clmctest.monitoring.LineProtocolGenerator as lp
import time
import urllib.parse
import pytest
import random
import sys, getopt
import sys
import getopt
from influxdb import InfluxDBClient
# Simulation parameters
......@@ -89,8 +89,7 @@ class Sim(object):
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0],
config_delay_dist['placing'][0] * config_delay_dist['placing'][1],
'placing', 'placed')
config_delay_dist['placing'][0] * config_delay_dist['placing'][1], 'placing')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
......@@ -100,8 +99,7 @@ class Sim(object):
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0],
config_delay_dist['booting'][0] * config_delay_dist['booting'][1],
'booting', 'booted')
config_delay_dist['booting'][0] * config_delay_dist['booting'][1], 'booting')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
......@@ -124,8 +122,7 @@ class Sim(object):
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0],
config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1],
'connecting', 'connected')
config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], 'connecting')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
......@@ -244,23 +241,22 @@ class Sim(object):
return response_delay
@staticmethod
def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state):
def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state):
"""
Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables.
:param sim_time:
:param ip_endpoint:
:param mu:
:param sigma:
:param transition_state:
:param next_state:
:param sim_time: current simulation time
:param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.)
:param mu: mean value
:param sigma: standard deviation
:return: the delay time
"""
agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time))
delay_time = random.normalvariate(mu, sigma)
agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time + delay_time))
# assert transition time is at least 0
delay_time = max(random.normalvariate(mu, sigma), 0.0)
# transition state and state period are passed to the generate report function as keyword arguments
agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time,
**{transition_state: delay_time, 'avg_{0}'.format(transition_state): delay_time}))
return delay_time
......@@ -314,6 +310,7 @@ def run_simulation(generate=True, sTime=3600):
else:
simulator.db_client.drop_database(simulator.influx_db_name)
if __name__ == "__main__":
"""
The main entry for this module. Code here is executed only if the StreamingSim.py file is executed,
......
......@@ -12,7 +12,6 @@ def streaming_sim_config():
"""
Reads the service configuration deployed for the streaming simulation test.
:param request: access the parameters of the fixture
:return: the python object representing the read YAML file
"""
rspec = pkg_resources.resource_filename('clmctest.monitoring', 'rspec.yml')
......@@ -22,6 +21,7 @@ def streaming_sim_config():
data_loaded = yaml.load(stream)
return data_loaded
@pytest.fixture(scope="module")
def streaming_sim_params(streaming_sim_config):
"""
......@@ -51,6 +51,7 @@ def get_db_client(streaming_sim_config, request):
return InfluxDBClient(host=streaming_sim_config['hosts'][0]['ip_address'], port=8086, database=request.param['database'], timeout=10)
@pytest.fixture(scope='module')
def run_simulation_fixture(streaming_sim_params):
"""
......
#!/usr/bin/python3
import pytest
from influxdb import InfluxDBClient
import clmctest.monitoring.LineProtocolGenerator as lp
import random
import time
"""
Tests the monitoring of endpoints' configuration states - currently based on model with three states (placing -> booting -> connecting)
Line Protocol report format:
endpointConfig <global tags>,<state> <statePeriod=milliseconds>,<cpus=numberOfCPUs>, <memory=memoryOfVM>, <storage=storageCapacityOfVM> time
"""
# Random initialization of state periods
state_delays = [{'placing': random.randint(7500, 12500), 'booting': random.randint(7500, 12500), 'connecting': random.randint(7500, 12500)},
{'placing': random.randint(7500, 12500), 'booting': random.randint(7500, 12500), 'connecting': random.randint(7500, 12500)}]
measurement_name = 'endpointConfig' # measurement name for configuration of media components' endpoints
@pytest.mark.parametrize("query, result", [
('SELECT "state", "statePeriod", "cpus", "memory", "storage" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'',
{'placing': {"state": "placing", "statePeriod": state_delays[0]['placing'], "cpus": 1, "memory": 2048, "storage": '10GB'},
'booting': {"state": "booting", "statePeriod": state_delays[0]['booting'], "cpus": 1, "memory": 2048, "storage": '10GB'},
'connecting': {"state": "connecting", "statePeriod": state_delays[0]['connecting'], "cpus": 1, "memory": 2048, "storage": '10GB'}}),
('SELECT "state", "statePeriod", "cpus", "memory", "storage" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'' ,
{'placing': {"state": "placing", "statePeriod": state_delays[1]['placing'], "cpus": 1, "memory": 2048, "storage": '10GB'},
'booting': {"state": "booting", "statePeriod": state_delays[1]['booting'], "cpus": 1, "memory": 2048, "storage": '10GB'},
'connecting': {"state": "connecting", "statePeriod": state_delays[1]['connecting'], "cpus": 1, "memory": 2048, "storage": '10GB'}})
])
def test_endpoint_config(query, result, get_db_client):
"""
Test endpoint configuration state measurements in general.
:param query: query under test for the endpoint configuration state
:param result: expected result of executed query
:param get_db_client: the InfluxDB client fixture from conftest.py
"""
print("\n") # blank line for formatting purposes
query_result = get_db_client.query(query, raise_errors=False)
# test the error attribute of the result is None, that is no error is returned from executing the DB query
assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
measurements = query_result.get_points()
assert all(map(lambda measurement: compare(measurement, result), measurements)), "Comparison failure for query:\n{0}".format(query)
print("Successfully passed test for query:\n{0}".format(query))
@pytest.mark.parametrize("query, result", [
('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'placing\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'',
{"mean_statePeriod": state_delays[0]['placing']}),
('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'booting\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'',
{"mean_statePeriod": state_delays[0]['booting']}),
('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'connecting\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'',
{"mean_statePeriod": state_delays[0]['connecting']}),
('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'placing\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'',
{"mean_statePeriod": state_delays[1]['placing']}),
('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'booting\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'',
{"mean_statePeriod": state_delays[1]['booting']}),
('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'connecting\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'',
{"mean_statePeriod": state_delays[1]['connecting']})
])
def test_mean_config_periods(query, result, get_db_client):
"""
:param query: query under test for the endpoint configuration state
:param result: expected result of executed query
:param get_db_client: the InfluxDB client fixture from conftest.py
"""
print("\n") # blank line for formatting purposes
query_result = get_db_client.query(query, raise_errors=False)
# test the error attribute of the result is None, that is no error is returned from executing the DB query
assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
measurement = next(query_result.get_points())
measurement.pop('time')
assert measurement == result, "Comparison failure for query of mean state period:\n{0}".format(query)
print("Successfully passed test for query of mean state period:\n{0}".format(query))
@pytest.fixture(scope='module', autouse=True)
def generate_states_data(streaming_sim_config, get_db_client):
"""
Sends configuration state data to influx for the two mc endpoints
:param streaming_sim_config: the configuration fixture from conftest.py
:param get_db_client: the influx db client
"""
global state_delays
global measurement_name
if measurement_name in (measurement['name'] for measurement in get_db_client.get_list_measurements()):
get_db_client.drop_measurement(measurement_name) # clear DB measurement from previous test if exists
time.sleep(2)
for i in range(1, 3):
measurement_time = 0
for state in ('placing', 'booting', 'connecting'):
epURL = streaming_sim_config['hosts'][i]['ip_address'] # endpoint URL
endpointID = streaming_sim_config['hosts'][i]['ipendpoint_id']
cpu = streaming_sim_config['hosts'][i]['cpus']
mem = streaming_sim_config['hosts'][i]['memory']
disk = streaming_sim_config['hosts'][i]['disk']
db_name = streaming_sim_config['hosts'][i]['database_name']
db_client = InfluxDBClient(host=epURL, port=8186, database=db_name, timeout=10)
period = state_delays[i-1].get(state)
print("Endpoint - {0}, state - {1}, period - {2}ms".format(endpointID, state, period))
db_client.write_points(lp.generate_endpoint_config(state, cpu, mem, disk, period, measurement_time))
measurement_time += period
time.sleep(10)
def compare(actual_measurement, measurement_dict):
"""
Auxiliary function to check whether the measurements are the same, excluding the time stamp field
:param actual_measurement: the actual measurement from Influx (the one with a time stamp
:param measurement_dict: the dictionary of expected measurements
:return: True for equality and False otherwise
"""
actual_measurement.pop('time')
# get the measurement state we are comparing and fetch it from the dictionary of expected measurements
expected_measurement = measurement_dict.get(actual_measurement.get('state'))
return actual_measurement == expected_measurement
......@@ -17,13 +17,17 @@ class TestSimulation(object):
{"time": "1970-01-01T00:00:00Z", "count_avg_response_time": 7200, "count_peak_response_time": 7200, "count_requests": 7200}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."net_port_io"',
{"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."vm_res_alloc"',
{"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "count_placing": 3, "count_avg_placing": 3, "count_booting": 3, "count_avg_booting": 3, "count_connecting": 3, "count_avg_connecting": 3, "count_cpus": 3, "count_memory": 3, "count_storage": 3}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "count_placing": 3, "count_avg_placing": 3, "count_booting": 3, "count_avg_booting": 3, "count_connecting": 3, "count_avg_connecting": 3,
"count_cpus": 3, "count_memory": 3, "count_storage": 3}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3602, "count_avg_starting" : 3602, "count_avg_stopped" : 3602, "count_avg_stopping" : 3602, "count_running" : 3602, "count_starting" : 3602, "count_stopped" : 3602, "count_stopping" : 3602}),
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3602, "count_avg_starting": 3602, "count_avg_stopped": 3602, "count_avg_stopping": 3602, "count_running": 3602, "count_starting": 3602, "count_stopped": 3602, "count_stopping": 3602}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3602, "count_avg_starting" : 3602, "count_avg_stopped" : 3602, "count_avg_stopping" : 3602, "count_running" : 3602, "count_starting" : 3602, "count_stopped" : 3602, "count_stopping" : 3602}),
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3602, "count_avg_starting": 3602, "count_avg_stopped": 3602, "count_avg_stopping": 3602, "count_running": 3602, "count_starting": 3602, "count_stopped": 3602, "count_stopping": 3602}),
])
def test_simulation(self, query, expected_result, get_db_client, run_simulation_fixture):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment