From eab379f82a87644315035e2aac4f925afe7130e6 Mon Sep 17 00:00:00 2001
From: Michael Boniface <mjb@it-innovation.soton.ac.uk>
Date: Thu, 10 May 2018 17:00:21 +0100
Subject: [PATCH] added systemctl configuration monitor

---
 ...config_mon.py => test_config_collector.py} |  79 +++-----
 clmctest/inputs/test_systemctl_mon.py         |  71 +++++++
 clmctest/services/nginx/install.sh            |  31 ++-
 clmctest/services/nginx/telegraf_nginx.conf   |   7 +-
 setup.py                                      |  11 +-
 src/monitoring/__init__.py                    |   1 +
 .../monitoring/config_collector.py            | 190 ++++++++----------
 src/monitoring/stop_systemctl_monitor.sh      |   3 +
 src/monitoring/systemctl_monitor.py           | 113 +++++++++++
 9 files changed, 350 insertions(+), 156 deletions(-)
 rename clmctest/inputs/{test_config_mon.py => test_config_collector.py} (77%)
 create mode 100644 clmctest/inputs/test_systemctl_mon.py
 create mode 100644 src/monitoring/__init__.py
 rename clmctest/inputs/config_mon.py => src/monitoring/config_collector.py (50%)
 create mode 100644 src/monitoring/stop_systemctl_monitor.sh
 create mode 100644 src/monitoring/systemctl_monitor.py

diff --git a/clmctest/inputs/test_config_mon.py b/clmctest/inputs/test_config_collector.py
similarity index 77%
rename from clmctest/inputs/test_config_mon.py
rename to clmctest/inputs/test_config_collector.py
index 05d0185..8d13b31 100644
--- a/clmctest/inputs/test_config_mon.py
+++ b/clmctest/inputs/test_config_collector.py
@@ -26,21 +26,15 @@ import pytest
 import time
 import random
 import logging
-from clmctest.inputs.config_mon import SystemctlMonitor, CollectionThread
+import sys
+
+from config_collector import ConfigCollector
 
-#RequiresMountsFor=/var/tmp
-#Description=The nginx HTTP and reverse proxy server
-#LoadState=loaded
-#ActiveState=active
-#SubState=running
-#FragmentPath=/usr/lib/systemd/system/nginx.service
-#UnitFileState=disabled
-#UnitFilePreset=disabled
 STATE_INDEX = 0
 TIME_INDEX = 1
  
-samples = [[['active', 0]], 
-        [['active', 0], ['active', 2]],
+samples = [[['active', 0], ['active', 2]], 
+        [['active', 0], ['active', 2], ['active', 4]],
         [['active', 0], ['failed', 2]],
         [['active', 0], ['active', 2], ['inactive', 4], ['active', 6], ['failed', 8], ['inactive', 10]],
         [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]]
@@ -60,43 +54,51 @@ def get_sample_test():
         current_index = 0
     return sample
 
-def test_report():
-    t = CollectionThread(get_sample_test)
-    report = t.gen_report(samples[0], 10)
+def write_output(report):
+    print("Writing report output {0}".format(report))
+    
+
+def test_aggregation():
+    t = ConfigCollector(get_sample_test, write_output)
+    report = t.agg_samples(samples[0], 10, 12)
 
-    report = t.gen_report(samples[1], 10)
+    report = t.agg_samples(samples[1], 10, 14)
     assert report['current_state'] == 'active'
-    assert report['current_state_time'] == 12
-    assert report['agg_states']['active']['dur'] == 2
-    assert report['agg_states']['active']['count'] == 1
+    assert report['current_state_time'] == 14
+    assert report['agg_states']['active']['dur'] == 4
+    assert report['agg_states']['active']['count'] == 0
+    assert report['time'] == 14    
 
-    report = t.gen_report(samples[2], 8)
+    report = t.agg_samples(samples[2], 8, 10)
     assert report['current_state'] == 'failed'
     assert report['current_state_time'] == 0
     assert report['agg_states']['active']['dur'] == 2
-    assert report['agg_states']['active']['count'] == 1
+    assert report['agg_states']['active']['count'] == 0
     assert report['agg_states']['failed']['dur'] == 0
     assert report['agg_states']['failed']['count'] == 1
+    assert report['time'] == 10    
 
-    report = t.gen_report(samples[3], 2)
+    report = t.agg_samples(samples[3], 2, 12)
     assert report['current_state'] == 'inactive'
     assert report['current_state_time'] == 0
     assert report['agg_states']['active']['dur'] == 6
-    assert report['agg_states']['active']['count'] == 2
+    assert report['agg_states']['active']['count'] == 1
     assert report['agg_states']['inactive']['dur'] == 2
     assert report['agg_states']['inactive']['count'] == 2
     assert report['agg_states']['failed']['dur'] == 2
     assert report['agg_states']['failed']['count'] == 1
+    assert report['time'] == 12    
     
-    report = t.gen_report(samples[4], 4)
+    report = t.agg_samples(samples[4], 4, 14)
     assert report['current_state'] == 'failed'
     assert report['current_state_time'] == 0
     assert report['agg_states']['active']['dur'] == 4
-    assert report['agg_states']['active']['count'] == 2
+    assert report['agg_states']['active']['count'] == 1
     assert report['agg_states']['inactive']['dur'] == 4
     assert report['agg_states']['inactive']['count'] == 2
     assert report['agg_states']['failed']['dur'] == 2
     assert report['agg_states']['failed']['count'] == 2
+    assert report['time'] == 14
 
 def test_one_period_collection():
     global sample_set
@@ -105,7 +107,7 @@ def test_one_period_collection():
     # one reporting period
     sample_set = 1
     current_index = 0
-    t = CollectionThread(get_sample_test, 2, 6)
+    t = ConfigCollector(get_sample_test, write_output, 2, 6)
     t.start()
     time.sleep(8)
     t.stop()
@@ -113,7 +115,7 @@ def test_one_period_collection():
     assert t.current_report['current_state'] == 'active'
     assert int(round(t.current_report['current_state_time'])) == 6
     assert int(round(t.current_report['agg_states']['active']['dur'])) == 6
-    assert int(round(t.current_report['agg_states']['active']['count'])) == 1
+    assert int(round(t.current_report['agg_states']['active']['count'])) == 0
 
 def test_multi_period_single_state_collection():
     global sample_set
@@ -121,7 +123,7 @@ def test_multi_period_single_state_collection():
     # two reporting periods
     sample_set = 1
     current_index = 0
-    t = CollectionThread(get_sample_test, 1, 3)
+    t = ConfigCollector(get_sample_test, write_output, 1, 3)
     t.start()
     time.sleep(7)
     t.stop()
@@ -129,7 +131,7 @@ def test_multi_period_single_state_collection():
     assert t.current_report['current_state'] == 'active'
     assert int(round(t.current_report['current_state_time'])) == 6
     assert int(round(t.current_report['agg_states']['active']['dur'])) == 3
-    assert int(round(t.current_report['agg_states']['active']['count'])) == 1
+    assert int(round(t.current_report['agg_states']['active']['count'])) == 0
 
 def test_multi_period_multi_state_collection():
     global sample_set
@@ -137,7 +139,7 @@ def test_multi_period_multi_state_collection():
     # 6 samples and 2 reporting periods
     sample_set = 4
     current_index = 0
-    t = CollectionThread(get_sample_test, 2, 10)
+    t = ConfigCollector(get_sample_test, write_output, 2, 10)
     t.start()
     time.sleep(13)
     t.stop()
@@ -145,23 +147,8 @@ def test_multi_period_multi_state_collection():
     assert t.current_report['current_state'] == 'failed'
     assert int(round(t.current_report['current_state_time'])) == 0
     assert int(round(t.current_report['agg_states']['active']['dur'])) == 4
-    assert int(round(t.current_report['agg_states']['active']['count'])) == 2
+    assert int(round(t.current_report['agg_states']['active']['count'])) == 1
     assert int(round(t.current_report['agg_states']['inactive']['dur'])) == 4
     assert int(round(t.current_report['agg_states']['inactive']['count'])) == 2
     assert int(round(t.current_report['agg_states']['failed']['dur'])) == 2
-    assert int(round(t.current_report['agg_states']['failed']['count'])) == 2
-
-def test_get_systemctl_status():
-    mon = SystemctlMonitor('nginx', 2, 10)
-    state = mon.get_systemctl_status('nginx')
-
-    assert state == 'loaded.active.running'
-
-def test_monitor():
-    mon = SystemctlMonitor('nginx', 2, 10)
-    mon.start()
-    time.sleep(11)
-    mon.stop()
-
-    report = mon.get_last_report()
-    print("Current report: {0}".format(str(report)))
\ No newline at end of file
+    assert int(round(t.current_report['agg_states']['failed']['count'])) == 2
\ No newline at end of file
diff --git a/clmctest/inputs/test_systemctl_mon.py b/clmctest/inputs/test_systemctl_mon.py
new file mode 100644
index 0000000..f8abbd8
--- /dev/null
+++ b/clmctest/inputs/test_systemctl_mon.py
@@ -0,0 +1,71 @@
+#!/usr/bin/python3
+"""
+## © University of Southampton IT Innovation Centre, 2018
+##
+## Copyright in this software belongs to University of Southampton
+## IT Innovation Centre of Gamma House, Enterprise Road, 
+## Chilworth Science Park, Southampton, SO16 7NS, UK.
+##
+## This software may not be used, sold, licensed, transferred, copied
+## or reproduced in whole or in part in any manner or form or in or
+## on any media by any person other than in accordance with the terms
+## of the Licence Agreement supplied with the software, or otherwise
+## without the prior written consent of the copyright owners.
+##
+## This software is distributed WITHOUT ANY WARRANTY, without even the
+## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+## PURPOSE, except where stated in the Licence Agreement supplied with
+## the software.
+##
+##      Created By :            Michael Boniface
+##      Created Date :          29-04-2018
+##      Created for Project :   FLAME
+"""
+
+import pytest
+import time
+import random
+import logging
+import sys
+
+from systemctl_monitor import SystemctlMonitor
+
+URL = "http://172.2.23.212"
+PORT = "8186"
+DATABASE = "CLMCMetrics"
+
+
+@pytest.mark.parametrize("service_name", [('nginx')])
+def test_create_measurement(telegraf_agent_config, service_name):
+
+    service = 'unknown'
+    for s in telegraf_agent_config['hosts']:
+        if s['name'] == service_name:
+            service = s
+            continue
+    
+    assert service != 'unknown', "{0} not in list of hosts".format(service_name)   
+
+    mon = SystemctlMonitor(service_name, 2, 10, service['ip_address'], 8186, service['database_name'])
+
+    report = {'current_state': 'failed', 'time': 2, 'agg_states': {'inactive': {'dur': 3, 'count': 3}, 'active': {'dur': 4, 'count': 4}, 'failed': {'dur': 5, 'count': 5}}, 'current_state_time': 0.0}
+
+    measurement = mon.create_measurement(report)
+
+    assert measurement[0]['tags']['resource_name'] == service_name
+    assert measurement[0]['fields']['current_state'] == report['current_state']
+ 
+def test_get_systemctl_status(telegraf_agent_config):
+    mon = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE)
+    state = mon.get_systemctl_status('nginx')
+
+    assert state == 'loaded.active.running'
+
+def test_monitor(telegraf_agent_config):
+    mon = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE)
+    mon.start()
+    time.sleep(21)
+    mon.stop()
+
+    report = mon.get_last_report()
+    print("Current report: {0}".format(str(report)))
\ No newline at end of file
diff --git a/clmctest/services/nginx/install.sh b/clmctest/services/nginx/install.sh
index d8baa46..edeeac8 100755
--- a/clmctest/services/nginx/install.sh
+++ b/clmctest/services/nginx/install.sh
@@ -28,6 +28,9 @@
 apt-get update
 yes Y | apt-get install nginx 
 
