Skip to content
Snippets Groups Projects
Commit fdc0505d authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Graph API tests

parent 9a8ebe57
No related branches found
No related tags found
No related merge requests found
......@@ -33,5 +33,5 @@ cd `dirname $0`
echo "Provisioning CLMC service"
./install-tick-stack.sh $@
./install-clmc-service.sh $@
./install-neo4j.sh $@
./install-clmc-service.sh $@
\ No newline at end of file
import getopt
import sys
from itertools import permutations
from influxdb import InfluxDBClient
from json import load
from py2neo import Graph, Node, Relationship
def generate_network_measurements(influx_host, db_name, network_config_path):
def report_network_measurements(influx_host, db_name, json_data, neo4j_host, neo4j_password):
"""
Generates network measurements which follow the telegraf ping plugin format.
:param influx_host: influx DB host
:param db_name: name of database
:param network_config_path the path to the network configuration file
:param json_data: the network configuration data
:param neo4j_host: the neo4j db host
:param neo4j_password: the neo4j db password
"""
with open(network_config_path) as fh:
json_data = load(fh)
# declares the data to push to influx - host, url, avg_response_ms, min_response_ms, max_response_ms
data = ((link["source"], link["target"], link["avg_response_time"], link["min_response_time"], link["max_response_time"]) for link in json_data["links"])
data = tuple((link["source"], link["target"], link["avg_response_time"], link["min_response_time"], link["max_response_time"]) for link in json_data["links"])
json_body = [
{"measurement": "ping",
......@@ -36,6 +36,40 @@ def generate_network_measurements(influx_host, db_name, network_config_path):
print("Writing network latency data to influx..\n")
assert db_client.write_points(json_body) # assert the write method returns True - successful write
graph = Graph(host=neo4j_host, password=neo4j_password, port=11005)
print("Building network links from the ping telegraf plugin in influx")
compute_nodes = set([host for host, url, avg_ms, min_ms, max_ms in data])
# retrieve all network latencies available from the influx ping table
for network_link in permutations(compute_nodes, 2):
from_node_name, to_node_name = network_link
from_node = graph.nodes.match("ComputeNode", name=from_node_name).first()
if from_node is None:
from_node = Node("ComputeNode", name=from_node_name)
graph.create(from_node)
to_node = graph.nodes.match("ComputeNode", name=to_node_name).first()
if to_node is None:
to_node = Node("ComputeNode", name=to_node_name)
graph.create(to_node)
# query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\' and time>={2} and time<{3}'.format(from_node['name'], to_node['name'], from_timestamp, to_timestamp)
# In future when latencies are reported continuously, we should put timestamp filtering in the query for network links
query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\''.format(from_node['name'], to_node['name'])
print("Executing query: {0}".format(query))
result = db_client.query(query) # execute the query
# get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator)
try:
actual_result = next(result.get_points())
latency = actual_result.get("mean_average_response_ms")
if graph.relationships.match(nodes=(from_node, to_node), r_type="linkedTo").first() is None:
edge = Relationship(from_node, "linkedTo", to_node, latency=latency)
graph.create(edge)
except StopIteration:
# in this case there is no such link reported to Influx
print("There is no direct link between {0} and {1}".format(from_node, to_node))
if __name__ == "__main__":
try:
......@@ -59,4 +93,7 @@ if __name__ == "__main__":
path = arg
if all([db_host is not None, database is not None, path is not None]):
generate_network_measurements(db_host, database, path)
with open(path) as fh:
json_data = load(fh)
report_network_measurements(db_host, database, json_data, db_host, "admin")
#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 09-07-2018
// Created for Project : FLAME
"""
import pytest
from influxdb import InfluxDBClient
from clmcservice.generate_network_measurements import report_network_measurements
from py2neo import Graph
network_config = {
"links": [
{
"source": "DC1",
"target": "DC2",
"min_response_time": 10,
"max_response_time": 20,
"avg_response_time": 15
},
{
"source": "DC2",
"target": "DC1",
"min_response_time": 16,
"max_response_time": 28,
"avg_response_time": 22
},
{
"source": "DC1",
"target": "DC3",
"min_response_time": 17,
"max_response_time": 19,
"avg_response_time": 18
},
{
"source": "DC3",
"target": "DC1",
"min_response_time": 15,
"max_response_time": 25,
"avg_response_time": 20
},
{
"source": "DC1",
"target": "DC5",
"min_response_time": 27,
"max_response_time": 33,
"avg_response_time": 30
},
{
"source": "DC5",
"target": "DC1",
"min_response_time": 10,
"max_response_time": 42,
"avg_response_time": 26
},
{
"source": "DC2",
"target": "DC4",
"min_response_time": 11,
"max_response_time": 29,
"avg_response_time": 20
},
{
"source": "DC4",
"target": "DC2",
"min_response_time": 12,
"max_response_time": 40,
"avg_response_time": 26
},
{
"source": "DC3",
"target": "DC4",
"min_response_time": 23,
"max_response_time": 27,
"avg_response_time": 25
},
{
"source": "DC4",
"target": "DC3",
"min_response_time": 12,
"max_response_time": 18,
"avg_response_time": 15
},
{
"source": "DC5",
"target": "DC6",
"min_response_time": 3,
"max_response_time": 15,
"avg_response_time": 9
},
{
"source": "DC6",
"target": "DC5",
"min_response_time": 11,
"max_response_time": 11,
"avg_response_time": 11
},
]
}
@pytest.fixture(scope='module', autouse=True)
def db_testing_data():
"""
This fixture generates some testing data in influx to be used for testing, after which it clears up the DB.
:return: a pair of time stamps defining the from-to range for which the test data is reported
"""
global network_config
test_db_name = "TestInfluxDB"
influx = InfluxDBClient(host="localhost", port=8086, timeout=10)
graph = Graph(host="localhost", password="admin", port="11005")
graph.delete_all()
dbs = influx.get_list_database()
if "CLMCMetrics" not in dbs:
influx.create_database("CLMCMetrics")
report_network_measurements("localhost", "CLMCMetrics", network_config, "localhost", "admin")
if test_db_name in dbs:
influx.drop_database(test_db_name)
influx.create_database(test_db_name)
influx.switch_database(test_db_name)
from_timestamp = 1528385860
to_timestamp = 1528685860
data = [
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 5, 20, 1528385860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 8, 35, 1528385860),
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 15, 1528389860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 10, 23, 1528389860),
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 17, 1528395860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 15, 11, 1528395860),
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 17, 23, 1528485860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 19, 24, 1528485860),
("host4", "nginx_1_ep1", "DC4", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 16, 1528545860),
("host6", "nginx_1_ep2", "DC6", "nginx", "nginx_1", "test_sfc1", "test_sfc1_1", "sr6", 20, 18, 1528545860)
]
influx.write_points([
{"measurement": "nginx",
"tags": {"host": host, "ipendpoint": endpoint, "location": location, "sf": sf, "sf_i": sf_i, "sfc": sfc, "sfc_i": sfc_i, "sr": sr},
"fields": {"requests": num_requests, "avg_processing_time": processing_time},
"time": timestamp * 10 ** 9
} for host, endpoint, location, sf, sf_i, sfc, sfc_i, sr, num_requests, processing_time, timestamp in data
])
data = [
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 12, 86, 1528386860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 15, 75, 1528386860),
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 105, 1528388860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 60, 1528388860),
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 11, 121, 1528410860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 12, 154, 1528410860),
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 14, 84, 1528412860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 5, 45, 1528412860),
("host4", "minio_1_ep1", "DC4", "minio", "minio_1", "test_sfc1", "test_sfc1_1", "sr4", 7, 63, 1528414860),
("host5", "minio_2_ep1", "DC5", "minio", "minio_2", "test_sfc2", "test_sfc2_1", "sr5", 16, 86, 1528414860)
]
influx.write_points([
{"measurement": "minio_http",
"tags": {"host": host, "ipendpoint": endpoint, "location": location, "sf": sf, "sf_i": sf_i, "sfc": sfc, "sfc_i": sfc_i, "sr": sr},
"fields": {"total_requests_count": num_requests, "total_processing_time": processing_time},
"time": timestamp * 10 ** 9
} for host, endpoint, location, sf, sf_i, sfc, sfc_i, sr, num_requests, processing_time, timestamp in data
])
data = [
("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 15, 1528386860),
("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 17, 1528388860),
("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 19, 1528410860),
("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 24, 1528412860),
("host5", "apache_1_ep1", "DC5", "apache", "apache_1", "test_sfc2", "test_sfc2_1", "sr5", 13, 1528414860),
]
influx.write_points([
{"measurement": "apache",
"tags": {"host": host, "ipendpoint": endpoint, "location": location, "sf": sf, "sf_i": sf_i, "sfc": sfc, "sfc_i": sfc_i, "sr": sr},
"fields": {"avg_processing_time": processing_time},
"time": timestamp * 10 ** 9
} for host, endpoint, location, sf, sf_i, sfc, sfc_i, sr, processing_time, timestamp in data
])
yield from_timestamp, to_timestamp, test_db_name, graph
influx.switch_database("CLMCMetrics")
influx.drop_measurement("ping")
influx.drop_database("TestInfluxDB")
graph.delete_all()
This diff is collapsed.
......@@ -22,7 +22,6 @@
// Created for Project : FLAME
"""
from itertools import permutations
from json import loads
from py2neo import Node, Relationship
import logging
......@@ -71,6 +70,8 @@ def validate_json_queries_body(body):
assert GRAPH_BUILD_QUERY_PARAMS == set(body.keys()), "Invalid JSON query document."
assert type(body["service_functions"]) == dict, "The service function description should be represented with a dictionary."
for sf in body["service_functions"]:
query_data = body["service_functions"][sf]
assert type(query_data) == dict, "Each service function must be associated with a respective JSON object."
......@@ -231,25 +232,6 @@ def build_temporal_graph(request_id, from_timestamp, to_timestamp, json_queries,
compute_nodes.add(compute_node) # add the compute node to the set of compute nodes
log.info("Building network links from the ping telegraf plugin in influx")
# retrieve all network latencies available from the influx ping table
for network_link in permutations(compute_nodes, 2):
from_node, to_node = network_link
# query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\' and time>={2} and time<{3}'.format(from_node['name'], to_node['name'], from_timestamp, to_timestamp)
# In future when latencies are reported continuously, we should put timestamp filtering in the query for network links
query = 'SELECT mean(*) FROM "CLMCMetrics"."autogen"."ping" WHERE host=\'{0}\' and url=\'{1}\''.format(from_node['name'], to_node['name'])
log.info("Executing query: {0}".format(query))
result = influx_client.query(query) # execute the query
# get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator)
try:
actual_result = next(result.get_points())
latency = actual_result.get("mean_average_response_ms")
find_or_create_edge(graph, "linkedTo", from_node, to_node, latency=latency)
except StopIteration:
# in this case there is no such link reported to Influx
log.info("There is no direct link between {0} and {1}".format(from_node, to_node))
log.info("Finished building graph for service function chain {0} from database {1} with retention policy {2}".format(sfc_i, db, rp))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment