Skip to content
Snippets Groups Projects
Commit b1dd6dfd authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Implements the initial version of the Graph API

parent 3d6a02cd
No related branches found
No related tags found
No related merge requests found
import getopt
import sys
from influxdb import InfluxDBClient
from json import load
def generate_network_measurements(influx_host, db_name, network_config_path, timestamp=1528385860):
    """
    Generates network measurements which follow the telegraf ping plugin format.

    :param influx_host: influx DB host
    :param db_name: name of database
    :param network_config_path: the path to the network configuration file
    :param timestamp: epoch timestamp (in seconds) attached to every generated point;
        defaults to the value that was previously hard-coded in this script
    :raise RuntimeError: if influx reports an unsuccessful write
    """

    with open(network_config_path) as fh:
        json_data = load(fh)

    # declares the data to push to influx - host, url, avg_response_ms, min_response_ms, max_response_ms
    data = ((link["source"], link["target"], link["avg_response_time"],
             link["min_response_time"], link["max_response_time"]) for link in json_data["links"])

    # influx expects nanosecond precision, hence the seconds-based timestamp is scaled by 10**9;
    # packet counters/losses are fixed since these are synthetic measurements with no real loss
    json_body = [
        {"measurement": "ping",
         "tags": {"host": host, "url": url},
         "fields": {"packets_transmitted": 10, "reply_received": 10, "packets_received": 10,
                    "percent_reply_loss": 0, "percent_packets_loss": 0, "errors": 0,
                    "average_response_ms": avg_ms, "minimum_response_ms": min_ms,
                    "maximum_response_ms": max_ms, "result_code": 0},
         "time": timestamp * 10**9
         } for host, url, avg_ms, min_ms, max_ms in data
    ]

    print("Establishing connection with influx DB on {0} with database {1}".format(influx_host, db_name))
    db_client = InfluxDBClient(host=influx_host, timeout=10, database=db_name)
    db_client.drop_measurement("ping")  # clear data in the ping measurement from previous executions of this script

    print("Writing network latency data to influx..\n")
    # an explicit raise is used instead of `assert` so the check survives `python -O`
    if not db_client.write_points(json_body):
        raise RuntimeError("Failed writing network latency data to influx.")
if __name__ == "__main__":
    # usage hint printed whenever the command line arguments are invalid
    usage_message = 'generate_network_measurements.py -h <influx host> -d <influx database> -p <network configuration file path>'

    try:
        opts, args = getopt.getopt(sys.argv[1:], "h:d:p:", ['host=', 'database=', 'path='])
    except getopt.GetoptError:
        print(usage_message)
        sys.exit(1)

    # exactly three options (host, database and path) must be supplied
    if len(opts) != 3:
        print(usage_message)
        sys.exit(1)

    db_host, database, path = None, None, None
    # map each parsed option to its respective setting
    for option, value in opts:
        if option in ('-h', '--host'):
            db_host = value
        elif option in ('-d', '--database'):
            database = value
        elif option in ('-p', '--path'):
            path = value

    # only run the generation when all three settings were provided
    if db_host is not None and database is not None and path is not None:
        generate_network_measurements(db_host, database, path)
...@@ -43,6 +43,7 @@ def main(global_config, **settings): ...@@ -43,6 +43,7 @@ def main(global_config, **settings):
settings[CONF_OBJECT] = conf settings[CONF_OBJECT] = conf
settings[MALFORMED_FLAG] = False settings[MALFORMED_FLAG] = False
settings['influx_port'] = int(settings['influx_port']) # the influx port setting must be converted to integer instead of a string
config = Configurator(settings=settings) config = Configurator(settings=settings)
...@@ -59,5 +60,10 @@ def main(global_config, **settings): ...@@ -59,5 +60,10 @@ def main(global_config, **settings):
config.add_route('config_sfc', '/config/sf-chains') config.add_route('config_sfc', '/config/sf-chains')
config.add_route('config_sfc_instance', '/config/sf-chains/instance') config.add_route('config_sfc_instance', '/config/sf-chains/instance')
# add routes of the GRAPH API
config.add_route('graph_build', '/graph/build')
config.add_route('graph_manage', '/graph/temporal/{graph_id}')
config.add_route('graph_algorithms_rtt', '/graph/temporal/{graph_id}/algorithms/round-trip-time')
config.scan() # This method scans the packages and finds any views related to the routes added in the app configuration config.scan() # This method scans the packages and finds any views related to the routes added in the app configuration
return config.make_wsgi_app() return config.make_wsgi_app()
#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 04-07-2018
// Created for Project : FLAME
"""
from itertools import permutations
from json import loads
from py2neo import Node, Relationship
import logging
# required URL query parameters when running the round-trip-time cypher query
GRAPH_ROUND_TRIP_TIME_URL_PARAMS = ("compute_node", "endpoint")

# required URL query parameters (the time range) when building a temporal graph
GRAPH_BUILD_URL_PARAMS = ("from", "to")

# required top-level keys of the JSON document posted when building a temporal graph
GRAPH_BUILD_QUERY_PARAMS = {"database", "retention_policy", "service_function_chain_instance", "service_functions"}

# required keys of each per-service-function query object within the JSON document
GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "measurement_name"}

# influx query template used to fetch the mean response time of a service function within a
# time range, grouped by endpoint, location and service function instance tags
INFLUX_QUERY_TEMPLATE = 'SELECT {0} AS mean_response_time FROM "{1}"."{2}".{3} WHERE sfc_i=\'{4}\' and time>={5} and time<{6} GROUP BY ipendpoint, location, sf_i'

# cypher query template: finds the shortest path between a compute node and an endpoint
# (following only linkedTo/hostedBy relationships), then returns the per-hop forward and
# reverse latencies along that path together with the endpoint's response time
RTT_CYPHER_QUERY_TEMPLATE = """
MATCH (dc:ComputeNode {{ name: '{0}' }}),(endpoint:Endpoint {{ name: '{1}', uuid: '{2}'}}),
path = shortestPath((dc)-[*]-(endpoint))
WHERE ALL(r IN relationships(path) WHERE type(r)='linkedTo' or type(r)='hostedBy' )
WITH nodes(path) as all_nodes, endpoint as endpoint
WITH all_nodes[0..size(all_nodes)-1] as network_nodes, endpoint as endpoint
UNWIND RANGE(0, size(network_nodes) - 2) as id
WITH network_nodes[id] as source, network_nodes[id+1] as target, endpoint.response_time as response_time
MATCH (source) -[r1]-> (target), (target) -[r2]-> (source)
RETURN collect(r1.latency) as forward_latencies, reverse(collect(r2.latency)) as reverse_latencies, response_time
"""

log = logging.getLogger('service_logger')
def validate_json_queries_body(body):
    """
    Validates the request body containing mappings from service functions to queries to execute.

    :param body: the request body to validate
    :return the validated json queries dictionary object
    :raise AssertionError: if the body is invalid
    """

    try:
        body = loads(body)
    except (ValueError, TypeError):
        # ValueError covers malformed JSON; TypeError covers a non-string argument -
        # both are reported uniformly so that callers only handle AssertionError
        raise AssertionError("Configuration must be a JSON object.")

    # the document must contain exactly the expected top-level keys
    assert GRAPH_BUILD_QUERY_PARAMS == set(body.keys()), "Invalid JSON query document."

    # each service function must map to a query object with exactly the expected keys
    for sf in body["service_functions"]:
        query_data = body["service_functions"][sf]

        assert isinstance(query_data, dict), "Each service function must be associated with a respective JSON object."
        assert GRAPH_BUILD_SF_QUERY_PARAMS == set(query_data.keys()), "Invalid query data for service function {0} in the JSON query document".format(sf)

    return body
def validate_graph_url_params(params):
    """
    Validates the request url parameters used in building a temporal graph.

    :param params: the parameters dictionary to validate
    :return: the validated parameters
    :raise AssertionError: for invalid parameters
    """

    url_params = {}
    for param in GRAPH_BUILD_URL_PARAMS:
        assert param in params, "Incorrect url parameters - required url query parameter '{0}' is not found in the request parameters.".format(param)
        url_params[param] = params[param]

    # convert timestamps to integers; a non-numeric value is reported as an AssertionError
    # (instead of leaking ValueError) so callers only need to handle one validation error type
    try:
        url_params['from'] = int(url_params['from'])
        url_params['to'] = int(url_params['to'])
    except ValueError:
        raise AssertionError("Incorrect url parameters - the 'from' and 'to' parameters must be integer timestamps.")

    return url_params
def validate_graph_rtt_params(params):
    """
    Validates the request url parameters used in running a round trip time cypher query.

    :param params: the parameters dictionary to validate
    :return: the validated parameters
    :raise AssertionError: for invalid parameters
    """

    global GRAPH_ROUND_TRIP_TIME_URL_PARAMS

    # first ensure every required parameter is present, then copy them over
    for required_param in GRAPH_ROUND_TRIP_TIME_URL_PARAMS:
        assert required_param in params, "Incorrect url parameters - required url query parameter '{0}' is not found in the request parameters.".format(required_param)

    return {required_param: params[required_param] for required_param in GRAPH_ROUND_TRIP_TIME_URL_PARAMS}
def find_or_create_node(graph, node_type, **properties):
    """
    This function checks if a node of the given type with the given properties exists, and if not - creates it.

    :param graph: the graph object
    :param node_type: the type of the node to find or create
    :param properties: the properties of the node to find or create
    :return: the found or newly created node object
    """

    # match on the uuid property as well whenever it is part of the given properties
    if 'uuid' in properties:
        matcher = graph.nodes.match(node_type, name=properties['name'], uuid=properties['uuid'])
    else:
        matcher = graph.nodes.match(node_type, name=properties['name'])

    node = matcher.first()
    if node is not None:
        return node

    log.info("Creating node of type {0} with properties {1}".format(node_type, properties))
    node = Node(node_type, **properties)
    graph.create(node)

    return node
def find_or_create_edge(graph, edge_type, from_node, to_node, **properties):
    """
    This function checks if an edge of the given type with the given properties exists, and if not - creates it.

    :param graph: the graph object
    :param edge_type: the type of the edge to find or create
    :param from_node: the source of the edge
    :param to_node: the target of the edge
    :param properties: the properties of the edge to find or create
    :return: the found or newly created edge object
    """

    existing_edge = graph.relationships.match(nodes=(from_node, to_node), r_type=edge_type).first()
    if existing_edge is not None:
        return existing_edge

    log.info("Creating edge of type {0} from node {1} to node {2} with properties {3}".format(edge_type, from_node, to_node, properties))
    new_edge = Relationship(from_node, edge_type, to_node, **properties)
    graph.create(new_edge)

    return new_edge
def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client):
    """
    A function used to generate a temporal graph in the neo4j db.

    For each service function in the JSON query document, mean response times are fetched
    from influx and turned into ServiceFunction / ServiceFunctionInstance / Endpoint /
    ComputeNode nodes linked by instanceOf / realisedBy / hostedBy edges; network links
    between the discovered compute nodes are then added as linkedTo edges with a latency
    property taken from the influx "ping" measurement.

    :param request_id: the ID of the request
    :param from_timestamp: the start of the time range
    :param to_timestamp: the end of the time range
    :param json_queries: the JSON object containing the query data for each service function
    :param graph: the graph DB object
    :param influx_client: the influx DB client object
    """

    global INFLUX_QUERY_TEMPLATE

    db = json_queries["database"]
    rp = json_queries["retention_policy"]
    sfc_i = json_queries["service_function_chain_instance"]

    log.info("Building graph for service function chain {0} from database {1} with retention policy {2}".format(sfc_i, db, rp))

    compute_nodes = set()  # a set is used to keep track of all compute nodes that are found while building the graph, which is then used to retrieve the network latencies

    # traverse the list of service functions
    for service_function in json_queries["service_functions"]:
        query_data = json_queries["service_functions"][service_function]

        field_to_query = query_data["response_time_field"]
        measurement = query_data["measurement_name"]

        # build up the query by setting the placeholders in the query template
        query_to_execute = INFLUX_QUERY_TEMPLATE.format(field_to_query, db, rp, measurement, sfc_i, from_timestamp, to_timestamp)

        # create a node for the service function if it doesn't exist
        service_function_node = find_or_create_node(graph, "ServiceFunction", name=service_function)

        log.info("Executing query: {0}".format(query_to_execute))
        result = influx_client.query(query_to_execute)  # execute the query

        # iterate through each result item - one item per GROUP BY combination of tags
        for item in result.items():
            metadata, result_points = item  # each result item is a tuple of two elements

            # metadata consists of the result tags and the measurement name
            # measurement = metadata[0]
            tags = metadata[1]

            result_point = next(result_points)  # get the result point dictionary
            response_time = result_point["mean_response_time"]  # extract the response time of the SF from the result

            # create a ServiceFunctionInstance node from the tag value (if it is not already created)
            service_function_instance_node = find_or_create_node(graph, "ServiceFunctionInstance", name=tags["sf_i"])
            # create an edge between the instance and the service function (if it is not already created)
            find_or_create_edge(graph, "instanceOf", service_function_instance_node, service_function_node)

            # create an Endpoint node from the tag value (if it is not already created);
            # it carries the request uuid so all nodes of this build can later be deleted per request
            ipendpoint_node = find_or_create_node(graph, "Endpoint", name=tags["ipendpoint"], response_time=response_time, uuid=request_id)
            # create an edge between the instance and the endpoint (if it is not already created)
            find_or_create_edge(graph, "realisedBy", service_function_instance_node, ipendpoint_node)

            # create a ComputeNode node from the tag value (if it is not already created)
            compute_node = find_or_create_node(graph, "ComputeNode", name=tags["location"])
            # create an edge between the endpoint and the compute node (if it is not already created)
            find_or_create_edge(graph, "hostedBy", ipendpoint_node, compute_node)

            compute_nodes.add(compute_node)  # add the compute node to the set of compute nodes

    log.info("Building network links from the ping telegraf plugin in influx")
    # retrieve all network latencies available from the influx ping table;
    # every ordered pair of distinct compute nodes is checked for a reported link
    for network_link in permutations(compute_nodes, 2):
        from_node, to_node = network_link

        # query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\' and time>={2} and time<{3}'.format(from_node['name'], to_node['name'], from_timestamp, to_timestamp)
        # In future when latencies are reported continuously, we should put timestamp filtering in the query for network links
        query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\''.format(from_node['name'], to_node['name'])
        log.info("Executing query: {0}".format(query))

        result = influx_client.query(query)  # execute the query

        # get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator)
        try:
            actual_result = next(result.get_points())
            latency = actual_result.get("mean_average_response_ms")
            find_or_create_edge(graph, "linkedTo", from_node, to_node, latency=latency)
        except StopIteration:
            # in this case there is no such link reported to Influx
            log.info("There is no direct link between {0} and {1}".format(from_node, to_node))

    log.info("Finished building graph for service function chain {0} from database {1} with retention policy {2}".format(sfc_i, db, rp))
def delete_temporal_subgraph(graph, subgraph_id):
    """
    A function used to delete a subgraph associated with a subgraph ID obtained from the CLMC service.

    :param graph: the neo4j graph db object
    :param subgraph_id: the ID of the subgraph delete
    :return: the number of nodes that were matched with the given subgraph ID
    """

    log.info("Deleting subgraph associated with ID {0}".format(subgraph_id))

    # every node created for a given build request carries the request uuid as a property
    matched_nodes = graph.nodes.match(uuid=subgraph_id)

    deleted_count = 0
    for matched_node in matched_nodes:
        graph.delete(matched_node)
        deleted_count += 1

    log.info("Deleted {0} nodes associated with ID {1}".format(deleted_count, subgraph_id))

    return deleted_count
#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 04-07-2018
// Created for Project : FLAME
"""
from clmcservice.graphapi.utilities import validate_json_queries_body, validate_graph_url_params, build_temporal_graph, delete_temporal_subgraph, validate_graph_rtt_params, RTT_CYPHER_QUERY_TEMPLATE
from uuid import uuid4
from influxdb import InfluxDBClient
from py2neo import Graph
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound
from pyramid.view import view_defaults, view_config
import logging
log = logging.getLogger('service_logger')
@view_defaults(renderer='json')
class GraphAPI(object):
    """
    A class-based view for building temporal graphs and running graph queries.

    All endpoints render their return value as JSON (see the view defaults decorator);
    influx and neo4j connection details are read from the Pyramid registry settings.
    """

    def __init__(self, request):
        """
        Initialises the instance of the view with the request argument.

        :param request: client's call request
        """

        self.request = request  # the Pyramid request object served by this view instance

    @view_config(route_name='graph_build', request_method='POST')
    def build_temporal_graph(self):
        """
        An API endpoint to build a temporal graph and store it in neo4j based on the posted JSON query document.

        The request parameters must also include URL query parameters defining the time range for which the graph must be generated.

        :raises HTTPBadRequest: if request body is not a valid JSON with the queries per service function or if request URL doesn't contain the required URL query parameters
        :return: A JSON document containing the posted request body, along with meta data about the built graph (time range and uuid, which can then be reused for other API calls)
        """

        try:
            body = self.request.body.decode(self.request.charset)
            json_queries = validate_json_queries_body(body)  # validate the content and receive a json dictionary object
        except AssertionError as e:
            raise HTTPBadRequest("Bad request content: {0}".format(e.args))

        try:
            params = validate_graph_url_params(self.request.params)
        except AssertionError as e:
            raise HTTPBadRequest("Request URL format is incorrect: {0}".format(e.args))

        # connection details for both databases come from the service configuration settings
        graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])
        influx_client = InfluxDBClient(host=self.request.registry.settings['influx_host'], port=self.request.registry.settings['influx_port'], timeout=10)

        # the validated URL timestamps are in seconds, influx operates in nanoseconds
        from_timestamp = params['from'] * 10**9
        to_timestamp = params['to'] * 10**9

        request_id = str(uuid4())  # unique ID tagging the graph nodes built for this request

        build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client)
        json_queries['graph'] = {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}}
        return json_queries

    @view_config(route_name='graph_manage', request_method='DELETE')
    def delete_temporal_graph(self):
        """
        An API endpoint to delete a temporal graph associated with a uuid generated by the CLMC service.

        :return: A JSON document containing the UUID of the deleted subgraph
        :raises HTTPNotFound: if the request is not associated with any subgraph
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL
        graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])  # connect to the neo4j graph db

        number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id)
        if number_of_deleted_nodes > 0:
            return {"uuid": graph_id, "deleted": number_of_deleted_nodes}
        else:
            # zero deleted nodes means the uuid never matched a built graph
            raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))

    @view_config(route_name='graph_algorithms_rtt', request_method='GET')
    def run_rtt_query(self):
        """
        An API endpoint to run the round trip time cypher query over the graph associated with a given request ID.

        :return: A JSON response with a list of forward latencies, reverse latencies and SF endpoint response time.
        :raises HTTPBadRequest: if the request URL doesn't contain the required URL query parameters
        :raises HTTPNotFound: if the request is not associated with any subgraph or the compute node / endpoint node doesn't exist
        """

        graph_id = self.request.matchdict['graph_id']  # get the UUID of the subgraph from the URL

        try:
            params = validate_graph_rtt_params(self.request.params)
        except AssertionError as e:
            raise HTTPBadRequest("Request URL format is incorrect: {0}".format(e.args))

        compute_node_label = params["compute_node"]
        endpoint_node_label = params["endpoint"]

        graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])  # connect to the neo4j graph db

        all_nodes = graph.nodes

        # both the compute node and the endpoint must exist before running the cypher query
        compute_node = all_nodes.match("ComputeNode", name=compute_node_label).first()
        if compute_node is None:
            raise HTTPNotFound("Compute node {0} doesn't exist.".format(compute_node_label))

        endpoint_node = all_nodes.match("Endpoint", name=endpoint_node_label, uuid=graph_id).first()
        if endpoint_node is None:
            raise HTTPNotFound("Endpoint node {0} doesn't exist.".format(endpoint_node_label))

        # check if the endpoint is hosted by the compute node before running the RTT cypher query;
        # in that case there are no network hops, hence the latency lists are empty
        hosted_by_rel = graph.relationships.match(nodes=(endpoint_node, compute_node), r_type="hostedBy").first()
        if hosted_by_rel is not None:
            return {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"]}

        query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(compute_node_label, endpoint_node_label, graph_id)
        log.info("Executing cypher query: {0}".format(query_to_execute))
        data = graph.run(query_to_execute).data()  # returns a list of dictionaries, each dictionary represents a row in the result

        return data[0]  # we only expect one result from the query - a dictionary with the following fields: forward_latencies, reverse_latencies, response_time
...@@ -20,6 +20,14 @@ configuration_file_path = /etc/flame/clmc/service.conf ...@@ -20,6 +20,14 @@ configuration_file_path = /etc/flame/clmc/service.conf
# PostgreSQL connection url # PostgreSQL connection url
sqlalchemy.url = postgresql://clmc:clmc_service@localhost:5432/whoamidb sqlalchemy.url = postgresql://clmc:clmc_service@localhost:5432/whoamidb
# Influx connection
influx_host = localhost
influx_port = 8086
# Neo4j connection
neo4j_host = localhost
neo4j_password = admin
# By default, the toolbar only appears for clients from IP addresses # By default, the toolbar only appears for clients from IP addresses
# '127.0.0.1' and '::1'. # '127.0.0.1' and '::1'.
# debugtoolbar.hosts = 127.0.0.1 ::1 # debugtoolbar.hosts = 127.0.0.1 ::1
......
{
"links": [
{
"source": "DC1",
"target": "DC2",
"min_response_time": 10,
"max_response_time": 20,
"avg_response_time": 15
},
{
"source": "DC2",
"target": "DC1",
"min_response_time": 16,
"max_response_time": 28,
"avg_response_time": 22
},
{
"source": "DC1",
"target": "DC3",
"min_response_time": 17,
"max_response_time": 19,
"avg_response_time": 18
},
{
"source": "DC3",
"target": "DC1",
"min_response_time": 15,
"max_response_time": 25,
"avg_response_time": 20
},
{
"source": "DC2",
"target": "DC3",
"min_response_time": 11,
"max_response_time": 29,
"avg_response_time": 20
},
{
"source": "DC3",
"target": "DC2",
"min_response_time": 12,
"max_response_time": 40,
"avg_response_time": 26
}
]
}
\ No newline at end of file
...@@ -20,6 +20,14 @@ configuration_file_path = /etc/flame/clmc/service.conf ...@@ -20,6 +20,14 @@ configuration_file_path = /etc/flame/clmc/service.conf
# PostgreSQL connection url # PostgreSQL connection url
sqlalchemy.url = postgresql://clmc:clmc_service@localhost:5432/whoamidb sqlalchemy.url = postgresql://clmc:clmc_service@localhost:5432/whoamidb
# Influx connection
influx_host = localhost
influx_port = 8086
# Neo4j connection
neo4j_host = localhost
neo4j_password = admin
### ###
# wsgi server configuration # wsgi server configuration
### ###
......
...@@ -50,6 +50,7 @@ requires = [ ...@@ -50,6 +50,7 @@ requires = [
'zope.sqlalchemy', 'zope.sqlalchemy',
'psycopg2', 'psycopg2',
'influxdb', 'influxdb',
'py2neo',
'pytest', 'pytest',
] ]
...@@ -87,4 +88,4 @@ setup( ...@@ -87,4 +88,4 @@ setup(
'initialize_clmcservice_db = clmcservice.initialize_db:main', 'initialize_clmcservice_db = clmcservice.initialize_db:main',
] ]
}, },
) )
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment