Skip to content
Snippets Groups Projects
Commit 39007e26 authored by Stephen C Phillips's avatar Stephen C Phillips
Browse files

Merge remote-tracking branch 'origin/integration' into integration

parents 89161a1f fda24a1f
No related branches found
No related tags found
No related merge requests found
Showing
with 1636 additions and 5 deletions
## (c) University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Michael Boniface
## Created Date : 02-02-2018
## Created for Project : FLAME
# VM host definitions for the CLMC integration test rig.
hosts:
  - name: clmc-service                # central monitoring service VM
    cpus: 1
    memory: 2048                      # MB
    disk: "10GB"
    forward_ports:                    # guest -> host port forwards
      - guest: 8086                   # InfluxDB HTTP API
        host: 8086
      - guest: 8888                   # presumably Chronograf UI — verify
        host: 8888
      - guest: 9092                   # presumably Kafka broker — verify
        host: 9092
    ip_address: "172.40.231.51"
  - name: minio                       # object-storage service under test
    service_name: "minio"
    cpus: 1
    memory: 2048
    disk: "10GB"
    forward_ports:
      - guest: 9000                   # minio service port
        host: 9000
    ip_address: "172.40.231.155"
    # CLMC monitoring-context identifiers for this service endpoint
    location: "DC1"
    sfc_id: "MS_Template_1"
    sfc_id_instance: "MS_I1"
    sf_id: "adaptive_streaming"
    sf_id_instance: "adaptive_streaming_I1"
    ipendpoint_id: "adaptive_streaming_I1_minio"
    influxdb_url: "http://172.40.231.51:8086"   # the clmc-service VM above
    database_name: "CLMCMetrics"
  - name: test-runner                 # VM that executes the test suite
    cpus: 1
    memory: 2048
    disk: "10GB"
    ip_address: "172.40.231.200"
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Michael Boniface
## Created Date : 29-04-2018
## Created for Project : FLAME
"""
import pytest
import time
import random
import logging
import sys
from config_collector import ConfigCollector
STATE_INDEX = 0
TIME_INDEX = 1
samples = [[['active', 0], ['active', 2]],
[['active', 0], ['active', 2], ['active', 4]],
[['active', 0], ['failed', 2]],
[['active', 0], ['active', 2], ['inactive', 4], ['active', 6], ['failed', 8], ['inactive', 10]],
[['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]]
def get_sample_test():
    """Return the next (state, timestamp) sample from the active sample set.

    Reads the module-level ``sample_set``/``current_index`` cursor, pairs the
    selected state with the current wall-clock time, then advances the cursor,
    wrapping back to the start of the set when it reaches the end.
    """
    global sample_set
    global current_index
    state = samples[sample_set][current_index][STATE_INDEX]
    result = (state, time.time())
    if current_index < len(samples[sample_set]) - 1:
        current_index += 1
    else:
        current_index = 0
    return result
def write_output(measurement):
    """Stub writer used by the tests: echo the measurement to stdout."""
    print("Writing measurement output {0}".format(measurement))
# Module-level cursor state consumed by get_sample_test(); each test resets
# these before starting a collector.
sample_set = 0
current_index = 0
def test_agg():
    """Exercise ConfigCollector.create_measurement against each sample set.

    Each case is (sample set index, initial state time, measurement time,
    expected fields); a fresh collector is used per case so the aggregation
    totals do not carry over.
    """
    cases = [
        (0, 10, 12, {'current_state': 'active', 'current_state_time': 12,
                     'active_sum': 12, 'active_count': 1}),
        (1, 10, 14, {'current_state': 'active', 'current_state_time': 14,
                     'active_sum': 14, 'active_count': 1}),
        (2, 8, 10, {'current_state': 'failed', 'current_state_time': 0,
                    'active_sum': 2, 'active_count': 1,
                    'failed_sum': 0, 'failed_count': 1}),
        (3, 2, 12, {'current_state': 'inactive', 'current_state_time': 0,
                    'active_sum': 6, 'active_count': 2,
                    'inactive_sum': 2, 'inactive_count': 2,
                    'failed_sum': 2, 'failed_count': 1}),
        (4, 4, 14, {'current_state': 'failed', 'current_state_time': 0,
                    'active_sum': 4, 'active_count': 2,
                    'inactive_sum': 4, 'inactive_count': 2,
                    'failed_sum': 2, 'failed_count': 2}),
    ]
    for sample_idx, initial_time, measure_time, expected in cases:
        collector = ConfigCollector(get_sample_test, write_output, "resource")
        measurement = collector.create_measurement(samples[sample_idx], initial_time, measure_time)
        for field, value in expected.items():
            assert measurement[0]['fields'][field] == value
        # timestamps are reported in nanoseconds
        assert measurement[0]['time'] == measure_time * 1000000000
def test_one_period_collection():
    """Collect a single aggregation period (2s rate, 6s period) of 'active' samples."""
    global sample_set
    global current_index
    sample_set = 1
    current_index = 0
    collector = ConfigCollector(get_sample_test, write_output, "resource", 2, 6)
    collector.start()
    time.sleep(8)
    collector.stop()
    print("Current measurement: {0}".format(str(collector.current_measurement)))
    fields = collector.current_measurement[0]['fields']
    assert fields['current_state'] == 'active'
    assert int(round(fields['current_state_time'])) == 6
    assert int(round(fields['active_sum'])) == 6
    assert int(round(fields['active_count'])) == 1
def test_multi_period_single_state_collection():
    """Two aggregation periods (1s rate, 3s period) in a single 'active' state."""
    global sample_set
    global current_index
    sample_set = 1
    current_index = 0
    collector = ConfigCollector(get_sample_test, write_output, "resource", 1, 3)
    collector.start()
    time.sleep(7)
    collector.stop()
    print("Current measurement: {0}".format(str(collector.current_measurement)))
    fields = collector.current_measurement[0]['fields']
    assert fields['current_state'] == 'active'
    # state time and sum accumulate across both periods
    assert int(round(fields['current_state_time'])) == 6
    assert int(round(fields['active_sum'])) == 6
    assert int(round(fields['active_count'])) == 1
# [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]
def test_multi_period_multi_state_collection():
    """Six samples over two aggregation periods (2s rate, 10s period),
    cycling through active/inactive/failed states."""
    global sample_set
    global current_index
    sample_set = 4
    current_index = 0
    collector = ConfigCollector(get_sample_test, write_output, "resource", 2, 10)
    collector.start()
    time.sleep(13)
    collector.stop()
    print("Current measurement: {0}".format(str(collector.current_measurement)))
    fields = collector.current_measurement[0]['fields']
    assert fields['current_state'] == 'failed'
    assert int(round(fields['current_state_time'])) == 0
    assert int(round(fields['active_sum'])) == 4
    assert int(round(fields['active_count'])) == 2
    assert int(round(fields['inactive_sum'])) == 4
    assert int(round(fields['inactive_count'])) == 2
    assert int(round(fields['failed_sum'])) == 2
    assert int(round(fields['failed_count'])) == 2
\ No newline at end of file
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Michael Boniface
## Created Date : 29-04-2018
## Created for Project : FLAME
"""
import pytest
import time
import random
import logging
import sys
from systemctl_monitor import SystemctlMonitor
URL = "localhost"
PORT = "8186"
DATABASE = "CLMCMetrics"
@pytest.mark.parametrize("service_name", [('nginx')])
def test_create_measurement(telegraf_agent_config, service_name):
    """Check that a measurement built for a known host is tagged with the
    host's name and carries through the report's current_state field."""
    service = None
    for s in telegraf_agent_config['hosts']:
        if s['name'] == service_name:
            service = s
            break  # BUG FIX: was `continue`, which kept scanning after the match
    assert service is not None, "{0} not in list of hosts".format(service_name)
    mon = SystemctlMonitor(service_name, 2, 10, service['ip_address'], 8186, service['database_name'])
    # NOTE(review): create_measurement is not defined on SystemctlMonitor in the
    # visible source — presumably provided elsewhere; verify.
    report = {'time': 1526042434.1773288, 'fields': {'loaded.active.running_sum': 231.85903143882751, 'current_state_time': 231.85903143882751, 'current_state': 'loaded.active.running', 'loaded.active.running_count': 1}}
    measurement = mon.create_measurement(report)
    assert measurement[0]['tags']['resource_name'] == service_name
    assert measurement[0]['fields']['current_state'] == report['fields']['current_state']
