Commit 8b30dc41 authored by Michael Boniface

Merge branch 'clmc-service' into 'integration'

Clmc version 2.1.1

See merge request FLAME/consortium/3rdparties/flame-clmc!67
parents e3fc9dde 8e6914de
......@@ -35,8 +35,8 @@ build:tests:
- python setup.py sdist --dist-dir=$CI_PROJECT_DIR/build
artifacts:
paths:
- build/clmctest-2.0.4.tar.gz
- build/clmcservice-2.0.4.tar.gz
- build/clmctest-2.1.1.tar.gz
- build/clmcservice-2.1.1.tar.gz
expire_in: 1 day
test:all:
......@@ -50,8 +50,8 @@ test:all:
- echo "REPO_PASS=${REPO_PASS}" >> $CI_PROJECT_DIR/reporc
- sudo scripts/test/fixture.sh create -f src/test/clmctest/rspec.json -r $CI_PROJECT_DIR -c all
- sudo mkdir /var/lib/lxd/containers/test-runner/rootfs/opt/clmc/build
- sudo cp build/clmctest-2.0.4.tar.gz /var/lib/lxd/containers/test-runner/rootfs/opt/clmc/build
- sudo lxc exec test-runner -- pip3 install /opt/clmc/build/clmctest-2.0.4.tar.gz
- sudo cp build/clmctest-2.1.1.tar.gz /var/lib/lxd/containers/test-runner/rootfs/opt/clmc/build
- sudo lxc exec test-runner -- pip3 install /opt/clmc/build/clmctest-2.1.1.tar.gz
- sudo lxc exec test-runner -- pytest -s --tb=short -rfp --pyargs clmctest
when: on_success
......
......@@ -20,6 +20,7 @@ The CLMC depends on 3rd party open source software distributed using approved op
| tosca-parser | APACHE LICENSE v2 | https://github.com/openstack/tosca-parser/blob/master/LICENSE |
| schema | MIT LICENSE | https://github.com/keleshev/schema/blob/master/LICENSE-MIT |
| requests | APACHE LICENSE v2 | https://github.com/requests/requests/blob/master/LICENSE |
| psutil | BSD LICENSE | https://github.com/giampaolo/psutil/blob/master/LICENSE |
| pytest | MIT LICENSE | https://github.com/pytest-dev/pytest/blob/master/LICENSE |
| pytest-cov | MIT LICENSE | https://github.com/pytest-dev/pytest-cov/blob/master/LICENSE |
......
......@@ -339,8 +339,8 @@ with **/clmc-service** so that the nginx reverse proxy server (listening on port
* **DELETE** ***/graph/monitor/{request_id}***
This API methods instructs the CLMC service to stop running a graph monitoring pipeline script associated with the request identifier in the URL.
(retrieved from the response of a POST request for /graph/monitor), e.g. request sent to */graph/monitor/75df6f8d-3829-4fd8-a3e6-b3e917010141*
This API method instructs the CLMC service to stop running a graph monitoring pipeline script associated with the request identifier in the URL.
(could be retrieved from the response of a POST request for /graph/monitor), e.g. request sent to */graph/monitor/75df6f8d-3829-4fd8-a3e6-b3e917010141*
* Response:
......@@ -356,6 +356,28 @@ with **/clmc-service** so that the nginx reverse proxy server (listening on port
}
```
* **GET** ***/graph/monitor/{request_id}***
This API method fetches the status of a graph monitoring pipeline script associated with the request identifier in the URL.
(could be retrieved from the response of a POST request for /graph/monitor), e.g. request sent to */graph/monitor/75df6f8d-3829-4fd8-a3e6-b3e917010141*
* Response:
The response of this request is JSON content containing a single output message along with the status of the monitoring pipeline (a 'sleeping' status is
expected, since the background process executes only once per the given delay; a 'zombie' status indicates that the process is dead).
Returns a 404 Not Found error if the request ID is not associated with any graph monitoring process.
* Response Body Example:
```json
{
"status": "sleeping",
"msg": "Successfully fetched status of graph pipeline process."
}
```
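For illustration, the monitoring pipeline management endpoints could be exercised with the Python *requests* library as in the sketch below. The base URL and the request identifier are assumptions: the service is presumed to be reachable behind the nginx reverse proxy under */clmc-service* on the local host, and the request ID would come from the response of a previous *POST /graph/monitor* call.

```python
import requests

BASE_URL = "http://localhost/clmc-service"  # assumed deployment address behind the nginx reverse proxy
request_id = "75df6f8d-3829-4fd8-a3e6-b3e917010141"  # hypothetical ID returned by POST /graph/monitor

# fetch the status of the graph monitoring pipeline process
status_response = requests.get("{0}/graph/monitor/{1}".format(BASE_URL, request_id))
if status_response.status_code == 200:
    print(status_response.json())  # e.g. {"status": "sleeping", "msg": "Successfully fetched status of graph pipeline process."}
else:
    print("No monitoring process is associated with this request ID.")  # 404 Not Found

# stop the graph monitoring pipeline process
stop_response = requests.delete("{0}/graph/monitor/{1}".format(BASE_URL, request_id))
print(stop_response.json())
```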
* **POST** ***/graph/temporal***
This API method sends a request to the CLMC service to build a graph snapshot in the time range between the *from* and *to* timestamps.
......@@ -573,6 +595,11 @@ with **/clmc-service** so that the nginx reverse proxy server (listening on port
}
```
* **PUT** ***/graph/network***
This API method provides the same functionality as the *POST /graph/network* API endpoint, with the only difference being
that the properties (e.g. latency) of any links between network nodes that already exist will be updated.
* **DELETE** ***/graph/network***
This API method instructs CLMC to delete the network topology from its graph database.
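A minimal usage sketch for the three network graph endpoints is given below, assuming the same */clmc-service* base URL as above; none of the calls sends a request body.

```python
import requests

BASE_URL = "http://localhost/clmc-service"  # assumed deployment address

# POST builds the network topology but leaves the latency of already existing links untouched
build_response = requests.post("{0}/graph/network".format(BASE_URL))
print(build_response.json())  # e.g. {"new_switches_count": 6, "new_clusters_count": 6, "new_ues_count": 3}

# PUT builds the topology and also refreshes the latency of links that already exist
update_response = requests.put("{0}/graph/network".format(BASE_URL))
print(update_response.json())

# DELETE removes the network topology from the graph database
delete_response = requests.delete("{0}/graph/network".format(BASE_URL))
print(delete_response.status_code)
```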
......
__version__ = "2.0.4"
\ No newline at end of file
__version__ = "2.1.1"
\ No newline at end of file
......@@ -45,7 +45,7 @@ SFEMC = "flame_sfemc" # describes the keyword used for the SFEMC alert handler
# Influx QL functions defined in the documentation https://docs.influxdata.com/influxdb/v1.6/query_language/functions/
INFLUX_QL_FUNCTIONS = (
"count", "mean", "median", "mode", "sum", "first", "last", "max", "min"
"count", "mean", "median", "mode", "sum", "first", "last", "max", "min", "spread", "stddev"
)
# Kapacitor Tick Script template IDs
......
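The two new entries, *spread* and *stddev*, extend the set of aggregation methods that alert specifications may reference (the updated TOSCA alert test configurations further down use them). A hypothetical validation helper built on this tuple could look like the sketch below; the function name and error message are illustrative and not taken from the code base.

```python
# Influx QL functions defined in the documentation https://docs.influxdata.com/influxdb/v1.6/query_language/functions/
INFLUX_QL_FUNCTIONS = (
    "count", "mean", "median", "mode", "sum", "first", "last", "max", "min", "spread", "stddev"
)


def validate_aggregation_method(aggregation_method):
    """
    Check that an alert's aggregation_method value is a supported Influx QL function.

    :param aggregation_method: the value taken from the alert specification, e.g. "stddev"
    :raises AssertionError: if the value is not a recognised Influx QL function
    """
    assert aggregation_method in INFLUX_QL_FUNCTIONS, "Unsupported aggregation method: {0}".format(aggregation_method)


validate_aggregation_method("spread")   # accepted with the extended tuple
validate_aggregation_method("stddev")   # accepted with the extended tuple
```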
......@@ -23,14 +23,15 @@
"""
from json import dumps, loads
from signal import SIGKILL
from psutil import NoSuchProcess, STATUS_SLEEPING
from unittest.mock import patch, Mock, MagicMock, PropertyMock
import pytest
from pyramid import testing
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPInternalServerError
from clmcservice.graphapi.views import GraphAPI
from clmcservice.models import MonitoringProcess
from clmcservice.graphapi.conftest import links, sdn_switches, ues, clusters
from clmcservice.graphapi.conftest import links, sdn_switches, switches, ues, clusters
from clmcservice.graphapi.utilities import delete_network_graph
class TestGraphAPI(object):
......@@ -494,22 +495,21 @@ class TestGraphAPI(object):
mock_response3.status_code = 200
mock_response3.json.return_value = links
# we are doing two calls to the API, hence need to repeat the responses
http_get_mock.side_effect = [mock_response1, mock_response2, mock_response3, mock_response1, mock_response2, mock_response3]
http_get_mock.side_effect = [mock_response1, mock_response2, mock_response3] * 2
# mock the behaviour of reading the clusters and ues mapping from files
file_open_mock.return_value = MagicMock() # use magic mock so that special methods (dunders) are auto generated
# we are doing two calls to the API, hence need to repeat the results
json_load_mock.side_effect = [clusters, ues, clusters, ues]
json_load_mock.side_effect = [clusters, ues] * 2
assert set([node["name"] for node in graph_db.nodes.match("Cluster")]) == set(), "Cluster nodes must not be created before the build request"
assert set([node["name"] for node in graph_db.nodes.match("Switch")]) == set(), "Switch nodes must not be created before the build request"
assert set([node["name"] for node in graph_db.nodes.match("UserEquipment")]) == set(), "UE nodes must not be created before the build request"
# sent request to build the network topology
# send request to build the network topology through a POST request
request = testing.DummyRequest()
response = GraphAPI(request).build_network_topology()
assert response == {"new_switches_count": 6, "new_clusters_count": 6, "new_ues_count": 3}
assert set([node["name"] for node in graph_db.nodes.match("Cluster")]) == set(["DC" + str(i) for i in range(1, 7)]), "Cluster nodes must have been created"
assert set([node["name"] for node in graph_db.nodes.match("Switch")]) == set(["127.0.0." + str(i) for i in range(1, 7)]), "Switch nodes must have been created"
assert set([node["name"] for node in graph_db.nodes.match("UserEquipment")]) == set(["ue" + str(i) for i in (2, 3, 6)]), "UE nodes must have been created"
......@@ -518,11 +518,85 @@ class TestGraphAPI(object):
request = testing.DummyRequest()
response = GraphAPI(request).build_network_topology()
assert response == {"new_switches_count": 0, "new_clusters_count": 0, "new_ues_count": 0}
assert set([node["name"] for node in graph_db.nodes.match("Cluster")]) == set(["DC" + str(i) for i in range(1, 7)])
assert set([node["name"] for node in graph_db.nodes.match("Switch")]) == set(["127.0.0." + str(i) for i in range(1, 7)])
assert set([node["name"] for node in graph_db.nodes.match("UserEquipment")]) == set(["ue" + str(i) for i in (2, 3, 6)])
# clean up
delete_network_graph(graph_db)
@patch('clmcservice.graphapi.views.load')
@patch('clmcservice.graphapi.views.open')
@patch('clmcservice.graphapi.views.get')
def test_build_and_update_network(self, http_get_mock, file_open_mock, json_load_mock, db_testing_data):
"""
Tests the functionality to build and update the network graph.
:param http_get_mock: mocks the HTTP GET function
:param file_open_mock: mocks the open file function
:param json_load_mock: mocks the JSON load function
:param db_testing_data: fixture used to get a reference to the graph DB
"""
from_timestamp, to_timestamp, graph_db = db_testing_data # fixture, used to get reference to the graph DB
# mock the responses from the sdn controller - 3 GET requests are executed, so we need 3 responses
mock_response1 = Mock()
mock_response1.status_code = 200
mock_response1.json.return_value = sdn_switches
mock_response2 = Mock()
mock_response2.status_code = 200
mock_response2.json.return_value = links
mock_response3 = Mock()
mock_response3.status_code = 200
mock_response3.json.return_value = links
# we are doing two calls to the API, hence need to repeat the responses
http_get_mock.side_effect = [mock_response1, mock_response2, mock_response3] * 2
# mock the behaviour of reading the clusters and ues mapping from files
file_open_mock.return_value = MagicMock() # use magic mock so that special methods (dunders) are auto generated
# we are doing two calls to the API, hence need to repeat the results
json_load_mock.side_effect = [clusters, ues] * 2
# send request to build the network topology through a PUT request
request = testing.DummyRequest()
response = GraphAPI(request).build_and_update_network_topology()
assert response == {"new_switches_count": 6, "new_clusters_count": 6, "new_ues_count": 3}
assert set([node["name"] for node in graph_db.nodes.match("Cluster")]) == set(["DC" + str(i) for i in range(1, 7)]), "Cluster nodes must have been created"
assert set([node["name"] for node in graph_db.nodes.match("Switch")]) == set(["127.0.0." + str(i) for i in range(1, 7)]), "Switch nodes must have been created"
assert set([node["name"] for node in graph_db.nodes.match("UserEquipment")]) == set(["ue" + str(i) for i in (2, 3, 6)]), "UE nodes must have been created"
# fetch the first network link as test link
test_link = links[0]
test_link_source = switches[test_link["src-switch"]]
test_link_destination = switches[test_link["dst-switch"]]
test_link_source_node = graph_db.nodes.match("Switch", name=test_link_source).first()
test_link_destination_node = graph_db.nodes.match("Switch", name=test_link_destination).first()
# assert the nodes exist
assert test_link_source_node is not None
assert test_link_destination_node is not None
# confirm the latency value of the edge
old_latency = test_link["latency"] / 1000 # convert to seconds
test_link_edge = graph_db.relationships.match(nodes=(test_link_source_node, test_link_destination_node), r_type="linkedTo").first()
# assert the edge exists
assert test_link_edge is not None
assert test_link_edge["latency"] == old_latency, "Edge was not created with the correct latency value"
# update the test link in the SDN controller mock data with a new latency
new_latency = 5 * old_latency
test_link["latency"] = new_latency * 1000 # convert to milliseconds
# send the same request again and ensure that the latency of the network edge is updated
request = testing.DummyRequest()
GraphAPI(request).build_and_update_network_topology()
graph_db.pull(test_link_edge)
assert test_link_edge["latency"] == new_latency, "Edge was not updated with the correct latency value"
# clean up
delete_network_graph(graph_db)
def test_delete_network(self, graph_network_topology, db_testing_data):
"""
Tests the delete network graph functionality.
......@@ -696,12 +770,52 @@ class TestGraphAPI(object):
assert len(popen_mock.call_args_list) == 2, "No subprocess should be started if the UE nodes list is empty (network topology not built)"
nodes_matcher_mock.assert_called_with("UserEquipment") # assert that the graph nodes match function has been called with "UserEquipment" as argument
@patch('clmcservice.graphapi.views.kill')
def test_stop_graph_pipeline(self, mock_kill):
@patch('clmcservice.graphapi.views.Process')
def test_get_graph_pipeline_status(self, mock_process):
"""
Tests the functionality to fetch the status of a graph monitoring script.
:param mock_process: mock object to mimic the behavior of the psutil.Process functionality
"""
# mock a monitoring process
pid = 111
reqid = "test_request_id"
MonitoringProcess.add({"request_id": reqid, "process_id": pid})
# test behaviour with not-existing request UUID
request = testing.DummyRequest()
request.matchdict["request_id"] = "unknown-request-uuid"
error_raised = False
try:
GraphAPI(request).get_graph_pipeline_status()
except HTTPNotFound:
error_raised = True
assert error_raised, "Error must have been raised for unrecognised request UUID."
# test the behaviour when the PID doesn't exist or another OSError is thrown
mock_process.side_effect = NoSuchProcess("error")
request = testing.DummyRequest()
request.matchdict["request_id"] = reqid
response = GraphAPI(request).get_graph_pipeline_status()
assert response == {"msg": "Monitoring process has been stopped or killed or terminated before this request was executed."}
# test behaviour with existing request UUID and existing PID
assert MonitoringProcess.exists(reqid)
mock_process.side_effect = None
mock_process.return_value.status = Mock(return_value=STATUS_SLEEPING)
request = testing.DummyRequest()
request.matchdict["request_id"] = reqid
response = GraphAPI(request).get_graph_pipeline_status()
assert response == {"status": STATUS_SLEEPING, "msg": "Successfully fetched status of graph pipeline process."}
mock_process.return_value.status.assert_called_with()
@patch('clmcservice.graphapi.views.Process')
def test_stop_graph_pipeline(self, mock_process):
"""
Tests the funcitonality to stop a graph monitoring script.
Tests the functionality to stop a graph monitoring script.
:param mock_kill: mock object to mimic the behavior of the os.kill functionality
:param mock_process: mock object to mimic the behavior of the psutil.Process functionality
"""
# mock a monitoring process
......@@ -720,21 +834,23 @@ class TestGraphAPI(object):
assert error_raised, "Error must have been raised for unrecognised request UUID."
# test the behaviour when the PID doesn't exist or another OSError is thrown
mock_kill.side_effect = OSError("error")
mock_process.side_effect = NoSuchProcess("error")
request = testing.DummyRequest()
request.matchdict["request_id"] = reqid
response = GraphAPI(request).stop_graph_pipeline()
assert response == {"msg": "Monitoring process has been stopped before this request was executed."}
assert response == {"msg": "Monitoring process has been stopped or killed or terminated before this request was executed."}
# test behaviour with existing request UUID and existing PID
MonitoringProcess.add({"request_id": reqid, "process_id": pid})
assert MonitoringProcess.exists(reqid)
mock_kill.side_effect = None
mock_process.side_effect = None
mock_process.return_value = Mock()
request = testing.DummyRequest()
request.matchdict["request_id"] = reqid
response = GraphAPI(request).stop_graph_pipeline()
assert response == {"msg": "Monitoring process has been successfully stopped."}
mock_kill.assert_called_with(pid, SIGKILL) # assert that os.kill was called with termination signal
mock_process.return_value.terminate.assert_called_with()
mock_process.return_value.wait.assert_called_with(timeout=3)
assert not MonitoringProcess.exists(reqid), "Request ID must be removed when the process is killed."
@staticmethod
......
......@@ -350,7 +350,7 @@ def delete_temporal_subgraph(graph, subgraph_id):
return nodes_matched
def build_network_graph(graph, switches, links, clusters, ues):
def build_network_graph(graph, switches, links, clusters, ues, update_existing_links=True):
"""
A function used to build the network topology in the neo4j graph given the collection of switches, links and clusters.
......@@ -359,6 +359,7 @@ def build_network_graph(graph, switches, links, clusters, ues):
:param links: a collection of all switch-to-switch links in the network topology - JSON format, list of objects, each object must have "src-switch", "dst-switch" and "latency" as keys
:param clusters: a collection of all clusters and the IP address of the service router that they are connected to - mapping between an IP address of a service router and a cluster identifier
:param ues: a collection of all ues and the IP address of the service router that they are connected to - mapping between an IP address of a service router and a ue identifier
:param update_existing_links: (defaults to True) a flag to indicate if existing network edges must be updated with the latest latencies
"""
new_switches_count = 0
......@@ -391,7 +392,7 @@ def build_network_graph(graph, switches, links, clusters, ues):
# create the link between the two nodes
edge = find_or_create_edge(graph, "linkedTo", from_node, to_node, latency=latency)
if edge["latency"] != latency:
if update_existing_links and edge["latency"] != latency:
log.info("Updating latency for edge {0}, old latency {1}, new latency {2}".format(edge, edge["latency"], latency))
edge["latency"] = latency # make sure that the latency is updated if the edge already existed
graph.push(edge) # update the relationship in the DB
......
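To illustrate the effect of the new *update_existing_links* flag, the sketch below builds the same small topology twice: once in the POST style (existing link latencies left untouched) and once in the PUT style (latencies refreshed). The graph connection details, switch identifiers and topology data are placeholders, not values from the repository.

```python
from py2neo import Graph

from clmcservice.graphapi.utilities import build_network_graph, delete_network_graph

graph = Graph(host="localhost", password="admin")  # hypothetical neo4j connection details

# placeholder topology data in the format expected by build_network_graph
switches = {"00:00:00:00:00:00:00:01": "127.0.0.1", "00:00:00:00:00:00:00:02": "127.0.0.2"}
links = [{"src-switch": "00:00:00:00:00:00:00:01", "dst-switch": "00:00:00:00:00:00:00:02", "latency": 10}]
clusters = {"127.0.0.1": "DC1", "127.0.0.2": "DC2"}
ues = {"127.0.0.1": "ue1"}

# first build - creates the Switch, Cluster and UserEquipment nodes and the linkedTo edges
build_network_graph(graph, switches, links, clusters, ues, update_existing_links=False)

# the SDN controller now reports a different latency for the existing link
links[0]["latency"] = 50

# POST-like behaviour: nodes and edges are reused, the stale latency is left as it is
build_network_graph(graph, switches, links, clusters, ues, update_existing_links=False)

# PUT-like behaviour: the latency property of the already existing edge is refreshed
build_network_graph(graph, switches, links, clusters, ues, update_existing_links=True)

# clean up, as the tests do
delete_network_graph(graph)
```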
......@@ -34,8 +34,7 @@ from requests import exceptions, get
from uuid import uuid4
from json import load, dumps
from subprocess import Popen
from os import kill
from signal import SIGKILL
from psutil import Process, NoSuchProcess, TimeoutExpired
from logging import getLogger
......@@ -255,11 +254,32 @@ class GraphAPI(object):
@view_config(route_name='graph_network_topology', request_method='POST')
def build_network_topology(self):
"""
An API endpoint to build/update the network topology in the neo4j graph.
An API endpoint to build the network topology in the neo4j graph without updating the latency of existing edges.
:return: A JSON response with the number of switches, clusters and ues that were built.
"""
return self._build_network_topology(update_existing_links=False)
@view_config(route_name='graph_network_topology', request_method='PUT')
def build_and_update_network_topology(self):
"""
An API endpoint to build the network topology in the neo4j graph and also update the latency of existing edges.
:return: A JSON response with the number of switches, clusters and ues that were built.
"""
return self._build_network_topology(update_existing_links=True)
def _build_network_topology(self, update_existing_links):
"""
A utility method used to build the network graph topology with the option of updating existing links.
:param update_existing_links: a flag to be set to True if existing network links need to be updated
:return: A dictionary with the number of switches, clusters and ues that were created.
"""
graph = self.get_graph_reference()
sdn_controller_ip = self.request.registry.settings['sdn_controller_ip']
......@@ -307,8 +327,8 @@ class GraphAPI(object):
ues = {}
# build the network graph and retrieve the number of switch nodes and cluster nodes that were created
tmp_switch_count, tmp_clusters_count, tmp_ues_count = build_network_graph(graph, switches, external_links, clusters, ues)
switch_count, clusters_count, ues_count = build_network_graph(graph, switches, local_links, clusters, ues)
tmp_switch_count, tmp_clusters_count, tmp_ues_count = build_network_graph(graph, switches, external_links, clusters, ues, update_existing_links=update_existing_links)
switch_count, clusters_count, ues_count = build_network_graph(graph, switches, local_links, clusters, ues, update_existing_links=update_existing_links)
switch_count += tmp_switch_count
clusters_count += tmp_clusters_count
ues_count += tmp_ues_count
......@@ -416,6 +436,32 @@ class GraphAPI(object):
log.warning("Graph pipeline process for SFC {0} with PID {1} has finished executing unexpectedly with return code {2}".format(sfc, process_pid, process_return_code))
raise HTTPInternalServerError("An unexpected error occurred while trying to start monitoring graph measurements for service function chain {0}".format(sfc))
@view_config(route_name='graph_manage_pipeline', request_method='GET')
def get_graph_pipeline_status(self):
"""
An API endpoint to get the status of a monitoring graph pipeline script.
:return: A JSON response with the status of the background process.
"""
request_id = self.request.matchdict['request_id'] # get the UUID of the request from the URL
process_id = MonitoringProcess.get(request_id)
if process_id is None:
raise HTTPNotFound("A monitoring process with ID {0} couldn't be found.".format(request_id))
# create a process management class instance
try:
process_obj = Process(process_id)
status = process_obj.status()
log.info("Fetching process status with request ID {0} and process ID {1}, status - {2}".format(request_id, process_id, status))
response = {"status": status, "msg": "Successfully fetched status of graph pipeline process."}
except NoSuchProcess as e:
log.warning("Unexpected error occurred when trying to get a process that is registered in the CLMC service, but doesn't exist on the OS - {0}".format(e))
response = {"msg": "Monitoring process has been stopped or killed or terminated before this request was executed."}
return response
@view_config(route_name='graph_manage_pipeline', request_method='DELETE')
def stop_graph_pipeline(self):
"""
......@@ -430,13 +476,19 @@ class GraphAPI(object):
if process_id is None:
raise HTTPNotFound("A monitoring process with ID {0} couldn't be found.".format(request_id))
# create a process management class instance and terminate the process
try:
kill(process_id, SIGKILL)
log.info("Successfully stopped process with request ID {0} and process ID {1}".format(request_id, process_id))
process_obj = Process(process_id)
log.info("Terminating process with request ID {0} and process ID {1}, status before termination - {2}".format(request_id, process_id, process_obj.status()))
process_obj.terminate()
process_obj.wait(timeout=3)
response = {"msg": "Monitoring process has been successfully stopped."}
except OSError as e:
log.warning("Couldn't stop monitoring process with request ID {0} and process ID {1} due to error {2}".format(request_id, process_id, e))
response = {"msg": "Monitoring process has been stopped before this request was executed."}
except TimeoutExpired as e:
log.error("Unexpected error occurred while waiting for a graph pipeline process to terminate - {0}".format(e))
raise HTTPInternalServerError("Termination of monitoring process couldn't be completed.")
except NoSuchProcess as e:
log.warning("Unexpected error occurred when trying to get a process that is registered in the CLMC service, but doesn't exist on the OS - {0}".format(e))
response = {"msg": "Monitoring process has been stopped or killed or terminated before this request was executed."}
MonitoringProcess.delete(request_id)
......
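For context on the psutil calls used above, the standalone sketch below shows the terminate-and-wait pattern in isolation, with a kill() fallback when the timeout expires. Note that the service itself does not fall back to kill() - it raises an internal server error when wait() times out - so the fallback is only an illustration of the psutil API, not the service's behaviour.

```python
from psutil import Process, NoSuchProcess, TimeoutExpired


def stop_process(pid, timeout=3):
    """
    Ask a process to terminate gracefully, falling back to a forceful kill on timeout.

    :param pid: the OS process identifier, e.g. of a graph monitoring script
    :param timeout: seconds to wait for the process to exit after terminate()
    :return: True once the process has exited or was already gone
    """
    try:
        process = Process(pid)   # raises NoSuchProcess if the PID is not running
        process.terminate()      # sends SIGTERM rather than SIGKILL
        process.wait(timeout=timeout)
    except NoSuchProcess:
        pass                     # the process was already stopped, killed or terminated
    except TimeoutExpired:
        process.kill()           # last resort - SIGKILL cannot be ignored
        process.wait()
    return True
```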
......@@ -40,7 +40,7 @@ topology_template:
condition:
threshold: 5
granularity: 60
aggregation_method: first
aggregation_method: spread
resource_type:
flame_sfp: storage
flame_sf: storage-users
......
......@@ -41,7 +41,7 @@ topology_template:
condition:
threshold: 5
granularity: 60
aggregation_method: first
aggregation_method: stddev
# resource type missing - optional, so it is valid
comparison_operator: lte
action:
......
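The updated alert definitions above exercise the newly allowed *spread* and *stddev* aggregation methods, which correspond to the SPREAD() and STDDEV() functions in Influx QL. The sketch below shows roughly what a standard-deviation aggregation over 60-second windows looks like as a query issued through the Python InfluxDB client; the database, measurement and field names are placeholders, not values from the repository.

```python
from influxdb import InfluxDBClient

client = InfluxDBClient(host="localhost", port=8086, database="example_sfc_db")  # hypothetical connection details

# an alert with aggregation_method: stddev and granularity: 60 maps roughly to a
# standard deviation of the monitored field computed over 60-second windows
query = (
    'SELECT STDDEV("response_time") FROM "storage_measurement" '
    "WHERE time > now() - 10m GROUP BY time(60s)"
)

result = client.query(query)
for point in result.get_points():
    print(point)  # e.g. {'time': '...', 'stddev': 0.42}
```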
......@@ -66,6 +66,7 @@ requires = [
'tosca-parser==1.1.0',
'schema==0.6.8',
'requests==2.21.0',
'psutil==5.6.1',
'pytest==3.8.1'
]
......
__version__ = "2.0.4"
\ No newline at end of file
__version__ = "2.1.1"
\ No newline at end of file