Select Git revision
simple_tab.py
views.py 11.67 KiB
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 15-05-2018
// Created for Project : FLAME
"""
from pyramid.view import view_defaults
from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError
from influxdb import InfluxDBClient
from urllib.parse import urlparse
from subprocess import Popen, DEVNULL
from clmcservice.utilities import validate_config_content, validate_action_content, validate_round_trip_query_params, \
CONFIG_ATTRIBUTES, ROUND_TRIP_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE
import os.path
@view_defaults(route_name='aggregator_config', renderer='json')
class AggregatorConfig(object):
"""
A class-based view for accessing and mutating the configuration of the aggregator.
"""
def __init__(self, request):
"""
Initialises the instance of the view with the request argument.
:param request: client's call request
"""
self.request = request
def get(self):
"""
A GET API call for the configuration of the aggregator.
:return: A JSON response with the configuration of the aggregator.
"""
aggregator_data = self.request.registry.settings
config = {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES}
return config
def put(self):
"""
A PUT API call for the status of the aggregator.
:return: A JSON response to the PUT call - essentially with the new configured data and comment of the state of the aggregator
:raises HTTPBadRequest: if request body is not a valid JSON for the configurator
"""
old_config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
new_config = self.request.body.decode(self.request.charset)
try:
new_config = validate_config_content(new_config)
for attribute in CONFIG_ATTRIBUTES:
self.request.registry.settings[attribute] = new_config.get(attribute)
# if configuration is not already malformed, check whether the configuration is updated
if not self.request.registry.settings[MALFORMED_FLAG]:
malformed = old_config != new_config and self.request.registry.settings[RUNNING_FLAG]
self.request.registry.settings[MALFORMED_FLAG] = malformed
if malformed:
new_config[MALFORMED_FLAG] = True
new_config[COMMENT_ATTRIBUTE] = COMMENT_VALUE
return new_config
except AssertionError:
raise HTTPBadRequest("Bad request content - configuration format is incorrect.")
@view_defaults(route_name='aggregator_controller', renderer='json')
class AggregatorController(object):
"""
A class-based view for controlling the aggregator.
"""
def __init__(self, request):
"""
Initialises the instance of the view with the request argument.
:param request: client's call request
"""
self.request = request
def get(self):
"""
A GET API call for the status of the aggregator - running or not.
:return: A JSON response with the status of the aggregator.
"""
aggregator_data = self.request.registry.settings
config = {RUNNING_FLAG: aggregator_data.get(RUNNING_FLAG)}
if aggregator_data[MALFORMED_FLAG] and aggregator_data[RUNNING_FLAG]:
config[MALFORMED_FLAG] = True
config[COMMENT_ATTRIBUTE] = COMMENT_VALUE
return config
def put(self):
"""
A PUT API call for the status of the aggregator.
:return: A JSON response to the PUT call - essentially saying whether the aggregator is running or not
:raises HTTPBadRequest: if request body is not a valid JSON for the controller
"""
content = self.request.body.decode(self.request.charset)
try:
content = validate_action_content(content)
config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
action = content['action']
if action == 'start':
aggregator_started = self.request.registry.settings[RUNNING_FLAG]
if not aggregator_started:
process = self.start_aggregator(config)
self.request.registry.settings[RUNNING_FLAG] = True
self.request.registry.settings[PROCESS_ATTRIBUTE] = process
elif action == 'stop':
self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE))
self.request.registry.settings[RUNNING_FLAG] = False
self.request.registry.settings[PROCESS_ATTRIBUTE] = None
self.request.registry.settings[MALFORMED_FLAG] = False
elif action == 'restart':
self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE))
process = self.start_aggregator(config)
self.request.registry.settings[RUNNING_FLAG] = True
self.request.registry.settings[PROCESS_ATTRIBUTE] = process
self.request.registry.settings[MALFORMED_FLAG] = False
return {RUNNING_FLAG: self.request.registry.settings.get(RUNNING_FLAG)}
except AssertionError:
raise HTTPBadRequest('Bad request content - must be in JSON format: {"action": value}, where value is "start", "stop" or "restart".')
@staticmethod
def start_aggregator(config):
"""
An auxiliary method to start the aggregator.
:param config: the configuration containing the arguments for the aggregator
:return: the process object of the started aggregator script
"""
dir_path = os.path.dirname(os.path.realpath(__file__))
command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database',
config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')]
process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL)
print("\nStarted aggregator process with PID: {0}\n".format(process.pid))
return process
@staticmethod
def stop_aggregator(process):
"""
An auxiliary method to stop the aggregator.
:param process: the process to terminate
"""
# check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value
if process is not None and process.poll() is None:
process.terminate()
print("\nStopped aggregator process with PID: {0}\n".format(process.pid))
@view_defaults(route_name='round_trip_time_query', renderer='json')
class RoundTripTimeQuery(object):
"""
A class-based view for querying the round trip time in a given range.
"""
def __init__(self, request):
"""
Initialises the instance of the view with the request argument.
:param request: client's call request
"""
self.request = request
def get(self):
"""
A GET API call for the averaged round trip time of a specific media service over a given time range.
:return: A JSON response with the round trip time and its contributing parts.
"""
params = {}
for attribute in ROUND_TRIP_ATTRIBUTES:
if attribute in self.request.params:
params[attribute] = self.request.params.get(attribute)
try:
params = validate_round_trip_query_params(params)
config_data = {config_attribute: self.request.registry.settings.get(config_attribute) for config_attribute in CONFIG_ATTRIBUTES}
media_service = params.get(ROUND_TRIP_ATTRIBUTES[0])
start_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[1])
end_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[2])
influx_db_name = config_data.get(CONFIG_ATTRIBUTES[1])
influx_db_url = config_data.get(CONFIG_ATTRIBUTES[2])
url_object = urlparse(influx_db_url)
try:
db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=influx_db_name, timeout=10)
query = 'SELECT mean(*) FROM "{0}"."autogen"."e2e_delays" WHERE time >= {1} and time < {2} and sf_instance = \'{3}\''.format(
influx_db_name, start_timestamp, end_timestamp, media_service)
print(query)
result = db_client.query(query)
actual_result = next(result.get_points(), None)
if actual_result is None:
return {"result": None}
else:
forward_latency = actual_result.get("mean_delay_forward")
reverse_latency = actual_result.get("mean_delay_reverse")
service_delay = actual_result.get("mean_delay_service")
request_size = actual_result.get("mean_avg_request_size")
response_size = actual_result.get("mean_avg_response_size")
bandwidth = actual_result.get("mean_avg_bandwidth")
rtt = self.calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth)
return {"result": rtt}
except:
raise HTTPInternalServerError("Cannot instantiate connection with database {0} on url {1}.".format(influx_db_name, influx_db_url))
except AssertionError:
raise HTTPBadRequest('Bad request content - must be in JSON format: {"media_service": value, "start_timestamp": value, "end_timestamp": value}.')
@staticmethod
def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
"""
Calculates the round trip time given the list of arguments.
:param forward_latency: network latency in forward direction (s)
:param reverse_latency: network latency in reverse direction (s)
:param service_delay: media service delay (s)
:param request_size: request size (bytes)
:param response_size: response size (bytes)
:param bandwidth: network bandwidth (Mb/s)
:param packet_size: size of packet (bytes)
:param packet_header_size: size of the header of the packet (bytes)
:return: the calculated round trip time
"""
forward_data_delay = (8/10**6) * (request_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
reverse_data_delay = (8/10**6) * (response_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay