Skip to content
Snippets Groups Projects
Commit 60838c22 authored by MJB's avatar MJB
Browse files

systemctl monitoring

parent 250e0647
No related branches found
No related tags found
No related merge requests found
import threading
import time
import random
import logging
import subprocess
import argparse
# Configure the root logger at DEBUG level as soon as this module is imported.
# NOTE(review): this is an import-time side effect; basicConfig() does nothing
# if the root logger already has handlers - confirm this is intended for
# library use as well as for running the module as a script.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger shared by the collection thread and the monitor below.
logger = logging.getLogger(__name__)
class CollectionThread(threading.Thread):
    """Thread that repeatedly samples a state and aggregates it per period.

    ``get_sample`` is called once per loop iteration and must return a
    ``(state, timestamp)`` pair. Samples are buffered in ``self.samples``;
    once a sample's timestamp reaches the end of the current aggregation
    period the buffer is turned into a report (see ``gen_report``) which is
    stored in ``self.current_report``.
    """

    def __init__(self, get_sample, sample_rate=2, agg_period=10):
        """Store the sampling configuration.

        Args:
            get_sample: callable returning a ``(state, timestamp)`` pair.
            sample_rate: target number of seconds between two samples.
            agg_period: length in seconds of one aggregation period.
        """
        threading.Thread.__init__(self)
        # Set while the sampling loop is running; cleared by stop().
        self._start_event = threading.Event()
        self.get_sample = get_sample
        self.sample_rate = sample_rate
        self.agg_period = agg_period
        self.samples = []
        self.states = {}
        self.current_report = {}

    def run(self):
        """Sample until ``stop()`` is called, publishing one report per period."""
        # Refuse to run a second loop while one is already active.
        if self._start_event.is_set():
            return
        start_period = time.time()
        logger.debug("start time = {0}".format(start_period))
        end_period = start_period + self.agg_period
        logger.debug("end time = {0}".format(end_period))
        current_state_time = 0
        # Add initial state so the first period starts at a known timestamp
        self.samples = [('unknown', start_period)]
        self._start_event.set()
        while self._start_event.is_set():
            # get sample
            (sample_state, sample_time) = self.get_sample()
            self.samples.append((sample_state, sample_time))
            logger.debug("Sample state {0}".format(sample_state))
            logger.debug("Sample count: {0}".format(len(self.samples)))
            # process samples if end of period
            if sample_time >= end_period:
                # create the report
                self.current_report = self.gen_report(self.samples, current_state_time)
                # gen_report() returns an empty dict when it cannot aggregate;
                # keep the previous value in that case instead of raising.
                current_state_time = self.current_report.get(
                    'current_state_time', current_state_time)
                # remove all aggregated samples
                self.samples.clear()
                # add last sample as 1st sample of the next period
                self.samples.append((sample_state, sample_time))
                # set new end period
                end_period = sample_time + self.agg_period
                logger.debug("Number of samples after agg: {0}".format(len(self.samples)))
                logger.debug("Next end time {0}".format(end_period))
            # calc how long it took to process samples
            processing_time = time.time() - sample_time
            logger.debug("Processing time {0}".format(processing_time))
            # calc the remaining time to wait until next sample
            sleep_time = self.sample_rate - processing_time
            logger.debug("Sleep time {0}".format(sleep_time))
            # if processing took longer than the sample rate we have a problem
            # and the aggregation would need to move into a worker thread
            if sleep_time < 0:
                logger.warning("Aggregation processing took longer that sample rate")
                sleep_time = 0
            logger.debug("Sleeping for sample {0}".format(sleep_time))
            time.sleep(sleep_time)
        logger.debug("Finished collection thread")

    def stop(self):
        """Ask the sampling loop to exit; it does so after its current sleep."""
        logger.debug("Stoppping thread")
        self._start_event.clear()

    def gen_report(self, samples, initial_state_time):
        """Aggregate a list of samples into a state report.

        Args:
            samples: sequence of ``(state, timestamp)`` entries in sampling
                order; the first entry marks the start of the period.
            initial_state_time: time already spent in the first state before
                this period started.

        Returns:
            An empty dict when fewer than two samples are given, otherwise a
            dict with:
              * ``current_state``: state of the last sample,
              * ``current_state_time``: time spent in that state (includes
                ``initial_state_time`` when the state never changed),
              * ``agg_states``: ``{state: {'dur': total, 'count': runs}}``.
        """
        report = {}
        # check we have at least two samples to aggregate
        sample_count = len(samples)
        logger.debug("Sample count {0}".format(sample_count))
        if sample_count < 2:
            logger.warning("Not enough samples to aggregate in period {0}".format(sample_count))
            return report

        initial_state = samples[0][0]
        logger.debug("Initial state : {0}".format(initial_state))

        # Collapse the samples into consecutive runs of [state, duration].
        states = []
        current_state = initial_state
        state_start_time = samples[0][1]
        logger.debug("Start time : {0}".format(state_start_time))
        for sample in samples[1:]:
            if current_state != sample[0]:
                # close the previous run at the time the transition was seen
                states.append([current_state, sample[1] - state_start_time])
                current_state = sample[0]
                # use the sample timestamp directly rather than re-deriving it
                # from start + duration, which can drift with float rounding
                state_start_time = sample[1]
        # Close the run that is still open at the last sample. A transition
        # observed in the last sample therefore yields a zero duration run.
        states.append([current_state, samples[-1][1] - state_start_time])
        logger.debug("States: {0}".format(str(states)))

        # calc the total duration and number of runs for each state
        agg_states = {}
        for state, duration in states:
            if state not in agg_states:
                agg_states[state] = {'dur': duration, 'count': 1}
                logger.debug("Adding state: {0}, Count: {1}".format(state, 1))
            else:
                logger.debug("Aggregating state: {0}".format(state))
                agg_states[state]['dur'] += duration
                logger.debug("Duration: {0}".format(agg_states[state]['dur']))
                agg_states[state]['count'] += 1
                logger.debug("Count: {0}".format(agg_states[state]['count']))

        # set the current state as the last state
        last_state, last_duration = states[-1]
        report['current_state'] = last_state
        # if there was no change of state in this period, the time in the
        # current state carries over from the previous period
        if initial_state == last_state and len(agg_states) == 1:
            current_state_time = initial_state_time + last_duration
        else:
            current_state_time = last_duration
        report['current_state_time'] = current_state_time
        report['agg_states'] = agg_states
        logger.debug("Report: {0}".format(str(report)))
        return report
class SystemctlMonitor:
    """Monitor the systemd state of one service with a CollectionThread.

    The state of the service is reported as the string
    ``"<LoadState>.<ActiveState>.<SubState>"`` taken from ``systemctl show``.
    """

    ACTIVE_STATE_KEY = 'ActiveState'
    SUBSTATE_KEY = 'SubState'
    LOAD_STATE_KEY = 'LoadState'

    def __init__(self, service_name, sample_rate, agg_period):
        """Create the monitor and its (not yet started) collection thread.

        Args:
            service_name: name of the systemd unit to monitor.
            sample_rate: seconds between two samples.
            agg_period: length in seconds of one aggregation period.
        """
        self.service_name = service_name
        self.collection_thread = CollectionThread(self.get_systemctl_sample, sample_rate, agg_period)

    def start(self):
        """Start the background collection thread."""
        self.collection_thread.start()

    def stop(self):
        """Ask the background collection thread to stop."""
        self.collection_thread.stop()

    def get_last_report(self):
        """Return the most recent report published by the collection thread."""
        return self.collection_thread.current_report

    def get_systemctl_sample(self):
        """Return a ``(state, timestamp)`` sample for the monitored service."""
        return (self.get_systemctl_status(self.service_name), time.time())

    def get_systemctl_status(self, service_name):
        """Query ``systemctl show`` and return the combined state string.

        Args:
            service_name: name of the systemd unit to query.

        Returns:
            ``"<LoadState>.<ActiveState>.<SubState>"``; a part that is not
            present in the command output is reported as ``unknown``.

        Raises:
            Exception: if systemctl cannot be executed or exits non-zero.
        """
        # Pass the arguments as a list (no shell) so that the service name can
        # never be interpreted as shell syntax; "--" stops option parsing.
        command = ["systemctl", "show", "--", service_name]
        try:
            proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as exc:
            # e.g. systemctl is not installed on this host
            message = "Could not get status for service {0}, {1}".format(service_name, exc)
            logger.error(message)
            raise Exception(message)
        out, err = proc.communicate()
        # Unit descriptions may contain non-ASCII text, so decode leniently;
        # always end up with str so that the parsing below cannot hit bytes.
        out = out.decode('utf-8', errors='replace') if out else ''
        err = err.decode('utf-8', errors='replace') if err else ''
        logger.debug("Return code = {0}".format(proc.returncode))
        if proc.returncode != 0:
            message = "Could not get status for service {0}, {1}".format(service_name, err)
            logger.error(message)
            raise Exception(message)
        return self._parse_show_output(out)

    @staticmethod
    def _parse_show_output(output):
        """Extract ``load.active.sub`` from ``KEY=VALUE`` lines of systemctl output."""
        wanted = (SystemctlMonitor.LOAD_STATE_KEY,
                  SystemctlMonitor.ACTIVE_STATE_KEY,
                  SystemctlMonitor.SUBSTATE_KEY)
        found = dict.fromkeys(wanted, 'unknown')
        for line in output.splitlines():
            # split on the first '=' only: values may themselves contain '='
            key, _, value = line.partition('=')
            if key in found:
                found[key] = value
        return ".".join(found[key] for key in wanted)
def main():
    """Monitor one service from the command line and print the last report.

    Command line options:
        -s  service name
        -r  sample rate in seconds (integer)
        -p  aggregation period in seconds (integer)
    """
    parser = argparse.ArgumentParser(description='systemctrl state monitor')
    parser.add_argument('-s', help='service name', required=True)
    # type=int lets argparse reject non-numeric values with a usage message
    # instead of an uncaught ValueError traceback
    parser.add_argument('-r', help='sample rate', required=True, type=int)
    parser.add_argument('-p', help='aggregation period', required=True, type=int)
    args = parser.parse_args()
    print("Starting monitoring : {0}, {1}, {2}".format(args.s, args.r, args.p))
    mon = SystemctlMonitor(args.s, args.r, args.p)
    mon.start()
    # A report only exists once a full aggregation period has elapsed, so wait
    # for at least one period plus a margin (never less than the original 11s).
    time.sleep(max(11, args.p + 1))
    mon.stop()
    report = mon.get_last_report()
    print("Current report: {0}".format(str(report)))


if __name__ == "__main__":
    main()
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Michael Boniface
## Created Date : 29-04-2018
## Created for Project : FLAME
"""
import pytest
import time
import random
import logging
from clmctest.inputs.config_mon import SystemctlMonitor, CollectionThread
#RequiresMountsFor=/var/tmp
#Description=The nginx HTTP and reverse proxy server
#LoadState=loaded
#ActiveState=active
#SubState=running
#FragmentPath=/usr/lib/systemd/system/nginx.service
#UnitFileState=disabled
#UnitFilePreset=disabled
# Positions of the state and the timestamp inside one sample entry.
STATE_INDEX = 0
TIME_INDEX = 1

# Canned sample sets, each a list of [state, timestamp] entries:
#   0: a single sample (too few to aggregate)
#   1: one state, no transition
#   2: one transition seen in the final sample
#   3, 4: several transitions between active / inactive / failed
samples = [
    [
        ['active', 0],
    ],
    [
        ['active', 0],
        ['active', 2],
    ],
    [
        ['active', 0],
        ['failed', 2],
    ],
    [
        ['active', 0],
        ['active', 2],
        ['inactive', 4],
        ['active', 6],
        ['failed', 8],
        ['inactive', 10],
    ],
    [
        ['active', 0],
        ['inactive', 2],
        ['failed', 4],
        ['active', 6],
        ['inactive', 8],
        ['failed', 10],
    ],
]

# Cursor used by get_sample_test(): which sample set is being replayed and
# the position of the next entry to hand out.
sample_set = 0
current_index = 0
def get_sample_test():
    """Replay the next canned state of the selected sample set.

    Returns a ``(state, timestamp)`` tuple where the state comes from
    ``samples[sample_set][current_index]`` and the timestamp is the current
    wall-clock time. The cursor then moves to the next entry and wraps back
    to the first entry after the last one.
    """
    global sample_set
    global current_index
    active_set = samples[sample_set]
    result = (active_set[current_index][STATE_INDEX], time.time())
    final_position = len(active_set) - 1
    # advance the cursor, restarting from the beginning after the last entry
    current_index = current_index + 1 if current_index < final_position else 0
    return result
def test_report():
    """Check gen_report() against each canned sample set."""
    t = CollectionThread(get_sample_test)

    # a single sample cannot be aggregated: the report must be empty
    report = t.gen_report(samples[0], 10)
    assert report == {}

    # no transition: time in state carries over from the initial state time
    report = t.gen_report(samples[1], 10)
    assert report['current_state'] == 'active'
    assert report['current_state_time'] == 12
    assert report['agg_states']['active']['dur'] == 2
    assert report['agg_states']['active']['count'] == 1

    # transition seen in the final sample: new state has zero duration
    report = t.gen_report(samples[2], 8)
    assert report['current_state'] == 'failed'
    assert report['current_state_time'] == 0
    assert report['agg_states']['active']['dur'] == 2
    assert report['agg_states']['active']['count'] == 1
    assert report['agg_states']['failed']['dur'] == 0
    assert report['agg_states']['failed']['count'] == 1

    # several transitions, some states visited more than once
    report = t.gen_report(samples[3], 2)
    assert report['current_state'] == 'inactive'
    assert report['current_state_time'] == 0
    assert report['agg_states']['active']['dur'] == 6
    assert report['agg_states']['active']['count'] == 2
    assert report['agg_states']['inactive']['dur'] == 2
    assert report['agg_states']['inactive']['count'] == 2
    assert report['agg_states']['failed']['dur'] == 2
    assert report['agg_states']['failed']['count'] == 1

    # a transition on every sample
    report = t.gen_report(samples[4], 4)
    assert report['current_state'] == 'failed'
    assert report['current_state_time'] == 0
    assert report['agg_states']['active']['dur'] == 4
    assert report['agg_states']['active']['count'] == 2
    assert report['agg_states']['inactive']['dur'] == 4
    assert report['agg_states']['inactive']['count'] == 2
    assert report['agg_states']['failed']['dur'] == 2
    assert report['agg_states']['failed']['count'] == 2
def test_one_period_collection():
    """Run the thread for one 6s reporting period with a constant state."""
    global sample_set
    global current_index
    # one reporting period
    sample_set = 1
    current_index = 0
    t = CollectionThread(get_sample_test, 2, 6)
    t.start()
    time.sleep(8)
    t.stop()
    # wait for the loop to finish so the report is no longer being written
    # and no live thread leaks into the next test
    t.join()
    report = t.current_report
    print("Current report: {0}".format(str(report)))
    assert report['current_state'] == 'active'
    assert int(round(report['current_state_time'])) == 6
    assert int(round(report['agg_states']['active']['dur'])) == 6
    assert int(round(report['agg_states']['active']['count'])) == 1
def test_multi_period_single_state_collection():
    """Run two 3s reporting periods with a constant state.

    The time in the current state must accumulate across both periods while
    the aggregated duration only covers the last period.
    """
    global sample_set
    global current_index
    # two reporting periods
    sample_set = 1
    current_index = 0
    t = CollectionThread(get_sample_test, 1, 3)
    t.start()
    time.sleep(7)
    t.stop()
    # wait for the loop to finish so the report is no longer being written
    # and no live thread leaks into the next test
    t.join()
    report = t.current_report
    print("Current report: {0}".format(str(report)))
    assert report['current_state'] == 'active'
    assert int(round(report['current_state_time'])) == 6
    assert int(round(report['agg_states']['active']['dur'])) == 3
    assert int(round(report['agg_states']['active']['count'])) == 1
def test_multi_period_multi_state_collection():
    """Run one 10s reporting period in which the state changes on every sample."""
    global sample_set
    global current_index
    # 6 samples and 2 reporting periods
    sample_set = 4
    current_index = 0
    t = CollectionThread(get_sample_test, 2, 10)
    t.start()
    time.sleep(13)
    t.stop()
    # wait for the loop to finish so the report is no longer being written
    # and no live thread leaks into the next test
    t.join()
    report = t.current_report
    print("Current report: {0}".format(str(report)))
    assert report['current_state'] == 'failed'
    assert int(round(report['current_state_time'])) == 0
    agg_states = report['agg_states']
    assert int(round(agg_states['active']['dur'])) == 4
    assert int(round(agg_states['active']['count'])) == 2
    assert int(round(agg_states['inactive']['dur'])) == 4
    assert int(round(agg_states['inactive']['count'])) == 2
    assert int(round(agg_states['failed']['dur'])) == 2
    assert int(round(agg_states['failed']['count'])) == 2
def test_get_systemctl_status():
    """The nginx unit is expected to report loaded / active / running."""
    service = 'nginx'
    monitor = SystemctlMonitor(service, 2, 10)
    observed = monitor.get_systemctl_status(service)
    expected = 'loaded.active.running'
    assert observed == expected
def test_monitor():
    """Smoke test: monitor nginx for one aggregation period and fetch the report."""
    mon = SystemctlMonitor('nginx', 2, 10)
    mon.start()
    time.sleep(11)
    mon.stop()
    # wait for the collection loop to exit so no thread outlives the test
    mon.collection_thread.join()
    report = mon.get_last_report()
    print("Current report: {0}".format(str(report)))
    # the monitor always exposes a dict, empty until a period has completed
    assert isinstance(report, dict)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment