#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
//      Created By :            Nikolay Stanchev
//      Created Date :          04-07-2018
//      Created for Project :   FLAME
"""

from json import loads
from py2neo import Node, Relationship
import logging


# URL query parameters that must be present in a round-trip-time request
GRAPH_ROUND_TRIP_TIME_URL_PARAMS = ("startpoint", "endpoint")
# exact set of keys expected in the JSON body of a graph build request
GRAPH_BUILD_QUERY_PARAMS = {"from", "to", "service_function_chain", "service_function_chain_instance", "service_functions"}
# exact set of keys expected for each entry of the "service_functions" object in a graph build request
GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "request_size_field", "response_size_field", "measurement_name"}
# placeholders: {0}-{2} selected fields, {3} database, {4} retention policy, {5} measurement, {6} SFC, {7} SFC instance, {8} SF package, {9}-{10} time range
INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time, {1} AS mean_request_size, {2} AS mean_response_size FROM "{3}"."{4}".{5} WHERE "flame_sfc"=\'{6}\' and "flame_sfci"=\'{7}\' and "flame_sfp"=\'{8}\' and time>={9} and time<{10} GROUP BY "flame_sfe", "flame_location", "flame_sf"'
# in cypher the syntax is {name: 'value'}, here we use {{name: 'value'}} to escape the curly braces when applying the python format function
# placeholders: {0} label of the start node, {1} name of the start node, {2} name of the endpoint, {3} uuid of the endpoint
RTT_CYPHER_QUERY_TEMPLATE = """
MATCH (startpoint:{0} {{ name: '{1}' }}),(endpoint:Endpoint {{ name: '{2}', uuid: '{3}'}}), 
path = shortestPath((startpoint)-[*]-(endpoint))
WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' )
WITH extract(y in filter(x in relationships(path) WHERE type(x) = 'linkedTo') | y.latency) as latencies, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size
RETURN latencies  as forward_latencies, reverse(latencies) as reverse_latencies, response_time, request_size, response_size
"""


# DEPRECATED QUERY - use this if we have to switch back to using two directed edges between a given pair of nodes
# RTT_CYPHER_QUERY_TEMPLATE = """
# MATCH (dc:Cluster {{ name: '{0}' }}),(endpoint:Endpoint {{ name: '{1}', uuid: '{2}'}}),
# path = shortestPath((dc)-[*]-(endpoint))
# WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' )
# WITH nodes(path) as all_nodes, endpoint as endpoint
#     WITH all_nodes[0..size(all_nodes)-1] as network_nodes, endpoint as endpoint
#     UNWIND RANGE(0, size(network_nodes) - 2) as id
#     WITH network_nodes[id] as source, network_nodes[id+1] as target, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size
#         MATCH (source) -[r1]-> (target), (target) -[r2]-> (source)
#         RETURN collect(r1.latency) as forward_latencies, reverse(collect(r2.latency)) as reverse_latencies, response_time, request_size, response_size
# """


log = logging.getLogger('service_logger')


def validate_json_queries_body(body):
    """
    Validates the request body containing mappings from service functions to queries to execute.

    Validation failures are raised explicitly (rather than through assert statements) so that
    the checks are still performed when Python runs with optimisations enabled (-O).

    :param body: the request body to validate (a JSON document, as accepted by json.loads)
    :return the validated json queries dictionary object
    :raise AssertionError: if the body is invalid
    """

    try:
        body = loads(body)
    except (TypeError, ValueError, RecursionError) as error:
        # TypeError - the body is not str/bytes; ValueError - malformed JSON or badly encoded bytes; RecursionError - excessively nested JSON
        raise AssertionError("Configuration must be a JSON object.") from error

    # valid JSON that is not an object (e.g. a list or a number) has no keys to validate
    if not isinstance(body, dict):
        raise AssertionError("Configuration must be a JSON object.")

    # the document must contain exactly the expected keys - no missing and no additional ones
    if GRAPH_BUILD_QUERY_PARAMS != set(body.keys()):
        raise AssertionError("Invalid JSON query document.")

    # NOTE: the format of the SFC instance ID is no longer validated - the SFC instance ID does not depend on the SFC ID anymore

    service_functions = body["service_functions"]
    if not isinstance(service_functions, dict):
        raise AssertionError("The service function description should be represented with a dictionary.")

    for sf, query_data in service_functions.items():
        if not isinstance(query_data, dict):
            raise AssertionError("Each service function must be associated with a respective JSON object.")
        if GRAPH_BUILD_SF_QUERY_PARAMS != set(query_data.keys()):
            raise AssertionError("Invalid query data for service function {0} in the JSON query document".format(sf))

    # exact type checks on purpose: JSON true/false are decoded to bool, which is a subclass of int and must be rejected
    if type(body["from"]) is not int:
        raise AssertionError("'from' parameter must be a timestamp integer")
    if body["from"] < 0:
        raise AssertionError("'from' parameter must be a positive timestamp integer or 0")
    if type(body["to"]) is not int:
        raise AssertionError("'to' parameter must be a timestamp integer")
    if body["to"] <= body["from"]:
        raise AssertionError("'to' parameter timestamp must be greater than 'from' parameter timestamp")

    return body


def validate_graph_rtt_params(params):
    """
    Validates the request url parameters used in running a round trip time cypher query.

    Only the required parameters are copied to the result - any additional parameters are ignored.
    A missing parameter is reported with an explicit raise (rather than an assert statement) so that
    the check is still performed when Python runs with optimisations enabled (-O).

    :param params: the parameters dictionary to validate
    :return: the validated parameters - a dictionary with the required url parameters only
    :raise AssertionError: for invalid parameters
    """

    url_params = {}
    for param in GRAPH_ROUND_TRIP_TIME_URL_PARAMS:
        if param not in params:
            raise AssertionError("Incorrect url parameters - required url query parameter '{0}' is not found in the request parameters.".format(param))
        url_params[param] = params[param]

    return url_params


def find_or_create_node(graph, node_type, return_created=False, **properties):
    """
    This function checks if a node of the given type with the given properties exists, and if not - creates it.

    An existing node is looked up by its name and, if given, its uuid - the remaining properties
    are not used for matching, they are only set when a new node is created.

    :param graph: the graph object
    :param node_type: the type of the node to find or create
    :param return_created: if True the result will contain both the node and a boolean flag if the node was created now
    :param properties: the properties of the node to find or create - 'name' is required, 'uuid' is optional
    :return: the found or newly created node object, or a tuple (node, created) if return_created is True
    :raise KeyError: if no 'name' property is given
    """

    match_criteria = {"name": properties["name"]}
    if "uuid" in properties:
        match_criteria["uuid"] = properties["uuid"]

    node = graph.nodes.match(node_type, **match_criteria).first()

    created = node is None
    if created:
        log.info("Creating node of type {0} with properties {1}".format(node_type, properties))
        node = Node(node_type, **properties)
        graph.create(node)

    if return_created:
        return node, created

    return node


def find_or_create_edge(graph, edge_type, from_node, to_node, **properties):
    """
    Returns the edge of the given type between the two nodes, creating it first if there is no such edge.

    An existing edge is matched on its end nodes and type only - the given properties are
    used just for setting up a newly created edge.

    :param graph: the graph object
    :param edge_type: the type of the edge to find or create
    :param from_node: the source of the edge
    :param to_node: the target of the edge
    :param properties: the properties to set on the edge if it has to be created
    :return: the found or newly created edge object
    """

    existing_edge = graph.relationships.match(nodes=(from_node, to_node), r_type=edge_type).first()
    if existing_edge is not None:
        return existing_edge

    # no such edge yet - create it with the given properties
    message = "Creating edge of type {0} from node {1} to node {2} with properties {3}".format(edge_type, from_node, to_node, properties)
    log.info(message)
    new_edge = Relationship(from_node, edge_type, to_node, **properties)
    graph.create(new_edge)
    return new_edge


def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client):
    """
    A function used to generate a temporal graph in the neo4j db.

    For each service function package in the query document an influx query is executed for the given time range.
    For every result group (grouped by endpoint, location and service function tags) the function finds or creates
    the ServiceFunctionPackage, ServiceFunction, Endpoint and Cluster nodes together with the edges between them.
    Endpoint nodes are tagged with the request ID as their uuid. The function doesn't return a value.

    :param request_id: the ID of the request
    :param from_timestamp: the start of the time range
    :param to_timestamp: the end of the time range
    :param json_queries: the JSON object containing the query data for each service function
    :param graph: the graph DB object
    :param influx_client: the influx DB client object
    """

    global INFLUX_QUERY_TEMPLATE

    sfc = json_queries["service_function_chain"]
    sfci = json_queries["service_function_chain_instance"]
    # the SFC identifier is also used as the name of the influx database to query
    db = sfc
    rp = "autogen"
    log.info("Building graph for service function chain {0}/{1} from database {2} with retention policy {3}".format(sfc, sfci, db, rp))
    # NOTE(review): the reference node is only constructed here - it is never passed to the graph in this function,
    # so it looks like a graph.create(reference_node) call might be missing - confirm against the intended behaviour
    reference_node = Node("Reference", **{"uuid": request_id, "sfc": sfc, "sfci": sfci, "from": from_timestamp, "to": to_timestamp})
    # create a node for the service function chain if it doesn't exist
    service_function_chain_node = find_or_create_node(graph, "ServiceFunctionChain", name=sfc)
    # create a node for the service function chain instance if it doesn't exist
    service_function_chain_instance_node = find_or_create_node(graph, "ServiceFunctionChainInstance", name=sfci)
    # create an instanceOf edge if it doesn't exist
    find_or_create_edge(graph, "instanceOf", service_function_chain_instance_node, service_function_chain_node)

    # a set is used to keep track of all compute nodes that are found while building the graph
    # NOTE(review): the set is filled in below but never read within this function - presumably a leftover from retrieving network latencies here
    compute_nodes = set()

    # traverse the service functions (the keys are the service function package names)
    for service_function_package in json_queries["service_functions"]:
        query_data = json_queries["service_functions"][service_function_package]
        response_time_field = query_data["response_time_field"]
        request_size_field = query_data["request_size_field"]
        response_size_field = query_data["response_size_field"]
        measurement = query_data["measurement_name"]

        # build up the query by setting the placeholders in the query template
        query_to_execute = INFLUX_QUERY_TEMPLATE.format(response_time_field, request_size_field, response_size_field, db, rp, measurement, sfc, sfci, service_function_package, from_timestamp, to_timestamp)

        log.info("Executing query: {0}".format(query_to_execute))
        result = influx_client.query(query_to_execute)  # execute the query
        result_items = result.items()

        # the package node and its edge are only created if the query returned any data
        if len(result_items) > 0:
            # create a node for the service function if it doesn't exist
            service_function_package_node = find_or_create_node(graph, "ServiceFunctionPackage", name=service_function_package)
            # create a utilizedBy edge between the service function and the service function chain
            find_or_create_edge(graph, "utilizedBy", service_function_package_node, service_function_chain_node)

            # iterate through each result item
            for item in result_items:
                metadata, result_points = item  # each result item is a tuple of two elements

                # metadata consists of the result tags and the measurement name
                # measurement = metadata[0]
                tags = metadata[1]

                # only the first point of each group is used
                # NOTE(review): assumes result_points is an iterator with at least one point - next() would raise StopIteration otherwise
                result_point = next(result_points)  # get the result point dictionary
                response_time = result_point["mean_response_time"]  # extract the response time of the SF from the result
                request_size = result_point["mean_request_size"]  # extract the avg request size of the SF from the result
                response_size = result_point["mean_response_size"]  # extract the avg response size of the SF from the result

                # create a ServiceFunction node from the tag value (if it is not already created)
                service_function_node = find_or_create_node(graph, "ServiceFunction", name=tags["flame_sf"])
                # create an edge between the service function and the package (if it is not already created)
                find_or_create_edge(graph, "instanceOf", service_function_node, service_function_package_node)
                # create a utilizedBy edge between the service function and the service function chain instance
                find_or_create_edge(graph, "utilizedBy", service_function_node, service_function_chain_instance_node)

                # create an Endpoint node from the tag value (if it is not already created) - the request ID is stored as the uuid of the node
                ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["flame_sfe"], response_time=response_time, request_size=request_size, response_size=response_size, uuid=request_id)
                # create an edge between the service function and the endpoint (if it is not already created)
                find_or_create_edge(graph, "realisedBy", service_function_node, ipendpoint_node)

                # create a Cluster node from the tag value (if it is not already created)
                compute_node = find_or_create_node(graph, "Cluster", name=tags["flame_location"])
                # create an edge between the endpoint and the compute node (if it is not already created)
                find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node)

                compute_nodes.add(compute_node)  # add the compute node to the set of compute nodes
    log.info("Finished building graph for service function chain {0} from database {1} with retention policy {2}".format(sfci, db, rp))


def delete_temporal_subgraph(graph, subgraph_id):
    """
    Deletes every node whose uuid property equals the given subgraph ID.

    :param graph: the neo4j graph db object
    :param subgraph_id: the ID of the subgraph to delete (obtained from the CLMC service)

    :return: the number of nodes that were matched with the given subgraph ID
    """

    log.info("Deleting subgraph associated with ID {0}".format(subgraph_id))

    # enumerate from 1, so that after the loop the counter holds the number of deleted nodes (it stays 0 if nothing matched)
    nodes_matched = 0
    for nodes_matched, matched_node in enumerate(graph.nodes.match(uuid=subgraph_id), start=1):
        graph.delete(matched_node)

    log.info("Deleted {0} nodes associated with ID {1}".format(nodes_matched, subgraph_id))

    return nodes_matched
def build_network_graph(graph, switches, links, clusters, ues):
    """
    A function used to build the network topology in the neo4j graph given the collection of switches, links and clusters.

    :param graph: the neo4j graph database client
    :param switches: a collection of all switches in the topology - mapping between the DPID of the switch and its IP address
    :param links: a collection of all switch-to-switch links in the network topology - JSON format, list of objects, each object must have "src-switch", "dst-switch" and "latency" as keys
    :param clusters: a collection of all clusters and the IP address of the service router that they are connected to - mapping between an IP address of a service router and a cluster identifier
    :param ues: a collection of all ues and the IP address of the service router that they are connected to - mapping between an IP address of a servicer router and a ue identifier
    :return: a tuple with the number of newly created switch nodes, cluster nodes and UE nodes
    """

    new_switches_count = 0
    new_clusters_count = 0
    new_ues_count = 0

    for link in links:
        # the link refers to the switches by DPID - map the DPIDs to the IP addresses of the switches
        source = switches[link["src-switch"]]
        destination = switches[link["dst-switch"]]

        # retrieve the latency for this link
        latency = link["latency"] / 1000  # convert to seconds

        # create or retrieve the from node
        from_node, created = find_or_create_node(graph, "Switch", return_created=True, name=source)
        if created:
            new_switches_count += 1

        # create or retrieve the to node
        to_node, created = find_or_create_node(graph, "Switch", return_created=True, name=destination)
        if created:
            new_switches_count += 1

        # create the link between the two nodes
        find_or_create_edge(graph, "linkedTo", from_node, to_node, latency=latency)

        # check whether the source service router connects a particular cluster or a particular UE
        if create_node_from_mapping(graph, from_node, source, clusters, "Cluster"):
            new_clusters_count += 1
        if create_node_from_mapping(graph, from_node, source, ues, "UserEquipment"):
            new_ues_count += 1

        # check whether the destination service router connects a particular cluster or a particular UE
        if create_node_from_mapping(graph, to_node, destination, clusters, "Cluster"):
            new_clusters_count += 1
        if create_node_from_mapping(graph, to_node, destination, ues, "UserEquipment"):
            new_ues_count += 1

    return new_switches_count, new_clusters_count, new_ues_count

def create_node_from_mapping(graph, node, node_ip, mapping, new_node_type):
    """
    Attaches an additional node of the given type to a switch node, if the mapping has an entry for the IP address of the switch.

    :param graph: the neo4j graph database client
    :param node: the original node
    :param node_ip: the original node's IP address
    :param mapping: the mapping object (dictionary from IP address to identifier)
    :param new_node_type: the type of the new node to be created
    :return: True if new node was created and False otherwise
    """

    # nothing is attached to this switch
    if node_ip not in mapping:
        return False

    attached_node, was_created = find_or_create_node(graph, new_node_type, return_created=True, name=mapping[node_ip])
    # the attached node is linked to the switch with a zero latency edge
    find_or_create_edge(graph, "linkedTo", attached_node, node, latency=0)

    return was_created


def _delete_nodes_with_label(graph, label):
    """
    Deletes all nodes with the given label from the graph and logs the progress.

    :param graph: the neo4j graph
    :param label: the label (type) of the nodes to delete
    :return: the number of nodes matched with the label
    """

    log.info("Deleting {0} nodes.".format(label))

    subgraph = graph.nodes.match(label)
    matched_count = len(subgraph)  # count the nodes before they get deleted
    for node in subgraph:
        graph.delete(node)

    log.info("Deleted {0} {1} nodes.".format(matched_count, label))

    return matched_count


def delete_network_graph(graph):
    """
    A function used to delete all nodes of type Switch, Cluster and UserEquipment in the neo4j graph.

    :param graph: the neo4j graph
    :return: the number of deleted switches, clusters and UEs (in this order)
    """

    deleted_switches = _delete_nodes_with_label(graph, "Switch")
    deleted_clusters = _delete_nodes_with_label(graph, "Cluster")
    deleted_ues = _delete_nodes_with_label(graph, "UserEquipment")

    return deleted_switches, deleted_clusters, deleted_ues