Skip to content
Snippets Groups Projects
Select Git revision
  • 6ae1daf47e2c11048c884fae4a3e4f3c25c7435f
  • master default protected
  • integration
  • pm-zipp
  • revert-4f266893
  • clmc-service
  • clmc-tutorial
  • workshop-demo
  • 2.4.4
  • 2.4.3
  • 2.4.2
  • 2.4.1
  • 2.4.0
  • 2.3.1
  • 2.3.0
  • 2.2.2
  • 2.1.2
  • 2.1.1
  • 2.1.0
  • 2.0.4
  • 2.0.3
  • 2.0.2
  • 2.0.1
  • 2.0.0
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.2
28 results

views.py

Blame
  • views.py 11.67 KiB
    """
    // © University of Southampton IT Innovation Centre, 2018
    //
    // Copyright in this software belongs to University of Southampton
    // IT Innovation Centre of Gamma House, Enterprise Road,
    // Chilworth Science Park, Southampton, SO16 7NS, UK.
    //
    // This software may not be used, sold, licensed, transferred, copied
    // or reproduced in whole or in part in any manner or form or in or
    // on any media by any person other than in accordance with the terms
    // of the Licence Agreement supplied with the software, or otherwise
    // without the prior written consent of the copyright owners.
    //
    // This software is distributed WITHOUT ANY WARRANTY, without even the
    // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
    // PURPOSE, except where stated in the Licence Agreement supplied with
    // the software.
    //
    //      Created By :            Nikolay Stanchev
    //      Created Date :          15-05-2018
    //      Created for Project :   FLAME
    """
    
    from pyramid.view import view_defaults
    from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError
    from influxdb import InfluxDBClient
    from urllib.parse import urlparse
    from subprocess import Popen, DEVNULL
    from clmcservice.utilities import validate_config_content, validate_action_content, validate_round_trip_query_params, \
        CONFIG_ATTRIBUTES, ROUND_TRIP_ATTRIBUTES, RUNNING_FLAG, PROCESS_ATTRIBUTE, MALFORMED_FLAG, COMMENT_ATTRIBUTE, COMMENT_VALUE
    import os.path
    
    
    @view_defaults(route_name='aggregator_config', renderer='json')
    class AggregatorConfig(object):
        """
        A class-based view for accessing and mutating the configuration of the aggregator.
        """
    
        def __init__(self, request):
            """
            Initialises the instance of the view with the request argument.
    
            :param request: client's call request
            """
    
            self.request = request
    
        def get(self):
            """
            A GET API call for the configuration of the aggregator.
    
            :return: A JSON response with the configuration of the aggregator.
            """
    
            aggregator_data = self.request.registry.settings
            config = {key: aggregator_data.get(key) for key in CONFIG_ATTRIBUTES}
    
            return config
    
        def put(self):
            """
            A PUT API call for the status of the aggregator.
    
            :return: A JSON response to the PUT call - essentially with the new configured data and comment of the state of the aggregator
            :raises HTTPBadRequest: if request body is not a valid JSON for the configurator
            """
    
            old_config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
            new_config = self.request.body.decode(self.request.charset)
    
            try:
                new_config = validate_config_content(new_config)
    
                for attribute in CONFIG_ATTRIBUTES:
                    self.request.registry.settings[attribute] = new_config.get(attribute)
    
                # if configuration is not already malformed, check whether the configuration is updated
                if not self.request.registry.settings[MALFORMED_FLAG]:
                    malformed = old_config != new_config and self.request.registry.settings[RUNNING_FLAG]
                    self.request.registry.settings[MALFORMED_FLAG] = malformed
                    if malformed:
                        new_config[MALFORMED_FLAG] = True
                        new_config[COMMENT_ATTRIBUTE] = COMMENT_VALUE
    
                return new_config
    
            except AssertionError:
                raise HTTPBadRequest("Bad request content - configuration format is incorrect.")
    
    
    @view_defaults(route_name='aggregator_controller', renderer='json')
    class AggregatorController(object):
    
        """
        A class-based view for controlling the aggregator.
        """
    
        def __init__(self, request):
            """
            Initialises the instance of the view with the request argument.
    
            :param request: client's call request
            """
    
            self.request = request
    
        def get(self):
            """
            A GET API call for the status of the aggregator - running or not.
    
            :return: A JSON response with the status of the aggregator.
            """
    
            aggregator_data = self.request.registry.settings
            config = {RUNNING_FLAG: aggregator_data.get(RUNNING_FLAG)}
    
            if aggregator_data[MALFORMED_FLAG] and aggregator_data[RUNNING_FLAG]:
                config[MALFORMED_FLAG] = True
                config[COMMENT_ATTRIBUTE] = COMMENT_VALUE
    
            return config
    
        def put(self):
            """
            A PUT API call for the status of the aggregator.
    
            :return: A JSON response to the PUT call - essentially saying whether the aggregator is running or not
            :raises HTTPBadRequest: if request body is not a valid JSON for the controller
            """
    
            content = self.request.body.decode(self.request.charset)
    
            try:
                content = validate_action_content(content)
    
                config = {attribute: self.request.registry.settings.get(attribute) for attribute in CONFIG_ATTRIBUTES}
    
                action = content['action']
    
                if action == 'start':
                    aggregator_started = self.request.registry.settings[RUNNING_FLAG]
                    if not aggregator_started:
                        process = self.start_aggregator(config)
                        self.request.registry.settings[RUNNING_FLAG] = True
                        self.request.registry.settings[PROCESS_ATTRIBUTE] = process
                elif action == 'stop':
                    self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE))
                    self.request.registry.settings[RUNNING_FLAG] = False
                    self.request.registry.settings[PROCESS_ATTRIBUTE] = None
                    self.request.registry.settings[MALFORMED_FLAG] = False
                elif action == 'restart':
                    self.stop_aggregator(self.request.registry.settings.get(PROCESS_ATTRIBUTE))
                    process = self.start_aggregator(config)
                    self.request.registry.settings[RUNNING_FLAG] = True
                    self.request.registry.settings[PROCESS_ATTRIBUTE] = process
                    self.request.registry.settings[MALFORMED_FLAG] = False
    
                return {RUNNING_FLAG: self.request.registry.settings.get(RUNNING_FLAG)}
    
            except AssertionError:
                raise HTTPBadRequest('Bad request content - must be in JSON format: {"action": value}, where value is "start", "stop" or "restart".')
    
        @staticmethod
        def start_aggregator(config):
            """
            An auxiliary method to start the aggregator.
    
            :param config: the configuration containing the arguments for the aggregator
            :return: the process object of the started aggregator script
            """
    
            dir_path = os.path.dirname(os.path.realpath(__file__))
            command = ['python', 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database',
                       config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')]
            process = Popen(command, cwd=dir_path, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL)
            print("\nStarted aggregator process with PID: {0}\n".format(process.pid))
    
            return process
    
        @staticmethod
        def stop_aggregator(process):
            """
            An auxiliary method to stop the aggregator.
    
            :param process: the process to terminate
            """
    
            # check if the process is started before trying to terminate it - process.poll() only returns something if the process has terminated, hence we check for a None value
            if process is not None and process.poll() is None:
                process.terminate()
                print("\nStopped aggregator process with PID: {0}\n".format(process.pid))
    
    
    @view_defaults(route_name='round_trip_time_query', renderer='json')
    class RoundTripTimeQuery(object):
    
        """
        A class-based view for querying the round trip time in a given range.
        """
    
        def __init__(self, request):
            """
            Initialises the instance of the view with the request argument.
    
            :param request: client's call request
            """
    
            self.request = request
    
        def get(self):
            """
            A GET API call for the averaged round trip time of a specific media service over a given time range.
    
            :return: A JSON response with the round trip time and its contributing parts.
            """
    
            params = {}
            for attribute in ROUND_TRIP_ATTRIBUTES:
                if attribute in self.request.params:
                    params[attribute] = self.request.params.get(attribute)
    
            try:
                params = validate_round_trip_query_params(params)
                config_data = {config_attribute: self.request.registry.settings.get(config_attribute) for config_attribute in CONFIG_ATTRIBUTES}
    
                media_service = params.get(ROUND_TRIP_ATTRIBUTES[0])
                start_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[1])
                end_timestamp = params.get(ROUND_TRIP_ATTRIBUTES[2])
                influx_db_name = config_data.get(CONFIG_ATTRIBUTES[1])
                influx_db_url = config_data.get(CONFIG_ATTRIBUTES[2])
    
                url_object = urlparse(influx_db_url)
                try:
                    db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=influx_db_name, timeout=10)
                    query = 'SELECT mean(*) FROM "{0}"."autogen"."e2e_delays" WHERE time >= {1} and time < {2} and sf_instance = \'{3}\''.format(
                        influx_db_name, start_timestamp, end_timestamp, media_service)
                    print(query)
                    result = db_client.query(query)
    
                    actual_result = next(result.get_points(), None)
                    if actual_result is None:
                        return {"result": None}
                    else:
                        forward_latency = actual_result.get("mean_delay_forward")
                        reverse_latency = actual_result.get("mean_delay_reverse")
                        service_delay = actual_result.get("mean_delay_service")
                        request_size = actual_result.get("mean_avg_request_size")
                        response_size = actual_result.get("mean_avg_response_size")
                        bandwidth = actual_result.get("mean_avg_bandwidth")
    
                        rtt = self.calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth)
                        return {"result": rtt}
                except:
                    raise HTTPInternalServerError("Cannot instantiate connection with database {0} on url {1}.".format(influx_db_name, influx_db_url))
    
            except AssertionError:
                raise HTTPBadRequest('Bad request content - must be in JSON format: {"media_service": value, "start_timestamp": value, "end_timestamp": value}.')
    
        @staticmethod
        def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
            """
            Calculates the round trip time given the list of arguments.
    
            :param forward_latency: network latency in forward direction (s)
            :param reverse_latency: network latency in reverse direction (s)
            :param service_delay: media service delay (s)
            :param request_size: request size (bytes)
            :param response_size: response size (bytes)
            :param bandwidth: network bandwidth (Mb/s)
            :param packet_size: size of packet (bytes)
            :param packet_header_size: size of the header of the packet (bytes)
            :return: the calculated round trip time
            """
    
            forward_data_delay = (8/10**6) * (request_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
            reverse_data_delay = (8/10**6) * (response_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
    
            return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay