From 60838c223db3d9aa9682fac608200a1de7604cb6 Mon Sep 17 00:00:00 2001
From: MJB <mjb@it-innovation.soton.ac.uk>
Date: Tue, 1 May 2018 21:52:37 +0100
Subject: [PATCH] systemctl monitoring

---
 clmctest/inputs/config_mon.py      | 240 +++++++++++++++++++++++++++++
 clmctest/inputs/test_config_mon.py | 180 ++++++++++++++++++++++
 2 files changed, 420 insertions(+)
 create mode 100644 clmctest/inputs/config_mon.py
 create mode 100644 clmctest/inputs/test_config_mon.py

diff --git a/clmctest/inputs/config_mon.py b/clmctest/inputs/config_mon.py
new file mode 100644
index 0000000..ff6b866
--- /dev/null
+++ b/clmctest/inputs/config_mon.py
@@ -0,0 +1,240 @@
+import threading
+import time
+import random
+import logging
+import subprocess
+import argparse
+
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+
+class CollectionThread(threading.Thread):
+    """Periodically sample a state source and aggregate the samples.
+
+    get_sample is called roughly every sample_rate seconds and must return
+    a (state, timestamp) tuple. Once a sample's timestamp passes the end of
+    the current aggregation period (agg_period seconds), the collected
+    samples are summarised by gen_report() into current_report.
+    """
+
+    def __init__(self, get_sample, sample_rate=2, agg_period=10):
+        threading.Thread.__init__(self)
+        # set while the sampling loop runs; cleared by stop() to end it
+        self._start_event = threading.Event()
+        self.get_sample = get_sample
+        self.sample_rate = sample_rate
+        self.agg_period = agg_period
+        self.samples = []
+        self.states = {}
+        self.current_report = {}
+
+    def run(self):
+        """Sample until stop() is called, publishing one report per period."""
+        if self._start_event.is_set():
+            return
+        start_period = time.time()
+        logger.debug("start time = {0}".format(start_period))
+        end_period = start_period + self.agg_period
+        logger.debug("end time = {0}".format(end_period))
+        current_state_time = 0
+
+        # Add initial state
+        self.samples = [('unknown', start_period)]
+        self._start_event.set()
+        try:
+            while self._start_event.is_set():
+                # get sample
+                (sample_state, sample_time) = self.get_sample()
+                self.samples.append((sample_state, sample_time))
+                logger.debug("Sample state {0}".format(sample_state))
+                logger.debug("Sample count: {0}".format(len(self.samples)))
+                # process samples if end of period
+                if sample_time >= end_period:
+                    # create the report
+                    self.current_report = self.gen_report(self.samples, current_state_time)
+                    current_state_time = self.current_report['current_state_time']
+                    # remove all aggregated samples
+                    self.samples.clear()
+                    # add last sample as 1st sample of the next period
+                    self.samples.append((sample_state, sample_time))
+                    # set new end period
+                    end_period = sample_time + self.agg_period
+                    logger.debug("Number of samples after agg: {0}".format(len(self.samples)))
+                    logger.debug("Next end time {0}".format(end_period))
+
+                # calc how long it took to process samples
+                processing_time = time.time() - sample_time
+                logger.debug("Processing time {0}".format(processing_time))
+                # calc the remaining time to wait until next sample
+                sleep_time = self.sample_rate - processing_time
+                logger.debug("Sleep time {0}".format(sleep_time))
+                # if processing took longer than the sample rate we have a problem
+                # we need to put this into a worker thread
+                if sleep_time < 0:
+                    logger.warning("Aggregation processing took longer that sample rate")
+                    sleep_time = 0
+                logger.debug("Sleeping for sample {0}".format(sleep_time))
+                time.sleep(sleep_time)
+        finally:
+            # a failing get_sample must not leave the thread marked as running
+            self._start_event.clear()
+        logger.debug("Finished collection thread")
+
+    def stop(self):
+        """Ask the sampling loop to finish after its current iteration."""
+        logger.debug("Stoppping thread")
+        self._start_event.clear()
+
+    def gen_report(self, samples, initial_state_time):
+        """Aggregate (state, timestamp) samples into a state report.
+
+        initial_state_time is the time already spent in the first sample's
+        state before this period; it is carried into current_state_time only
+        when no transition occurs. Returns a dict with 'current_state',
+        'current_state_time' and 'agg_states' (state -> {'dur', 'count'}),
+        or an empty dict when fewer than two samples are supplied.
+        """
+        report = {}
+
+        # at least two samples are needed to measure any duration
+        sample_count = len(samples)
+        logger.debug("Sample count {0}".format(sample_count))
+        if sample_count < 2:
+            logger.warning("Not enough samples to aggregate in period {0}".format(sample_count))
+            return report
+
+        initial_state = samples[0][0]
+        logger.debug("Initial state : {0}".format(initial_state))
+
+        # collapse consecutive samples into [state, duration] runs
+        states = []
+        current_state = initial_state
+        state_start_time = samples[0][1]
+        logger.debug("Start time : {0}".format(state_start_time))
+        for sample_state, sample_time in samples[1:]:
+            if sample_state != current_state:
+                # close the previous run at the moment of the transition
+                states.append([current_state, sample_time - state_start_time])
+                current_state = sample_state
+                state_start_time = sample_time
+        # close the final run; it has zero duration when the last sample
+        # was itself a transition
+        states.append([current_state, samples[-1][1] - state_start_time])
+
+        logger.debug("States: {0}".format(str(states)))
+
+        # calc the total duration and transitions in each state
+        agg_states = {}
+        for state, duration in states:
+            if state not in agg_states:
+                agg_states[state] = {'dur': duration, 'count': 1}
+                logger.debug("Adding state: {0}, Count: {1}".format(state, 1))
+            else:
+                logger.debug("Aggregating state: {0}".format(state))
+                agg_states[state]['dur'] += duration
+                logger.debug("Duration: {0}".format(agg_states[state]['dur']))
+                agg_states[state]['count'] += 1
+                logger.debug("Count: {0}".format(agg_states[state]['count']))
+
+        # set the current state as the last state
+        current_state, current_run_time = states[-1]
+        report['current_state'] = current_state
+        # if no change in state then take the initial time in state and add the current state time
+        if initial_state == current_state and len(agg_states) == 1:
+            current_state_time = initial_state_time + current_run_time
+        else:
+            current_state_time = current_run_time
+        report['current_state_time'] = current_state_time
+
+        report['agg_states'] = agg_states
+        logger.debug("Report: {0}".format(str(report)))
+
+        return report
+
+
+class SystemctlMonitor:
+    """Track one service's systemd state using ``systemctl show``."""
+
+    ACTIVE_STATE_KEY = 'ActiveState'
+    SUBSTATE_KEY = 'SubState'
+    LOAD_STATE_KEY = 'LoadState'
+
+    def __init__(self, service_name, sample_rate, agg_period):
+        self.service_name = service_name
+        self.collection_thread = CollectionThread(self.get_systemctl_sample, sample_rate, agg_period)
+
+    def start(self):
+        self.collection_thread.start()
+
+    def stop(self):
+        self.collection_thread.stop()
+
+    def get_last_report(self):
+        """Return the most recent report, or {} if none has been generated."""
+        return self.collection_thread.current_report
+
+    def get_systemctl_sample(self):
+        """Return a (state, timestamp) sample for the monitored service."""
+        return (self.get_systemctl_status(self.service_name), time.time())
+
+    def get_systemctl_status(self, service_name):
+        """Return 'LoadState.ActiveState.SubState' for service_name.
+
+        Fields missing from the systemctl output are reported as 'unknown'.
+        Raises Exception if systemctl cannot be run or exits non-zero.
+        """
+        load_state = 'unknown'
+        active_state = 'unknown'
+        sub_state = 'unknown'
+
+        # Pass an argument list and no shell so the service name is never
+        # interpreted by a shell; capture stderr so failures can be reported.
+        try:
+            proc = subprocess.Popen(["systemctl", "show", "--", service_name],
+                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        except OSError as e:
+            logger.error("Could not get status for service {0}, {1}".format(service_name, e))
+            raise Exception("Could not get status for service {0}, {1}".format(service_name, e))
+        out, err = proc.communicate()
+        # unit descriptions may contain non-ASCII text, so decode leniently
+        out = out.decode('utf-8', errors='replace') if out else ''
+        err = err.decode('utf-8', errors='replace') if err else ''
+        logger.debug("Return code = {0}".format(proc.returncode))
+        if proc.returncode != 0:
+            logger.error("Could not get status for service {0}, {1}".format(service_name, err))
+            raise Exception("Could not get status for service {0}, {1}".format(service_name, err))
+
+        for line in out.splitlines():
+            # split on the first '=' only; values may themselves contain '='
+            key, _, value = line.partition('=')
+            if key == SystemctlMonitor.LOAD_STATE_KEY:
+                load_state = value
+            elif key == SystemctlMonitor.ACTIVE_STATE_KEY:
+                active_state = value
+            elif key == SystemctlMonitor.SUBSTATE_KEY:
+                sub_state = value
+        return load_state + "." + active_state + "." + sub_state
+
+
+def main():
+    """Monitor one service for a single aggregation period and print the report."""
+    parser = argparse.ArgumentParser(description='systemctrl state monitor')
+    parser.add_argument('-s', help='service name', required=True)
+    parser.add_argument('-r', help='sample rate', type=int, required=True)
+    parser.add_argument('-p', help='aggregation period', type=int, required=True)
+
+    args = parser.parse_args()
+    print("Starting monitoring : {0}, {1}, {2}".format(args.s, args.r, args.p))
+    mon = SystemctlMonitor(args.s, args.r, args.p)
+    mon.start()
+    # the report is produced by the first sample taken after the period
+    # ends, so allow one extra sample interval before stopping
+    time.sleep(args.p + args.r)
+    mon.stop()
+    report = mon.get_last_report()
+    print("Current report: {0}".format(str(report)))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/clmctest/inputs/test_config_mon.py b/clmctest/inputs/test_config_mon.py
new file mode 100644
index 0000000..05d0185
--- /dev/null
+++ b/clmctest/inputs/test_config_mon.py
@@ -0,0 +1,180 @@
+#!/usr/bin/python3
+"""
+## © University of Southampton IT Innovation Centre, 2018
+##
+## Copyright in this software belongs to University of Southampton
+## IT Innovation Centre of Gamma House, Enterprise Road,
+## Chilworth Science Park, Southampton, SO16 7NS, UK.
+##
+## This software may not be used, sold, licensed, transferred, copied
+## or reproduced in whole or in part in any manner or form or in or
+## on any media by any person other than in accordance with the terms
+## of the Licence Agreement supplied with the software, or otherwise
+## without the prior written consent of the copyright owners.
+##
+## This software is distributed WITHOUT ANY WARRANTY, without even the
+## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+## PURPOSE, except where stated in the Licence Agreement supplied with
+## the software.
+##
+## Created By : Michael Boniface
+## Created Date : 29-04-2018
+## Created for Project : FLAME
+"""
+
+import pytest
+import time
+import random
+import logging
+from clmctest.inputs.config_mon import SystemctlMonitor, CollectionThread
+
+#RequiresMountsFor=/var/tmp
+#Description=The nginx HTTP and reverse proxy server
+#LoadState=loaded
+#ActiveState=active
+#SubState=running
+#FragmentPath=/usr/lib/systemd/system/nginx.service
+#UnitFileState=disabled
+#UnitFilePreset=disabled
+STATE_INDEX = 0
+TIME_INDEX = 1
+
+samples = [[['active', 0]],
+           [['active', 0], ['active', 2]],
+           [['active', 0], ['failed', 2]],
+           [['active', 0], ['active', 2], ['inactive', 4], ['active', 6], ['failed', 8], ['inactive', 10]],
+           [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]]
+
+sample_set = 0
+current_index = 0
+
+
+def get_sample_test():
+    """Return (next canned state of the active sample set, current time)."""
+    global sample_set
+    global current_index
+
+    sample = (samples[sample_set][current_index][STATE_INDEX], time.time())
+    sample_count = len(samples[sample_set])
+    if current_index < sample_count-1:
+        current_index +=1
+    else:
+        current_index = 0
+    return sample
+
+
+def test_report():
+    t = CollectionThread(get_sample_test)
+    # a single sample cannot be aggregated, so the report is empty
+    report = t.gen_report(samples[0], 10)
+    assert report == {}
+
+    report = t.gen_report(samples[1], 10)
+    assert report['current_state'] == 'active'
+    assert report['current_state_time'] == 12
+    assert report['agg_states']['active']['dur'] == 2
+    assert report['agg_states']['active']['count'] == 1
+
+    report = t.gen_report(samples[2], 8)
+    assert report['current_state'] == 'failed'
+    assert report['current_state_time'] == 0
+    assert report['agg_states']['active']['dur'] == 2
+    assert report['agg_states']['active']['count'] == 1
+    assert report['agg_states']['failed']['dur'] == 0
+    assert report['agg_states']['failed']['count'] == 1
+
+    report = t.gen_report(samples[3], 2)
+    assert report['current_state'] == 'inactive'
+    assert report['current_state_time'] == 0
+    assert report['agg_states']['active']['dur'] == 6
+    assert report['agg_states']['active']['count'] == 2
+    assert report['agg_states']['inactive']['dur'] == 2
+    assert report['agg_states']['inactive']['count'] == 2
+    assert report['agg_states']['failed']['dur'] == 2
+    assert report['agg_states']['failed']['count'] == 1
+
+    report = t.gen_report(samples[4], 4)
+    assert report['current_state'] == 'failed'
+    assert report['current_state_time'] == 0
+    assert report['agg_states']['active']['dur'] == 4
+    assert report['agg_states']['active']['count'] == 2
+    assert report['agg_states']['inactive']['dur'] == 4
+    assert report['agg_states']['inactive']['count'] == 2
+    assert report['agg_states']['failed']['dur'] == 2
+    assert report['agg_states']['failed']['count'] == 2
+
+
+def test_one_period_collection():
+    global sample_set
+    global current_index
+
+    # one reporting period
+    sample_set = 1
+    current_index = 0
+    t = CollectionThread(get_sample_test, 2, 6)
+    t.start()
+    time.sleep(8)
+    t.stop()
+    t.join()
+    print("Current report: {0}".format(str(t.current_report)))
+    assert t.current_report['current_state'] == 'active'
+    assert int(round(t.current_report['current_state_time'])) == 6
+    assert int(round(t.current_report['agg_states']['active']['dur'])) == 6
+    assert int(round(t.current_report['agg_states']['active']['count'])) == 1
+
+
+def test_multi_period_single_state_collection():
+    global sample_set
+    global current_index
+    # two reporting periods
+    sample_set = 1
+    current_index = 0
+    t = CollectionThread(get_sample_test, 1, 3)
+    t.start()
+    time.sleep(7)
+    t.stop()
+    t.join()
+    print("Current report: {0}".format(str(t.current_report)))
+    assert t.current_report['current_state'] == 'active'
+    assert int(round(t.current_report['current_state_time'])) == 6
+    assert int(round(t.current_report['agg_states']['active']['dur'])) == 3
+    assert int(round(t.current_report['agg_states']['active']['count'])) == 1
+
+
+def test_multi_period_multi_state_collection():
+    global sample_set
+    global current_index
+    # 6 samples and 2 reporting periods
+    sample_set = 4
+    current_index = 0
+    t = CollectionThread(get_sample_test, 2, 10)
+    t.start()
+    time.sleep(13)
+    t.stop()
+    t.join()
+    print("Current report: {0}".format(str(t.current_report)))
+    assert t.current_report['current_state'] == 'failed'
+    assert int(round(t.current_report['current_state_time'])) == 0
+    assert int(round(t.current_report['agg_states']['active']['dur'])) == 4
+    assert int(round(t.current_report['agg_states']['active']['count'])) == 2
+    assert int(round(t.current_report['agg_states']['inactive']['dur'])) == 4
+    assert int(round(t.current_report['agg_states']['inactive']['count'])) == 2
+    assert int(round(t.current_report['agg_states']['failed']['dur'])) == 2
+    assert int(round(t.current_report['agg_states']['failed']['count'])) == 2
+
+
+def test_get_systemctl_status():
+    mon = SystemctlMonitor('nginx', 2, 10)
+    state = mon.get_systemctl_status('nginx')
+
+    assert state == 'loaded.active.running'
+
+
+def test_monitor():
+    mon = SystemctlMonitor('nginx', 2, 10)
+    mon.start()
+    time.sleep(11)
+    mon.stop()
+
+    report = mon.get_last_report()
+    print("Current report: {0}".format(str(report)))
-- 
GitLab