Skip to content
Snippets Groups Projects
Commit 478adc7e authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Added an API endpoint for querying total delay for a media service

parent 9b85da5f
No related branches found
No related tags found
No related merge requests found
......@@ -51,5 +51,8 @@ def main(global_config, **settings):
config.add_view('clmcservice.views.AggregatorController', attr='get', request_method='GET')
config.add_view('clmcservice.views.AggregatorController', attr='put', request_method='PUT')
config.add_route('round_trip_time_query', '/query/round-trip-time')
config.add_view('clmcservice.views.RoundTripTimeQuery', attr='get', request_method='GET')
config.scan()
return config.make_wsgi_app()
......@@ -37,6 +37,10 @@ MALFORMED_FLAG = 'malformed' # Attribute for storing the flag, which shows whet
COMMENT_ATTRIBUTE = 'comment'
COMMENT_VALUE = 'Aggregator is running in a malformed state - it uses an old version of the configuration. Please, restart it so that the updated configuration is used.'
# the attributes of the JSON response body that are expected when querying round trip time
ROUND_TRIP_ATTRIBUTES = ('media_service', 'start_timestamp', 'end_timestamp')
URL_REGEX = compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain, e.g. example.domain.com
......@@ -66,7 +70,7 @@ def validate_config_content(configuration):
assert len(configuration) == len(CONFIG_ATTRIBUTES), "Configuration mustn't contain more attributes than the required ones."
for attribute in CONFIG_ATTRIBUTES:
assert attribute in configuration
assert attribute in configuration, "Required attribute not found in the request content."
assert type(configuration.get('aggregator_report_period')) == int, "Report period must be an integer, received {0} instead.".format(configuration.get('aggregator_report_period'))
......@@ -98,6 +102,25 @@ def validate_action_content(content):
return content
def validate_round_trip_query_params(params):
"""
A utility function to validate a dictionary of parameters.
:param params: the params dict to validate
:return: the validated parameters dictionary
:raise AssertionError: if the argument is not a valid json content
"""
global ROUND_TRIP_ATTRIBUTES
assert len(params) == len(ROUND_TRIP_ATTRIBUTES), "Content mustn't contain more attributes than the required ones."
for attribute in ROUND_TRIP_ATTRIBUTES:
assert attribute in params, "Required attribute not found in the request content."
return params
def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time):
"""
Generates a combined averaged measurement about the e2e delay and its contributing parts
......
......@@ -22,9 +22,12 @@
"""
from pyramid.view import view_defaults
from pyramid.httpexceptions import HTTPBadRequest
from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError
from influxdb import InfluxDBClient
from urllib.parse import urlparse
from subprocess import Popen, DEVNULL
from clmcservice.utilities import validate_config_content, validate_action_content, CONFIG_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE
from clmcservice.utilities import validate_config_content, validate_action_content, validate_round_trip_query_params, \
CONFIG_ATTRIBUTES, ROUND_TRIP_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE
import os.path
......@@ -187,3 +190,90 @@ class AggregatorController(object):
if process is not None and process.poll() is None:
process.terminate()
print("\nStopped aggregator process with PID: {0}\n".format(process.pid))
@view_defaults(route_name='round_trip_time_query', renderer='json')
class RoundTripTimeQuery(object):
"""
A class-based view for querying the round trip time in a given range.
"""
def __init__(self, request):
"""
Initialises the instance of the view with the request argument.
:param request: client's call request
"""
self.request = request
def get(self):
"""
A GET API call for the averaged round trip time of a specific media service over a given time range.
:return: A JSON response with the round trip time and its contributing parts.
"""
params = {}
for attribute in ROUND_TRIP_ATTRIBUTES:
if attribute in self.request.params:
params[attribute] = self.request.params.get(attribute)
try:
params = validate_round_trip_query_params(params)
config_data = {config_attribute: self.request.registry.settings.get(config_attribute) for config_attribute in CONFIG_ATTRIBUTES}
media_service = params.get(ROUND_TRIP_ATTRIBUTES[0])
start_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[1])
end_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[2])
influx_db_name = config_data.get(CONFIG_ATTRIBUTES[1])
influx_db_url = config_data.get(CONFIG_ATTRIBUTES[2])
url_object = urlparse(influx_db_url)
try:
db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=influx_db_name, timeout=10)
query = 'SELECT mean(*) FROM "{0}"."autogen"."e2e_delays" WHERE time >= {1} and time < {2} and sf_instance = \'{3}\''.format(
influx_db_name, start_timestamp, end_timestamp, media_service)
print(query)
result = db_client.query(query)
actual_result = next(result.get_points(), None)
if actual_result is None:
return {"result": None}
else:
forward_latency = actual_result.get("mean_delay_forward")
reverse_latency = actual_result.get("mean_delay_reverse")
service_delay = actual_result.get("mean_delay_service")
request_size = actual_result.get("mean_avg_request_size")
response_size = actual_result.get("mean_avg_response_size")
bandwidth = actual_result.get("mean_avg_bandwidth")
rtt = self.calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth)
return {"result": rtt}
except:
raise HTTPInternalServerError("Cannot instantiate connection with database {0} on url {1}.".format(influx_db_name, influx_db_url))
except AssertionError:
raise HTTPBadRequest('Bad request content - must be in JSON format: {"media_service": value, "start_timestamp": value, "end_timestamp": value}.')
@staticmethod
def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
"""
Calculates the round trip time given the list of arguments.
:param forward_latency: network latency in forward direction (s)
:param reverse_latency: network latency in reverse direction (s)
:param service_delay: media service delay (s)
:param request_size: request size (bytes)
:param response_size: response size (bytes)
:param bandwidth: network bandwidth (Mb/s)
:param packet_size: size of packet (bytes)
:param packet_header_size: size of the header of the packet (bytes)
:return: the calculated round trip time
"""
forward_data_delay = (8/10**6) * (request_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
reverse_data_delay = (8/10**6) * (response_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment