Commit 6ae1daf4 authored by MJB
Merge branch 'clmcservice' into integration

parents c4429fdc 5c476e7b
Showing with 89 additions and 179 deletions
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
from threading import Thread, Event
import clmctest.monitoring.LineProtocolGenerator as lp


class Aggregator(Thread):
    """
    A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Currently implemented as a thread,
    so that the implementation can be tested using pytest.
    """

    REPORT_PERIOD = 5  # currently, the report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'E2EMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://203.0.113.100:8086'  # default database URL the aggregator uses
    def __init__(self, database=DATABASE, database_url=DATABASE_URL):
        """
        Constructs an Aggregator instance.

        :param database: database name to use
        :param database_url: database url to use
        """

        super(Aggregator, self).__init__()  # call the constructor of the thread

        # initialise a database client using the database url and the database name
        url_object = urlparse(database_url)
        self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10)

        self.db_url = database_url
        self.db_name = database

        # a stop flag event object used to handle the killing of the thread
        self._stop_flag = Event()

        # cache-like dictionaries to store the last reported values, which can be used to fill in missing values
        self.network_cache = {}
        self.service_cache = {}

    def stop(self):
        """
        A method used to stop the thread.
        """

        self._stop_flag.set()

    def set_event_lock(self, event):
        """
        Auxiliary method to set a thread-safe event lock object on the aggregator (used for testing).

        :param event: the event lock object
        """

        setattr(self, 'event', event)

    def run(self):
        """
        Performs the functionality of the aggregator - queries data from both measurements, merges that data and posts it back to Influx every 5 seconds.
        """

        if hasattr(self, 'event'):
            self.event.set()

        current_time = int(time())
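        # current_time marks the (exclusive) end of the first aggregation window; it advances by roughly REPORT_PERIOD each iteration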
        while True:
            if self._stop_flag.is_set():
                break

            boundary_time = current_time - Aggregator.REPORT_PERIOD
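            # InfluxDB stores timestamps with nanosecond precision, hence the conversion of the window boundaries below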
            boundary_time_nano = boundary_time * 1000000000
            current_time_nano = current_time * 1000000000

            # query the network delays and group them by path ID
            network_delays = {}
            result = self.db_client.query(
                'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
                    boundary_time_nano, current_time_nano))
            for item in result.items():
                metadata, result_points = item
                # measurement = metadata[0]
                tags = metadata[1]
                # bind the first (and only) point to its own name, so the query result object is not shadowed
                point = next(result_points)
                network_delays[(tags['path'], tags['source'], tags['target'])] = point['net_latency'], point['net_bandwidth']
                self.network_cache[(tags['path'], tags['source'], tags['target'])] = point['net_latency'], point['net_bandwidth']
            # query the service delays and group them by endpoint, service function instance and sfr
            service_delays = {}
            result = self.db_client.query('SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
            for item in result.items():
                metadata, result_points = item
                # measurement = metadata[0]
                tags = metadata[1]
                point = next(result_points)
                service_delays[tags['sfr']] = (point['response_time'], point['request_size'], point['response_size'], tags['endpoint'], tags['sf_instance'])
                self.service_cache[tags['sfr']] = (point['response_time'], point['request_size'], point['response_size'], tags['endpoint'], tags['sf_instance'])
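                # note: these dictionaries are keyed by SFR alone, so if several service instances report behind the same SFR, only the last one processed is kept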

            # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
            for path in network_delays:
                # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr
                path_id, source, target = path
                if target not in service_delays and target not in self.service_cache:
                    # if not, continue with the other network path reports
                    continue
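                # build up the arguments of the end-to-end measurement, starting with every field unset (None)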
                e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
                                 "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time}

                e2e_arguments['path_ID'] = path_id
                e2e_arguments['source_SFR'] = source
                e2e_arguments['target_SFR'] = target
                e2e_arguments['delay_forward'] = network_delays[path][0]
                e2e_arguments['avg_bandwidth'] = network_delays[path][1]

                # reverse the path ID to get the network delay for the reversed path
                reversed_path = (path_id, target, source)
                if reversed_path in network_delays or reversed_path in self.network_cache:
                    # get the reverse delay, use the latest value if reported or the cache value
                    e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0]
                else:
                    e2e_arguments['delay_reverse'] = None

                # get the response time of the media component connected to the target SFR
                service_delay = service_delays.get(target, self.service_cache.get(target))
                response_time, request_size, response_size, endpoint, sf_instance = service_delay
                # put these points in the e2e arguments dictionary
                e2e_arguments['delay_service'] = response_time
                e2e_arguments['avg_request_size'] = request_size
                e2e_arguments['avg_response_size'] = response_size
                e2e_arguments['endpoint'] = endpoint
                e2e_arguments['sf_instance'] = sf_instance

                # if all the arguments of the e2e delay measurement were reported, then generate an E2E measurement row and post it to Influx
                if None not in e2e_arguments.values():
                    self.db_client.write_points(
                        lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
                                                     e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
                                                     e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time']))

            old_timestamp = current_time
            # wait until {REPORT_PERIOD} seconds have passed
            while current_time < old_timestamp + self.REPORT_PERIOD:
                sleep(1)
                current_time = int(time())


if __name__ == '__main__':
    Aggregator().start()
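
A minimal usage sketch for this thread-based Aggregator (using only the names defined above; the event lock is optional and exists so a caller, e.g. a test, can block until the thread has started):

    from threading import Event

    aggregator = Aggregator()           # defaults to the E2EMetrics database at the default URL
    started = Event()
    aggregator.set_event_lock(started)  # optional: allows waiting for run() to begin
    aggregator.start()
    started.wait()                      # returns once the aggregation loop is live
    # ... generate measurements, run assertions ...
    aggregator.stop()                   # sets the stop flag; the loop exits on its next check
    aggregator.join()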
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
from threading import Thread
from clmcservice.aggregator import Aggregator


class TestAggregator(Thread):
    """
    A thread-based wrapper around the clmcservice Aggregator, so that the aggregation loop can be started and stopped from within a test.
    """

    REPORT_PERIOD = 5  # currently, the report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'E2EMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://203.0.113.100:8086'  # default database URL the aggregator uses
    def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
        """
        Constructs a TestAggregator instance.

        :param database: database name to use
        :param database_url: database url to use
        :param report_period: the report period, in seconds, passed on to the wrapped aggregator
        """

        super(TestAggregator, self).__init__()  # call the constructor of the thread
        self.aggregator = Aggregator(database_name=database, database_url=database_url, report_period=report_period)

    def stop(self):
        """
        A method used to stop the thread.
        """

        self.aggregator.stop()

    def set_event_lock(self, event):
        """
        Auxiliary method to set a thread-safe event lock object on the aggregator (used for testing).

        :param event: the event lock object
        """

        setattr(self, 'event', event)

    def run(self):
        if hasattr(self, 'event'):
            self.event.set()

        self.aggregator.run()
@@ -28,7 +28,7 @@ import pkg_resources
 from influxdb import InfluxDBClient

 from clmctest.monitoring.StreamingSim import Sim
 from clmctest.monitoring.E2ESim import Simulator
-from clmctest.monitoring.E2EAggregator import Aggregator
+from clmctest.monitoring.E2ETestAggregatorThread import TestAggregator

 @pytest.fixture(scope="module")
@@ -105,4 +105,4 @@ def e2e_aggregator(streaming_sim_config):
     influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"

-    return Aggregator(database_url=influx_url)
+    return TestAggregator(database_url=influx_url)
@@ -95,7 +95,13 @@ class TestE2ESimulation(object):
         # get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator)
         actual_result = next(query_result.get_points())

-        print("expected_result == actual_result {0}, {1}".format(expected_result, actual_result))
-        assert expected_result == actual_result, "E2E Simulation test failure"
+        for key in expected_result:
+            print("expected_result == actual_result {0}, {1}".format(expected_result.get(key), actual_result.get(key)))
+            if type(expected_result.get(key)) == float:
+                assert expected_result.get(key) == pytest.approx(actual_result.get(key), 0.3)  # approximate only when comparing float values
+            else:
+                assert expected_result.get(key) == actual_result.get(key), "E2E Simulation test failure"
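
Note that the second positional argument of pytest.approx is the relative tolerance, so each float comparison above passes when the difference is within 30% of the value wrapped by approx (here the actual result). A quick illustration with made-up numbers (not from the test data):

    import pytest

    assert 10.0 == pytest.approx(12.9, 0.3)  # |10.0 - 12.9| <= 0.3 * 12.9, so this passes
    assert 10.0 != pytest.approx(15.0, 0.3)  # |10.0 - 15.0| >  0.3 * 15.0, so equality fails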
File moved
File moved
@@ -22,6 +22,7 @@
 ## Created for Project : FLAME
 """

+from threading import Event
 from influxdb import InfluxDBClient
 from time import time, sleep
 from urllib.parse import urlparse
@@ -66,13 +67,23 @@ class Aggregator(object):
         self.network_cache = {}
         self.service_cache = {}

+        # a stop flag event object used to handle the stopping of the process
+        self._stop_flag = Event()
+
+    def stop(self):
+        """
+        Stop the aggregator from running.
+        """
+
+        self._stop_flag.set()
+
     def run(self):
         """
         Performs the functionality of the aggregator - queries data from both measurements, merges that data and posts it back to Influx every 5 seconds.
         """

         current_time = int(time())
-        while True:
+        while not self._stop_flag.is_set():
             boundary_time = current_time - self.report_period
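
Using a threading.Event for the stop flag, rather than a plain boolean attribute, makes the stop signal thread-safe; this is what lets the thread-based TestAggregator above shut the aggregation loop down cleanly from a test.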
File moved
File moved
File moved