#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 04-07-2018
// Created for Project : FLAME
"""
from json import loads
from py2neo import Node, Relationship
import logging
# required URL query parameters for the round-trip-time endpoint
GRAPH_ROUND_TRIP_TIME_URL_PARAMS = ("startpoint", "endpoint")

# required top-level keys of a graph-build JSON request body
GRAPH_BUILD_QUERY_PARAMS = {"from", "to", "service_function_chain", "service_function_chain_instance", "service_functions"}

# required keys of each per-service-function query object in the request body
GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "request_size_field", "response_size_field", "measurement_name"}

# InfluxDB query template - placeholders: 0-2 aggregation fields, 3 database, 4 retention policy, 5 measurement,
# 6 sfc, 7 sfci, 8 sfp, 9-10 time range
INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time, {1} AS mean_request_size, {2} AS mean_response_size FROM "{3}"."{4}".{5} WHERE "flame_sfc"=\'{6}\' and "flame_sfci"=\'{7}\' and "flame_sfp"=\'{8}\' and time>={9} and time<{10} GROUP BY "flame_sfe", "flame_location", "flame_sf"'

# in cypher the syntax is {name: 'value'}, here we use {{name: 'value'}} to escape the curly braces when applying the python format function
RTT_CYPHER_QUERY_TEMPLATE = """
MATCH (startpoint:{0} {{ name: '{1}' }}),(endpoint:Endpoint {{ name: '{2}', uuid: '{3}'}}),
path = shortestPath((startpoint)-[*]-(endpoint))
WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' )
WITH extract(y in filter(x in relationships(path) WHERE type(x) = 'linkedTo') | y.latency) as latencies, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size
RETURN latencies as forward_latencies, reverse(latencies) as reverse_latencies, response_time, request_size, response_size
"""

# DEPRECATED QUERY - use this if we have to switch back to using two directed edges between a given pair of nodes
# RTT_CYPHER_QUERY_TEMPLATE = """
# MATCH (dc:Cluster {{ name: '{0}' }}),(endpoint:Endpoint {{ name: '{1}', uuid: '{2}'}}),
# path = shortestPath((dc)-[*]-(endpoint))
# WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' )
# WITH nodes(path) as all_nodes, endpoint as endpoint
# WITH all_nodes[0..size(all_nodes)-1] as network_nodes, endpoint as endpoint
# UNWIND RANGE(0, size(network_nodes) - 2) as id
# WITH network_nodes[id] as source, network_nodes[id+1] as target, endpoint.response_time as response_time, endpoint.request_size as request_size, endpoint.response_size as response_size
# MATCH (source) -[r1]-> (target), (target) -[r2]-> (source)
# RETURN collect(r1.latency) as forward_latencies, reverse(collect(r2.latency)) as reverse_latencies, response_time, request_size, response_size
# """

log = logging.getLogger('service_logger')
def validate_json_queries_body(body):
    """
    Validates the request body containing mappings from service functions to queries to execute.

    :param body: the request body to validate
    :return the validated json queries dictionary object
    :raise AssertionError: if the body is invalid
    """

    try:
        body = loads(body)
    except (TypeError, ValueError):
        # loads() raises TypeError for non-string input and ValueError (JSONDecodeError) for malformed JSON;
        # either way the configuration is invalid
        raise AssertionError("Configuration must be a JSON object.")

    assert GRAPH_BUILD_QUERY_PARAMS == set(body.keys()), "Invalid JSON query document."

    # NOTE: a format check on the SFC instance ID (<sfcID>_<instanceNum>) used to live here - it was removed
    # because the SFC instance ID no longer depends on the SFC ID

    assert type(body["service_functions"]) == dict, "The service function description should be represented with a dictionary."

    for sf in body["service_functions"]:
        query_data = body["service_functions"][sf]
        assert type(query_data) == dict, "Each service function must be associated with a respective JSON object."
        assert GRAPH_BUILD_SF_QUERY_PARAMS == set(query_data.keys()), "Invalid query data for service function {0} in the JSON query document".format(sf)

    # strict type() checks (rather than isinstance) deliberately reject JSON booleans, since bool subclasses int
    assert type(body["from"]) == int, "'from' parameter must be a timestamp integer"
    assert body["from"] >= 0, "'from' parameter must be a positive timestamp integer or 0"

    assert type(body["to"]) == int, "'to' parameter must be a timestamp integer"
    assert body["to"] > body["from"], "'to' parameter timestamp must be greater than 'from' parameter timestamp"

    return body
def validate_graph_rtt_params(params):
    """
    Validates the request url parameters used in running a round trip time cypher query.

    :param params: the parameters dictionary to validate
    :return: the validated parameters
    :raise AssertionError: for invalid parameters
    """

    extracted = {}
    for required_param in GRAPH_ROUND_TRIP_TIME_URL_PARAMS:
        # every round-trip-time URL parameter is mandatory
        assert required_param in params, "Incorrect url parameters - required url query parameter '{0}' is not found in the request parameters.".format(required_param)
        extracted[required_param] = params[required_param]

    return extracted
def find_or_create_node(graph, node_type, return_created=False, **properties):
    """
    This function checks if a node of the given type with the given properties exists, and if not - creates it.

    :param graph: the graph object
    :param node_type: the type of the node to find or create
    :param return_created: if True the result will contain both the node and a boolean flag if the node was created now
    :param properties: the properties of the node to find or create (must include 'name'; may include 'uuid')
    :return: the found or newly created node object
    """

    # match on name, and additionally on uuid when one is supplied
    match_filter = {"name": properties["name"]}
    if "uuid" in properties:
        match_filter["uuid"] = properties["uuid"]
    node = graph.nodes.match(node_type, **match_filter).first()

    created = False
    if node is None:
        log.info("Creating node of type {0} with properties {1}".format(node_type, properties))
        node = Node(node_type, **properties)
        graph.create(node)
        created = True

    return (node, created) if return_created else node
def find_or_create_edge(graph, edge_type, from_node, to_node, **properties):
    """
    This function checks if an edge of the given type with the given properties exists, and if not - creates it.

    :param graph: the graph object
    :param edge_type: the type of the edge to find or create
    :param from_node: the source of the edge
    :param to_node: the target of the edge
    :param properties: the properties of the edge to find or create
    :return: the found or newly created edge object
    """

    existing_edge = graph.relationships.match(nodes=(from_node, to_node), r_type=edge_type).first()
    if existing_edge is not None:
        return existing_edge

    log.info("Creating edge of type {0} from node {1} to node {2} with properties {3}".format(edge_type, from_node, to_node, properties))
    new_edge = Relationship(from_node, edge_type, to_node, **properties)
    graph.create(new_edge)
    return new_edge
def build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client):
    """
    A function used to generate a temporal graph in the neo4j db.

    :param request_id: the ID of the request
    :param from_timestamp: the start of the time range
    :param to_timestamp: the end of the time range
    :param json_queries: the JSON object containing the query data for each service function
    :param graph: the graph DB object
    :param influx_client: the influx DB client object
    """

    sfc = json_queries["service_function_chain"]
    sfci = json_queries["service_function_chain_instance"]
    # the influx database name matches the SFC identifier; the default retention policy is queried
    db = sfc
    rp = "autogen"

    log.info("Building graph for service function chain {0}/{1} from database {2} with retention policy {3}".format(sfc, sfci, db, rp))

    # create a UUID reference node tying this temporal subgraph to the request that built it
    reference_node = Node("Reference", **{"uuid": request_id, "sfc": sfc, "sfci": sfci, "from": from_timestamp, "to": to_timestamp})
    graph.create(reference_node)

    # create a node for the service function chain if it doesn't exist
    service_function_chain_node = find_or_create_node(graph, "ServiceFunctionChain", name=sfc)
    # create a node for the service function chain instance if it doesn't exist
    service_function_chain_instance_node = find_or_create_node(graph, "ServiceFunctionChainInstance", name=sfci)
    # create an instanceOf edge if it doesn't exist
    find_or_create_edge(graph, "instanceOf", service_function_chain_instance_node, service_function_chain_node)

    compute_nodes = set()  # a set is used to keep track of all compute nodes that are found while building the graph, which is then used to retrieve the network latencies

    # traverse the list of service functions
    for service_function_package in json_queries["service_functions"]:
        query_data = json_queries["service_functions"][service_function_package]

        response_time_field = query_data["response_time_field"]
        request_size_field = query_data["request_size_field"]
        response_size_field = query_data["response_size_field"]
        measurement = query_data["measurement_name"]

        # build up the query by setting the placeholders in the query template
        query_to_execute = INFLUX_QUERY_TEMPLATE.format(response_time_field, request_size_field, response_size_field, db, rp, measurement, sfc, sfci, service_function_package, from_timestamp, to_timestamp)

        log.info("Executing query: {0}".format(query_to_execute))
        result = influx_client.query(query_to_execute)  # execute the query

        result_items = result.items()
        if len(result_items) > 0:
            # create a node for the service function if it doesn't exist
            service_function_package_node = find_or_create_node(graph, "ServiceFunctionPackage", name=service_function_package)
            # create a utilizedBy edge between the service function and the service function chain
            find_or_create_edge(graph, "utilizedBy", service_function_package_node, service_function_chain_node)

            # iterate through each result item
            for item in result_items:
                metadata, result_points = item  # each result item is a tuple of two elements
                # metadata consists of the result tags and the measurement name - only the tags are needed here
                tags = metadata[1]

                result_point = next(result_points)  # get the result point dictionary
                response_time = result_point["mean_response_time"]  # extract the response time of the SF from the result
                request_size = result_point["mean_request_size"]  # extract the avg request size of the SF from the result
                response_size = result_point["mean_response_size"]  # extract the avg response size of the SF from the result

                # create a ServiceFunction node from the tag value (if it is not already created)
                service_function_node = find_or_create_node(graph, "ServiceFunction", name=tags["flame_sf"])
                # create an edge between the service function and the package (if it is not already created)
                find_or_create_edge(graph, "instanceOf", service_function_node, service_function_package_node)
                # create a utilizedBy edge between the service function and the service function chain instance
                find_or_create_edge(graph, "utilizedBy", service_function_node, service_function_chain_instance_node)

                # create an Endpoint node from the tag value (if it is not already created)
                ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["flame_sfe"], response_time=response_time, request_size=request_size, response_size=response_size, uuid=request_id)
                # create an edge between the service function and the endpoint (if it is not already created)
                find_or_create_edge(graph, "realisedBy", service_function_node, ipendpoint_node)

                # create a Cluster node from the tag value (if it is not already created)
                compute_node = find_or_create_node(graph, "Cluster", name=tags["flame_location"])
                # create an edge between the endpoint and the compute node (if it is not already created)
                find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node)
                compute_nodes.add(compute_node)  # add the compute node to the set of compute nodes

    # log message made consistent with the opening one - it previously formatted only the SFCI into the chain slot
    log.info("Finished building graph for service function chain {0}/{1} from database {2} with retention policy {3}".format(sfc, sfci, db, rp))
def delete_temporal_subgraph(graph, subgraph_id):
    """
    A function used to delete a subgraph associated with a subgraph ID obtained from the CLMC service.

    :param graph: the neo4j graph db object
    :param subgraph_id: the ID of the subgraph delete
    :return: the number of nodes that were matched with the given subgraph ID
    """

    log.info("Deleting subgraph associated with ID {0}".format(subgraph_id))

    nodes_matched = 0
    # every node carrying this uuid belongs to the subgraph being deleted
    for matched_node in graph.nodes.match(uuid=subgraph_id):
        graph.delete(matched_node)
        nodes_matched += 1

    log.info("Deleted {0} nodes associated with ID {1}".format(nodes_matched, subgraph_id))
    return nodes_matched
def build_network_graph(graph, switches, links, clusters, ues):
    """
    A function used to build the network topology in the neo4j graph given the collection of switches, links and clusters.

    :param graph: the neo4j graph database client
    :param switches: a collection of all switches in the topology - mapping between the DPID of the switch and its IP address
    :param links: a collection of all switch-to-switch links in the network topology - JSON format, list of objects, each object must have "src-switch", "dst-switch" and "latency" as keys
    :param clusters: a collection of all clusters and the IP address of the service router that they are connected to - mapping between an IP address of a service router and a cluster identifier
    :param ues: a collection of all ues and the IP address of the service router that they are connected to - mapping between an IP address of a servicer router and a ue identifier
    :return: the number of newly created switch, cluster and UE nodes
    """

    new_switches_count = 0
    new_clusters_count = 0
    new_ues_count = 0

    for link in links:
        # map the source and destination switch DPIDs to their IP addresses
        source_ip = switches[link["src-switch"]]
        destination_ip = switches[link["dst-switch"]]
        latency = link["latency"] / 1000  # convert to seconds

        # create or retrieve the two switch nodes of this link
        source_node, was_created = find_or_create_node(graph, "Switch", return_created=True, name=source_ip)
        if was_created:
            new_switches_count += 1
        destination_node, was_created = find_or_create_node(graph, "Switch", return_created=True, name=destination_ip)
        if was_created:
            new_switches_count += 1

        # create the link between the two nodes
        find_or_create_edge(graph, "linkedTo", source_node, destination_node, latency=latency)

        # check whether each service router connects a particular cluster or a particular UE
        for switch_node, switch_ip in ((source_node, source_ip), (destination_node, destination_ip)):
            if create_node_from_mapping(graph, switch_node, switch_ip, clusters, "Cluster"):
                new_clusters_count += 1
            if create_node_from_mapping(graph, switch_node, switch_ip, ues, "UserEquipment"):
                new_ues_count += 1

    return new_switches_count, new_clusters_count, new_ues_count
def create_node_from_mapping(graph, node, node_ip, mapping, new_node_type):
    """
    Creates an additional node of a given type if a mapping from a switch node is found.

    :param graph: the neo4j graph database client
    :param node: the original node
    :param node_ip: the original node's IP address
    :param mapping: the mapping object (dictionary from IP address to identifier)
    :param new_node_type: the type of the new node to be created
    :return: True if new node was created and False otherwise
    """

    # no mapping for this IP address - nothing to create
    if node_ip not in mapping:
        return False

    linked_node, created = find_or_create_node(graph, new_node_type, return_created=True, name=mapping[node_ip])
    # the mapped node is directly attached to the switch, hence zero latency
    find_or_create_edge(graph, "linkedTo", linked_node, node, latency=0)
    return created
def delete_network_graph(graph):
    """
    A function used to delete all nodes of type Switch and Cluster in the neo4j graph.

    :param graph: the neo4j graph
    :return: the number of deleted switches, clusters and user equipment nodes
    """

    deleted_switches = _delete_nodes_of_type(graph, "Switch")
    deleted_clusters = _delete_nodes_of_type(graph, "Cluster")
    # BUGFIX: the UserEquipment deletion count was previously logged with the clusters count
    deleted_ues = _delete_nodes_of_type(graph, "UserEquipment")

    return deleted_switches, deleted_clusters, deleted_ues


def _delete_nodes_of_type(graph, node_type):
    """
    Deletes all nodes of the given type from the neo4j graph.

    :param graph: the neo4j graph
    :param node_type: the label of the nodes to delete, e.g. "Switch"
    :return: the number of nodes that were deleted
    """

    log.info("Deleting {0} nodes.".format(node_type))

    subgraph = graph.nodes.match(node_type)
    deleted_count = len(subgraph)
    for node in subgraph:
        graph.delete(node)

    log.info("Deleted {0} {1} nodes.".format(deleted_count, node_type))
    return deleted_count