Commit 698ddfde authored by Nikolay Stanchev

Merged mediaComponentConfig into endpointConfig

parents b4d4e3cb 2460412b
......@@ -56,14 +56,14 @@ Testing is implemented using pytest.
The installation script is here:
`test/services/pytest/install.sh`
`sudo clmctest/services/pytest/install.sh`
using the following convention:
* Tests are written in python using pytest
* Related tests are stored in a python module `test/<testmodule>` to create a suite of tests. All tests are stored in files test_*.py, there can be many tests per file, and many files per module
* Related tests are stored in a python module `clmctest/<testmodule>` to create a suite of tests. All tests are stored in files test_*.py, there can be many tests per file, and many files per module
* Each test module has a rspec.yml that provides the baseline "fixture" for the tests in the module
* Tests are executed against fixtures. Fixtures are modular "setups" created for a test, that are inserted into the python code using dependency injection. This offers more flexibility than the *unit style testing. The baseline deployment is created using `vagrant up` with an appropriate rspec, and the pytest fixture reads the rspec.yml and makes the configuration available to the test.
* Tests are executed against fixtures. Fixtures are modular "setups" created for a test, that are inserted into the python code using dependency injection. This offers more flexibility than the unit style testing. The baseline deployment is created using `vagrant up` with an appropriate rspec, and the pytest fixture reads the rspec.yml and makes the configuration available to the test.
* Tests are executed from a guest VM (not the host) in the repo root using the command `pytest test/<testmodule>`
* Pytest will scan the directory for all tests in files named test_*.py and run them
......@@ -75,7 +75,7 @@ To set up a simulation of the adaptive streaming use case scenario first install
and then execute the following command
`vagrant --fixture=streaming-sim -- up`
`vagrant --fixture=monitoring -- up`
This will provision the following VMs clmc-service, ipendpoint1, ipendpoint2
......@@ -89,7 +89,7 @@ The **clmc-service** vm includes influx, Kapacitor and Chronograf. The following
SSH into the CLMC server
`vagrant --fixture=streaming-sim -- ssh clmc-service`
`vagrant --fixture=monitoring -- ssh clmc-service`
Then go to the 'vagrant' directory.
......@@ -99,20 +99,19 @@ The next step is to generate the test data, which could be done in two ways.
The first option is to run a Python script to generate the test data sets
`python3 test/streaming-sim/StreamingSim.py`
`python3 clmctest/monitoring/StreamingSim.py`
This script could also be used to clear the generated data by using the '-c' option
`python3 test/streaming-sim/StreamingSim.py -c`
`python3 clmctest/monitoring/StreamingSim.py -c`
The second option is to run the testing module directly. It detects whether the data has been generated and, if not, generates
it automatically before executing the tests. Keep in mind that when the test data is generated this way, a 10-second wait
follows the generation so that the data can be properly inserted into the database. If the data was already generated
using the first option, only the tests are executed.
#### Running the monitoring tests
The command for running the testing module is
The second option is to run the testing module directly. It detects whether the data has been generated and, if not, generates it automatically before executing the tests. Keep in mind that when the test data is generated this way, a 10-second wait follows the generation so that the data can be properly inserted into the database. If the data was already generated using the first option, only the tests are executed.
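
The generate-if-missing behaviour described above could be sketched roughly as follows. This is only an illustration: `ensure_test_data`, `data_exists` and `generate_data` are hypothetical names, not functions from this repository, and the injected `sleep` parameter exists purely so the wait can be replaced in a test.

```python
import time


def ensure_test_data(data_exists, generate_data, settle_seconds=10, sleep=time.sleep):
    """Generate the test data only when it is missing.

    data_exists    -- hypothetical callable returning True if the data is already in the database
    generate_data  -- hypothetical callable that writes the simulated data
    settle_seconds -- wait after generation so the database can ingest the points
    Returns True if data was generated by this call, False if it was already present.
    """
    if data_exists():
        # Data was generated earlier (the first option), so only the tests need to run.
        return False
    generate_data()
    # Give the database time to finish inserting the newly written data.
    sleep(settle_seconds)
    return True
```

Passing the check and the generator in as callables keeps the sketch independent of any particular database client.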
`pytest -s test/streaming-sim/test_simresults.py`
The command for running the testing module is:
`pytest -s clmctest/monitoring/test_simresults.py`
The `-s` option in the command is used to output prints used in the test code and is, therefore, optional.
......@@ -121,3 +120,49 @@ If pytest is not installed, an easy solution is to use the Python Package Index
`sudo apt-get install python3-pip`
`pip3 install pytest`
#### Configuration status modelling and monitoring
FLAME _endpoints_ (VMs created and managed by the SFEMC) and media service _media components_ (processes that realise the execution of the media service) both undergo changes in configuration state during the lifetime of a media service's deployment. Observations of these state changes are recorded in the CLMC under named measurement sets, for example 'endpoint_config' and '\<media component name\>_config' for endpoint and media component labels respectively. In each case, all recordable states of the endpoint/media component are enumerated as columns within the measurement set (see respective state models below for details).
Observation of these states will be performed by a third party - for example, a Telegraf plugin will continuously __report__ on the state of an NGINX service to the CLMC using a _fixed_ interval (say 10 seconds). During this _reporting_ period, the actual state of the NGINX service will be sampled (polled) by the plugin several times (say 10 each second). During any reporting period, the NGINX service _may_ transition from one state to another:
| State observation # | State |
| --- | --- |
| 1 | stopped |
| 2 | stopped |
| 3 | stopped |
| 4 | stopped |
| 5 | starting |
| 6 | starting |
| 7 | starting |
| 8 | starting |
| 9 | starting |
| 10 | starting |
_Above: example observations within a single reporting period of a media component configuration state_
Therefore each report will include for each state:
* The total time in the state for the reporting period
* The average time in the state for the reporting period
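
As a rough illustration of how a reporter might derive these two values, the sketch below summarises the ten observations from the table above. It assumes that observations are evenly spaced across the reporting period and that the "average" is the total time in the state divided by the length of the reporting period; both are assumptions made for this example, and `summarise_states` is a hypothetical helper rather than part of any plugin.

```python
from collections import Counter


def summarise_states(observations, reporting_period):
    """Return {state: total_seconds, 'avg_' + state: fraction_of_period} for one report."""
    # Assumption: observations are evenly spaced, so each one covers an equal share of the period.
    seconds_per_observation = reporting_period / len(observations)
    report = {}
    for state, count in Counter(observations).items():
        total = count * seconds_per_observation
        report[state] = total
        # Assumption: the average is the total time divided by the reporting period length.
        report["avg_" + state] = total / reporting_period
    return report


# The ten observations from the table: four 'stopped' followed by six 'starting'.
observations = ["stopped"] * 4 + ["starting"] * 6
report = summarise_states(observations, reporting_period=10)
print(report)
```

States that were never observed during the period are simply absent from the result; a real reporter would presumably write them as zero.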
##### Endpoint configuration state model
##### Media component configuration state model
A media component configuration state model consists of the following states:
* stopped
* starting [transitional]
* running
* stopping [transitional]
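
One possible way to express this state model in Python is an enumeration that records which states are transitional and derives the measurement columns from the state names. The `MCState` class and `measurement_columns` function are hypothetical and shown only as a sketch; the `avg_` prefix follows the column naming used for the measurement set.

```python
from enum import Enum


class MCState(Enum):
    """Media component configuration states; the flag marks transitional states."""
    STOPPED = ("stopped", False)
    STARTING = ("starting", True)
    RUNNING = ("running", False)
    STOPPING = ("stopping", True)

    def __init__(self, label, transitional):
        self.label = label
        self.transitional = transitional


def measurement_columns():
    """Return the per-state columns: one total and one average column for each state."""
    columns = []
    for state in MCState:
        columns.extend([state.label, "avg_" + state.label])
    return columns


print(measurement_columns())
print([state.label for state in MCState if state.transitional])
```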
An example measurement row for the media component configuration states is shown below:
| tags | stopped | avg_stopped | starting | avg_starting | running | avg_running | stopping | avg_stopping | time |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| \<global tags...\> | 4 | 0.4 | 6 | 0.6 | 0 | 0 | 0 | 0 | 0 |
In this example, the _reporting period_ is 10 seconds with an observation rate of one per second; the observed states were 'stopped' (4 observations) and 'starting' (6 observations).
\ No newline at end of file
......@@ -78,10 +78,6 @@ class Sim(object):
config_delay_dist = {"unplaced": [1, 0.68], "placing": [10, 0.68], "placed": [1, 0.68], "booting": [10, 0.68], "booted": [2, 0.68],
"connecting": [10, 0.68], "connected": [8, 0.68]}
# Simulation configuration of the media component (MC) state changes
# "MC state", [average (sec), stddev]
mc_config_delay_dist = {"stopped": [1, 0.68], "starting": [5, 0.68], "running": [1, 0.68], "stopping": [2, 0.68]}
print("\nSimulation started. Generating data...")
# Move endpoints from state unplaced to state placing
......@@ -128,31 +124,27 @@ class Sim(object):
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# move mpegdash_service media component state from 'stopped' to 'starting'
max_delay = 0
# move mpegdash_service media component state from 'stopped' to 'starting'
# Total reports = 1, (0.2 seconds in 'stopped', 0.8 seconds in 'starting')
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_avg = mc_config_delay_dist['stopped'][0]
delay_std = delay_avg * mc_config_delay_dist['stopped'][1]
delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'stopped', 'starting')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeMCState( influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting' )
sim_time += TICK_TIME
# move mpegdash_service media component state from 'starting' to 'running'
max_delay = 0
# Total reports = 5, (4.7 seconds in 'starting', 0.3 seconds in 'running')
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_avg = mc_config_delay_dist['starting'][0]
delay_std = delay_avg * mc_config_delay_dist['starting'][1]
delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'starting', 'running')
max_delay = max(delay_time, max_delay)
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
sim_time += max_delay
for i in range(0, 4):
self._writeMCSingleState(influxClient, 'mpegdash_service_config', 'starting', sim_time + (i * TICK_TIME))
self._changeMCState(influxClient, sim_time + (4 * TICK_TIME), 'mpegdash_service_config', 10, 7, 'starting', 'running')
sim_time += 5 * TICK_TIME
# Move endpoints from state booted to state connecting
max_delay = 0
......@@ -240,12 +232,12 @@ class Sim(object):
# remove requests processed off the queue
ip_endpoint['request_queue'] -= int(requests_processed)
# update mpegdash_service media component and endpoint state (continuously 'running' and 'connected' respectively)
# update endpoint state (continuously 'connected')
agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time,
**{'connected': float(TICK_TIME), 'avg_connected': float(TICK_TIME)}))
agent_db_client.write_points(lp.generate_mc_service_config("mpegdash_service_config",
{'running': float(TICK_TIME), 'avg_running': float(TICK_TIME)}, sim_time))
# update mpegdash_service media component state (continuously 'running')
self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time)
sim_time += TICK_TIME
......@@ -263,30 +255,24 @@ class Sim(object):
sim_time += max_delay
# move mpegdash_service media component state from 'running' to 'stopping'
max_delay = 0
# Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping')
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_avg = mc_config_delay_dist['running'][0]
delay_std = delay_avg * mc_config_delay_dist['running'][1]
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'running', 'stopping')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
self._writeMCSingleState( influxClient, 'mpegdash_service_config', 'running', sim_time )
self._changeMCState( influxClient, sim_time + TICK_TIME, 'mpegdash_service_config', 10, 8, 'running', 'stopping' )
sim_time += 2 * TICK_TIME
# move mpegdash_service media component state from 'stopping' to 'stopped'
max_delay = 0
# Total reports = 1, (0.9 seconds in 'stopping', 0.1 seconds in 'stopped')
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_avg = mc_config_delay_dist['stopping'][0]
delay_std = delay_avg * mc_config_delay_dist['stopping'][1]
delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'stopping', 'stopped')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 9, 'stopping', 'stopped')
sim_time += TICK_TIME
# End simulation
end_time = sim_time
......@@ -347,31 +333,49 @@ class Sim(object):
return total_delay_time
@staticmethod
def _changeMCState(agent_db_client, sim_time, mc_measurement, mu, sigma, trans_ratio, transition_state, next_state):
def _writeMCSingleState(influxClient, measurement, state, sim_time):
"""
Send INFLUX data indicating the time taken to transition to a new state
Write a single state as a sample over TICK_TIME
: influxClient - agent used to send metric data to CLMC
: measurement - name of influx measurement set
: state - state to be declared
: sim_time - time stamp for this measurement
Returns the total time delay for the state change
"""
# Calculate a randomized total time for the transition (and calculate relative ratios of time in transition and next state)
total_delay_time = max( random.normalvariate(mu, sigma), 1 ) # minimum total delay is 1 second
transition_time = total_delay_time * trans_ratio
next_state_time = total_delay_time - transition_time
state_stats = {}
state_stats[state] = float(TICK_TIME)
state_stats['avg_' + state] = float(TICK_TIME)
influxClient.write_points(lp.generate_mc_service_config(measurement, state_stats, sim_time))
@staticmethod
def _changeMCState(influxClient, sim_time, mc_measurement, sample_count, trans_sample_count, transition_state, next_state):
"""
Send INFLUX data indicating the time taken to transition to a new state
: influxClient - agent used to send metric data to CLMC
: sim_time - simulation time at start of state changing period
: mc_measurement - measurement name
: sample_count - the total number of samples in the reporting period (TICK_TIME)
: trans_sample_count - the number of samples in the transition state
: transition_state - the state being exited
: next_state - the state being entered
"""
mc_states = {}
# Report time in transition (and add the same as average)
mc_states[transition_state] = transition_time
mc_states["avg_" + transition_state] = transition_time
# Report time remaining in the next state (adding the same as the average)
mc_states[next_state] = next_state_time
mc_states["avg_" + next_state] = next_state_time
# Report total time in transition and its average of the reporting period
mc_states[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count
mc_states["avg_" + transition_state] = mc_states[transition_state] / float(TICK_TIME)
agent_db_client.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time))
# Use the time remaining as the length for the time in the next state
mc_states[next_state] = float(TICK_TIME) - mc_states[transition_state]
mc_states["avg_" + next_state] = mc_states[next_state] / float(TICK_TIME)
return total_delay_time
influxClient.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time))
def run_simulation(generate=True, sTime=3600):
......
......@@ -2,18 +2,24 @@
import pytest
import time
import random
class TestSimulation(object):
"""
A testing class used to group all the tests related to the simulation data
"""
@pytest.fixture(scope='class', autouse=True)
def run_simulator(self, simulator):
random.seed(0) # Seed random function so we can reliably test for average queries
print("Running simulation, please wait...")
simulator.run(3600)
print("Waiting for INFLUX to finish receiving simulation data...")
time.sleep(10) # wait for data to finish arriving at the INFLUX database
print( "... simulation data fixture finished" )
@pytest.mark.parametrize("query, expected_result", [
('SELECT count(*) FROM "CLMCMetrics"."autogen"."cpu_usage"',
......@@ -33,9 +39,18 @@ class TestSimulation(object):
"count_avg_booted": 3607, "count_connecting": 3607, "count_avg_connecting": 3607, "count_connected": 3607, "count_avg_connected": 3607, "count_cpus": 3607, "count_memory": 3607, "count_storage": 3607}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3604, "count_avg_starting": 3604, "count_avg_stopped": 3604, "count_avg_stopping": 3604, "count_running": 3604, "count_starting": 3604, "count_stopped": 3604, "count_stopping": 3604}),
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3604, "count_avg_starting": 3604, "count_avg_stopped": 3604, "count_avg_stopping": 3604, "count_running": 3604, "count_starting": 3604, "count_stopped": 3604, "count_stopping": 3604}),
{"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <>0',
{"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}),
('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <>0',
{"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}),
('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <>0',
{"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}),
('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <>0',
{"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55})
])
def test_simulation(self, influx_db, query, expected_result):
"""
......