#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
//      Created By :            Nikolay Stanchev
//      Created Date :          04-07-2018
//      Created for Project :   FLAME
"""

from clmcservice.graphapi.utilities import validate_json_queries_body, RTT_CYPHER_QUERY_TEMPLATE, \
    build_network_graph, delete_network_graph, build_temporal_subgraph, delete_temporal_subgraph, validate_graph_rtt_params, find_node_with_possible_types
from influxdb import InfluxDBClient
from py2neo import Graph
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPServiceUnavailable, HTTPNotImplemented
from pyramid.view import view_defaults, view_config
from requests import exceptions, get
from uuid import uuid4
from json import load
import logging


log = logging.getLogger('service_logger')

# Upper bound (in seconds) for a single HTTP call to the SDN controller. Without a timeout
# the requests library may block forever on an unresponsive controller. The value mirrors
# the timeout this module already uses for the InfluxDB client.
SDN_CONTROLLER_TIMEOUT = 10


@view_defaults(renderer='json')
class GraphAPI(object):
    """
    A class-based view for building temporal graphs and running graph queries.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """

        self.request = request

    def _connect_graph(self):
        """
        Opens a connection to the neo4j graph database configured in the registry settings.

        :return: a py2neo Graph object built from the 'neo4j_host' and 'neo4j_password' settings
        """

        settings = self.request.registry.settings
        return Graph(host=settings['neo4j_host'], password=settings['neo4j_password'])

    @view_config(route_name='graph_build', request_method='POST')
    def build_temporal_graph(self):
        """
        An API endpoint to build a temporal graph and store it in neo4j based on the posted JSON query document.

        :raises HTTPBadRequest: if request body is not a valid JSON with the queries per service function,
                                or if no influx database exists for the given service function chain

        :return: A JSON document containing the database name, along with meta data about the built graph
                 (time range and uuid, which can then be reused for other API calls)
        """

        try:
            body = self.request.body.decode(self.request.charset)
            json_queries = validate_json_queries_body(body)  # validate the content and receive a json dictionary object
        except AssertionError as e:
            raise HTTPBadRequest("Bad request content: {0}".format(e.args))

        settings = self.request.registry.settings
        graph = self._connect_graph()
        influx_client = InfluxDBClient(host=settings['influx_host'], port=settings['influx_port'], timeout=10)

        # the service function chain identifier doubles as the name of the influx database
        database_name = json_queries["service_function_chain"]
        existing_databases = {db["name"] for db in influx_client.get_list_database()}
        if database_name not in existing_databases:
            raise HTTPBadRequest("Database for service function chain {0} not found.".format(database_name))

        # scale the posted timestamps by 10^9 (seconds to nanoseconds, assuming the request uses seconds)
        from_timestamp = json_queries['from'] * 10**9
        to_timestamp = json_queries['to'] * 10**9

        request_id = str(uuid4())  # the generated UUID identifies the subgraph in subsequent API calls

        build_temporal_subgraph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client)

        return {
            "database": database_name,
            'graph': {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}}
        }

    @view_config(route_name='graph_manage', request_method='DELETE')
    def delete_temporal_graph(self):
        """
        An API endpoint to delete a temporal graph associated with a uuid generated by the CLMC service.

        :return: A JSON document containing the number of nodes deleted from the subgraph

        :raises HTTPNotFound: if the request is not associated with any subgraph
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL
        graph = self._connect_graph()  # connect to the neo4j graph db

        # every temporal subgraph is anchored by a Reference node carrying its UUID
        if graph.nodes.match("Reference", uuid=graph_id).first() is None:
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

        number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id)
        return {"deleted": number_of_deleted_nodes}

    @staticmethod
    def _require_relationship(relationship, error_message):
        """
        Guards against a missing relationship before its start/end node is dereferenced.

        :param relationship: the result of a relationship match (None when nothing was matched)
        :param error_message: message to log and to report to the client when the relationship is missing

        :return: the relationship itself, guaranteed not to be None

        :raises HTTPBadRequest: if the relationship is None
        """

        if relationship is None:
            log.error("Unexpected error: %s", error_message)
            raise HTTPBadRequest(error_message)

        return relationship

    @view_config(route_name='graph_algorithms_rtt', request_method='GET')
    def run_rtt_query(self):
        """
        An API endpoint to run the round trip time cypher query over the graph associated with a given request ID.

        :return: A JSON response with a list of forward latencies, reverse latencies and SF endpoint response time,
                 plus the derived totals, tags, bandwidth and round trip time.

        :raises HTTPBadRequest: if the request URL doesn't contain the required URL query parameters, or the endpoint
                                is not linked to a host, a service function or a service function package
        :raises HTTPNotFound: if the request is not associated with any subgraph or the compute node / endpoint node doesn't exist
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL

        try:
            params = validate_graph_rtt_params(self.request.params)
        except AssertionError as e:
            raise HTTPBadRequest("Request URL format is incorrect: {0}".format(e.args))

        startpoint_node_label = params["startpoint"]
        endpoint_node_label = params["endpoint"]

        graph = self._connect_graph()  # connect to the neo4j graph db
        all_nodes = graph.nodes

        reference_node = all_nodes.match("Reference", uuid=graph_id).first()
        if reference_node is None:
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

        # match a switch, cluster or ue node as a path start point and capture the type of the node
        startpoint_node, startpoint_node_type = find_node_with_possible_types(startpoint_node_label, ("Switch", "Cluster", "UserEquipment"), graph)
        if startpoint_node is None:
            raise HTTPNotFound("Starting point node {0} doesn't exist.".format(startpoint_node_label))

        endpoint_node = all_nodes.match("Endpoint", name=endpoint_node_label, uuid=graph_id).first()
        if endpoint_node is None:
            raise HTTPNotFound("Endpoint node {0} doesn't exist.".format(endpoint_node_label))

        # the relationship must be checked before reading its end node - first() returns None when nothing is matched
        hosted_by_node = self._require_relationship(
            graph.relationships.match(nodes=(endpoint_node, None), r_type="hostedBy").first(),
            "No host node found associated with endpoint {0}".format(endpoint_node["name"])
        ).end_node

        # check if the endpoint is hosted by the compute node before running the RTT cypher query
        if hosted_by_node["name"] == startpoint_node["name"]:
            # start point and host are the same node, hence no network hops in either direction
            result = {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"],
                      "request_size": endpoint_node["request_size"], "response_size": endpoint_node["response_size"]}
        else:
            # NOTE(review): the query text is assembled with str.format; the labels and graph id were matched against
            # existing nodes above, but confirm the template cannot be abused with crafted node names.
            query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(startpoint_node_type, startpoint_node_label, endpoint_node_label, graph_id)
            log.info("Executing cypher query: %s", query_to_execute)
            data = graph.run(query_to_execute).data()  # returns a list of dictionaries, each dictionary represents a row in the result
            # NOTE(review): assumes the query always yields at least one row - an empty result raises IndexError here
            result = data[0]

        sf_node = self._require_relationship(
            graph.match(nodes=(None, endpoint_node), r_type="realisedBy").first(),
            "No service function found associated with endpoint {0}".format(endpoint_node["name"])
        ).start_node

        sf_package_node = self._require_relationship(
            graph.match(nodes=(sf_node, None), r_type="instanceOf").first(),
            "No service function package found associated with service function {0}".format(sf_node["name"])
        ).end_node

        result["global_tags"] = {"flame_sfe": endpoint_node["name"], "flame_server": hosted_by_node["name"], "flame_location": hosted_by_node["name"],
                                 "flame_sfc": reference_node["sfc"], "flame_sfci": reference_node["sfci"], "flame_sfp": sf_package_node["name"],
                                 "flame_sf": sf_node["name"]}
        result["local_tags"] = {"traffic_source": startpoint_node_label}

        # calculate the Round-Trip-Time
        total_forward_latency = sum(result["forward_latencies"])
        result["total_forward_latency"] = total_forward_latency
        total_reverse_latency = sum(result["reverse_latencies"])
        result["total_reverse_latency"] = total_reverse_latency

        bandwidth = self.request.registry.settings["network_bandwidth"]
        result["bandwidth"] = bandwidth

        result["round_trip_time"] = self.calculate_round_trip_time(
            total_forward_latency, total_reverse_latency, result["response_time"],
            result["request_size"], result["response_size"], bandwidth
        )

        return result

    @staticmethod
    def calculate_round_trip_time(forward_latency, reverse_latency, service_delay, request_size, response_size, bandwidth, packet_size=1500, packet_header_size=50):
        """
        Calculates the round trip time given the list of arguments.

        The data (transmission) delays are only added when both network latencies are positive; otherwise the
        result is simply the sum of the two latencies and the service delay.

        :param forward_latency: network latency in forward direction (s)
        :param reverse_latency: network latency in reverse direction (s)
        :param service_delay: media service delay (s)
        :param request_size: request size (bytes)
        :param response_size: response size (bytes)
        :param bandwidth: network bandwidth (Mb/s)
        :param packet_size: size of packet (bytes)
        :param packet_header_size: size of the header of the packet (bytes)

        :return: the calculated round trip time
        """

        if forward_latency > 0 and reverse_latency > 0:
            bytes_to_megabits = 8/10**6
            # ratio between the full packet size and its payload, accounts for the header overhead
            header_overhead = packet_size / (packet_size - packet_header_size)
            forward_data_delay = bytes_to_megabits * (request_size / bandwidth) * header_overhead
            reverse_data_delay = bytes_to_megabits * (response_size / bandwidth) * header_overhead
        else:
            forward_data_delay, reverse_data_delay = 0, 0

        return forward_latency + forward_data_delay + service_delay + reverse_latency + reverse_data_delay

    def _fetch_sdn_json(self, resource_path, queried_subject):
        """
        Sends a GET request to the SDN controller and returns the decoded JSON content of the response.

        :param resource_path: path of the controller resource to query, e.g. "/wm/topology/links/json"
        :param queried_subject: what is being queried, used to complete the client-facing error messages

        :return: the decoded JSON content of the controller's response

        :raises HTTPServiceUnavailable: if the controller cannot be reached or doesn't answer within the timeout
        :raises HTTPNotImplemented: if the controller answers with a non-200 status code or with a non-JSON body
        """

        settings = self.request.registry.settings
        sdn_controller_ip = settings['sdn_controller_ip']
        sdn_controller_port = settings['sdn_controller_port']

        url = "http://{0}:{1}{2}".format(sdn_controller_ip, sdn_controller_port, resource_path)
        try:
            response = get(url, timeout=SDN_CONTROLLER_TIMEOUT)
        except (exceptions.ConnectionError, exceptions.Timeout):
            # a controller that accepts the connection but never responds is as unavailable as one that refuses it
            msg = "The SDN controller is not available on IP {0} and port {1}.".format(sdn_controller_ip, sdn_controller_port)
            log.error("Unexpected error: %s", msg)
            raise HTTPServiceUnavailable("The SDN controller couldn't be reached when trying to build the network topology.")

        # check if the SDN controller returned the expected response
        if response.status_code != 200:
            msg = "The SDN controller returned a response with status code different than 200."
            log.error("Unexpected error: %s", msg)
            raise HTTPNotImplemented("The SDN controller failed to return a successful response when querying for {0}.".format(queried_subject))

        try:
            return response.json()
        except ValueError:  # response not in JSON
            msg = "The SDN controller returned a response which couldn't be converted to JSON."
            log.error("Unexpected error: %s", msg)
            raise HTTPNotImplemented("The SDN controller failed to return a valid JSON response when querying for {0}.".format(queried_subject))

    def _load_json_mapping(self, path_setting, not_found_message):
        """
        Loads a JSON mapping from the file whose path is stored under the given registry setting.

        This is deliberately best-effort: any failure to read or parse the file is logged and an empty
        mapping is returned, so that the topology can still be built without it.

        :param path_setting: name of the registry setting holding the path of the JSON file
        :param not_found_message: message to log when the mapping cannot be loaded

        :return: the content of the JSON file, or an empty dictionary if it couldn't be loaded
        """

        mapping_file = self.request.registry.settings[path_setting]
        try:
            with open(mapping_file) as fh:
                return load(fh)
        except Exception as e:
            log.error("Unexpected error: %s", e)
            log.error(not_found_message)
            return {}

    @view_config(route_name='graph_network_topology', request_method='POST')
    def build_network_topology(self):
        """
        An API endpoint to build/update the network topology in the neo4j graph.

        :return: A JSON response with the number of switches, clusters and UEs that were built.

        :raises HTTPServiceUnavailable: if the SDN controller cannot be reached
        :raises HTTPNotImplemented: if the SDN controller returns an unsuccessful or non-JSON response
        """

        graph = self._connect_graph()  # connect to the neo4j graph db

        # retrieve all switches and map the DPID of each switch to its IP address,
        # the IP address is reported in the format '/172.168.23.54:1234'
        content = self._fetch_sdn_json("/wm/core/controller/switches/json", "the list of switches")
        switches = {switch["switchDPID"]: switch["inetAddress"][1:].split(":")[0] for switch in content}

        # retrieve all external links (gathered through BDDP)
        external_links = self._fetch_sdn_json("/wm/topology/external-links/json", "the network topology")

        # retrieve all local links (gathered through LLDP)
        local_links = self._fetch_sdn_json("/wm/topology/links/json", "the network topology")

        # TODO this is a temporary solution - currently the service router to clusters mapping is read from a file (which must be manually prepared beforehand)
        clusters = self._load_json_mapping(
            "network_clusters_path", "No service-router-to-cluster mapping was found while building the network topology."
        )

        # TODO this is a temporary solution - currently the service router to ues mapping is read from a file (which must be manually prepared beforehand)
        ues = self._load_json_mapping(
            "network_ues_path", "No service-route-to-ue mapping was found while building the network topology."
        )

        # build the network graph in two passes (external links first, then local links) and add up the number of
        # switch, cluster and ue nodes created by each pass
        tmp_switch_count, tmp_clusters_count, tmp_ues_count = build_network_graph(graph, switches, external_links, clusters, ues)
        switch_count, clusters_count, ues_count = build_network_graph(graph, switches, local_links, clusters, ues)
        switch_count += tmp_switch_count
        clusters_count += tmp_clusters_count
        ues_count += tmp_ues_count

        return {"new_switches_count": switch_count, "new_clusters_count": clusters_count, "new_ues_count": ues_count}

    @view_config(route_name='graph_network_topology', request_method='DELETE')
    def delete_network_topology(self):
        """
        An API endpoint to delete the network topology in the neo4j graph.

        :return: A JSON response with the number of switches, clusters and UEs that were deleted.
        """

        graph = self._connect_graph()  # connect to the neo4j graph db

        deleted_switches, deleted_clusters, deleted_ues = delete_network_graph(graph)

        return {"deleted_switches_count": deleted_switches, "deleted_clusters_count": deleted_clusters, "deleted_ues_count": deleted_ues}