Skip to content
Snippets Groups Projects
Commit 8243c06d authored by Michael Boniface's avatar Michael Boniface
Browse files

fixed the monitoring to be cumulative over the entire duration of the sampling

parent eab379f8
No related branches found
No related tags found
No related merge requests found
...@@ -39,9 +39,6 @@ samples = [[['active', 0], ['active', 2]], ...@@ -39,9 +39,6 @@ samples = [[['active', 0], ['active', 2]],
[['active', 0], ['active', 2], ['inactive', 4], ['active', 6], ['failed', 8], ['inactive', 10]], [['active', 0], ['active', 2], ['inactive', 4], ['active', 6], ['failed', 8], ['inactive', 10]],
[['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]] [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]]
sample_set = 0
current_index = 0
def get_sample_test(): def get_sample_test():
global sample_set global sample_set
global current_index global current_index
...@@ -56,48 +53,59 @@ def get_sample_test(): ...@@ -56,48 +53,59 @@ def get_sample_test():
def write_output(report): def write_output(report):
print("Writing report output {0}".format(report)) print("Writing report output {0}".format(report))
def test_aggregation(): sample_set = 0
t = ConfigCollector(get_sample_test, write_output) current_index = 0
def test_agg():
t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[0], 10, 12) report = t.agg_samples(samples[0], 10, 12)
assert report['fields']['current_state'] == 'active'
assert report['fields']['current_state_time'] == 12
assert report['fields']['active_sum'] == 12
assert report['fields']['active_count'] == 1
assert report['time'] == 12
t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[1], 10, 14) report = t.agg_samples(samples[1], 10, 14)
assert report['current_state'] == 'active' assert report['fields']['current_state'] == 'active'
assert report['current_state_time'] == 14 assert report['fields']['current_state_time'] == 14
assert report['agg_states']['active']['dur'] == 4 assert report['fields']['active_sum'] == 14
assert report['agg_states']['active']['count'] == 0 assert report['fields']['active_count'] == 1
assert report['time'] == 14 assert report['time'] == 14
t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[2], 8, 10) report = t.agg_samples(samples[2], 8, 10)
assert report['current_state'] == 'failed' assert report['fields']['current_state'] == 'failed'
assert report['current_state_time'] == 0 assert report['fields']['current_state_time'] == 0
assert report['agg_states']['active']['dur'] == 2 assert report['fields']['active_sum'] == 2
assert report['agg_states']['active']['count'] == 0 assert report['fields']['active_count'] == 1
assert report['agg_states']['failed']['dur'] == 0 assert report['fields']['failed_sum'] == 0
assert report['agg_states']['failed']['count'] == 1 assert report['fields']['failed_count'] == 1
assert report['time'] == 10 assert report['time'] == 10
t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[3], 2, 12) report = t.agg_samples(samples[3], 2, 12)
assert report['current_state'] == 'inactive' assert report['fields']['current_state'] == 'inactive'
assert report['current_state_time'] == 0 assert report['fields']['current_state_time'] == 0
assert report['agg_states']['active']['dur'] == 6 assert report['fields']['active_sum'] == 6
assert report['agg_states']['active']['count'] == 1 assert report['fields']['active_count'] == 2
assert report['agg_states']['inactive']['dur'] == 2 assert report['fields']['inactive_sum'] == 2
assert report['agg_states']['inactive']['count'] == 2 assert report['fields']['inactive_count'] == 2
assert report['agg_states']['failed']['dur'] == 2 assert report['fields']['failed_sum'] == 2
assert report['agg_states']['failed']['count'] == 1 assert report['fields']['failed_count'] == 1
assert report['time'] == 12 assert report['time'] == 12
t = ConfigCollector(get_sample_test, write_output)
report = t.agg_samples(samples[4], 4, 14) report = t.agg_samples(samples[4], 4, 14)
assert report['current_state'] == 'failed' assert report['fields']['current_state'] == 'failed'
assert report['current_state_time'] == 0 assert report['fields']['current_state_time'] == 0
assert report['agg_states']['active']['dur'] == 4 assert report['fields']['active_sum'] == 4
assert report['agg_states']['active']['count'] == 1 assert report['fields']['active_count'] == 2
assert report['agg_states']['inactive']['dur'] == 4 assert report['fields']['inactive_sum'] == 4
assert report['agg_states']['inactive']['count'] == 2 assert report['fields']['inactive_count'] == 2
assert report['agg_states']['failed']['dur'] == 2 assert report['fields']['failed_sum'] == 2
assert report['agg_states']['failed']['count'] == 2 assert report['fields']['failed_count'] == 2
assert report['time'] == 14 assert report['time'] == 14
def test_one_period_collection(): def test_one_period_collection():
...@@ -112,10 +120,10 @@ def test_one_period_collection(): ...@@ -112,10 +120,10 @@ def test_one_period_collection():
time.sleep(8) time.sleep(8)
t.stop() t.stop()
print("Current report: {0}".format(str(t.current_report))) print("Current report: {0}".format(str(t.current_report)))
assert t.current_report['current_state'] == 'active' assert t.current_report['fields']['current_state'] == 'active'
assert int(round(t.current_report['current_state_time'])) == 6 assert int(round(t.current_report['fields']['current_state_time'])) == 6
assert int(round(t.current_report['agg_states']['active']['dur'])) == 6 assert int(round(t.current_report['fields']['active_sum'])) == 6
assert int(round(t.current_report['agg_states']['active']['count'])) == 0 assert int(round(t.current_report['fields']['active_count'])) == 1
def test_multi_period_single_state_collection(): def test_multi_period_single_state_collection():
global sample_set global sample_set
...@@ -128,11 +136,12 @@ def test_multi_period_single_state_collection(): ...@@ -128,11 +136,12 @@ def test_multi_period_single_state_collection():
time.sleep(7) time.sleep(7)
t.stop() t.stop()
print("Current report: {0}".format(str(t.current_report))) print("Current report: {0}".format(str(t.current_report)))
assert t.current_report['current_state'] == 'active' assert t.current_report['fields']['current_state'] == 'active'
assert int(round(t.current_report['current_state_time'])) == 6 assert int(round(t.current_report['fields']['current_state_time'])) == 6
assert int(round(t.current_report['agg_states']['active']['dur'])) == 3 assert int(round(t.current_report['fields']['active_sum'])) == 6
assert int(round(t.current_report['agg_states']['active']['count'])) == 0 assert int(round(t.current_report['fields']['active_count'])) == 1
# [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]
def test_multi_period_multi_state_collection(): def test_multi_period_multi_state_collection():
global sample_set global sample_set
global current_index global current_index
...@@ -144,11 +153,11 @@ def test_multi_period_multi_state_collection(): ...@@ -144,11 +153,11 @@ def test_multi_period_multi_state_collection():
time.sleep(13) time.sleep(13)
t.stop() t.stop()
print("Current report: {0}".format(str(t.current_report))) print("Current report: {0}".format(str(t.current_report)))
assert t.current_report['current_state'] == 'failed' assert t.current_report['fields']['current_state'] == 'failed'
assert int(round(t.current_report['current_state_time'])) == 0 assert int(round(t.current_report['fields']['current_state_time'])) == 0
assert int(round(t.current_report['agg_states']['active']['dur'])) == 4 assert int(round(t.current_report['fields']['active_sum'])) == 4
assert int(round(t.current_report['agg_states']['active']['count'])) == 1 assert int(round(t.current_report['fields']['active_count'])) == 2
assert int(round(t.current_report['agg_states']['inactive']['dur'])) == 4 assert int(round(t.current_report['fields']['inactive_sum'])) == 4
assert int(round(t.current_report['agg_states']['inactive']['count'])) == 2 assert int(round(t.current_report['fields']['inactive_count'])) == 2
assert int(round(t.current_report['agg_states']['failed']['dur'])) == 2 assert int(round(t.current_report['fields']['failed_sum'])) == 2
assert int(round(t.current_report['agg_states']['failed']['count'])) == 2 assert int(round(t.current_report['fields']['failed_count'])) == 2
\ No newline at end of file \ No newline at end of file
...@@ -65,8 +65,8 @@ echo "After=network-online.target" >> $svc_file ...@@ -65,8 +65,8 @@ echo "After=network-online.target" >> $svc_file
echo "" >> $svc_file echo "" >> $svc_file
echo "[Service]" >> $svc_file echo "[Service]" >> $svc_file
echo "WorkingDirectory=${inst}/${dir}" >> $svc_file echo "WorkingDirectory=${inst}/${dir}" >> $svc_file
echo "ExecStart=/usr/bin/python3 /vagrant/src/monitoring/systemctl_monitor.py -service nginx -rate 2 -agg 10 -host localhost -port 8186 -db CLMCMetrics" >> $svc_file echo "ExecStart=/usr/bin/python3 ${REPO_ROOT}/src/monitoring/systemctl_monitor.py -service nginx -rate 2 -agg 10 -host localhost -port 8186 -db CLMCMetrics" >> $svc_file
echo "ExecStop=/usr/bin/bash /vagrant/src/monitoring/stop_systemctl_monitor.sh" >> $svc_file echo "ExecStop=/usr/bin/bash ${REPO_ROOT}/src/monitoring/stop_systemctl_monitor.sh" >> $svc_file
echo "" >> $svc_file echo "" >> $svc_file
echo "[Install]" >> $svc_file echo "[Install]" >> $svc_file
echo "WantedBy=network-online.target" >> $svc_file echo "WantedBy=network-online.target" >> $svc_file
......
...@@ -19,6 +19,7 @@ class ConfigCollector(threading.Thread): ...@@ -19,6 +19,7 @@ class ConfigCollector(threading.Thread):
self.sample_rate = sample_rate self.sample_rate = sample_rate
self.agg_period = agg_period self.agg_period = agg_period
self.samples = [] self.samples = []
self.agg_states = {}
self.states = {} self.states = {}
self.current_report = {} self.current_report = {}
return return
...@@ -51,7 +52,7 @@ class ConfigCollector(threading.Thread): ...@@ -51,7 +52,7 @@ class ConfigCollector(threading.Thread):
# write output # write output
self.output_func(self.current_report) self.output_func(self.current_report)
# set time in current state # set time in current state
current_state_time = self.current_report['current_state_time'] current_state_time = self.current_report['fields']['current_state_time']
# remove all processed samples # remove all processed samples
self.samples.clear() self.samples.clear()
# add last sample as 1st sample of the next period # add last sample as 1st sample of the next period
...@@ -131,41 +132,46 @@ class ConfigCollector(threading.Thread): ...@@ -131,41 +132,46 @@ class ConfigCollector(threading.Thread):
logger.debug("States: {0}".format(str(states))) logger.debug("States: {0}".format(str(states)))
# calc the total duration and number of transitions in each state. logger.debug("Initial state time : {0}".format(initial_state_time))
# Assuming no transition into the 1st state. # set the current state as the last state sampled
# Assuming current_state = states[-1][self.STATE_NAME]
# calc time in current state
agg_states = {}
for index, state in enumerate(states):
if index == 0:
agg_states[states[0][self.STATE_NAME]] = {'dur':states[0][self.STATE_TIME], 'count': 0}
else:
# if first time we've seen the state add to dict with initial duration and a single transition
if state[self.STATE_NAME] not in agg_states:
agg_states[state[self.STATE_NAME]] = {'dur':state[self.STATE_TIME], 'count': 1}
logger.debug("Adding state: {0}, Count: {1}".format(state[self.STATE_NAME], 1))
else:
logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME]))
# add state time to aggregate total
agg_states[state[self.STATE_NAME]]['dur'] += state[self.STATE_TIME]
logger.debug("Duration: {0}".format(agg_states[state[self.STATE_NAME]]['dur']))
# increment number of times in the state
agg_states[state[self.STATE_NAME]]['count'] += 1
logger.debug("Count: {0}".format(agg_states[state[self.STATE_NAME]]['count']))
# set the current state as the last state
report['current_state'] = states[-1][0]
# calc time in current state
state_count = len(agg_states)
# if no change in state then take the initial time in state and add the current state time # if no change in state then take the initial time in state and add the current state time
if initial_state == states[-1][0] and state_count == 1: if initial_state == current_state and len(states) == 1:
logger.debug("No transition so just adding last state to current state")
current_state_time = initial_state_time + states[-1][self.STATE_TIME] current_state_time = initial_state_time + states[-1][self.STATE_TIME]
state_sum_key = current_state + "_sum"
state_count_key = current_state + "_count"
# initialise the number of transitions if it's the 1st time
if state_sum_key not in self.agg_states:
self.agg_states[state_count_key] = 1
self.agg_states[state_sum_key] = current_state_time
else: else:
# current state time is the last state time # current state time is the last state time
current_state_time = states[-1][self.STATE_TIME] current_state_time = states[-1][self.STATE_TIME]
report['current_state_time'] = current_state_time # calc the total duration and number of transitions in each state.
for index, state in enumerate(states):
# if first time we've seen the state add to dict with initial duration and a single transition
state_sum_key = state[self.STATE_NAME] + "_sum"
state_count_key = state[self.STATE_NAME] + "_count"
if state_sum_key not in self.agg_states:
self.agg_states[state_sum_key] = state[self.STATE_TIME]
self.agg_states[state_count_key] = 1
logger.debug("Adding state: {0}".format(state[self.STATE_NAME]))
else:
logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME]))
# increment number of times in the state
self.agg_states[state_count_key] += 1
logger.debug("increment number of times in the state")
# add state time to aggregate total
self.agg_states[state_sum_key] += state[self.STATE_TIME]
logger.debug("Duration: {0}".format(self.agg_states[state_sum_key]))
report['agg_states'] = agg_states # Create report
report['fields'] = self.agg_states
report['fields']['current_state'] = current_state
report['fields']['current_state_time'] = current_state_time
report['time'] = current_time report['time'] = current_time
logger.debug("Report: {0}".format(str(report))) logger.debug("Report: {0}".format(str(report)))
......
...@@ -7,7 +7,7 @@ import urllib.parse ...@@ -7,7 +7,7 @@ import urllib.parse
from config_collector import ConfigCollector from config_collector import ConfigCollector
from influxdb import InfluxDBClient from influxdb import InfluxDBClient
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SystemctlMonitor: class SystemctlMonitor:
...@@ -62,9 +62,9 @@ class SystemctlMonitor: ...@@ -62,9 +62,9 @@ class SystemctlMonitor:
def write_measurement(self, report): def write_measurement(self, report):
print("Writing report: {0}".format(str(report))) print("Writing report: {0}".format(str(report)))
try: try:
db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10) db_client = InfluxDBClient(host=self.hostname, port=self.port, database=self.database, timeout=10)
measurement = self.create_measurement(report) measurement = self.create_measurement(report)
db_client.write_points(measurement) db_client.write_points(measurement)
except Exception as e: except Exception as e:
print(e) print(e)
...@@ -76,19 +76,11 @@ class SystemctlMonitor: ...@@ -76,19 +76,11 @@ class SystemctlMonitor:
"tags": { "tags": {
"resource_name": self.service_name "resource_name": self.service_name
}, },
"fields": {
"current_state": report['current_state'],
"current_state_time": report['current_state_time'],
},
"time": measurement_time "time": measurement_time
}] }]
for key in report['agg_states']: measurement[0]['fields'] = report['fields']
field_name = key + "_sum"
measurement[0]['fields'][field_name] = report['agg_states'][key]['dur']
field_name = key + "_count"
measurement[0]['fields'][field_name] = report['agg_states'][key]['count']
return measurement return measurement
def main(): def main():
...@@ -100,10 +92,12 @@ def main(): ...@@ -100,10 +92,12 @@ def main():
parser.add_argument('-host', help='telegraf hostname', required=True) parser.add_argument('-host', help='telegraf hostname', required=True)
parser.add_argument('-port', help='telegraf port', required=True) parser.add_argument('-port', help='telegraf port', required=True)
parser.add_argument('-db', help='database name', required=True) parser.add_argument('-db', help='database name', required=True)
parser.add_argument('-debug', '--debug', action='store_true')
args = parser.parse_args() args = parser.parse_args()
print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}".format(args.service, args.rate, args.agg, args.host, args.port, args.db)) print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}, {5}".format(args.service, args.rate, args.agg, args.host, args.port, args.db))
if(args.debug):
logger.setLevel(logging.DEBUG)
mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db) mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db)
mon.start() mon.start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment