Skip to content
Snippets Groups Projects
Commit 890cae38 authored by Simon Crowle's avatar Simon Crowle
Browse files

Merge branch 'mediaComponentConfig' into 'endpointConfig'

# Conflicts:
#   clmctest/monitoring/StreamingSim.py
parents 29f3d466 0c8035f6
Branches
Tags
No related merge requests found
...@@ -103,54 +103,43 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta ...@@ -103,54 +103,43 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta
return result return result
def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ):
"""
generates a measurement line for a media component configuration state
: time - timestamp for the measurement
: mcMeasurement - measurement label
: current_state - the current state of the service configuration
: current_state_time - the current length of time in the current state
: config_state_values - dictionary of media component configuration states (summed time and mean average over the sampling period)
: - stopped, starting, running, stopping [use '_sum' and '_mst' for sum and average respectively]
"""
def generate_mc_service_config( mcMeasurement, stateTimeStats, time ): # define state value validation function (inserting key/0 where key is supplied)
validate_f = lambda key: config_state_values.get(key) if key in config_state_values else 0.0
validStats = validate_state_time_stats( stateTimeStats )
result = [{ "measurement" : mcMeasurement,
"fields" :
{ "stopped" : validStats['stopped'],
"avg_stopped" : validStats['avg_stopped'],
"starting" : validStats['starting'],
"avg_starting" : validStats['avg_starting'],
"running" : validStats['running'],
"avg_running" : validStats['avg_running'],
"stopping" : validStats['stopping'],
"avg_stopping" : validStats['avg_stopping']
},
"time" : _getNSTime(time) }]
return result
def validate_state_time_stats( stateTimeStats ):
if ( not 'stopped' in stateTimeStats ):
stateTimeStats['stopped'] = 0.0
if ( not 'avg_stopped' in stateTimeStats ):
stateTimeStats['avg_stopped'] = 0.0
if ( not 'starting' in stateTimeStats ):
stateTimeStats['starting'] = 0.0
if ( not 'avg_starting' in stateTimeStats ):
stateTimeStats['avg_starting'] = 0.0
if ( not 'running' in stateTimeStats ):
stateTimeStats['running'] = 0.0
if ( not 'avg_running' in stateTimeStats ): # define expected keys
stateTimeStats['avg_running'] = 0.0 state_keys = [ "stopped_sum", "stopped_mst",
"starting_sum", "starting_mst",
"running_sum", "running_mst",
"stopping_sum", "stopping_mst" ]
if ( not 'stopping' in stateTimeStats ): # define current state time first
stateTimeStats['stopping'] = 0.0 fields = {}
fields["current_state_time"] = current_state_time
if ( not 'avg_stopping' in stateTimeStats ): # then add in validated state values
stateTimeStats['avg_stopping'] = 0.0 for key in state_keys :
fields[key] = validate_f(key)
return stateTimeStats # compose result
result = [{ "measurement" : mcMeasurement,
"tags" : { "current_state" : current_state },
"fields" : fields,
"time" : _getNSTime(time)
}]
return result
# InfluxDB likes to have time-stamps in nanoseconds # InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time): def _getNSTime(time):
......
...@@ -76,6 +76,9 @@ class Sim(object): ...@@ -76,6 +76,9 @@ class Sim(object):
endpoint_states_config = {self.agent1_url: {"current_state": "unplaced", "current_state_time": 0}, endpoint_states_config = {self.agent1_url: {"current_state": "unplaced", "current_state_time": 0},
self.agent2_url: {"current_state": "unplaced", "current_state_time": 0}} self.agent2_url: {"current_state": "unplaced", "current_state_time": 0}}
mc_curr_states = { self.agent1_url : { "current_state": "stopped", "current_state_time": 0 },
self.agent2_url : { "current_state": "stopped", "current_state_time": 0 } }
print("\nSimulation started. Generating data...") print("\nSimulation started. Generating data...")
# Simulate configuration of the ip endpoints # Simulate configuration of the ip endpoints
...@@ -146,6 +149,43 @@ class Sim(object): ...@@ -146,6 +149,43 @@ class Sim(object):
endpoint_states_config[agent]["current_state_time"] = 0.6 endpoint_states_config[agent]["current_state_time"] = 0.6
sim_time += 10*TICK_TIME sim_time += 10*TICK_TIME
# move mpegdash media component state from 'stopped' to 'starting'
# Total reports = 1, (completed: 0.2 seconds in 'stopped', current: 0.8 seconds in 'starting')
for ip_endpoint in ip_endpoints:
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
completed_states = ( ('stopped', 0.2, 1 ), )
mc_curr_states[agent]["current_state"] = 'starting'
mc_curr_states[agent]["current_state_time"] = 0.8
self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'starting', 0.8 )
sim_time += TICK_TIME
# move mpegdash media component state from 'starting' to 'running'
# Total reports = 5, (4 incomplete 'starting', completed: 0.8 + 4 + 0.7 seconds in 'starting', current: 0.3 seconds in 'running')
for ip_endpoint in ip_endpoints:
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
# 4 seconds of starting
for i in range(0, 4):
mc_curr_states[agent]["current_state"] = 'running'
mc_curr_states[agent]["current_state_time"] += TICK_TIME
self._writeMCSingleState( influxClient, sim_time +(i * TICK_TIME), 'mpegdash_mc_config', 'running', mc_curr_states[agent]["current_state_time"] )
# Switch over to running
completed_states = ( ('starting', 0.8 + 4 + 0.7, 1 ), )
mc_curr_states[agent]["current_state"] = 'running'
mc_curr_states[agent]["current_state_time"] = 0.3
self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'running', 0.3 )
sim_time += 5 * TICK_TIME
# Move endpoints from state booted to state connecting # Move endpoints from state booted to state connecting
# reports: 2, booted: 1.5s, connecting: 0.5s # reports: 2, booted: 1.5s, connecting: 0.5s
# addition to uncompleted states from previous report: booted += 0.6s # addition to uncompleted states from previous report: booted += 0.6s
...@@ -279,30 +319,64 @@ class Sim(object): ...@@ -279,30 +319,64 @@ class Sim(object):
# update endpoint state (continuously 'connected') # update endpoint state (continuously 'connected')
self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "connected", endpoint_states_config[agent]["current_state_time"]) self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "connected", endpoint_states_config[agent]["current_state_time"])
# update mpegdash_service media component state (continuously 'running') # update mpegdash media component state (continuously 'running')
self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time) mc_curr_states[agent]["current_state"] = 'running'
mc_curr_states[agent]["current_state_time"] += TICK_TIME
# Just write the current state out here (it will be summed and averaged at the end)
self._writeMCSingleState( agent_db_client, sim_time, 'mpegdash_mc_config', 'running', mc_curr_states[agent]["current_state_time"] )
sim_time += TICK_TIME sim_time += TICK_TIME
# Simulate tear-down of media components and endpoints # Simulate tear-down of media components and endpoints
# move mpegdash_service media component state from 'running' to 'stopping' # remove endpoints
# Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping') # reports: 5, connected: 4.7s, unplaced: 0.3s
# addition to uncompleted states from previous report: connected += 3600s + 0.3s
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
# since the current state in the dictionary is still connected, only the current state time is incremented
for i in range(4):
self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connected")
endpoint_states_config[agent]["current_state_time"] += TICK_TIME
# here, the VM exits state connected, hence need to append the current state time
self._changeVMState(agent_db_client, sim_time + (4*TICK_TIME), ip_endpoint, (('connected', 0.7 + endpoint_states_config[agent]["current_state_time"], 1),), 'unplaced', 0.3)
# since VM changed its current state, readjust the config dictionary
endpoint_states_config[agent]["current_state"] = "unplaced"
endpoint_states_config[agent]["current_state_time"] = 0.3
sim_time += 5 * TICK_TIME
# move mpegdash media component state from 'running' to 'stopping'
# Total reports = 2, ( completed: n+1.8 seconds in 'running', current: 0.2 seconds in 'stopping')
for ip_endpoint in ip_endpoints:
agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._writeMCSingleState( influxClient, 'mpegdash_service_config', 'running', sim_time ) completed_states = ( ('running', mc_curr_states[ip_endpoint["agent_url"]]["current_state_time"] +1.8, 1), )
self._changeMCState( influxClient, sim_time + TICK_TIME, 'mpegdash_service_config', 10, 8, 'running', 'stopping' ) mc_curr_states[agent]["current_state"] = 'stopping'
mc_curr_states[agent]["current_state_time"] = 0.2
self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'stopping', 0.2 )
sim_time += 2 * TICK_TIME sim_time += 2 * TICK_TIME
# move mpegdash_service media component state from 'stopping' to 'stopped' # move mpegdash media component state from 'stopping' to 'stopped'
# Total reports = 1, (0.9 seconds in 'stopping', 0.1 seconds in 'stopped') # Total reports = 1, (0.9 seconds in 'stopping', 0.1 seconds in 'stopped')
for ip_endpoint in ip_endpoints: for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"]) agent = ip_endpoint["agent_url"]
agent_url = urllib.parse.urlparse(agent)
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10) influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 9, 'stopping', 'stopped')
completed_states = ( ('stopping', mc_curr_states[ip_endpoint["agent_url"]]["current_state_time"] +0.9, 1), )
mc_curr_states[agent]["current_state"] = 'stopped'
mc_curr_states[agent]["current_state_time"] = 0.1
self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'stopped', 0.1 )
sim_time += TICK_TIME sim_time += TICK_TIME
...@@ -401,50 +475,38 @@ class Sim(object): ...@@ -401,50 +475,38 @@ class Sim(object):
next_state, next_state_time, **state_stats)) next_state, next_state_time, **state_stats))
@staticmethod @staticmethod
def _writeMCSingleState(influxClient, measurement, state, sim_time): def _writeMCSingleState(influxClient, sim_time, measurement, curr_state, curr_state_time):
""" """
Write a single state as a sample over TICK_TIME Write a single state as a sample over TICK_TIME
: influxClient - agent used to send metric data to CLMC : influxClient - agent used to send metric data to CLMC
: measurement - name of influx measurement set
: state - state to be declared
: sim_time - time stamp for this measurement : sim_time - time stamp for this measurement
: measurement - name of influx measurement set
: curr_state - the current state
: curr_state_time - length of time in the current state
""" """
state_stats = {} influxClient.write_points( lp.generate_mc_service_config(sim_time, measurement, curr_state, curr_state_time, {}) )
state_stats[state] = float(TICK_TIME)
state_stats['avg_' + state] = float(TICK_TIME)
influxClient.write_points(lp.generate_mc_service_config(measurement, state_stats, sim_time))
@staticmethod @staticmethod
def _changeMCState(influxClient, sim_time, mc_measurement, sample_count, trans_sample_count, transition_state, next_state): def _changeMCState(influxClient, sim_time, measurement, completed_states, curr_state, curr_state_time):
""" """
Send INFLUX data indicating the time taken to transition to a new state Send INFLUX data indicating the time taken to transition to a new state
: influxClient - agent used to send metric data to CLMC : influxClient - agent used to send metric data to CLMC
: sim_time - simulation time at start of state changing period : sim_time - simulation time at start of state changing period
: mc_measurement - measurement name : measurement - measurement name
: sample_count - the total number of samples in the reporting period (TICK_TIME) : completed_states - dictionary of states that have been completed (each includes the state name, state time sum and count)
: trans_sample_count - the number of samples in the transition state : curr_state - the current state
: transition_state - the state being exited : curr_state_time - the current state's running time
: next_state - the state being entered
""" """
mc_states = {} mc_states = {}
for state, state_sum, state_count in completed_states:
mc_states[ state +"_sum" ] = state_sum
mc_states[ state +"_mst" ] = state_sum / state_count
# Report total time in transition and its average of the reporting period influxClient.write_points( lp.generate_mc_service_config(sim_time, measurement, curr_state, curr_state_time, mc_states) )
mc_states[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count
mc_states["avg_" + transition_state] = mc_states[transition_state] / float(TICK_TIME)
# Use the time remaining as the length for the time in the next state
mc_states[next_state] = float(TICK_TIME) - mc_states[transition_state]
mc_states["avg_" + next_state] = mc_states[next_state] / float(TICK_TIME)
influxClient.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time))
def run_simulation(generate=True, sTime=3600): def run_simulation(generate=True, sTime=3600):
""" """
......
...@@ -38,10 +38,10 @@ class TestSimulation(object): ...@@ -38,10 +38,10 @@ class TestSimulation(object):
{"time": "1970-01-01T00:00:00Z", "count_current_state_time": 3639, "count_unplaced_sum": 3639, "count_unplaced_mst": 3639, "count_placing_sum": 3639, "count_placing_mst": 3639, "count_placed_sum": 3639, "count_placed_mst": 3639, "count_booting_sum": 3639, "count_booting_mst": 3639, "count_booted_sum": 3639, {"time": "1970-01-01T00:00:00Z", "count_current_state_time": 3639, "count_unplaced_sum": 3639, "count_unplaced_mst": 3639, "count_placing_sum": 3639, "count_placing_mst": 3639, "count_placed_sum": 3639, "count_placed_mst": 3639, "count_booting_sum": 3639, "count_booting_mst": 3639, "count_booted_sum": 3639,
"count_booted_mst": 3639, "count_connecting_sum": 3639, "count_connecting_mst": 3639, "count_connected_sum": 3639, "count_connected_mst": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}), "count_booted_mst": 3639, "count_connecting_sum": 3639, "count_connecting_mst": 3639, "count_connected_sum": 3639, "count_connected_mst": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'', ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}), {"time": "1970-01-01T00:00:00Z", "count_current_state_time": 3607, "count_running_mst": 3607, "count_running_sum": 3607, "count_starting_mst": 3607, "count_starting_sum": 3607, "count_stopped_mst": 3607, "count_stopped_sum": 3607, "count_stopping_mst": 3607, "count_stopping_sum": 3607}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'', ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}), {"time": "1970-01-01T00:00:00Z", "count_current_state_time": 3607, "count_running_mst": 3607, "count_running_sum": 3607, "count_starting_mst": 3607, "count_starting_sum": 3607, "count_stopped_mst": 3607, "count_stopped_sum": 3607, "count_stopping_mst": 3607, "count_stopping_sum": 3607}),
('SELECT mean(placing_mst) as "placing_mst" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE placing_mst <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'', ('SELECT mean(placing_mst) as "placing_mst" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE placing_mst <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "placing_mst": 9.4}), {"time": "1970-01-01T00:00:00Z", "placing_mst": 9.4}),
...@@ -60,15 +60,24 @@ class TestSimulation(object): ...@@ -60,15 +60,24 @@ class TestSimulation(object):
('SELECT mean(connected_mst) as "connected_mst" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE connected_mst <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'', ('SELECT mean(connected_mst) as "connected_mst" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE connected_mst <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "connected_mst": 3605.0}), {"time": "1970-01-01T00:00:00Z", "connected_mst": 3605.0}),
('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <> 0', ('SELECT mean(stopped_sum) as "stopped_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopped_sum <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}), {"time": "1970-01-01T00:00:00Z", "stopped_sum": 0.2}),
('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <> 0', ('SELECT mean(stopped_mst) as "stopped_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopped_mst <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}), {"time": "1970-01-01T00:00:00Z", "stopped_mst": 0.2}),
('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <> 0', ('SELECT mean(starting_sum) as "starting_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE starting_sum <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}), {"time": "1970-01-01T00:00:00Z", "starting_sum": 5.5}),
('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <> 0', ('SELECT mean(starting_mst) as "starting_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE starting_mst <> 0',
{"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55}) {"time": "1970-01-01T00:00:00Z", "starting_mst": 5.5}),
('SELECT mean(running_sum) as "running_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE running_sum <> 0',
{"time": "1970-01-01T00:00:00Z", "running_sum": 3602.1000000000004}),
('SELECT mean(running_mst) as "running_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE running_mst <> 0',
{"time": "1970-01-01T00:00:00Z", "running_mst": 3602.1000000000004}),
('SELECT mean(stopping_sum) as "stopping_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopping_sum <> 0',
{"time": "1970-01-01T00:00:00Z", "stopping_sum": 1.1}),
('SELECT mean(stopping_mst) as "stopping_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopping_mst <> 0',
{"time": "1970-01-01T00:00:00Z", "stopping_mst": 1.1}),
]) ])
def test_simulation(self, influx_db, query, expected_result): def test_simulation(self, influx_db, query, expected_result):
""" """
This is the entry point of the test. This method will be found and executed when the module is ran using pytest This is the entry point of the test. This method will be found and executed when the module is ran using pytest
......
...@@ -525,9 +525,9 @@ Observation of EP or MC states will be performed by a Telegraf plugin. For examp ...@@ -525,9 +525,9 @@ Observation of EP or MC states will be performed by a Telegraf plugin. For examp
![exampleStateFlow](./image/configStateFlow.png) ![exampleStateFlow](./image/configStateFlow.png)
_Above: example observations within a two sampling periods for a MC configuration state_ _Above: example observations within a four sampling periods for a MC configuration state_
In the example provided above a MC moves through several states, finishing in a stopped state. During each sampling period, the total time in observed states is measured and for those that are _completed states_ a sum of all the time and the average time for that state is recorded. For any state that has not been observed during the sample period, the sum and average values will be recorded as zero. For a state that has not yet completed, this state will be considered as the 'current state' and the length of time in this state increases and does so continuously, over multiple sample periods if necessary, until it exits. Finally, if a state completes directly after sample period [1] ends and a new state begins before the start of the next sample period [2], then the previous current state from period [1] should be recorded as _completed_ as part period [2]'s report. In the example provided above a MC moves through several states. During each sampling period, the total time in the observed states is measured and for those that are _completed states_ a sum of all the time and the mean average time for that state is recorded. For any state that has not been observed during the sample period, the sum and average values will be recorded as zero. For a state that has not yet completed, this state will be considered as the 'current state' and the length of time in this state increases and does so continuously, over multiple sample periods if necessary, until it exits. Finally, if a state completes directly after sample period '1' ends and a new state begins before the start of the next sample period '2', then the previous current state (from period '1') should be recorded as _completed_ as part period '2's report.
##### Endpoint configuration state model ##### Endpoint configuration state model
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment