diff --git a/clmctest/inputs/test_config_mon.py b/clmctest/inputs/test_config_collector.py similarity index 77% rename from clmctest/inputs/test_config_mon.py rename to clmctest/inputs/test_config_collector.py index 05d018551d2f9536994b166cf26eb393e3cc3d9d..8d13b314a97f3ba7606d209da76733b1eedcfd4d 100644 --- a/clmctest/inputs/test_config_mon.py +++ b/clmctest/inputs/test_config_collector.py @@ -26,21 +26,15 @@ import pytest import time import random import logging -from clmctest.inputs.config_mon import SystemctlMonitor, CollectionThread +import sys + +from config_collector import ConfigCollector -#RequiresMountsFor=/var/tmp -#Description=The nginx HTTP and reverse proxy server -#LoadState=loaded -#ActiveState=active -#SubState=running -#FragmentPath=/usr/lib/systemd/system/nginx.service -#UnitFileState=disabled -#UnitFilePreset=disabled STATE_INDEX = 0 TIME_INDEX = 1 -samples = [[['active', 0]], - [['active', 0], ['active', 2]], +samples = [[['active', 0], ['active', 2]], + [['active', 0], ['active', 2], ['active', 4]], [['active', 0], ['failed', 2]], [['active', 0], ['active', 2], ['inactive', 4], ['active', 6], ['failed', 8], ['inactive', 10]], [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]] @@ -60,43 +54,51 @@ def get_sample_test(): current_index = 0 return sample -def test_report(): - t = CollectionThread(get_sample_test) - report = t.gen_report(samples[0], 10) +def write_output(report): + print("Writing report output {0}".format(report)) + + +def test_aggregation(): + t = ConfigCollector(get_sample_test, write_output) + report = t.agg_samples(samples[0], 10, 12) - report = t.gen_report(samples[1], 10) + report = t.agg_samples(samples[1], 10, 14) assert report['current_state'] == 'active' - assert report['current_state_time'] == 12 - assert report['agg_states']['active']['dur'] == 2 - assert report['agg_states']['active']['count'] == 1 + assert report['current_state_time'] == 14 + assert report['agg_states']['active']['dur'] == 4 + assert report['agg_states']['active']['count'] == 0 + assert report['time'] == 14 - report = t.gen_report(samples[2], 8) + report = t.agg_samples(samples[2], 8, 10) assert report['current_state'] == 'failed' assert report['current_state_time'] == 0 assert report['agg_states']['active']['dur'] == 2 - assert report['agg_states']['active']['count'] == 1 + assert report['agg_states']['active']['count'] == 0 assert report['agg_states']['failed']['dur'] == 0 assert report['agg_states']['failed']['count'] == 1 + assert report['time'] == 10 - report = t.gen_report(samples[3], 2) + report = t.agg_samples(samples[3], 2, 12) assert report['current_state'] == 'inactive' assert report['current_state_time'] == 0 assert report['agg_states']['active']['dur'] == 6 - assert report['agg_states']['active']['count'] == 2 + assert report['agg_states']['active']['count'] == 1 assert report['agg_states']['inactive']['dur'] == 2 assert report['agg_states']['inactive']['count'] == 2 assert report['agg_states']['failed']['dur'] == 2 assert report['agg_states']['failed']['count'] == 1 + assert report['time'] == 12 - report = t.gen_report(samples[4], 4) + report = t.agg_samples(samples[4], 4, 14) assert report['current_state'] == 'failed' assert report['current_state_time'] == 0 assert report['agg_states']['active']['dur'] == 4 - assert report['agg_states']['active']['count'] == 2 + assert report['agg_states']['active']['count'] == 1 assert report['agg_states']['inactive']['dur'] == 4 assert report['agg_states']['inactive']['count'] == 2 assert report['agg_states']['failed']['dur'] == 2 assert report['agg_states']['failed']['count'] == 2 + assert report['time'] == 14 def test_one_period_collection(): global sample_set @@ -105,7 +107,7 @@ def test_one_period_collection(): # one reporting period sample_set = 1 current_index = 0 - t = CollectionThread(get_sample_test, 2, 6) + t = ConfigCollector(get_sample_test, write_output, 2, 6) t.start() time.sleep(8) t.stop() @@ -113,7 +115,7 @@ def test_one_period_collection(): assert t.current_report['current_state'] == 'active' assert int(round(t.current_report['current_state_time'])) == 6 assert int(round(t.current_report['agg_states']['active']['dur'])) == 6 - assert int(round(t.current_report['agg_states']['active']['count'])) == 1 + assert int(round(t.current_report['agg_states']['active']['count'])) == 0 def test_multi_period_single_state_collection(): global sample_set @@ -121,7 +123,7 @@ def test_multi_period_single_state_collection(): # two reporting periods sample_set = 1 current_index = 0 - t = CollectionThread(get_sample_test, 1, 3) + t = ConfigCollector(get_sample_test, write_output, 1, 3) t.start() time.sleep(7) t.stop() @@ -129,7 +131,7 @@ def test_multi_period_single_state_collection(): assert t.current_report['current_state'] == 'active' assert int(round(t.current_report['current_state_time'])) == 6 assert int(round(t.current_report['agg_states']['active']['dur'])) == 3 - assert int(round(t.current_report['agg_states']['active']['count'])) == 1 + assert int(round(t.current_report['agg_states']['active']['count'])) == 0 def test_multi_period_multi_state_collection(): global sample_set @@ -137,7 +139,7 @@ def test_multi_period_multi_state_collection(): # 6 samples and 2 reporting periods sample_set = 4 current_index = 0 - t = CollectionThread(get_sample_test, 2, 10) + t = ConfigCollector(get_sample_test, write_output, 2, 10) t.start() time.sleep(13) t.stop() @@ -145,23 +147,8 @@ def test_multi_period_multi_state_collection(): assert t.current_report['current_state'] == 'failed' assert int(round(t.current_report['current_state_time'])) == 0 assert int(round(t.current_report['agg_states']['active']['dur'])) == 4 - assert int(round(t.current_report['agg_states']['active']['count'])) == 2 + assert int(round(t.current_report['agg_states']['active']['count'])) == 1 assert int(round(t.current_report['agg_states']['inactive']['dur'])) == 4 assert int(round(t.current_report['agg_states']['inactive']['count'])) == 2 assert int(round(t.current_report['agg_states']['failed']['dur'])) == 2 - assert int(round(t.current_report['agg_states']['failed']['count'])) == 2 - -def test_get_systemctl_status(): - mon = SystemctlMonitor('nginx', 2, 10) - state = mon.get_systemctl_status('nginx') - - assert state == 'loaded.active.running' - -def test_monitor(): - mon = SystemctlMonitor('nginx', 2, 10) - mon.start() - time.sleep(11) - mon.stop() - - report = mon.get_last_report() - print("Current report: {0}".format(str(report))) \ No newline at end of file + assert int(round(t.current_report['agg_states']['failed']['count'])) == 2 \ No newline at end of file diff --git a/clmctest/inputs/test_systemctl_mon.py b/clmctest/inputs/test_systemctl_mon.py new file mode 100644 index 0000000000000000000000000000000000000000..f8abbd8074594ea220ffe6e0066e35211341d46f --- /dev/null +++ b/clmctest/inputs/test_systemctl_mon.py @@ -0,0 +1,71 @@ +#!/usr/bin/python3 +""" +## © University of Southampton IT Innovation Centre, 2018 +## +## Copyright in this software belongs to University of Southampton +## IT Innovation Centre of Gamma House, Enterprise Road, +## Chilworth Science Park, Southampton, SO16 7NS, UK. +## +## This software may not be used, sold, licensed, transferred, copied +## or reproduced in whole or in part in any manner or form or in or +## on any media by any person other than in accordance with the terms +## of the Licence Agreement supplied with the software, or otherwise +## without the prior written consent of the copyright owners. +## +## This software is distributed WITHOUT ANY WARRANTY, without even the +## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +## PURPOSE, except where stated in the Licence Agreement supplied with +## the software. +## +## Created By : Michael Boniface +## Created Date : 29-04-2018 +## Created for Project : FLAME +""" + +import pytest +import time +import random +import logging +import sys + +from systemctl_monitor import SystemctlMonitor + +URL = "http://172.2.23.212" +PORT = "8186" +DATABASE = "CLMCMetrics" + + +@pytest.mark.parametrize("service_name", [('nginx')]) +def test_create_measurement(telegraf_agent_config, service_name): + + service = 'unknown' + for s in telegraf_agent_config['hosts']: + if s['name'] == service_name: + service = s + continue + + assert service != 'unknown', "{0} not in list of hosts".format(service_name) + + mon = SystemctlMonitor(service_name, 2, 10, service['ip_address'], 8186, service['database_name']) + + report = {'current_state': 'failed', 'time': 2, 'agg_states': {'inactive': {'dur': 3, 'count': 3}, 'active': {'dur': 4, 'count': 4}, 'failed': {'dur': 5, 'count': 5}}, 'current_state_time': 0.0} + + measurement = mon.create_measurement(report) + + assert measurement[0]['tags']['resource_name'] == service_name + assert measurement[0]['fields']['current_state'] == report['current_state'] + +def test_get_systemctl_status(telegraf_agent_config): + mon = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE) + state = mon.get_systemctl_status('nginx') + + assert state == 'loaded.active.running' + +def test_monitor(telegraf_agent_config): + mon = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE) + mon.start() + time.sleep(21) + mon.stop() + + report = mon.get_last_report() + print("Current report: {0}".format(str(report))) \ No newline at end of file diff --git a/clmctest/services/nginx/install.sh b/clmctest/services/nginx/install.sh index d8baa4697ca7c3089e2cf14436440f26c7e578c3..edeeac89f614756a448198d188ed3cba36be4a2d 100755 --- a/clmctest/services/nginx/install.sh +++ b/clmctest/services/nginx/install.sh @@ -28,6 +28,9 @@ apt-get update yes Y | apt-get install nginx +sudo apt-get install python3 python3-pip -y +sudo python3 -m pip install pytest pyaml influxdb + # Need to set up basic stats as this not configured by default # http://nginx.org/en/docs/http/ngx_http_stub_status_module.html @@ -49,4 +52,30 @@ if [ ! -f "$NGINX_CONF_TARGET" ]; then fi nginx -s reload -systemctl start nginx \ No newline at end of file +systemctl start nginx + +## install a configuration monitoring service +svc="nginxmon" + +echo "install systemctl monitoring service" +svc_file="${svc}.service" +echo "[Unit]" > $svc_file +echo "Description=nginxmon" >> $svc_file +echo "After=network-online.target" >> $svc_file +echo "" >> $svc_file +echo "[Service]" >> $svc_file +echo "WorkingDirectory=${inst}/${dir}" >> $svc_file +echo "ExecStart=/usr/bin/python3 /vagrant/src/monitoring/systemctl_monitor.py -service nginx -rate 2 -agg 10 -host localhost -port 8186 -db CLMCMetrics" >> $svc_file +echo "ExecStop=/usr/bin/bash /vagrant/src/monitoring/stop_systemctl_monitor.sh" >> $svc_file +echo "" >> $svc_file +echo "[Install]" >> $svc_file +echo "WantedBy=network-online.target" >> $svc_file +sudo cp $svc_file /lib/systemd/system +rm $svc_file + +echo "enable" +sudo systemctl daemon-reload +sudo systemctl enable ${svc} + +echo "start" +sudo systemctl start ${svc} \ No newline at end of file diff --git a/clmctest/services/nginx/telegraf_nginx.conf b/clmctest/services/nginx/telegraf_nginx.conf index d6588d5599fb3951f105b2b1d2f056a76c8381c0..6bc3b8785bec84b0241a64801decaad6daf8fd8e 100644 --- a/clmctest/services/nginx/telegraf_nginx.conf +++ b/clmctest/services/nginx/telegraf_nginx.conf @@ -25,4 +25,9 @@ urls = ["http://localhost:80/nginx_status"] ## HTTP response timeout (default: 5s) -# response_timeout = "5s" \ No newline at end of file +# response_timeout = "5s" + +# # Influx HTTP write listener +[[inputs.http_listener]] + ## Address and port to host HTTP listener on + service_address = ":8186" \ No newline at end of file diff --git a/setup.py b/setup.py index d24242fdb6521284cea118bad3135af501cf9497..72cae3a9d31413d969a931a41509dca9c4272ea6 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,11 @@ import os import os.path import subprocess +from glob import glob +from os.path import basename +from os.path import dirname +from os.path import join +from os.path import splitext from setuptools import setup, find_packages def read(fname): @@ -38,18 +43,18 @@ def get_version(fname): return git_revision setup( - name = "clmctest", + name = "clmc", version = get_version("clmctest/_version.py"), author = "Michael Boniface", author_email = "mjb@it-innovation.soton.ac.uk", description = "FLAME CLMC Test Module", license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE", - keywords = "FLAME CLMC tests", + keywords = "FLAME CLMC", url='https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc', packages=find_packages(exclude=["services"]), include_package_data=True, package_data={'': ['_version.py', '*.yml', '*.sh', '*.json', '*.conf']}, - long_description="FLAME CLMC tests", + long_description="FLAME CLMC", classifiers=[ "Development Status :: Alpha", "Topic :: FLAME Tests", diff --git a/src/monitoring/__init__.py b/src/monitoring/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..44f772595799f5fe338534918c95e23e08e80464 --- /dev/null +++ b/src/monitoring/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/python3 \ No newline at end of file diff --git a/clmctest/inputs/config_mon.py b/src/monitoring/config_collector.py similarity index 50% rename from clmctest/inputs/config_mon.py rename to src/monitoring/config_collector.py index ff6b8667ddd86073bb169590c7ad7bc272ebe4b0..5e6a9e7a1a5ad19c428323128499096ed4d4bdd9 100644 --- a/clmctest/inputs/config_mon.py +++ b/src/monitoring/config_collector.py @@ -1,19 +1,21 @@ +#!/usr/bin/python3 import threading import time import random import logging -import subprocess -import argparse logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) -class CollectionThread(threading.Thread): - - def __init__(self, get_sample, sample_rate=2, agg_period=10): +class ConfigCollector(threading.Thread): + STATE_NAME = 0 + STATE_TIME = 1 + + def __init__(self, sample_func, output_func, sample_rate=2, agg_period=10): threading.Thread.__init__(self) self._start_event = threading.Event() - self.get_sample = get_sample + self.sample_func = sample_func + self.output_func = output_func self.sample_rate = sample_rate self.agg_period = agg_period self.samples = [] @@ -22,36 +24,42 @@ class CollectionThread(threading.Thread): return def run(self): + # if thread running then return if(self._start_event.is_set()): return + self._start_event.set() + + # set start period to current time start_period = time.time() logger.debug("start time = {0}".format(start_period)) + # set end period to the aggregation period end_period = start_period + self.agg_period logger.debug("end time = {0}".format(end_period)) + # initialise the time in the current state current_state_time = 0 - - # Add initial state - self.samples = [('unknown', start_period)] - self._start_event.set() while(self._start_event.is_set()): - # get sample - (sample_state, sample_time) = self.get_sample() + # get sample using sampler function + (sample_state, sample_time) = self.sample_func() + # add sample to list of samples self.samples.append((sample_state, sample_time)) logger.debug("Sample state {0}".format(sample_state)) - logger.debug("Sample count: {0}".format(len(self.samples))) - # process samples if end of period + logger.debug("Sample count: {0}".format(len(self.samples))) + # if last sample was at the end of the aggregation period then process if sample_time >= end_period: - # create the report - self.current_report = self.gen_report(self.samples, current_state_time) + # aggregate samples into single measurement + self.current_report = self.agg_samples(self.samples, current_state_time, sample_time) + # write output + self.output_func(self.current_report) + # set time in current state current_state_time = self.current_report['current_state_time'] - # remove all aggregated samples + # remove all processed samples self.samples.clear() # add last sample as 1st sample of the next period self.samples.append((sample_state, sample_time)) # set new end period end_period = sample_time + self.agg_period logger.debug("Number of samples after agg: {0}".format(len(self.samples))) - logger.debug("Next end time {0}".format(end_period)) + logger.debug("Next end time {0}".format(end_period)) # calc how long it took to process samples processing_time = time.time() - sample_time @@ -60,70 +68,90 @@ class CollectionThread(threading.Thread): sleep_time = self.sample_rate - processing_time logger.debug("Sleep time {0}".format(sleep_time)) # if processing took longer than the sample rate we have a problemm - # we need to put this into a worker thread + # and we will need to put processing into a worker thread if(sleep_time < 0): logger.warn("Aggregation processing took longer that sample rate") sleep_time = 0 logger.debug("Sleeping for sample {0}".format(sleep_time)) + # wait for the next sample time.sleep(sleep_time) logger.debug("Finished collection thread") return def stop(self): - logger.debug("Stoppping thread") + logger.debug("Stopping thread") self._start_event.clear() - def gen_report(self, samples, initial_state_time): + def agg_samples(self, samples, initial_state_time, current_time): report = {} + logger.debug("Samples: {0}".format(str(samples))) + # check we have more than two samples to aggregate sample_count = len(samples) logger.debug("Sample count {0}".format(sample_count)) + if sample_count < 2: logger.warn("Not enough samples to aggregate in period {0}".format(sample_count)) return report - initial_state = samples[0][0] + # set initial state to the 1st sample + initial_state = samples[0][self.STATE_NAME] logger.debug("Initial state : {0}".format(initial_state)) + # Calculates the state time from the set of samples states = [] last_index = sample_count-1 for index, sample in enumerate(samples): + # for the 1st sample we set the current state and state_start_time if index == 0: - current_state = sample[0] - state_start_time = sample[1] + current_state = sample[self.STATE_NAME] + state_start_time = sample[self.STATE_TIME] logger.debug("Start time : {0}".format(state_start_time)) else: - # add duration for previous state after transition - if current_state != sample[0]: - state_time = sample[1] - state_start_time + # add state duration for previous state after transition + if current_state != sample[self.STATE_NAME]: + # calc time in current state + state_time = sample[self.STATE_TIME] - state_start_time states.append([current_state,state_time]) - current_state = sample[0] + # set the state to the next state + current_state = sample[self.STATE_NAME] + # set the start time of the next state state_start_time = state_start_time + state_time + # deal with the final sample if index == last_index: # calc state duration if last sample is the same as previous state - if current_state == sample[0]: - state_time = sample[1] - state_start_time + if current_state == sample[self.STATE_NAME]: + state_time = sample[self.STATE_TIME] - state_start_time states.append([current_state,state_time]) # add transition in final sample with zero duration - elif current_state != sample[0]: + elif current_state != sample[self.STATE_NAME]: states.append([current_state,0]) logger.debug("States: {0}".format(str(states))) - # calc the total duration and transitions in each state + # calc the total duration and number of transitions in each state. + # Assuming no transition into the 1st state. + # Assuming + agg_states = {} for index, state in enumerate(states): - if state[0] not in agg_states: - agg_states[state[0]] = {'dur':state[1], 'count': 1} - logger.debug("Adding state: {0}, Count: {1}".format(state[0], 1)) + if index == 0: + agg_states[states[0][self.STATE_NAME]] = {'dur':states[0][self.STATE_TIME], 'count': 0} else: - logger.debug("Aggregating state: {0}".format(state[0])) - agg_states[state[0]]['dur'] += state[1] - logger.debug("Duration: {0}".format(agg_states[state[0]]['dur'])) - agg_states[state[0]]['count'] += 1 - logger.debug("Count: {0}".format(agg_states[state[0]]['count'])) + # if first time we've seen the state add to dict with initial duration and a single transition + if state[self.STATE_NAME] not in agg_states: + agg_states[state[self.STATE_NAME]] = {'dur':state[self.STATE_TIME], 'count': 1} + logger.debug("Adding state: {0}, Count: {1}".format(state[self.STATE_NAME], 1)) + else: + logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME])) + # add state time to aggregate total + agg_states[state[self.STATE_NAME]]['dur'] += state[self.STATE_TIME] + logger.debug("Duration: {0}".format(agg_states[state[self.STATE_NAME]]['dur'])) + # increment number of times in the state + agg_states[state[self.STATE_NAME]]['count'] += 1 + logger.debug("Count: {0}".format(agg_states[state[self.STATE_NAME]]['count'])) # set the current state as the last state report['current_state'] = states[-1][0] @@ -131,80 +159,32 @@ class CollectionThread(threading.Thread): state_count = len(agg_states) # if no change in state then take the initial time in state and add the current state time if initial_state == states[-1][0] and state_count == 1: - current_state_time = initial_state_time + states[-1][1] + current_state_time = initial_state_time + states[-1][self.STATE_TIME] else: - current_state_time = states[-1][1] + # current state time is the last state time + current_state_time = states[-1][self.STATE_TIME] report['current_state_time'] = current_state_time report['agg_states'] = agg_states + report['time'] = current_time + logger.debug("Report: {0}".format(str(report))) return report -class SystemctlMonitor: - ACTIVE_STATE_KEY='ActiveState' - SUBSTATE_KEY='SubState' - LOAD_STATE_KEY='LoadState' - def __init__(self, service_name, sample_rate, agg_period): - self.service_name = service_name - self.collection_thread = CollectionThread(self.get_systemctl_sample, sample_rate, agg_period) +class OutputThread(threading.Thread): + def __init__(self, output_func, report): + self.output_func = output_func + self.report = report - def start(self): - self.collection_thread.start() + def run(self): + # if thread running then return + if(self._start_event.is_set()): + return + self._start_event.set() + output_func(report) def stop(self): - self.collection_thread.stop() - - def get_last_report(self): - return self.collection_thread.current_report - - def get_systemctl_sample(self): - return (self.get_systemctl_status(self.service_name), time.time()) - - def get_systemctl_status(self, service_name): - load_state = 'unknown' - active_state = 'unknown' - sub_state = 'unknown' - - cmd = "systemctl show {0}".format(service_name) - proc = subprocess.Popen([cmd], stdout=subprocess.PIPE, shell=True) - out, err = proc.communicate() - if out: out = out.decode('ascii') - if err: err = err.decode('ascii') - logger.debug("Return code = {0}".format(proc.returncode)) - if proc.returncode != 0: - logger.error("Could not get status for service {0}, {1}".format(service_name, err)) - raise Exception("Could not get status for service {0}, {1}".format(service_name, err)) - - for line in iter(out.splitlines()): - parts = line.split('=') - if parts[0] == SystemctlMonitor.LOAD_STATE_KEY: - load_state = parts[1] - elif parts[0] == SystemctlMonitor.ACTIVE_STATE_KEY: - active_state = parts[1] - elif parts[0] == SystemctlMonitor.SUBSTATE_KEY: - sub_state = parts[1] - return load_state + "." + active_state + "." + sub_state - - -def main(): - - parser = argparse.ArgumentParser(description='systemctrl state monitor') - parser.add_argument('-s', help='service name', required=True) - parser.add_argument('-r', help='sample rate', required=True) - parser.add_argument('-p', help='aggregation period', required=True) - - args = parser.parse_args() - print("Starting monitoring : {0}, {1}, {2}".format(args.s, args.r, args.p)) - mon = SystemctlMonitor(args.s, int(args.r), int(args.p)) - mon.start() - time.sleep(11) - mon.stop() - report = mon.get_last_report() - print("Current report: {0}".format(str(report))) - -if __name__== "__main__": - main() - - + logger.debug("Stoppping thread") + self._start_event.clear() diff --git a/src/monitoring/stop_systemctl_monitor.sh b/src/monitoring/stop_systemctl_monitor.sh new file mode 100644 index 0000000000000000000000000000000000000000..5cd1d4df8515b992af631f14f4fe04d0e703e166 --- /dev/null +++ b/src/monitoring/stop_systemctl_monitor.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +pid=`ps aux | egrep "[s]ystemctl_monitor.py" | awk '{ print $2 }'` && kill $pid \ No newline at end of file diff --git a/src/monitoring/systemctl_monitor.py b/src/monitoring/systemctl_monitor.py new file mode 100644 index 0000000000000000000000000000000000000000..9b02af0e33b4191eddf538957b0f0505043c211b --- /dev/null +++ b/src/monitoring/systemctl_monitor.py @@ -0,0 +1,113 @@ +#!/usr/bin/python3 +import argparse +import subprocess +import logging +import time +import urllib.parse +from config_collector import ConfigCollector +from influxdb import InfluxDBClient + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +class SystemctlMonitor: + ACTIVE_STATE_KEY='ActiveState' + SUBSTATE_KEY='SubState' + LOAD_STATE_KEY='LoadState' + + def __init__(self, service_name, sample_rate, agg_period, hostname, port, database): + self.service_name = service_name + self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.write_measurement, sample_rate, agg_period) + self.hostname = hostname + self.port = port + self.database = database + + def start(self): + self.collection_thread.start() + + def stop(self): + self.collection_thread.stop() + + def get_last_report(self): + return self.collection_thread.current_report + + def get_systemctl_sample(self): + return (self.get_systemctl_status(self.service_name), time.time()) + + def get_systemctl_status(self, service_name): + load_state = 'unknown' + active_state = 'unknown' + sub_state = 'unknown' + + cmd = "systemctl show {0}".format(service_name) + proc = subprocess.Popen([cmd], stdout=subprocess.PIPE, shell=True) + out, err = proc.communicate() + if out: out = out.decode('ascii') + if err: err = err.decode('ascii') + logger.debug("Return code = {0}".format(proc.returncode)) + if proc.returncode != 0: + logger.error("Could not get status for service {0}, {1}".format(service_name, err)) + raise Exception("Could not get status for service {0}, {1}".format(service_name, err)) + + for line in iter(out.splitlines()): + parts = line.split('=') + if parts[0] == SystemctlMonitor.LOAD_STATE_KEY: + load_state = parts[1] + elif parts[0] == SystemctlMonitor.ACTIVE_STATE_KEY: + active_state = parts[1] + elif parts[0] == SystemctlMonitor.SUBSTATE_KEY: + sub_state = parts[1] + return load_state + "." + active_state + "." + sub_state + + def write_measurement(self, report): + print("Writing report: {0}".format(str(report))) + + try: + db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10) + measurement = self.create_measurement(report) + db_client.write_points(measurement) + except Exception as e: + print(e) + + + def create_measurement(self, report): + measurement_time = int(report['time']*1000000000) + measurement = [{"measurement": "service_config_state", + "tags": { + "resource_name": self.service_name + }, + "fields": { + "current_state": report['current_state'], + "current_state_time": report['current_state_time'], + }, + "time": measurement_time + }] + + for key in report['agg_states']: + field_name = key + "_sum" + measurement[0]['fields'][field_name] = report['agg_states'][key]['dur'] + field_name = key + "_count" + measurement[0]['fields'][field_name] = report['agg_states'][key]['count'] + + return measurement + +def main(): + + parser = argparse.ArgumentParser(description='systemctrl state monitor') + parser.add_argument('-service', help='service name', required=True) + parser.add_argument('-rate', help='sample rate', required=True) + parser.add_argument('-agg', help='aggregation period', required=True) + parser.add_argument('-host', help='telegraf hostname', required=True) + parser.add_argument('-port', help='telegraf port', required=True) + parser.add_argument('-db', help='database name', required=True) + + args = parser.parse_args() + print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}".format(args.service, args.rate, args.agg, args.host, args.port, args.db)) + + mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db) + mon.start() + +if __name__== "__main__": + main() + +