Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 04-07-2018
// Created for Project : FLAME
"""
from clmcservice.graphapi.utilities import validate_json_queries_body, validate_graph_url_params, build_temporal_graph, delete_temporal_subgraph, validate_graph_rtt_params, RTT_CYPHER_QUERY_TEMPLATE
from uuid import uuid4
from influxdb import InfluxDBClient
from py2neo import Graph
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound
from pyramid.view import view_defaults, view_config
import logging
log = logging.getLogger('service_logger')
@view_defaults(renderer='json')
class GraphAPI(object):
"""
A class-based view for building temporal graphs and running graph queries.
"""
def __init__(self, request):
"""
Initialises the instance of the view with the request argument.
:param request: client's call request
"""
self.request = request
@view_config(route_name='graph_build', request_method='POST')
def build_temporal_graph(self):
"""
An API endpoint to build a temporal graph and store it in neo4j based on the posted JSON query document.
The request parameters must also include URL query parameters defining the time range for which the graph must be generated.
:raises HTTPBadRequest: if request body is not a valid JSON with the queries per service function or if request URL doesn't contain the required URL query parameters
:return: A JSON document containing the posted request body, along with meta data about the built graph (time range and uuid, which can then be reused for other API calls)
"""
try:
body = self.request.body.decode(self.request.charset)
json_queries = validate_json_queries_body(body) # validate the content and receive a json dictionary object
except AssertionError as e:
raise HTTPBadRequest("Bad request content: {0}".format(e.args))
try:
params = validate_graph_url_params(self.request.params)
except AssertionError as e:
raise HTTPBadRequest("Request URL format is incorrect: {0}".format(e.args))
graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password'])
influx_client = InfluxDBClient(host=self.request.registry.settings['influx_host'], port=self.request.registry.settings['influx_port'], timeout=10)
from_timestamp = params['from'] * 10**9
to_timestamp = params['to'] * 10**9
request_id = str(uuid4())
build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries, graph, influx_client)
json_queries['graph'] = {"uuid": request_id, "time_range": {"from": from_timestamp, "to": to_timestamp}}
return json_queries
@view_config(route_name='graph_manage', request_method='DELETE')
def delete_temporal_graph(self):
"""
An API endpoint to delete a temporal graph associated with a uuid generated by the CLMC service.
:return: A JSON document containing the UUID of the deleted subgraph
:raises HTTPNotFound: if the request is not associated with any subgraph
"""
graph_id = self.request.matchdict['graph_id'] # get the UUID of the subgraph from the URL
graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password']) # connect to the neo4j graph db
number_of_deleted_nodes = delete_temporal_subgraph(graph, graph_id)
if number_of_deleted_nodes > 0:
return {"uuid": graph_id, "deleted": number_of_deleted_nodes}
else:
raise HTTPNotFound("No subgraph found associated with the request ID {0}".format(graph_id))
@view_config(route_name='graph_algorithms_rtt', request_method='GET')
def run_rtt_query(self):
"""
An API endpoint to run the round trip time cypher query over the graph associated with a given request ID.
:return: A JSON response with a list of forward latencies, reverse latencies and SF endpoint response time.
:raises HTTPBadRequest: if the request URL doesn't contain the required URL query parameters
:raises HTTPNotFound: if the request is not associated with any subgraph or the compute node / endpoint node doesn't exist
"""
graph_id = self.request.matchdict['graph_id'] # get the UUID of the subgraph from the URL
try:
params = validate_graph_rtt_params(self.request.params)
except AssertionError as e:
raise HTTPBadRequest("Request URL format is incorrect: {0}".format(e.args))
compute_node_label = params["compute_node"]
endpoint_node_label = params["endpoint"]
graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password']) # connect to the neo4j graph db
all_nodes = graph.nodes
compute_node = all_nodes.match("ComputeNode", name=compute_node_label).first()
if compute_node is None:
raise HTTPNotFound("Compute node {0} doesn't exist.".format(compute_node_label))
endpoint_node = all_nodes.match("Endpoint", name=endpoint_node_label, uuid=graph_id).first()
if endpoint_node is None:
raise HTTPNotFound("Endpoint node {0} doesn't exist.".format(endpoint_node_label))
# check if the endpoint is hosted by the compute node before running the RTT cypher query
hosted_by_rel = graph.relationships.match(nodes=(endpoint_node, compute_node), r_type="hostedBy").first()
if hosted_by_rel is not None:
return {"forward_latencies": [], "reverse_latencies": [], "response_time": endpoint_node["response_time"]}
query_to_execute = RTT_CYPHER_QUERY_TEMPLATE.format(compute_node_label, endpoint_node_label, graph_id)
log.info("Executing cypher query: {0}".format(query_to_execute))
data = graph.run(query_to_execute).data() # returns a list of dictionaries, each dictionary represents a row in the result
return data[0] # we only expect one result from the query - a dictionary with the following fields: forward_latencies, reverse_latencies, response_time