Skip to content
Snippets Groups Projects
Commit d341a953 authored by MJB's avatar MJB
Browse files

updated logging, install, etc

parent 8243c06d
Branches
Tags
No related merge requests found
...@@ -51,113 +51,113 @@ def get_sample_test(): ...@@ -51,113 +51,113 @@ def get_sample_test():
current_index = 0 current_index = 0
return sample return sample
def write_output(report): def write_output(measurement):
print("Writing report output {0}".format(report)) print("Writing measurement output {0}".format(measurement))
sample_set = 0 sample_set = 0
current_index = 0 current_index = 0
def test_agg(): def test_agg():
t = ConfigCollector(get_sample_test, write_output) t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[0], 10, 12) measurement = t.create_measurement(samples[0], 10, 12)
assert report['fields']['current_state'] == 'active' assert measurement['fields']['current_state'] == 'active'
assert report['fields']['current_state_time'] == 12 assert measurement['fields']['current_state_time'] == 12
assert report['fields']['active_sum'] == 12 assert measurement['fields']['active_sum'] == 12
assert report['fields']['active_count'] == 1 assert measurement['fields']['active_count'] == 1
assert report['time'] == 12 assert measurement['time'] == 12
t = ConfigCollector(get_sample_test, write_output) t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[1], 10, 14) measurement = t.create_measurement(samples[1], 10, 14)
assert report['fields']['current_state'] == 'active' assert measurement['fields']['current_state'] == 'active'
assert report['fields']['current_state_time'] == 14 assert measurement['fields']['current_state_time'] == 14
assert report['fields']['active_sum'] == 14 assert measurement['fields']['active_sum'] == 14
assert report['fields']['active_count'] == 1 assert measurement['fields']['active_count'] == 1
assert report['time'] == 14 assert measurement['time'] == 14
t = ConfigCollector(get_sample_test, write_output) t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[2], 8, 10) measurement = t.create_measurement(samples[2], 8, 10)
assert report['fields']['current_state'] == 'failed' assert measurement['fields']['current_state'] == 'failed'
assert report['fields']['current_state_time'] == 0 assert measurement['fields']['current_state_time'] == 0
assert report['fields']['active_sum'] == 2 assert measurement['fields']['active_sum'] == 2
assert report['fields']['active_count'] == 1 assert measurement['fields']['active_count'] == 1
assert report['fields']['failed_sum'] == 0 assert measurement['fields']['failed_sum'] == 0
assert report['fields']['failed_count'] == 1 assert measurement['fields']['failed_count'] == 1
assert report['time'] == 10 assert measurement['time'] == 10
t = ConfigCollector(get_sample_test, write_output) t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[3], 2, 12) measurement = t.create_measurement(samples[3], 2, 12)
assert report['fields']['current_state'] == 'inactive' assert measurement['fields']['current_state'] == 'inactive'
assert report['fields']['current_state_time'] == 0 assert measurement['fields']['current_state_time'] == 0
assert report['fields']['active_sum'] == 6 assert measurement['fields']['active_sum'] == 6
assert report['fields']['active_count'] == 2 assert measurement['fields']['active_count'] == 2
assert report['fields']['inactive_sum'] == 2 assert measurement['fields']['inactive_sum'] == 2
assert report['fields']['inactive_count'] == 2 assert measurement['fields']['inactive_count'] == 2
assert report['fields']['failed_sum'] == 2 assert measurement['fields']['failed_sum'] == 2
assert report['fields']['failed_count'] == 1 assert measurement['fields']['failed_count'] == 1
assert report['time'] == 12 assert measurement['time'] == 12
t = ConfigCollector(get_sample_test, write_output) t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[4], 4, 14) measurement = t.create_measurement(samples[4], 4, 14)
assert report['fields']['current_state'] == 'failed' assert measurement['fields']['current_state'] == 'failed'
assert report['fields']['current_state_time'] == 0 assert measurement['fields']['current_state_time'] == 0
assert report['fields']['active_sum'] == 4 assert measurement['fields']['active_sum'] == 4
assert report['fields']['active_count'] == 2 assert measurement['fields']['active_count'] == 2
assert report['fields']['inactive_sum'] == 4 assert measurement['fields']['inactive_sum'] == 4
assert report['fields']['inactive_count'] == 2 assert measurement['fields']['inactive_count'] == 2
assert report['fields']['failed_sum'] == 2 assert measurement['fields']['failed_sum'] == 2
assert report['fields']['failed_count'] == 2 assert measurement['fields']['failed_count'] == 2
assert report['time'] == 14 assert measurement['time'] == 14
def test_one_period_collection(): def test_one_period_collection():
global sample_set global sample_set
global current_index global current_index
# one reporting period # one measurementing period
sample_set = 1 sample_set = 1
current_index = 0 current_index = 0
t = ConfigCollector(get_sample_test, write_output, 2, 6) t = ConfigCollector(get_sample_test, write_output, 2, 6)
t.start() t.start()
time.sleep(8) time.sleep(8)
t.stop() t.stop()
print("Current report: {0}".format(str(t.current_report))) print("Current measurement: {0}".format(str(t.current_measurement)))
assert t.current_report['fields']['current_state'] == 'active' assert t.current_measurement['fields']['current_state'] == 'active'
assert int(round(t.current_report['fields']['current_state_time'])) == 6 assert int(round(t.current_measurement['fields']['current_state_time'])) == 6
assert int(round(t.current_report['fields']['active_sum'])) == 6 assert int(round(t.current_measurement['fields']['active_sum'])) == 6
assert int(round(t.current_report['fields']['active_count'])) == 1 assert int(round(t.current_measurement['fields']['active_count'])) == 1
def test_multi_period_single_state_collection(): def test_multi_period_single_state_collection():
global sample_set global sample_set
global current_index global current_index
# two reporting periods # two measurementing periods
sample_set = 1 sample_set = 1
current_index = 0 current_index = 0
t = ConfigCollector(get_sample_test, write_output, 1, 3) t = ConfigCollector(get_sample_test, write_output, 1, 3)
t.start() t.start()
time.sleep(7) time.sleep(7)
t.stop() t.stop()
print("Current report: {0}".format(str(t.current_report))) print("Current measurement: {0}".format(str(t.current_measurement)))
assert t.current_report['fields']['current_state'] == 'active' assert t.current_measurement['fields']['current_state'] == 'active'
assert int(round(t.current_report['fields']['current_state_time'])) == 6 assert int(round(t.current_measurement['fields']['current_state_time'])) == 6
assert int(round(t.current_report['fields']['active_sum'])) == 6 assert int(round(t.current_measurement['fields']['active_sum'])) == 6
assert int(round(t.current_report['fields']['active_count'])) == 1 assert int(round(t.current_measurement['fields']['active_count'])) == 1
# [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]] # [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]
def test_multi_period_multi_state_collection(): def test_multi_period_multi_state_collection():
global sample_set global sample_set
global current_index global current_index
# 6 samples and 2 reporting periods # 6 samples and 2 measurementing periods
sample_set = 4 sample_set = 4
current_index = 0 current_index = 0
t = ConfigCollector(get_sample_test, write_output, 2, 10) t = ConfigCollector(get_sample_test, write_output, 2, 10)
t.start() t.start()
time.sleep(13) time.sleep(13)
t.stop() t.stop()
print("Current report: {0}".format(str(t.current_report))) print("Current measurement: {0}".format(str(t.current_measurement)))
assert t.current_report['fields']['current_state'] == 'failed' assert t.current_measurement['fields']['current_state'] == 'failed'
assert int(round(t.current_report['fields']['current_state_time'])) == 0 assert int(round(t.current_measurement['fields']['current_state_time'])) == 0
assert int(round(t.current_report['fields']['active_sum'])) == 4 assert int(round(t.current_measurement['fields']['active_sum'])) == 4
assert int(round(t.current_report['fields']['active_count'])) == 2 assert int(round(t.current_measurement['fields']['active_count'])) == 2
assert int(round(t.current_report['fields']['inactive_sum'])) == 4 assert int(round(t.current_measurement['fields']['inactive_sum'])) == 4
assert int(round(t.current_report['fields']['inactive_count'])) == 2 assert int(round(t.current_measurement['fields']['inactive_count'])) == 2
assert int(round(t.current_report['fields']['failed_sum'])) == 2 assert int(round(t.current_measurement['fields']['failed_sum'])) == 2
assert int(round(t.current_report['fields']['failed_count'])) == 2 assert int(round(t.current_measurement['fields']['failed_count'])) == 2
\ No newline at end of file \ No newline at end of file
...@@ -30,11 +30,10 @@ import sys ...@@ -30,11 +30,10 @@ import sys
from systemctl_monitor import SystemctlMonitor from systemctl_monitor import SystemctlMonitor
URL = "http://172.2.23.212" URL = "localhost"
PORT = "8186" PORT = "8186"
DATABASE = "CLMCMetrics" DATABASE = "CLMCMetrics"
@pytest.mark.parametrize("service_name", [('nginx')]) @pytest.mark.parametrize("service_name", [('nginx')])
def test_create_measurement(telegraf_agent_config, service_name): def test_create_measurement(telegraf_agent_config, service_name):
...@@ -43,22 +42,17 @@ def test_create_measurement(telegraf_agent_config, service_name): ...@@ -43,22 +42,17 @@ def test_create_measurement(telegraf_agent_config, service_name):
if s['name'] == service_name: if s['name'] == service_name:
service = s service = s
continue continue
assert service != 'unknown', "{0} not in list of hosts".format(service_name) assert service != 'unknown', "{0} not in list of hosts".format(service_name)
mon = SystemctlMonitor(service_name, 2, 10, service['ip_address'], 8186, service['database_name']) mon = SystemctlMonitor(service_name, 2, 10, service['ip_address'], 8186, service['database_name'])
report = {'time': 1526042434.1773288, 'fields': {'loaded.active.running_sum': 231.85903143882751, 'current_state_time': 231.85903143882751, 'current_state': 'loaded.active.running', 'loaded.active.running_count': 1}}
report = {'current_state': 'failed', 'time': 2, 'agg_states': {'inactive': {'dur': 3, 'count': 3}, 'active': {'dur': 4, 'count': 4}, 'failed': {'dur': 5, 'count': 5}}, 'current_state_time': 0.0}
measurement = mon.create_measurement(report) measurement = mon.create_measurement(report)
assert measurement[0]['tags']['resource_name'] == service_name assert measurement[0]['tags']['resource_name'] == service_name
assert measurement[0]['fields']['current_state'] == report['current_state'] assert measurement[0]['fields']['current_state'] == report['fields']['current_state']
def test_get_systemctl_status(telegraf_agent_config): def test_get_systemctl_status(telegraf_agent_config):
mon = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE) mon = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE)
state = mon.get_systemctl_status('nginx') state = mon.get_systemctl_status('nginx')
assert state == 'loaded.active.running' assert state == 'loaded.active.running'
def test_monitor(telegraf_agent_config): def test_monitor(telegraf_agent_config):
...@@ -66,6 +60,5 @@ def test_monitor(telegraf_agent_config): ...@@ -66,6 +60,5 @@ def test_monitor(telegraf_agent_config):
mon.start() mon.start()
time.sleep(21) time.sleep(21)
mon.stop() mon.stop()
measurement = mon.get_current_measurement()
report = mon.get_last_report() print("Current measurement: {0}".format(str(measurement)))
print("Current report: {0}".format(str(report))) \ No newline at end of file
\ No newline at end of file
...@@ -28,9 +28,6 @@ ...@@ -28,9 +28,6 @@
apt-get update apt-get update
yes Y | apt-get install nginx yes Y | apt-get install nginx
sudo apt-get install python3 python3-pip -y
sudo python3 -m pip install pytest pyaml influxdb
# Need to set up basic stats as this not configured by default # Need to set up basic stats as this not configured by default
# http://nginx.org/en/docs/http/ngx_http_stub_status_module.html # http://nginx.org/en/docs/http/ngx_http_stub_status_module.html
...@@ -54,7 +51,10 @@ fi ...@@ -54,7 +51,10 @@ fi
nginx -s reload nginx -s reload
systemctl start nginx systemctl start nginx
## install a configuration monitoring service ## install a configuration monitoring service, this needs to be in a venv with the rest of the CLMC
sudo apt-get install python3 python3-pip -y
sudo pip3 install pyaml influxdb
svc="nginxmon" svc="nginxmon"
echo "install systemctl monitoring service" echo "install systemctl monitoring service"
......
...@@ -5,23 +5,23 @@ import random ...@@ -5,23 +5,23 @@ import random
import logging import logging
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__) logger = logging.getLogger()
class ConfigCollector(threading.Thread): class ConfigCollector(threading.Thread):
STATE_NAME = 0 STATE_NAME = 0
STATE_TIME = 1 STATE_TIME = 1
def __init__(self, sample_func, output_func, sample_rate=2, agg_period=10): def __init__(self, sample_func, write_func, sample_rate=2, agg_period=10):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self._start_event = threading.Event() self._start_event = threading.Event()
self.sample_func = sample_func self.sample_func = sample_func
self.output_func = output_func self.write_func = write_func
self.sample_rate = sample_rate self.sample_rate = sample_rate
self.agg_period = agg_period self.agg_period = agg_period
self.samples = [] self.samples = []
self.agg_states = {} self.agg_states = {}
self.states = {} self.states = {}
self.current_report = {} self.current_measurement = {}
return return
def run(self): def run(self):
...@@ -48,11 +48,12 @@ class ConfigCollector(threading.Thread): ...@@ -48,11 +48,12 @@ class ConfigCollector(threading.Thread):
# if last sample was at the end of the aggregation period then process # if last sample was at the end of the aggregation period then process
if sample_time >= end_period: if sample_time >= end_period:
# aggregate samples into single measurement # aggregate samples into single measurement
self.current_report = self.agg_samples(self.samples, current_state_time, sample_time) self.current_measurement = self.create_measurement(self.samples, current_state_time, sample_time)
# write output # write output
self.output_func(self.current_report) write_thread = WriteThread(self.write_func, self.current_measurement)
write_thread.start()
# set time in current state # set time in current state
current_state_time = self.current_report['fields']['current_state_time'] current_state_time = self.current_measurement['fields']['current_state_time']
# remove all processed samples # remove all processed samples
self.samples.clear() self.samples.clear()
# add last sample as 1st sample of the next period # add last sample as 1st sample of the next period
...@@ -83,25 +84,34 @@ class ConfigCollector(threading.Thread): ...@@ -83,25 +84,34 @@ class ConfigCollector(threading.Thread):
logger.debug("Stopping thread") logger.debug("Stopping thread")
self._start_event.clear() self._start_event.clear()
def agg_samples(self, samples, initial_state_time, current_time): def create_measurement(self, samples, initial_state_time, current_time):
report = {}
logger.debug("Samples: {0}".format(str(samples))) logger.debug("Samples: {0}".format(str(samples)))
# check we have more than two samples to aggregate # aggregate samples into states
sample_count = len(samples) states = self.aggregate_samples(samples)
logger.debug("Sample count {0}".format(sample_count)) logger.debug("States: {0}".format(str(states)))
if sample_count < 2: # aggregate the states into a measurement
logger.warn("Not enough samples to aggregate in period {0}".format(sample_count)) measurement = self.aggregate_states(states, initial_state_time)
return report measurement['time'] = current_time
logger.debug("Report: {0}".format(str(measurement)))
# set initial state to the 1st sample return measurement
initial_state = samples[0][self.STATE_NAME]
logger.debug("Initial state : {0}".format(initial_state))
# Calculates the state time from the set of samples def aggregate_samples(self, samples):
states = [] states = []
sample_count = len(samples)
logger.debug("Sample count {0}".format(sample_count))
# error if no samples to aggregate
if sample_count == 0:
raise ValueError('No samples in the samples list')
# no aggregation needed if only one sample
if sample_count == 1:
return samples[0]
# aggregate samples
last_index = sample_count-1 last_index = sample_count-1
for index, sample in enumerate(samples): for index, sample in enumerate(samples):
# for the 1st sample we set the current state and state_start_time # for the 1st sample we set the current state and state_start_time
...@@ -119,7 +129,6 @@ class ConfigCollector(threading.Thread): ...@@ -119,7 +129,6 @@ class ConfigCollector(threading.Thread):
current_state = sample[self.STATE_NAME] current_state = sample[self.STATE_NAME]
# set the start time of the next state # set the start time of the next state
state_start_time = state_start_time + state_time state_start_time = state_start_time + state_time
# deal with the final sample # deal with the final sample
if index == last_index: if index == last_index:
# calc state duration if last sample is the same as previous state # calc state duration if last sample is the same as previous state
...@@ -129,16 +138,17 @@ class ConfigCollector(threading.Thread): ...@@ -129,16 +138,17 @@ class ConfigCollector(threading.Thread):
# add transition in final sample with zero duration # add transition in final sample with zero duration
elif current_state != sample[self.STATE_NAME]: elif current_state != sample[self.STATE_NAME]:
states.append([current_state,0]) states.append([current_state,0])
return states
logger.debug("States: {0}".format(str(states))) def aggregate_states(self, states, initial_state_time):
# set initial state to the 1st sample
initial_state = states[0][self.STATE_NAME]
logger.debug("Initial state : {0}".format(initial_state))
logger.debug("Initial state time : {0}".format(initial_state_time)) logger.debug("Initial state time : {0}".format(initial_state_time))
# set the current state as the last state sampled # set the current state as the last state sampled
current_state = states[-1][self.STATE_NAME] current_state = states[-1][self.STATE_NAME]
# calc time in current state # if no change in state take the initial state time and add current state time
# if no change in state then take the initial time in state and add the current state time
if initial_state == current_state and len(states) == 1: if initial_state == current_state and len(states) == 1:
logger.debug("No transition so just adding last state to current state")
current_state_time = initial_state_time + states[-1][self.STATE_TIME] current_state_time = initial_state_time + states[-1][self.STATE_TIME]
state_sum_key = current_state + "_sum" state_sum_key = current_state + "_sum"
state_count_key = current_state + "_count" state_count_key = current_state + "_count"
...@@ -150,15 +160,14 @@ class ConfigCollector(threading.Thread): ...@@ -150,15 +160,14 @@ class ConfigCollector(threading.Thread):
# current state time is the last state time # current state time is the last state time
current_state_time = states[-1][self.STATE_TIME] current_state_time = states[-1][self.STATE_TIME]
# calc the total duration and number of transitions in each state. # calc the total duration and number of transitions in each state.
for index, state in enumerate(states): for state in states:
# if first time we've seen the state add to dict with initial duration and a single transition # if first occurance of state add with initial duration and a single transition
state_sum_key = state[self.STATE_NAME] + "_sum" state_sum_key = state[self.STATE_NAME] + "_sum"
state_count_key = state[self.STATE_NAME] + "_count" state_count_key = state[self.STATE_NAME] + "_count"
if state_sum_key not in self.agg_states: if state_sum_key not in self.agg_states:
logger.debug("Adding state: {0}".format(state[self.STATE_NAME]))
self.agg_states[state_sum_key] = state[self.STATE_TIME] self.agg_states[state_sum_key] = state[self.STATE_TIME]
self.agg_states[state_count_key] = 1 self.agg_states[state_count_key] = 1
logger.debug("Adding state: {0}".format(state[self.STATE_NAME]))
else: else:
logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME])) logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME]))
# increment number of times in the state # increment number of times in the state
...@@ -169,28 +178,28 @@ class ConfigCollector(threading.Thread): ...@@ -169,28 +178,28 @@ class ConfigCollector(threading.Thread):
logger.debug("Duration: {0}".format(self.agg_states[state_sum_key])) logger.debug("Duration: {0}".format(self.agg_states[state_sum_key]))
# Create report # Create report
report['fields'] = self.agg_states measurement = {}
report['fields']['current_state'] = current_state measurement['fields'] = self.agg_states
report['fields']['current_state_time'] = current_state_time measurement['fields']['current_state'] = current_state
report['time'] = current_time measurement['fields']['current_state_time'] = current_state_time
logger.debug("Report: {0}".format(str(report))) return measurement
return report class WriteThread(threading.Thread):
def __init__(self, write_func, report):
threading.Thread.__init__(self)
class OutputThread(threading.Thread): self._start_event = threading.Event()
def __init__(self, output_func, report): self.write_func = write_func
self.output_func = output_func
self.report = report self.report = report
return
def run(self): def run(self):
# if thread running then return # if thread running then return
if(self._start_event.is_set()): if(self._start_event.is_set()):
return return
self._start_event.set() self._start_event.set()
output_func(report) self.write_func(self.report)
def stop(self): def stop(self):
logger.debug("Stoppping thread") logger.debug("Stopping thread")
self._start_event.clear() self._start_event.clear()
...@@ -8,7 +8,7 @@ from config_collector import ConfigCollector ...@@ -8,7 +8,7 @@ from config_collector import ConfigCollector
from influxdb import InfluxDBClient from influxdb import InfluxDBClient
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger()
class SystemctlMonitor: class SystemctlMonitor:
ACTIVE_STATE_KEY='ActiveState' ACTIVE_STATE_KEY='ActiveState'
...@@ -21,15 +21,17 @@ class SystemctlMonitor: ...@@ -21,15 +21,17 @@ class SystemctlMonitor:
self.hostname = hostname self.hostname = hostname
self.port = port self.port = port
self.database = database self.database = database
self.db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10)
def start(self): def start(self):
self.collection_thread.start() self.collection_thread.start()
def stop(self): def stop(self):
self.collection_thread.stop() self.collection_thread.stop()
self.db_client.close()
def get_last_report(self): def get_current_measurement(self):
return self.collection_thread.current_report return self.collection_thread.current_measurement
def get_systemctl_sample(self): def get_systemctl_sample(self):
return (self.get_systemctl_status(self.service_name), time.time()) return (self.get_systemctl_status(self.service_name), time.time())
...@@ -63,9 +65,8 @@ class SystemctlMonitor: ...@@ -63,9 +65,8 @@ class SystemctlMonitor:
print("Writing report: {0}".format(str(report))) print("Writing report: {0}".format(str(report)))
try: try:
db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10)
measurement = self.create_measurement(report) measurement = self.create_measurement(report)
db_client.write_points(measurement) self.db_client.write_points(measurement)
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -96,8 +97,12 @@ def main(): ...@@ -96,8 +97,12 @@ def main():
args = parser.parse_args() args = parser.parse_args()
print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}, {5}".format(args.service, args.rate, args.agg, args.host, args.port, args.db)) print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}, {5}".format(args.service, args.rate, args.agg, args.host, args.port, args.db))
if(args.debug): if args.debug == True:
print("Setting logging level to to DEBUG")
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db) mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db)
mon.start() mon.start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment