Skip to content
Snippets Groups Projects
Commit e3bfce58 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

[Issue #61] - normalised all reporting periods for endpoint configuration states to 1 second

parent 698ddfde
No related branches found
No related tags found
No related merge requests found
......@@ -73,63 +73,56 @@ class Sim(object):
'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500}
]
# Simulate configuration of the ip endpoints
# endpoint state->mu, sigma, secs normal distribution
config_delay_dist = {"unplaced": [1, 0.68], "placing": [10, 0.68], "placed": [1, 0.68], "booting": [10, 0.68], "booted": [2, 0.68],
"connecting": [10, 0.68], "connected": [8, 0.68]}
print("\nSimulation started. Generating data...")
# Simulate configuration of the ip endpoints
# Move endpoints from state unplaced to state placing
max_delay = 0
# reports: 1, unplaced: 0.7s, placing: 0.3s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['unplaced'][0],
config_delay_dist['unplaced'][0] * config_delay_dist['unplaced'][1], 0.7,
'unplaced', 'placing')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
self._changeVMState(agent_db_client, sim_time, ip_endpoint, 10, 7, 'unplaced', 'placing')
sim_time += TICK_TIME
# Place endpoints
max_delay = 0
# reports: 10, placing: 9.1s, placed: 0.9s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0],
config_delay_dist['placing'][0] * config_delay_dist['placing'][1], 0.7,
'placing', 'placed')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
for i in range(9):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "placing")
self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 9, 'placing', 'placed')
sim_time += 10 * TICK_TIME
# Move endpoints from state placed to state booting
max_delay = 0
# reports: 1, placed: 0.8s, booting: 0.2s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placed'][0],
config_delay_dist['placed'][0] * config_delay_dist['placed'][1], 0.7,
'placed', 'booting')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
self._changeVMState(agent_db_client, sim_time, ip_endpoint, 10, 8, 'placed', 'booting')
sim_time += TICK_TIME
# Boot endpoints
max_delay = 0
# reports: 10, booting: 9.4s, booted: 0.6s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0],
config_delay_dist['booting'][0] * config_delay_dist['booting'][1], 0.7,
'booting', 'booted')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
for i in range(9):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "booting")
self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 4, 'booting', 'booted')
sim_time += 10*TICK_TIME
# move mpegdash_service media component state from 'stopped' to 'starting'
# Total reports = 1, (0.2 seconds in 'stopped', 0.8 seconds in 'starting')
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeMCState( influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting' )
self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting')
sim_time += TICK_TIME
......@@ -147,26 +140,25 @@ class Sim(object):
sim_time += 5 * TICK_TIME
# Move endpoints from state booted to state connecting
max_delay = 0
# reports: 2, booted: 1.5s, connecting: 0.5s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booted'][0],
config_delay_dist['booted'][0] * config_delay_dist['booted'][1], 0.7,
'booted', 'connecting')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "booted")
self._changeVMState(agent_db_client, sim_time + (1*TICK_TIME), ip_endpoint, 10, 5, 'booted', 'connecting')
sim_time += 2*TICK_TIME
# Connect endpoints
max_delay = 0
# reports: 10, connecting: 9.7s, connected: 0.3s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0],
config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], 0.7,
'connecting', 'connected')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
for i in range(9):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connecting")
self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 7, 'connecting', 'connected')
sim_time += 10*TICK_TIME
request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC
inc_period_count = 0
......@@ -233,8 +225,7 @@ class Sim(object):
ip_endpoint['request_queue'] -= int(requests_processed)
# update endpoint state (continuously 'connected')
agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time,
**{'connected': float(TICK_TIME), 'avg_connected': float(TICK_TIME)}))
self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "connected")
# update mpegdash_service media component state (continuously 'running')
self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time)
......@@ -244,15 +235,16 @@ class Sim(object):
# Simulate tear-down of media components and endpoints
# remove endpoints
max_delay = 0
# reports: 5, connected: 4.7s, unplaced: 0.3s
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connected'][0],
config_delay_dist['connected'][0] * config_delay_dist['connected'][1], 0.7,
'connected', 'unplaced')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
for i in range(4):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connected")
self._changeVMState(agent_db_client, sim_time + (4*TICK_TIME), ip_endpoint, 10, 7, 'connected', 'unplaced')
sim_time += 5 * TICK_TIME
# move mpegdash_service media component state from 'running' to 'stopping'
# Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping')
......@@ -303,34 +295,49 @@ class Sim(object):
return response_delay
@staticmethod
def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, trans_ratio, transition_state, next_state):
def _writeVMSingleState(agent_db_client, sim_time, ip_endpoint, state):
"""
Write a single state as a sample over TICK_TIME.
:param agent_db_client: agent used to send metric data to CLMC
:param sim_time: current simulation time
:param ip_endpoint: dict with info for ip endpoint
:param state: state that's being reported
"""
state_stats = {
state: float(TICK_TIME),
'avg_{0}'.format(state): float(TICK_TIME)
}
agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, **state_stats))
@staticmethod
def _changeVMState(agent_db_client, sim_time, ip_endpoint, sample_count, trans_sample_count, transition_state, next_state):
"""
Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables.
:param sim_time: current simulation time
:param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.)
:param mu: mean value
:param sigma: standard deviation
:param trans_ratio: the part of the total delay time for transition state
:param sample_count: - the total number of samples in the reporting period (TICK_TIME)
:param trans_sample_count: - the number of samples in the transition state
:param transition_state: name of transition state
:param next_state: name of next state
:return: the delay time
"""
# assert total delay time is at least 1 second
total_delay_time = max(random.normalvariate(mu, sigma), 1.0)
state_stats = {}
# part of the total delay time is the transition state period
transition_time = trans_ratio*total_delay_time
# the rest of the delay is the next state period
next_state_time = total_delay_time - transition_time
# period of transition state is part of the total sample count multiplied by the reporting period (TICK_TIME)
state_stats[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count
state_stats["avg_" + transition_state] = state_stats[transition_state] / float(TICK_TIME)
# transition state and state period are passed to the generate report function as keyword arguments
agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time,
**{transition_state: transition_time, 'avg_{0}'.format(transition_state): transition_time,
next_state: next_state_time, 'avg_{0}'.format(next_state): next_state_time}))
# the rest of the period is reported as time for the next state
state_stats[next_state] = float(TICK_TIME) - state_stats[transition_state]
state_stats["avg_" + next_state] = state_stats[next_state] / float(TICK_TIME)
return total_delay_time
# transition state and state period are passed to the generate report function as keyword arguments
agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, **state_stats))
@staticmethod
def _writeMCSingleState(influxClient, measurement, state, sim_time):
......
......@@ -15,7 +15,7 @@ def streaming_sim_config():
:return: the python object representing the read YAML file
"""
rspec = pkg_resources.resource_filename('clmctest.monitoring', 'rspec.yml')
print("rspec file: {0}".format(rspec))
print("\nrspec file: {0}".format(rspec))
with open(rspec, 'r') as stream:
data_loaded = yaml.load(stream)
......
......@@ -32,24 +32,31 @@ class TestSimulation(object):
{"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "count_unplaced": 3607, "count_avg_unplaced": 3607, "count_placing": 3607, "count_avg_placing": 3607, "count_placed": 3607, "count_avg_placed": 3607, "count_booting": 3607, "count_avg_booting": 3607, "count_booted": 3607,
"count_avg_booted": 3607, "count_connecting": 3607, "count_avg_connecting": 3607, "count_connected": 3607, "count_avg_connected": 3607, "count_cpus": 3607, "count_memory": 3607, "count_storage": 3607}),
{"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639,
"count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "count_unplaced": 3607, "count_avg_unplaced": 3607, "count_placing": 3607, "count_avg_placing": 3607, "count_placed": 3607, "count_avg_placed": 3607, "count_booting": 3607, "count_avg_booting": 3607, "count_booted": 3607,
"count_avg_booted": 3607, "count_connecting": 3607, "count_avg_connecting": 3607, "count_connected": 3607, "count_avg_connected": 3607, "count_cpus": 3607, "count_memory": 3607, "count_storage": 3607}),
{"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639,
"count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <>0',
('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_placing": 0.9272727272727274}),
('SELECT mean(avg_booting) as "avg_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_booting <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_booting": 0.8727272727272727}),
('SELECT mean(avg_connecting) as "avg_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connecting <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_connecting": 0.9272727272727272}),
('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}),
('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <>0',
('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}),
('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <>0',
('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}),
('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <>0',
('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55})
])
def test_simulation(self, influx_db, query, expected_result):
......@@ -76,47 +83,3 @@ class TestSimulation(object):
assert expected_result == actual_result, "Simulation test failure"
print("Successfully passed test for the following query: {0}".format(query))
@pytest.mark.parametrize("query, field", [
('SELECT mean("placing") as "mean_transition_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" where "placing" <> 0 and "placed" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'',
'mean_transition_placing'),
('SELECT mean("placing") as "mean_target_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" where "placing" <> 0 and "unplaced" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'',
'mean_target_placing'),
('SELECT mean("booting") as "mean_transition_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "booting" <> 0 and "booted" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'',
'mean_transition_booting'),
('SELECT mean("booting") as "mean_target_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "booting" <> 0 and "placed" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'',
'mean_target_booting'),
('SELECT mean("connecting") as "mean_transition_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "connecting" <> 0 and "connected" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'',
'mean_transition_connecting'),
('SELECT mean("connecting") as "mean_target_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "connecting" <> 0 and "booted" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'',
'mean_target_connecting'),
])
def test_mean_config_queries(self, influx_db, query, field):
"""
Test queries for mean values in the configuration states model.
- 'mean_transition_{state}' - we want to know the mean time spent on the given state in cases where this was the actual transition state.
e.g. 'mean_transition_placing' - refers to the mean time spent on state 'placing' in transitions such as 'placing' -> 'placed'
- 'mean_target_{state}' - we want to know the mean time spent on the given state in cases where this was the actual target state
e.g. 'mean_target_placing' - refers to the mean time spent on state 'placing' in transitions such as 'unplaced' -> 'placing'
:param influx_db: influx db client
:param query: query under test
:param field: the field id to fetch
"""
# pytest automatically goes through all queries under test, declared in the parameters decorator
print("\n") # prints a blank line for formatting purposes
# the raise_errors=False argument is given so that we could actually test that the DB didn't return any errors instead of raising an exception
query_result = influx_db.query(query, raise_errors=False)
# test the error attribute of the result is None, that is no error is returned from executing the DB query
assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
# get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator)
result = next(query_result.get_points()).get(field)
assert float(result) >= 0.0, "Test failure. Reported mean values cannot be negative."
print("Successfully passed test for the following query: {0}".format(query))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment