Skip to content
Snippets Groups Projects
Commit 5e52ae60 authored by MJB's avatar MJB
Browse files

Merge branch 'mediaComponentConfig' of...

Merge branch 'mediaComponentConfig' of gitlab.it-innovation.soton.ac.uk:FLAME/flame-clmc into mediaComponentConfig
parents 4c6aa311 c5e5e6a7
Branches
Tags
No related merge requests found
......@@ -94,19 +94,54 @@ def _getNSTime(time):
return timestamp
def generate_mc_serviceConfig( mcLabel, sStop, asStop, sStart, asStart, time ):
result = [{ "measurement" : "mediaComponentConfig",
"tags" : { "mediaComp" : mcLabel },
"fields" :
{ "serviceStopped" : sStop,
"avgServiceStopped" : asStop,
"serviceStarted" : sStart,
"avgServiceStarted" : asStart
def generate_mc_service_config( mcMeasurement, stateTimeStats, time ):
    """
    Builds an InfluxDB line-protocol point for a media component's service
    configuration states.

    :param mcMeasurement: name of the measurement to report under
    :param stateTimeStats: dict of state-time statistics; missing keys are
                           defaulted to 0.0 by validate_state_time_stats
    :param time: report timestamp (converted to nanoseconds via _getNSTime);
                 NOTE: parameter shadows the stdlib 'time' module inside this function
    :return: a single-element list with the measurement point, as expected by
             InfluxDBClient.write_points
    """
    validStats = validate_state_time_stats( stateTimeStats )

    # Build the fields dict from the four known states plus their averages,
    # instead of spelling out all eight keys by hand.
    fields = {}
    for state in ( "stopped", "starting", "running", "stopping" ):
        fields[state] = validStats[state]
        fields["avg_" + state] = validStats["avg_" + state]

    result = [{ "measurement" : mcMeasurement,
                "fields" : fields,
                "time" : _getNSTime(time) }]

    return result
def validate_state_time_stats( stateTimeStats ):
    """
    Ensures every expected state-time field is present in the given stats dict,
    defaulting any missing field to 0.0.

    The dict is mutated in place and also returned for convenience.

    :param stateTimeStats: dict of state-time statistics to validate
    :return: the same dict, guaranteed to contain 'stopped', 'starting',
             'running', 'stopping' and their 'avg_' counterparts
    """
    # setdefault in a loop replaces eight copy-pasted membership checks.
    for state in ( "stopped", "starting", "running", "stopping" ):
        stateTimeStats.setdefault( state, 0.0 )
        stateTimeStats.setdefault( "avg_" + state, 0.0 )

    return stateTimeStats
# DEPRECATED
# ____________________________________________________________________________
......
......@@ -5,7 +5,7 @@ import time
import urllib.parse
import pytest
import random
import sys
import sys, getopt
from influxdb import InfluxDBClient
# Simulation parameters
......@@ -23,6 +23,8 @@ AGENT2_URL = 'http://172.23.1.22:8186'
class Sim(object):
"""
Simulator for services
"""
......@@ -75,6 +77,10 @@ class Sim(object):
# endpoint state->mu, sigma, secs normal distribution
config_delay_dist = {"placing": [10, 0.68], "booting": [10, 0.68], "connecting": [10, 0.68]}
# Simulation configuration of the media component (MC) state changes
# "MC state", [average (sec), stddev]
mc_config_delay_dist = { "stopped":[1, 0.68], "starting": [5, 0.68], "running":[1, 0.68], "stopping": [2, 0.68]}
print("\nSimulation started. Generating data...")
# Place endpoints
......@@ -99,6 +105,32 @@ class Sim(object):
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# move mpegdash_service media component state from 'stopped' to 'starting'
max_delay = 0
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_avg = mc_config_delay_dist['stopped'][0]
delay_std = delay_avg * mc_config_delay_dist['stopped'][1]
delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'stopped', 'starting')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# move mpegdash_service media component state from 'starting' to 'running'
max_delay = 0
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_avg = mc_config_delay_dist['starting'][0]
delay_std = delay_avg * mc_config_delay_dist['starting'][1]
delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'starting', 'running')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# Connect endpoints
max_delay = 0
for ip_endpoint in ip_endpoints:
......@@ -174,7 +206,42 @@ class Sim(object):
# remove requests processed off the queue
ip_endpoint['request_queue'] -= int(requests_processed)
# update mpegdash_service media component state (continuously 'running')
state_stats = {}
state_stats['running'] = float(TICK_TIME)
state_stats['avg_running'] = float(TICK_TIME)
agent_db_client.write_points(lp.generate_mc_service_config("mpegdash_service_config",state_stats,sim_time))
sim_time += TICK_TIME
# Simulate tear-down of media components
# move mpegdash_service media component state from 'running' to 'stopping'
max_delay = 0
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_avg = mc_config_delay_dist['running'][0]
delay_std = delay_avg * mc_config_delay_dist['running'][1]
delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'running', 'stopping')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# move mpegdash_service media component state from 'stopping' to 'stopped'
max_delay = 0
for ip_endpoint in ip_endpoints:
agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
delay_avg = mc_config_delay_dist['stopping'][0]
delay_std = delay_avg * mc_config_delay_dist['stopping'][1]
delay_time = self._changeMCState(agent_db_client, sim_time, "mpegdash_service_config", delay_avg, delay_std, 0.7, 'stopping', 'stopped')
max_delay = max(delay_time, max_delay)
sim_time += max_delay
# End simulation
end_time = sim_time
print("Simulation Finished. Start time {0}. End time {1}. Total time {2}".format(start_time, end_time,
end_time - start_time))
......@@ -223,90 +290,35 @@ class Sim(object):
return delay_time
## PYTEST FIXTURES START
## ------------------------------------------------------------------------------------------------------
@pytest.fixture(scope='module')
def run_simulation_fixture(streaming_sim_config):
    """
    Module-scoped fixture that builds a simulator from the streaming simulation
    configuration, resets and runs it, then pauses for 10 seconds so the data
    can finish arriving in the database.
    """
    host_ips = [host['ip_address'] for host in streaming_sim_config['hosts']]
    influx_db_url = "http://{0}:8086".format(host_ips[0])
    agent1_url = "http://{0}:8186".format(host_ips[1])
    agent2_url = "http://{0}:8186".format(host_ips[2])

    global INFLUX_DB_URL
    global INFLUX_DB_NAME
    global SIMULATION_TIME_SEC
    global AGENT1_URL
    global AGENT2_URL

    simulator = Sim(influx_db_url, INFLUX_DB_NAME, agent1_url, agent2_url)

    db_names = [entry.get("name") for entry in simulator.db_client.get_list_database()]

    # This check needed to be disabled as the CLMCMetrics database is always created when
    # the test starts, irrespective of whether this is the 1st time or not
    # if INFLUX_DB_NAME not in db_names:

    simulator.reset()
    simulator.run(SIMULATION_TIME_SEC)

    print("10 seconds timeout is given so that the data could properly be inserted into the database.")
    import time
    time.sleep(10)
### Media Component Configuration fixtures
"""
Line Protocol report format:
mediaComponentConfig <global tags>,<mediaComp> <configState1=milliseconds>,<configState2=milliseconds> time
"""
@pytest.fixture(scope='module')
def reportMC_ServiceState(streaming_sim_config):
"""
Report to determine whether the simulated streaming servers on the endpoints is running.
"""
# IPEndpoint 1 scenario
# 10 second period: Stopped --> started --> stopped --> started
# Stopped state: 1000 + 500 = 1500 [avg = 750]
# Started state: 3000 + 5500 = 8500 [avg = 4250]
@staticmethod
def _changeMCState(agent_db_client, sim_time, mc_measurement, mu, sigma, trans_ratio, transition_state, next_state):
    """
    Send INFLUX data indicating the time taken to transition to a new state.

    (Scrape residue of removed fixture code that was interleaved into this span
    has been consolidated away; only the state-change method remains.)

    :param agent_db_client: InfluxDB client of the agent to report the point to
    :param sim_time: current simulation time used as the point's timestamp
    :param mc_measurement: measurement name for the media component report
    :param mu: mean of the normally distributed total transition time (seconds)
    :param sigma: standard deviation of the total transition time
    :param trans_ratio: fraction of the total time spent in the transition state
    :param transition_state: state being transitioned out of
    :param next_state: state being transitioned into
    :return: the total time delay for the state change (seconds)
    """
    # Calculate a randomized total time for the transition; minimum total delay is 1 second.
    total_delay_time = max(random.normalvariate(mu, sigma), 1)
    transition_time = total_delay_time * trans_ratio
    next_state_time = total_delay_time - transition_time

    mc_states = {}

    # Report time in transition (and add the same as average)
    mc_states[transition_state] = transition_time
    mc_states["avg_" + transition_state] = transition_time

    # Report time remaining in the next state (adding the same as the average)
    mc_states[next_state] = next_state_time
    mc_states["avg_" + next_state] = next_state_time

    agent_db_client.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time))

    return total_delay_time
