diff --git a/scripts/clmc-service/install.sh b/scripts/clmc-service/install.sh index 2c25e7212b31ba3ce8615f60acce0c220d06b31c..49283376d927da3823a2f14839cd4cd5d479cdf8 100755 --- a/scripts/clmc-service/install.sh +++ b/scripts/clmc-service/install.sh @@ -139,6 +139,16 @@ if [ $? -ne 0 ] ; then exit 1 fi +# running tests using tox +echo "----> Running tox" +TOX_OUTPUT="$(tox)" +# check if tox output contains the 'congratulations :)' bit for tests passed +if [[ $TOX_OUTPUT != *"congratulations :)"* ]]; then + echo "CLMC service unit tests failed." + exit 1 +fi +echo "----> Tox execution of unit tests passed successfully" + # install the service echo "----> Installing CLMC web service" pip3 install . @@ -147,6 +157,10 @@ if [ $? -ne 0 ] ; then exit 1 fi +# create directory for CLMC service logs +echo "----> Creating CLMC web service log directory" +sudo mkdir /var/log/clmcservice + # Install minioclmc as systemctl service # ----------------------------------------------------------------------- mkdir -p /opt/flame/clmc diff --git a/src/service/clmcservice/__init__.py b/src/service/clmcservice/__init__.py index bd560622e940ec78c2e6b7f5daaa340c325170e1..3473f9c9e91d777883c4a01d8a703f99643b677e 100644 --- a/src/service/clmcservice/__init__.py +++ b/src/service/clmcservice/__init__.py @@ -22,8 +22,6 @@ """ from pyramid.config import Configurator -from pyramid.settings import asbool - from clmcservice.utilities import RUNNING_FLAG, MALFORMED_FLAG @@ -33,9 +31,6 @@ def main(global_config, **settings): """ # a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings - aggregator_running = asbool(settings.get(RUNNING_FLAG, False)) - settings[RUNNING_FLAG] = asbool(aggregator_running) - aggregator_report_period = int(settings.get('aggregator_report_period', 5)) settings['aggregator_report_period'] = aggregator_report_period diff --git a/src/service/clmcservice/aggregator.py b/src/service/clmcservice/aggregator.py index ab855d991610fccccd43b9cb9da74ebcca5ce586..1f796dd10f00333312b2e62fd60d7f6d90f3ce1e 100644 --- a/src/service/clmcservice/aggregator.py +++ b/src/service/clmcservice/aggregator.py @@ -28,7 +28,7 @@ from time import time, sleep from urllib.parse import urlparse from clmcservice.utilities import generate_e2e_delay_report import getopt -import sys +import logging class Aggregator(object): @@ -38,10 +38,9 @@ class Aggregator(object): REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated DATABASE = 'CLMCMetrics' # default database the aggregator uses - DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses - RETRY_PERIOD = 5 # number of seconds to wait before retrying connection/posting data to Influx + DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses - def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): + def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD, logger=None): """ Constructs an Aggregator instance. @@ -50,15 +49,16 @@ class Aggregator(object): :param report_period: the report period in seconds """ + if logger is None: + self.log = logging.getLogger(__name__) + else: + self.log = logger + + self.log.info("Connecting to Influx database {0} with URL {1}".format(database_name, database_url)) # initialise a database client using the database url and the database name - print("Creating InfluxDB Connection") url_object = urlparse(database_url) - while True: - try: - self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) - break - except: - sleep(self.RETRY_PERIOD) + self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10) + self.log.info("Successfully connected to Influx database {0} with URL {1}".format(database_name, database_url)) self.db_url = database_url self.db_name = database_name @@ -76,6 +76,7 @@ class Aggregator(object): Stop the aggregator from running. """ + self.log.info("Aggregator's stop flag has been set.") self._stop_flag.set() def run(self): @@ -83,10 +84,11 @@ class Aggregator(object): Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds. """ - print("Running aggregator") + self.log.info("Aggregator started running.") current_time = int(time()) while not self._stop_flag.is_set(): + self.log.info("Trying to generate an E2E measurement.") boundary_time = current_time - self.report_period @@ -95,19 +97,9 @@ class Aggregator(object): # query the network delays and group them by path ID network_delays = {} - - while True: - - try: - print("Query for network delays") - result = self.db_client.query( - 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "{0}"."autogen"."network_delays" WHERE time >= {1} and time < {2} GROUP BY path, source, target'.format(self.db_name, - boundary_time_nano, current_time_nano)) - break - except Exception as e: - print("Exception getting network delay") - print(e) - sleep(self.RETRY_PERIOD) + result = self.db_client.query( + 'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "{0}"."autogen"."network_delays" WHERE time >= {1} and time < {2} GROUP BY path, source, target'.format( + self.db_name, boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item @@ -120,18 +112,9 @@ class Aggregator(object): # query the service delays and group them by endpoint, service function instance and sfr service_delays = {} - - while True: - try: - print("Query for service delays") - result = self.db_client.query( - 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(self.db_name, - boundary_time_nano, current_time_nano)) - break - except Exception as e: - print("Exception getting service delay") - print(e) - sleep(self.RETRY_PERIOD) + result = self.db_client.query( + 'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(self.db_name, + boundary_time_nano, current_time_nano)) for item in result.items(): metadata, result_points = item @@ -178,18 +161,15 @@ class Aggregator(object): # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row if None not in e2e_arguments.values(): - - while True: - try: - self.db_client.write_points( - generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], - e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], - e2e_arguments['delay_service'], - e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], - e2e_arguments['time'])) - break - except: - sleep(self.RETRY_PERIOD) + self.db_client.write_points( + generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'], + e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], + e2e_arguments['delay_service'], + e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], + e2e_arguments['time'])) + self.log.info("Successfully generated an E2E measurement and posted back to Influx.") + else: + self.log.info("Couldn't generate an E2E measurement although some of the data could be fetched.") old_timestamp = current_time # wait until {report_period) seconds have passed @@ -197,8 +177,31 @@ class Aggregator(object): sleep(1) current_time = int(time()) + self.log.info("Aggregator stopped running.") + if __name__ == '__main__': + # initialise a file logger, only when module's main method is run (NOT when aggregator class is imported somewhere else) + log = logging.getLogger('aggregator') + hdlr = logging.FileHandler('/var/log/clmcservice/aggregator.log', mode='a') + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + hdlr.setFormatter(formatter) + log.addHandler(hdlr) + log.setLevel(logging.DEBUG) + + # log all errors that are thrown in the execution of the aggregator with the logger object initialized above + import sys + import traceback + + def report_error(error_type, error_value, error_traceback, log_object=log): + log_object.error("Uncaught error thrown!") + log_object.error("Error type: {0}".format(error_type)) + log_object.error("Error value: {0}".format(error_value)) + log_object.debug("Error traceback:") + for trace in traceback.format_tb(error_traceback): + log_object.debug(trace) + + sys.excepthook = report_error # Parse command line options try: @@ -217,9 +220,9 @@ if __name__ == '__main__': elif opt in ('-u', '--url'): arg_database_url = arg - Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run() + Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period, logger=log).run() - # print the error messages in case of a parse error + # log.info the error messages in case of a parse error except getopt.GetoptError as err: - print(err) - print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>') + log.info(err) + log.info('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>') diff --git a/src/service/clmcservice/tests.py b/src/service/clmcservice/tests.py index 581608e0758f2b5c009ee1528b6486e6e41e8c63..d9980cb7ff8767e61842269d24763a287fbe255e 100644 --- a/src/service/clmcservice/tests.py +++ b/src/service/clmcservice/tests.py @@ -99,9 +99,9 @@ class TestAggregatorAPI(object): :param output_value: the expected output value, None for expecting an Exception """ - from clmcservice.views import AggregatorConfig # nested import so that importing the class view is part of the test itself + from clmcservice.views import AggregatorConfig, AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." assert self.config.get_settings().get('aggregator_database_name') == 'CLMCMetrics', "Initial database name the aggregator uses is CLMCMetrics." assert self.config.get_settings().get('aggregator_database_url') == "http://172.40.231.51:8086", "Initial aggregator url is http://172.40.231.51:8086" @@ -116,7 +116,7 @@ class TestAggregatorAPI(object): for attribute in CONFIG_ATTRIBUTES: assert self.config.get_settings().get(attribute) == output_value.get(attribute), "Aggregator settings configuration is not updated." - assert not self.config.get_settings().get(RUNNING_FLAG), "Aggregator running status should not be updated after a configuration update." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Aggregator running status should not be updated after a configuration update." else: error_raised = False try: @@ -133,7 +133,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." request = testing.DummyRequest() @@ -142,7 +142,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: True}, "The aggregator should have been started." - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been started." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been started." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." # kill the started process after the test is over @@ -156,7 +156,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." # send a start request to trigger the aggregator @@ -165,6 +165,7 @@ class TestAggregatorAPI(object): request.body = input_body.encode(request.charset) AggregatorController(request).put() assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "Aggregator process should have been initialized." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Aggregator process should have been initialized." # test stopping the aggregator process when it is running request = testing.DummyRequest() @@ -173,7 +174,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." - assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been stopped." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." sleep(2) # put a 2 seconds timeout so that the aggregator process can terminate @@ -185,7 +186,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." - assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been stopped." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Aggregator process should have been terminated." def test_restart(self): @@ -195,7 +196,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." # test restarting the aggregator process when it is stopped @@ -205,7 +206,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been restarted." assert self.config.get_settings().get(PROCESS_ATTRIBUTE), "The aggregator process should have been reinitialised." # test restarting the aggregator process when it is running @@ -215,7 +216,7 @@ class TestAggregatorAPI(object): response = AggregatorController(request).put() assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been restarted." assert self.config.get_settings().get(PROCESS_ATTRIBUTE), "The aggregator process should have been reinitialised." # kill the started process after the test is over @@ -238,7 +239,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." # test restarting the aggregator process when it is running @@ -261,7 +262,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." request = testing.DummyRequest() @@ -269,12 +270,17 @@ class TestAggregatorAPI(object): assert response == {'aggregator_running': False}, "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." - assert not self.config.get_settings().get(RUNNING_FLAG), "A GET request must not modify the aggregator status flag." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "A GET request must not modify the aggregator status flag." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "A GET request must not start the aggregator process." # test status with malformed configuration + # start the aggregator + request = testing.DummyRequest() + input_body = '{"action": "start"}' + request.body = input_body.encode(request.charset) + AggregatorController(request).put() self.config.get_settings()[MALFORMED_FLAG] = True - self.config.get_settings()[RUNNING_FLAG] = True + request = testing.DummyRequest() response = AggregatorController(request).get() @@ -283,9 +289,13 @@ class TestAggregatorAPI(object): 'comment': 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'}, \ "Response must be a dictionary representing a JSON object with the correct status data of the aggregator." - assert self.config.get_settings().get(RUNNING_FLAG), "A GET request must not modify the aggregator status flag." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "A GET request must not modify the aggregator status flag." assert self.config.get_settings().get(MALFORMED_FLAG), "A GET request must not modify the aggregator malformed flag." - assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "A GET request must not start the aggregator process." + assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "A GET request must not stop the aggregator process." + + # kill the started process after the test is over + pid = request.registry.settings[PROCESS_ATTRIBUTE].pid + os.kill(pid, signal.SIGTERM) def test_malformed_flag_behaviour(self): """ @@ -294,7 +304,7 @@ class TestAggregatorAPI(object): from clmcservice.views import AggregatorController, AggregatorConfig # nested import so that importing the class view is part of the test itself - assert not self.config.get_settings().get(RUNNING_FLAG), "Initially aggregator is not running." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "Initially aggregator is not running." assert not self.config.get_settings().get(MALFORMED_FLAG), "Initially aggregator is not in a malformed state" assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "Initially no aggregator process is running." assert self.config.get_settings().get('aggregator_report_period') == 5, "Initial report period is 5 seconds." @@ -318,7 +328,7 @@ class TestAggregatorAPI(object): response = AggregatorConfig(request).put() assert response == output_body, "Response of PUT request must include the new configuration of the aggregator" - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator shouldn't be stopped when the configuration is updated." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator shouldn't be stopped when the configuration is updated." assert self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should be set when the configuration is updated while the process is running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator shouldn't be stopped when the configuration is updated." @@ -336,7 +346,7 @@ class TestAggregatorAPI(object): request.body = input_body.encode(request.charset) response = AggregatorController(request).put() assert response == {RUNNING_FLAG: True}, "The aggregator should have been restarted." - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been restarted." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been restarted." assert not self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should have been reset to False." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator should have been restarted." @@ -349,7 +359,7 @@ class TestAggregatorAPI(object): response = AggregatorConfig(request).put() assert response == output_body, "Response of PUT request must include the new configuration of the aggregator" - assert self.config.get_settings().get(RUNNING_FLAG), "The aggregator shouldn't be stopped when the configuration is updated." + assert AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator shouldn't be stopped when the configuration is updated." assert self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should be set when the configuration is updated while the process is running." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is not None, "The aggregator shouldn't be stopped when the configuration is updated." @@ -360,7 +370,7 @@ class TestAggregatorAPI(object): request.body = input_body.encode(request.charset) response = AggregatorController(request).put() assert response == {RUNNING_FLAG: False}, "The aggregator should have been stopped." - assert not self.config.get_settings().get(RUNNING_FLAG), "The aggregator should have been stopped." + assert not AggregatorController.is_process_running(self.config.get_settings().get(PROCESS_ATTRIBUTE)), "The aggregator should have been stopped." assert not self.config.get_settings().get(MALFORMED_FLAG), "The malformed flag should have been reset to False." assert self.config.get_settings().get(PROCESS_ATTRIBUTE) is None, "The aggregator should have been stopped." diff --git a/src/service/clmcservice/views.py b/src/service/clmcservice/views.py index 049be9daacb187ea3ecfb837f9c86a75a3660804..2671064039a9338060b917720b5c305ffd9dafc6 100644 --- a/src/service/clmcservice/views.py +++ b/src/service/clmcservice/views.py @@ -25,10 +25,16 @@ from pyramid.view import view_defaults from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError from influxdb import InfluxDBClient from urllib.parse import urlparse -from subprocess import Popen, DEVNULL +from subprocess import Popen from clmcservice.utilities import validate_config_content, validate_action_content, validate_round_trip_query_params, \ CONFIG_ATTRIBUTES, ROUND_TRIP_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE +import os import os.path +import sys +import logging + + +log = logging.getLogger('service_logger') @view_defaults(route_name='aggregator_config', renderer='json') @@ -77,7 +83,7 @@ class AggregatorConfig(object): # if configuration is not already malformed, check whether the configuration is updated if not self.request.registry.settings[MALFORMED_FLAG]: - malformed = old_config != new_config and self.request.registry.settings[RUNNING_FLAG] + malformed = old_config != new_config and AggregatorController.is_process_running(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) self.request.registry.settings[MALFORMED_FLAG] = malformed if malformed: new_config[MALFORMED_FLAG] = True @@ -113,9 +119,12 @@ class AggregatorController(object): """ aggregator_data = self.request.registry.settings - config = {RUNNING_FLAG: aggregator_data.get(RUNNING_FLAG)} + aggregator_process = aggregator_data.get(PROCESS_ATTRIBUTE) + aggregator_running = self.is_process_running(aggregator_process) + + config = {RUNNING_FLAG: aggregator_running} - if aggregator_data[MALFORMED_FLAG] and aggregator_data[RUNNING_FLAG]: + if aggregator_data[MALFORMED_FLAG] and aggregator_running: config[MALFORMED_FLAG] = True config[COMMENT_ATTRIBUTE] = COMMENT_VALUE @@ -138,25 +147,25 @@ class AggregatorController(object): action = content['action'] + aggregator_running = self.is_process_running(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) if action == 'start': - aggregator_started = self.request.registry.settings[RUNNING_FLAG] - if not aggregator_started: + if not aggregator_running: process = self.start_aggregator(config) - self.request.registry.settings[RUNNING_FLAG] = True + aggregator_running = True self.request.registry.settings[PROCESS_ATTRIBUTE] = process elif action == 'stop': self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) - self.request.registry.settings[RUNNING_FLAG] = False + aggregator_running = False self.request.registry.settings[PROCESS_ATTRIBUTE] = None self.request.registry.settings[MALFORMED_FLAG] = False elif action == 'restart': self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE)) process = self.start_aggregator(config) - self.request.registry.settings[RUNNING_FLAG] = True + aggregator_running = True self.request.registry.settings[PROCESS_ATTRIBUTE] = process self.request.registry.settings[MALFORMED_FLAG] = False - return {RUNNING_FLAG: self.request.registry.settings.get(RUNNING_FLAG)} + return {RUNNING_FLAG: aggregator_running} except AssertionError: raise HTTPBadRequest('Bad request content - must be in JSON format: {"action": value}, where value is "start", "stop" or "restart".') @@ -171,10 +180,12 @@ class AggregatorController(object): """ dir_path = os.path.dirname(os.path.realpath(__file__)) - command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', + python_interpreter = sys.executable + command = [python_interpreter, 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database', config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')] - process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL) - print("\nStarted aggregator process with PID: {0}\n".format(process.pid)) + process = Popen(command, cwd=dir_path) + + log.info("\nStarted aggregator process with PID: {0}\n".format(process.pid)) return process @@ -186,10 +197,22 @@ class AggregatorController(object): :param process: the process to terminate """ - # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value - if process is not None and process.poll() is None: + # check if the process is started + if AggregatorController.is_process_running(process): process.terminate() - print("\nStopped aggregator process with PID: {0}\n".format(process.pid)) + log.info("\nStopped aggregator process with PID: {0}\n".format(process.pid)) + + @staticmethod + def is_process_running(process): + """ + Checks if a process is running. + + :param process: the Popen object to check + :return: True if running, False otherwise + """ + + # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value + return process is not None and process.poll() is None @view_defaults(route_name='round_trip_time_query', renderer='json') @@ -235,7 +258,7 @@ class RoundTripTimeQuery(object): db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=influx_db_name, timeout=10) query = 'SELECT mean(*) FROM "{0}"."autogen"."e2e_delays" WHERE time >= {1} and time < {2} and sf_instance = \'{3}\''.format( influx_db_name, start_timestamp, end_timestamp, media_service) - print(query) + log.info("Executing query: {0}".format(query)) result = db_client.query(query) actual_result = next(result.get_points(), None) @@ -251,8 +274,14 @@ class RoundTripTimeQuery(object): rtt = self.calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth) return {"result": rtt} - except: - raise HTTPInternalServerError("Cannot instantiate connection with database {0} on url {1}.".format(influx_db_name, influx_db_url)) + except Exception as e: + msg = "Cannot instantiate connection with database {0} on url {1}.".format(influx_db_name, influx_db_url) + log.info(msg) + log.error(type(e)) + log.error(e) + log.error(e.args) + + raise HTTPInternalServerError(msg) except AssertionError: raise HTTPBadRequest('Bad request content - must be in JSON format: {"media_service": value, "start_timestamp": value, "end_timestamp": value}.') diff --git a/src/service/development.ini b/src/service/development.ini index 269aea605df38ab006cac8a850e5556770cfef90..b4bc0916607728489c7b2584dd3fa67157baeb40 100644 --- a/src/service/development.ini +++ b/src/service/development.ini @@ -11,8 +11,10 @@ pyramid.debug_authorization = false pyramid.debug_notfound = false pyramid.debug_routematch = false pyramid.default_locale_name = en -pyramid.includes = pyramid_debugtoolbar -aggregator_running = false +pyramid.includes = pyramid_debugtoolbar pyramid_exclog +exclog.ignore = + +## Aggregator default configuration aggregator_report_period = 5 aggregator_database_name = CLMCMetrics aggregator_database_url = http://172.40.231.51:8086 @@ -35,22 +37,27 @@ listen = localhost:9080 ### [loggers] -keys = root, clmcservice +keys = root, exc_logger, clmcservice [handlers] -keys = console +keys = console, filelog, exc_handler [formatters] -keys = generic +keys = generic, exc_formatter [logger_root] level = INFO handlers = console [logger_clmcservice] -level = DEBUG -handlers = -qualname = clmcservice +level = INFO +handlers = filelog +qualname = service_logger + +[logger_exc_logger] +level = ERROR +handlers = exc_handler +qualname = exc_logger [handler_console] class = StreamHandler @@ -58,5 +65,20 @@ args = (sys.stderr,) level = NOTSET formatter = generic +[handler_filelog] +class = FileHandler +args = ('/var/log/clmcservice/service.log','a') +level = NOTSET +formatter = generic + +[handler_exc_handler] +class = FileHandler +args = ('/var/log/clmcservice/service-exceptions.log', 'a') +level = ERROR +formatter = exc_formatter + [formatter_generic] format = %(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s + +[formatter_exc_formatter] +format = %(asctime)s %(message)s \ No newline at end of file diff --git a/src/service/production.ini b/src/service/production.ini index 62035f0a37ef3ef978a72ed0cb04e68e69de81bf..fbb9d4e0b38d5c53a30d9f0aae2473101a442383 100644 --- a/src/service/production.ini +++ b/src/service/production.ini @@ -11,9 +11,10 @@ pyramid.debug_authorization = false pyramid.debug_notfound = false pyramid.debug_routematch = false pyramid.default_locale_name = en +pyramid.includes = pyramid_exclog +exclog.ignore = -## Aggregator configuration -aggregator_running = false +## Aggregator default configuration aggregator_report_period = 5 aggregator_database_name = CLMCMetrics aggregator_database_url = http://172.40.231.51:8086 @@ -32,22 +33,27 @@ listen = *:9080 ### [loggers] -keys = root, clmcservice +keys = root, exc_logger, clmcservice [handlers] -keys = console +keys = console, filelog, exc_handler [formatters] -keys = generic +keys = generic, exc_formatter [logger_root] -level = WARN +level = INFO handlers = console [logger_clmcservice] -level = WARN -handlers = -qualname = clmcservice +level = INFO +handlers = filelog +qualname = service_logger + +[logger_exc_logger] +level = ERROR +handlers = exc_handler +qualname = exc_logger [handler_console] class = StreamHandler @@ -55,5 +61,20 @@ args = (sys.stderr,) level = NOTSET formatter = generic +[handler_filelog] +class = FileHandler +args = ('/var/log/clmcservice/service.log','a') +level = NOTSET +formatter = generic + +[handler_exc_handler] +class = FileHandler +args = ('/var/log/clmcservice/service-exceptions.log', 'a') +level = ERROR +formatter = exc_formatter + [formatter_generic] format = %(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s + +[formatter_exc_formatter] +format = %(asctime)s %(message)s \ No newline at end of file diff --git a/src/service/setup.py b/src/service/setup.py index 0b195c5433419b000bf0ff63abf59f9d01e00123..802f7416861d09a25760c857e16b9898fc8c2243 100644 --- a/src/service/setup.py +++ b/src/service/setup.py @@ -44,6 +44,7 @@ requires = [ 'plaster_pastedeploy', 'pyramid', 'pyramid_debugtoolbar', + 'pyramid_exclog', 'waitress', 'influxdb', 'pytest', diff --git a/src/test/clmctest/monitoring/E2ETestAggregatorThread.py b/src/test/clmctest/monitoring/E2ETestAggregatorThread.py index a6c00976b7650d991ddb10bc8e8583dfd36bd32f..2b979760b0f08e7261569dd88323abc7c3a9e16a 100644 --- a/src/test/clmctest/monitoring/E2ETestAggregatorThread.py +++ b/src/test/clmctest/monitoring/E2ETestAggregatorThread.py @@ -31,7 +31,7 @@ class TestAggregator(Thread): REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated DATABASE = 'CLMCMetrics' # default database the aggregator uses - DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses + DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD): """