+sudo apt-get install python3 python3-pip -y
+sudo python3 -m pip install pytest pyaml influxdb
+
 # Need to set up basic stats as this not configured by default
 # http://nginx.org/en/docs/http/ngx_http_stub_status_module.html
 
@@ -49,4 +52,30 @@ if [ ! -f "$NGINX_CONF_TARGET" ]; then
 fi
 
 nginx -s reload
-systemctl start nginx
\ No newline at end of file
+systemctl start nginx
+
+## install a configuration monitoring service
+svc="nginxmon"
+
+echo "install systemctl monitoring service"
+svc_file="${svc}.service"
+echo "[Unit]" > $svc_file
+echo "Description=nginxmon" >> $svc_file
+echo "After=network-online.target" >> $svc_file
+echo "" >> $svc_file
+echo "[Service]" >> $svc_file
+echo "WorkingDirectory=${inst}/${dir}" >> $svc_file
+echo "ExecStart=/usr/bin/python3 /vagrant/src/monitoring/systemctl_monitor.py -service nginx -rate 2 -agg 10 -host localhost -port 8186 -db CLMCMetrics" >> $svc_file
+echo "ExecStop=/usr/bin/bash /vagrant/src/monitoring/stop_systemctl_monitor.sh" >> $svc_file
+echo "" >> $svc_file
+echo "[Install]" >> $svc_file
+echo "WantedBy=network-online.target" >> $svc_file
+sudo cp $svc_file /lib/systemd/system
+rm $svc_file
+
+echo "enable"
+sudo systemctl daemon-reload
+sudo systemctl enable ${svc}
+
+echo "start"
+sudo systemctl start ${svc}
\ No newline at end of file
diff --git a/clmctest/services/nginx/telegraf_nginx.conf b/clmctest/services/nginx/telegraf_nginx.conf
index d6588d5..6bc3b87 100644
--- a/clmctest/services/nginx/telegraf_nginx.conf
+++ b/clmctest/services/nginx/telegraf_nginx.conf
@@ -25,4 +25,9 @@
   urls = ["http://localhost:80/nginx_status"]
 
   ## HTTP response timeout (default: 5s)
-#  response_timeout = "5s"
\ No newline at end of file
+#  response_timeout = "5s"
+
+# # Influx HTTP write listener
+[[inputs.http_listener]]
+  ## Address and port to host HTTP listener on
+  service_address = ":8186"
\ No newline at end of file
diff --git a/setup.py b/setup.py
index d24242f..72cae3a 100644
--- a/setup.py
+++ b/setup.py
@@ -24,6 +24,11 @@
 import os
 import os.path
 import subprocess
+from glob import glob
+from os.path import basename
+from os.path import dirname
+from os.path import join
+from os.path import splitext
 from setuptools import setup, find_packages
 
 def read(fname):
@@ -38,18 +43,18 @@ def get_version(fname):
     return git_revision
 
 setup(
-    name = "clmctest",
+    name = "clmc",
     version = get_version("clmctest/_version.py"),
     author = "Michael Boniface",
     author_email = "mjb@it-innovation.soton.ac.uk",
     description = "FLAME CLMC Test Module",
     license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE",
-    keywords = "FLAME CLMC tests",
+    keywords = "FLAME CLMC",
     url='https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc',
     packages=find_packages(exclude=["services"]),
     include_package_data=True,
     package_data={'': ['_version.py', '*.yml', '*.sh', '*.json', '*.conf']},        
-    long_description="FLAME CLMC tests",
+    long_description="FLAME CLMC",
     classifiers=[
         "Development Status :: Alpha",
         "Topic :: FLAME Tests",
diff --git a/src/monitoring/__init__.py b/src/monitoring/__init__.py
new file mode 100644
index 0000000..44f7725
--- /dev/null
+++ b/src/monitoring/__init__.py
@@ -0,0 +1 @@
+#!/usr/bin/python3
\ No newline at end of file
diff --git a/clmctest/inputs/config_mon.py b/src/monitoring/config_collector.py
similarity index 50%
rename from clmctest/inputs/config_mon.py
rename to src/monitoring/config_collector.py
index ff6b866..5e6a9e7 100644
--- a/clmctest/inputs/config_mon.py
+++ b/src/monitoring/config_collector.py
@@ -1,19 +1,21 @@
+#!/usr/bin/python3
 import threading
 import time
 import random
 import logging
-import subprocess
-import argparse
 
 logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger(__name__)
 
-class CollectionThread(threading.Thread):
-
-    def __init__(self, get_sample, sample_rate=2, agg_period=10):
+class ConfigCollector(threading.Thread):
+    STATE_NAME = 0
+    STATE_TIME = 1
+    
+    def __init__(self, sample_func, output_func, sample_rate=2, agg_period=10):
         threading.Thread.__init__(self)
         self._start_event = threading.Event()
-        self.get_sample = get_sample
+        self.sample_func = sample_func
+        self.output_func = output_func
         self.sample_rate = sample_rate
         self.agg_period = agg_period
         self.samples = []        
@@ -22,36 +24,42 @@ class CollectionThread(threading.Thread):
         return
 
     def run(self):
+        # if thread running then return
         if(self._start_event.is_set()):
             return
+        self._start_event.set()
+
+        # set start period to current time
         start_period = time.time()
         logger.debug("start time = {0}".format(start_period))
+        # set end period to the aggregation period
         end_period = start_period + self.agg_period
         logger.debug("end time = {0}".format(end_period))
+        # initialise the time in the current state
         current_state_time = 0
-
-        # Add initial state
-        self.samples = [('unknown', start_period)]
-        self._start_event.set()
         while(self._start_event.is_set()):
-            # get sample 
-            (sample_state, sample_time) = self.get_sample()
+            # get sample using sampler function
+            (sample_state, sample_time) = self.sample_func()
+            # add sample to list of samples
             self.samples.append((sample_state, sample_time))
             logger.debug("Sample state {0}".format(sample_state))
-            logger.debug("Sample count: {0}".format(len(self.samples)))  
-            # process samples if end of period         
+            logger.debug("Sample count: {0}".format(len(self.samples)))
+            # if last sample was at the end of the aggregation period then process         
             if sample_time >= end_period:
-                # create the report
-                self.current_report = self.gen_report(self.samples, current_state_time)
+                # aggregate samples into single measurement
+                self.current_report = self.agg_samples(self.samples, current_state_time, sample_time)
+                # write output
+                self.output_func(self.current_report)      
+                # set time in current state
                 current_state_time = self.current_report['current_state_time']
-                # remove all aggregated samples    
+                # remove all processed samples    
                 self.samples.clear()
                 # add last sample as 1st sample of the next period
                 self.samples.append((sample_state, sample_time))
                 # set new end period
                 end_period = sample_time + self.agg_period
                 logger.debug("Number of samples after agg: {0}".format(len(self.samples)))
-                logger.debug("Next end time {0}".format(end_period))                
+                logger.debug("Next end time {0}".format(end_period))       
 
             # calc how long it took to process samples
             processing_time = time.time() - sample_time
@@ -60,70 +68,90 @@ class CollectionThread(threading.Thread):
             sleep_time = self.sample_rate - processing_time
             logger.debug("Sleep time {0}".format(sleep_time))
             # if processing took longer than the sample rate we have a problemm 
-            # we need to put this into a worker thread
+            # and we will need to put processing into a worker thread
             if(sleep_time < 0):
                 logger.warn("Aggregation processing took longer that sample rate")
                 sleep_time = 0
             logger.debug("Sleeping for sample {0}".format(sleep_time))
+            # wait for the next sample
             time.sleep(sleep_time)
         logger.debug("Finished collection thread")
         return
 
     def stop(self):
-        logger.debug("Stoppping thread")
+        logger.debug("Stopping thread")
         self._start_event.clear()
 
-    def gen_report(self, samples, initial_state_time):
+    def agg_samples(self, samples, initial_state_time, current_time):
         report = {} 
 
+        logger.debug("Samples: {0}".format(str(samples)))
+
         # check we have more than two samples to aggregate
         sample_count = len(samples)
         logger.debug("Sample count {0}".format(sample_count))
+        
         if sample_count < 2:
             logger.warn("Not enough samples to aggregate in period {0}".format(sample_count)) 
             return report
         
-        initial_state = samples[0][0]
+        # set initial state to the 1st sample
+        initial_state = samples[0][self.STATE_NAME]
         logger.debug("Initial state : {0}".format(initial_state))            
         
+        # Calculates the state time from the set of samples
         states = []
         last_index = sample_count-1 
         for index, sample in enumerate(samples):
+            # for the 1st sample we set the current state and state_start_time
             if index == 0:
-                current_state = sample[0]
-                state_start_time = sample[1]
+                current_state = sample[self.STATE_NAME]
+                state_start_time = sample[self.STATE_TIME]
                 logger.debug("Start time : {0}".format(state_start_time))            
             else:
-                # add duration for previous state after transition
-                if current_state != sample[0]:
-                    state_time = sample[1] - state_start_time
+                # add state duration for previous state after transition
+                if current_state != sample[self.STATE_NAME]:
+                    # calc time in current state
+                    state_time = sample[self.STATE_TIME] - state_start_time
                     states.append([current_state,state_time])
-                    current_state = sample[0]
+                    # set the state to the next state
+                    current_state = sample[self.STATE_NAME]
+                    # set the start time of the next state
                     state_start_time = state_start_time + state_time
+
                 # deal with the final sample
                 if index == last_index:
                     # calc state duration if last sample is the same as previous state
-                    if current_state == sample[0]:
-                        state_time = sample[1] - state_start_time
+                    if current_state == sample[self.STATE_NAME]:
+                        state_time = sample[self.STATE_TIME] - state_start_time
                         states.append([current_state,state_time])
                     # add transition in final sample with zero duration
-                    elif current_state != sample[0]:
+                    elif current_state != sample[self.STATE_NAME]:
                         states.append([current_state,0])
 
             logger.debug("States: {0}".format(str(states)))
 
-        # calc the total duration and transitions in each state
+        # calc the total duration and number of transitions in each state.
+        # Assuming no transition into the 1st state. 
+        # Assuming 
+
         agg_states = {}
         for index, state in enumerate(states):
-            if state[0] not in agg_states:              
-                agg_states[state[0]] = {'dur':state[1], 'count': 1}
-                logger.debug("Adding state: {0}, Count: {1}".format(state[0], 1))
+            if index == 0:
+                agg_states[states[0][self.STATE_NAME]] = {'dur':states[0][self.STATE_TIME], 'count': 0}
             else:
-                logger.debug("Aggregating state: {0}".format(state[0]))
-                agg_states[state[0]]['dur'] += state[1]
-                logger.debug("Duration: {0}".format(agg_states[state[0]]['dur']))
-                agg_states[state[0]]['count'] += 1
-                logger.debug("Count: {0}".format(agg_states[state[0]]['count']))
+                # if first time we've seen the state add to dict with initial duration and a single transition   
+                if state[self.STATE_NAME] not in agg_states:           
+                    agg_states[state[self.STATE_NAME]] = {'dur':state[self.STATE_TIME], 'count': 1}
+                    logger.debug("Adding state: {0}, Count: {1}".format(state[self.STATE_NAME], 1))
+                else:
+                    logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME]))
+                    # add state time to aggregate total
+                    agg_states[state[self.STATE_NAME]]['dur'] += state[self.STATE_TIME]
+                    logger.debug("Duration: {0}".format(agg_states[state[self.STATE_NAME]]['dur']))
+                    # increment number of times in the state
+                    agg_states[state[self.STATE_NAME]]['count'] += 1
+                    logger.debug("Count: {0}".format(agg_states[state[self.STATE_NAME]]['count']))
 
         # set the current state as the last state
         report['current_state'] = states[-1][0]
@@ -131,80 +159,32 @@ class CollectionThread(threading.Thread):
         state_count = len(agg_states)
         # if no change in state then take the initial time in state and add the current state time
         if initial_state == states[-1][0] and state_count == 1:
-            current_state_time = initial_state_time + states[-1][1]
+            current_state_time = initial_state_time + states[-1][self.STATE_TIME]
         else:
-            current_state_time = states[-1][1]
+            # current state time is the last state time
+            current_state_time = states[-1][self.STATE_TIME]
         report['current_state_time'] = current_state_time
 
         report['agg_states'] = agg_states
+        report['time'] = current_time 
+
         logger.debug("Report: {0}".format(str(report)))
              
         return report
 
-class SystemctlMonitor:
-    ACTIVE_STATE_KEY='ActiveState'
-    SUBSTATE_KEY='SubState'  
-    LOAD_STATE_KEY='LoadState'
 
-    def __init__(self, service_name, sample_rate, agg_period):
-        self.service_name = service_name
-        self.collection_thread = CollectionThread(self.get_systemctl_sample, sample_rate, agg_period)
+class OutputThread(threading.Thread):
+    def __init__(self, output_func, report):
+        self.output_func = output_func
+        self.report = report
 
-    def start(self):
-        self.collection_thread.start()
+    def run(self):
+        # if thread running then return
+        if(self._start_event.is_set()):
+            return
+        self._start_event.set()
+        output_func(report)
 
     def stop(self):
