#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
//      Created By :            Nikolay Stanchev
//      Created Date :          04-07-2018
//      Created for Project :   FLAME
"""


from clmcservice.graphapi.utilities import validate_json_queries_body, RTT_CYPHER_QUERY_TEMPLATE, \
    build_network_graph, delete_network_graph, build_temporal_subgraph, delete_temporal_subgraph, validate_graph_rtt_params
from influxdb import InfluxDBClient
from json import load
from py2neo import Graph
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPServiceUnavailable, HTTPNotImplemented
from pyramid.view import view_defaults, view_config
from requests import get, exceptions
from uuid import uuid4
import logging


log = logging.getLogger('service_logger')


@view_defaults(renderer='json')
class GraphAPI(object):
    """
    A class-based view for building temporal graphs and running graph queries.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """

        self.request = request

    @view_config(route_name='graph_build', request_method='POST')
    def build_temporal_graph(self):
        """
        An API endpoint to build a temporal graph and store it in neo4j based on the posted JSON query document.

        :raises HTTPBadRequest: if request body is not a valid JSON with the queries per service function
        :return: A JSON document containing the posted request body, along with metadata about the built graph (time range and UUID, which can then be reused for other API calls)
        """

        try:
            body = self.request.body.decode(self.request.charset)
            json_queries = validate_json_queries_body(body)  # validate the content and receive a json dictionary object
        except AssertionError as e:
            raise HTTPBadRequest("Bad request content: {0}".format(e.args))

        graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])
        influx_client = InfluxDBClient(host=self.request.registry.settings['influx_host'], port=self.request.registry.settings['influx_port'], timeout=10)

        database_name = json_queries["service_function_chain"]
        if database_name not in [db["name"] for db in influx_client.get_list_database()]:
            raise HTTPBadRequest("Database for service function chain {0} not found.".format(database_name))
        from_timestamp = json_queries['from'] * 10**9
        to_timestamp = json_queries['to'] * 10**9

        request_id = str(uuid4())  # generate a unique ID used to label the nodes of the new temporal subgraph
        build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client)
        json_queries['graph'] = {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}}
        json_queries.pop("from")
        json_queries.pop("to")

        return json_queries

    @view_config(route_name='graph_manage', request_method='DELETE')
    def delete_temporal_graph(self):
        """
        An API endpoint to delete a temporal graph associated with a uuid generated by the CLMC service.

        :return: A JSON document containing the UUID of the deleted subgraph
        :raises HTTPNotFound: if the request is not associated with any subgraph
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL
        graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])  # connect to the neo4j graph db

        if graph.nodes.match("Reference", uuid=graph_id).first() is None:
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

        number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id)
        return {"uuid": graph_id, "deleted": number_of_deleted_nodes}

    @view_config(route_name='graph_algorithms_rtt', request_method='GET')
    def run_rtt_query(self):
        """
        An API endpoint to run the round trip time cypher query over the graph associated with a given request ID.

        :return: A JSON response with a list of forward latencies, reverse latencies and SF endpoint response time.
        :raises HTTPBadRequest: if the request URL doesn't contain the required URL query parameters
        :raises HTTPNotFound: if the request is not associated with any subgraph or the compute node / endpoint node doesn't exist
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL

        try:
            params = validate_graph_rtt_params(self.request.params)
        except AssertionError as e:
            raise HTTPBadRequest("Request URL format is incorrect: {0}".format(e.args))

        startpoint_node_label = params["startpoint"]
        endpoint_node_label = params["endpoint"]

        graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])  # connect to the neo4j graph db

        all_nodes = graph.nodes

        reference_node = all_nodes.match("Reference", uuid=graph_id).first()
        if reference_node is None:
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

        # match a switch or cluster node as a path start point and capture the type of the node
        startpoint_node = all_nodes.match("Switch", name=startpoint_node_label).first()
        if startpoint_node is None:
            startpoint_node = all_nodes.match("Cluster", name=startpoint_node_label).first()
            if startpoint_node is None:
                # no match with a Switch/Cluster node
                raise HTTPNotFound("Starting point node {0} doesn't exist.".format(startpoint_node_label))
            else:
                # a match with a Cluster node
                startpoint_node_type = "Cluster"
        else:
            # a match with a Switch node
            startpoint_node_type = "Switch"

        endpoint_node = all_nodes.match("Endpoint", name=endpoint_node_label, uuid=graph_id).first()
        if endpoint_node is None:
            raise HTTPNotFound("Endpoint node {0} doesn't exist.".format(endpoint_node_label))

        # check if the endpoint is hosted by the compute node before running the RTT cypher query
        hosted_by_node = graph.relationships.match(nodes=(endpoint_node, None), r_type="hostedBy").first().end_node
        if hosted_by_node["name"] == startpoint_node["name"]:
            result = {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"],
                      "request_size": endpoint_node["request_size"], "response_size": endpoint_node["response_size"]}
        else:
            query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(startpoint_node_type, startpoint_node_label, endpoint_node_label, graph_id)
            log.info("Executing cypher query: {0}".format(query_to_execute))
            data = graph.run(query_to_execute).data()  # returns a list of dictionaries, each dictionary represents a row in the result
            result = data[0]

        # retrieve the service function instance node that the endpoint realises
        realised_by_relationship = graph.match(nodes=(None, endpoint_node), r_type="realisedBy").first()
        if realised_by_relationship is None:
            msg = "No service function found associated with endpoint {0}".format(endpoint_node["name"])
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPBadRequest(msg)
        sf_node = realised_by_relationship.start_node

        # retrieve the service function package node that the service function is an instance of
        instance_of_relationship = graph.match(nodes=(sf_node, None), r_type="instanceOf").first()
        if instance_of_relationship is None:
            msg = "No service function package found associated with service function {0}".format(sf_node["name"])
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPBadRequest(msg)
        sf_package_node = instance_of_relationship.end_node

        result["global_tags"] = {"flame_sfe": endpoint_node["name"], "flame_server": hosted_by_node["name"], "flame_location": hosted_by_node["name"],
                                 "flame_sfc": reference_node["sfc"], "flame_sfci": reference_node["sfci"], "flame_sfp": sf_package_node["name"], "flame_sf": sf_node["name"]}

        # calculate the Round-Trip-Time
        total_forward_latency = sum(result["forward_latencies"])
        result["total_forward_latency"] = total_forward_latency
        total_reverse_latency = sum(result["reverse_latencies"])
        result["total_reverse_latency"] = total_reverse_latency
        bandwidth = self.request.registry.settings["network_bandwidth"]
        result["bandwidth"] = bandwidth
        service_delay = result["response_time"]
        request_size = result["request_size"]
        response_size = result["response_size"]

        round_trip_time = self.calculate_round_trip_time(total_forward_latency, total_reverse_latency, service_delay, request_size, response_size, bandwidth)
        result["round_trip_time"] = round_trip_time

        return result

    @staticmethod
    def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
        """
        Calculates the round trip time given the list of arguments.

        :param forward_latency: network latency in forward direction (s)
        :param reverse_latency: network latency in reverse direction (s)
        :param service_delay: media service delay (s)
        :param request_size: request size (bytes)
        :param response_size: response size (bytes)
        :param bandwidth: network bandwidth (Mb/s)
        :param packet_size: size of packet (bytes)
        :param packet_header_size: size of the header of the packet (bytes)
        :return: the calculated round trip time
        """

        if forward_latency > 0 and reverse_latency > 0:
            forward_data_delay = (8/10**6) * (request_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
            reverse_data_delay = (8/10**6) * (response_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
        else:
            forward_data_delay, reverse_data_delay = 0, 0

        return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay

    @view_config(route_name='graph_network_topology', request_method='POST')
    def build_network_topology(self):
        """
        An API endpoint to build/update the network topology in the neo4j graph.

        :return: A JSON response with the number of switches and clusters that were built.
        """

        graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])  # connect to the neo4j graph db

        sdn_controller_ip = self.request.registry.settings['sdn_controller_ip']
        sdn_controller_port = self.request.registry.settings['sdn_controller_port']

        # retrieve all switches - if SDN controller is unavailable on the given IP address return 503 Service Unavailable
        try:
            url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/core/controller/switches/json")
            response = get(url)
        except exceptions.ConnectionError:
            msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port)
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.")

        # check if the SDN controller returned the expected response
        if response.status_code != 200:
            msg = "The SDN controller returned a response with a status code different from 200."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for the list of switches.")

        try:
            content = response.json()
        except ValueError:  # response not in JSON
            msg = "The SDN controller returned a response which couldn't be converted to JSON."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for the list of switches.")

        # map the DPID of each switch to its IP address
        switches = {}
        for switch in content:
            # map the dpid to the switch IP address, the IP address is in the format '/172.168.23.54:1234'
            switches[switch["switchDPID"]] = switch["inetAddress"][1:].split(":")[0]

        # retrieve all external links (gathered through BDDP) - if SDN controller is unavailable on the given IP address return 503 Service Unavailable
        try:
            url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/topology/external-links/json")
            response = get(url)
        except exceptions.ConnectionError:
            msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port)
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.")

        # check if the SDN controller returned the expected response
        if response.status_code != 200:
            msg = "The SDN controller returned a response with a status code different from 200."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for the network topology.")

        try:
            external_links = response.json()
        except ValueError:  # response not in JSON
            msg = "The SDN controller returned a response which couldn't be converted to JSON."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for the network topology.")

        # retrieve all local links (gathered through LLDP) - if SDN controller is unavailable on the given IP address return 503 Service Unavailable
        try:
            url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, "/wm/topology/links/json")
            response = get(url)
        except exceptions.ConnectionError:
            msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port)
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.")

        if response.status_code != 200:
            msg = "The SDN controller returned a response with status code different than 200."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for the network topology.")

        try:
            local_links = response.json()
        except ValueError:  # response not in JSON
            msg = "The SDN controller returned a response which couldn't be converted to JSON."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for the network topology.")

        # TODO this is a temporary solution - currently the service router to clusters mapping is read from a file (which must be manually prepared beforehand)
        clusters_file = self.request.registry.settings["network_clusters_path"]
        try:
            with open(clusters_file) as fh:
                clusters = load(fh)
        except Exception as e:
            log.error("Unexpected error: {0}".format(e))
            log.error("No service-router-to-cluster mapping was found while building the network topology.")
            clusters = {}
        # TODO this is a temporary solution - currently the service router to ues mapping is read from a file (which must be manually prepared beforehand)
        ues_file = self.request.registry.settings["network_ues_path"]
        try:
            with open(ues_file) as fh:
                ues = load(fh)
        except Exception as e:
            log.error("Unexpected error: {0}".format(e))
            log.error("No ue-to-cluster mapping was found while building the network topology.")
            ues = {}

        # build the network graph and retrieve the number of switch, cluster and UE nodes that were created
        tmp_switch_count, tmp_clusters_count, tmp_ues_count = build_network_graph(graph, switches, external_links, clusters, ues)
        switch_count, clusters_count, ues_count = build_network_graph(graph, switches, local_links, clusters, ues)
        switch_count += tmp_switch_count
        clusters_count += tmp_clusters_count
        ues_count += tmp_ues_count
        return {"new_switches_count": switch_count, "new_clusters_count": clusters_count, "new_ues_count": ues_count}

    @view_config(route_name='graph_network_topology', request_method='DELETE')
    def delete_network_topology(self):
        """
        An API endpoint to delete the network topology in the neo4j graph.

        :return: A JSON response with the number of switches and clusters that were deleted.
        """

        graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])  # connect to the neo4j graph db

        deleted_switches, deleted_clusters, deleted_ues = delete_network_graph(graph)
        return {"deleted_switches_count": deleted_switches, "deleted_clusters_count": deleted_clusters, "deleted_ues_count": deleted_ues}