# Author: Nikolay Stanchev
# generate_network_measurements.py
import getopt
import sys
from itertools import permutations
from influxdb import InfluxDBClient
from json import load
from py2neo import Graph, Node, Relationship
def report_network_measurements(influx_host, db_name, json_data, neo4j_host, neo4j_password,
                                neo4j_port=11005, timestamp=1528385860):
    """
    Generates network measurements which follow the telegraf ping plugin format.

    The links listed in ``json_data`` are written to the influx "ping" measurement. They are then
    read back and mirrored into neo4j as "linkedTo" relationships between "ComputeNode" nodes.

    :param influx_host: influx DB host
    :param db_name: name of the influx database the ping measurements are written to and read from
    :param json_data: the network configuration data; must contain a "links" list whose items have the
        keys "source", "target", "avg_response_time", "min_response_time" and "max_response_time"
    :param neo4j_host: the neo4j db host
    :param neo4j_password: the neo4j db password
    :param neo4j_port: the neo4j db port (defaults to the previously hard-coded 11005)
    :param timestamp: timestamp in seconds attached to every ping point (defaults to the previously
        hard-coded 1528385860); it is converted to nanoseconds before writing
    :raises RuntimeError: if the influx client reports that writing the points failed
    """
    # (host, url, avg_response_ms, min_response_ms, max_response_ms) for every configured link
    links = [
        (link["source"], link["target"], link["avg_response_time"],
         link["min_response_time"], link["max_response_time"])
        for link in json_data["links"]
    ]

    print("Establishing connection with influx DB on {0} with database {1}".format(influx_host, db_name))
    db_client = InfluxDBClient(host=influx_host, timeout=10, database=db_name)
    db_client.drop_measurement("ping")  # clear data in the ping measurement from previous executions of this script
    print("Writing network latency data to influx..\n")
    # an explicit check instead of `assert`, which is stripped when Python runs with -O
    if not db_client.write_points(_build_ping_points(links, timestamp)):
        raise RuntimeError("Failed to write network latency data to influx database {0}".format(db_name))

    graph = Graph(host=neo4j_host, password=neo4j_password, port=neo4j_port)
    print("Building network links from the ping telegraf plugin in influx")
    compute_node_names = {host for host, _url, _avg_ms, _min_ms, _max_ms in links}
    # look up (or create) every compute node once, rather than once per permutation
    compute_nodes = {name: _get_or_create_compute_node(graph, name) for name in compute_node_names}

    # query against the database the points were just written to (previously hard-coded to "CLMCMetrics")
    measurement = '{0}."autogen"."ping"'.format(_quote_influx_identifier(db_name))
    # retrieve all network latencies available from the influx ping table
    for from_node_name, to_node_name in permutations(compute_node_names, 2):
        # In future when latencies are reported continuously, we should put timestamp filtering in the query for network links
        query = 'SELECT mean(*) FROM {0} WHERE host={1} and url={2}'.format(
            measurement, _quote_influx_string(from_node_name), _quote_influx_string(to_node_name))
        print("Executing query: {0}".format(query))
        result = db_client.query(query)
        # only a single aggregated point is expected; None means the link was never reported to influx
        point = next(result.get_points(), None)
        if point is None:
            print("There is no direct link between {0} and {1}".format(from_node_name, to_node_name))
            continue
        latency = point.get("mean_average_response_ms")
        from_node = compute_nodes[from_node_name]
        to_node = compute_nodes[to_node_name]
        if graph.relationships.match(nodes=(from_node, to_node), r_type="linkedTo").first() is None:
            graph.create(Relationship(from_node, "linkedTo", to_node, latency=latency))


def _build_ping_points(links, timestamp):
    """Return influx points in the telegraf ping format, one per (host, url, avg, min, max) link tuple."""
    return [
        {"measurement": "ping",
         "tags": {"host": host, "url": url},
         "fields": {"packets_transmitted": 10, "reply_received": 10, "packets_received": 10,
                    "percent_reply_loss": 0, "percent_packets_loss": 0, "errors": 0, "average_response_ms": avg_ms,
                    "minimum_response_ms": min_ms, "maximum_response_ms": max_ms, "result_code": 0},
         "time": timestamp * 10**9  # seconds -> nanoseconds
         } for host, url, avg_ms, min_ms, max_ms in links
    ]


def _get_or_create_compute_node(graph, name):
    """Return the "ComputeNode" called ``name`` from ``graph``, creating it first if it does not exist."""
    node = graph.nodes.match("ComputeNode", name=name).first()
    if node is None:
        node = Node("ComputeNode", name=name)
        graph.create(node)
    return node


def _quote_influx_string(value):
    """Return ``value`` as a single-quoted InfluxQL string literal, escaping backslashes and single quotes."""
    return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"


def _quote_influx_identifier(value):
    """Return ``value`` as a double-quoted InfluxQL identifier, escaping embedded double quotes."""
    return '"' + str(value).replace('"', '\\"') + '"'
if __name__ == "__main__":
    # Command-line entry point: parse -h/-d/-p, load the JSON network configuration and report it.
    usage = 'generate_network_measurements.py -h <influx host> -d <influx database> -p <network configuration file path>'
    try:
        opts, _ = getopt.getopt(sys.argv[1:], "h:d:p:", ['host=', 'database=', 'path='])
    except getopt.GetoptError:
        print(usage)
        sys.exit(1)
    if len(opts) != 3:
        print(usage)
        sys.exit(1)
    db_host, database, path = None, None, None
    # Apply options, if any
    for opt, arg in opts:
        if opt in ('-h', '--host'):
            db_host = arg
        elif opt in ('-d', '--database'):
            database = arg
        elif opt in ('-p', '--path'):
            path = arg
    # three options may still leave one value unset when an option is repeated (e.g. -h a -h b -d c);
    # report that instead of silently doing nothing
    if any(value is None for value in (db_host, database, path)):
        print(usage)
        sys.exit(1)
    with open(path, encoding="utf-8") as fh:
        json_data = load(fh)
    # neo4j is reached on the same host as influx, with the fixed password "admin"
    report_network_measurements(db_host, database, json_data, db_host, "admin")