@pytest.fixture(scope='module')
def reportMC_APIStatus(streaming_sim_config):
"""
Report to determine whether the simulated streaming servers API status call returns good
"""
## ------------------------------------------------------------------------------------------------------
## PYTEST FIXTURES END
def run_simulation(generate=True):
def run_simulation(generate=True, sTime=3600):
"""
A method which runs the data generation simulator
:param generate: True for generating data, False for deleting the DB (optional argument, if not given, default value True is used)
......@@ -318,6 +330,8 @@ def run_simulation(generate=True):
global AGENT1_URL
global AGENT2_URL
SIMULATION_TIME_SEC = sTime
simulator = Sim(INFLUX_DB_URL, INFLUX_DB_NAME, AGENT1_URL, AGENT2_URL)
if generate:
......@@ -326,18 +340,36 @@ def run_simulation(generate=True):
else:
simulator.db_client.drop_database(simulator.influx_db_name)
if __name__ == "__main__":
    """
    The main entry for this module. Code here is executed only if the StreamingSim.py file is executed,
    but not when it's imported in another module
    """

    # Default options for simulation (generate data and simulation time of 3600 seconds)
    genOpt = True
    simTime = 60 * 60

    # Try get some options
    try:
        # '-c' is a flag (no argument expected), so it must NOT be followed by ':'
        # in the optstring; '-t' takes the simulation time in seconds.
        opts, args = getopt.getopt(sys.argv[1:], "ct:", ['clear', 'time='])
    except getopt.GetoptError:
        print('StreamingSim.py -c -t <seconds>')
        sys.exit(2)

    # Apply options, if any
    for opt, arg in opts:
        if opt in ('-c', '--clear'):
            genOpt = False
        elif opt in ('-t', '--time'):
            # getopt hands us a string; convert so downstream time arithmetic works
            simTime = int(arg)

    if genOpt:
        print("Running simulation to generate data")
        print("Time period for this simulation: " + str(simTime) + " seconds")
    else:
        print("Clearing simulation data")

    run_simulation(genOpt, simTime)
......@@ -4,6 +4,7 @@ import pytest
import yaml
import pkg_resources
from influxdb import InfluxDBClient
from clmctest.monitoring.StreamingSim import Sim
@pytest.fixture(scope="module")
......@@ -21,9 +22,8 @@ def streaming_sim_config():
data_loaded = yaml.load(stream)
return data_loaded
@pytest.fixture(params=[{'database': 'CLMCMetrics'}], scope='module')
def influx_db(streaming_sim_config, request):
    """
    Creates an Influx DB client for the CLMC metrics database

    :param streaming_sim_config: the fixture returning the parsed simulation configuration
    :param request: access to the fixture parameters (the database name)
    :return: the created Influx DB client
    """
    # port given as an int, for consistency with the other InfluxDBClient
    # instantiations in this package (it was previously the string '8086')
    return InfluxDBClient(host=streaming_sim_config['hosts'][0]['ip_address'], port=8086, database=request.param['database'], timeout=10)
@pytest.fixture(scope="module")
def simulator( streaming_sim_config ):
    """
    Creates a Sim instance configured from the simulation config and resets
    its database so tests start from a clean state.

    :param streaming_sim_config: the fixture returning the parsed simulation configuration
    :return: a reset Sim instance, ready to run
    """
    influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"
    influx_db_name = streaming_sim_config['hosts'][1]['database_name']
    agent1_url = "http://" + streaming_sim_config['hosts'][1]['ip_address'] + ":8186"
    agent2_url = "http://" + streaming_sim_config['hosts'][2]['ip_address'] + ":8186"

    simulator = Sim( influx_url, influx_db_name, agent1_url, agent2_url )

    # (The previous version also fetched the list of databases here, but the
    # result was never used — dead code removed.)
    simulator.reset()

    return simulator
#!/usr/bin/python3
import pytest
from clmctest.monitoring.StreamingSim import reportMC_ServiceState
class TestMediaComponentConfig( object ):
    """
    Test class to check the reported output of a media component's configuration test
    as it is reported to the CLMC.

    Each parametrized case queries the 'mediaComponentConfig' measurement for one
    ipendpoint and compares the returned point against a fixed expected result.
    """

    # For data sent to INFLUX, see reportMC_ServiceState function in StreamingSim.py —
    # the expected values below mirror the scenario numbers written by that fixture.
    @pytest.mark.parametrize( "query, expectedResult", [
        ('SELECT "serviceStopped", "avgServiceStopped", "serviceStarted", "avgServiceStarted" FROM "CLMCMetrics"."autogen"."mediaComponentConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
         {"time" : "1970-01-01T00:00:00Z", "serviceStopped" : 1500, "avgServiceStopped" : 750, "serviceStarted" : 8500, "avgServiceStarted" : 4250}),
        ('SELECT "serviceStopped", "avgServiceStopped", "serviceStarted", "avgServiceStarted" FROM "CLMCMetrics"."autogen"."mediaComponentConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
         {"time" : "1970-01-01T00:00:00Z", "serviceStopped" : 250, "avgServiceStopped" : 250, "serviceStarted" : 9500, "avgServiceStarted" : 4750})
        ])
    def test_serviceStateReport( self, query, expectedResult, get_db_client, reportMC_ServiceState ):
        """
        Executes a media component service-state query and checks the result.

        :param query: the InfluxDB query to search for the MC service state report
        :param expectedResult: the JSON result expected from the query
        :param get_db_client: fixture from conftest.py returning the INFLUX query client
        :param reportMC_ServiceState: the fixture that makes the service state report
                                      (requested only for its side effect)
        """
        print( "\n" )  # White space for output

        # Query for result and report problems with query if necessary
        queryResult = get_db_client.query( query, raise_errors = False )
        assert queryResult.error is None, "An error occurred executing query {0}."

        # Get result and evaluate against the expected point
        actualResult = next( queryResult.get_points() )
        assert expectedResult == actualResult, "FAIL" #self.outputComparison( expectedResult, actualResult )

        print ("Media Component service state test succeeded: {0}".format( query ))
#!/usr/bin/python3
import pytest
from clmctest.monitoring.StreamingSim import run_simulation_fixture
import time
class TestSimulation(object):
"""
A testing class used to group all the tests related to the simulation data
"""
@pytest.fixture(scope='class')
def run_simulator( self, simulator ):
    """
    Class-scoped fixture that runs the simulator for 3600 simulated seconds,
    then sleeps so the generated points can finish arriving in InfluxDB
    before any test queries the database.

    :param simulator: the module-scoped fixture providing a reset Sim instance
    """
    simulator.run( 3600 )

    print( "Waiting for INFLUX to finish receiving simulation data..." )
    time.sleep( 10 )  # wait for data to finish arriving at the INFLUX database
@pytest.mark.parametrize("query, expected_result", [
('SELECT count(*) FROM "CLMCMetrics"."autogen"."cpu_usage"',
......@@ -19,24 +25,28 @@ class TestSimulation(object):
('SELECT count(*) FROM "CLMCMetrics"."autogen"."net_port_io"',
{"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."vm_res_alloc"',
{"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12})
{"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
{"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3604, "count_avg_starting" : 3604, "count_avg_stopped" : 3604, "count_avg_stopping" : 3604, "count_running" : 3604, "count_starting" : 3604, "count_stopped" : 3604, "count_stopping" : 3604}),
('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
{"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3604, "count_avg_starting" : 3604, "count_avg_stopped" : 3604, "count_avg_stopping" : 3604, "count_running" : 3604, "count_starting" : 3604, "count_stopped" : 3604, "count_stopping" : 3604}),
])
def test_simulation(self, query, expected_result, influxdb, run_simulation_fixture):
def test_simulation( self, run_simulator, influx_db, query, expected_result ):
"""
This is the entry point of the test. This method will be found and executed when the module is ran using pytest
:param query: the query to execute (value obtained from the pytest parameter decorator)
:param expected_result: the result expected from executing the query (value obtained from the pytest parameter decorator)
:param influxdb the import db client fixture - imported from contest.py
:param influx_db the import db client fixture - imported from contest.py
:param run_simulation_fixture: the imported fixture to use to generate the testing data - the return value of the fixture is not needed in this case
"""
# pytest automatically goes through all queries under test, declared in the parameters decorator
print("\n") # prints a blank line for formatting purposes
# the raise_errors=False argument is given so that we could actually test that the DB didn't return any errors instead of raising an exception
query_result = influxdb.query(query, raise_errors=False)
query_result = influx_db.query(query, raise_errors=False)
# test the error attribute of the result is None, that is no error is returned from executing the DB query
assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment