From 8243c06d670f158cceefac2c96cebb6dd10d864f Mon Sep 17 00:00:00 2001 From: Michael Boniface <mjb@it-innovation.soton.ac.uk> Date: Fri, 11 May 2018 13:41:23 +0100 Subject: [PATCH] fixed the monitoring to be cumulative over the entire duration of the sampling --- clmctest/inputs/test_config_collector.py | 105 ++++++++++++----------- clmctest/services/nginx/install.sh | 4 +- src/monitoring/config_collector.py | 66 +++++++------- src/monitoring/systemctl_monitor.py | 26 +++--- 4 files changed, 105 insertions(+), 96 deletions(-) diff --git a/clmctest/inputs/test_config_collector.py b/clmctest/inputs/test_config_collector.py index 8d13b31..3513c90 100644 --- a/clmctest/inputs/test_config_collector.py +++ b/clmctest/inputs/test_config_collector.py @@ -39,9 +39,6 @@ samples = [[['active', 0], ['active', 2]], [['active', 0], ['active', 2], ['inactive', 4], ['active', 6], ['failed', 8], ['inactive', 10]], [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]] -sample_set = 0 -current_index = 0 - def get_sample_test(): global sample_set global current_index @@ -56,48 +53,59 @@ def get_sample_test(): def write_output(report): print("Writing report output {0}".format(report)) - -def test_aggregation(): - t = ConfigCollector(get_sample_test, write_output) +sample_set = 0 +current_index = 0 + +def test_agg(): + t = ConfigCollector(get_sample_test, write_output) report = t.agg_samples(samples[0], 10, 12) + assert report['fields']['current_state'] == 'active' + assert report['fields']['current_state_time'] == 12 + assert report['fields']['active_sum'] == 12 + assert report['fields']['active_count'] == 1 + assert report['time'] == 12 + t = ConfigCollector(get_sample_test, write_output) report = t.agg_samples(samples[1], 10, 14) - assert report['current_state'] == 'active' - assert report['current_state_time'] == 14 - assert report['agg_states']['active']['dur'] == 4 - assert report['agg_states']['active']['count'] == 0 + assert 
report['fields']['current_state'] == 'active' + assert report['fields']['current_state_time'] == 14 + assert report['fields']['active_sum'] == 14 + assert report['fields']['active_count'] == 1 assert report['time'] == 14 + t = ConfigCollector(get_sample_test, write_output) report = t.agg_samples(samples[2], 8, 10) - assert report['current_state'] == 'failed' - assert report['current_state_time'] == 0 - assert report['agg_states']['active']['dur'] == 2 - assert report['agg_states']['active']['count'] == 0 - assert report['agg_states']['failed']['dur'] == 0 - assert report['agg_states']['failed']['count'] == 1 + assert report['fields']['current_state'] == 'failed' + assert report['fields']['current_state_time'] == 0 + assert report['fields']['active_sum'] == 2 + assert report['fields']['active_count'] == 1 + assert report['fields']['failed_sum'] == 0 + assert report['fields']['failed_count'] == 1 assert report['time'] == 10 + t = ConfigCollector(get_sample_test, write_output) report = t.agg_samples(samples[3], 2, 12) - assert report['current_state'] == 'inactive' - assert report['current_state_time'] == 0 - assert report['agg_states']['active']['dur'] == 6 - assert report['agg_states']['active']['count'] == 1 - assert report['agg_states']['inactive']['dur'] == 2 - assert report['agg_states']['inactive']['count'] == 2 - assert report['agg_states']['failed']['dur'] == 2 - assert report['agg_states']['failed']['count'] == 1 + assert report['fields']['current_state'] == 'inactive' + assert report['fields']['current_state_time'] == 0 + assert report['fields']['active_sum'] == 6 + assert report['fields']['active_count'] == 2 + assert report['fields']['inactive_sum'] == 2 + assert report['fields']['inactive_count'] == 2 + assert report['fields']['failed_sum'] == 2 + assert report['fields']['failed_count'] == 1 assert report['time'] == 12 + t = ConfigCollector(get_sample_test, write_output) report = t.agg_samples(samples[4], 4, 14) - assert report['current_state'] == 
'failed' - assert report['current_state_time'] == 0 - assert report['agg_states']['active']['dur'] == 4 - assert report['agg_states']['active']['count'] == 1 - assert report['agg_states']['inactive']['dur'] == 4 - assert report['agg_states']['inactive']['count'] == 2 - assert report['agg_states']['failed']['dur'] == 2 - assert report['agg_states']['failed']['count'] == 2 + assert report['fields']['current_state'] == 'failed' + assert report['fields']['current_state_time'] == 0 + assert report['fields']['active_sum'] == 4 + assert report['fields']['active_count'] == 2 + assert report['fields']['inactive_sum'] == 4 + assert report['fields']['inactive_count'] == 2 + assert report['fields']['failed_sum'] == 2 + assert report['fields']['failed_count'] == 2 assert report['time'] == 14 def test_one_period_collection(): @@ -112,10 +120,10 @@ def test_one_period_collection(): time.sleep(8) t.stop() print("Current report: {0}".format(str(t.current_report))) - assert t.current_report['current_state'] == 'active' - assert int(round(t.current_report['current_state_time'])) == 6 - assert int(round(t.current_report['agg_states']['active']['dur'])) == 6 - assert int(round(t.current_report['agg_states']['active']['count'])) == 0 + assert t.current_report['fields']['current_state'] == 'active' + assert int(round(t.current_report['fields']['current_state_time'])) == 6 + assert int(round(t.current_report['fields']['active_sum'])) == 6 + assert int(round(t.current_report['fields']['active_count'])) == 1 def test_multi_period_single_state_collection(): global sample_set @@ -128,11 +136,12 @@ def test_multi_period_single_state_collection(): time.sleep(7) t.stop() print("Current report: {0}".format(str(t.current_report))) - assert t.current_report['current_state'] == 'active' - assert int(round(t.current_report['current_state_time'])) == 6 - assert int(round(t.current_report['agg_states']['active']['dur'])) == 3 - assert int(round(t.current_report['agg_states']['active']['count'])) == 0 + 
assert t.current_report['fields']['current_state'] == 'active' + assert int(round(t.current_report['fields']['current_state_time'])) == 6 + assert int(round(t.current_report['fields']['active_sum'])) == 6 + assert int(round(t.current_report['fields']['active_count'])) == 1 +# [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]] def test_multi_period_multi_state_collection(): global sample_set global current_index @@ -144,11 +153,11 @@ def test_multi_period_multi_state_collection(): time.sleep(13) t.stop() print("Current report: {0}".format(str(t.current_report))) - assert t.current_report['current_state'] == 'failed' - assert int(round(t.current_report['current_state_time'])) == 0 - assert int(round(t.current_report['agg_states']['active']['dur'])) == 4 - assert int(round(t.current_report['agg_states']['active']['count'])) == 1 - assert int(round(t.current_report['agg_states']['inactive']['dur'])) == 4 - assert int(round(t.current_report['agg_states']['inactive']['count'])) == 2 - assert int(round(t.current_report['agg_states']['failed']['dur'])) == 2 - assert int(round(t.current_report['agg_states']['failed']['count'])) == 2 \ No newline at end of file + assert t.current_report['fields']['current_state'] == 'failed' + assert int(round(t.current_report['fields']['current_state_time'])) == 0 + assert int(round(t.current_report['fields']['active_sum'])) == 4 + assert int(round(t.current_report['fields']['active_count'])) == 2 + assert int(round(t.current_report['fields']['inactive_sum'])) == 4 + assert int(round(t.current_report['fields']['inactive_count'])) == 2 + assert int(round(t.current_report['fields']['failed_sum'])) == 2 + assert int(round(t.current_report['fields']['failed_count'])) == 2 \ No newline at end of file diff --git a/clmctest/services/nginx/install.sh b/clmctest/services/nginx/install.sh index edeeac8..134cae3 100755 --- a/clmctest/services/nginx/install.sh +++ b/clmctest/services/nginx/install.sh @@ 
-65,8 +65,8 @@ echo "After=network-online.target" >> $svc_file echo "" >> $svc_file echo "[Service]" >> $svc_file echo "WorkingDirectory=${inst}/${dir}" >> $svc_file -echo "ExecStart=/usr/bin/python3 /vagrant/src/monitoring/systemctl_monitor.py -service nginx -rate 2 -agg 10 -host localhost -port 8186 -db CLMCMetrics" >> $svc_file -echo "ExecStop=/usr/bin/bash /vagrant/src/monitoring/stop_systemctl_monitor.sh" >> $svc_file +echo "ExecStart=/usr/bin/python3 ${REPO_ROOT}/src/monitoring/systemctl_monitor.py -service nginx -rate 2 -agg 10 -host localhost -port 8186 -db CLMCMetrics" >> $svc_file +echo "ExecStop=/usr/bin/bash ${REPO_ROOT}/src/monitoring/stop_systemctl_monitor.sh" >> $svc_file echo "" >> $svc_file echo "[Install]" >> $svc_file echo "WantedBy=network-online.target" >> $svc_file diff --git a/src/monitoring/config_collector.py b/src/monitoring/config_collector.py index 5e6a9e7..1326673 100644 --- a/src/monitoring/config_collector.py +++ b/src/monitoring/config_collector.py @@ -19,6 +19,7 @@ class ConfigCollector(threading.Thread): self.sample_rate = sample_rate self.agg_period = agg_period self.samples = [] + self.agg_states = {} self.states = {} self.current_report = {} return @@ -51,7 +52,7 @@ class ConfigCollector(threading.Thread): # write output self.output_func(self.current_report) # set time in current state - current_state_time = self.current_report['current_state_time'] + current_state_time = self.current_report['fields']['current_state_time'] # remove all processed samples self.samples.clear() # add last sample as 1st sample of the next period @@ -131,41 +132,46 @@ class ConfigCollector(threading.Thread): logger.debug("States: {0}".format(str(states))) - # calc the total duration and number of transitions in each state. - # Assuming no transition into the 1st state. 
- # Assuming - - agg_states = {} - for index, state in enumerate(states): - if index == 0: - agg_states[states[0][self.STATE_NAME]] = {'dur':states[0][self.STATE_TIME], 'count': 0} - else: - # if first time we've seen the state add to dict with initial duration and a single transition - if state[self.STATE_NAME] not in agg_states: - agg_states[state[self.STATE_NAME]] = {'dur':state[self.STATE_TIME], 'count': 1} - logger.debug("Adding state: {0}, Count: {1}".format(state[self.STATE_NAME], 1)) - else: - logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME])) - # add state time to aggregate total - agg_states[state[self.STATE_NAME]]['dur'] += state[self.STATE_TIME] - logger.debug("Duration: {0}".format(agg_states[state[self.STATE_NAME]]['dur'])) - # increment number of times in the state - agg_states[state[self.STATE_NAME]]['count'] += 1 - logger.debug("Count: {0}".format(agg_states[state[self.STATE_NAME]]['count'])) - - # set the current state as the last state - report['current_state'] = states[-1][0] - # calc time in current state - state_count = len(agg_states) + logger.debug("Initial state time : {0}".format(initial_state_time)) + # set the current state as the last state sampled + current_state = states[-1][self.STATE_NAME] + # calc time in current state # if no change in state then take the initial time in state and add the current state time - if initial_state == states[-1][0] and state_count == 1: + if initial_state == current_state and len(states) == 1: + logger.debug("No transition so just adding last state to current state") current_state_time = initial_state_time + states[-1][self.STATE_TIME] + state_sum_key = current_state + "_sum" + state_count_key = current_state + "_count" + # initialise the number of transitions if it's the 1st time + if state_sum_key not in self.agg_states: + self.agg_states[state_count_key] = 1 + self.agg_states[state_sum_key] = current_state_time else: # current state time is the last state time current_state_time = 
states[-1][self.STATE_TIME] - report['current_state_time'] = current_state_time + # calc the total duration and number of transitions in each state. + for index, state in enumerate(states): + # if first time we've seen the state add to dict with initial duration and a single transition + state_sum_key = state[self.STATE_NAME] + "_sum" + state_count_key = state[self.STATE_NAME] + "_count" + + if state_sum_key not in self.agg_states: + self.agg_states[state_sum_key] = state[self.STATE_TIME] + self.agg_states[state_count_key] = 1 + logger.debug("Adding state: {0}".format(state[self.STATE_NAME])) + else: + logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME])) + # increment number of times in the state + self.agg_states[state_count_key] += 1 + logger.debug("increment number of times in the state") + # add state time to aggregate total + self.agg_states[state_sum_key] += state[self.STATE_TIME] + logger.debug("Duration: {0}".format(self.agg_states[state_sum_key])) - report['agg_states'] = agg_states + # Create report + report['fields'] = self.agg_states + report['fields']['current_state'] = current_state + report['fields']['current_state_time'] = current_state_time report['time'] = current_time logger.debug("Report: {0}".format(str(report))) diff --git a/src/monitoring/systemctl_monitor.py b/src/monitoring/systemctl_monitor.py index 9b02af0..3007e0e 100644 --- a/src/monitoring/systemctl_monitor.py +++ b/src/monitoring/systemctl_monitor.py @@ -7,7 +7,7 @@ import urllib.parse from config_collector import ConfigCollector from influxdb import InfluxDBClient -logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SystemctlMonitor: @@ -62,9 +62,9 @@ class SystemctlMonitor: def write_measurement(self, report): print("Writing report: {0}".format(str(report))) - try: - db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10) - measurement = 
self.create_measurement(report) + try: + db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10) + measurement = self.create_measurement(report) db_client.write_points(measurement) except Exception as e: print(e) @@ -76,19 +76,11 @@ class SystemctlMonitor: "tags": { "resource_name": self.service_name }, - "fields": { - "current_state": report['current_state'], - "current_state_time": report['current_state_time'], - }, "time": measurement_time }] - for key in report['agg_states']: - field_name = key + "_sum" - measurement[0]['fields'][field_name] = report['agg_states'][key]['dur'] - field_name = key + "_count" - measurement[0]['fields'][field_name] = report['agg_states'][key]['count'] - + measurement[0]['fields'] = report['fields'] + return measurement def main(): @@ -100,10 +92,12 @@ def main(): parser.add_argument('-host', help='telegraf hostname', required=True) parser.add_argument('-port', help='telegraf port', required=True) parser.add_argument('-db', help='database name', required=True) - + parser.add_argument('-debug', '--debug', action='store_true') args = parser.parse_args() - print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}".format(args.service, args.rate, args.agg, args.host, args.port, args.db)) + print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}, {5}".format(args.service, args.rate, args.agg, args.host, args.port, args.db)) + if(args.debug): + logger.setLevel(logging.DEBUG) mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db) mon.start() -- GitLab