# views.py (17.68 KiB) - authored by Nikolay Stanchev
#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 04-07-2018
// Created for Project : FLAME
"""
from clmcservice.graphapi.utilities import validate_json_queries_body, RTT_CYPHER_QUERY_TEMPLATE, \
build_network_graph, delete_network_graph, build_temporal_subgraph, delete_temporal_subgraph, validate_graph_rtt_params
from influxdb import InfluxDBClient
from py2neo import Graph
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPServiceUnavailable, HTTPNotImplemented
from pyramid.view import view_defaults, view_config
from requests import exceptions, get
from uuid import uuid4
from json import load
import logging
# module-level logger, looked up by the fixed name 'service_logger'; the view methods in this file use it for info and error messages
log = logging.getLogger('service_logger')
@view_defaults(renderer='json')
class GraphAPI(object):
    """
    A class-based view for building temporal graphs and running graph queries.

    Each endpoint opens its own connection to the neo4j graph database using the 'neo4j_host' and 'neo4j_password'
    values found in the registry settings of the request.
    """

    # timeout (in seconds) applied to every HTTP request sent to the SDN controller, so that an unresponsive controller cannot block an API call forever
    _SDN_REQUEST_TIMEOUT = 10

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """

        self.request = request

    def _connect_to_graph(self):
        """
        Opens a connection to the neo4j graph database configured in the registry settings.

        :return: a py2neo Graph object
        """

        settings = self.request.registry.settings
        return Graph(host=settings['neo4j_host'], password=settings['neo4j_password'])

    @view_config(route_name='graph_build', request_method='POST')
    def build_temporal_graph(self):
        """
        An API endpoint to build a temporal graph and store it in neo4j based on the posted JSON query document.

        :raises HTTPBadRequest: if request body is not a valid JSON with the queries per service function or if no influx database exists for the service function chain

        :return: A JSON document containing the posted request body, along with meta data about the built graph (time range and uuid, which can then be reused for other API calls)
        """

        try:
            body = self.request.body.decode(self.request.charset)
            json_queries = validate_json_queries_body(body)  # validate the content and receive a json dictionary object
        except AssertionError as e:
            raise HTTPBadRequest("Bad request content: {0}".format(e.args))

        settings = self.request.registry.settings
        graph = self._connect_to_graph()
        influx_client = InfluxDBClient(host=settings['influx_host'], port=settings['influx_port'], timeout=10)

        # the name of the service function chain is used as the name of the influx database to query
        database_name = json_queries["service_function_chain"]
        existing_databases = [db["name"] for db in influx_client.get_list_database()]
        if database_name not in existing_databases:
            raise HTTPBadRequest("Database for service function chain {0} not found.".format(database_name))

        # scale the posted timestamps by 10**9 (presumably seconds to nanoseconds - TODO confirm against the API specification)
        from_timestamp = json_queries['from'] * 10**9
        to_timestamp = json_queries['to'] * 10**9

        # the generated UUID identifies the subgraph in all subsequent API calls
        request_id = str(uuid4())

        build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client)

        # the response is the posted document with the raw time range replaced by the graph meta data
        json_queries['graph'] = {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}}
        json_queries.pop("from")
        json_queries.pop("to")

        return json_queries

    @view_config(route_name='graph_manage', request_method='DELETE')
    def delete_temporal_graph(self):
        """
        An API endpoint to delete a temporal graph associated with a uuid generated by the CLMC service.

        :return: A JSON document containing the UUID of the deleted subgraph and the number of deleted nodes

        :raises HTTPNotFound: if the request is not associated with any subgraph
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL
        graph = self._connect_to_graph()  # connect to the neo4j graph db

        # a subgraph exists only if a Reference node carries the requested UUID
        if graph.nodes.match("Reference", uuid=graph_id).first() is None:
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

        number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id)

        return {"uuid": graph_id, "deleted": number_of_deleted_nodes}

    @view_config(route_name='graph_algorithms_rtt', request_method='GET')
    def run_rtt_query(self):
        """
        An API endpoint to run the round trip time cypher query over the graph associated with a given request ID.

        :return: A JSON response with a list of forward latencies, reverse latencies and SF endpoint response time.

        :raises HTTPBadRequest: if the request URL doesn't contain the required URL query parameters or if the graph is missing the relationships expected for the endpoint
        :raises HTTPNotFound: if the request is not associated with any subgraph or the compute node / endpoint node doesn't exist
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL

        try:
            params = validate_graph_rtt_params(self.request.params)
        except AssertionError as e:
            raise HTTPBadRequest("Request URL format is incorrect: {0}".format(e.args))

        startpoint_node_label = params["startpoint"]
        endpoint_node_label = params["endpoint"]

        graph = self._connect_to_graph()  # connect to the neo4j graph db
        all_nodes = graph.nodes

        reference_node = all_nodes.match("Reference", uuid=graph_id).first()
        if reference_node is None:
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

        # match a switch or cluster node (in this order) as a path start point and capture the type of the node
        for startpoint_node_type in ("Switch", "Cluster"):
            startpoint_node = all_nodes.match(startpoint_node_type, name=startpoint_node_label).first()
            if startpoint_node is not None:
                break
        else:
            # no match with Switch/Cluster node
            raise HTTPNotFound("Starting point node {0} doesn't exist.".format(startpoint_node_label))

        endpoint_node = all_nodes.match("Endpoint", name=endpoint_node_label, uuid=graph_id).first()
        if endpoint_node is None:
            raise HTTPNotFound("Endpoint node {0} doesn't exist.".format(endpoint_node_label))

        # first() returns None when no relationship matches, so the relationship is checked before its end node is accessed
        hosted_by_relationship = graph.relationships.match(nodes=(endpoint_node, None), r_type="hostedBy").first()
        if hosted_by_relationship is None:
            msg = "No compute node found hosting endpoint {0}".format(endpoint_node["name"])
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPBadRequest(msg)
        hosted_by_node = hosted_by_relationship.end_node

        # check if the endpoint is hosted by the compute node before running the RTT cypher query
        if hosted_by_node["name"] == startpoint_node["name"]:
            # the start point hosts the endpoint - no network hops, hence no latencies to report
            result = {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"],
                      "request_size": endpoint_node["request_size"], "response_size": endpoint_node["response_size"]}
        else:
            # the labels and the graph id formatted into the query have all been matched against existing nodes above
            query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(startpoint_node_type, startpoint_node_label, endpoint_node_label, graph_id)
            log.info("Executing cypher query: {0}".format(query_to_execute))
            data = graph.run(query_to_execute).data()  # returns a list of dictionaries, each dictionary represents a row in the result
            # NOTE(review): assumes the query always yields at least one row - an empty result raises IndexError here, confirm against RTT_CYPHER_QUERY_TEMPLATE
            result = data[0]

        # the service function is the start node of the realisedBy relationship ending at the endpoint
        realised_by_relationship = graph.match(nodes=(None, endpoint_node), r_type="realisedBy").first()
        if realised_by_relationship is None:
            msg = "No service function found associated with endpoint {0}".format(endpoint_node["name"])
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPBadRequest(msg)
        sf_node = realised_by_relationship.start_node

        # the service function package is the end node of the instanceOf relationship starting at the service function
        instance_of_relationship = graph.match(nodes=(sf_node, None), r_type="instanceOf").first()
        if instance_of_relationship is None:
            msg = "No service function package found associated with service function {0}".format(sf_node["name"])
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPBadRequest(msg)
        sf_package_node = instance_of_relationship.end_node

        result["global_tags"] = {"flame_sfe": endpoint_node["name"], "flame_server": hosted_by_node["name"], "flame_location": hosted_by_node["name"],
                                 "flame_sfc": reference_node["sfc"], "flame_sfci": reference_node["sfci"], "flame_sfp": sf_package_node["name"], "flame_sf": sf_node["name"]}

        # calculate the Round-Trip-Time
        total_forward_latency = sum(result["forward_latencies"])
        result["total_forward_latency"] = total_forward_latency

        total_reverse_latency = sum(result["reverse_latencies"])
        result["total_reverse_latency"] = total_reverse_latency

        bandwidth = self.request.registry.settings["network_bandwidth"]
        result["bandwidth"] = bandwidth

        result["round_trip_time"] = self.calculate_round_trip_time(total_forward_latency, total_reverse_latency, result["response_time"],
                                                                   result["request_size"], result["response_size"], bandwidth)

        return result

    @staticmethod
    def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
        """
        Calculates the round trip time given the list of arguments.

        The data delays (time to push the request/response through the network) are only added when both
        latencies are positive; otherwise they are treated as zero.

        :param forward_latency: network latency in forward direction (s)
        :param reverse_latency: network latency in reverse direction (s)
        :param service_delay: media service delay (s)
        :param request_size: request size (bytes)
        :param response_size: response size (bytes)
        :param bandwidth: network bandwidth (Mb/s)
        :param packet_size: size of packet (bytes)
        :param packet_header_size: size of the header of the packet (bytes)

        :return: the calculated round trip time
        """

        if forward_latency > 0 and reverse_latency > 0:
            # ratio between the full packet size and the part of the packet left after the header is subtracted
            packet_overhead = packet_size / (packet_size - packet_header_size)
            # 8/10**6 converts bytes to megabits, so that sizes can be divided by a bandwidth given in Mb/s
            forward_data_delay = (8/10**6) * (request_size / bandwidth) * packet_overhead
            reverse_data_delay = (8/10**6) * (response_size / bandwidth) * packet_overhead
        else:
            forward_data_delay, reverse_data_delay = 0, 0

        return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay

    def _fetch_sdn_json(self, resource_path, queried_item):
        """
        Sends a GET request to the SDN controller and returns the decoded JSON content of the response.

        :param resource_path: path of the queried resource on the SDN controller, e.g. '/wm/topology/links/json'
        :param queried_item: description of what is being queried, only used to build the error messages

        :return: the JSON content of the response

        :raises HTTPServiceUnavailable: if the SDN controller cannot be reached or doesn't answer within the timeout
        :raises HTTPNotImplemented: if the response status code is not 200 or the response body is not valid JSON
        """

        settings = self.request.registry.settings
        sdn_controller_ip = settings['sdn_controller_ip']
        sdn_controller_port = settings['sdn_controller_port']

        url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, resource_path)
        try:
            response = get(url, timeout=self._SDN_REQUEST_TIMEOUT)
        except (exceptions.ConnectionError, exceptions.Timeout):
            msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port)
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.")

        # check if the SDN controller returned the expected response
        if response.status_code != 200:
            msg = "The SDN controller returned a response with status code different than 200."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for {0}.".format(queried_item))

        try:
            return response.json()
        except ValueError:  # response not in JSON
            msg = "The SDN controller returned a response which couldn't be converted to JSON."
            log.error("Unexpected error: {0}".format(msg))
            raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for {0}.".format(queried_item))

    @view_config(route_name='graph_network_topology', request_method='POST')
    def build_network_topology(self):
        """
        An API endpoint to build/update the network topology in the neo4j graph.

        :return: A JSON response with the number of switches and clusters that were built.

        :raises HTTPServiceUnavailable: if the SDN controller is unavailable on the configured IP address and port
        :raises HTTPNotImplemented: if the SDN controller returns a non-200 or a non-JSON response
        """

        graph = self._connect_to_graph()  # connect to the neo4j graph db

        # retrieve all switches
        content = self._fetch_sdn_json("/wm/core/controller/switches/json", "the list of switches")

        # map the DPID of each switch to its IP address, the IP address is in the format '/172.168.23.54:1234'
        switches = {switch["switchDPID"]: switch["inetAddress"][1:].split(":")[0] for switch in content}

        # retrieve all external links (gathered through BDDP)
        external_links = self._fetch_sdn_json("/wm/topology/external-links/json", "the network topology")

        # retrieve all local links (gathered through LLDP)
        local_links = self._fetch_sdn_json("/wm/topology/links/json", "the network topology")

        # TODO this is a temporary solution - currently the service router to clusters mapping is read from a file (which must be manually prepared beforehand)
        clusters_file = self.request.registry.settings["network_configuration_path"]
        try:
            with open(clusters_file) as fh:
                clusters = load(fh)
        except Exception as e:
            # deliberately best-effort: without a readable mapping file the topology is still built, using an empty cluster mapping
            log.error("Unexpected error: {0}".format(e))
            log.error("No service_router-to-cluster mapping was found while building the network topology.")
            clusters = {}

        # build the network graph (external links first, then local links) and sum up the number of switch nodes and cluster nodes that were created
        external_switch_count, external_clusters_count = build_network_graph(graph, switches, external_links, clusters)
        local_switch_count, local_clusters_count = build_network_graph(graph, switches, local_links, clusters)

        return {"new_switches_count": local_switch_count + external_switch_count, "new_clusters_count": local_clusters_count + external_clusters_count}

    @view_config(route_name='graph_network_topology', request_method='DELETE')
    def delete_network_topology(self):
        """
        An API endpoint to delete the network topology in the neo4j graph.

        :return: A JSON response with the number of switches and clusters that were deleted.
        """

        graph = self._connect_to_graph()  # connect to the neo4j graph db

        deleted_switches, deleted_clusters = delete_network_graph(graph)

        return {"deleted_switches_count": deleted_switches, "deleted_clusters_count": deleted_clusters}