Skip to content
Snippets Groups Projects
Commit 1cc5a813 authored by MJB's avatar MJB
Browse files

adding it-innovation telegraf build

parent d341a953
No related branches found
No related tags found
No related merge requests found
......@@ -58,55 +58,55 @@ sample_set = 0
current_index = 0
def test_agg():
t = ConfigCollector(get_sample_test, write_output)
t = ConfigCollector(get_sample_test, write_output, "resource")
measurement = t.create_measurement(samples[0], 10, 12)
assert measurement['fields']['current_state'] == 'active'
assert measurement['fields']['current_state_time'] == 12
assert measurement['fields']['active_sum'] == 12
assert measurement['fields']['active_count'] == 1
assert measurement['time'] == 12
assert measurement[0]['fields']['current_state'] == 'active'
assert measurement[0]['fields']['current_state_time'] == 12
assert measurement[0]['fields']['active_sum'] == 12
assert measurement[0]['fields']['active_count'] == 1
assert measurement[0]['time'] == 12000000000
t = ConfigCollector(get_sample_test, write_output)
t = ConfigCollector(get_sample_test, write_output, "resource")
measurement = t.create_measurement(samples[1], 10, 14)
assert measurement['fields']['current_state'] == 'active'
assert measurement['fields']['current_state_time'] == 14
assert measurement['fields']['active_sum'] == 14
assert measurement['fields']['active_count'] == 1
assert measurement['time'] == 14
assert measurement[0]['fields']['current_state'] == 'active'
assert measurement[0]['fields']['current_state_time'] == 14
assert measurement[0]['fields']['active_sum'] == 14
assert measurement[0]['fields']['active_count'] == 1
assert measurement[0]['time'] == 14000000000
t = ConfigCollector(get_sample_test, write_output)
t = ConfigCollector(get_sample_test, write_output, "resource")
measurement = t.create_measurement(samples[2], 8, 10)
assert measurement['fields']['current_state'] == 'failed'
assert measurement['fields']['current_state_time'] == 0
assert measurement['fields']['active_sum'] == 2
assert measurement['fields']['active_count'] == 1
assert measurement['fields']['failed_sum'] == 0
assert measurement['fields']['failed_count'] == 1
assert measurement['time'] == 10
assert measurement[0]['fields']['current_state'] == 'failed'
assert measurement[0]['fields']['current_state_time'] == 0
assert measurement[0]['fields']['active_sum'] == 2
assert measurement[0]['fields']['active_count'] == 1
assert measurement[0]['fields']['failed_sum'] == 0
assert measurement[0]['fields']['failed_count'] == 1
assert measurement[0]['time'] == 10000000000
t = ConfigCollector(get_sample_test, write_output)
t = ConfigCollector(get_sample_test, write_output, "resource")
measurement = t.create_measurement(samples[3], 2, 12)
assert measurement['fields']['current_state'] == 'inactive'
assert measurement['fields']['current_state_time'] == 0
assert measurement['fields']['active_sum'] == 6
assert measurement['fields']['active_count'] == 2
assert measurement['fields']['inactive_sum'] == 2
assert measurement['fields']['inactive_count'] == 2
assert measurement['fields']['failed_sum'] == 2
assert measurement['fields']['failed_count'] == 1
assert measurement['time'] == 12
assert measurement[0]['fields']['current_state'] == 'inactive'
assert measurement[0]['fields']['current_state_time'] == 0
assert measurement[0]['fields']['active_sum'] == 6
assert measurement[0]['fields']['active_count'] == 2
assert measurement[0]['fields']['inactive_sum'] == 2
assert measurement[0]['fields']['inactive_count'] == 2
assert measurement[0]['fields']['failed_sum'] == 2
assert measurement[0]['fields']['failed_count'] == 1
assert measurement[0]['time'] == 12000000000
t = ConfigCollector(get_sample_test, write_output)
t = ConfigCollector(get_sample_test, write_output, "resource")
measurement = t.create_measurement(samples[4], 4, 14)
assert measurement['fields']['current_state'] == 'failed'
assert measurement['fields']['current_state_time'] == 0
assert measurement['fields']['active_sum'] == 4
assert measurement['fields']['active_count'] == 2
assert measurement['fields']['inactive_sum'] == 4
assert measurement['fields']['inactive_count'] == 2
assert measurement['fields']['failed_sum'] == 2
assert measurement['fields']['failed_count'] == 2
assert measurement['time'] == 14
assert measurement[0]['fields']['current_state'] == 'failed'
assert measurement[0]['fields']['current_state_time'] == 0
assert measurement[0]['fields']['active_sum'] == 4
assert measurement[0]['fields']['active_count'] == 2
assert measurement[0]['fields']['inactive_sum'] == 4
assert measurement[0]['fields']['inactive_count'] == 2
assert measurement[0]['fields']['failed_sum'] == 2
assert measurement[0]['fields']['failed_count'] == 2
assert measurement[0]['time'] == 14000000000
def test_one_period_collection():
global sample_set
......@@ -115,15 +115,15 @@ def test_one_period_collection():
# one measurementing period
sample_set = 1
current_index = 0
t = ConfigCollector(get_sample_test, write_output, 2, 6)
t = ConfigCollector(get_sample_test, write_output, "resource", 2, 6)
t.start()
time.sleep(8)
t.stop()
print("Current measurement: {0}".format(str(t.current_measurement)))
assert t.current_measurement['fields']['current_state'] == 'active'
assert int(round(t.current_measurement['fields']['current_state_time'])) == 6
assert int(round(t.current_measurement['fields']['active_sum'])) == 6
assert int(round(t.current_measurement['fields']['active_count'])) == 1
assert t.current_measurement[0]['fields']['current_state'] == 'active'
assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 6
assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 6
assert int(round(t.current_measurement[0]['fields']['active_count'])) == 1
def test_multi_period_single_state_collection():
global sample_set
......@@ -131,15 +131,15 @@ def test_multi_period_single_state_collection():
# two measurementing periods
sample_set = 1
current_index = 0
t = ConfigCollector(get_sample_test, write_output, 1, 3)
t = ConfigCollector(get_sample_test, write_output, "resource", 1, 3)
t.start()
time.sleep(7)
t.stop()
print("Current measurement: {0}".format(str(t.current_measurement)))
assert t.current_measurement['fields']['current_state'] == 'active'
assert int(round(t.current_measurement['fields']['current_state_time'])) == 6
assert int(round(t.current_measurement['fields']['active_sum'])) == 6
assert int(round(t.current_measurement['fields']['active_count'])) == 1
assert t.current_measurement[0]['fields']['current_state'] == 'active'
assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 6
assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 6
assert int(round(t.current_measurement[0]['fields']['active_count'])) == 1
# [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]
def test_multi_period_multi_state_collection():
......@@ -148,16 +148,16 @@ def test_multi_period_multi_state_collection():
# 6 samples and 2 measurementing periods
sample_set = 4
current_index = 0
t = ConfigCollector(get_sample_test, write_output, 2, 10)
t = ConfigCollector(get_sample_test, write_output, "resource", 2, 10)
t.start()
time.sleep(13)
t.stop()
print("Current measurement: {0}".format(str(t.current_measurement)))
assert t.current_measurement['fields']['current_state'] == 'failed'
assert int(round(t.current_measurement['fields']['current_state_time'])) == 0
assert int(round(t.current_measurement['fields']['active_sum'])) == 4
assert int(round(t.current_measurement['fields']['active_count'])) == 2
assert int(round(t.current_measurement['fields']['inactive_sum'])) == 4
assert int(round(t.current_measurement['fields']['inactive_count'])) == 2
assert int(round(t.current_measurement['fields']['failed_sum'])) == 2
assert int(round(t.current_measurement['fields']['failed_count'])) == 2
\ No newline at end of file
assert t.current_measurement[0]['fields']['current_state'] == 'failed'
assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 0
assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 4
assert int(round(t.current_measurement[0]['fields']['active_count'])) == 2
assert int(round(t.current_measurement[0]['fields']['inactive_sum'])) == 4
assert int(round(t.current_measurement[0]['fields']['inactive_count'])) == 2
assert int(round(t.current_measurement[0]['fields']['failed_sum'])) == 2
assert int(round(t.current_measurement[0]['fields']['failed_count'])) == 2
\ No newline at end of file
#!/bin/bash
# install build prequisites
sudo apt-get install ruby ruby-dev rubygems build-essential rpm -y
sudo gem install --no-ri --no-rdoc fpm
# install go
wget https://dl.google.com/go/go1.10.2.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.10.2.linux-amd64.tar.gz
# set the environment variables
echo 'PATH=$PATH:/usr/local/go/bin' > /tmp/gorc
echo 'GOPATH=/tmp/go' >> /tmp/gorc
source /tmp/gorc
mkdir $GOPATH
# get telegraf from influx repo
cd $GOPATH
go get -d github.com/influxdata/telegraf
# rebase to it-innovation repo
cd $GOPATH/src/github.com/influxdata/telegraf
git remote add it-innovation https://github.com/it-innovation/telegraf.git
git pull --rebase it-innovation master
# build telegraf
chmod 755 ./scripts/*.sh
make
# build the packages
make package
# git push it-innovation
......@@ -3,6 +3,7 @@ import threading
import time
import random
import logging
from influxdb import InfluxDBClient
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
......@@ -11,16 +12,15 @@ class ConfigCollector(threading.Thread):
STATE_NAME = 0
STATE_TIME = 1
def __init__(self, sample_func, write_func, sample_rate=2, agg_period=10):
def __init__(self, sample_func, write_func, resource_name, sample_rate=2, agg_period=10):
threading.Thread.__init__(self)
self._start_event = threading.Event()
self.sample_func = sample_func
self.write_func = write_func
self.resource_name = resource_name
self.sample_rate = sample_rate
self.agg_period = agg_period
self.samples = []
self.agg_period = agg_period
self.agg_states = {}
self.states = {}
self.current_measurement = {}
return
......@@ -38,29 +38,30 @@ class ConfigCollector(threading.Thread):
logger.debug("end time = {0}".format(end_period))
# initialise the time in the current state
current_state_time = 0
samples = []
while(self._start_event.is_set()):
# get sample using sampler function
(sample_state, sample_time) = self.sample_func()
# add sample to list of samples
self.samples.append((sample_state, sample_time))
samples.append((sample_state, sample_time))
logger.debug("Sample state {0}".format(sample_state))
logger.debug("Sample count: {0}".format(len(self.samples)))
logger.debug("Sample count: {0}".format(len(samples)))
# if last sample was at the end of the aggregation period then process
if sample_time >= end_period:
# aggregate samples into single measurement
self.current_measurement = self.create_measurement(self.samples, current_state_time, sample_time)
self.current_measurement = self.create_measurement(samples, current_state_time, sample_time)
# write output
write_thread = WriteThread(self.write_func, self.current_measurement)
write_thread.start()
# set time in current state
current_state_time = self.current_measurement['fields']['current_state_time']
current_state_time = self.current_measurement[0]['fields']['current_state_time']
# remove all processed samples
self.samples.clear()
samples.clear()
# add last sample as 1st sample of the next period
self.samples.append((sample_state, sample_time))
samples.append((sample_state, sample_time))
# set new end period
end_period = sample_time + self.agg_period
logger.debug("Number of samples after agg: {0}".format(len(self.samples)))
logger.debug("Number of samples after agg: {0}".format(len(samples)))
logger.debug("Next end time {0}".format(end_period))
# calc how long it took to process samples
......@@ -83,7 +84,7 @@ class ConfigCollector(threading.Thread):
def stop(self):
logger.debug("Stopping thread")
self._start_event.clear()
def create_measurement(self, samples, initial_state_time, current_time):
logger.debug("Samples: {0}".format(str(samples)))
......@@ -92,8 +93,15 @@ class ConfigCollector(threading.Thread):
logger.debug("States: {0}".format(str(states)))
# aggregate the states into a measurement
measurement = self.aggregate_states(states, initial_state_time)
measurement['time'] = current_time
fields = self.aggregate_states(states, initial_state_time)
measurement_time = int(current_time*1000000000)
measurement = [{"measurement": "service_config_state",
"tags": {
"resource_name": self.resource_name
},
"time": measurement_time
}]
measurement[0]['fields'] = fields['fields']
logger.debug("Report: {0}".format(str(measurement)))
return measurement
......@@ -182,24 +190,36 @@ class ConfigCollector(threading.Thread):
measurement['fields'] = self.agg_states
measurement['fields']['current_state'] = current_state
measurement['fields']['current_state_time'] = current_state_time
return measurement
class WriteThread(threading.Thread):
def __init__(self, write_func, report):
def __init__(self, write_func, measurement):
threading.Thread.__init__(self)
self._start_event = threading.Event()
self.write_func = write_func
self.report = report
self.measurement = measurement
return
def run(self):
# if thread running then return
if(self._start_event.is_set()):
return
self._start_event.set()
self.write_func(self.report)
self._start_event.set()
self.write_func(self.measurement)
def stop(self):
logger.debug("Stopping thread")
self._start_event.clear()
self._start_event.clear()
class InfluxWriter():
def __init__(self, hostname, port, database):
self.db_client = InfluxDBClient(host=hostname, port=port, database=database, timeout=10)
return
def write(self, measurement):
# if thread running then return
try:
points = []
points.append(measurement)
self.db_client.write_points(points)
except Exception as e:
print(e)
......@@ -4,7 +4,7 @@ import subprocess
import logging
import time
import urllib.parse
from config_collector import ConfigCollector
from config_collector import ConfigCollector, InfluxWriter
from influxdb import InfluxDBClient
logging.basicConfig(level=logging.INFO)
......@@ -17,18 +17,14 @@ class SystemctlMonitor:
def __init__(self, service_name, sample_rate, agg_period, hostname, port, database):
self.service_name = service_name
self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.write_measurement, sample_rate, agg_period)
self.hostname = hostname
self.port = port
self.database = database
self.db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10)
self.writer = InfluxWriter(hostname, port, database)
self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.writer.write_func, self.service_name, sample_rate, agg_period)
def start(self):
self.collection_thread.start()
def stop(self):
self.collection_thread.stop()
self.db_client.close()
def get_current_measurement(self):
return self.collection_thread.current_measurement
......@@ -61,28 +57,6 @@ class SystemctlMonitor:
sub_state = parts[1]
return load_state + "." + active_state + "." + sub_state
def write_measurement(self, report):
print("Writing report: {0}".format(str(report)))
try:
measurement = self.create_measurement(report)
self.db_client.write_points(measurement)
except Exception as e:
print(e)
def create_measurement(self, report):
measurement_time = int(report['time']*1000000000)
measurement = [{"measurement": "service_config_state",
"tags": {
"resource_name": self.service_name
},
"time": measurement_time
}]
measurement[0]['fields'] = report['fields']
return measurement
def main():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment