Skip to content
Snippets Groups Projects
Commit eb3edd8c authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Updated aggregator and E2E simulation tests

parent 24b54e7e
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
from threading import Thread
from clmcservice.aggregator import Aggregator
class TestAggregator(Thread):
    """
    A utility thread that wraps an Aggregator instance so the aggregation loop can be driven from tests.
    """

    REPORT_PERIOD = 5  # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'E2EMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://203.0.113.100:8086'  # default database URL the aggregator uses

    def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
        """
        Constructs an Aggregator instance wrapped in a thread.

        :param database: database name to use
        :param database_url: database url to use
        :param report_period: report period (in seconds) forwarded to the aggregator
        """
        super(TestAggregator, self).__init__()  # initialise the Thread machinery
        self.aggregator = Aggregator(database_name=database, database_url=database_url, report_period=report_period)

    def stop(self):
        """
        Signal the wrapped aggregator to stop.
        """
        self.aggregator.stop()

    def set_event_lock(self, event):
        """
        Attach a thread-safe event object, used by tests to detect that the thread has started.

        :param event: the event lock object
        """
        self.event = event

    def run(self):
        """
        Thread entry point - notify a registered event (if any) and delegate to the aggregator's loop.
        """
        if hasattr(self, 'event'):
            self.event.set()
        self.aggregator.run()
[run]
source = clmcservice
omit = clmcservice/tests.py
include MANIFEST.in
recursive-include clmcservice
\ No newline at end of file
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 15-05-2018
// Created for Project : FLAME
"""
from pyramid.config import Configurator
from pyramid.settings import asbool
from clmcservice.utilities import RUNNING_FLAG, MALFORMED_FLAG
def main(global_config, **settings):
    """
    This function returns a Pyramid WSGI application.

    :param global_config: the [DEFAULT] section of the deployment .ini file (unused)
    :param settings: the [app:main] configuration key/value pairs
    :return: the configured WSGI application
    """
    # a conversion is necessary so that the configuration values of the aggregator are stored with the right type instead of strings
    # (the original converted the running flag with asbool twice; once is enough)
    settings[RUNNING_FLAG] = asbool(settings.get(RUNNING_FLAG, False))
    settings['aggregator_report_period'] = int(settings.get('aggregator_report_period', 5))
    # the aggregator never starts in a malformed state - a fresh app has no stale configuration
    settings[MALFORMED_FLAG] = False

    config = Configurator(settings=settings)

    # aggregator configuration API
    config.add_route('aggregator_config', '/aggregator/config')
    config.add_view('clmcservice.views.AggregatorConfig', attr='get', request_method='GET')
    config.add_view('clmcservice.views.AggregatorConfig', attr='put', request_method='PUT')

    # aggregator control API
    config.add_route('aggregator_controller', '/aggregator/control')
    config.add_view('clmcservice.views.AggregatorController', attr='get', request_method='GET')
    config.add_view('clmcservice.views.AggregatorController', attr='put', request_method='PUT')

    # round trip time query API
    config.add_route('round_trip_time_query', '/query/round-trip-time')
    config.add_view('clmcservice.views.RoundTripTimeQuery', attr='get', request_method='GET')

    config.scan()
    return config.make_wsgi_app()
......@@ -22,86 +22,86 @@
## Created for Project : FLAME
"""
from threading import Event
from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
from threading import Thread, Event
import clmctest.monitoring.LineProtocolGenerator as lp
from clmcservice.utilities import generate_e2e_delay_report
import getopt
import sys
# NOTE(review): the scrape interleaved old ("Thread") and new ("object") versions of this
# class header; this keeps only the new version from the commit.
class Aggregator(object):
    """
    A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process.
    """
REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
DATABASE = 'E2EMetrics' # default database the aggregator uses
DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses
def __init__(self, database=DATABASE, database_url=DATABASE_URL):
RETRY_PERIOD = 5 # number of seconds to wait before retrying connection/posting data to Influx
def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
"""
Constructs an Aggregator instance.
:param database: database name to use
:param database_name: database name to use
:param database_url: database url to use
:param report_period: the report period in seconds
"""
super(Aggregator, self).__init__() # call the constructor of the thread
# initialise a database client using the database url and the database name
url_object = urlparse(database_url)
self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10)
while True:
try:
self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
break
except:
sleep(self.RETRY_PERIOD)
self.db_url = database_url
self.db_name = database
# a stop flag event object used to handle the killing of the thread
self._stop_flag = Event()
self.db_name = database_name
self.report_period = report_period
# a cache-like dictionaries to store the last reported values, which can be used to fill in missing values
self.network_cache = {}
self.service_cache = {}
# a stop flag event object used to handle the stopping of the process
self._stop_flag = Event()
def stop(self):
"""
A method used to stop the thread.
Stop the aggregator from running.
"""
self._stop_flag.set()
def set_event_lock(self, event):
"""
Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing).
:param event: the event lock object
"""
setattr(self, 'event', event)
def run(self):
"""
Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds.
"""
if hasattr(self, 'event'):
self.event.set()
current_time = int(time())
while True:
if self._stop_flag.is_set():
break
while not self._stop_flag.is_set():
boundary_time = current_time - Aggregator.REPORT_PERIOD
boundary_time = current_time - self.report_period
boundary_time_nano = boundary_time * 1000000000
current_time_nano = current_time * 1000000000
# query the network delays and group them by path ID
network_delays = {}
result = self.db_client.query(
'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
boundary_time_nano, current_time_nano))
while True:
try:
result = self.db_client.query(
'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
boundary_time_nano, current_time_nano))
break
except:
sleep(self.RETRY_PERIOD)
for item in result.items():
metadata, result_points = item
# measurement = metadata[0]
......@@ -113,7 +113,16 @@ class Aggregator(Thread):
# query the service delays and group them by endpoint, service function instance and sfr
service_delays = {}
result = self.db_client.query('SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
while True:
try:
result = self.db_client.query(
'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(
boundary_time_nano, current_time_nano))
break
except:
sleep(self.RETRY_PERIOD)
for item in result.items():
metadata, result_points = item
# measurement = metadata[0]
......@@ -159,17 +168,48 @@ class Aggregator(Thread):
# if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row
if None not in e2e_arguments.values():
self.db_client.write_points(
lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time']))
while True:
try:
self.db_client.write_points(
generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'],
e2e_arguments['delay_service'],
e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'],
e2e_arguments['time']))
break
except:
sleep(self.RETRY_PERIOD)
old_timestamp = current_time
# wait until {REPORT_PERIOD} seconds have passed
while current_time < old_timestamp + self.REPORT_PERIOD:
# wait until {report_period} seconds have passed
while current_time < old_timestamp + self.report_period:
sleep(1)
current_time = int(time())
if __name__ == '__main__':
    # NOTE(review): the stray pre-diff line "Aggregator().start()" has been removed -
    # the new script version parses CLI options and runs the aggregator in-process.
    # Parse command line options
    try:
        opts, args = getopt.getopt(sys.argv[1:], "p:d:u:", ['period=', 'database=', 'url='])

        # start from the defaults declared on the Aggregator class
        arg_period = Aggregator.REPORT_PERIOD
        arg_database_name = Aggregator.DATABASE
        arg_database_url = Aggregator.DATABASE_URL

        # Apply parameters if given
        for opt, arg in opts:
            if opt in ('-p', '--period'):
                arg_period = int(arg)
            elif opt in ('-d', '--database'):
                arg_database_name = arg
            elif opt in ('-u', '--url'):
                arg_database_url = arg

        Aggregator(database_name=arg_database_name, database_url=arg_database_url, report_period=arg_period).run()

    # print the error messages in case of a parse error
    except getopt.GetoptError as err:
        print(err)
        print('Parse error; run the script using the following format: python aggregator.py -p <seconds> -d <database name> -u <database url>')
