From b9f01978b973e575929a6ee53753f6bc1c826b8b Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Thu, 31 May 2018 12:30:59 +0100 Subject: [PATCH] Updated the CLMC service and fixed the issue of starting the aggregator on a VM --- src/service/clmcservice/__init__.py | 5 -- src/service/clmcservice/aggregator.py | 69 ++++++------------- src/service/clmcservice/tests.py | 54 +++++++++------ src/service/clmcservice/views.py | 47 +++++++++---- src/service/development.ini | 3 +- src/service/production.ini | 3 +- .../monitoring/E2ETestAggregatorThread.py | 2 +- 7 files changed, 90 insertions(+), 93 deletions(-) diff --git a/src/service/clmcservice/__init__.py b/src/service/clmcservice/__init__.py index bd56062..3473f9c 100644 --- a/src/service/clmcservice/__init__.py +++ b/src/service/clmcservice/__init__.py @@ -22,8 +22,6 @@ """ from pyramid.config import Configurator -from pyramid.settings import asbool - from clmcservice.utilities import RUNNING_FLAG, MALFORMED_FLAG @@ -33,9 +31,6 @@ def main(global_config, **settings): """ # a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings - aggregator_running = asbool(settings.get(RUNNING_FLAG, False)) - settings[RUNNING_FLAG] = asbool(aggregator_running) - aggregator_report_period = int(settings.get('aggregator_report_period', 5)) settings['aggregator_report_period'] = aggregator_report_period diff --git a/src/service/clmcservice/aggregator.py b/src/service/clmcservice/aggregator.py index ab855d9..9b94005 100644 --- a/src/service/clmcservice/aggregator.py +++ b/src/service/clmcservice/aggregator.py @@ -38,8 +38,7 @@ class Aggregator(object): REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated DATABASE = 'CLMCMetrics' # default database the aggregator uses - DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses - RETRY_PERIOD = 5 # number of seconds to wait before retrying connection/posting data to Influx + DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): """ @@ -50,15 +49,11 @@ class Aggregator(object): :param report_period: the report period in seconds """ + print("Connecting to Influx database {0} with URL {1}".format(database_name, database_url)) # initialise a database client using the database url and the database name - print("Creating InfluxDB Connection") url_object = urlparse(database_url) - while True: - try: - self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) - break - except: - sleep(self.RETRY_PERIOD) + self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) + print("Successfully connected to Influx database {0} with URL {1}".format(database_name, database_url)) self.db_url = database_url self.db_name = database_name @@ -76,6 +71,7 @@ class Aggregator(object): Stop the aggregator from running. """ + print("Aggregator's stop flag has been set.") self._stop_flag.set() def run(self): @@ -83,11 +79,10 @@ class Aggregator(object): Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. """ - print("Running aggregator") + print("Aggregator started running.") current_time = int(time()) while not self._stop_flag.is_set(): - boundary_time = current_time - self.report_period boundary_time_nano = boundary_time * 1000000000 @@ -95,19 +90,9 @@ class Aggregator(object): # query the network delays and group them by path ID network_delays = {} - - while True: - - try: - print("Query for network delays") - result = self.db_client.query( - 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "{0}"."autogen"."network_delays" WHERE time >= {1} and time < {2} GROUP BY path, source, target'.format(self.db_name, - boundary_time_nano, current_time_nano)) - break - except Exception as e: - print("Exception getting network delay") - print(e) - sleep(self.RETRY_PERIOD) + result = self.db_client.query( + 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "{0}"."autogen"."network_delays" WHERE time >= {1} and time < {2} GROUP BY path, source, target'.format( + self.db_name, boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item @@ -120,18 +105,9 @@ class Aggregator(object): # query the service delays and group them by endpoint, service function instance and sfr service_delays = {} - - while True: - try: - print("Query for service delays") - result = self.db_client.query( - 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(self.db_name, - boundary_time_nano, current_time_nano)) - break - except Exception as e: - print("Exception getting service delay") - print(e) - sleep(self.RETRY_PERIOD) + result = self.db_client.query( + 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(self.db_name, + boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item @@ -178,18 +154,13 @@ class Aggregator(object): # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row if None not in e2e_arguments.values(): - - while True: - try: - self.db_client.write_points( - generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], - e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], - e2e_arguments['delay_service'], - e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], - e2e_arguments['time'])) - break - except: - sleep(self.RETRY_PERIOD) + self.db_client.write_points( + generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], + e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], + e2e_arguments['delay_service'], + e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], + e2e_arguments['time'])) + print("Successfully generated an E2E measurement and posted back to Influx.") old_timestamp = current_time # wait until {report_period) seconds have passed @@ -197,6 +168,8 @@ class Aggregator(object): sleep(1) current_time = int(time()) + print("Aggregator stopped running.") + if __name__ == '__main__': diff --git a/src/service/clmcservice/tests.py b/src/service/clmcservice/tests.py index 581608e..d9980cb 100644 --- a/src/service/clmcservice/tests.py +++ b/src/service/clmcservice/tests.py @@ -99,9 +99,9 @@ class TestAggregatorAPI(object): :param output_value: the expected output value, None for expecting an Exception """ - from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself + from clmcservice.views import AggregatorConfig, AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." assert self.config.get_settings().get('aggregator_database_name') == 'CLMCMetrics', "Initial database name the aggregator uses is CLMCMetrics." assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" @@ -116,7 +116,7 @@ class TestAggregatorAPI(object): for attribute in CONFIG_ATTRIBUTES: assert self.config.get_settings().get(attribute) == output_value.get(attribute), "Aggregator settings configuration is not updated." - assert not self.config.get_settings().get(RUNNING_FLAG), "Aggregator running status should not be updated after a configuration update." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Aggregator running status should not be updated after a configuration update." else: error_raised = False try: @@ -133,7 +133,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." request = testing.DummyRequest() @@ -142,7 +142,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: True}, "The aggregator should have been started." - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been started." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been started." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." # kill the started process after the test is over @@ -156,7 +156,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." # send a start request to trigger the aggregator @@ -165,6 +165,7 @@ class TestAggregatorAPI(object): request.body = input_body.encode(request.charset) AggregatorController(request).put() assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Aggregator process should have been initialized." # test stopping the aggregator process when it is running request = testing.DummyRequest() @@ -173,7 +174,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." - assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been stopped." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." sleep(2) # put a 2 seconds timeout so that the aggregator process can terminate @@ -185,7 +186,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." - assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been stopped." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." def test_restart(self): @@ -195,7 +196,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." # test restarting the aggregator process when it is stopped @@ -205,7 +206,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been restarted." assert self.config.get_settings().get(PROCESS_ATTRIBUTE), "The aggregator process should have been reinitialised." # test restarting the aggregator process when it is running @@ -215,7 +216,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been restarted." assert self.config.get_settings().get(PROCESS_ATTRIBUTE), "The aggregator process should have been reinitialised." # kill the started process after the test is over @@ -238,7 +239,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." # test restarting the aggregator process when it is running @@ -261,7 +262,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." request = testing.DummyRequest() @@ -269,12 +270,17 @@ class TestAggregatorAPI(object): assert response == {'aggregator_running': False}, "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." - assert not self.config.get_settings().get(RUNNING_FLAG), "A GET request must not modify the aggregator status flag." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "A GET request must not modify the aggregator status flag." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "A GET request must not start the aggregator process." # test status with malformed configuration + # start the aggregator + request = testing.DummyRequest() + input_body = '{"action": "start"}' + request.body = input_body.encode(request.charset) + AggregatorController(request).put() self.config.get_settings()[MALFORMED_FLAG] = True - self.config.get_settings()[RUNNING_FLAG] = True + request = testing.DummyRequest() response = AggregatorController(request).get() @@ -283,9 +289,13 @@ class TestAggregatorAPI(object): 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'}, \ "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." - assert self.config.get_settings().get(RUNNING_FLAG), "A GET request must not modify the aggregator status flag." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "A GET request must not modify the aggregator status flag." assert self.config.get_settings().get(MALFORMED_FLAG), "A GET request must not modify the aggregator malformed flag." - assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "A GET request must not start the aggregator process." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "A GET request must not stop the aggregator process." + + # kill the started process after the test is over + pid = request.registry.settings[PROCESS_ATTRIBUTE].pid + os.kill(pid, signal.SIGTERM) def test_malformed_flag_behaviour(self): """ @@ -294,7 +304,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController, AggregatorConfig # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert not self.config.get_settings().get(MALFORMED_FLAG), "Initially aggregator is not in a malformed state" assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." @@ -318,7 +328,7 @@ class TestAggregatorAPI(object): response = AggregatorConfig(request).put() assert response == output_body, "Response of PUT request must include the new configuration of the aggregator" - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator shouldn't be stopped when the configuration is updated." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator shouldn't be stopped when the configuration is updated." assert self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should be set when the configuration is updated while the process is running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator shouldn't be stopped when the configuration is updated." @@ -336,7 +346,7 @@ class TestAggregatorAPI(object): request.body = input_body.encode(request.charset) response = AggregatorController(request).put() assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been restarted." assert not self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should have been reset to False." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator should have been restarted." @@ -349,7 +359,7 @@ class TestAggregatorAPI(object): response = AggregatorConfig(request).put() assert response == output_body, "Response of PUT request must include the new configuration of the aggregator" - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator shouldn't be stopped when the configuration is updated." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator shouldn't be stopped when the configuration is updated." assert self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should be set when the configuration is updated while the process is running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator shouldn't be stopped when the configuration is updated." @@ -360,7 +370,7 @@ class TestAggregatorAPI(object): request.body = input_body.encode(request.charset) response = AggregatorController(request).put() assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." - assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been stopped." assert not self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should have been reset to False." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "The aggregator should have been stopped." diff --git a/src/service/clmcservice/views.py b/src/service/clmcservice/views.py index 049be9d..416fc37 100644 --- a/src/service/clmcservice/views.py +++ b/src/service/clmcservice/views.py @@ -25,10 +25,12 @@ from pyramid.view import view_defaults from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError from influxdb import InfluxDBClient from urllib.parse import urlparse -from subprocess import Popen, DEVNULL +from subprocess import Popen from clmcservice.utilities import validate_config_content, validate_action_content, validate_round_trip_query_params, \ CONFIG_ATTRIBUTES, ROUND_TRIP_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE +import os import os.path +import sys @view_defaults(route_name='aggregator_config', renderer='json') @@ -77,7 +79,7 @@ class AggregatorConfig(object): # if configuration is not already malformed, check whether the configuration is updated if not self.request.registry.settings[MALFORMED_FLAG]: - malformed = old_config != new_config and self.request.registry.settings[RUNNING_FLAG] + malformed = old_config != new_config and AggregatorController.is_process_running(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) self.request.registry.settings[MALFORMED_FLAG] = malformed if malformed: new_config[MALFORMED_FLAG] = True @@ -113,9 +115,12 @@ class AggregatorController(object): """ aggregator_data = self.request.registry.settings - config = {RUNNING_FLAG: aggregator_data.get(RUNNING_FLAG)} + aggregator_process = aggregator_data.get(PROCESS_ATTRIBUTE) + aggregator_running = self.is_process_running(aggregator_process) - if aggregator_data[MALFORMED_FLAG] and aggregator_data[RUNNING_FLAG]: + config = {RUNNING_FLAG: aggregator_running} + + if aggregator_data[MALFORMED_FLAG] and aggregator_running: config[MALFORMED_FLAG] = True config[COMMENT_ATTRIBUTE] = COMMENT_VALUE @@ -138,25 +143,25 @@ class AggregatorController(object): action = content['action'] + aggregator_running = self.is_process_running(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) if action == 'start': - aggregator_started = self.request.registry.settings[RUNNING_FLAG] - if not aggregator_started: + if not aggregator_running: process = self.start_aggregator(config) - self.request.registry.settings[RUNNING_FLAG] = True + aggregator_running = True self.request.registry.settings[PROCESS_ATTRIBUTE] = process elif action == 'stop': self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) - self.request.registry.settings[RUNNING_FLAG] = False + aggregator_running = False self.request.registry.settings[PROCESS_ATTRIBUTE] = None self.request.registry.settings[MALFORMED_FLAG] = False elif action == 'restart': self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) process = self.start_aggregator(config) - self.request.registry.settings[RUNNING_FLAG] = True + aggregator_running = True self.request.registry.settings[PROCESS_ATTRIBUTE] = process self.request.registry.settings[MALFORMED_FLAG] = False - return {RUNNING_FLAG: self.request.registry.settings.get(RUNNING_FLAG)} + return {RUNNING_FLAG: aggregator_running} except AssertionError: raise HTTPBadRequest('Bad request content - must be in JSON format: {"action": value}, where value is "start", "stop" or "restart".') @@ -171,9 +176,11 @@ class AggregatorController(object): """ dir_path = os.path.dirname(os.path.realpath(__file__)) - command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', + python_interpreter = sys.executable + command = [python_interpreter, 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')] - process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL) + process = Popen(command, cwd=dir_path) + print("\nStarted aggregator process with PID: {0}\n".format(process.pid)) return process @@ -186,11 +193,23 @@ class AggregatorController(object): :param process: the process to terminate """ - # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value - if process is not None and process.poll() is None: + # check if the process is started + if AggregatorController.is_process_running(process): process.terminate() print("\nStopped aggregator process with PID: {0}\n".format(process.pid)) + @staticmethod + def is_process_running(process): + """ + Checks if a process is running. + + :param process: the Popen object to check + :return: True if running, False otherwise + """ + + # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value + return process is not None and process.poll() is None + @view_defaults(route_name='round_trip_time_query', renderer='json') class RoundTripTimeQuery(object): diff --git a/src/service/development.ini b/src/service/development.ini index 269aea6..84d3a8c 100644 --- a/src/service/development.ini +++ b/src/service/development.ini @@ -12,7 +12,8 @@ pyramid.debug_notfound = false pyramid.debug_routematch = false pyramid.default_locale_name = en pyramid.includes = pyramid_debugtoolbar -aggregator_running = false + +## Aggregator default configuration aggregator_report_period = 5 aggregator_database_name = CLMCMetrics aggregator_database_url = http://172.40.231.51:8086 diff --git a/src/service/production.ini b/src/service/production.ini index 62035f0..d56e037 100644 --- a/src/service/production.ini +++ b/src/service/production.ini @@ -12,8 +12,7 @@ pyramid.debug_notfound = false pyramid.debug_routematch = false pyramid.default_locale_name = en -## Aggregator configuration -aggregator_running = false +## Aggregator default configuration aggregator_report_period = 5 aggregator_database_name = CLMCMetrics aggregator_database_url = http://172.40.231.51:8086 diff --git a/src/test/clmctest/monitoring/E2ETestAggregatorThread.py b/src/test/clmctest/monitoring/E2ETestAggregatorThread.py index a6c0097..2b97976 100644 --- a/src/test/clmctest/monitoring/E2ETestAggregatorThread.py +++ b/src/test/clmctest/monitoring/E2ETestAggregatorThread.py @@ -31,7 +31,7 @@ class TestAggregator(Thread): REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated DATABASE = 'CLMCMetrics' # default database the aggregator uses - DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses + DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): """ -- GitLab