-        self.collection_thread.stop()
-
-    def get_last_report(self):
-        return self.collection_thread.current_report
-
-    def get_systemctl_sample(self):
-        return (self.get_systemctl_status(self.service_name), time.time())           
-
-    def get_systemctl_status(self, service_name):
-        load_state = 'unknown'
-        active_state = 'unknown'
-        sub_state = 'unknown'
-
-        cmd = "systemctl show {0}".format(service_name)
-        proc = subprocess.Popen([cmd], stdout=subprocess.PIPE, shell=True)
-        out, err = proc.communicate()
-        if out: out = out.decode('ascii')
-        if err: err = err.decode('ascii') 
-        logger.debug("Return code = {0}".format(proc.returncode))
-        if proc.returncode != 0:
-            logger.error("Could not get status for service {0}, {1}".format(service_name, err))
-            raise Exception("Could not get status for service {0}, {1}".format(service_name, err))
-       
-        for line in iter(out.splitlines()):
-            parts = line.split('=')
-            if parts[0] == SystemctlMonitor.LOAD_STATE_KEY:
-                load_state = parts[1]
-            elif parts[0] == SystemctlMonitor.ACTIVE_STATE_KEY:
-                active_state = parts[1]            
-            elif parts[0] == SystemctlMonitor.SUBSTATE_KEY:
-                sub_state = parts[1]
-        return load_state + "." + active_state + "." + sub_state
-
-
-def main():
-
-    parser = argparse.ArgumentParser(description='systemctrl state monitor')
-    parser.add_argument('-s', help='service name', required=True)
-    parser.add_argument('-r', help='sample rate', required=True)
-    parser.add_argument('-p', help='aggregation period', required=True)
-
-    args = parser.parse_args()
-    print("Starting monitoring : {0}, {1}, {2}".format(args.s, args.r, args.p))
-    mon = SystemctlMonitor(args.s, int(args.r), int(args.p))
-    mon.start()
-    time.sleep(11)
-    mon.stop()
-    report = mon.get_last_report()
-    print("Current report: {0}".format(str(report)))
-  
-if __name__== "__main__":
-  main()
-
-
+        logger.debug("Stoppping thread")
+        self._start_event.clear()        
diff --git a/src/monitoring/stop_systemctl_monitor.sh b/src/monitoring/stop_systemctl_monitor.sh
new file mode 100644
index 0000000..5cd1d4d
--- /dev/null
+++ b/src/monitoring/stop_systemctl_monitor.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+pid=`ps aux | egrep "[s]ystemctl_monitor.py" | awk '{ print $2 }'` && kill $pid
\ No newline at end of file
diff --git a/src/monitoring/systemctl_monitor.py b/src/monitoring/systemctl_monitor.py
new file mode 100644
index 0000000..9b02af0
--- /dev/null
+++ b/src/monitoring/systemctl_monitor.py
@@ -0,0 +1,113 @@
+#!/usr/bin/python3
+import argparse
+import subprocess
+import logging
+import time
+import urllib.parse
+from config_collector import ConfigCollector
+from influxdb import InfluxDBClient
+
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+class SystemctlMonitor:
+    ACTIVE_STATE_KEY='ActiveState'
+    SUBSTATE_KEY='SubState'  
+    LOAD_STATE_KEY='LoadState'
+
+    def __init__(self, service_name, sample_rate, agg_period, hostname, port, database):
+        self.service_name = service_name
+        self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.write_measurement, sample_rate, agg_period)
+        self.hostname = hostname 
+        self.port = port
+        self.database = database        
+
+    def start(self):
+        self.collection_thread.start()
+
+    def stop(self):
+        self.collection_thread.stop()
+
+    def get_last_report(self):
+        return self.collection_thread.current_report
+
+    def get_systemctl_sample(self):
+        return (self.get_systemctl_status(self.service_name), time.time())           
+
+    def get_systemctl_status(self, service_name):
+        load_state = 'unknown'
+        active_state = 'unknown'
+        sub_state = 'unknown'
+
+        cmd = "systemctl show {0}".format(service_name)
+        proc = subprocess.Popen([cmd], stdout=subprocess.PIPE, shell=True)
+        out, err = proc.communicate()
+        if out: out = out.decode('ascii')
+        if err: err = err.decode('ascii') 
+        logger.debug("Return code = {0}".format(proc.returncode))
+        if proc.returncode != 0:
+            logger.error("Could not get status for service {0}, {1}".format(service_name, err))
+            raise Exception("Could not get status for service {0}, {1}".format(service_name, err))
+       
+        for line in iter(out.splitlines()):
+            parts = line.split('=')
+            if parts[0] == SystemctlMonitor.LOAD_STATE_KEY:
+                load_state = parts[1]
+            elif parts[0] == SystemctlMonitor.ACTIVE_STATE_KEY:
+                active_state = parts[1]            
+            elif parts[0] == SystemctlMonitor.SUBSTATE_KEY:
+                sub_state = parts[1]
+        return load_state + "." + active_state + "." + sub_state
+
+    def write_measurement(self, report):
+        print("Writing report: {0}".format(str(report)))     
+
+        try:   
+            db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10)  
+            measurement = self.create_measurement(report)
+            db_client.write_points(measurement)
+        except Exception as e: 
+            print(e)
+            
+
+    def create_measurement(self, report):     
+        measurement_time = int(report['time']*1000000000)
+        measurement = [{"measurement": "service_config_state",
+               "tags": {
+                   "resource_name": self.service_name
+               },
+               "fields": {
+                   "current_state": report['current_state'],
+                   "current_state_time": report['current_state_time'],
+               },
+               "time": measurement_time
+               }]
+
+        for key in report['agg_states']: 
+            field_name = key + "_sum"
+            measurement[0]['fields'][field_name] = report['agg_states'][key]['dur']
+            field_name = key + "_count"
+            measurement[0]['fields'][field_name] = report['agg_states'][key]['count']    
+              
+        return measurement
+
+def main():
+
+    parser = argparse.ArgumentParser(description='systemctrl state monitor')
+    parser.add_argument('-service', help='service name', required=True)
+    parser.add_argument('-rate', help='sample rate', required=True)
+    parser.add_argument('-agg', help='aggregation period', required=True)
+    parser.add_argument('-host', help='telegraf hostname', required=True)
+    parser.add_argument('-port', help='telegraf port', required=True)           
+    parser.add_argument('-db', help='database name', required=True)    
+
+    args = parser.parse_args()
+    print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}".format(args.service, args.rate, args.agg, args.host, args.port, args.db))
+
+    mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db)
+    mon.start()
+  
+if __name__== "__main__":
+  main()
+
+
-- 
GitLab