Commit c34d2e78 authored by MJB

Merge branch 'clmcservice' of gitlab.it-innovation.soton.ac.uk:FLAME/flame-clmc into integration

parents 63bc9d34 478adc7e
Showing with 1393 additions and 302 deletions
[run]
source = CLMCservice
omit = CLMCservice/tests.py
@@ -10,6 +10,6 @@ ubuntu-xenial-16.04-cloudimg-console.log
 .idea/
 *.egg
 *.pyc
-.pytest_cache
 .tox
 *$py.class
+**/.pytest_cache/
from pyramid.config import Configurator
from pyramid.settings import asbool

from CLMCservice.views import AggregatorConfig


def main(global_config, **settings):
    """ This function returns a Pyramid WSGI application."""

    # a conversion is necessary so that the configuration value of the aggregator is stored as bool and not as string
    aggregator_running = asbool(settings.get('aggregator_running', 'false'))
    settings['aggregator_running'] = aggregator_running

    config = Configurator(settings=settings)

    config.add_route('aggregator', '/aggregator')
    config.add_view(AggregatorConfig, attr='get', request_method='GET')
    config.add_view(AggregatorConfig, attr='post', request_method='POST')

    config.scan()
    return config.make_wsgi_app()
import pytest
from pyramid import testing
from pyramid.httpexceptions import HTTPBadRequest


class TestAggregatorConfig(object):
    """
    A pytest test suite for the aggregator configuration API calls.
    """

    @pytest.fixture(autouse=True)
    def app_config(self):
        """
        A fixture to implement setUp/tearDown functionality for all tests by initializing the configuration structure for the web service.
        """

        self.config = testing.setUp()
        self.config.add_settings({'aggregator_running': False})

        yield

        testing.tearDown()

    def test_GET(self):
        """
        Tests the GET method for the status of the aggregator.
        """
        from CLMCservice.views import AggregatorConfig  # nested import so that importing the class view is part of the test itself

        assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."

        request = testing.DummyRequest()
        response = AggregatorConfig(request).get()

        assert type(response) == dict, "Response must be a dictionary representing a JSON object."
        assert not response.get('aggregator_running'), "The response of the API call must report the aggregator status as False."
        assert not self.config.get_settings().get('aggregator_running'), "A GET request must not modify the aggregator status."

    @pytest.mark.parametrize("input_val, output_val", [
        ("True", True),
        ("true", True),
        ("1", True),
        ("False", False),
        ("false", False),
        ("0", False),
        ("t", None),
        ("f", None),
    ])
    def test_POST(self, input_val, output_val):
        """
        Tests the POST method for the status of the aggregator.

        :param input_val: the input form parameter
        :param output_val: the expected output value, None for expecting an Exception
        """
        from CLMCservice.views import AggregatorConfig  # nested import so that importing the class view is part of the test itself

        assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."

        request = testing.DummyRequest()
        request.params['running'] = input_val

        if output_val is not None:
            response = AggregatorConfig(request).post()
            assert response == {'aggregator_running': output_val}, "Response of POST request must include the new status of the aggregator"
            assert self.config.get_settings().get('aggregator_running') == output_val, "Aggregator status must be updated to the new value."
        else:
            error_raised = False
            try:
                AggregatorConfig(request).post()
            except HTTPBadRequest:
                error_raised = True
            assert error_raised, "Error must be raised in case of an invalid argument."
def str_to_bool(value):
    """
    A utility function to convert a string to boolean based on simple rules.

    :param value: the value to convert
    :return: True or False
    :raises ValueError: if value cannot be converted to boolean
    """

    if type(value) is not str:
        raise ValueError("This method only converts string to boolean.")

    if value in ('False', 'false', '0'):
        return False
    elif value in ('True', 'true', '1'):
        return True
    else:
        raise ValueError("Invalid argument for conversion")
from pyramid.view import view_defaults
from pyramid.httpexceptions import HTTPBadRequest
from CLMCservice.utilities import str_to_bool


@view_defaults(route_name='aggregator', renderer='json')
class AggregatorConfig(object):
    """
    A class-based view for accessing and mutating the status of the aggregator.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """
        self.request = request

    def get(self):
        """
        A GET API call for the status of the aggregator.

        :return: A JSON response with the status of the aggregator.
        """
        aggregator_running = self.request.registry.settings.get('aggregator_running')
        return {'aggregator_running': aggregator_running}

    def post(self):
        """
        A POST API call for the status of the aggregator.

        :return: A JSON response to the POST call (success or fail).
        :raises HTTPBadRequest: if form argument cannot be converted to boolean
        """
        new_status = self.request.params.get('running')

        try:
            new_status = str_to_bool(new_status)
        except ValueError:
            raise HTTPBadRequest("Bad request parameter - expected a boolean, received {0}".format(self.request.params.get('running')))

        self.request.registry.settings['aggregator_running'] = new_status
        # TODO: start/stop the aggregator based on the value of the new status
        return {'aggregator_running': new_status}
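For illustration, here is a minimal client-side sketch of the two calls above. It assumes the service is running locally on port 8080 (e.g. via `pserve development.ini`) and uses the third-party `requests` library, which is not a dependency of this project.

```
# Hypothetical usage sketch - not part of the repository.
import requests

BASE_URL = "http://localhost:8080"  # assumed local deployment

# GET the current status of the aggregator
print(requests.get(BASE_URL + "/aggregator").json())  # e.g. {'aggregator_running': False}

# POST a new status - the view reads a form parameter called 'running'
print(requests.post(BASE_URL + "/aggregator", data={"running": "true"}).json())

# a value that str_to_bool cannot convert yields an HTTP 400 (Bad Request)
print(requests.post(BASE_URL + "/aggregator", data={"running": "t"}).status_code)  # 400
```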
@@ -104,45 +104,4 @@ Then the package is installed
 Then the tests are run
 
-`sudo apt-get install python3-pip`
-`pip3 install pytest`
+`vagrant --fixture=scripts -- ssh test-runner -- -tt "pytest -s --pyargs clmctest.scripts"`
-
-#### CLMC Service
-
-The CLMC service is implemented using the Pyramid framework. (currently under development)
-
-Before installing the CLMC service and its dependencies, it is recommended to use a virtual environment. To manage virtual
-environments, **virtualenvwrapper** can be used.
-
-```
-pip install virtualenvwrapper
-```
-
-To create a virtual environment use the **mkvirtualenv** command:
-
-```
-mkvirtualenv CLMC
-```
-
-When created, you should already be set to use the new virtual environment, but to make sure of this use the **workon** command:
-
-```
-workon CLMC
-```
-
-Now, any installed libraries will be specificly installed in this environment only. To install and use the CLMC service
-locally, the easiest thing to do is to use **pip** (make sure you are in the root folder of the project - ***flame-clmc***):
-
-```
-pip install -e .
-```
-
-Finally, start the service on localhost by using pyramid's **pserve**:
-
-```
-pserve development.ini --reload
-```
-
-You should now be able to see the 'Hello world' message when visiting **http://localhost:8080** in your browser.
@@ -59,6 +59,10 @@ class Aggregator(Thread):
         # a stop flag event object used to handle the killing of the thread
         self._stop_flag = Event()
 
+        # cache-like dictionaries to store the last reported values, which can be used to fill in missing values
+        self.network_cache = {}
+        self.service_cache = {}
+
     def stop(self):
         """
         A method used to stop the thread.
@@ -66,11 +70,23 @@ class Aggregator(Thread):
         self._stop_flag.set()
 
+    def set_event_lock(self, event):
+        """
+        Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing).
+
+        :param event: the event lock object
+        """
+
+        setattr(self, 'event', event)
+
     def run(self):
         """
         Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds.
         """
 
+        if hasattr(self, 'event'):
+            self.event.set()
+
         current_time = int(time())
         while True:
             if self._stop_flag.is_set():
@@ -84,61 +100,73 @@ class Aggregator(Thread):
             # query the network delays and group them by path ID
             network_delays = {}
             result = self.db_client.query(
-                'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
+                'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
                     boundary_time_nano, current_time_nano))
             for item in result.items():
                 metadata, result_points = item
                 # measurement = metadata[0]
                 tags = metadata[1]
-                network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay']
+                result = next(result_points)
+                network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth']
+                self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth']
 
             # query the service delays and group them by endpoint, service function instance and sfr
             service_delays = {}
-            result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
+            result = self.db_client.query('SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
             for item in result.items():
                 metadata, result_points = item
                 # measurement = metadata[0]
                 tags = metadata[1]
-                service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance'])
+                result = next(result_points)
+                service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance'])
+                self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance'])
 
             # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
             for path in network_delays:
                 # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr
                 path_id, source, target = path
-                if target not in service_delays:
+                if target not in service_delays and target not in self.service_cache:
                     # if not continue with the other network path reports
                     continue
 
                 e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
-                                 "delay_service": None, "time": boundary_time}
+                                 "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time}
 
                 e2e_arguments['path_ID'] = path_id
-                e2e_arguments['delay_forward'] = network_delays[path]
+                e2e_arguments['source_SFR'] = source
+                e2e_arguments['target_SFR'] = target
+                e2e_arguments['delay_forward'] = network_delays[path][0]
+                e2e_arguments['avg_bandwidth'] = network_delays[path][1]
 
                 # reverse the path ID to get the network delay for the reversed path
                 reversed_path = (path_id, target, source)
-                assert reversed_path in network_delays  # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A
-                e2e_arguments['delay_reverse'] = network_delays[reversed_path]
+                if reversed_path in network_delays or reversed_path in self.network_cache:
+                    # get the reverse delay, use the latest value if reported or the cache value
+                    e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0]
+                else:
+                    e2e_arguments['delay_reverse'] = None
 
                 # get the response time of the media component connected to the target SFR
-                service_delay = service_delays[target]
-                response_time, endpoint, sf_instance = service_delay
+                service_delay = service_delays.get(target, self.service_cache.get(target))
+                response_time, request_size, response_size, endpoint, sf_instance = service_delay
                 # put these points in the e2e arguments dictionary
                 e2e_arguments['delay_service'] = response_time
+                e2e_arguments['avg_request_size'] = request_size
+                e2e_arguments['avg_response_size'] = response_size
                 e2e_arguments['endpoint'] = endpoint
                 e2e_arguments['sf_instance'] = sf_instance
 
                 # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row
-                if None not in e2e_arguments.items():
+                if None not in e2e_arguments.values():
                     self.db_client.write_points(
                         lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
                                                      e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
-                                                     e2e_arguments['time']))
+                                                     e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time']))
 
             old_timestamp = current_time
             # wait until {REPORT_PERIOD} seconds have passed
-            while current_time != old_timestamp + self.REPORT_PERIOD:
+            while current_time < old_timestamp + self.REPORT_PERIOD:
                 sleep(1)
                 current_time = int(time())
...
@@ -74,49 +74,79 @@ class Simulator(object):
         # all network delays start from 1ms, the dictionary stores the information to report
         paths = [
-            {'target': 'SR3',
-             'source': 'SR1',
-             'path_id': 'SR1---SR3',
-             'network_delay': 1},
-            {'target': 'SR1',
-             'source': 'SR3',
-             'path_id': 'SR1---SR3',
-             'network_delay': 1}
+            {
+                'target': 'SR2',
+                'source': 'SR1',
+                'path_id': 'SR1---SR2',
+                'latency': 5,
+                'bandwidth': 100*1024*1024
+            },
+            {
+                'target': 'SR1',
+                'source': 'SR2',
+                'path_id': 'SR1---SR2',
+                'latency': 5,
+                'bandwidth': 100*1024*1024
+            },
+            {
+                'target': 'SR3',
+                'source': 'SR1',
+                'path_id': 'SR1---SR3',
+                'latency': 5,
+                'bandwidth': 100*1024*1024
+            },
+            {
+                'target': 'SR1',
+                'source': 'SR3',
+                'path_id': 'SR1---SR3',
+                'latency': 5,
+                'bandwidth': 100*1024*1024
+            }
         ]
 
+        service_function_instances = [
+            {
+                'endpoint': 'ms1.flame.org',
+                'sf_instance': 'sr2.ms1.flame.org',  # TODO: what did we decide the sf_instance would look like?
+                'sfr': 'SR2',
+                'service_delay': 40,
+                'cpus': 1
+            },
+            {
+                'endpoint': 'ms1.flame.org',
+                'sf_instance': 'sr3.ms1.flame.org',  # TODO: what did we decide the sf_instance would look like?
+                'sfr': 'SR3',
+                'service_delay': 10,
+                'cpus': 4
+            }
+        ]
+
+        av_request_size = 10 * 1024 * 1024  # average request size measured by service function / Bytes
+        av_response_size = 1 * 1024  # average response size measured by service function / Bytes
+
         # current time in seconds (to test the aggregation we write influx data points related to future time), so we start from the current time
         start_time = int(time.time())
         sim_time = start_time
 
-        mean_delay_seconds_media = 10  # initial mean media service delay
-        sample_period_net = 2  # sample period for reporting network delays (measured in seconds) - net measurements reported every 2s
-        sample_period_media = 5  # sample period for reporting media service delays (measured in seconds) - service measurements reported every 5 seconds
+        sample_period_net = 1  # sample period for reporting network delays (measured in seconds)
+        sample_period_media = 5  # sample period for reporting media service delays (measured in seconds)
 
         for i in range(0, self.SIMULATION_LENGTH):
-            # measure net delay every 2 seconds for path SR1-SR3 (generates on tick 0, 2, 4, 6, 8, 10.. etc.)
+            # report one of the network delays every sample_period_net seconds
            if i % sample_period_net == 0:
-                path = paths[0]
-                self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['network_delay'], sim_time))
+                path = random.choice(paths)
+                self.db_client.write_points(
+                    lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['latency'], path['bandwidth'], sim_time))
 
                 # increase/decrease the delay in every sample report (min delay is 1)
-                path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3))
-
-            # measure net delay every 2 seconds for path SR2-SR3 (generates on tick 1, 3, 5, 7, 9, 11.. etc.)
-            if (i+1) % sample_period_net == 0:
-                path = paths[1]
-                self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['network_delay'], sim_time))
-
-                # increase/decrease the delay in every sample report (min delay is 1)
-                path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3))
+                path['latency'] = max(1, path['latency'] + random.randint(-3, 3))
 
-            # measure service response time every 5 seconds
+            # report one of the service_function_instance response times every sample_period_media seconds
            if i % sample_period_media == 0:
-                self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, "endpoint-1",
-                                                                             "ms-A.ict-flame.eu", "SR3", sim_time))
-
-                # increase/decrease the delay in every sample report (min delay is 10)
-                mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)]))
+                service = random.choice(service_function_instances)
+                self.db_client.write_points(lp.generate_service_delay_report(
+                    service['endpoint'], service['sf_instance'], service['sfr'], service['service_delay'], av_request_size, av_response_size, sim_time))
 
             # increase the time by one simulation tick
             sim_time += self.TICK
...
@@ -29,18 +29,21 @@ import uuid
 from random import randint
 
-def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, time):
+def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time):
     """
     Generates a combined averaged measurement about the e2e delay and its contributing parts
 
-    :param path_ID: The path identifier, which is a bidirectional path ID for the request and the response path
-    :param source_SFR: source service router
-    :param target_SFR: target service router
+    :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
+    :param source_sfr: source service router
+    :param target_sfr: target service router
     :param endpoint: endpoint of the media component
     :param sf_instance: service function instance (media component)
     :param delay_forward: Path delay (Forward direction)
     :param delay_reverse: Path delay (Reverse direction)
     :param delay_service: the media service component response time
+    :param avg_request_size: averaged request size
+    :param avg_response_size: averaged response size
+    :param avg_bandwidth: averaged bandwidth
     :param time: measurement timestamp
     :return: a list of dict-formatted reports to post on influx
     """
@@ -56,7 +59,10 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst
             "fields": {
                 "delay_forward": float(delay_forward),
                 "delay_reverse": float(delay_reverse),
-                "delay_service": float(delay_service)
+                "delay_service": float(delay_service),
+                "avg_request_size": float(avg_request_size),
+                "avg_response_size": float(avg_response_size),
+                "avg_bandwidth": float(avg_bandwidth)
             },
             "time": _getNSTime(time)
         }]
@@ -64,14 +70,15 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst
     return result
 
-def generate_network_delay_report(path_id, source_sfr, target_sfr, e2e_delay, time):
+def generate_network_delay_report(path_id, source_sfr, target_sfr, latency, bandwidth, time):
     """
     Generates a platform measurement about the network delay between two specific service routers.
 
     :param path_id: the identifier of the path between the two service routers
     :param source_sfr: the source service router
     :param target_sfr: the target service router
-    :param e2e_delay: the e2e network delay for traversing the path between the two service routers
+    :param latency: the e2e network delay for traversing the path between the two service routers
+    :param bandwidth: the bandwidth of the path (minimum of bandwidths of the links it is composed of)
     :param time: the measurement timestamp
     :return: a list of dict-formatted reports to post on influx
     """
@@ -83,7 +90,8 @@ def generate_network_delay_report(path_id, source_sfr, target_sfr, e2e_delay, ti
             "target": target_sfr
         },
         "fields": {
-            "delay": e2e_delay
+            "latency": latency,
+            "bandwidth": bandwidth
         },
         "time": _getNSTime(time)
     }]
@@ -91,14 +99,16 @@ def generate_network_delay_report(path_id, source_sfr, target_sfr, e2e_delay, ti
     return result
 
-def generate_service_delay_report(response_time, endpoint, sf_instance, sfr, time):
+def generate_service_delay_report(endpoint, sf_instance, sfr, response_time, request_size, response_size, time):
     """
     Generates a service measurement about the media service response time.
 
-    :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service component)
     :param endpoint: endpoint of the media component
     :param sf_instance: service function instance
     :param sfr: the service function router that connects the endpoint of the SF instance to the FLAME network
+    :param response_time: the media service response time (this is not the response time for the whole round-trip, but only for the processing part of the media service component)
+    :param request_size: the size of the request received by the service in Bytes
+    :param response_size: the size of the response received by the service in Bytes
     :param time: the measurement timestamp
     :return: a list of dict-formatted reports to post on influx
     """
@@ -111,6 +121,8 @@ def generate_service_delay_report(response_time, endpoint, sf_instance, sfr, tim
         },
         "fields": {
             "response_time": response_time,
+            "request_size": request_size,
+            "response_size": response_size
         },
         "time": _getNSTime(time)
     }]
...
@@ -25,6 +25,7 @@
 import pytest
 import random
 import time
+import threading
 
 class TestE2ESimulation(object):
@@ -44,8 +45,12 @@ class TestE2ESimulation(object):
         random.seed(0)  # Seed random function so we can reliably test for average queries
 
         print("Starting aggregator...")
+        event = threading.Event()
+        e2e_aggregator.set_event_lock(event)
         e2e_aggregator.start()
 
+        event.wait()  # wait until the aggregator thread has set the event lock (it has reached its run method and is ready to start)
+
         print("Running simulation, please wait...")
         e2e_simulator.run()
@@ -56,17 +61,18 @@ class TestE2ESimulation(object):
         print("... stopping aggregator")
         e2e_aggregator.stop()
 
     @pytest.mark.parametrize("query, expected_result", [
         ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"',
-         {"time": "1970-01-01T00:00:00Z", "count_delay": 120}),
+         {"time": "1970-01-01T00:00:00Z", "count_latency": 120, "count_bandwidth": 120}),
         ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"',
-         {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}),
+         {"time": "1970-01-01T00:00:00Z", "count_response_time": 24, "count_request_size": 24, "count_response_size": 24}),
         ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
-         {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 24, "count_delay_reverse": 24, "count_delay_service": 24}),
+         {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 38, "count_delay_reverse": 38, "count_delay_service": 38,
+          "count_avg_request_size": 38, "count_avg_response_size": 38, "count_avg_bandwidth": 38}),
         ('SELECT mean(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
-         {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 13.159722222222223, "mean_delay_reverse": 3.256944444444444, "mean_delay_service": 32.791666666666664}),
+         {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 8.010964912280702, "mean_delay_reverse": 12.881578947368423, "mean_delay_service": 23.42105263157895,
+          'mean_avg_request_size': 10485760, 'mean_avg_response_size': 1024, 'mean_avg_bandwidth': 104857600}),
     ])
     def test_simulation(self, influx_db, query, expected_result):
         """
...
<!--
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 02-05-2018
// Created for Project : FLAME
-->
# **Flame CLMC Service Documentation**
#### **Authors**
|Authors|Organisation|
|---|---|
|[Nikolay Stanchev](mailto:ns17@it-innovation.soton.ac.uk)|[University of Southampton, IT Innovation Centre](http://www.it-innovation.soton.ac.uk)|
#### Description
This document describes the CLMC service and its API endpoints. The CLMC service is implemented with **Pyramid**, a *Python* web framework.
It offers different API endpoints to configure and control the aggregator, which is an essential part in the process of measuring the end-to-end performance.
All source code, tests and configuration files of the service can be found in the **src/clmc-webservice** folder.
#### API Endpoints
* **GET** ***/aggregator/config***
This API method retrieves information about the configuration of the aggregator.
* Response:
Returns a JSON-formatted response with the configuration data of the aggregator - *aggregator_report_period*, *aggregator_database_name*,
*aggregator_database_url*.
* Response Body Example:
```json
{
"aggregator_report_period": 5,
"aggregator_database_name": "E2EMetrics",
"aggregator_database_url": "http://172.40.231.51:8086"
}
```
* **PUT** ***/aggregator/config***
This API method updates the configuration of the aggregator.
* Request:
Expects a JSON-formatted request body with the new configuration of the aggregator. The body should contain only
three key fields - *aggregator_report_period* (positive integer, seconds), *aggregator_database_name* and *aggregator_database_url* (a valid URL).
* Request Body Example:
```json
{
"aggregator_report_period": 25,
"aggregator_database_name": "E2EMetrics",
"aggregator_database_url": "http://172.50.231.61:8086"
}
```
* Response:
The body of the request is first validated before updating the configuration. If validation is successful, a JSON-formatted response
with the new configuration data is returned. Otherwise, an **HTTP Bad Request** response is returned.
* Response Body Example:
```json
{
"aggregator_report_period": 25,
"aggregator_database_name": "E2EMetrics",
"aggregator_database_url": "http://172.50.231.61:8086"
}
```
* Notes:
If the configuration is updated while the aggregator is running, it is not automatically restarted. An explicit API call
must be made with a *restart* request to apply the updated configuration. In the case of such a PUT request as the one described
above, the response will contain more information indicating that the configuration of the aggregator is in a malformed state.
* Response Body Example:
```json
{
"aggregator_report_period": 125,
"aggregator_database_name": "E2EMetrics",
"aggregator_database_url": "http://172.50.231.61:8086/",
"malformed": true,
"comment": "Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used."
}
```
* **GET** ***/aggregator/control***
This API method retrieves information about the status of the aggregator - whether it is running or not.
* Response:
Returns a JSON-formatted response with the status data of the aggregator - *aggregator_running* field. If the aggregator
is running in a malformed state, the response will also indicate this with two additional fields - *malformed* and *comment*.
* Response Body Example:
```json
{
"aggregator_running": true
}
```
* Response Body Example - for malformed configuration:
```json
{
"aggregator_running": true,
"malformed": true,
"comment": "Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used."
}
```
* **PUT** ***/aggregator/control***
This API method updates the status of the aggregator - a user can start, stop or restart it.
* Request:
Expects a JSON-formatted request body with the new status of the aggregator. The body should contain only one key
field - *action* (the action to undertake, which can be **start**, **restart** or **stop**)
* Request Body Example:
```json
{
"action": "start"
}
```
* Response:
The body of the request is first validated before taking any actions. If the action is not one of those listed above,
the validation will fail. If validation is successful, a JSON-formatted response is returned with the new status of
the aggregator. Otherwise, an **HTTP Bad Request** response is returned.
* Response Body Example:
```json
{
"aggregator_running": true
}
```
* Notes:
* If a **start** action is requested while the aggregator is running, the request will be ignored. To restart the
aggregator, a user should use a **restart** action.
* If a **stop** action is requested while the aggregator is not running, the request will be ignored.
* A request with a **restart** action while the aggregator is not running has the same functionality as a request
with a **start** action.
* The functionality of a request with a **restart** action is the same as the functionality of a **stop** action
followed by a **start** action.
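For illustration, the endpoints above can be exercised with a short client sketch. This assumes the service runs on localhost:8080 and uses the third-party `requests` library (not a project dependency); the configuration values are the example ones from this document.

```
# Hypothetical usage sketch - not part of the repository.
import requests

BASE_URL = "http://localhost:8080"  # assumed local deployment

# read the current aggregator configuration
print(requests.get(BASE_URL + "/aggregator/config").json())

# update the configuration - all three key fields must be supplied
new_config = {
    "aggregator_report_period": 25,
    "aggregator_database_name": "E2EMetrics",
    "aggregator_database_url": "http://172.50.231.61:8086"
}
print(requests.put(BASE_URL + "/aggregator/config", json=new_config).json())

# restart the aggregator so that the updated configuration takes effect
print(requests.put(BASE_URL + "/aggregator/control", json={"action": "restart"}).json())

# the status should no longer report a malformed state
print(requests.get(BASE_URL + "/aggregator/control").json())
```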
#### Installing and running the CLMC service (development mode)
Before installing the CLMC service and its dependencies, it is recommended to use a Python virtual environment. To easily
manage virtual environments, **virtualenvwrapper** can be used.
```
pip install virtualenvwrapper
```
To create a virtual environment use the **mkvirtualenv** command:
```
mkvirtualenv CLMC
```
When created, you should already be set to use the new virtual environment, but to make sure of this use the **workon** command:
```
workon CLMC
```
Now, any installed libraries will be installed relative to this environment only.
The easiest way to install and use the CLMC service locally is to use **pip**. Navigate to the clmc-webservice folder:
```
cd src/clmc-webservice
```
Test the CLMC service using **tox** along with the ***tox.ini*** configuration file. If tox is not installed run:
```
pip install tox
```
After it is installed, simply use the **tox** command:
```
tox
```
Then install the service in development mode.
```
pip install -e .
```
Finally, start the service on localhost by using pyramid's **pserve**:
```
pserve development.ini --reload
```
You should now be able to make requests to the CLMC service on http://localhost:8080/aggregator/config and http://localhost:8080/aggregator/control.
If we ignore the OSI L6 protocol (e.g. HTTP, FTP, Tsunami) then we are modelling:

```
network_delay = latency + (time difference from start of the data to the end of the data)
              = latency + data_delay
```

### Latency

```
latency = distance / speed
```

For optical fibre (or even an electric wire), the speed naively would be the speed of light. In fact, the speed is slower than this (in optical fibre this is because of the internal refraction that occurs, which is different for different wavelengths). According to [m2.optics.com](http://www.m2optics.com/blog/bid/70587/Calculating-Optical-Fiber-Latency) the delay (1/speed) is approximately 5 microseconds / km

```
if
    distance is in m
    delay is in s/m
    latency is in s
```

...

```
if
    data_size is in Bytes
    bandwidth is in Mb/s
    data_delay is in s
then
    data_delay = data_size * 8 / (bandwidth * 1E6)
```

The data_size naively is the size of the data you want to send over the network (call this the "file_size"). However, the data is split into packets and each packet has a header on it so the amount of data going over the network is actually more than the amount sent. The header includes contributions from (at least) the L6 protocol (e.g. HTTP), L4 (e.g. TCP) and L3 (e.g. IP) layers.

```
let
    packet_size = packet_header_size + packet_payload_size
then
    data_size = (packet_size / packet_payload_size) * file_size
or
    data_size = [packet_size / (packet_size - packet_header_size)] * file_size
              = file_size * packet_size / (packet_size - packet_header_size)
```

For example, with typical values of a 1500 Byte packet and 40 Bytes of L3 + L4 headers, the factor is 1500 / 1460 ≈ 1.03, i.e. roughly 3% of extra data on the wire.
### Measuring and Predicting

```
network_delay = latency + data_delay
              = (distance * 5 / 1E9) + {[file_size * packet_size / (packet_size - packet_header_size)] * 8 / (bandwidth * 1E6)}
              = (distance * 5 / 1E9) + (8 / 1E6) * (file_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]
```

i.e. `file_size / bandwidth` with an adjustment to increase the size of the data transmitted because of the packet header and some unit factors.

We want to be able to measure the `network_delay` and also want to be able to predict what the delay is likely to be for a given deployment.

Parameter | Known / measured
----------|-----------------
latency | measured by network probes
distance | sometimes known
packet_size | known (a property of the network)
packet_header_size | known (at least for L3 and L4)
file_size | measured at the service function
bandwidth | known (a property of the network), can also be measured

Measuring the actual `latency` can be done in software. For a given `file_size`, the `network_delay` could then be predicted.

*We are ignoring network congestion and the effect of the protocol (see below).*
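As a concrete sketch of the prediction, the formula above can be written directly in Python. The default packet and header sizes below are illustrative assumptions (a 1500 Byte packet with 40 Bytes of L3 + L4 headers), not values taken from the platform.

```
def predict_network_delay(latency_s, file_size_bytes, bandwidth_mbps,
                          packet_size=1500, packet_header_size=40):
    """Predicted network delay in seconds: latency + data_delay."""
    # inflate the file size to account for the per-packet header overhead
    data_size = file_size_bytes * packet_size / (packet_size - packet_header_size)
    # convert Bytes to bits and Mb/s to b/s
    data_delay = data_size * 8 / (bandwidth_mbps * 1E6)
    return latency_s + data_delay

# e.g. 0.5 ms of measured latency, a 10 MB file over a 100 Mb/s path
print(predict_network_delay(0.0005, 10 * 1024 * 1024, 100))  # ~0.86 s
```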
### Effect of protocol

The analysis above ignores the network protocol. However, the choice of protocol has a large effect in networks with a high bandwidth-delay product.

In data communications, bandwidth-delay product is the product of a data link's capacity (in bits per second) and its round-trip delay time (in seconds). The result, an amount of data measured in bits (or bytes), is equivalent to the maximum amount of data on the network circuit at any given time, i.e., data that has been transmitted but not yet acknowledged.

TCP for instance expects acknowledgement of every packet sent and if the sender has not received an acknowledgement within a specified time period then the packet will be retransmitted. Furthermore, TCP uses a flow-control method whereby the receiver specifies how much data it is willing to buffer and the sending host must pause sending and wait for acknowledgement once that amount of data is sent.
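As a quick worked example of the bandwidth-delay product (illustrative numbers only):

```
bandwidth = 100 * 1E6  # link capacity in bits per second (100 Mb/s)
rtt = 0.020            # round-trip time in seconds (20 ms)

bdp_bits = bandwidth * rtt
print(bdp_bits)             # 2000000.0 bits
print(bdp_bits / 8 / 1024)  # ~244 KBytes may be "in flight", unacknowledged
```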
### Effect of congestion
The analysis above considers the best case where the whole bandwidth of the link is available for the data transfer.
## Service Delay
A particular service function may have several operations (API calls) on it. A model of service function performance needs to consider the resource the service function is deployed upon (and its variability and reliability), the availability of the resource (i.e. whether the service function has the resource to itself), the workload (a statistical distribution of API calls and request sizes) and the speed at which the resource can compute the basic computations invoked by the requests.

We must simplify sufficiently to make the problem tractable, but not so much that the result is of no practical use.
To simplify we can:
* assume that the resource is invariable, 100% available and 100% reliable;
* assume that the distribution of API calls is constant and that the workload can be represented sufficiently by the average request size.
To be concrete, if a service function has two API calls: `transcode_video(video_data)` and `get_status()` then we would like to model the average response time over "normal" usage which might be 10% of calls to `transcode_video` and 90% of calls to `get_status` and a variety of `video_data` sizes with a defined average size.
### Measuring
As an example, the `minio` service already reports the average response time over all API calls so, for that service at least, measuring the `service_delay` is easy. We expect to also be able to measure the average `file_size` which will do as a measure of workload.
### Predicting
As noted above, a simple model must consider:
* the resource the service function is deployed upon (e.g. CPU, memory, disk);
* the workload (an average request size)
* the speed at which the resource can compute the basic computations invoked by the requests (dependent on the service function).
We can therefore write that:
```
service_delay = f(resource, workload, service function characteristics)
```
For our simplified workload we could assume that this can be written as:
```
service_delay = workload * f(resource, service function characteristics)
```
The resource could be described in terms of the number of CPUs, amount of RAM and amount of disk. Even if the resource was a physical machine more detail would be required such as the CPU clock speed, CPU cache sizes, RAM speed, disk speed, etc. In a virtualised environment it is even more complicated as elements of the physical CPU may or may not be exposed to the virtual CPU (which may in fact be emulated).
Benchmarks are often used to help measure the performance of a resource so that one resource may be compared to another without going into all the detail of the precise architecture. Application benchmarks (those executing realistic workloads such as matrix operations or fast Fourier transforms) can be more useful than general benchmark scores (such as SPECint or SPECfp). For more information on this, see [Snow White Clouds and the Seven Dwarfs](https://eprints.soton.ac.uk/273157/1/23157.pdf).
The best benchmark for a service function is the service function itself combined with a representative workload. That is, to predict the performance of a service function on a given resource, it is best to just run it and find out. In the absence of that, the next best would be to execute Dwarf benchmarks on each resource type and correlate them with the service functions, but that is beyond what we can do now.
We might execute a single benchmark such as the [Livermore Loops](http://www.netlib.org/benchmark/livermorec) benchmark which stresses a variety of CPU operations. Livermore Loops provides a single benchmark figure in Megaflops/sec.
Our service_delay equation would then just reduce to:
```
service_delay = workload * f(benchmark, service function characteristics)
= workload * service_function_scaling_factor / benchmark
```
The `service_function_scaling_factor` essentially scales the `workload` number into a number of Megaflops. So for a `workload` in bytes the `service_function_scaling_factor` would represent Megaflops/byte.
If we don't have a benchmark then the best we can do is approximate the benchmark by the number of CPUs:
```
service_delay = workload * f(benchmark, service function characteristics)
= workload * service_function_scaling_factor / cpus
```
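A minimal sketch of this approximation follows; the scaling factor and CPU counts are invented for illustration, not calibrated values.

```
def predict_service_delay(workload_bytes, scaling_factor, cpus):
    """Approximate service delay, using the CPU count as a stand-in for a benchmark."""
    return workload_bytes * scaling_factor / cpus

# e.g. a 10 MB request against hypothetical 1-CPU and 4-CPU endpoints
workload = 10 * 1024 * 1024
print(predict_service_delay(workload, 1E-6, 1))  # ~10.5
print(predict_service_delay(workload, 1E-6, 4))  # ~2.6
```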
Is this a simplification too far? It ignores the size of RAM, for instance, which cannot normally be included as a linear factor (i.e. twice as much RAM does not always give twice the performance). Not having sufficient RAM results in disk swapping or complete failure. Once you have enough for a workload, adding more makes no difference.
## Conclusion
The total delay is:
```
total_delay = forward_network_delay + service_delay + reverse_network_delay
```
To *measure* the `total_delay` we need:
```
total_delay = forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay
= forward_latency
+ {(8 / 1E6) * (request_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]}
+ service_delay
+ reverse_latency
+ {(8 / 1E6) * (response_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]}
```
With:
* forward_latency / s (measured by network probe)
* reverse_latency / s (measured by network probe)
* request_size / Bytes (measured at service)
* response_size / Bytes (measured at service)
* bandwidth / Mb/s (b = bit) (assumed constant and known or measured)
* packet_size / Bytes (constant and known)
* packet_header_size / Bytes (constant and known)
This calculation assumes:
* there is no network congestion, i.e. the whole bandwidth is available
* that the protocol (such as TCP) has no effect (see discussion of flow control above)
* there is no data loss on the network
* that the service delay is proportional to the `request_size`, i.e. that the service is processing the data in the request
* that the service does not start processing until the complete request is received
* that the amount of memory and disk on the compute resource is irrelevant
* that the service delay is inversely proportional to the number of CPUs (and all CPUs are equal)
* that the compute resource is invariable, 100% available and 100% reliable
* that the distribution of API calls is constant and that the workload can be represented sufficiently by the average request size
To *predict* the `total_delay` we need:
```
total_delay = forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay
= forward_latency
+ {(8 / 1E6) * (request_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]}
+ request_size * service_function_scaling_factor / cpus
+ reverse_latency
+ {(8 / 1E6) * (response_size / bandwidth) * [packet_size / (packet_size - packet_header_size)]}
```
With:
* service_function_scaling_factor / Mflops/Byte (known, somehow)
* cpus (unitless) (known)
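Putting the pieces together, a hedged sketch of the whole prediction (all parameter values below are assumptions for illustration):

```
def predict_total_delay(forward_latency, reverse_latency, request_size, response_size,
                        bandwidth_mbps, cpus, scaling_factor,
                        packet_size=1500, packet_header_size=40):
    """Predicted total delay in seconds, following the formula above."""
    overhead = packet_size / (packet_size - packet_header_size)  # packet header adjustment
    forward_data_delay = (8 / 1E6) * (request_size / bandwidth_mbps) * overhead
    reverse_data_delay = (8 / 1E6) * (response_size / bandwidth_mbps) * overhead
    service_delay = request_size * scaling_factor / cpus
    return (forward_latency + forward_data_delay + service_delay
            + reverse_latency + reverse_data_delay)

# illustrative values: 5 ms latencies, 10 MB request, 1 KB response, 100 Mb/s, 4 CPUs
print(predict_total_delay(0.005, 0.005, 10 * 1024 * 1024, 1024, 100, 4, 1E-6))
```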
As discussed above, you could also predict the latency if you know the length of a link but this seems a bit too theoretical.
@@ -21,21 +21,18 @@
 // Created for Project : FLAME
 """
 
+import os
+import os.path
+
 from setuptools import setup, find_packages
 
-requires = [
-    'plaster_pastedeploy',
-    'pyramid',
-    'pyramid_debugtoolbar',
-    'waitress',
-    'influxdb',
-]
 
-tests_require = [
-    'WebTest >= 1.3.1',  # py3 compat
-    'pytest',
-    'pytest-cov',
-]
+def read(fname):
+    return open(os.path.join(os.path.dirname(__file__), fname)).read()
 
 def get_version(fname):
     if os.path.isfile(fname):
@@ -45,30 +42,23 @@ def get_version(fname):
     return git_revision
 
 setup(
-    name = "CLMCservice",
-    version = "SNAPSHOT",
+    name = "clmc",
+    version = get_version("clmctest/_version.py"),
     author = "Michael Boniface",
     author_email = "mjb@it-innovation.soton.ac.uk",
-    description = "FLAME CLMC Testing Module",
-    long_description="long description",
-    license = "license",
-    keywords = "FLAME CLMC service test",
+    description = "FLAME CLMC Test Module",
+    license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE",
+    keywords = "FLAME CLMC",
+    url='https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc',
     packages=find_packages(exclude=["services"]),
     include_package_data=True,
-    install_requires=requires,
-    extras_require={
-        'testing': tests_require,
-    },
-    package_data={'': ['git-commit-ref', '*.yml', '*.sh', '*.json', '*.conf']},
+    package_data={'': ['_version.py', '*.yml', '*.sh', '*.json', '*.conf']},
+    long_description="FLAME CLMC",
     classifiers=[
         "Development Status :: Alpha",
         "Topic :: FLAME Tests",
         "License :: ",
     ],
-    entry_points={
-        'paste.app_factory': [
-            'main = CLMCservice:main',
-        ],
-    },
-)
\ No newline at end of file
+)
[run]
source = clmcservice
omit = clmcservice/tests.py
include MANIFEST.in
recursive-include clmcservice
\ No newline at end of file
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 15-05-2018
// Created for Project : FLAME
"""
from pyramid.config import Configurator
from pyramid.settings import asbool
from clmcservice.utilities import RUNNING_FLAG, MALFORMED_FLAG
def main(global_config, **settings):
    """
    This function returns a Pyramid WSGI application.
    """

    # a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings
    aggregator_running = asbool(settings.get(RUNNING_FLAG, False))
    settings[RUNNING_FLAG] = aggregator_running

    aggregator_report_period = int(settings.get('aggregator_report_period', 5))
    settings['aggregator_report_period'] = aggregator_report_period

    settings[MALFORMED_FLAG] = False

    config = Configurator(settings=settings)

    config.add_route('aggregator_config', '/aggregator/config')
    config.add_view('clmcservice.views.AggregatorConfig', attr='get', request_method='GET')
    config.add_view('clmcservice.views.AggregatorConfig', attr='put', request_method='PUT')

    config.add_route('aggregator_controller', '/aggregator/control')
    config.add_view('clmcservice.views.AggregatorController', attr='get', request_method='GET')
    config.add_view('clmcservice.views.AggregatorController', attr='put', request_method='PUT')

    config.add_route('round_trip_time_query', '/query/round-trip-time')
    config.add_view('clmcservice.views.RoundTripTimeQuery', attr='get', request_method='GET')

    config.scan()
    return config.make_wsgi_app()
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
from clmcservice.utilities import generate_e2e_delay_report
import getopt
import sys
class Aggregator(object):
"""
A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process.
"""
REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
DATABASE = 'E2EMetrics' # default database the aggregator uses
DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses
RETRY_PERIOD = 5 # number of seconds to wait before retrying connection/posting data to Influx
def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
"""
Constructs an Aggregator instance.
:param database_name: database name to use
:param database_url: database url to use
:param report_period: the report period in seconds
"""
# initialise a database client using the database url and the database name
url_object = urlparse(database_url)
        while True:
            try:
                self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
                break
            except Exception:
                # retry until the client is constructed successfully, e.g. while the database is still starting up
                sleep(self.RETRY_PERIOD)
self.db_url = database_url
self.db_name = database_name
self.report_period = report_period
        # cache-like dictionaries storing the last reported values, used to fill in missing values in subsequent reports
self.network_cache = {}
self.service_cache = {}
def run(self):
"""
        Performs the functionality of the aggregator - queries data from both measurements, merges it and posts the result back to InfluxDB once every report period.
"""
current_time = int(time())
while True:
boundary_time = current_time - self.report_period
boundary_time_nano = boundary_time * 1000000000
current_time_nano = current_time * 1000000000
# query the network delays and group them by path ID
network_delays = {}
            while True:
                try:
                    result = self.db_client.query(
                        'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
                            boundary_time_nano, current_time_nano))
                    break
                except Exception:
                    # retry the query after a short period if the database is temporarily unavailable
                    sleep(self.RETRY_PERIOD)
            for metadata, result_points in result.items():
                # metadata is a (measurement, tags) pair; only the tags are needed here
                tags = metadata[1]
                point = next(result_points)
                network_delays[(tags['path'], tags['source'], tags['target'])] = point['net_latency'], point['net_bandwidth']
                self.network_cache[(tags['path'], tags['source'], tags['target'])] = point['net_latency'], point['net_bandwidth']
# query the service delays and group them by endpoint, service function instance and sfr
service_delays = {}
            while True:
                try:
                    result = self.db_client.query(
                        'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(
                            boundary_time_nano, current_time_nano))
                    break
                except Exception:
                    # retry the query after a short period if the database is temporarily unavailable
                    sleep(self.RETRY_PERIOD)
            for metadata, result_points in result.items():
                # metadata is a (measurement, tags) pair; only the tags are needed here
                tags = metadata[1]
                point = next(result_points)
                service_delays[tags['sfr']] = (point['response_time'], point['request_size'], point['response_size'], tags['endpoint'], tags['sf_instance'])
                self.service_cache[tags['sfr']] = (point['response_time'], point['request_size'], point['response_size'], tags['endpoint'], tags['sf_instance'])
# for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
for path in network_delays:
# check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr
path_id, source, target = path
if target not in service_delays and target not in self.service_cache:
                    # if not, continue with the other network path reports
continue
e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
"delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time}
e2e_arguments['path_ID'] = path_id
e2e_arguments['source_SFR'] = source
e2e_arguments['target_SFR'] = target
e2e_arguments['delay_forward'] = network_delays[path][0]
e2e_arguments['avg_bandwidth'] = network_delays[path][1]
# reverse the path ID to get the network delay for the reversed path
reversed_path = (path_id, target, source)
if reversed_path in network_delays or reversed_path in self.network_cache:
# get the reverse delay, use the latest value if reported or the cache value
e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0]
else:
e2e_arguments['delay_reverse'] = None
# get the response time of the media component connected to the target SFR
service_delay = service_delays.get(target, self.service_cache.get(target))
response_time, request_size, response_size, endpoint, sf_instance = service_delay
# put these points in the e2e arguments dictionary
e2e_arguments['delay_service'] = response_time
e2e_arguments['avg_request_size'] = request_size
e2e_arguments['avg_response_size'] = response_size
e2e_arguments['endpoint'] = endpoint
e2e_arguments['sf_instance'] = sf_instance
# if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row
if None not in e2e_arguments.values():
while True:
try:
self.db_client.write_points(
generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'],
e2e_arguments['delay_service'],
e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'],
e2e_arguments['time']))
break
                    except Exception:
                        # retry posting the measurement after a short period if the write fails
                        sleep(self.RETRY_PERIOD)
old_timestamp = current_time
            # wait until report_period seconds have passed
while current_time < old_timestamp + self.report_period:
sleep(1)
current_time = int(time())
if __name__ == '__main__':
    # parse the command line options
    try:
        opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])
    except getopt.GetoptError as err:
        # print the error messages in case of a parse error and exit
        print(err)
        print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
        sys.exit(2)
    arg_period = Aggregator.REPORT_PERIOD
    arg_database_name = Aggregator.DATABASE
    arg_database_url = Aggregator.DATABASE_URL
    # apply the parameters if given
    for opt, arg in opts:
        if opt in ('-p', '--period'):
            arg_period = int(arg)
        elif opt in ('-d', '--database'):
            arg_database_name = arg
        elif opt in ('-u', '--url'):
            arg_database_url = arg
    Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()
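# --- Illustrative sketch, not part of the original script --------------------
# The Aggregator can also be driven programmatically, e.g. from a supervising
# process; the local database URL and the 10s period below are example values.
def run_local_aggregator():
    Aggregator(database_name='E2EMetrics', database_url='http://localhost:8086', report_period=10).run()  # blocks in the query-aggregate-write loop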
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 15-05-2018
// Created for Project : FLAME
"""
from json import loads
from re import compile, IGNORECASE
CONFIG_ATTRIBUTES = ('aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url') # all of the configuration attributes - to be used as dictionary keys
RUNNING_FLAG = 'aggregator_running' # Attribute for storing the flag, which shows whether the aggregator is running or not - to be used as a dictionary key
PROCESS_ATTRIBUTE = 'aggregator_process' # Attribute for storing the process object of the aggregator - to be used as a dictionary key
# a 'malformed' running state of the aggregator is when the configuration is updated, but the aggregator is not restarted, so it is running with an old version of the configuration
MALFORMED_FLAG = 'malformed'  # Attribute for storing the flag, which shows whether the aggregator is running in a malformed state or not - to be used as a dictionary key
# used to indicate a malformed configuration message
COMMENT_ATTRIBUTE = 'comment'
COMMENT_VALUE = 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'
# the attributes of the JSON response body that are expected when querying round trip time
ROUND_TRIP_ATTRIBUTES = ('media_service', 'start_timestamp', 'end_timestamp')
URL_REGEX = compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain, e.g. example.domain.com
r'localhost|' # or localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # or IP address (IPv4 format)
r'(?::\d{2,5})?' # optional port number
r'(?:[/?#][^\s]*)?$', # URL path or query parameters
IGNORECASE)
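# Illustrative matches for URL_REGEX (hedged examples, not an exhaustive test):
#   URL_REGEX.match('http://203.0.113.100:8086')   -> match object (IPv4 host with port)
#   URL_REGEX.match('https://example.domain.com')  -> match object (domain name)
#   URL_REGEX.match('ftp://example.com')           -> None (only http/https schemes are accepted)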
def validate_config_content(configuration):
"""
A utility function to validate a configuration string representing a JSON dictionary.
:param configuration: the configuration string to validate
:return the validated configuration dictionary object with the values converted to their required type
:raise AssertionError: if the argument is not a valid configuration
"""
    try:
        configuration = loads(configuration)
    except (ValueError, TypeError):
        # anything that cannot be parsed as a JSON object is rejected
        raise AssertionError("Configuration must be a JSON object.")
    assert len(configuration) == len(CONFIG_ATTRIBUTES), "Configuration must contain exactly the required attributes and no others."
for attribute in CONFIG_ATTRIBUTES:
assert attribute in configuration, "Required attribute not found in the request content."
assert type(configuration.get('aggregator_report_period')) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period'))
assert configuration.get('aggregator_report_period') > 0, "Report period must be a positive integer, received {0} instead.".format(configuration.get('aggregator_report_period'))
assert URL_REGEX.match(configuration.get('aggregator_database_url')) is not None, "The aggregator must have a valid database URL in its configuration, received {0} instead.".format(configuration.get('aggregator_database_url'))
return configuration
def validate_action_content(content):
"""
A utility function to validate a content string representing a JSON dictionary.
:param content: the content string to validate
:return: the validated content dictionary
:raise AssertionError: if the argument is not a valid json content
"""
    try:
        content = loads(content)
    except (ValueError, TypeError):
        raise AssertionError("Content must be a JSON object.")
    assert len(content) == 1, "Content must contain exactly one attribute - 'action'."
    assert 'action' in content, "Required attribute 'action' not found in the request content."
    assert content['action'] in ('start', 'stop', 'restart'), "The action attribute must be one of 'start', 'stop' or 'restart'."
return content
def validate_round_trip_query_params(params):
"""
A utility function to validate a dictionary of parameters.
:param params: the params dict to validate
:return: the validated parameters dictionary
    :raise AssertionError: if the argument is not a valid parameters dictionary
"""
    assert len(params) == len(ROUND_TRIP_ATTRIBUTES), "Content must contain exactly the required attributes and no others."
for attribute in ROUND_TRIP_ATTRIBUTES:
assert attribute in params, "Required attribute not found in the request content."
return params
def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time):
"""
Generates a combined averaged measurement about the e2e delay and its contributing parts
:param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
:param source_sfr: source service router
:param target_sfr: target service router
:param endpoint: endpoint of the media component
:param sf_instance: service function instance (media component)
:param delay_forward: Path delay (Forward direction)
:param delay_reverse: Path delay (Reverse direction)
:param delay_service: the media service component response time
:param avg_request_size: averaged request size
:param avg_response_size: averaged response size
:param avg_bandwidth: averaged bandwidth
:param time: measurement timestamp
:return: a list of dict-formatted reports to post on influx
"""
result = [{"measurement": "e2e_delays",
"tags": {
"path_ID": path_id,
"source_SFR": source_sfr,
"target_SFR": target_sfr,
"endpoint": endpoint,
"sf_instance": sf_instance
},
"fields": {
"delay_forward": float(delay_forward),
"delay_reverse": float(delay_reverse),
"delay_service": float(delay_service),
"avg_request_size": float(avg_request_size),
"avg_response_size": float(avg_response_size),
"avg_bandwidth": float(avg_bandwidth)
},
"time": int(1000000000*time)
}]
return result
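# --- Illustrative usage sketch, not part of the original module --------------
# Exercises the validators and the report generator with made-up values; the
# identifiers and timestamps below are assumptions for demonstration only.
if __name__ == '__main__':
    conf = validate_config_content('{"aggregator_report_period": 10,'
                                   ' "aggregator_database_name": "E2EMetrics",'
                                   ' "aggregator_database_url": "http://203.0.113.100:8086"}')
    action = validate_action_content('{"action": "restart"}')
    params = validate_round_trip_query_params({'media_service': 'ms1',
                                               'start_timestamp': 1524833145,
                                               'end_timestamp': 1524833205})
    report = generate_e2e_delay_report('SR1--SR3', 'SR1', 'SR3', 'endpoint-1', 'ms1.flame.org',
                                       5.0, 6.0, 15.0, 1024, 8192, 1048576, 1524833145)
    print(conf, action, params, report, sep='\n')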