Skip to content
Snippets Groups Projects
generate_network_measurements.py 5.77 KiB
#!/usr/bin/python3
"""
// © University of Southampton IT Innovation Centre, 2018
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
//      Created By :            Nikolay Stanchev
//      Created Date :          25-06-2018
//      Created for Project :   FLAME
"""
import getopt
import sys
from itertools import permutations
from influxdb import InfluxDBClient
from json import load
from py2neo import Graph, Node, Relationship


def _get_or_create_compute_node(graph, name, cache):
    """
    Returns the ComputeNode with the given name, creating it in the graph if it does not exist yet.

    :param graph: the py2neo graph to look up / create the node in
    :param name: the compute node name
    :param cache: dictionary mapping node names to nodes already retrieved during this run - updated in place
    :return: the compute node
    """

    node = cache.get(name)
    if node is None:
        node = graph.nodes.match("ComputeNode", name=name).first()
        if node is None:
            node = Node("ComputeNode", name=name)
            graph.create(node)
        cache[name] = node
    return node


def report_network_measurements(influx_host, db_name, json_data, neo4j_host, neo4j_password):
    """
    Generates network measurements which follow the telegraf ping plugin format.

    The links listed in the network configuration are written to the "ping" measurement of the influx database
    (any data previously stored in that measurement is dropped first). The ping data is then queried back from
    influx and used to create "ComputeNode" nodes and "linkedTo" relationships in the neo4j graph. Each relationship
    gets a "latency" property equal to half of the mean average response time of the corresponding ping data
    (presumably a one-way estimate of the round-trip ping time).

    :param influx_host: influx DB host
    :param db_name: name of database
    :param json_data: the network configuration data - a dictionary with a "links" list, each link having the keys
        "source", "target", "avg_response_time", "min_response_time" and "max_response_time"
    :param neo4j_host: the neo4j db host
    :param neo4j_password: the neo4j db password

    :raises RuntimeError: if influx reports that writing the ping data was unsuccessful
    """

    # declares the data to push to influx - host, url, avg_response_ms, min_response_ms, max_response_ms
    data = tuple((link["source"], link["target"], link["avg_response_time"], link["min_response_time"], link["max_response_time"]) for link in json_data["links"])

    json_body = [
        {"measurement": "ping",
         "tags": {"host": host, "url": url},
         "fields": {"packets_transmitted": 10, "reply_received": 10, "packets_received": 10,
                    "percent_reply_loss": 0, "percent_packets_loss": 0, "errors": 0, "average_response_ms": avg_ms,
                    "minimum_response_ms": min_ms, "maximum_response_ms": max_ms, "result_code": 0},
         "time": 1528385860 * 10**9
         } for host, url, avg_ms, min_ms, max_ms in data
    ]

    print("Establishing connection with influx DB on {0} with database {1}".format(influx_host, db_name))
    db_client = InfluxDBClient(host=influx_host, timeout=10, database=db_name)
    db_client.drop_measurement("ping")  # clear data in the ping measurement from previous executions of this script
    print("Writing network latency data to influx..\n")
    # the write must not be wrapped in an assert - asserts are stripped under ``python -O``, which would skip the write
    if not db_client.write_points(json_body):
        raise RuntimeError("Failed to write network latency data to influx database {0}".format(db_name))

    graph = Graph(host=neo4j_host, password=neo4j_password)

    print("Building network links from the ping telegraf plugin in influx")
    # NOTE(review): only link sources are treated as compute nodes - a target that never appears as a source is not linked
    compute_nodes = {host for host, *_ in data}
    nodes_cache = {}  # node name -> graph node, so each node is matched/created at most once

    # retrieve all network latencies available from the influx ping table
    for from_node_name, to_node_name in permutations(compute_nodes, 2):
        from_node = _get_or_create_compute_node(graph, from_node_name, nodes_cache)
        to_node = _get_or_create_compute_node(graph, to_node_name, nodes_cache)

        # TODO: when latencies are reported continuously, filter the query by a time range (time>=from and time<to)
        query = 'SELECT mean(*) FROM "{0}"."autogen"."ping" WHERE host=\'{1}\' and url=\'{2}\''.format(db_name, from_node_name, to_node_name)
        print("Executing query: {0}".format(query))

        result = db_client.query(query)  # execute the query
        try:
            # only one point is expected - the means over all matching ping points
            actual_result = next(result.get_points())
        except StopIteration:
            # in this case there is no such link reported to Influx
            print("There is no direct link between {0} and {1}".format(from_node_name, to_node_name))
            continue

        mean_response_ms = actual_result.get("mean_average_response_ms")
        if mean_response_ms is None:
            # a point was returned, but without an average response time - cannot derive a latency for this link
            print("No average response time reported for the link between {0} and {1}".format(from_node_name, to_node_name))
            continue

        latency = mean_response_ms / 2
        if graph.relationships.match(nodes=(from_node, to_node), r_type="linkedTo").first() is None:
            edge = Relationship(from_node, "linkedTo", to_node, latency=latency)
            graph.create(edge)


if __name__ == "__main__":
    usage = 'generate_network_measurements.py -h <influx host> -d <influx database> -p <network configuration file path>'

    try:
        opts, args = getopt.getopt(sys.argv[1:], "h:d:p:", ['host=', 'database=', 'path='])
    except getopt.GetoptError:
        print(usage)
        sys.exit(1)

    db_host, database, path = None, None, None
    # Apply options, if any
    for opt, arg in opts:
        if opt in ('-h', '--host'):
            db_host = arg
        elif opt in ('-d', '--database'):
            database = arg
        elif opt in ('-p', '--path'):
            path = arg

    # all three options are mandatory and each must be given exactly once; checking the count alone is not enough,
    # since repeating one option while omitting another would otherwise make the script silently do nothing
    if len(opts) != 3 or any(value is None for value in (db_host, database, path)):
        print(usage)
        sys.exit(1)

    with open(path, encoding="utf-8") as fh:
        json_data = load(fh)

    # the neo4j DB is contacted on the same host as influx, using the password "admin"
    report_network_measurements(db_host, database, json_data, db_host, "admin")