From 72b7b6355ca7f2eaae8b76877583d50db14a79b0 Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Wed, 20 Feb 2019 12:48:21 +0000 Subject: [PATCH] Adds API endpoint to continuously execute the graph workflow - implements error handling functionality only --- src/service/clmcservice/__init__.py | 1 + src/service/clmcservice/graphapi/utilities.py | 43 +++++++++++++++++-- src/service/clmcservice/graphapi/views.py | 22 +++++++++- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/src/service/clmcservice/__init__.py b/src/service/clmcservice/__init__.py index 04b19c7..b603568 100644 --- a/src/service/clmcservice/__init__.py +++ b/src/service/clmcservice/__init__.py @@ -70,6 +70,7 @@ def main(global_config, **settings): config.add_route('graph_manage', '/graph/temporal/{graph_id}') config.add_route('graph_algorithms_rtt', '/graph/temporal/{graph_id}/round-trip-time') config.add_route('graph_network_topology', '/graph/network') + config.add_route('graph_execute_pipeline', '/graph/monitor') # add routes of the Alerts Configuration API config.add_route('alerts_configuration', '/alerts') diff --git a/src/service/clmcservice/graphapi/utilities.py b/src/service/clmcservice/graphapi/utilities.py index a23b80b..2988c19 100644 --- a/src/service/clmcservice/graphapi/utilities.py +++ b/src/service/clmcservice/graphapi/utilities.py @@ -29,6 +29,8 @@ import logging GRAPH_ROUND_TRIP_TIME_URL_PARAMS = ("startpoint", "endpoint") +GRAPH_MONITOR_QUERY_PARAMS = {"query_period", "results_measurement_name", "service_function_chain", "service_function_chain_instance", "service_functions"} + GRAPH_BUILD_QUERY_PARAMS = {"from", "to", "service_function_chain", "service_function_chain_instance", "service_functions"} GRAPH_BUILD_SF_QUERY_PARAMS = {"response_time_field", "request_size_field", "response_size_field", "measurement_name"} @@ -61,9 +63,9 @@ RETURN latencies as forward_latencies, reverse(latencies) as reverse_latencies, log = logging.getLogger('service_logger') -def validate_json_queries_body(body): +def validate_build_request_body(body): """ - Validates the request body containing mappings from service functions to queries to execute. + Validates the request body, with mappings from service functions to queries to execute, given in a request to build temporal graph. :param body: the request body to validate @@ -72,7 +74,7 @@ def validate_json_queries_body(body): :raise AssertionError: if the body is invalid """ - global GRAPH_BUILD_QUERY_PARAMS + global GRAPH_BUILD_QUERY_PARAMS, GRAPH_BUILD_SF_QUERY_PARAMS try: body = loads(body) @@ -92,7 +94,7 @@ def validate_json_queries_body(body): # except ValueError: # assert False, "Incorrect format of service function chain instance ID - use format <sfcID>_<instanceNum>" - assert type(body["service_functions"]) == dict, "The service function description should be represented with a dictionary." + assert type(body["service_functions"]) == dict, "The service functions description should be represented with a JSON object." for sf in body["service_functions"]: query_data = body["service_functions"][sf] @@ -107,6 +109,39 @@ def validate_json_queries_body(body): return body +def validate_monitor_request_body(body): + """ + Validates the request body, with mappings from service functions to queries to execute, given in a request to monitor graph metrics. + + :param body: the request body to validate + + :return the validated json queries dictionary object + + :raise AssertionError: if the body is invalid + """ + + global GRAPH_MONITOR_QUERY_PARAMS, GRAPH_BUILD_SF_QUERY_PARAMS + + try: + body = loads(body) + except Exception: + raise AssertionError("Configuration must be a JSON object.") + + assert GRAPH_MONITOR_QUERY_PARAMS == set(body.keys()), "Invalid JSON query document." + + assert type(body["service_functions"]) == dict, "The service functions description should be represented with a JSON object." + + for sf in body["service_functions"]: + query_data = body["service_functions"][sf] + assert type(query_data) == dict, "Each service function must be associated with a respective JSON object." + assert GRAPH_BUILD_SF_QUERY_PARAMS == set(query_data.keys()), "Invalid query data for service function {0} in the JSON query document".format(sf) + + assert type(body["query_period"]) == int, "'query_period' parameter must be an integer" + assert body["query_period"] > 0, "'query_period' parameter must be a positive integer." + + return body + + def validate_graph_rtt_params(params): """ Validates the request url parameters used in running a round trip time cypher query. diff --git a/src/service/clmcservice/graphapi/views.py b/src/service/clmcservice/graphapi/views.py index 32a32fd..4226f7b 100644 --- a/src/service/clmcservice/graphapi/views.py +++ b/src/service/clmcservice/graphapi/views.py @@ -23,7 +23,7 @@ """ -from clmcservice.graphapi.utilities import validate_json_queries_body, RTT_CYPHER_QUERY_TEMPLATE, \ +from clmcservice.graphapi.utilities import validate_build_request_body, validate_monitor_request_body, RTT_CYPHER_QUERY_TEMPLATE, \ build_network_graph, delete_network_graph, build_temporal_subgraph, delete_temporal_subgraph, validate_graph_rtt_params, find_node_with_possible_types from influxdb import InfluxDBClient from py2neo import Graph @@ -64,7 +64,7 @@ class GraphAPI(object): try: body = self.request.body.decode(self.request.charset) - json_queries = validate_json_queries_body(body) # validate the content and receive a json dictionary object + json_queries = validate_build_request_body(body) # validate the content and receive a json dictionary object except AssertionError as e: raise HTTPBadRequest("Bad request content: {0}".format(e.args)) @@ -334,3 +334,21 @@ class GraphAPI(object): deleted_switches, deleted_clusters, deleted_ues = delete_network_graph(graph) return {"deleted_switches_count": deleted_switches, "deleted_clusters_count": deleted_clusters, "deleted_ues_count": deleted_ues} + + @view_config(route_name='graph_execute_pipeline', request_method='POST') + def execute_graph_pipeline(self): + + try: + body = self.request.body.decode(self.request.charset) + json_queries = validate_monitor_request_body(body) # validate the content and receive a json dictionary object + except AssertionError as e: + raise HTTPBadRequest("Bad request content: {0}".format(e.args)) + + graph = Graph(host=self.request.registry.settings['neo4j_host'], password=self.request.registry.settings['neo4j_password']) + influx_client = InfluxDBClient(host=self.request.registry.settings['influx_host'], port=self.request.registry.settings['influx_port'], timeout=10) + + database_name = json_queries["service_function_chain"] + if database_name not in [db["name"] for db in influx_client.get_list_database()]: + raise HTTPBadRequest("Database for service function chain {0} not found.".format(database_name)) + + return json_queries -- GitLab