This diff is collapsed.
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 15-05-2018
// Created for Project : FLAME
"""
from json import loads
from re import compile, IGNORECASE
# ---- service-wide constants shared by the views and the aggregator process ----

CONFIG_ATTRIBUTES = ('aggregator_report_period', 'aggregator_database_name', 'aggregator_database_url')  # all of the configuration attributes - to be used as dictionary keys

RUNNING_FLAG = 'aggregator_running'  # Attribute for storing the flag, which shows whether the aggregator is running or not - to be used as a dictionary key

PROCESS_ATTRIBUTE = 'aggregator_process'  # Attribute for storing the process object of the aggregator - to be used as a dictionary key

# a 'malformed' running state of the aggregator is when the configuration is updated, but the aggregator is not restarted so it is running with an old version of the conf.
MALFORMED_FLAG = 'malformed'  # Attribute for storing the flag, which shows whether the aggregator is running in an malformed state or not - to be used as a dictionary key

# used to indicate a malformed configuration message
COMMENT_ATTRIBUTE = 'comment'
COMMENT_VALUE = 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'

# the attributes of the JSON response body that are expected when querying round trip time
ROUND_TRIP_ATTRIBUTES = ('media_service', 'start_timestamp', 'end_timestamp')

# regular expression used to validate the aggregator's database URL configuration value
URL_REGEX = compile(
    r'^https?://'  # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain, e.g. example.domain.com
    r'localhost|'  # or localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # or IP address (IPv4 format)
    r'(?::\d{2,5})?'  # optional port number
    r'(?:[/?#][^\s]*)?$',  # URL path or query parameters
    IGNORECASE)
def validate_config_content(configuration):
    """
    A utility function to validate a configuration string representing a JSON dictionary.

    :param configuration: the configuration string to validate
    :return: the validated configuration dictionary object with the values converted to their required type
    :raise AssertionError: if the argument is not a valid configuration
    """
    # the 'global CONFIG_ATTRIBUTES' declaration was dropped - module-level names are readable without it
    try:
        configuration = loads(configuration)
    except (ValueError, TypeError):
        # narrowed from a bare except: loads() raises ValueError for malformed JSON and TypeError for non-string input
        raise AssertionError("Configuration must be a JSON object.")

    assert len(configuration) == len(CONFIG_ATTRIBUTES), "Configuration mustn't contain more attributes than the required ones."

    for attribute in CONFIG_ATTRIBUTES:
        assert attribute in configuration, "Required attribute not found in the request content."

    report_period = configuration.get('aggregator_report_period')
    # bool is a subclass of int, so exclude it explicitly - the original type(...) == int check rejected booleans too
    assert isinstance(report_period, int) and not isinstance(report_period, bool), "Report period must be an integer, received {0} instead.".format(report_period)
    assert report_period > 0, "Report period must be a positive integer, received {0} instead.".format(report_period)

    assert URL_REGEX.match(configuration.get('aggregator_database_url')) is not None, "The aggregator must have a valid database URL in its configuration, received {0} instead.".format(configuration.get('aggregator_database_url'))

    return configuration
def validate_action_content(content):
    """
    A utility function to validate a content string representing a JSON dictionary.

    :param content: the content string to validate
    :return: the validated content dictionary
    :raise AssertionError: if the argument is not a valid json content
    """
    try:
        content = loads(content)
    except (ValueError, TypeError):
        # narrowed from a bare except: loads() raises ValueError for malformed JSON and TypeError for non-string input
        raise AssertionError("Content must be a JSON object.")

    assert len(content) == 1, "Content mustn't contain more attributes than the required one."
    assert content['action'] in ('start', 'stop', 'restart'), "The action attribute must be one of 'start', 'stop' or 'restart'."

    return content
def validate_round_trip_query_params(params):
    """
    A utility function to validate a dictionary of parameters.

    :param params: the params dict to validate
    :return: the validated parameters dictionary
    :raise AssertionError: if the argument is not a valid parameters dictionary
    """
    # the 'global ROUND_TRIP_ATTRIBUTES' declaration was dropped - module-level names are readable without it
    assert len(params) == len(ROUND_TRIP_ATTRIBUTES), "Content mustn't contain more attributes than the required ones."

    for attribute in ROUND_TRIP_ATTRIBUTES:
        assert attribute in params, "Required attribute not found in the request content."

    return params
def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time):
    """
    Generates a combined averaged measurement about the e2e delay and its contributing parts.

    :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
    :param source_sfr: source service router
    :param target_sfr: target service router
    :param endpoint: endpoint of the media component
    :param sf_instance: service function instance (media component)
    :param delay_forward: Path delay (Forward direction)
    :param delay_reverse: Path delay (Reverse direction)
    :param delay_service: the media service component response time
    :param avg_request_size: averaged request size
    :param avg_response_size: averaged response size
    :param avg_bandwidth: averaged bandwidth
    :param time: measurement timestamp (seconds)
    :return: a list of dict-formatted reports to post on influx
    """
    tags = {
        "path_ID": path_id,
        "source_SFR": source_sfr,
        "target_SFR": target_sfr,
        "endpoint": endpoint,
        "sf_instance": sf_instance,
    }
    fields = {
        "delay_forward": float(delay_forward),
        "delay_reverse": float(delay_reverse),
        "delay_service": float(delay_service),
        "avg_request_size": float(avg_request_size),
        "avg_response_size": float(avg_response_size),
        "avg_bandwidth": float(avg_bandwidth),
    }
    # influx expects nanosecond timestamps, hence the conversion from seconds
    return [{"measurement": "e2e_delays", "tags": tags, "fields": fields, "time": int(1000000000 * time)}]
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 15-05-2018
// Created for Project : FLAME
"""
from pyramid.view import view_defaults
from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError
from influxdb import InfluxDBClient
from urllib.parse import urlparse
from subprocess import Popen, DEVNULL
from clmcservice.utilities import validate_config_content, validate_action_content, validate_round_trip_query_params, \
CONFIG_ATTRIBUTES, ROUND_TRIP_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE
import os.path
@view_defaults(route_name='aggregator_config', renderer='json')
class AggregatorConfig(object):
    """
    A class-based view for accessing and mutating the configuration of the aggregator.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """
        self.request = request

    def get(self):
        """
        A GET API call for the configuration of the aggregator.

        :return: A JSON response with the configuration of the aggregator.
        """
        settings = self.request.registry.settings
        return {attribute: settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}

    def put(self):
        """
        A PUT API call for the status of the aggregator.

        :return: A JSON response to the PUT call - essentially with the new configured data and comment of the state of the aggregator
        :raises HTTPBadRequest: if request body is not a valid JSON for the configurator
        """
        settings = self.request.registry.settings
        old_config = {attribute: settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
        new_config = self.request.body.decode(self.request.charset)

        try:
            new_config = validate_config_content(new_config)
            for attribute in CONFIG_ATTRIBUTES:
                settings[attribute] = new_config.get(attribute)

            # if configuration is not already malformed, check whether the configuration is updated
            if not settings[MALFORMED_FLAG]:
                malformed = old_config != new_config and settings[RUNNING_FLAG]
                settings[MALFORMED_FLAG] = malformed
                if malformed:
                    new_config[MALFORMED_FLAG] = True
                    new_config[COMMENT_ATTRIBUTE] = COMMENT_VALUE

            return new_config
        except AssertionError:
            raise HTTPBadRequest("Bad request content - configuration format is incorrect.")
@view_defaults(route_name='aggregator_controller', renderer='json')
class AggregatorController(object):
    """
    A class-based view for controlling the aggregator.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """
        self.request = request

    def get(self):
        """
        A GET API call for the status of the aggregator - running or not.

        :return: A JSON response with the status of the aggregator.
        """
        aggregator_data = self.request.registry.settings
        config = {RUNNING_FLAG: aggregator_data.get(RUNNING_FLAG)}
        # warn the client when the aggregator is running with a stale (pre-update) configuration
        if aggregator_data[MALFORMED_FLAG] and aggregator_data[RUNNING_FLAG]:
            config[MALFORMED_FLAG] = True
            config[COMMENT_ATTRIBUTE] = COMMENT_VALUE
        return config

    def put(self):
        """
        A PUT API call for the status of the aggregator.

        :return: A JSON response to the PUT call - essentially saying whether the aggregator is running or not
        :raises HTTPBadRequest: if request body is not a valid JSON for the controller
        """
        content = self.request.body.decode(self.request.charset)
        try:
            content = validate_action_content(content)
            config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
            action = content['action']
            if action == 'start':
                # only spawn a new process if one is not already recorded as running
                aggregator_started = self.request.registry.settings[RUNNING_FLAG]
                if not aggregator_started:
                    process = self.start_aggregator(config)
                    self.request.registry.settings[RUNNING_FLAG] = True
                    self.request.registry.settings[PROCESS_ATTRIBUTE] = process
            elif action == 'stop':
                # stopping clears the process handle and any malformed state
                self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE))
                self.request.registry.settings[RUNNING_FLAG] = False
                self.request.registry.settings[PROCESS_ATTRIBUTE] = None
                self.request.registry.settings[MALFORMED_FLAG] = False
            elif action == 'restart':
                # restart picks up the latest configuration, hence the malformed flag is reset
                self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE))
                process = self.start_aggregator(config)
                self.request.registry.settings[RUNNING_FLAG] = True
                self.request.registry.settings[PROCESS_ATTRIBUTE] = process
                self.request.registry.settings[MALFORMED_FLAG] = False
            return {RUNNING_FLAG: self.request.registry.settings.get(RUNNING_FLAG)}
        except AssertionError:
            raise HTTPBadRequest('Bad request content - must be in JSON format: {"action": value}, where value is "start", "stop" or "restart".')

    @staticmethod
    def start_aggregator(config):
        """
        An auxiliary method to start the aggregator.

        :param config: the configuration containing the arguments for the aggregator
        :return: the process object of the started aggregator script
        """
        # the aggregator script lives next to this module, so run it from this file's directory
        dir_path = os.path.dirname(os.path.realpath(__file__))
        command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database',
                   config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')]
        process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL)
        print("\nStarted aggregator process with PID: {0}\n".format(process.pid))
        return process

    @staticmethod
    def stop_aggregator(process):
        """
        An auxiliary method to stop the aggregator.

        :param process: the process to terminate
        """
        # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value
        if process is not None and process.poll() is None:
            process.terminate()
            print("\nStopped aggregator process with PID: {0}\n".format(process.pid))
@view_defaults(route_name='round_trip_time_query', renderer='json')
class RoundTripTimeQuery(object):
    """
    A class-based view for querying the round trip time in a given range.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """
        self.request = request

    def get(self):
        """
        A GET API call for the averaged round trip time of a specific media service over a given time range.

        :return: A JSON response with the round trip time and its contributing parts.
        """
        # collect only the expected query parameters; validation then checks they are all present
        params = {}
        for attribute in ROUND_TRIP_ATTRIBUTES:
            if attribute in self.request.params:
                params[attribute] = self.request.params.get(attribute)

        try:
            params = validate_round_trip_query_params(params)
            config_data = {config_attribute: self.request.registry.settings.get(config_attribute) for config_attribute in CONFIG_ATTRIBUTES}

            media_service = params.get(ROUND_TRIP_ATTRIBUTES[0])
            start_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[1])
            end_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[2])
            influx_db_name = config_data.get(CONFIG_ATTRIBUTES[1])
            influx_db_url = config_data.get(CONFIG_ATTRIBUTES[2])

            url_object = urlparse(influx_db_url)
            try:
                db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=influx_db_name, timeout=10)
                query = 'SELECT mean(*) FROM "{0}"."autogen"."e2e_delays" WHERE time >= {1} and time < {2} and sf_instance = \'{3}\''.format(
                    influx_db_name, start_timestamp, end_timestamp, media_service)
                print(query)
                result = db_client.query(query)
                # take the single averaged point, if any data matched the query window
                actual_result = next(result.get_points(), None)
                if actual_result is None:
                    return {"result": None}
                else:
                    forward_latency = actual_result.get("mean_delay_forward")
                    reverse_latency = actual_result.get("mean_delay_reverse")
                    service_delay = actual_result.get("mean_delay_service")
                    request_size = actual_result.get("mean_avg_request_size")
                    response_size = actual_result.get("mean_avg_response_size")
                    bandwidth = actual_result.get("mean_avg_bandwidth")
                    rtt = self.calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth)
                    return {"result": rtt}
            except:
                # NOTE(review): this bare except swallows query and calculation errors as well as connection
                # failures, so the message below can be misleading - consider narrowing the exception types
                raise HTTPInternalServerError("Cannot instantiate connection with database {0} on url {1}.".format(influx_db_name, influx_db_url))
        except AssertionError:
            raise HTTPBadRequest('Bad request content - must be in JSON format: {"media_service": value, "start_timestamp": value, "end_timestamp": value}.')

    @staticmethod
    def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
        """
        Calculates the round trip time given the list of arguments.

        :param forward_latency: network latency in forward direction (s)
        :param reverse_latency: network latency in reverse direction (s)
        :param service_delay: media service delay (s)
        :param request_size: request size (bytes)
        :param response_size: response size (bytes)
        :param bandwidth: network bandwidth (Mb/s)
        :param packet_size: size of packet (bytes)
        :param packet_header_size: size of the header of the packet (bytes)
        :return: the calculated round trip time
        """
        # data delay = bytes-to-megabits conversion * (size / bandwidth), scaled by the payload/packet overhead ratio
        forward_data_delay = (8/10**6) * (request_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
        reverse_data_delay = (8/10**6) * (response_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
        return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay
###
# app configuration
# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/environment.html
###
[app:main]
use = egg:clmcservice
pyramid.reload_templates = true
pyramid.debug_authorization = false
pyramid.debug_notfound = false
pyramid.debug_routematch = false
pyramid.default_locale_name = en
pyramid.includes = pyramid_debugtoolbar
aggregator_running = false
aggregator_report_period = 5
aggregator_database_name = E2EMetrics
aggregator_database_url = http://172.40.231.51:8086
# By default, the toolbar only appears for clients from IP addresses
# '127.0.0.1' and '::1'.
# debugtoolbar.hosts = 127.0.0.1 ::1
###
# wsgi server configuration
###
[server:main]
use = egg:waitress#main
listen = localhost:8080
###
# logging configuration
# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/logging.html
###
[loggers]
keys = root, clmcservice
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = INFO
handlers = console
[logger_clmcservice]
level = DEBUG
handlers =
qualname = clmcservice
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s
###
# app configuration
# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/environment.html
###
[app:main]
use = egg:clmcservice
pyramid.reload_templates = false
pyramid.debug_authorization = false
pyramid.debug_notfound = false
pyramid.debug_routematch = false
pyramid.default_locale_name = en
aggregator_running = false
aggregator_report_period = 5
aggregator_database_name = E2EMetrics
aggregator_database_url = http://172.40.231.51:8086
###
# wsgi server configuration
###
[server:main]
use = egg:waitress#main
listen = *:8080
###
# logging configuration
# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/logging.html
###
[loggers]
keys = root, clmcservice
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_clmcservice]
level = WARN
handlers =
qualname = clmcservice
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s
[pytest]
testpaths = clmcservice
python_files = *.py
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 15-05-2018
// Created for Project : FLAME
"""
import os
import os.path
from setuptools import setup, find_packages
def read(fname):
    """
    Read and return the contents of *fname*, resolved relative to this file's directory.

    :param fname: file name relative to the directory containing this module
    :return: the file contents as a string
    """
    # a context manager closes the file handle promptly - the original left it open
    with open(os.path.join(os.path.dirname(__file__), fname)) as f:
        return f.read()
def get_version(fname):
    """
    Determine the package version from *fname*, falling back to a snapshot marker.

    :param fname: the file holding the git revision
    :return: the revision string, or "SNAPSHOT" when the file does not exist
    """
    if not os.path.isfile(fname):
        return "SNAPSHOT"
    return read(fname)
# runtime dependencies of the clmcservice package
requires = [
    'plaster_pastedeploy',
    'pyramid',
    'pyramid_debugtoolbar',
    'waitress',
    'influxdb',
    'pytest',
]

# additional dependencies needed only when running the test suite
tests_require = [
    'WebTest >= 1.3.1',  # py3 compat
    'pytest-cov',
]

setup(
    name = "clmcservice",
    # version is read from the _version.py file when present, otherwise "SNAPSHOT"
    version = get_version("_version.py"),
    author = "Michael Boniface",
    author_email = "mjb@it-innovation.soton.ac.uk",
    description = "FLAME CLMC Service Module",
    long_description="FLAME CLMC Service",
    license = "https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/integration/LICENSE",
    keywords = "FLAME CLMC service",
    url = 'https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc',
    packages=find_packages(),
    include_package_data=True,
    install_requires=requires,
    extras_require={
        'testing': tests_require,
    },
    package_data={'': ['_version.py']},
    classifiers=[
        "Development Status :: Alpha",
        "Topic :: FLAME CLMC Service",
        "License :: ",
    ],
    entry_points={
        # expose the WSGI app factory to PasteDeploy (used by pserve and the .ini files)
        'paste.app_factory': [
            'main = clmcservice:main',
        ],
    },
)
\ No newline at end of file
[tox]
envlist = py36
[testenv]
deps=pytest
commands=pytest
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment