Skip to content
Snippets Groups Projects
Commit f6d93a3a authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Separated configuration API calls from starter API calls - /aggregator/config...

Separated configuration API calls from starter API calls - /aggregator/config and /aggregator/status
parent b3784177
No related branches found
No related tags found
No related merge requests found
......@@ -24,23 +24,28 @@
from pyramid.config import Configurator
from pyramid.settings import asbool
from clmcservice.views import AggregatorConfig
from clmcservice.views import AggregatorConfig, aggregator_starter
from clmcservice.utilities import STATUS_ATTRIBUTE
def main(global_config, **settings):
""" This function returns a Pyramid WSGI application."""
# a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings
aggregator_running = asbool(settings.get('aggregator_running', 'false'))
settings['aggregator_running'] = aggregator_running
aggregator_running = asbool(settings.get('aggregator_running', False))
settings[STATUS_ATTRIBUTE] = asbool(aggregator_running)
aggregator_report_period = int(settings.get('aggregator_report_period', 5))
settings['aggregator_report_period'] = aggregator_report_period
config = Configurator(settings=settings)
config.add_route('aggregator', '/aggregator')
config.add_route('aggregator_config', '/aggregator/config')
config.add_view(AggregatorConfig, attr='get', request_method='GET')
config.add_view(AggregatorConfig, attr='post', request_method='POST')
config.add_route('aggregator_starter', '/aggregator/status')
config.add_view(aggregator_starter, request_method='POST')
config.scan()
return config.make_wsgi_app()
......@@ -38,6 +38,7 @@ class Aggregator(object):
REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
DATABASE = 'E2EMetrics' # default database the aggregator uses
DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses
RETRY_PERIOD = 5
def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
"""
......@@ -56,7 +57,7 @@ class Aggregator(object):
self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
break
except:
sleep(2)
sleep(self.RETRY_PERIOD)
self.db_url = database_url
self.db_name = database_name
......@@ -85,7 +86,7 @@ class Aggregator(object):
boundary_time_nano, current_time_nano))
break
except:
sleep(2)
sleep(self.RETRY_PERIOD)
for item in result.items():
metadata, result_points = item
......@@ -102,7 +103,7 @@ class Aggregator(object):
result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
break
except:
sleep(2)
sleep(self.RETRY_PERIOD)
for item in result.items():
metadata, result_points = item
......@@ -148,7 +149,7 @@ class Aggregator(object):
e2e_arguments['time']))
break
except:
sleep(2)
sleep(self.RETRY_PERIOD)
old_timestamp = current_time
# wait until {REPORT_PERIOD} seconds have passed
......
......@@ -53,7 +53,7 @@ class TestAggregatorConfig(object):
"""
from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself
print(self.config.get_settings())
assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."
assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds."
assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics."
......@@ -73,20 +73,20 @@ class TestAggregatorConfig(object):
assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "A GET request must not modify the aggregator configuration data."
@pytest.mark.parametrize("input_body, output_value", [
('{"aggregator_running": True, "aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}',
{'aggregator_running': True, 'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}),
('{"aggregator_running": true, "aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}',
{'aggregator_running': True, 'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
('{"aggregator_running":True, "aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}',
{'aggregator_running': True, 'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
('{"aggregator_running": False, "aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}',
('{"aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://171.40.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://171.40.231.51:8086"}),
('{"aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
('{"aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
('{"aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
('{"aggregator_running": false, "aggregator_report_period": 20, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 20, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
('{"aggregator_running":False, "aggregator_report_period": 15, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 15, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}),
("{aggregator_running: t, aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None),
("{aggregator_running: f, aggregator_report_period: 25, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None),
('{"aggregator_report_period": 200, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 200, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
('{"aggregator_report_period": 150, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 150, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}),
("{aggregator_report_period: 2hb5, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None),
("{aggregator_report_period: 250-, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.52:8086}", None),
("{}", None),
("{aggregator_running: true}", None),
])
......@@ -122,3 +122,21 @@ class TestAggregatorConfig(object):
error_raised = True
assert error_raised, "Error must be raised in case of an invalid argument."
class TestAggregatorStarter(object):
"""
A pytest-implementation test for the aggregator starter API calls
"""
def test_start(self):
#TODO
pass
def test_stop(self):
#TODO
pass
def test_restart(self):
#TODO
pass
......@@ -24,10 +24,22 @@
from json import loads
CONFIG_ATTRIBUTES = ('aggregator_running', 'aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url')
CONFIG_ATTRIBUTES = ('aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url')
STATUS_ATTRIBUTE = 'aggregator_running'
def validate_content(configuration):
# # URL regular expression used in the django framework
# url_regex = re.compile(
# r'^(?:http|ftp)s?://' # http:// or https://
# r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
# r'localhost|' # localhost...
# r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4
# r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6
# r'(?::\d+)?' # optional port
# r'(?:/?|[/?]\S+)$', re.IGNORECASE)
def validate_config_content(configuration):
"""
A utility function to validate a configuration string representing a JSON dictionary.
......@@ -38,11 +50,6 @@ def validate_content(configuration):
global CONFIG_ATTRIBUTES
configuration = configuration.replace('"aggregator_running": True', '"aggregator_running": true')
configuration = configuration.replace('"aggregator_running":True', '"aggregator_running": true')
configuration = configuration.replace('"aggregator_running": False', '"aggregator_running": false')
configuration = configuration.replace('"aggregator_running":False', '"aggregator_running": false')
try:
configuration = loads(configuration)
except:
......@@ -53,8 +60,64 @@ def validate_content(configuration):
for attribute in CONFIG_ATTRIBUTES:
assert attribute in configuration
assert type(configuration['aggregator_running']) == bool, "Boolean value expected for the 'aggregator_running' attribute, received {0} instead.".format(configuration.get('aggregator_running'))
assert type(configuration['aggregator_report_period']) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period'))
# TODO maybe add a check for a valid URL ?
return configuration
def validate_action_content(content):
"""
A utility function to validate a content string representing a JSON dictionary.
:param content: the content string to validate
:return: the validated content dictionary
:raise AssertionError: if the argument is not a valid json content
"""
try:
content = loads(content)
except:
raise AssertionError("Content must be a JSON object.")
assert len(content) == 1, "Content mustn't contain more attributes than the required one."
assert content['action'] in ('start', 'stop', 'restart')
return content
def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, time):
"""
Generates a combined averaged measurement about the e2e delay and its contributing parts.
:param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
:param source_sfr: source service router
:param target_sfr: target service router
:param endpoint: endpoint of the media component
:param sf_instance: service function instance (media component)
:param delay_forward: Path delay (Forward direction)
:param delay_reverse: Path delay (Reverse direction)
:param delay_service: the media service component response time
:param time: measurement timestamp
:return: a list of dict-formatted reports to post on influx
"""
result = [{"measurement": "e2e_delays",
"tags": {
"path_ID": path_id,
"source_SFR": source_sfr,
"target_SFR": target_sfr,
"endpoint": endpoint,
"sf_instance": sf_instance
},
"fields": {
"delay_forward": float(delay_forward),
"delay_reverse": float(delay_reverse),
"delay_service": float(delay_service)
},
"time": int(1000000000*time)
}]
return result
......@@ -21,20 +21,54 @@
// Created for Project : FLAME
"""
from pyramid.view import view_defaults
from pyramid.view import view_defaults, view_config
from pyramid.httpexceptions import HTTPBadRequest
from clmcservice.utilities import validate_content, CONFIG_ATTRIBUTES
from subprocess import Popen, DEVNULL
from clmcservice.utilities import validate_config_content, validate_action_content, CONFIG_ATTRIBUTES, STATUS_ATTRIBUTE
import os.path
@view_defaults(route_name='aggregator', renderer='json')
_process = None
def start_aggregator(config):
"""
An auxiliary method to start the aggregator.
:param config: the configuration containing the arguments for the aggregator
"""
global _process
dir_path = os.path.dirname(os.path.realpath(__file__))
command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database',
config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')]
_process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL)
print("Started aggregator process with ID: {0}".format(_process.pid))
def stop_aggregator():
"""
An auxiliary method to stop the aggregator.
"""
global _process
# check if the process is started before trying to terminate it - _process.poll() only returns something if the process has terminated, hence we check for a None value
if _process is not None and _process.poll() is None:
_process.terminate()
@view_defaults(route_name='aggregator_config', renderer='json')
class AggregatorConfig(object):
"""
A class-based view for accessing and mutating the status of the aggregator.
A class-based view for accessing and mutating the configuration of the aggregator.
"""
def __init__(self, request):
"""
Initialises the instance of the view with the request argument.
:param request: client's call request
"""
......@@ -43,29 +77,73 @@ class AggregatorConfig(object):
def get(self):
"""
A GET API call for the status of the aggregator.
:return: A JSON response with the status of the aggregator.
"""
aggregator_data = self.request.registry.settings
return {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES}
config = {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES}
config[STATUS_ATTRIBUTE] = aggregator_data.get(STATUS_ATTRIBUTE)
return config
def post(self):
"""
A POST API call for the status of the aggregator.
:return: A JSON response to the POST call (success or fail).
:raises HTTPBadRequest: if form argument cannot be converted to boolean
"""
# old_config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
new_config = self.request.body.decode(self.request.charset)
try:
new_config = validate_content(new_config)
new_config = validate_config_content(new_config)
for attribute in CONFIG_ATTRIBUTES:
self.request.registry.settings[attribute] = new_config.get(attribute)
# TODO start/stop aggregator based on value of new status
# if not equal_configurations(old_config, new_config):
# stop_aggregator()
# start_aggregator(new_config)
new_config[STATUS_ATTRIBUTE] = self.request.registry.settings.get(STATUS_ATTRIBUTE)
return new_config
except AssertionError:
raise HTTPBadRequest("Bad request content - configuration format is incorrect.")
@view_config(route_name='aggregator_starter', renderer='json')
def aggregator_starter(request):
"""
A function-based view to start/stop/restart the aggregator.
:param request: client's call request
"""
content = request.body.decode(request.charset)
try:
content = validate_action_content(content)
config = {attribute: request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
action = content['action']
if action == 'start':
start_aggregator(config)
request.registry.settings[STATUS_ATTRIBUTE] = True
elif action == 'stop':
stop_aggregator()
request.registry.settings[STATUS_ATTRIBUTE] = False
elif action == 'restart':
stop_aggregator()
start_aggregator(config)
request.registry.settings[STATUS_ATTRIBUTE] = True
return {STATUS_ATTRIBUTE: request.registry.settings.get(STATUS_ATTRIBUTE)}
except AssertionError:
raise HTTPBadRequest("Bad request content - must be in JSON format: {action: value}, where value is 'start', 'stop' or 'restart'.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment