Newer
Older
#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 04-07-2018
// Created for Project : FLAME
"""
Nikolay Stanchev
committed
from clmcservice.graphapi.utilities import validate_json_queries_body, RTT_CYPHER_QUERY_TEMPLATE, \
build_network_graph, delete_network_graph, build_temporal_subgraph, delete_temporal_subgraph, validate_graph_rtt_params
from influxdb import InfluxDBClient
from py2neo import Graph
Nikolay Stanchev
committed
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPServiceUnavailable, HTTPNotImplemented
from pyramid.view import view_defaults, view_config
Nikolay Stanchev
committed
from requests import exceptions, get
Nikolay Stanchev
committed
from uuid import uuid4
from json import load
import logging
log = logging.getLogger('service_logger')
@view_defaults(renderer='json')
class GraphAPI(object):
    """
    A class-based view for building temporal graphs and running graph queries.

    Each view reads its connection details (neo4j, InfluxDB, SDN controller) and file paths
    from the registry settings attached to the incoming request.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """

        self.request = request

    def _connect_to_neo4j(self):
        """
        Creates a py2neo Graph handle using the 'neo4j_host' and 'neo4j_password' registry settings.

        :return: the Graph object used by the views to query and update neo4j
        """

        settings = self.request.registry.settings
        return Graph(host=settings['neo4j_host'], password=settings['neo4j_password'])

    @view_config(route_name='graph_build', request_method='POST')
    def build_temporal_graph(self):
        """
        An API endpoint to build a temporal graph and store it in neo4j based on the posted JSON query document.

        :raises HTTPBadRequest: if request body is not a valid JSON with the queries per service function,
                                or if no InfluxDB database exists for the requested service function chain
        :return: A JSON document containing the posted request body, along with meta data about the built graph (time range and uuid, which can then be reused for other API calls)
        """

        try:
            body = self.request.body.decode(self.request.charset)
            json_queries = validate_json_queries_body(body)  # validate the content and receive a json dictionary object
        except AssertionError as e:
            raise HTTPBadRequest("Bad request content: {0}".format(e.args))

        settings = self.request.registry.settings
        graph = self._connect_to_neo4j()
        influx_client = InfluxDBClient(host=settings['influx_host'], port=settings['influx_port'], timeout=10)

        # the service function chain identifier doubles as the name of the InfluxDB database to query
        database_name = json_queries["service_function_chain"]
        if database_name not in [db["name"] for db in influx_client.get_list_database()]:
            raise HTTPBadRequest("Database for service function chain {0} not found.".format(database_name))

        # NOTE(review): the posted timestamps are scaled by 10**9 - presumably seconds to nanoseconds, confirm against the API docs
        from_timestamp = json_queries['from'] * 10**9
        to_timestamp = json_queries['to'] * 10**9

        request_id = str(uuid4())  # identifier of the new subgraph, returned to the client for follow-up calls

        build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client)

        # the response echoes the request body, replacing the raw 'from'/'to' fields with the 'graph' meta data
        json_queries['graph'] = {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}}
        json_queries.pop("from")
        json_queries.pop("to")

        return json_queries

    @view_config(route_name='graph_manage', request_method='DELETE')
    def delete_temporal_graph(self):
        """
        An API endpoint to delete a temporal graph associated with a uuid generated by the CLMC service.

        :return: A JSON document containing the UUID of the deleted subgraph and the number of deleted nodes
        :raises HTTPNotFound: if the request is not associated with any subgraph
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL
        graph = self._connect_to_neo4j()  # connect to the neo4j graph db

        # a subgraph is identified by its Reference node, no such node means there is nothing to delete
        if graph.nodes.match("Reference", uuid=graph_id).first() is None:
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

        number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id)

        return {"uuid": graph_id, "deleted": number_of_deleted_nodes}

    @view_config(route_name='graph_algorithms_rtt', request_method='GET')
    def run_rtt_query(self):
        """
        An API endpoint to run the round trip time cypher query over the graph associated with a given request ID.

        :return: A JSON response with a list of forward latencies, reverse latencies and SF endpoint response time.
        :raises HTTPBadRequest: if the request URL doesn't contain the required URL query parameters, or if the endpoint node
                                is missing one of its expected relationships (hostedBy, realisedBy, instanceOf)
        :raises HTTPNotFound: if the request is not associated with any subgraph or the compute node / endpoint node doesn't exist
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL

        try:
            params = validate_graph_rtt_params(self.request.params)
        except AssertionError as e:
            raise HTTPBadRequest("Request URL format is incorrect: {0}".format(e.args))

        startpoint_node_label = params["startpoint"]
        endpoint_node_label = params["endpoint"]

        graph = self._connect_to_neo4j()  # connect to the neo4j graph db
        all_nodes = graph.nodes

        reference_node = all_nodes.match("Reference", uuid=graph_id).first()
        if reference_node is None:
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

        # match a switch or cluster node as a path start point and capture the type of the node (Switch is tried first)
        for startpoint_node_type in ("Switch", "Cluster"):
            startpoint_node = all_nodes.match(startpoint_node_type, name=startpoint_node_label).first()
            if startpoint_node is not None:
                break
        else:
            # no match with Switch/Cluster node
            raise HTTPNotFound("Starting point node {0} doesn't exist.".format(startpoint_node_label))

        endpoint_node = all_nodes.match("Endpoint", name=endpoint_node_label, uuid=graph_id).first()
        if endpoint_node is None:
            raise HTTPNotFound("Endpoint node {0} doesn't exist.".format(endpoint_node_label))

        # first() returns None when nothing matches, so check the relationship before dereferencing its end node
        hosted_by_relationship = graph.relationships.match(nodes=(endpoint_node, None), r_type="hostedBy").first()
        if hosted_by_relationship is None:
            msg = "No host node found associated with endpoint {0}".format(endpoint_node["name"])
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPBadRequest(msg)
        hosted_by_node = hosted_by_relationship.end_node

        # check if the endpoint is hosted by the compute node before running the RTT cypher query
        if hosted_by_node["name"] == startpoint_node["name"]:
            # start point and host are the same node - no network hops, only the endpoint's own measurements
            result = {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"],
                      "request_size": endpoint_node["request_size"], "response_size": endpoint_node["response_size"]}
        else:
            query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(startpoint_node_type, startpoint_node_label, endpoint_node_label, graph_id)
            log.info("Executing cypher query: {0}".format(query_to_execute))
            data = graph.run(query_to_execute).data()  # returns a list of dictionaries, each dictionary represents a row in the result
            # NOTE(review): assumes the query always yields at least one row - confirm against RTT_CYPHER_QUERY_TEMPLATE
            result = data[0]

        # resolve the service function realised by this endpoint; the relationship itself may be missing
        realised_by_relationship = graph.match(nodes=(None, endpoint_node), r_type="realisedBy").first()
        sf_node = realised_by_relationship.start_node if realised_by_relationship is not None else None
        if sf_node is None:
            msg = "No service function found associated with endpoint {0}".format(endpoint_node["name"])
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPBadRequest(msg)

        # resolve the service function package the service function is an instance of
        instance_of_relationship = graph.match(nodes=(sf_node, None), r_type="instanceOf").first()
        sf_package_node = instance_of_relationship.end_node if instance_of_relationship is not None else None
        if sf_package_node is None:
            msg = "No service function package found associated with service function {0}".format(sf_node["name"])
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPBadRequest(msg)

        result["global_tags"] = {"flame_sfe": endpoint_node["name"], "flame_server": hosted_by_node["name"], "flame_location": hosted_by_node["name"],
                                 "flame_sfc": reference_node["sfc"], "flame_sfci": reference_node["sfci"], "flame_sfp": sf_package_node["name"], "flame_sf": sf_node["name"]}

        # calculate the Round-Trip-Time
        total_forward_latency = sum(result["forward_latencies"])
        result["total_forward_latency"] = total_forward_latency
        total_reverse_latency = sum(result["reverse_latencies"])
        result["total_reverse_latency"] = total_reverse_latency
        bandwidth = self.request.registry.settings["network_bandwidth"]
        result["bandwidth"] = bandwidth
        service_delay = result["response_time"]
        request_size = result["request_size"]
        response_size = result["response_size"]

        round_trip_time = self.calculate_round_trip_time(total_forward_latency, total_reverse_latency, service_delay, request_size, response_size, bandwidth)
        result["round_trip_time"] = round_trip_time

        return result

    @staticmethod
    def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
        """
        Calculates the round trip time given the list of arguments.

        The data (transfer) delays are only added when both latencies are strictly positive,
        otherwise the result is simply the sum of the two latencies and the service delay.

        :param forward_latency: network latency in forward direction (s)
        :param reverse_latency: network latency in reverse direction (s)
        :param service_delay: media service delay (s)
        :param request_size: request size (bytes)
        :param response_size: response size (bytes)
        :param bandwidth: network bandwidth (Mb/s)
        :param packet_size: size of packet (bytes)
        :param packet_header_size: size of the header of the packet (bytes)

        :return: the calculated round trip time
        """

        if forward_latency > 0 and reverse_latency > 0:
            # 8/10**6 converts bytes to megabits, the last factor accounts for the per-packet header overhead
            forward_data_delay = (8/10**6) * (request_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
            reverse_data_delay = (8/10**6) * (response_size / bandwidth) * (packet_size / (packet_size - packet_header_size))
        else:
            forward_data_delay, reverse_data_delay = 0, 0

        return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay

    @staticmethod
    def _fetch_sdn_controller_json(sdn_controller_ip, sdn_controller_port, resource_path, queried_for):
        """
        Sends a GET request to the SDN controller and returns the decoded JSON body of the response.

        :param sdn_controller_ip: IP address of the SDN controller
        :param sdn_controller_port: port of the SDN controller
        :param resource_path: URL path of the resource to retrieve, starting with '/'
        :param queried_for: short description of what is being queried, used only in the client-facing error messages

        :return: the JSON content of the response
        :raises HTTPServiceUnavailable: if a connection to the SDN controller cannot be established
        :raises HTTPNotImplemented: if the response status code is not 200 or the response body is not valid JSON
        """

        # NOTE(review): the request is sent without a timeout - consider adding one so an unresponsive controller cannot block the view
        try:
            url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, resource_path)
            response = get(url)
        except exceptions.ConnectionError:
            msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port)
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.")

        # check if the SDN controller returned the expected response
        if response.status_code != 200:
            msg = "The SDN controller returned a response with status code different than 200."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for {0}.".format(queried_for))

        try:
            return response.json()
        except ValueError:  # response not in JSON
            msg = "The SDN controller returned a response which couldn't be converted to JSON."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for {0}.".format(queried_for))

    @staticmethod
    def _load_mapping_file(file_path, mapping_name):
        """
        Loads a JSON mapping from a file on a best-effort basis.

        :param file_path: path to the JSON file
        :param mapping_name: name of the mapping, used only in the logged error message

        :return: the parsed JSON content, or an empty dictionary if the file could not be opened or parsed (the error is logged)
        """

        try:
            with open(file_path) as fh:
                return load(fh)
        except Exception as e:
            # deliberate best-effort: a missing or malformed file must not prevent the topology from being built
            log.error("Unexpected error: {0}".format(e))
            log.error("No {0} mapping was found while building the network topology.".format(mapping_name))
            return {}

    @view_config(route_name='graph_network_topology', request_method='POST')
    def build_network_topology(self):
        """
        An API endpoint to build/update the network topology in the neo4j graph.

        :return: A JSON response with the number of switches, clusters and ues that were built.
        :raises HTTPServiceUnavailable: if the SDN controller cannot be reached
        :raises HTTPNotImplemented: if the SDN controller returns a non-200 or non-JSON response
        """

        graph = self._connect_to_neo4j()  # connect to the neo4j graph db

        settings = self.request.registry.settings
        sdn_controller_ip = settings['sdn_controller_ip']
        sdn_controller_port = settings['sdn_controller_port']

        # retrieve all switches - if SDN controller is unavailable on the given IP address return 503 Service Unavailable
        content = self._fetch_sdn_controller_json(sdn_controller_ip, sdn_controller_port, "/wm/core/controller/switches/json", "the list of switches")

        # map the DPID of each switch to its IP address
        switches = {}
        for switch in content:
            # map the dpid to the switch IP address, the IP address is in the format '/172.168.23.54:1234'
            switches[switch["switchDPID"]] = switch["inetAddress"][1:].split(":")[0]

        # retrieve all external links (gathered through BDDP) - if SDN controller is unavailable on the given IP address return 503 Service Unavailable
        external_links = self._fetch_sdn_controller_json(sdn_controller_ip, sdn_controller_port, "/wm/topology/external-links/json", "the network topology")

        # retrieve all local links (gathered through LLDP) - if SDN controller is unavailable on the given IP address return 503 Service Unavailable
        local_links = self._fetch_sdn_controller_json(sdn_controller_ip, sdn_controller_port, "/wm/topology/links/json", "the network topology")

        # TODO this is a temporary solution - currently the service router to clusters mapping is read from a file (which must be manually prepared beforehand)
        clusters_file = settings["network_clusters_path"]
        clusters = self._load_mapping_file(clusters_file, "service-router-to-cluster")

        # TODO this is a temporary solution - currently the service router to ues mapping is read from a file (which must be manually prepared beforehand)
        ues_file = settings["network_ues_path"]
        ues = self._load_mapping_file(ues_file, "ue-to-cluster")

        # build the network graph and retrieve the number of switch nodes and cluster nodes that were created
        # the graph is built in two passes (external links first, then local links) and the counts of both passes are summed
        tmp_switch_count, tmp_clusters_count, tmp_ues_count = build_network_graph(graph, switches, external_links, clusters, ues)
        switch_count, clusters_count, ues_count = build_network_graph(graph, switches, local_links, clusters, ues)
        switch_count += tmp_switch_count
        clusters_count += tmp_clusters_count
        ues_count += tmp_ues_count

        return {"new_switches_count": switch_count, "new_clusters_count": clusters_count, "new_ues_count": ues_count}

    @view_config(route_name='graph_network_topology', request_method='DELETE')
    def delete_network_topology(self):
        """
        An API endpoint to delete the network topology in the neo4j graph.

        :return: A JSON response with the number of switches, clusters and ues that were deleted.
        """

        graph = self._connect_to_neo4j()  # connect to the neo4j graph db

        deleted_switches, deleted_clusters, deleted_ues = delete_network_graph(graph)

        return {"deleted_switches_count": deleted_switches, "deleted_clusters_count": deleted_clusters, "deleted_ues_count": deleted_ues}