Skip to content
Snippets Groups Projects
Commit f7801c23 authored by Simon Crowle's avatar Simon Crowle
Browse files

Refactors media configuration state monitoring to align with latest state monitoring model

parent beac0662
No related branches found
No related tags found
No related merge requests found
...@@ -104,54 +104,43 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta ...@@ -104,54 +104,43 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta
return result return result
def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ):
"""
generates a measurement line for a media component configuration state
: time - timestamp for the measurement
: mcMeasurement - measurement label
: current_state - the current state of the service configuration
: current_state_time - the current length of time in the current state
: config_state_values - dictionary of media component configuration states (summed time and mean average over the sampling period)
: - stopped, starting, running, stopping [use '_sum' and '_mst' for sum and average respectively]
"""
def generate_mc_service_config( mcMeasurement, stateTimeStats, time ): # define state value validation function (inserting key/0 where key is supplied)
validate_f = lambda key: config_state_values.get(key) if key in config_state_values else 0.0
validStats = validate_state_time_stats( stateTimeStats )
result = [{ "measurement" : mcMeasurement,
"fields" :
{ "stopped" : validStats['stopped'],
"avg_stopped" : validStats['avg_stopped'],
"starting" : validStats['starting'],
"avg_starting" : validStats['avg_starting'],
"running" : validStats['running'],
"avg_running" : validStats['avg_running'],
"stopping" : validStats['stopping'],
"avg_stopping" : validStats['avg_stopping']
},
"time" : _getNSTime(time) }]
return result
def validate_state_time_stats( stateTimeStats ):
if ( not 'stopped' in stateTimeStats ):
stateTimeStats['stopped'] = 0.0
if ( not 'avg_stopped' in stateTimeStats ):
stateTimeStats['avg_stopped'] = 0.0
if ( not 'starting' in stateTimeStats ):
stateTimeStats['starting'] = 0.0
if ( not 'avg_starting' in stateTimeStats ):
stateTimeStats['avg_starting'] = 0.0
if ( not 'running' in stateTimeStats ):
stateTimeStats['running'] = 0.0
if ( not 'avg_running' in stateTimeStats ): # define expected keys
stateTimeStats['avg_running'] = 0.0 state_keys = [ "stopped_sum", "stopped_mst",
"starting_sum", "starting_mst",
"running_sum", "running_mst",
"stopping_sum", "stopping_mst" ]
if ( not 'stopping' in stateTimeStats ): # define current state time first
stateTimeStats['stopping'] = 0.0 fields = {}
fields["current_state_time"] = current_state_time
if ( not 'avg_stopping' in stateTimeStats ): # then add in validated state values
stateTimeStats['avg_stopping'] = 0.0 for key in state_keys :
fields[key] = validate_f(key)
return stateTimeStats # compose result
result = [{ "measurement" : mcMeasurement,
"tags" : { "current_state" : current_state },
"fields" : fields,
"time" : _getNSTime(time)
}]
return result
# InfluxDB likes to have time-stamps in nanoseconds # InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time): def _getNSTime(time):
......
...@@ -76,6 +76,9 @@ class Sim(object): ...@@ -76,6 +76,9 @@ class Sim(object):
endpoint_states_config = {self.agent1_url: {"current_state": "unplaced", "current_state_time": 0}, endpoint_states_config = {self.agent1_url: {"current_state": "unplaced", "current_state_time": 0},
self.agent2_url: {"current_state": "unplaced", "current_state_time": 0}} self.agent2_url: {"current_state": "unplaced", "current_state_time": 0}}
mc_curr_states = { self.agent1_url : { "current_state": "stopped", "current_state_time": 0 },
self.agent2_url : { "current_state": "stopped", "current_state_time": 0 } }
print("\nSimulation started. Generating data...") print("\nSimulation started. Generating data...")
# Simulate configuration of the ip endpoints # Simulate configuration of the ip endpoints
...@@ -146,25 +149,40 @@ class Sim(object): ...@@ -146,25 +149,40 @@ class Sim(object):
endpoint_states_config[agent]["current_state_time"] = 0.6 endpoint_states_config[agent]["current_state_time"] = 0.6
sim_time += 10*TICK_TIME sim_time += 10*TICK_TIME
# move mpegdash_service media component state from 'stopped' to 'starting' # move mpegdash media component state from 'stopped' to 'starting'
# Total reports = 1, (0.2 seconds in 'stopped', 0.8 seconds in 'starting') # Total reports = 1, (completed: 0.2 seconds in 'stopped', current: 0.8 seconds in 'starting')
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting')
completed_states = ( ('stopped', 0.2, 1 ), )
mc_curr_states[agent]["current_state"] = 'starting'
mc_curr_states[agent]["current_state_time"] = 0.8
self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'starting', 0.8 )
sim_time += TICK_TIME sim_time += TICK_TIME
# move mpegdash_service media component state from 'starting' to 'running' # move mpegdash media component state from 'starting' to 'running'
# Total reports = 5, (4.7 seconds in 'starting', 0.3 seconds in 'running') # Total reports = 5, (4 incomplete 'starting', completed: 0.8 + 4 + 0.7 seconds in 'starting', current: 0.3 seconds in 'running')
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
# 4 seconds of starting
for i in range(0, 4): for i in range(0, 4):
self._writeMCSingleState(influxClient, 'mpegdash_service_config', 'starting', sim_time + (i * TICK_TIME)) mc_curr_states[agent]["current_state"] = 'running'
mc_curr_states[agent]["current_state_time"] += TICK_TIME
self._writeMCSingleState( influxClient, sim_time +(i * TICK_TIME), 'mpegdash_mc_config', 'running', mc_curr_states[agent]["current_state_time"] )
# Switch over to running
completed_states = ( ('starting', 0.8 + 4 + 0.7, 1 ), )
mc_curr_states[agent]["current_state"] = 'running'
mc_curr_states[agent]["current_state_time"] = 0.3
self._changeMCState(influxClient, sim_time + (4 * TICK_TIME), 'mpegdash_service_config', 10, 7, 'starting', 'running') self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'running', 0.3 )
sim_time += 5 * TICK_TIME sim_time += 5 * TICK_TIME
...@@ -277,8 +295,12 @@ class Sim(object): ...@@ -277,8 +295,12 @@ class Sim(object):
# update current state time in the config dictionary # update current state time in the config dictionary
endpoint_states_config[agent]["current_state_time"] += 1 endpoint_states_config[agent]["current_state_time"] += 1
# update mpegdash_service media component state (continuously 'running') # update mpegdash media component state (continuously 'running')
self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time) mc_curr_states[agent]["current_state"] = 'running'
mc_curr_states[agent]["current_state_time"] += TICK_TIME
# Just write the current state out here (it will be summed and averaged at the end)
self._writeMCSingleState( agent_db_client, sim_time, 'mpegdash_mc_config', 'running', mc_curr_states[agent]["current_state_time"] )
sim_time += TICK_TIME sim_time += TICK_TIME
...@@ -304,23 +326,33 @@ class Sim(object): ...@@ -304,23 +326,33 @@ class Sim(object):
endpoint_states_config[agent]["current_state_time"] = 0.3 endpoint_states_config[agent]["current_state_time"] = 0.3
sim_time += 5 * TICK_TIME sim_time += 5 * TICK_TIME
# move mpegdash_service media component state from 'running' to 'stopping' # move mpegdash media component state from 'running' to 'stopping'
# Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping') # Total reports = 2, ( completed: n+1.8 seconds in 'running', current: 0.2 seconds in 'stopping')
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._writeMCSingleState( influxClient, 'mpegdash_service_config', 'running', sim_time ) completed_states = ( ('running', mc_curr_states[ip_endpoint["agent_url"]]["current_state_time"] +1.8, 1), )
self._changeMCState( influxClient, sim_time + TICK_TIME, 'mpegdash_service_config', 10, 8, 'running', 'stopping' ) mc_curr_states[agent]["current_state"] = 'stopping'
mc_curr_states[agent]["current_state_time"] = 0.2
self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'stopping', 0.2 )
sim_time += 2 * TICK_TIME sim_time += 2 * TICK_TIME
# move mpegdash_service media component state from 'stopping' to 'stopped' # move mpegdash media component state from 'stopping' to 'stopped'
# Total reports = 1, (0.9 seconds in 'stopping', 0.1 seconds in 'stopped') # Total reports = 1, (0.9 seconds in 'stopping', 0.1 seconds in 'stopped')
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 9, 'stopping', 'stopped')
completed_states = ( ('stopping', mc_curr_states[ip_endpoint["agent_url"]]["current_state_time"] +0.9, 1), )
mc_curr_states[agent]["current_state"] = 'stopped'
mc_curr_states[agent]["current_state_time"] = 0.1
self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'stopped', 0.1 )
sim_time += TICK_TIME sim_time += TICK_TIME
...@@ -397,50 +429,38 @@ class Sim(object): ...@@ -397,50 +429,38 @@ class Sim(object):
next_state, next_state_time, **state_stats)) next_state, next_state_time, **state_stats))
@staticmethod @staticmethod
def _writeMCSingleState(influxClient, measurement, state, sim_time): def _writeMCSingleState(influxClient, sim_time, measurement, curr_state, curr_state_time):
""" """
Write a single state as a sample over TICK_TIME Write a single state as a sample over TICK_TIME
: influxClient - agent used to send metric data to CLMC : influxClient - agent used to send metric data to CLMC
: measurement - name of influx measurement set : sim_time - time stamp for this measurement
: state - state to be declared : measurement - name of influx measurement set
: sim_time - time stamp for this measurement : curr_state - the current state
: curr_state_time - length of time in the current state
""" """
state_stats = {} influxClient.write_points( lp.generate_mc_service_config(sim_time, measurement, curr_state, curr_state_time, {}) )
state_stats[state] = float(TICK_TIME)
state_stats['avg_' + state] = float(TICK_TIME)
influxClient.write_points(lp.generate_mc_service_config(measurement, state_stats, sim_time))
@staticmethod @staticmethod
def _changeMCState(influxClient, sim_time, mc_measurement, sample_count, trans_sample_count, transition_state, next_state): def _changeMCState(influxClient, sim_time, measurement, completed_states, curr_state, curr_state_time):
""" """
Send INFLUX data indicating the time taken to transition to a new state Send INFLUX data indicating the time taken to transition to a new state
: influxClient - agent used to send metric data to CLMC : influxClient - agent used to send metric data to CLMC
: sim_time - simulation time at start of state changing period : sim_time - simulation time at start of state changing period
: mc_measurement - measurement name : measurement - measurement name
: sample_count - the total number of samples in the reporting period (TICK_TIME) : completed_states - dictionary of states that have been completed (each includes the state name, state time sum and count)
: trans_sample_count - the number of samples in the transition state : curr_state - the current state
: transition_state - the state being exited : curr_state_time - the current state's running time
: next_state - the state being entered
""" """
mc_states = {} mc_states = {}
for state, state_sum, state_count in completed_states:
mc_states[ state +"_sum" ] = state_sum
mc_states[ state +"_mst" ] = state_sum / state_count
# Report total time in transition and its average of the reporting period influxClient.write_points( lp.generate_mc_service_config(sim_time, measurement, curr_state, curr_state_time, mc_states) )
mc_states[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count
mc_states["avg_" + transition_state] = mc_states[transition_state] / float(TICK_TIME)
# Use the time remaining as the length for the time in the next state
mc_states[next_state] = float(TICK_TIME) - mc_states[transition_state]
mc_states["avg_" + next_state] = mc_states[next_state] / float(TICK_TIME)
influxClient.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time))
def run_simulation(generate=True, sTime=3600): def run_simulation(generate=True, sTime=3600):
""" """
......
...@@ -38,10 +38,10 @@ class TestSimulation(object): ...@@ -38,10 +38,10 @@ class TestSimulation(object):
{"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639, {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639,
"count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}), "count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}), {"time": "1970-01-01T00:00:00Z", "count_current_state_time": 3607, "count_running_mst": 3607, "count_running_sum": 3607, "count_starting_mst": 3607, "count_starting_sum": 3607, "count_stopped_mst": 3607, "count_stopped_sum": 3607, "count_stopping_mst": 3607, "count_stopping_sum": 3607}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}), {"time": "1970-01-01T00:00:00Z", "count_current_state_time": 3607, "count_running_mst": 3607, "count_running_sum": 3607, "count_starting_mst": 3607, "count_starting_sum": 3607, "count_stopped_mst": 3607, "count_stopped_sum": 3607, "count_stopping_mst": 3607, "count_stopping_sum": 3607}),
('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'', ('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "avg_placing": 9.4}), {"time": "1970-01-01T00:00:00Z", "avg_placing": 9.4}),
...@@ -60,15 +60,24 @@ class TestSimulation(object): ...@@ -60,15 +60,24 @@ class TestSimulation(object):
('SELECT mean(avg_connected) as "avg_connected" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connected <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'', ('SELECT mean(avg_connected) as "avg_connected" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connected <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "avg_connected": 3605.0}), {"time": "1970-01-01T00:00:00Z", "avg_connected": 3605.0}),
('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <> 0', ('SELECT mean(stopped_sum) as "stopped_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopped_sum <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}), {"time": "1970-01-01T00:00:00Z", "stopped_sum": 0.2}),
('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <> 0', ('SELECT mean(stopped_mst) as "stopped_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopped_mst <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}), {"time": "1970-01-01T00:00:00Z", "stopped_mst": 0.2}),
('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <> 0', ('SELECT mean(starting_sum) as "starting_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE starting_sum <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}), {"time": "1970-01-01T00:00:00Z", "starting_sum": 5.5}),
('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <> 0', ('SELECT mean(starting_mst) as "starting_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE starting_mst <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55}) {"time": "1970-01-01T00:00:00Z", "starting_mst": 5.5}),
('SELECT mean(running_sum) as "running_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE running_sum <> 0',
{"time": "1970-01-01T00:00:00Z", "running_sum": 3602.1000000000004}),
('SELECT mean(running_mst) as "running_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE running_mst <> 0',
{"time": "1970-01-01T00:00:00Z", "running_mst": 3602.1000000000004}),
('SELECT mean(stopping_sum) as "stopping_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopping_sum <> 0',
{"time": "1970-01-01T00:00:00Z", "stopping_sum": 1.1}),
('SELECT mean(stopping_mst) as "stopping_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopping_mst <> 0',
{"time": "1970-01-01T00:00:00Z", "stopping_mst": 1.1}),
]) ])
def test_simulation(self, influx_db, query, expected_result): def test_simulation(self, influx_db, query, expected_result):
""" """
This is the entry point of the test. This method will be found and executed when the module is ran using pytest This is the entry point of the test. This method will be found and executed when the module is ran using pytest
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment