#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
//      Created By :            Nikolay Stanchev
//      Created Date :          04-07-2018
//      Created for Project :   FLAME
"""

from json import loads
import logging

from py2neo import Node, Relationship


# URL query parameters required by the round-trip-time API endpoint
GRAPH_ROUND_TRIP_TIME_URL_PARAMS = ("startpoint", "endpoint")

# exact set of keys expected in the JSON body of a graph-build request
GRAPH_BUILD_QUERY_PARAMS = {"from", "to", "service_function_chain", "service_function_chain_instance", "service_functions"}

# exact set of keys expected in each per-service-function query description
GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "request_size_field", "response_size_field", "measurement_name"}

# InfluxQL template - aggregates a service function's measurements, grouped by endpoint, location and service function tag
INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time, {1} AS mean_request_size, {2} AS mean_response_size FROM "{3}"."{4}".{5} WHERE "flame_sfc"=\'{6}\' and "flame_sfci"=\'{7}\' and "flame_sfp"=\'{8}\' and time>={9} and time<{10} GROUP BY "flame_sfe", "flame_location", "flame_sf"'

# in cypher the syntax is {name: 'value'}, here we use {{name: 'value'}} to escape the curly braces when applying the python format function
RTT_CYPHER_QUERY_TEMPLATE = """
MATCH (startpoint:{0} {{ name: '{1}' }}),(endpoint:Endpoint {{ name: '{2}', uuid: '{3}'}}),
path = shortestPath((startpoint)-[*]-(endpoint))
WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' )
WITH extract(y in filter(x in relationships(path) WHERE type(x) = 'linkedTo') | y.latency) as latencies, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size
RETURN latencies as forward_latencies, reverse(latencies) as reverse_latencies, response_time, request_size, response_size
"""

# DEPRECATED QUERY - use this if we have to switch back to using two directed edges between a given pair of nodes
# RTT_CYPHER_QUERY_TEMPLATE = """
# MATCH (dc:Cluster {{ name: '{0}' }}),(endpoint:Endpoint {{ name: '{1}', uuid: '{2}'}}),
# path = shortestPath((dc)-[*]-(endpoint))
# WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' )
# WITH nodes(path) as all_nodes, endpoint as endpoint
# WITH all_nodes[0..size(all_nodes)-1] as network_nodes, endpoint as endpoint
# UNWIND RANGE(0, size(network_nodes) - 2) as id
# WITH network_nodes[id] as source, network_nodes[id+1] as target, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size
# MATCH (source) -[r1]-> (target), (target) -[r2]-> (source)
# RETURN collect(r1.latency) as forward_latencies, reverse(collect(r2.latency)) as reverse_latencies, response_time, request_size, response_size
# """

log = logging.getLogger('service_logger')


def validate_json_queries_body(body):
    """
    Validates the request body containing mappings from service functions to queries to execute.

    :param body: the request body to validate
    :return the validated json queries dictionary object
    :raise AssertionError: if the body is invalid
    """

    global GRAPH_BUILD_QUERY_PARAMS

    try:
        body = loads(body)
    except (TypeError, ValueError) as e:
        # narrowed from a bare except - json.loads raises ValueError (JSONDecodeError) for malformed
        # JSON and TypeError for non-string input; a bare except would also swallow KeyboardInterrupt
        raise AssertionError("Configuration must be a JSON object.") from e

    assert GRAPH_BUILD_QUERY_PARAMS == set(body.keys()), "Invalid JSON query document."

    # NOTE: this code is now outdated - we no longer have SFC instance ID depending on the SFC ID
    # sfc_i = body["service_function_chain_instance"]
    # sfc_i_subparts = sfc_i.split('_')
    # assert len(sfc_i_subparts) > 1, "Incorrect format of service function chain instance ID - use format <sfcID>_<instanceNum>"
    #
    # # check the last part of the sfc_i ID is a number
    # try:
    #     int(sfc_i_subparts[-1])
    # except ValueError:
    #     assert False, "Incorrect format of service function chain instance ID - use format <sfcID>_<instanceNum>"

    # deliberately strict type() checks (not isinstance) - subclasses such as bool are rejected
    assert type(body["service_functions"]) == dict, "The service function description should be represented with a dictionary."
    for sf in body["service_functions"]:
        query_data = body["service_functions"][sf]
        assert type(query_data) == dict, "Each service function must be associated with a respective JSON object."
        assert GRAPH_BUILD_SF_QUERY_PARAMS == set(query_data.keys()), "Invalid query data for service function {0} in the JSON query document".format(sf)

    assert type(body["from"]) == int, "'from' parameter must be a timestamp integer"
    assert body["from"] >= 0, "'from' parameter must be a positive timestamp integer or 0"
    assert type(body["to"]) == int, "'to' parameter must be a timestamp integer"
    assert body["to"] > body["from"], "'to' parameter timestamp must be greater than 'from' parameter timestamp"

    return body


def validate_graph_rtt_params(params):
    """
    Validates the request url parameters used in running a round trip time cypher query.

    :param params: the parameters dictionary to validate
    :return: the validated parameters
    :raise AssertionError: for invalid parameters
    """

    global GRAPH_ROUND_TRIP_TIME_URL_PARAMS

    url_params = {}
    for param in GRAPH_ROUND_TRIP_TIME_URL_PARAMS:
        assert param in params, "Incorrect url parameters - required url query parameter '{0}' is not found in the request parameters.".format(param)
        url_params[param] = params[param]

    return url_params


def find_or_create_node(graph, node_type, return_created=False, **properties):
    """
    This function checks if a node of the given type with the given properties exists, and if not - creates it.

    :param graph: the graph object
    :param node_type: the type of the node to find or create
    :param return_created: if True the result will contain both the node and a boolean flag if the node was created now
    :param properties: the properties of the node to find or create
    :return: the found or newly created node object
    """

    # match on uuid as well when it is given - endpoint nodes are scoped per request by uuid
    if 'uuid' in properties:
        node = graph.nodes.match(node_type, name=properties['name'], uuid=properties['uuid']).first()
    else:
        node = graph.nodes.match(node_type, name=properties['name']).first()

    created = False
    if node is None:
        log.info("Creating node of type {0} with properties {1}".format(node_type, properties))
        node = Node(node_type, **properties)
        graph.create(node)
        created = True

    if return_created:
        return node, created
    else:
        return node


def find_or_create_edge(graph, edge_type, from_node, to_node, **properties):
    """
    This function checks if an edge of the given type with the given properties exists, and if not - creates it.

    :param graph: the graph object
    :param edge_type: the type of the edge to find or create
    :param from_node: the source of the edge
    :param to_node: the target of the edge
    :param properties: the properties of the edge to find or create
    :return: the found or newly created edge object
    """

    edge = graph.relationships.match(nodes=(from_node, to_node), r_type=edge_type).first()
    if edge is None:
        log.info("Creating edge of type {0} from node {1} to node {2} with properties {3}".format(edge_type, from_node, to_node, properties))
        edge = Relationship(from_node, edge_type, to_node, **properties)
        graph.create(edge)

    return edge


def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client):
    """
    A function used to generate a temporal graph in the neo4j db.

    :param request_id: the ID of the request
    :param from_timestamp: the start of the time range
    :param to_timestamp: the end of the time range
    :param json_queries: the JSON object containing the query data for each service function
    :param graph: the graph DB object
    :param influx_client: the influx DB client object
    """

    global INFLUX_QUERY_TEMPLATE

    sfc = json_queries["service_function_chain"]
    sfci = json_queries["service_function_chain_instance"]
    db = sfc  # measurements for a chain live in a database named after the chain
    rp = "autogen"
    log.info("Building graph for service function chain {0}/{1} from database {2} with retention policy {3}".format(sfc, sfci, db, rp))

    # create a UUID reference node, anchoring this temporal subgraph so it can be deleted later by request_id
    reference_node = Node("Reference", **{"uuid": request_id, "sfc": sfc, "sfci": sfci, "from": from_timestamp, "to": to_timestamp})
    graph.create(reference_node)

    # create a node for the service function chain if it doesn't exist
    service_function_chain_node = find_or_create_node(graph, "ServiceFunctionChain", name=sfc)
    # create a node for the service function chain instance if it doesn't exist
    service_function_chain_instance_node = find_or_create_node(graph, "ServiceFunctionChainInstance", name=sfci)
    # create an instanceOf edge if it doesn't exist
    find_or_create_edge(graph, "instanceOf", service_function_chain_instance_node, service_function_chain_node)

    compute_nodes = set()  # a set is used to keep track of all compute nodes that are found while building the graph, which is then used to retrieve the network latencies

    # traverse the list of service functions
    for service_function_package in json_queries["service_functions"]:
        query_data = json_queries["service_functions"][service_function_package]

        response_time_field = query_data["response_time_field"]
        request_size_field = query_data["request_size_field"]
        response_size_field = query_data["response_size_field"]
        measurement = query_data["measurement_name"]

        # build up the query by setting the placeholders in the query template
        query_to_execute = INFLUX_QUERY_TEMPLATE.format(response_time_field, request_size_field, response_size_field, db, rp, measurement, sfc, sfci, service_function_package, from_timestamp, to_timestamp)

        log.info("Executing query: {0}".format(query_to_execute))
        result = influx_client.query(query_to_execute)  # execute the query

        result_items = result.items()
        if len(result_items) > 0:
            # create a node for the service function if it doesn't exist
            service_function_package_node = find_or_create_node(graph, "ServiceFunctionPackage", name=service_function_package)
            # create a utilizedBy edge between the service function and the service function chain
            find_or_create_edge(graph, "utilizedBy", service_function_package_node, service_function_chain_node)

            # iterate through each result item
            for item in result_items:
                metadata, result_points = item  # each result item is a tuple of two elements

                # metadata consists of the result tags and the measurement name
                # measurement = metadata[0]
                tags = metadata[1]

                result_point = next(result_points)  # get the result point dictionary
                response_time = result_point["mean_response_time"]  # extract the response time of the SF from the result
                request_size = result_point["mean_request_size"]  # extract the avg request size of the SF from the result
                response_size = result_point["mean_response_size"]  # extract the avg response size of the SF from the result

                # create a ServiceFunction node from the tag value (if it is not already created)
                service_function_node = find_or_create_node(graph, "ServiceFunction", name=tags["flame_sf"])
                # create an edge between the the service function and the package (if it is not already created)
                find_or_create_edge(graph, "instanceOf", service_function_node, service_function_package_node)
                # create a utilizedBy edge between the service function and the service function chain instance
                find_or_create_edge(graph, "utilizedBy", service_function_node, service_function_chain_instance_node)

                # create an Endpoint node from the tag value (if it is not already created)
                ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["flame_sfe"], response_time=response_time, request_size=request_size, response_size=response_size, uuid=request_id)
                # create an edge between the service function and the endpoint (if it is not already created)
                find_or_create_edge(graph, "realisedBy", service_function_node, ipendpoint_node)

                # create a Cluster node from the tag value (if it is not already created)
                compute_node = find_or_create_node(graph, "Cluster", name=tags["flame_location"])
                # create an edge between the endpoint and the compute node (if it is not already created)
                find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node)

                compute_nodes.add(compute_node)  # add the compute node to the set of compute nodes

    # fixed: message now reports both sfc and sfci, consistent with the opening log message
    # (previously formatted sfci into a message worded for the sfc only)
    log.info("Finished building graph for service function chain {0}/{1} from database {2} with retention policy {3}".format(sfc, sfci, db, rp))


def delete_temporal_subgraph(graph, subgraph_id):
    """
    A function used to delete a subgraph associated with a subgraph ID obtained from the CLMC service.

    :param graph: the neo4j graph db object
    :param subgraph_id: the ID of the subgraph to delete
    :return: the number of nodes that were matched with the given subgraph ID
    """

    log.info("Deleting subgraph associated with ID {0}".format(subgraph_id))

    subgraph = graph.nodes.match(uuid=subgraph_id)
    nodes_matched = 0
    for node in subgraph:
        graph.delete(node)
        nodes_matched += 1

    log.info("Deleted {0} nodes associated with ID {1}".format(nodes_matched, subgraph_id))

    return nodes_matched


def build_network_graph(graph, switches, links, clusters, ues):
    """
    A function used to build the network topology in the neo4j graph given the collection of switches, links and clusters.

    :param graph: the neo4j graph database client
    :param switches: a collection of all switches in the topology - mapping between the DPID of the switch and its IP address
    :param links: a collection of all switch-to-switch links in the network topology - JSON format, list of objects, each object must have "src-switch", "dst-switch" and "latency" as keys
    :param clusters: a collection of all clusters and the IP address of the service router that they are connected to - mapping between an IP address of a service router and a cluster identifier
    :param ues: a collection of all ues and the IP address of the service router that they are connected to - mapping between an IP address of a service router and a ue identifier
    :return: the number of newly created switch, cluster and user-equipment nodes as a 3-tuple
    """

    new_switches_count = 0
    new_clusters_count = 0
    new_ues_count = 0

    for link in links:
        # get the DPID of the source switch and map it to its IP address
        source = link["src-switch"]
        source = switches[source]

        # get the DPID of the destination switch and map it to its IP address
        destination = link["dst-switch"]
        destination = switches[destination]

        # retrieve the latency for this link
        latency = link["latency"] / 1000  # convert to seconds

        # create or retrieve the from node
        from_node, created = find_or_create_node(graph, "Switch", return_created=True, name=source)
        if created:
            new_switches_count += 1

        # create or retrieve the to node
        to_node, created = find_or_create_node(graph, "Switch", return_created=True, name=destination)
        if created:
            new_switches_count += 1

        # create the link between the two nodes
        find_or_create_edge(graph, "linkedTo", from_node, to_node, latency=latency)

        # check whether the source service router connects a particular cluster or a particular UE
        if create_node_from_mapping(graph, from_node, source, clusters, "Cluster"):
            new_clusters_count += 1
        if create_node_from_mapping(graph, from_node, source, ues, "UserEquipment"):
            new_ues_count += 1

        # check whether the destination service router connects a particular cluster or a particular UE
        if create_node_from_mapping(graph, to_node, destination, clusters, "Cluster"):
            new_clusters_count += 1
        if create_node_from_mapping(graph, to_node, destination, ues, "UserEquipment"):
            new_ues_count += 1

    return new_switches_count, new_clusters_count, new_ues_count


def create_node_from_mapping(graph, node, node_ip, mapping, new_node_type):
    """
    Creates an additional node of a given type if a mapping from a switch node is found.

    :param graph: the neo4j graph database client
    :param node: the original node
    :param node_ip: the original node's IP address
    :param mapping: the mapping object (dictionary from IP address to identifier)
    :param new_node_type: the type of the new node to be created
    :return: True if new node was created and False otherwise
    """

    if node_ip in mapping:
        new_node_name = mapping[node_ip]
        new_node, created = find_or_create_node(graph, new_node_type, return_created=True, name=new_node_name)
        # zero latency - the new node is collocated with its service router
        find_or_create_edge(graph, "linkedTo", new_node, node, latency=0)
        return created

    return False


def delete_network_graph(graph):
    """
    A function used to delete all nodes of type Switch, Cluster and UserEquipment in the neo4j graph.

    :param graph: the neo4j graph
    :return: the number of deleted switches, clusters and user equipment nodes as a 3-tuple
    """

    # fixed: the UserEquipment summary log previously reported the cluster count (copy-paste bug);
    # the three identical delete loops are folded into one loop over the node types
    deleted_counts = {}
    for node_type in ("Switch", "Cluster", "UserEquipment"):
        log.info("Deleting {0} nodes.".format(node_type))

        subgraph = graph.nodes.match(node_type)
        deleted = len(subgraph)
        for node in subgraph:
            graph.delete(node)

        log.info("Deleted {0} {1} nodes.".format(deleted, node_type))
        deleted_counts[node_type] = deleted

    return deleted_counts["Switch"], deleted_counts["Cluster"], deleted_counts["UserEquipment"]