diff --git a/clmctest/inputs/test_config_collector.py b/clmctest/inputs/test_config_collector.py index 3513c9009bc87d180b5e6c0756c6f7fd4940fd7b..50468ecaf88e0e43fb677eb5aa171db0174db293 100644 --- a/clmctest/inputs/test_config_collector.py +++ b/clmctest/inputs/test_config_collector.py @@ -51,113 +51,113 @@ def get_sample_test(): current_index = 0 return sample -def write_output(report): - print("Writing report output {0}".format(report)) +def write_output(measurement): + print("Writing measurement output {0}".format(measurement)) sample_set = 0 current_index = 0 def test_agg(): t = ConfigCollector(get_sample_test, write_output) - report = t.agg_samples(samples[0], 10, 12) - assert report['fields']['current_state'] == 'active' - assert report['fields']['current_state_time'] == 12 - assert report['fields']['active_sum'] == 12 - assert report['fields']['active_count'] == 1 - assert report['time'] == 12 + measurement = t.create_measurement(samples[0], 10, 12) + assert measurement['fields']['current_state'] == 'active' + assert measurement['fields']['current_state_time'] == 12 + assert measurement['fields']['active_sum'] == 12 + assert measurement['fields']['active_count'] == 1 + assert measurement['time'] == 12 t = ConfigCollector(get_sample_test, write_output) - report = t.agg_samples(samples[1], 10, 14) - assert report['fields']['current_state'] == 'active' - assert report['fields']['current_state_time'] == 14 - assert report['fields']['active_sum'] == 14 - assert report['fields']['active_count'] == 1 - assert report['time'] == 14 + measurement = t.create_measurement(samples[1], 10, 14) + assert measurement['fields']['current_state'] == 'active' + assert measurement['fields']['current_state_time'] == 14 + assert measurement['fields']['active_sum'] == 14 + assert measurement['fields']['active_count'] == 1 + assert measurement['time'] == 14 t = ConfigCollector(get_sample_test, write_output) - report = t.agg_samples(samples[2], 8, 10) - assert report['fields']['current_state'] == 'failed' - assert report['fields']['current_state_time'] == 0 - assert report['fields']['active_sum'] == 2 - assert report['fields']['active_count'] == 1 - assert report['fields']['failed_sum'] == 0 - assert report['fields']['failed_count'] == 1 - assert report['time'] == 10 + measurement = t.create_measurement(samples[2], 8, 10) + assert measurement['fields']['current_state'] == 'failed' + assert measurement['fields']['current_state_time'] == 0 + assert measurement['fields']['active_sum'] == 2 + assert measurement['fields']['active_count'] == 1 + assert measurement['fields']['failed_sum'] == 0 + assert measurement['fields']['failed_count'] == 1 + assert measurement['time'] == 10 t = ConfigCollector(get_sample_test, write_output) - report = t.agg_samples(samples[3], 2, 12) - assert report['fields']['current_state'] == 'inactive' - assert report['fields']['current_state_time'] == 0 - assert report['fields']['active_sum'] == 6 - assert report['fields']['active_count'] == 2 - assert report['fields']['inactive_sum'] == 2 - assert report['fields']['inactive_count'] == 2 - assert report['fields']['failed_sum'] == 2 - assert report['fields']['failed_count'] == 1 - assert report['time'] == 12 + measurement = t.create_measurement(samples[3], 2, 12) + assert measurement['fields']['current_state'] == 'inactive' + assert measurement['fields']['current_state_time'] == 0 + assert measurement['fields']['active_sum'] == 6 + assert measurement['fields']['active_count'] == 2 + assert measurement['fields']['inactive_sum'] == 2 + assert measurement['fields']['inactive_count'] == 2 + assert measurement['fields']['failed_sum'] == 2 + assert measurement['fields']['failed_count'] == 1 + assert measurement['time'] == 12 t = ConfigCollector(get_sample_test, write_output) - report = t.agg_samples(samples[4], 4, 14) - assert report['fields']['current_state'] == 'failed' - assert report['fields']['current_state_time'] == 0 - assert report['fields']['active_sum'] == 4 - assert report['fields']['active_count'] == 2 - assert report['fields']['inactive_sum'] == 4 - assert report['fields']['inactive_count'] == 2 - assert report['fields']['failed_sum'] == 2 - assert report['fields']['failed_count'] == 2 - assert report['time'] == 14 + measurement = t.create_measurement(samples[4], 4, 14) + assert measurement['fields']['current_state'] == 'failed' + assert measurement['fields']['current_state_time'] == 0 + assert measurement['fields']['active_sum'] == 4 + assert measurement['fields']['active_count'] == 2 + assert measurement['fields']['inactive_sum'] == 4 + assert measurement['fields']['inactive_count'] == 2 + assert measurement['fields']['failed_sum'] == 2 + assert measurement['fields']['failed_count'] == 2 + assert measurement['time'] == 14 def test_one_period_collection(): global sample_set global current_index - # one reporting period + # one measurementing period sample_set = 1 current_index = 0 t = ConfigCollector(get_sample_test, write_output, 2, 6) t.start() time.sleep(8) t.stop() - print("Current report: {0}".format(str(t.current_report))) - assert t.current_report['fields']['current_state'] == 'active' - assert int(round(t.current_report['fields']['current_state_time'])) == 6 - assert int(round(t.current_report['fields']['active_sum'])) == 6 - assert int(round(t.current_report['fields']['active_count'])) == 1 + print("Current measurement: {0}".format(str(t.current_measurement))) + assert t.current_measurement['fields']['current_state'] == 'active' + assert int(round(t.current_measurement['fields']['current_state_time'])) == 6 + assert int(round(t.current_measurement['fields']['active_sum'])) == 6 + assert int(round(t.current_measurement['fields']['active_count'])) == 1 def test_multi_period_single_state_collection(): global sample_set global current_index - # two reporting periods + # two measurementing periods sample_set = 1 current_index = 0 t = ConfigCollector(get_sample_test, write_output, 1, 3) t.start() time.sleep(7) t.stop() - print("Current report: {0}".format(str(t.current_report))) - assert t.current_report['fields']['current_state'] == 'active' - assert int(round(t.current_report['fields']['current_state_time'])) == 6 - assert int(round(t.current_report['fields']['active_sum'])) == 6 - assert int(round(t.current_report['fields']['active_count'])) == 1 + print("Current measurement: {0}".format(str(t.current_measurement))) + assert t.current_measurement['fields']['current_state'] == 'active' + assert int(round(t.current_measurement['fields']['current_state_time'])) == 6 + assert int(round(t.current_measurement['fields']['active_sum'])) == 6 + assert int(round(t.current_measurement['fields']['active_count'])) == 1 # [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]] def test_multi_period_multi_state_collection(): global sample_set global current_index - # 6 samples and 2 reporting periods + # 6 samples and 2 measurementing periods sample_set = 4 current_index = 0 t = ConfigCollector(get_sample_test, write_output, 2, 10) t.start() time.sleep(13) t.stop() - print("Current report: {0}".format(str(t.current_report))) - assert t.current_report['fields']['current_state'] == 'failed' - assert int(round(t.current_report['fields']['current_state_time'])) == 0 - assert int(round(t.current_report['fields']['active_sum'])) == 4 - assert int(round(t.current_report['fields']['active_count'])) == 2 - assert int(round(t.current_report['fields']['inactive_sum'])) == 4 - assert int(round(t.current_report['fields']['inactive_count'])) == 2 - assert int(round(t.current_report['fields']['failed_sum'])) == 2 - assert int(round(t.current_report['fields']['failed_count'])) == 2 \ No newline at end of file + print("Current measurement: {0}".format(str(t.current_measurement))) + assert t.current_measurement['fields']['current_state'] == 'failed' + assert int(round(t.current_measurement['fields']['current_state_time'])) == 0 + assert int(round(t.current_measurement['fields']['active_sum'])) == 4 + assert int(round(t.current_measurement['fields']['active_count'])) == 2 + assert int(round(t.current_measurement['fields']['inactive_sum'])) == 4 + assert int(round(t.current_measurement['fields']['inactive_count'])) == 2 + assert int(round(t.current_measurement['fields']['failed_sum'])) == 2 + assert int(round(t.current_measurement['fields']['failed_count'])) == 2 \ No newline at end of file diff --git a/clmctest/inputs/test_systemctl_mon.py b/clmctest/inputs/test_systemctl_mon.py index f8abbd8074594ea220ffe6e0066e35211341d46f..d57f69e69e9dd68358b5d6ca370f2b15885cca5b 100644 --- a/clmctest/inputs/test_systemctl_mon.py +++ b/clmctest/inputs/test_systemctl_mon.py @@ -30,11 +30,10 @@ import sys from systemctl_monitor import SystemctlMonitor -URL = "http://172.2.23.212" +URL = "localhost" PORT = "8186" DATABASE = "CLMCMetrics" - @pytest.mark.parametrize("service_name", [('nginx')]) def test_create_measurement(telegraf_agent_config, service_name): @@ -43,22 +42,17 @@ def test_create_measurement(telegraf_agent_config, service_name): if s['name'] == service_name: service = s continue - assert service != 'unknown', "{0} not in list of hosts".format(service_name) mon = SystemctlMonitor(service_name, 2, 10, service['ip_address'], 8186, service['database_name']) - - report = {'current_state': 'failed', 'time': 2, 'agg_states': {'inactive': {'dur': 3, 'count': 3}, 'active': {'dur': 4, 'count': 4}, 'failed': {'dur': 5, 'count': 5}}, 'current_state_time': 0.0} - + report = {'time': 1526042434.1773288, 'fields': {'loaded.active.running_sum': 231.85903143882751, 'current_state_time': 231.85903143882751, 'current_state': 'loaded.active.running', 'loaded.active.running_count': 1}} measurement = mon.create_measurement(report) - assert measurement[0]['tags']['resource_name'] == service_name - assert measurement[0]['fields']['current_state'] == report['current_state'] + assert measurement[0]['fields']['current_state'] == report['fields']['current_state'] def test_get_systemctl_status(telegraf_agent_config): mon = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE) state = mon.get_systemctl_status('nginx') - assert state == 'loaded.active.running' def test_monitor(telegraf_agent_config): @@ -66,6 +60,5 @@ def test_monitor(telegraf_agent_config): mon.start() time.sleep(21) mon.stop() - - report = mon.get_last_report() - print("Current report: {0}".format(str(report))) \ No newline at end of file + measurement = mon.get_current_measurement() + print("Current measurement: {0}".format(str(measurement))) \ No newline at end of file diff --git a/clmctest/services/nginx/install.sh b/clmctest/services/nginx/install.sh index 134cae32d69412425c9ecfae8fd3beb8d02c41ff..7080fe826162a59914e787d5a779579b57f5ffea 100755 --- a/clmctest/services/nginx/install.sh +++ b/clmctest/services/nginx/install.sh @@ -28,9 +28,6 @@ apt-get update yes Y | apt-get install nginx -sudo apt-get install python3 python3-pip -y -sudo python3 -m pip install pytest pyaml influxdb - # Need to set up basic stats as this not configured by default # http://nginx.org/en/docs/http/ngx_http_stub_status_module.html @@ -54,7 +51,10 @@ fi nginx -s reload systemctl start nginx -## install a configuration monitoring service +## install a configuration monitoring service, this needs to be in a venv with the rest of the CLMC +sudo apt-get install python3 python3-pip -y +sudo pip3 install pyaml influxdb + svc="nginxmon" echo "install systemctl monitoring service" diff --git a/src/monitoring/config_collector.py b/src/monitoring/config_collector.py index 1326673dcce8d2af9163c3fec101d2e1fe8b7b60..b1f5f38f67a386053c5c4c1785b70b568c38474c 100644 --- a/src/monitoring/config_collector.py +++ b/src/monitoring/config_collector.py @@ -5,23 +5,23 @@ import random import logging logging.basicConfig(level=logging.DEBUG) -logger = logging.getLogger(__name__) +logger = logging.getLogger() class ConfigCollector(threading.Thread): STATE_NAME = 0 STATE_TIME = 1 - def __init__(self, sample_func, output_func, sample_rate=2, agg_period=10): + def __init__(self, sample_func, write_func, sample_rate=2, agg_period=10): threading.Thread.__init__(self) self._start_event = threading.Event() self.sample_func = sample_func - self.output_func = output_func + self.write_func = write_func self.sample_rate = sample_rate self.agg_period = agg_period self.samples = [] self.agg_states = {} self.states = {} - self.current_report = {} + self.current_measurement = {} return def run(self): @@ -48,11 +48,12 @@ class ConfigCollector(threading.Thread): # if last sample was at the end of the aggregation period then process if sample_time >= end_period: # aggregate samples into single measurement - self.current_report = self.agg_samples(self.samples, current_state_time, sample_time) + self.current_measurement = self.create_measurement(self.samples, current_state_time, sample_time) # write output - self.output_func(self.current_report) + write_thread = WriteThread(self.write_func, self.current_measurement) + write_thread.start() # set time in current state - current_state_time = self.current_report['fields']['current_state_time'] + current_state_time = self.current_measurement['fields']['current_state_time'] # remove all processed samples self.samples.clear() # add last sample as 1st sample of the next period @@ -83,25 +84,34 @@ class ConfigCollector(threading.Thread): logger.debug("Stopping thread") self._start_event.clear() - def agg_samples(self, samples, initial_state_time, current_time): - report = {} - + def create_measurement(self, samples, initial_state_time, current_time): logger.debug("Samples: {0}".format(str(samples))) - # check we have more than two samples to aggregate - sample_count = len(samples) - logger.debug("Sample count {0}".format(sample_count)) - - if sample_count < 2: - logger.warn("Not enough samples to aggregate in period {0}".format(sample_count)) - return report - - # set initial state to the 1st sample - initial_state = samples[0][self.STATE_NAME] - logger.debug("Initial state : {0}".format(initial_state)) + # aggregate samples into states + states = self.aggregate_samples(samples) + logger.debug("States: {0}".format(str(states))) - # Calculates the state time from the set of samples + # aggregate the states into a measurement + measurement = self.aggregate_states(states, initial_state_time) + measurement['time'] = current_time + logger.debug("Report: {0}".format(str(measurement))) + + return measurement + + def aggregate_samples(self, samples): states = [] + + sample_count = len(samples) + logger.debug("Sample count {0}".format(sample_count)) + # error if no samples to aggregate + if sample_count == 0: + raise ValueError('No samples in the samples list') + + # no aggregation needed if only one sample + if sample_count == 1: + return samples[0] + + # aggregate samples last_index = sample_count-1 for index, sample in enumerate(samples): # for the 1st sample we set the current state and state_start_time @@ -119,7 +129,6 @@ class ConfigCollector(threading.Thread): current_state = sample[self.STATE_NAME] # set the start time of the next state state_start_time = state_start_time + state_time - # deal with the final sample if index == last_index: # calc state duration if last sample is the same as previous state @@ -129,16 +138,17 @@ class ConfigCollector(threading.Thread): # add transition in final sample with zero duration elif current_state != sample[self.STATE_NAME]: states.append([current_state,0]) + return states - logger.debug("States: {0}".format(str(states))) - + def aggregate_states(self, states, initial_state_time): + # set initial state to the 1st sample + initial_state = states[0][self.STATE_NAME] + logger.debug("Initial state : {0}".format(initial_state)) logger.debug("Initial state time : {0}".format(initial_state_time)) # set the current state as the last state sampled current_state = states[-1][self.STATE_NAME] - # calc time in current state - # if no change in state then take the initial time in state and add the current state time + # if no change in state take the initial state time and add current state time if initial_state == current_state and len(states) == 1: - logger.debug("No transition so just adding last state to current state") current_state_time = initial_state_time + states[-1][self.STATE_TIME] state_sum_key = current_state + "_sum" state_count_key = current_state + "_count" @@ -150,17 +160,16 @@ class ConfigCollector(threading.Thread): # current state time is the last state time current_state_time = states[-1][self.STATE_TIME] # calc the total duration and number of transitions in each state. - for index, state in enumerate(states): - # if first time we've seen the state add to dict with initial duration and a single transition + for state in states: + # if first occurance of state add with initial duration and a single transition state_sum_key = state[self.STATE_NAME] + "_sum" state_count_key = state[self.STATE_NAME] + "_count" - - if state_sum_key not in self.agg_states: - self.agg_states[state_sum_key] = state[self.STATE_TIME] - self.agg_states[state_count_key] = 1 + if state_sum_key not in self.agg_states: logger.debug("Adding state: {0}".format(state[self.STATE_NAME])) + self.agg_states[state_sum_key] = state[self.STATE_TIME] + self.agg_states[state_count_key] = 1 else: - logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME])) + logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME])) # increment number of times in the state self.agg_states[state_count_key] += 1 logger.debug("increment number of times in the state") @@ -169,28 +178,28 @@ class ConfigCollector(threading.Thread): logger.debug("Duration: {0}".format(self.agg_states[state_sum_key])) # Create report - report['fields'] = self.agg_states - report['fields']['current_state'] = current_state - report['fields']['current_state_time'] = current_state_time - report['time'] = current_time - - logger.debug("Report: {0}".format(str(report))) + measurement = {} + measurement['fields'] = self.agg_states + measurement['fields']['current_state'] = current_state + measurement['fields']['current_state_time'] = current_state_time - return report - + return measurement -class OutputThread(threading.Thread): - def __init__(self, output_func, report): - self.output_func = output_func +class WriteThread(threading.Thread): + def __init__(self, write_func, report): + threading.Thread.__init__(self) + self._start_event = threading.Event() + self.write_func = write_func self.report = report + return def run(self): # if thread running then return if(self._start_event.is_set()): return self._start_event.set() - output_func(report) + self.write_func(self.report) def stop(self): - logger.debug("Stoppping thread") + logger.debug("Stopping thread") self._start_event.clear() diff --git a/src/monitoring/systemctl_monitor.py b/src/monitoring/systemctl_monitor.py index 3007e0e65de9dd33581ac438b6c2dc58fe02de25..3cc867cdcdbb2ed2b06c74c10184cd313624ccf0 100644 --- a/src/monitoring/systemctl_monitor.py +++ b/src/monitoring/systemctl_monitor.py @@ -8,7 +8,7 @@ from config_collector import ConfigCollector from influxdb import InfluxDBClient logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logger = logging.getLogger() class SystemctlMonitor: ACTIVE_STATE_KEY='ActiveState' @@ -21,15 +21,17 @@ class SystemctlMonitor: self.hostname = hostname self.port = port self.database = database + self.db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10) def start(self): self.collection_thread.start() def stop(self): self.collection_thread.stop() + self.db_client.close() - def get_last_report(self): - return self.collection_thread.current_report + def get_current_measurement(self): + return self.collection_thread.current_measurement def get_systemctl_sample(self): return (self.get_systemctl_status(self.service_name), time.time()) @@ -61,11 +63,10 @@ class SystemctlMonitor: def write_measurement(self, report): print("Writing report: {0}".format(str(report))) - - try: - db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10) + + try: measurement = self.create_measurement(report) - db_client.write_points(measurement) + self.db_client.write_points(measurement) except Exception as e: print(e) @@ -96,8 +97,12 @@ def main(): args = parser.parse_args() print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}, {5}".format(args.service, args.rate, args.agg, args.host, args.port, args.db)) - if(args.debug): + if args.debug == True: + print("Setting logging level to to DEBUG") logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db) mon.start()