Skip to content
Snippets Groups Projects
Commit d74fd685 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Adjustments of the API start/restart/stop calls plus test updates

parent 482c001b
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,7 @@
from pyramid.config import Configurator
from pyramid.settings import asbool
from clmcservice.views import AggregatorConfig, aggregator_starter
from clmcservice.views import AggregatorConfig, AggregatorController
from clmcservice.utilities import STATUS_ATTRIBUTE
......@@ -44,8 +44,9 @@ def main(global_config, **settings):
config.add_view(AggregatorConfig, attr='get', request_method='GET')
config.add_view(AggregatorConfig, attr='put', request_method='PUT')
config.add_route('aggregator_starter', '/aggregator/status', request_method='PUT')
config.add_view(aggregator_starter)
config.add_route('aggregator_starter', '/aggregator/control')
config.add_view(AggregatorController, attr='get', request_method='GET')
config.add_view(AggregatorController, attr='put', request_method='PUT')
config.scan()
return config.make_wsgi_app()
......@@ -153,7 +153,7 @@ class Aggregator(object):
old_timestamp = current_time
# wait until {REPORT_PERIOD} seconds have passed
while current_time != old_timestamp + self.report_period:
while current_time < old_timestamp + self.report_period:
sleep(1)
current_time = int(time())
......
......@@ -25,7 +25,7 @@ import pytest
from pyramid import testing
from pyramid.httpexceptions import HTTPBadRequest
from time import sleep
from clmcservice.utilities import CONFIG_ATTRIBUTES
from clmcservice.utilities import CONFIG_ATTRIBUTES, PROCESS_ATTRIBUTE, STATUS_ATTRIBUTE
import os
import signal
......@@ -56,7 +56,7 @@ class TestAggregator(object):
from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself
assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."
assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running."
assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds."
assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics."
assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086"
......@@ -64,29 +64,28 @@ class TestAggregator(object):
request = testing.DummyRequest()
response = AggregatorConfig(request).get()
assert response == {'aggregator_running': False,
'aggregator_report_period': 5,
assert response == {'aggregator_report_period': 5,
'aggregator_database_name': 'E2EMetrics',
'aggregator_database_url': "http://172.40.231.51:8086"}, "Response must be a dictionary representing a JSON object with the correct configuration data of the aggregator."
assert not self.config.get_settings().get('aggregator_running'), "A GET request must not modify the aggregator configuration data."
assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "A GET request must not modify the aggregator configuration data."
assert self.config.get_settings().get('aggregator_report_period') == 5, "A GET request must not modify the aggregator configuration data."
assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "A GET request must not modify the aggregator configuration data."
assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "A GET request must not modify the aggregator configuration data."
@pytest.mark.parametrize("input_body, output_value", [
('{"aggregator_report_period": 10, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://171.40.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://171.40.231.51:8086"}),
{'aggregator_report_period': 10, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://171.40.231.51:8086"}),
('{"aggregator_report_period": 15, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
{'aggregator_report_period': 15, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
('{"aggregator_report_period": 20, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
{'aggregator_report_period': 20, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
('{"aggregator_report_period": 25, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.60.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
{'aggregator_report_period': 25, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.60.231.51:8086"}),
('{"aggregator_report_period": 200, "aggregator_database_name": "E2EMetrics", "aggregator_database_url": "http://172.50.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 200, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
{'aggregator_report_period': 200, 'aggregator_database_name': "E2EMetrics", 'aggregator_database_url': "http://172.50.231.51:8086"}),
('{"aggregator_report_period": 150, "aggregator_database_name": "CLMCMetrics", "aggregator_database_url": "http://172.40.231.51:8086"}',
{'aggregator_running': False, 'aggregator_report_period': 150, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}),
{'aggregator_report_period': 150, 'aggregator_database_name': "CLMCMetrics", 'aggregator_database_url': "http://172.40.231.51:8086"}),
("{aggregator_report_period: 2hb5, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.51:8086}", None),
("{aggregator_report_period: 250-, aggregator_database_name: CLMCMetrics, aggregator_database_url: http://172.60.231.52:8086}", None),
("{}", None),
......@@ -101,7 +100,7 @@ class TestAggregator(object):
from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself
assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."
assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running."
assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds."
assert self.config.get_settings().get('aggregator_database_name') == 'E2EMetrics', "Initial database name the aggregator uses is E2EMetrics."
assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086"
......@@ -130,76 +129,85 @@ class TestAggregator(object):
Tests the case of starting the aggregator through an API call.
"""
from clmcservice.views import aggregator_starter # nested import so that importing the class view is part of the test itself
from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself
assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."
assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running."
assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running."
request = testing.DummyRequest()
input_body = '{"action": "start"}'
request.body = input_body.encode(request.charset)
response = aggregator_starter(request)
assert response == {'aggregator_running': True}, "The aggregator should have been started."
assert self.config.get_settings().get('aggregator_running'), "The aggregator should have been started."
response = AggregatorController(request).put()
assert response == {STATUS_ATTRIBUTE: True}, "The aggregator should have been started."
assert self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been started."
assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized."
# kill the started process after the test is over
pid = request.registry.settings["aggregator_pid"]
pid = request.registry.settings[PROCESS_ATTRIBUTE].pid
os.kill(pid, signal.SIGTERM)
def test_stop(self):
from clmcservice.views import aggregator_starter # nested import so that importing the class view is part of the test itself
from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself
assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."
assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running."
assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running."
# send a start request to trigger the aggregator
request = testing.DummyRequest()
input_body = '{"action": "start"}'
request.body = input_body.encode(request.charset)
aggregator_starter(request)
AggregatorController(request).put()
assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized."
# test stopping the aggregator process when it is running
request = testing.DummyRequest()
input_body = '{"action": "stop"}'
request.body = input_body.encode(request.charset)
response = aggregator_starter(request)
assert response == {'aggregator_running': False}, "The aggregator should have been started."
assert not self.config.get_settings().get('aggregator_running'), "The aggregator should have been started."
response = AggregatorController(request).put()
assert response == {STATUS_ATTRIBUTE: False}, "The aggregator should have been stopped."
assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been stopped."
assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated."
sleep(2)
sleep(2) # put a 2 seconds timeout so that the aggregator process can terminate
# test stopping the aggregator process when it is not running
request = testing.DummyRequest()
input_body = '{"action": "stop"}'
request.body = input_body.encode(request.charset)
response = aggregator_starter(request)
assert response == {'aggregator_running': False}, "The aggregator should have been started."
assert not self.config.get_settings().get('aggregator_running'), "The aggregator should have been started."
response = AggregatorController(request).put()
assert response == {STATUS_ATTRIBUTE: False}, "The aggregator should have been stopped."
assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been stopped."
assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated."
def test_restart(self):
from clmcservice.views import aggregator_starter # nested import so that importing the class view is part of the test itself
from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself
assert not self.config.get_settings().get('aggregator_running'), "Initially aggregator is not running."
assert not self.config.get_settings().get(STATUS_ATTRIBUTE), "Initially aggregator is not running."
assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running."
# test restarting the aggregator process when it is stopped
request = testing.DummyRequest()
input_body = '{"action": "restart"}'
request.body = input_body.encode(request.charset)
response = aggregator_starter(request)
assert response == {'aggregator_running': True}, "The aggregator should have been started."
assert self.config.get_settings().get('aggregator_running'), "The aggregator should have been started."
response = AggregatorController(request).put()
assert response == {STATUS_ATTRIBUTE: True}, "The aggregator should have been restarted."
assert self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been restarted."
assert self.config.get_settings().get('aggregator_process'), "The aggregator process should have been reinitialised."
# test restarting the aggregator process when it is running
request = testing.DummyRequest()
input_body = '{"action": "restart"}'
request.body = input_body.encode(request.charset)
response = aggregator_starter(request)
assert response == {'aggregator_running': True}, "The aggregator should have been started."
assert self.config.get_settings().get('aggregator_running'), "The aggregator should have been started."
response = AggregatorController(request).put()
assert response == {STATUS_ATTRIBUTE: True}, "The aggregator should have been restarted."
assert self.config.get_settings().get(STATUS_ATTRIBUTE), "The aggregator should have been restarted."
assert self.config.get_settings().get('aggregator_process'), "The aggregator process should have been reinitialised."
# kill the started process after the test is over
pid = request.registry.settings["aggregator_pid"]
pid = request.registry.settings[PROCESS_ATTRIBUTE].pid
os.kill(pid, signal.SIGTERM)
......@@ -28,6 +28,8 @@ CONFIG_ATTRIBUTES = ('aggregator_report_period', 'aggregator_database_name', 'ag
STATUS_ATTRIBUTE = 'aggregator_running'
PROCESS_ATTRIBUTE = 'aggregator_process'
# # URL regular expression used in the django framework
# url_regex = re.compile(
# r'^(?:http|ftp)s?://' # http:// or https://
......@@ -62,7 +64,7 @@ def validate_config_content(configuration):
assert type(configuration['aggregator_report_period']) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period'))
# TODO maybe add a check for a valid URL ?
assert configuration['aggregator_report_period'] > 0, "Report period must be a positive integer, received {0} instead.".format(configuration.get('aggregator_report_period'))
return configuration
......
......@@ -21,46 +21,17 @@
// Created for Project : FLAME
"""
from pyramid.view import view_defaults, view_config
from pyramid.view import view_defaults
from pyramid.httpexceptions import HTTPBadRequest
from subprocess import Popen, DEVNULL
from clmcservice.utilities import validate_config_content, validate_action_content, CONFIG_ATTRIBUTES, STATUS_ATTRIBUTE
from clmcservice.utilities import validate_config_content, validate_action_content, CONFIG_ATTRIBUTES, STATUS_ATTRIBUTE, PROCESS_ATTRIBUTE
import os.path
_process = None
def start_aggregator(config):
"""
An auxiliary method to start the aggregator.
:param config: the configuration containing the arguments for the aggregator
:return: the process ID of the started aggregator script
"""
global _process
dir_path = os.path.dirname(os.path.realpath(__file__))
command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database',
config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')]
_process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL)
print("\nStarted aggregator process with PID: {0}\n".format(_process.pid))
return _process.pid
def stop_aggregator():
"""
An auxiliary method to stop the aggregator.
"""
global _process
# check if the process is started before trying to terminate it - _process.poll() only returns something if the process has terminated, hence we check for a None value
if _process is not None and _process.poll() is None:
_process.terminate()
print("\nStopped aggregator process with PID: {0}\n".format(_process.pid))
# TODO
# 1) Is authorization needed at this stage ?
# 2) Validating the URL address and the database name ?
# 3) Restart the aggregator when configuration is updated
@view_defaults(route_name='aggregator_config', renderer='json')
......@@ -80,14 +51,13 @@ class AggregatorConfig(object):
def get(self):
"""
A GET API call for the status of the aggregator.
A GET API call for the configuration of the aggregator.
:return: A JSON response with the status of the aggregator.
:return: A JSON response with the configuration of the aggregator.
"""
aggregator_data = self.request.registry.settings
config = {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES}
config[STATUS_ATTRIBUTE] = aggregator_data.get(STATUS_ATTRIBUTE)
return config
......@@ -95,8 +65,8 @@ class AggregatorConfig(object):
"""
A PUT API call for the status of the aggregator.
:return: A JSON response to the PUT call (success or fail).
:raises HTTPBadRequest: if form argument cannot be converted to boolean
:return: A JSON response to the PUT call - essentially with the new configured data and comment of the state of the aggregator
:raises HTTPBadRequest: if request body is not a valid JSON for the configurator
"""
# old_config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
......@@ -112,44 +82,104 @@ class AggregatorConfig(object):
# stop_aggregator()
# start_aggregator(new_config)
new_config[STATUS_ATTRIBUTE] = self.request.registry.settings.get(STATUS_ATTRIBUTE)
return new_config
except AssertionError:
raise HTTPBadRequest("Bad request content - configuration format is incorrect.")
@view_config(route_name='aggregator_starter', renderer='json')
def aggregator_starter(request):
"""
A function-based view to start/stop/restart the aggregator.
@view_defaults(route_name='aggregator_starter', renderer='json')
class AggregatorController(object):
:param request: client's call request
"""
A class-based view for controlling the aggregator.
"""
content = request.body.decode(request.charset)
def __init__(self, request):
"""
Initialises the instance of the view with the request argument.
try:
content = validate_action_content(content)
:param request: client's call request
"""
config = {attribute: request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
self.request = request
action = content['action']
def get(self):
"""
A GET API call for the status of the aggregator - running or not.
if action == 'start':
pid = start_aggregator(config)
request.registry.settings[STATUS_ATTRIBUTE] = True
request.registry.settings["aggregator_pid"] = pid
elif action == 'stop':
stop_aggregator()
request.registry.settings[STATUS_ATTRIBUTE] = False
elif action == 'restart':
stop_aggregator()
pid = start_aggregator(config)
request.registry.settings[STATUS_ATTRIBUTE] = True
request.registry.settings["aggregator_pid"] = pid
:return: A JSON response with the status of the aggregator.
"""
aggregator_data = self.request.registry.settings
config = {STATUS_ATTRIBUTE: aggregator_data.get(STATUS_ATTRIBUTE)}
return {STATUS_ATTRIBUTE: request.registry.settings.get(STATUS_ATTRIBUTE)}
return config
def put(self):
"""
A PUT API call for the status of the aggregator.
:return: A JSON response to the PUT call - essentially saying whether the aggregator is running or not
:raises HTTPBadRequest: if request body is not a valid JSON for the starter
"""
content = self.request.body.decode(self.request.charset)
try:
content = validate_action_content(content)
config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
action = content['action']
if action == 'start':
aggregator_started = self.request.registry.settings[STATUS_ATTRIBUTE]
if not aggregator_started:
process = self.start_aggregator(config)
self.request.registry.settings[STATUS_ATTRIBUTE] = True
self.request.registry.settings[PROCESS_ATTRIBUTE] = process
elif action == 'stop':
self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE))
self.request.registry.settings[STATUS_ATTRIBUTE] = False
self.request.registry.settings[PROCESS_ATTRIBUTE] = None
elif action == 'restart':
self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE))
process = self.start_aggregator(config)
self.request.registry.settings[STATUS_ATTRIBUTE] = True
self.request.registry.settings[PROCESS_ATTRIBUTE] = process
return {STATUS_ATTRIBUTE: self.request.registry.settings.get(STATUS_ATTRIBUTE)}
except AssertionError:
raise HTTPBadRequest("Bad request content - must be in JSON format: {action: value}, where value is 'start', 'stop' or 'restart'.")
@staticmethod
def start_aggregator(config):
"""
An auxiliary method to start the aggregator.
:param config: the configuration containing the arguments for the aggregator
:return: the process object of the started aggregator script
"""
dir_path = os.path.dirname(os.path.realpath(__file__))
command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database',
config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')]
process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL)
print("\nStarted aggregator process with PID: {0}\n".format(process.pid))
return process
@staticmethod
def stop_aggregator(process):
"""
An auxiliary method to stop the aggregator.
:param process: the process to terminate
"""
except AssertionError:
raise HTTPBadRequest("Bad request content - must be in JSON format: {action: value}, where value is 'start', 'stop' or 'restart'.")
# check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value
if process is not None and process.poll() is None:
process.terminate()
print("\nStopped aggregator process with PID: {0}\n".format(process.pid))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment