From 1cc5a813752654c16278cc93fe8b8ec7ad8ecc9a Mon Sep 17 00:00:00 2001
From: MJB <mjb@it-innovation.soton.ac.uk>
Date: Wed, 16 May 2018 15:35:42 +0100
Subject: [PATCH] adding it-innovation telegraf build

---
 clmctest/inputs/test_config_collector.py | 118 +++++++++++------------
 scripts/clmc-agent/build-telegraf.sh     |  39 ++++++++
 src/monitoring/config_collector.py       |  64 +++++++-----
 src/monitoring/systemctl_monitor.py      |  32 +-----
 4 files changed, 143 insertions(+), 110 deletions(-)
 create mode 100644 scripts/clmc-agent/build-telegraf.sh

diff --git a/clmctest/inputs/test_config_collector.py b/clmctest/inputs/test_config_collector.py
index 50468ec..ce320b9 100644
--- a/clmctest/inputs/test_config_collector.py
+++ b/clmctest/inputs/test_config_collector.py
@@ -58,55 +58,55 @@ sample_set = 0
 current_index = 0
 
 def test_agg():
-    t = ConfigCollector(get_sample_test, write_output)    
+    t = ConfigCollector(get_sample_test, write_output, "resource")    
     measurement = t.create_measurement(samples[0], 10, 12)
-    assert measurement['fields']['current_state'] == 'active'
-    assert measurement['fields']['current_state_time'] == 12
-    assert measurement['fields']['active_sum'] == 12
-    assert measurement['fields']['active_count'] == 1
-    assert measurement['time'] == 12    
+    assert measurement[0]['fields']['current_state'] == 'active'
+    assert measurement[0]['fields']['current_state_time'] == 12
+    assert measurement[0]['fields']['active_sum'] == 12
+    assert measurement[0]['fields']['active_count'] == 1
+    assert measurement[0]['time'] == 12000000000    
 
-    t = ConfigCollector(get_sample_test, write_output)
+    t = ConfigCollector(get_sample_test, write_output, "resource")
     measurement = t.create_measurement(samples[1], 10, 14)
-    assert measurement['fields']['current_state'] == 'active'
-    assert measurement['fields']['current_state_time'] == 14
-    assert measurement['fields']['active_sum'] == 14
-    assert measurement['fields']['active_count'] == 1
-    assert measurement['time'] == 14    
+    assert measurement[0]['fields']['current_state'] == 'active'
+    assert measurement[0]['fields']['current_state_time'] == 14
+    assert measurement[0]['fields']['active_sum'] == 14
+    assert measurement[0]['fields']['active_count'] == 1
+    assert measurement[0]['time'] == 14000000000    
 
-    t = ConfigCollector(get_sample_test, write_output)
+    t = ConfigCollector(get_sample_test, write_output, "resource")
     measurement = t.create_measurement(samples[2], 8, 10)
-    assert measurement['fields']['current_state'] == 'failed'
-    assert measurement['fields']['current_state_time'] == 0
-    assert measurement['fields']['active_sum'] == 2
-    assert measurement['fields']['active_count'] == 1
-    assert measurement['fields']['failed_sum'] == 0
-    assert measurement['fields']['failed_count'] == 1
-    assert measurement['time'] == 10    
+    assert measurement[0]['fields']['current_state'] == 'failed'
+    assert measurement[0]['fields']['current_state_time'] == 0
+    assert measurement[0]['fields']['active_sum'] == 2
+    assert measurement[0]['fields']['active_count'] == 1
+    assert measurement[0]['fields']['failed_sum'] == 0
+    assert measurement[0]['fields']['failed_count'] == 1
+    assert measurement[0]['time'] == 10000000000    
 
-    t = ConfigCollector(get_sample_test, write_output)
+    t = ConfigCollector(get_sample_test, write_output, "resource")
     measurement = t.create_measurement(samples[3], 2, 12)
-    assert measurement['fields']['current_state'] == 'inactive'
-    assert measurement['fields']['current_state_time'] == 0
-    assert measurement['fields']['active_sum'] == 6
-    assert measurement['fields']['active_count'] == 2
-    assert measurement['fields']['inactive_sum'] == 2
-    assert measurement['fields']['inactive_count'] == 2
-    assert measurement['fields']['failed_sum'] == 2
-    assert measurement['fields']['failed_count'] == 1
-    assert measurement['time'] == 12    
+    assert measurement[0]['fields']['current_state'] == 'inactive'
+    assert measurement[0]['fields']['current_state_time'] == 0
+    assert measurement[0]['fields']['active_sum'] == 6
+    assert measurement[0]['fields']['active_count'] == 2
+    assert measurement[0]['fields']['inactive_sum'] == 2
+    assert measurement[0]['fields']['inactive_count'] == 2
+    assert measurement[0]['fields']['failed_sum'] == 2
+    assert measurement[0]['fields']['failed_count'] == 1
+    assert measurement[0]['time'] == 12000000000       
     
-    t = ConfigCollector(get_sample_test, write_output)    
+    t = ConfigCollector(get_sample_test, write_output, "resource")    
     measurement = t.create_measurement(samples[4], 4, 14)
-    assert measurement['fields']['current_state'] == 'failed'
-    assert measurement['fields']['current_state_time'] == 0
-    assert measurement['fields']['active_sum'] == 4
-    assert measurement['fields']['active_count'] == 2
-    assert measurement['fields']['inactive_sum'] == 4
-    assert measurement['fields']['inactive_count'] == 2
-    assert measurement['fields']['failed_sum'] == 2
-    assert measurement['fields']['failed_count'] == 2
-    assert measurement['time'] == 14
+    assert measurement[0]['fields']['current_state'] == 'failed'
+    assert measurement[0]['fields']['current_state_time'] == 0
+    assert measurement[0]['fields']['active_sum'] == 4
+    assert measurement[0]['fields']['active_count'] == 2
+    assert measurement[0]['fields']['inactive_sum'] == 4
+    assert measurement[0]['fields']['inactive_count'] == 2
+    assert measurement[0]['fields']['failed_sum'] == 2
+    assert measurement[0]['fields']['failed_count'] == 2
+    assert measurement[0]['time'] == 14000000000   
 
 def test_one_period_collection():
     global sample_set
@@ -115,15 +115,15 @@ def test_one_period_collection():
     # one measurementing period
     sample_set = 1
     current_index = 0
-    t = ConfigCollector(get_sample_test, write_output, 2, 6)
+    t = ConfigCollector(get_sample_test, write_output, "resource", 2, 6)
     t.start()
     time.sleep(8)
     t.stop()
     print("Current measurement: {0}".format(str(t.current_measurement)))
-    assert t.current_measurement['fields']['current_state'] == 'active'
-    assert int(round(t.current_measurement['fields']['current_state_time'])) == 6
-    assert int(round(t.current_measurement['fields']['active_sum'])) == 6
-    assert int(round(t.current_measurement['fields']['active_count'])) == 1
+    assert t.current_measurement[0]['fields']['current_state'] == 'active'
+    assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 6
+    assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 6
+    assert int(round(t.current_measurement[0]['fields']['active_count'])) == 1
 
 def test_multi_period_single_state_collection():
     global sample_set
@@ -131,15 +131,15 @@ def test_multi_period_single_state_collection():
     # two measurementing periods
     sample_set = 1
     current_index = 0
-    t = ConfigCollector(get_sample_test, write_output, 1, 3)
+    t = ConfigCollector(get_sample_test, write_output, "resource", 1, 3)
     t.start()
     time.sleep(7)
     t.stop()
     print("Current measurement: {0}".format(str(t.current_measurement)))
-    assert t.current_measurement['fields']['current_state'] == 'active'
-    assert int(round(t.current_measurement['fields']['current_state_time'])) == 6
-    assert int(round(t.current_measurement['fields']['active_sum'])) == 6
-    assert int(round(t.current_measurement['fields']['active_count'])) == 1
+    assert t.current_measurement[0]['fields']['current_state'] == 'active'
+    assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 6
+    assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 6
+    assert int(round(t.current_measurement[0]['fields']['active_count'])) == 1
 
 # [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]
 def test_multi_period_multi_state_collection():
@@ -148,16 +148,16 @@ def test_multi_period_multi_state_collection():
     # 6 samples and 2 measurementing periods
     sample_set = 4
     current_index = 0
-    t = ConfigCollector(get_sample_test, write_output, 2, 10)
+    t = ConfigCollector(get_sample_test, write_output, "resource", 2, 10)
     t.start()
     time.sleep(13)
     t.stop()
     print("Current measurement: {0}".format(str(t.current_measurement)))
-    assert t.current_measurement['fields']['current_state'] == 'failed'
-    assert int(round(t.current_measurement['fields']['current_state_time'])) == 0
-    assert int(round(t.current_measurement['fields']['active_sum'])) == 4
-    assert int(round(t.current_measurement['fields']['active_count'])) == 2
-    assert int(round(t.current_measurement['fields']['inactive_sum'])) == 4
-    assert int(round(t.current_measurement['fields']['inactive_count'])) == 2
-    assert int(round(t.current_measurement['fields']['failed_sum'])) == 2
-    assert int(round(t.current_measurement['fields']['failed_count'])) == 2
\ No newline at end of file
+    assert t.current_measurement[0]['fields']['current_state'] == 'failed'
+    assert int(round(t.current_measurement[0]['fields']['current_state_time'])) == 0
+    assert int(round(t.current_measurement[0]['fields']['active_sum'])) == 4
+    assert int(round(t.current_measurement[0]['fields']['active_count'])) == 2
+    assert int(round(t.current_measurement[0]['fields']['inactive_sum'])) == 4
+    assert int(round(t.current_measurement[0]['fields']['inactive_count'])) == 2
+    assert int(round(t.current_measurement[0]['fields']['failed_sum'])) == 2
+    assert int(round(t.current_measurement[0]['fields']['failed_count'])) == 2
\ No newline at end of file
diff --git a/scripts/clmc-agent/build-telegraf.sh b/scripts/clmc-agent/build-telegraf.sh
new file mode 100644
index 0000000..4a4ca1b
--- /dev/null
+++ b/scripts/clmc-agent/build-telegraf.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+# Builds telegraf with the it-innovation changes rebased in, then packages it.
+
+# install build prerequisites
+sudo apt-get install ruby ruby-dev rubygems build-essential rpm -y
+sudo gem install --no-ri --no-rdoc fpm
+
+# install go
+wget https://dl.google.com/go/go1.10.2.linux-amd64.tar.gz
+sudo tar -C /usr/local -xzf go1.10.2.linux-amd64.tar.gz
+
+# set the environment variables; export them so that the go and make child
+# processes use the same GOPATH as the cd commands below
+echo 'export PATH=$PATH:/usr/local/go/bin' > /tmp/gorc
+echo 'export GOPATH=/tmp/go' >> /tmp/gorc
+source /tmp/gorc
+
+mkdir -p "$GOPATH"
+
+# get telegraf from influx repo
+cd "$GOPATH" || exit 1
+go get -d github.com/influxdata/telegraf
+
+# rebase to it-innovation repo; stop if the checkout is missing rather than
+# running git and make in the wrong directory
+cd "$GOPATH/src/github.com/influxdata/telegraf" || exit 1
+git remote add it-innovation https://github.com/it-innovation/telegraf.git
+git pull --rebase it-innovation master
+
+# build telegraf
+chmod 755 ./scripts/*.sh
+make
+
+# build the packages
+make package
+
+# git push it-innovation
+
+
diff --git a/src/monitoring/config_collector.py b/src/monitoring/config_collector.py
index b1f5f38..6ebed75 100644
--- a/src/monitoring/config_collector.py
+++ b/src/monitoring/config_collector.py
@@ -3,6 +3,7 @@ import threading
 import time
 import random
 import logging
+from influxdb import InfluxDBClient
 
 logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger()
@@ -11,16 +12,15 @@ class ConfigCollector(threading.Thread):
     STATE_NAME = 0
     STATE_TIME = 1
     
-    def __init__(self, sample_func, write_func, sample_rate=2, agg_period=10):
+    def __init__(self, sample_func, write_func, resource_name, sample_rate=2, agg_period=10):
         threading.Thread.__init__(self)
         self._start_event = threading.Event()
         self.sample_func = sample_func
         self.write_func = write_func
+        self.resource_name = resource_name
         self.sample_rate = sample_rate
-        self.agg_period = agg_period
-        self.samples = []        
+        self.agg_period = agg_period       
         self.agg_states = {}
-        self.states = {}
         self.current_measurement = {}
         return
 
@@ -38,29 +38,30 @@ class ConfigCollector(threading.Thread):
         logger.debug("end time = {0}".format(end_period))
         # initialise the time in the current state
         current_state_time = 0
+        samples = [] 
         while(self._start_event.is_set()):
             # get sample using sampler function
             (sample_state, sample_time) = self.sample_func()
             # add sample to list of samples
-            self.samples.append((sample_state, sample_time))
+            samples.append((sample_state, sample_time))
             logger.debug("Sample state {0}".format(sample_state))
-            logger.debug("Sample count: {0}".format(len(self.samples)))
+            logger.debug("Sample count: {0}".format(len(samples)))
             # if last sample was at the end of the aggregation period then process         
             if sample_time >= end_period:
                 # aggregate samples into single measurement
-                self.current_measurement = self.create_measurement(self.samples, current_state_time, sample_time)
+                self.current_measurement = self.create_measurement(samples, current_state_time, sample_time)
                 # write output
                 write_thread = WriteThread(self.write_func, self.current_measurement)
                 write_thread.start()      
                 # set time in current state
-                current_state_time = self.current_measurement['fields']['current_state_time']
+                current_state_time = self.current_measurement[0]['fields']['current_state_time']
                 # remove all processed samples    
-                self.samples.clear()
+                samples.clear()
                 # add last sample as 1st sample of the next period
-                self.samples.append((sample_state, sample_time))
+                samples.append((sample_state, sample_time))
                 # set new end period
                 end_period = sample_time + self.agg_period
-                logger.debug("Number of samples after agg: {0}".format(len(self.samples)))
+                logger.debug("Number of samples after agg: {0}".format(len(samples)))
                 logger.debug("Next end time {0}".format(end_period))       
 
             # calc how long it took to process samples
@@ -83,7 +84,7 @@ class ConfigCollector(threading.Thread):
     def stop(self):
         logger.debug("Stopping thread")
         self._start_event.clear()
-
+        
     def create_measurement(self, samples, initial_state_time, current_time):
         logger.debug("Samples: {0}".format(str(samples)))
 
@@ -92,8 +93,15 @@ class ConfigCollector(threading.Thread):
         logger.debug("States: {0}".format(str(states)))  
         
         # aggregate the states into a measurement
-        measurement = self.aggregate_states(states, initial_state_time)
-        measurement['time'] = current_time 
+        fields = self.aggregate_states(states, initial_state_time)
+        measurement_time = int(current_time*1000000000)
+        measurement = [{"measurement": "service_config_state",
+               "tags": {
+                   "resource_name": self.resource_name
+               },
+               "time": measurement_time
+               }]
+        measurement[0]['fields'] = fields['fields']
         logger.debug("Report: {0}".format(str(measurement)))
 
         return measurement
@@ -182,24 +190,36 @@ class ConfigCollector(threading.Thread):
         measurement['fields'] = self.agg_states
         measurement['fields']['current_state'] = current_state            
         measurement['fields']['current_state_time'] = current_state_time
-             
         return measurement
 
 class WriteThread(threading.Thread):
-    def __init__(self, write_func, report):
+    def __init__(self, write_func, measurement):
         threading.Thread.__init__(self)
         self._start_event = threading.Event()
         self.write_func = write_func
-        self.report = report
+        self.measurement = measurement
         return
 
     def run(self):
         # if thread running then return
         if(self._start_event.is_set()):
             return
-        self._start_event.set()
-        self.write_func(self.report)
-
+        self._start_event.set()  
+        self.write_func(self.measurement)
+    
     def stop(self):
-        logger.debug("Stopping thread")
-        self._start_event.clear()        
+        self._start_event.clear() 
+
+class InfluxWriter():
+    """Writes measurements produced by ConfigCollector to an InfluxDB database."""
+    def __init__(self, hostname, port, database):
+        self.db_client = InfluxDBClient(host=hostname, port=port, database=database, timeout=10)
+
+    def write(self, measurement):
+        """Write measurement, the list of point dicts built by create_measurement."""
+        try:
+            # measurement is already a list of points; wrapping it in another
+            # list would hand write_points a list of lists
+            self.db_client.write_points(measurement)
+        except Exception:
+            logger.exception("Failed to write measurement to InfluxDB")
diff --git a/src/monitoring/systemctl_monitor.py b/src/monitoring/systemctl_monitor.py
index 3cc867c..db2cefc 100644
--- a/src/monitoring/systemctl_monitor.py
+++ b/src/monitoring/systemctl_monitor.py
@@ -4,7 +4,7 @@ import subprocess
 import logging
 import time
 import urllib.parse
-from config_collector import ConfigCollector
+from config_collector import ConfigCollector, InfluxWriter
 from influxdb import InfluxDBClient
 
 logging.basicConfig(level=logging.INFO)
@@ -17,18 +17,14 @@ class SystemctlMonitor:
 
     def __init__(self, service_name, sample_rate, agg_period, hostname, port, database):
         self.service_name = service_name
-        self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.write_measurement, sample_rate, agg_period)
-        self.hostname = hostname 
-        self.port = port
-        self.database = database        
-        self.db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10)                       
+        self.writer = InfluxWriter(hostname, port, database)  # owns the InfluxDB client
+        self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.writer.write, self.service_name, sample_rate, agg_period)
 
     def start(self):
         self.collection_thread.start()
 
     def stop(self):
         self.collection_thread.stop()
-        self.db_client.close()
 
     def get_current_measurement(self):
         return self.collection_thread.current_measurement
@@ -61,28 +57,6 @@ class SystemctlMonitor:
                 sub_state = parts[1]
         return load_state + "." + active_state + "." + sub_state
 
-    def write_measurement(self, report):
-        print("Writing report: {0}".format(str(report)))     
- 
-        try:                          
-            measurement = self.create_measurement(report)           
-            self.db_client.write_points(measurement)
-        except Exception as e: 
-            print(e)
-            
-
-    def create_measurement(self, report):     
-        measurement_time = int(report['time']*1000000000)
-        measurement = [{"measurement": "service_config_state",
-               "tags": {
-                   "resource_name": self.service_name
-               },
-               "time": measurement_time
-               }]
-
-        measurement[0]['fields'] = report['fields']
-
-        return measurement
 
 def main():
 
-- 
GitLab