def test_get_systemctl_status(telegraf_agent_config):
    """nginx should report as loaded/active/running on the test host."""
    monitor = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE)
    assert monitor.get_systemctl_status('nginx') == 'loaded.active.running'
def test_monitor(telegraf_agent_config):
    """Run the monitor for roughly two aggregation periods and print the result."""
    monitor = SystemctlMonitor('nginx', 2, 10, URL, PORT, DATABASE)
    monitor.start()
    time.sleep(21)
    monitor.stop()
    measurement = monitor.get_current_measurement()
    print("Current measurement: {0}".format(str(measurement)))
\ No newline at end of file
...@@ -49,4 +49,33 @@ if [ ! -f "$NGINX_CONF_TARGET" ]; then ...@@ -49,4 +49,33 @@ if [ ! -f "$NGINX_CONF_TARGET" ]; then
fi fi
nginx -s reload nginx -s reload
systemctl start nginx systemctl start nginx
\ No newline at end of file
## install a configuration monitoring service; this needs to be in a venv with the rest of the CLMC
# Install Python 3 and the libraries systemctl_monitor.py imports (pyaml, influxdb).
sudo apt-get install python3 python3-pip -y
sudo pip3 install pyaml influxdb
svc="nginxmon"
echo "install systemctl monitoring service"
# Generate the systemd unit file for the monitor, line by line.
svc_file="${svc}.service"
echo "[Unit]" > $svc_file
echo "Description=nginxmon" >> $svc_file
echo "After=network-online.target" >> $svc_file
echo "" >> $svc_file
echo "[Service]" >> $svc_file
# NOTE(review): ${inst} and ${dir} are not defined in this script — presumably
# exported by the calling environment; confirm before running standalone.
echo "WorkingDirectory=${inst}/${dir}" >> $svc_file
# Sample nginx every 2s, aggregate over 10s, and report to the local
# telegraf http_listener on port 8186 (see the telegraf config in this repo).
echo "ExecStart=/usr/bin/python3 ${REPO_ROOT}/src/monitoring/systemctl_monitor.py -service nginx -rate 2 -agg 10 -host localhost -port 8186 -db CLMCMetrics" >> $svc_file
echo "ExecStop=/usr/bin/bash ${REPO_ROOT}/src/monitoring/stop_systemctl_monitor.sh" >> $svc_file
echo "" >> $svc_file
echo "[Install]" >> $svc_file
echo "WantedBy=network-online.target" >> $svc_file
# Install the unit, then enable and start it.
sudo cp $svc_file /lib/systemd/system
rm $svc_file
echo "enable"
sudo systemctl daemon-reload
sudo systemctl enable ${svc}
echo "start"
sudo systemctl start ${svc}
\ No newline at end of file
...@@ -25,4 +25,9 @@ ...@@ -25,4 +25,9 @@
urls = ["http://localhost:80/nginx_status"] urls = ["http://localhost:80/nginx_status"]
## HTTP response timeout (default: 5s) ## HTTP response timeout (default: 5s)
# response_timeout = "5s" # response_timeout = "5s"
\ No newline at end of file
# # Influx HTTP write listener
[[inputs.http_listener]]
## Address and port to host HTTP listener on
service_address = ":8186"
\ No newline at end of file
This diff is collapsed.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
# Understanding end-to-end media service performance in the FLAME platform
© University of Southampton IT Innovation Centre, 2018
This document describes the FLAME model of end-to-end (E2E) media service performance as it is observed and measured using the CLMC on the FLAME platform.
#### **Authors**
|Authors|Organisation|
|-|-|
|[Simon Crowle](mailto:sgc@it-innovation.soton.ac.uk)|[University of Southampton, IT Innovation Centre](http://www.it-innovation.soton.ac.uk)|
## Introduction
Readers of this document are assumed to have at least read the [CLMC information model](clmc-information-model.md). Here we explore the requirements which inform the definition of metrics that determine *'end-to-end'* media service performance. Before continuing, some terms are defined:
| term | definition |
| --- | --- |
| *client* | an end-user of a FLAME media service - typically somebody accessing the service via a mobile computing device connected to a _service router_ |
| *endpoint* | an endpoint (EP) is a virtual machine (VM) connected to the FLAME network |
| *service router* | an EP that allows other EPs to communicate with one another using fully qualified domain names (FQDN), rather than IP addresses |
| *network node* | an _EP_, _service router_ or other hardware that receives and sends network traffic along network connections attached to it |
| *media component* | a media component (MC) is a process that in part or wholly realizes the functionality of a media service |
| *E2E path* | the directed, acyclic traversal of FLAME network nodes, beginning with a source _EP_ and moving to a target _EP_ via network nodes in the FLAME network |
| *E2E response time* | the total time taken for a service request to i) traverse an _E2E path_, ii) be processed at the _MC_, iii) be returned as a response via an _E2E path_
In the sections that follow we set out some basic properties of a potential media service and then explore these in more detail with a concrete example. Following on from this analysis we provide a test-based approach to the specification of E2E media service performance measures.
## E2E SFC chains
Let us begin by identifying some simple, generic interactions within a media service function chain (SFC):
```
// simple chain
Client --> data storage MC
// sequential chain
Client --> data processor MC --> data storage MC
// complex chain
Client --> data processor MC_A --> data processor MC_B
|-> data storage MC <-|
```
The first example above imagines a client requesting data be stored in (or retrieved from) a database managed by the MC responsible for persistence. In the second case, the client requests some processing of some data held in the data store, the results of which are also stored. Finally, the third case outlines a more complex scenario in which the client requests some processing of data which in turn generates further requests for additional data processing in other MCs which also may depend on storage I/O functionality. Here additional data processing by related MCs could include job scheduling or task decomposition and distribution to worker nodes. An advanced media service, such as a game server, is a useful example of such a service in which graphics rendering; game state modelling; artificial intelligence and network communications are handled in parallel using varying problem decomposition methods.
## E2E simple chain
Next we will define a simple network into which we will place a data processing EP and a data storage EP - we assert that clients could connect to any of the _service routers_ that link these MCs together.
![E2E network](image/e2e-simple-chain-network.png)
Our simple network consists of three _service routers_ that connect clients with MC data and storage functionality; each demand from client 1 for the storage function could be routed in one network hop from router 'A' to router 'C' or in two from routers 'A' -> 'B' -> 'C'. A demand for storage function from _client 2_ would include zero network hops.
### E2E simple chain metrics
A principal metric we use to understand _E2E response time_ is the average time taken between a request or response being transmitted and received _within the FLAME network_. Scoping the E2E response time to within the FLAME network is an important qualification since it is only within this network that all necessary measurements can reliably be taken.
An out-going simple E2E request chain looks like this:
![E2E request steps](image/e2e-simple-chain-request-steps.png)
the delay associated with the processing of the service request is isolated to within the storage MC:
![E2E MC processing](image/e2e-simple-chain-mc-processing.png)
whilst for the response E2E delay, we see this:
![E2E response steps](image/e2e-simple-chain-response-steps.png)
Above we denote the time required for a service router to handle (or pass on) an in-coming message as _handle request_ or _handle response_. When a message is first encountered by a service router, an optimized path through the FLAME network must also be determined; this is labelled above as _route specification_. The _e2e response time_ is the sum of the request, service processing and response delays.
> __Side note:__
> To understand _delay_ more robustly, we may also consider the rate at which requests or responses arrive (_arrival rate_) at each node in the network since message management (queuing, for example) will have an effect at scale. Similarly, the _payload size_ of the messages being handled could also be observed since the quantity of data traversing the SFC will also impact delay in similar, large scale scenarios.
>
\ No newline at end of file
#!/bin/bash
# Build Telegraf (rebased onto the it-innovation fork) and produce
# distributable packages via fpm.

# install build prerequisites: fpm (a Ruby gem) builds the final .deb/.rpm packages
sudo apt-get install ruby ruby-dev rubygems build-essential rpm -y
# NOTE(review): --no-ri/--no-rdoc were removed in RubyGems 3; newer gems need
# --no-document instead — verify against the target gem version.
sudo gem install --no-ri --no-rdoc fpm
# install go
wget https://dl.google.com/go/go1.10.2.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.10.2.linux-amd64.tar.gz
# set the environment variables (throwaway GOPATH under /tmp)
echo 'PATH=$PATH:/usr/local/go/bin' > /tmp/gorc
echo 'GOPATH=/tmp/go' >> /tmp/gorc
source /tmp/gorc
mkdir $GOPATH
# get telegraf from influx repo
cd $GOPATH
go get -d github.com/influxdata/telegraf
# rebase to it-innovation repo (applies the local patches on top of upstream)
cd $GOPATH/src/github.com/influxdata/telegraf
git remote add it-innovation https://github.com/it-innovation/telegraf.git
git pull --rebase it-innovation master
# build telegraf
chmod 755 ./scripts/*.sh
make
# build the packages
make package
# git push it-innovation
...@@ -24,6 +24,11 @@ ...@@ -24,6 +24,11 @@
import os import os
import os.path import os.path
import subprocess import subprocess
from glob import glob
from os.path import basename
from os.path import dirname
from os.path import join
from os.path import splitext
from setuptools import setup, find_packages from setuptools import setup, find_packages
def read(fname): def read(fname):
...@@ -38,18 +43,18 @@ def get_version(fname): ...@@ -38,18 +43,18 @@ def get_version(fname):
return git_revision return git_revision
setup( setup(
name = "clmctest", name = "clmc",
version = get_version("clmctest/_version.py"), version = get_version("clmctest/_version.py"),
author = "Michael Boniface", author = "Michael Boniface",
author_email = "mjb@it-innovation.soton.ac.uk", author_email = "mjb@it-innovation.soton.ac.uk",
description = "FLAME CLMC Test Module", description = "FLAME CLMC Test Module",
license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE", license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE",
keywords = "FLAME CLMC tests", keywords = "FLAME CLMC",
url='https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc', url='https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc',
packages=find_packages(exclude=["services"]), packages=find_packages(exclude=["services"]),
include_package_data=True, include_package_data=True,
package_data={'': ['_version.py', '*.yml', '*.sh', '*.json', '*.conf']}, package_data={'': ['_version.py', '*.yml', '*.sh', '*.json', '*.conf']},
long_description="FLAME CLMC tests", long_description="FLAME CLMC",
classifiers=[ classifiers=[
"Development Status :: Alpha", "Development Status :: Alpha",
"Topic :: FLAME Tests", "Topic :: FLAME Tests",
......
#!/usr/bin/python3
\ No newline at end of file
#!/usr/bin/python3
import threading
import time
import random
import logging
from influxdb import InfluxDBClient
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
class ConfigCollector(threading.Thread):
    """Background thread that periodically samples a resource's configuration
    state and aggregates the samples into InfluxDB-style measurements.

    Parameters
    ----------
    sample_func : callable
        Zero-argument callable returning a (state_name, sample_time) tuple.
    write_func : callable
        Called with the aggregated measurement at the end of each period
        (invoked on a separate WriteThread so the sampling loop is not blocked).
    resource_name : str
        Tag value written with every measurement.
    sample_rate : int
        Seconds between samples.
    agg_period : int
        Seconds per aggregation period.
    """

    # Indices into a [state_name, state_duration] pair.
    STATE_NAME = 0
    STATE_TIME = 1

    def __init__(self, sample_func, write_func, resource_name, sample_rate=2, agg_period=10):
        threading.Thread.__init__(self)
        self._start_event = threading.Event()
        self.sample_func = sample_func
        self.write_func = write_func
        self.resource_name = resource_name
        self.sample_rate = sample_rate
        self.agg_period = agg_period
        # Cumulative "<state>_sum"/"<state>_count" totals across all periods.
        self.agg_states = {}
        # Most recently created measurement (list containing one point dict).
        self.current_measurement = {}

    def run(self):
        """Sample in a loop, emitting one aggregated measurement per period."""
        # If the thread is already running then return.
        if self._start_event.is_set():
            return
        self._start_event.set()
        # Set the start of the period to the current time.
        start_period = time.time()
        logger.debug("start time = {0}".format(start_period))
        # The period ends one aggregation period later.
        end_period = start_period + self.agg_period
        logger.debug("end time = {0}".format(end_period))
        # Time already spent in the current state, carried between periods.
        current_state_time = 0
        samples = []
        while self._start_event.is_set():
            # Get a sample using the sampler function.
            (sample_state, sample_time) = self.sample_func()
            samples.append((sample_state, sample_time))
            logger.debug("Sample state {0}".format(sample_state))
            logger.debug("Sample count: {0}".format(len(samples)))
            # If the last sample reached the end of the period, aggregate.
            if sample_time >= end_period:
                self.current_measurement = self.create_measurement(samples, current_state_time, sample_time)
                # Write the measurement asynchronously.
                write_thread = WriteThread(self.write_func, self.current_measurement)
                write_thread.start()
                # Carry the accumulated time in the current state forward.
                current_state_time = self.current_measurement[0]['fields']['current_state_time']
                # Drop the processed samples, keeping the last one as the
                # first sample of the next period.
                samples.clear()
                samples.append((sample_state, sample_time))
                end_period = sample_time + self.agg_period
                logger.debug("Number of samples after agg: {0}".format(len(samples)))
                logger.debug("Next end time {0}".format(end_period))
            # Subtract the processing time from the sampling interval.
            processing_time = time.time() - sample_time
            logger.debug("Processing time {0}".format(processing_time))
            sleep_time = self.sample_rate - processing_time
            logger.debug("Sleep time {0}".format(sleep_time))
            # If processing took longer than the sample rate we have a problem
            # and we will need to put processing into a worker thread.
            if sleep_time < 0:
                # BUG FIX: logger.warn is deprecated in favour of logger.warning.
                logger.warning("Aggregation processing took longer that sample rate")
                sleep_time = 0
            logger.debug("Sleeping for sample {0}".format(sleep_time))
            # Wait for the next sample.
            time.sleep(sleep_time)
        logger.debug("Finished collection thread")

    def stop(self):
        """Signal the collection loop to exit after its current sleep."""
        logger.debug("Stopping thread")
        self._start_event.clear()

    def create_measurement(self, samples, initial_state_time, current_time):
        """Aggregate one period's raw samples into a single InfluxDB point.

        samples            -- list of (state, time) samples covering the period
        initial_state_time -- time already spent in the current state when the
                              period began
        current_time       -- timestamp in seconds for the point (stored in ns)
        """
        logger.debug("Samples: {0}".format(str(samples)))
        # Collapse the samples into [state, duration] entries.
        states = self.aggregate_samples(samples)
        logger.debug("States: {0}".format(str(states)))
        # Fold the states into the cumulative totals.
        fields = self.aggregate_states(states, initial_state_time)
        # InfluxDB expects nanosecond timestamps.
        measurement_time = int(current_time*1000000000)
        measurement = [{"measurement": "service_config_state",
                        "tags": {
                            "resource_name": self.resource_name
                        },
                        "time": measurement_time
                        }]
        measurement[0]['fields'] = fields['fields']
        logger.debug("Report: {0}".format(str(measurement)))
        return measurement

    def aggregate_samples(self, samples):
        """Collapse consecutive samples into a list of [state, duration] pairs.

        Raises ValueError if the sample list is empty.
        """
        states = []
        sample_count = len(samples)
        logger.debug("Sample count {0}".format(sample_count))
        if sample_count == 0:
            raise ValueError('No samples in the samples list')
        if sample_count == 1:
            # BUG FIX: previously returned the raw (state, time) sample, which
            # downstream indexing cannot consume; return a single
            # zero-duration state entry instead (no elapsed time is known).
            return [[samples[0][self.STATE_NAME], 0]]
        last_index = sample_count-1
        for index, sample in enumerate(samples):
            if index == 0:
                # The 1st sample sets the current state and its start time.
                current_state = sample[self.STATE_NAME]
                state_start_time = sample[self.STATE_TIME]
                logger.debug("Start time : {0}".format(state_start_time))
            else:
                # On a transition, close off the previous state.
                if current_state != sample[self.STATE_NAME]:
                    state_time = sample[self.STATE_TIME] - state_start_time
                    states.append([current_state, state_time])
                    current_state = sample[self.STATE_NAME]
                    state_start_time = state_start_time + state_time
                # Deal with the final sample.
                if index == last_index:
                    if current_state == sample[self.STATE_NAME]:
                        # The last sample continues (or just entered, with zero
                        # duration) the current state.
                        state_time = sample[self.STATE_TIME] - state_start_time
                        states.append([current_state, state_time])
                    elif current_state != sample[self.STATE_NAME]:
                        # Unreachable in practice: the transition branch above
                        # has already synced current_state; kept for safety.
                        states.append([current_state, 0])
        return states

    def aggregate_states(self, states, initial_state_time):
        """Fold one period's [state, duration] list into the running totals.

        Returns a dict with a 'fields' key holding '<state>_sum' and
        '<state>_count' totals plus 'current_state'/'current_state_time'.
        """
        # The initial state is the 1st state of the period.
        initial_state = states[0][self.STATE_NAME]
        logger.debug("Initial state : {0}".format(initial_state))
        logger.debug("Initial state time : {0}".format(initial_state_time))
        # The current state is the last state sampled.
        current_state = states[-1][self.STATE_NAME]
        if initial_state == current_state and len(states) == 1:
            # No transition this period: extend the running state time.
            current_state_time = initial_state_time + states[-1][self.STATE_TIME]
            state_sum_key = current_state + "_sum"
            state_count_key = current_state + "_count"
            # First time in this state: it counts as a single transition.
            if state_sum_key not in self.agg_states:
                self.agg_states[state_count_key] = 1
            self.agg_states[state_sum_key] = current_state_time
        else:
            # The period contained transitions; time in the current state is
            # just the duration of the last state entry.
            current_state_time = states[-1][self.STATE_TIME]
            # Accumulate total duration and transition count for each state.
            for state in states:
                state_sum_key = state[self.STATE_NAME] + "_sum"
                state_count_key = state[self.STATE_NAME] + "_count"
                if state_sum_key not in self.agg_states:
                    logger.debug("Adding state: {0}".format(state[self.STATE_NAME]))
                    self.agg_states[state_sum_key] = state[self.STATE_TIME]
                    self.agg_states[state_count_key] = 1
                else:
                    logger.debug("Aggregating state: {0}".format(state[self.STATE_NAME]))
                    self.agg_states[state_count_key] += 1
                    logger.debug("increment number of times in the state")
                    self.agg_states[state_sum_key] += state[self.STATE_TIME]
                    logger.debug("Duration: {0}".format(self.agg_states[state_sum_key]))
        # BUG FIX: copy agg_states instead of aliasing it, so measurements
        # returned earlier are not mutated by later aggregation periods and
        # the 'current_state' bookkeeping keys do not leak into the totals.
        measurement = {}
        measurement['fields'] = dict(self.agg_states)
        measurement['fields']['current_state'] = current_state
        measurement['fields']['current_state_time'] = current_state_time
        return measurement
class WriteThread(threading.Thread):
    """One-shot worker thread that hands a measurement to a write callback."""

    def __init__(self, write_func, measurement):
        """Store the callback and the measurement it will be called with."""
        threading.Thread.__init__(self)
        self._start_event = threading.Event()
        self.write_func = write_func
        self.measurement = measurement

    def run(self):
        """Invoke the callback once; subsequent runs are no-ops."""
        if self._start_event.is_set():
            return
        self._start_event.set()
        self.write_func(self.measurement)

    def stop(self):
        """Clear the run guard."""
        self._start_event.clear()
class InfluxWriter():
    """Thin wrapper that writes single measurements to an InfluxDB database."""

    def __init__(self, hostname, port, database):
        """Create the InfluxDB client (10s timeout) for the given target."""
        self.db_client = InfluxDBClient(host=hostname, port=port, database=database, timeout=10)

    def write(self, measurement):
        """Write one measurement point, printing (not raising) any failure."""
        try:
            self.db_client.write_points([measurement])
        except Exception as e:
            print(e)
#!/bin/bash
# Find the PID of the running systemctl_monitor.py and kill it; the "[s]"
# bracket trick stops the egrep from matching its own process entry.
pid=`ps aux | egrep "[s]ystemctl_monitor.py" | awk '{ print $2 }'` && kill $pid
\ No newline at end of file
#!/usr/bin/python3
import argparse
import subprocess
import logging
import time
import urllib.parse
from config_collector import ConfigCollector, InfluxWriter
from influxdb import InfluxDBClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
class SystemctlMonitor:
    """Samples a systemd service's state via ``systemctl show`` and pushes
    aggregated state measurements to InfluxDB through a ConfigCollector."""

    # Property keys of interest in `systemctl show` output.
    ACTIVE_STATE_KEY = 'ActiveState'
    SUBSTATE_KEY = 'SubState'
    LOAD_STATE_KEY = 'LoadState'

    def __init__(self, service_name, sample_rate, agg_period, hostname, port, database):
        """
        service_name -- systemd unit to monitor
        sample_rate  -- seconds between state samples
        agg_period   -- seconds per aggregation period
        hostname, port, database -- InfluxDB write target
        """
        self.service_name = service_name
        self.writer = InfluxWriter(hostname, port, database)
        # BUG FIX: InfluxWriter exposes a `write` method, not `write_func`;
        # the original reference raised AttributeError on construction.
        self.collection_thread = ConfigCollector(self.get_systemctl_sample, self.writer.write, self.service_name, sample_rate, agg_period)

    def start(self):
        """Start the background collection thread."""
        self.collection_thread.start()

    def stop(self):
        """Signal the collection thread to stop."""
        self.collection_thread.stop()

    def get_current_measurement(self):
        """Return the most recent aggregated measurement."""
        return self.collection_thread.current_measurement

    def get_systemctl_sample(self):
        """Return a (state, timestamp) sample for the monitored service."""
        return (self.get_systemctl_status(self.service_name), time.time())

    def get_systemctl_status(self, service_name):
        """Return the service state as '<LoadState>.<ActiveState>.<SubState>'.

        Raises Exception if ``systemctl show`` exits with a non-zero code.
        """
        load_state = 'unknown'
        active_state = 'unknown'
        sub_state = 'unknown'
        cmd = "systemctl show {0}".format(service_name)
        proc = subprocess.Popen([cmd], stdout=subprocess.PIPE, shell=True)
        out, err = proc.communicate()
        if out: out = out.decode('ascii')
        if err: err = err.decode('ascii')
        logger.debug("Return code = {0}".format(proc.returncode))
        if proc.returncode != 0:
            logger.error("Could not get status for service {0}, {1}".format(service_name, err))
            raise Exception("Could not get status for service {0}, {1}".format(service_name, err))
        for line in out.splitlines():
            # BUG FIX: split only on the first '=' — property values such as
            # ExecStart legitimately contain '=' characters and were truncated.
            parts = line.split('=', 1)
            if parts[0] == SystemctlMonitor.LOAD_STATE_KEY:
                load_state = parts[1]
            elif parts[0] == SystemctlMonitor.ACTIVE_STATE_KEY:
                active_state = parts[1]
            elif parts[0] == SystemctlMonitor.SUBSTATE_KEY:
                sub_state = parts[1]
        return load_state + "." + active_state + "." + sub_state
def main():
    """Command-line entry point: parse arguments and start the monitor.

    The monitor thread runs until stopped externally (see
    stop_systemctl_monitor.sh).
    """
    parser = argparse.ArgumentParser(description='systemctl state monitor')
    parser.add_argument('-service', help='service name', required=True)
    parser.add_argument('-rate', help='sample rate', required=True)
    parser.add_argument('-agg', help='aggregation period', required=True)
    parser.add_argument('-host', help='telegraf hostname', required=True)
    parser.add_argument('-port', help='telegraf port', required=True)
    parser.add_argument('-db', help='database name', required=True)
    parser.add_argument('-debug', '--debug', action='store_true')
    args = parser.parse_args()
    print("Starting SystemctlMonitor : {0}, {1}, {2}, {3}, {4}, {5}".format(args.service, args.rate, args.agg, args.host, args.port, args.db))
    # store_true flags are already booleans; comparing with True is redundant.
    if args.debug:
        print("Setting logging level to DEBUG")
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
    mon = SystemctlMonitor(args.service, int(args.rate), int(args.agg), args.host, int(args.port), args.db)
    mon.start()


if __name__ == "__main__":
    main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment