diff --git a/clmctest/inputs/test_config_collector.py b/clmctest/inputs/test_config_collector.py index 50468ecaf88e0e43fb677eb5aa171db0174db293..ce320b9a7562d2aad1445973724fb59e01de4abe 100644 --- a/clmctest/inputs/test_config_collector.py +++ b/clmctest/inputs/test_config_collector.py @@ -58,55 +58,55 @@ sample_set = 0 current_index = 0 def test_agg(): - t = ConfigCollector(get_sample_test, write_output) + t = ConfigCollector(get_sample_test, write_output, "resource") measurement = t.create_measurement(samples[0], 10, 12) - assert measurement['fields']['current_state'] == 'active' - assert measurement['fields']['current_state_time'] == 12 - assert measurement['fields']['active_sum'] == 12 - assert measurement['fields']['active_count'] == 1 - assert measurement['time'] == 12 + assert measurement[0]['fields']['current_state'] == 'active' + assert measurement[0]['fields']['current_state_time'] == 12 + assert measurement[0]['fields']['active_sum'] == 12 + assert measurement[0]['fields']['active_count'] == 1 + assert measurement[0]['time'] == 12000000000 - t = ConfigCollector(get_sample_test, write_output) + t = ConfigCollector(get_sample_test, write_output, "resource") measurement = t.create_measurement(samples[1], 10, 14) - assert measurement['fields']['current_state'] == 'active' - assert measurement['fields']['current_state_time'] == 14 - assert measurement['fields']['active_sum'] == 14 - assert measurement['fields']['active_count'] == 1 - assert measurement['time'] == 14 + assert measurement[0]['fields']['current_state'] == 'active' + assert measurement[0]['fields']['current_state_time'] == 14 + assert measurement[0]['fields']['active_sum'] == 14 + assert measurement[0]['fields']['active_count'] == 1 + assert measurement[0]['time'] == 14000000000 - t = ConfigCollector(get_sample_test, write_output) + t = ConfigCollector(get_sample_test, write_output, "resource") measurement = t.create_measurement(samples[2], 8, 10) - assert measurement['fields']['current_state'] == 'failed' - assert measurement['fields']['current_state_time'] == 0 - assert measurement['fields']['active_sum'] == 2 - assert measurement['fields']['active_count'] == 1 - assert measurement['fields']['failed_sum'] == 0 - assert measurement['fields']['failed_count'] == 1 - assert measurement['time'] == 10 + assert measurement[0]['fields']['current_state'] == 'failed' + assert measurement[0]['fields']['current_state_time'] == 0 + assert measurement[0]['fields']['active_sum'] == 2 + assert measurement[0]['fields']['active_count'] == 1 + assert measurement[0]['fields']['failed_sum'] == 0 + assert measurement[0]['fields']['failed_count'] == 1 + assert measurement[0]['time'] == 10000000000 - t = ConfigCollector(get_sample_test, write_output) + t = ConfigCollector(get_sample_test, write_output, "resource") measurement = t.create_measurement(samples[3], 2, 12) - assert measurement['fields']['current_state'] == 'inactive' - assert measurement['fields']['current_state_time'] == 0 - assert measurement['fields']['active_sum'] == 6 - assert measurement['fields']['active_count'] == 2 - assert measurement['fields']['inactive_sum'] == 2 - assert measurement['fields']['inactive_count'] == 2 - assert measurement['fields']['failed_sum'] == 2 - assert measurement['fields']['failed_count'] == 1 - assert measurement['time'] == 12 + assert measurement[0]['fields']['current_state'] == 'inactive' + assert measurement[0]['fields']['current_state_time'] == 0 + assert measurement[0]['fields']['active_sum'] == 6 + assert measurement[0]['fields']['active_count'] == 2 + assert measurement[0]['fields']['inactive_sum'] == 2 + assert measurement[0]['fields']['inactive_count'] == 2 + assert measurement[0]['fields']['failed_sum'] == 2 + assert measurement[0]['fields']['failed_count'] == 1 + assert measurement[0]['time'] == 12000000000 - t = ConfigCollector(get_sample_test, write_output) + t = ConfigCollector(get_sample_test, write_output, "resource") measurement = t.create_measurement(samples[4], 4, 14) - assert measurement['fields']['current_state'] == 'failed' - assert measurement['fields']['current_state_time'] == 0 - assert measurement['fields']['active_sum'] == 4 - assert measurement['fields']['active_count'] == 2 - assert measurement['fields']['inactive_sum'] == 4 - assert measurement['fields']['inactive_count'] == 2 - assert measurement['fields']['failed_sum'] == 2 - assert measurement['fields']['failed_count'] == 2 - assert measurement['time'] == 14 + assert measurement[0]['fields']['current_state'] == 'failed' + assert measurement[0]['fields']['current_state_time'] == 0 + assert measurement[0]['fields']['active_sum'] == 4 + assert measurement[0]['fields']['active_count'] == 2 + assert measurement[0]['fields']['inactive_sum'] == 4 + assert measurement[0]['fields']['inactive_count'] == 2 + assert measurement[0]['fields']['failed_sum'] == 2 + assert measurement[0]['fields']['failed_count'] == 2 + assert measurement[0]['time'] == 14000000000 def test_one_period_collection(): global sample_set @@ -115,15 +115,15 @@ def test_one_period_collection(): # one measurementing period sample_set = 1 current_index = 0 - t = ConfigCollector(get_sample_test, write_output, 2, 6) + t = ConfigCollector(get_sample_test, write_output, "resource", 2, 6) t.start() time.sleep(8) t.stop() print("Current measurement: {0}".format(str(t.current_measurement))) - assert t.current_measurement['fields']['current_state'] == 'active' - assert int(round(t.current_measurement['fields']['current_state_time'])) == 6 - assert int(round(t.current_measurement['fields']['active_sum'])) == 6 - assert int(round(t.current_measurement['fields']['active_count'])) == 1 + assert t.current_measurement[0]['fields']['current_state'] == 'active' + assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 6 + assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 6 + assert int(round(t.current_measurement[0]['fields']['active_count'])) == 1 def test_multi_period_single_state_collection(): global sample_set @@ -131,15 +131,15 @@ def test_multi_period_single_state_collection(): # two measurementing periods sample_set = 1 current_index = 0 - t = ConfigCollector(get_sample_test, write_output, 1, 3) + t = ConfigCollector(get_sample_test, write_output, "resource", 1, 3) t.start() time.sleep(7) t.stop() print("Current measurement: {0}".format(str(t.current_measurement))) - assert t.current_measurement['fields']['current_state'] == 'active' - assert int(round(t.current_measurement['fields']['current_state_time'])) == 6 - assert int(round(t.current_measurement['fields']['active_sum'])) == 6 - assert int(round(t.current_measurement['fields']['active_count'])) == 1 + assert t.current_measurement[0]['fields']['current_state'] == 'active' + assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 6 + assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 6 + assert int(round(t.current_measurement[0]['fields']['active_count'])) == 1 # [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]] def test_multi_period_multi_state_collection(): @@ -148,16 +148,16 @@ def test_multi_period_multi_state_collection(): # 6 samples and 2 measurementing periods sample_set = 4 current_index = 0 - t = ConfigCollector(get_sample_test, write_output, 2, 10) + t = ConfigCollector(get_sample_test, write_output, "resource", 2, 10) t.start() time.sleep(13) t.stop() print("Current measurement: {0}".format(str(t.current_measurement))) - assert t.current_measurement['fields']['current_state'] == 'failed' - assert int(round(t.current_measurement['fields']['current_state_time'])) == 0 - assert int(round(t.current_measurement['fields']['active_sum'])) == 4 - assert int(round(t.current_measurement['fields']['active_count'])) == 2 - assert int(round(t.current_measurement['fields']['inactive_sum'])) == 4 - assert int(round(t.current_measurement['fields']['inactive_count'])) == 2 - assert int(round(t.current_measurement['fields']['failed_sum'])) == 2 - assert int(round(t.current_measurement['fields']['failed_count'])) == 2 \ No newline at end of file + assert t.current_measurement[0]['fields']['current_state'] == 'failed' + assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 0 + assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 4 + assert int(round(t.current_measurement[0]['fields']['active_count'])) == 2 + assert int(round(t.current_measurement[0]['fields']['inactive_sum'])) == 4 + assert int(round(t.current_measurement[0]['fields']['inactive_count'])) == 2 + assert int(round(t.current_measurement[0]['fields']['failed_sum'])) == 2 + assert int(round(t.current_measurement[0]['fields']['failed_count'])) == 2 \ No newline at end of file diff --git a/scripts/clmc-agent/build-telegraf.sh b/scripts/clmc-agent/build-telegraf.sh new file mode 100644 index 0000000000000000000000000000000000000000..4a4ca1b35e822827834263566f6546d3b8b93769 --- /dev/null +++ b/scripts/clmc-agent/build-telegraf.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +# install build prequisites +sudo apt-get install ruby ruby-dev rubygems build-essential rpm -y +sudo gem install --no-ri --no-rdoc fpm + +# install go +wget https://dl.google.com/go/go1.10.2.linux-amd64.tar.gz +tar -C /usr/local -xzf go1.10.2.linux-amd64.tar.gz + +# set the environment variables +echo 'PATH=$PATH:/usr/local/go/bin' > /tmp/gorc +echo 'GOPATH=/tmp/go' >> /tmp/gorc +source /tmp/gorc + +mkdir $GOPATH + +# get telegraf from influx repo +cd $GOPATH +go get -d github.com/influxdata/telegraf + +# rebase to it-innovation repo +cd $GOPATH/src/github.com/influxdata/telegraf +git remote add it-innovation https://github.com/it-innovation/telegraf.git +git pull --rebase it-innovation master + +# build telegraf +chmod 755 ./scripts/*.sh +make + +# build the packages +make package + +# git push it-innovation + + + + + diff --git a/src/monitoring/config_collector.py b/src/monitoring/config_collector.py index b1f5f38f67a386053c5c4c1785b70b568c38474c..6ebed75071e2d9f804d1f9e4c3b5136c0926a3b8 100644 --- a/src/monitoring/config_collector.py +++ b/src/monitoring/config_collector.py @@ -3,6 +3,7 @@ import threading import time import random import logging +from influxdb import InfluxDBClient logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger() @@ -11,16 +12,15 @@ class ConfigCollector(threading.Thread): STATE_NAME = 0 STATE_TIME = 1 - def __init__(self, sample_func, write_func, sample_rate=2, agg_period=10): + def __init__(self, sample_func, write_func, resource_name, sample_rate=2, agg_period=10): threading.Thread.__init__(self) self._start_event = threading.Event() self.sample_func = sample_func self.write_func = write_func + self.resource_name = resource_name self.sample_rate = sample_rate - self.agg_period = agg_period - self.samples = [] + self.agg_period = agg_period self.agg_states = {} - self.states = {} self.current_measurement = {} return @@ -38,29 +38,30 @@ class ConfigCollector(threading.Thread): logger.debug("end time = {0}".format(end_period)) # initialise the time in the current state current_state_time = 0 + samples = [] while(self._start_event.is_set()): # get sample using sampler function (sample_state, sample_time) = self.sample_func() # add sample to list of samples - self.samples.append((sample_state, sample_time)) + samples.append((sample_state, sample_time)) logger.debug("Sample state {0}".format(sample_state)) - logger.debug("Sample count: {0}".format(len(self.samples))) + logger.debug("Sample count: {0}".format(len(samples))) # if last sample was at the end of the aggregation period then process if sample_time >= end_period: # aggregate samples into single measurement - self.current_measurement = self.create_measurement(self.samples, current_state_time, sample_time) + self.current_measurement = self.create_measurement(samples, current_state_time, sample_time) # write output write_thread = WriteThread(self.write_func, self.current_measurement) write_thread.start() # set time in current state - current_state_time = self.current_measurement['fields']['current_state_time'] + current_state_time = self.current_measurement[0]['fields']['current_state_time'] # remove all processed samples - self.samples.clear() + samples.clear() # add last sample as 1st sample of the next period - self.samples.append((sample_state, sample_time)) + samples.append((sample_state, sample_time)) # set new end period end_period = sample_time + self.agg_period - logger.debug("Number of samples after agg: {0}".format(len(self.samples))) + logger.debug("Number of samples after agg: {0}".format(len(samples))) logger.debug("Next end time {0}".format(end_period)) # calc how long it took to process samples @@ -83,7 +84,7 @@ class ConfigCollector(threading.Thread): def stop(self): logger.debug("Stopping thread") self._start_event.clear() - + def create_measurement(self, samples, initial_state_time, current_time): logger.debug("Samples: {0}".format(str(samples))) @@ -92,8 +93,15 @@ class ConfigCollector(threading.Thread): logger.debug("States: {0}".format(str(states))) # aggregate the states into a measurement - measurement = self.aggregate_states(states, initial_state_time) - measurement['time'] = current_time + fields = self.aggregate_states(states, initial_state_time) + measurement_time = int(current_time*1000000000) + measurement = [{"measurement": "service_config_state", + "tags": { + "resource_name": self.resource_name + }, + "time": measurement_time + }] + measurement[0]['fields'] = fields['fields'] logger.debug("Report: {0}".format(str(measurement))) return measurement @@ -182,24 +190,36 @@ class ConfigCollector(threading.Thread): measurement['fields'] = self.agg_states measurement['fields']['current_state'] = current_state measurement['fields']['current_state_time'] = current_state_time - return measurement class WriteThread(threading.Thread): - def __init__(self, write_func, report): + def __init__(self, write_func, measurement): threading.Thread.__init__(self) self._start_event = threading.Event() self.write_func = write_func - self.report = report + self.measurement = measurement return def run(self): # if thread running then return if(self._start_event.is_set()): return - self._start_event.set() - self.write_func(self.report) - + self._start_event.set() + self.write_func(self.measurement) + def stop(self): - logger.debug("Stopping thread") - self._start_event.clear() + self._start_event.clear() + +class InfluxWriter(): + def __init__(self, hostname, port, database): + self.db_client = InfluxDBClient(host=hostname, port=port, database=database, timeout=10) + return + + def write(self, measurement): + # if thread running then return + try: + points = [] + points.append(measurement) + self.db_client.write_points(points) + except Exception as e: + print(e) diff --git a/src/monitoring/systemctl_monitor.py b/src/monitoring/systemctl_monitor.py index 3cc867cdcdbb2ed2b06c74c10184cd313624ccf0..db2cefcdce4d4ceee828bac26646aaf757b2d525 100644 --- a/src/monitoring/systemctl_monitor.py +++ b/src/monitoring/systemctl_monitor.py @@ -4,7 +4,7 @@ import subprocess import logging import time import urllib.parse -from config_collector import ConfigCollector +from config_collector import ConfigCollector, InfluxWriter from influxdb import InfluxDBClient logging.basicConfig(level=logging.INFO) @@ -17,18 +17,14 @@ class SystemctlMonitor: def __init__(self, service_name, sample_rate, agg_period, hostname, port, database): self.service_name = service_name - self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.write_measurement, sample_rate, agg_period) - self.hostname = hostname - self.port = port - self.database = database - self.db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10) + self.writer = InfluxWriter(hostname, port, database) + self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.writer.write_func, self.service_name, sample_rate, agg_period) def start(self): self.collection_thread.start() def stop(self): self.collection_thread.stop() - self.db_client.close() def get_current_measurement(self): return self.collection_thread.current_measurement @@ -61,28 +57,6 @@ class SystemctlMonitor: sub_state = parts[1] return load_state + "." + active_state + "." + sub_state - def write_measurement(self, report): - print("Writing report: {0}".format(str(report))) - - try: - measurement = self.create_measurement(report) - self.db_client.write_points(measurement) - except Exception as e: - print(e) - - - def create_measurement(self, report): - measurement_time = int(report['time']*1000000000) - measurement = [{"measurement": "service_config_state", - "tags": { - "resource_name": self.service_name - }, - "time": measurement_time - }] - - measurement[0]['fields'] = report['fields'] - - return measurement def main():