Skip to content
Snippets Groups Projects
LineProtocolGenerator.py 16.47 KiB
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road, 
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
##      Created By :            Michael Boniface
##      Created Date :          02-02-2018
##      Created for Project :   FLAME
"""

# line protocol

# Method to create a full InfluxDB request statement (based on partial statement from client)
import uuid
from random import randint


def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time):
    """
    Builds the combined, averaged "e2e_delays" measurement describing the e2e delay and its contributing parts.

    :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
    :param source_sfr: source service router
    :param target_sfr: target service router
    :param endpoint: endpoint of the media component
    :param sf_instance: service function instance (media component)
    :param delay_forward: Path delay (Forward direction)
    :param delay_reverse: Path delay (Reverse direction)
    :param delay_service: the media service component response time
    :param avg_request_size: averaged request size
    :param avg_response_size: averaged response size
    :param avg_bandwidth: averaged bandwidth
    :param time: measurement timestamp (converted with _getNSTime)
    :return: a single-element list with the dict-formatted report to post on influx
    """

    # all field values are stored as floats, in this order
    raw_fields = (
        ("delay_forward", delay_forward),
        ("delay_reverse", delay_reverse),
        ("delay_service", delay_service),
        ("avg_request_size", avg_request_size),
        ("avg_response_size", avg_response_size),
        ("avg_bandwidth", avg_bandwidth),
    )

    report = {
        "measurement": "e2e_delays",
        "tags": {
            "path_ID": path_id,
            "source_SFR": source_sfr,
            "target_SFR": target_sfr,
            "endpoint": endpoint,
            "sf_instance": sf_instance,
        },
        "fields": {name: float(value) for name, value in raw_fields},
        "time": _getNSTime(time),
    }

    return [report]


def generate_network_delay_report(path_id, source_sfr, target_sfr, latency, bandwidth, time):
    """
    Builds the "network_delays" platform measurement for the path between two specific service routers.

    :param path_id: the identifier of the path between the two service routers
    :param source_sfr: the source service router
    :param target_sfr: the target service router
    :param latency: the e2e network delay for traversing the path between the two service routers
    :param bandwidth: the bandwidth of the path (minimum of bandwidths of the links it is composed of)
    :param time: the measurement timestamp (converted with _getNSTime)
    :return: a single-element list with the dict-formatted report to post on influx
    """

    tags = {"path": path_id, "source": source_sfr, "target": target_sfr}
    # latency and bandwidth are stored exactly as supplied (no type conversion)
    fields = {"latency": latency, "bandwidth": bandwidth}

    report = {
        "measurement": "network_delays",
        "tags": tags,
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]


def generate_service_delay_report(endpoint, sf_instance, sfr, response_time, request_size, response_size, time):
    """
    Builds the "service_delays" service measurement about the media service response time.

    :param endpoint: endpoint of the media component
    :param sf_instance: service function instance
    :param sfr: the service function router that connects the endpoint of the SF instance to the FLAME network
    :param response_time: the media service response time (this is not the response time for the whole round-trip, but only for the processing part of the media service component)
    :param request_size: the size of the request received by the service in Bytes
    :param response_size: the size of the response in Bytes
    :param time: the measurement timestamp (converted with _getNSTime)
    :return: a single-element list with the dict-formatted report to post on influx
    """

    tags = {"endpoint": endpoint, "sf_instance": sf_instance, "sfr": sfr}
    # field values are stored exactly as supplied (no type conversion)
    fields = {
        "response_time": response_time,
        "request_size": request_size,
        "response_size": response_size,
    }

    report = {
        "measurement": "service_delays",
        "tags": tags,
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]


# Reports TX and RX, scaling on requested quality
def generate_network_report(recieved_bytes, sent_bytes, time):
    """
    Builds the "net_port_io" measurement carrying received and sent byte counts.

    The port_id tag is fixed to "enps03".

    :param recieved_bytes: value stored in the RX_BYTES_PORT_M field
    :param sent_bytes: value stored in the TX_BYTES_PORT_M field
    :param time: the measurement timestamp (converted with _getNSTime)
    :return: a single-element list with the dict-formatted report to post on influx
    """

    fields = {"RX_BYTES_PORT_M": recieved_bytes, "TX_BYTES_PORT_M": sent_bytes}

    report = {
        "measurement": "net_port_io",
        "tags": {"port_id": "enps03"},
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]


# Reports cpu usage, scaling on requests
def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time):
    """
    Builds the "cpu_usage" measurement; this measurement carries no tags.

    :param cpu_usage: value stored in the cpu_usage field
    :param cpu_active_time: value stored in the cpu_active_time field
    :param cpu_idle_time: value stored in the cpu_idle_time field
    :param time: the measurement timestamp (converted with _getNSTime)
    :return: a single-element list with the dict-formatted report to post on influx
    """

    fields = {
        "cpu_usage": cpu_usage,
        "cpu_active_time": cpu_active_time,
        "cpu_idle_time": cpu_idle_time,
    }

    report = {
        "measurement": "cpu_usage",
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]


# Reports response times, scaling on number of requests
def generate_mpegdash_report(resource, requests, avg_response_time, peak_response_time, time):
    """
    Builds the "mpegdash_service" measurement with request count and response times.

    :param resource: value stored in the cont_nav tag
    :param requests: value stored in the requests field
    :param avg_response_time: value stored in the avg_response_time field
    :param peak_response_time: value stored in the peak_response_time field
    :param time: the measurement timestamp (converted with _getNSTime)
    :return: a single-element list with the dict-formatted report to post on influx
    """

    fields = {
        "requests": requests,
        "avg_response_time": avg_response_time,
        "peak_response_time": peak_response_time,
    }

    report = {
        "measurement": "mpegdash_service",
        "tags": {"cont_nav": resource},
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]


# ipendpoint_route,ipendpoint_id,cont_nav=FQDN HTTP_REQUESTS_FQDN_M, NETWORK_FQDN_LATENCY timestamp
def generate_ipendpoint_route(resource, requests, latency, time):
    """
    Builds the "ipendpoint_route" measurement with request count and latency.

    :param resource: converted to a string and stored in the cont_nav tag
    :param requests: value stored in the http_requests_fqdn_m field
    :param latency: value stored in the network_fqdn_latency field
    :param time: the measurement timestamp (converted with _getNSTime)
    :return: a single-element list with the dict-formatted report to post on influx
    """

    tags = {"cont_nav": str(resource)}
    fields = {"http_requests_fqdn_m": requests, "network_fqdn_latency": latency}

    report = {
        "measurement": "ipendpoint_route",
        "tags": tags,
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]


def generate_endpoint_config(time, cpu, mem, storage, current_state, current_state_time, **kwargs):
    """
    Builds the "endpoint_config" measurement describing the configuration states of a VM endpoint.

    :param time: time of measurement (converted with _getNSTime)
    :param cpu: the number of CPUs of VM endpoint
    :param mem: memory of VM endpoint
    :param storage: storage capacity of VM endpoint
    :param current_state: the current state the endpoint is in (TAG)
    :param current_state_time: the part of the sampling period the endpoint was in the current state
    :param kwargs: 'python-style' keyword arguments; '<state>' supplies the '<state>_sum' field and
                   '<state>_mst' supplies the '<state>_mst' field, any missing key defaults to 0.0

    :return: a single-element list with the dict-formatted report to post on influx
    """

    # NOTE: Do we need the cpus, memory and storage fields ?
    fields = {"cpus": cpu, "memory": mem, "storage": storage, "current_state_time": current_state_time}

    # add the sum and mean-state-time field of every known state, falling back to 0.0 when not supplied
    for state in ("unplaced", "placing", "placed", "booting", "booted", "connecting", "connected"):
        mst_key = state + "_mst"
        # the summed value is looked up under the bare state name, not under '<state>_sum'
        fields[state + "_sum"] = kwargs.get(state, 0.0)
        fields[mst_key] = kwargs.get(mst_key, 0.0)

    report = {
        "measurement": "endpoint_config",
        "tags": {"current_state": current_state},
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]


def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ):
    """
    Builds a measurement for a media component configuration state.

    :param time: timestamp for the measurement (converted with _getNSTime)
    :param mcMeasurement: measurement label
    :param current_state: the current state of the service configuration (TAG)
    :param current_state_time: the current length of time in the current state
    :param config_state_values: dictionary of media component configuration states (summed time and mean
                                average over the sampling period); expected keys are stopped, starting,
                                running and stopping suffixed with '_sum' and '_mst', any missing key
                                defaults to 0.0
    :return: a single-element list with the dict-formatted report to post on influx
    """

    # the current state time always comes first
    fields = {"current_state_time": current_state_time}

    # then every expected state key, using 0.0 where the caller supplied no value
    for state in ("stopped", "starting", "running", "stopping"):
        for suffix in ("_sum", "_mst"):
            key = state + suffix
            fields[key] = config_state_values.get(key, 0.0)

    report = {
        "measurement": mcMeasurement,
        "tags": {"current_state": current_state},
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]

# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time):
    """
    Scales a timestamp by 10^9 (seconds to nanoseconds) and truncates it to an integer.

    :param time: the timestamp to convert
    :return: the integer part of the timestamp multiplied by 1000000000
    """
    nanos_per_second = 1000000000

    return int(nanos_per_second*time)


# DEPRECATED
# ____________________________________________________________________________

# DEPRECATED: old structure, not part of new spec

# Influx needs strings to be quoted, this provides a utility interface to do this
def quote_wrap(string):
    """
    Wraps a value in double quotes, as Influx needs strings to be quoted.

    Non-string values (e.g. numeric identifiers or sizes) are converted with str() first
    instead of raising a TypeError. Quotes embedded in the value are NOT escaped.

    :param string: the value to wrap
    :return: the string form of the value surrounded by double quotes
    """
    return "\"" + str(string) + "\""


# Formats VM config
def generate_vm_config(state, cpu, mem, storage, time):
    """
    Builds the "vm_res_alloc" measurement describing a VM configuration.

    :param state: value stored in the vm_state tag
    :param cpu: value stored in the cpu field
    :param mem: value stored in the memory field
    :param storage: value stored in the storage field
    :param time: the measurement timestamp (converted with _getNSTime)
    :return: a single-element list with the dict-formatted report to post on influx
    """

    fields = {"cpu": cpu, "memory": mem, "storage": storage}

    report = {
        "measurement": "vm_res_alloc",
        "tags": {"vm_state": state},
        "fields": fields,
        "time": _getNSTime(time),
    }

    return [report]


def _generateClientRequest(cReq, id, time):
    """
    Completes a partial client request statement into a full "request" line.

    :param cReq: the partial statement supplied by the client, appended after the sid tag
    :param id: identifier written (quoted) as the sid tag
    :param time: the timestamp (converted with _getNSTime)
    :return: the statement 'request,sid="<id>",<cReq> <timestamp>'
    """
    # tag set: the quoted sid followed by whatever the client supplied
    tag_set = 'sid="' + str(id) + '",' + cReq

    # no additional fields here yet, so the timestamp follows directly
    timestamp = str(_getNSTime(time))

    return 'request,' + tag_set + ' ' + timestamp


# Method to create a full InfluxDB response statement
# DEPRECATED: old structure, not part of new spec
def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference):
    """
    Builds a full "response" statement; the statement has no tags.

    :param reqID: identifier written (quoted) as the requestID field
    :param quality: value of the quality field
    :param time: the timestamp (converted with _getNSTime)
    :param cpuUsage: value of the cpuUsage field
    :param qualityDifference: value of the qualityDifference field
    :return: the statement 'response <fields> <timestamp>', where the index field is a freshly generated UUID
    """
    field_set = ','.join([
        'quality=' + str(quality),
        'cpuUsage=' + str(cpuUsage),
        'qualityDifference=' + str(qualityDifference),
        'requestID="' + str(reqID) + '"',
        # a random UUID gives every response line its own index value
        'index="' + str(uuid.uuid4()) + '"',
    ])

    timestamp = str(_getNSTime(time))

    return 'response ' + field_set + ' ' + timestamp


# Formats server config
def _generateServerConfig(ID, location, cpu, mem, storage, time):
    """
    Builds the "host_resource" statement for a server configuration, prints it and returns it.

    :param ID: value written (quoted) as the slice_id tag
    :param location: value written (quoted) as the location tag
    :param cpu: value of the cpu field
    :param mem: value written (quoted) as the memory field
    :param storage: value written (quoted) as the storage field
    :param time: the timestamp (converted with _getNSTime)
    :return: the generated statement
    """
    tag_set = ',slice_id=' + quote_wrap(ID) + ',location=' + quote_wrap(location)
    field_set = 'cpu=' + str(cpu) + ',memory=' + quote_wrap(mem) + ',storage=' + quote_wrap(storage)
    timestamp = str(_getNSTime(time))

    # metric name, tags, fields and time
    line = 'host_resource' + tag_set + ' ' + field_set + ' ' + timestamp

    print(line)
    return line


# Format port config
def _configure_port(port_id, state, rate, time):
    """
    Builds the "net_port_config" statement for a port configuration, prints it and returns it.

    The field set and the timestamp are separated by exactly one space (the previous
    implementation emitted two). The port id and rate may be given as non-string values;
    they are converted with str().

    :param port_id: port number/suffix, written (quoted) as the port_id field prefixed with 'enps'
    :param state: value written (quoted) as the port_state field
    :param rate: value written (quoted) as the tx_constraint field
    :param time: the timestamp (converted with _getNSTime)
    :return: the generated statement
    """
    # fields (this statement has no tags)
    field_set = ','.join([
        'port_id=' + quote_wrap('enps' + str(port_id)),
        'port_state=' + quote_wrap(str(state)),
        'tx_constraint=' + quote_wrap(str(rate)),
    ])

    # metric name, fields and time - each section separated by a single space
    result = 'net_port_config ' + field_set + ' ' + str(_getNSTime(time))

    print(result)
    return result


# Format service function config
def _configure_service_function(state, max_connected_clients):
    """
    Builds the "mpegdash_service_config" statement for a service function configuration.

    No timestamp is appended to this statement.

    :param state: value written (quoted) as the service_state tag
    :param max_connected_clients: value of the max_connected_clients field
    :return: the generated statement
    """
    tag_set = ',service_state=' + quote_wrap(state)
    field_set = 'max_connected_clients=' + str(max_connected_clients)

    return 'mpegdash_service_config' + tag_set + ' ' + field_set


# Reports memory usage, scaling on requests
def generate_mem_report(requests, total_mem, time):
    """
    Builds the "mem" statement with a randomly generated memory usage, prints it and returns it.

    :param requests: number of requests; the upper bound of the random used percentage is 5 per request, capped at 100
    :param total_mem: value of the total field
    :param time: the timestamp (converted with _getNSTime)
    :return: the generated statement
    """
    # used percentage is random between 0 and min(100, 5*requests); available is the remainder
    used_percent = randint(0, min(100, 5*requests))
    available_percent = 100-used_percent

    field_set = ','.join([
        'available_percent='+str(available_percent),
        'used_percent='+str(used_percent),
        'total='+str(total_mem),
    ])

    # measurement, fields and time
    line = 'mem' + ' ' + field_set + ' ' + str(_getNSTime(time))
    print(line)
    return line


# Formats compute node config
def generate_compute_node_config(slice_id, location, node_id, cpus, mem, storage, time):
    """
    Builds the "compute_node_config" statement, prints it and returns it.

    The common-context tag key is 'slice_id' (it was previously misspelled 'slide_id').

    :param slice_id: value written (quoted) as the slice_id tag
    :param location: value written (quoted) as the location tag
    :param node_id: value written (quoted) as the comp_node_id tag
    :param cpus: value of the cpus field
    :param mem: value of the memory field
    :param storage: value of the storage field
    :param time: the timestamp (converted with _getNSTime)
    :return: the generated statement
    """
    # CommonContext tag first, then the node specific tags
    tag_set = ',slice_id='+quote_wrap(slice_id)
    tag_set += ',location='+quote_wrap(location)
    tag_set += ',comp_node_id='+quote_wrap(node_id)

    field_set = ','.join([
        'cpus='+str(cpus),
        'memory='+str(mem),
        'storage='+str(storage),
    ])

    # measurement, tags, fields and time
    result = 'compute_node_config' + tag_set + ' ' + field_set + ' ' + str(_getNSTime(time))
    print(result)
    return result


# Formats network resource config
def generate_network_resource_config(slice_id, network_id, bandwidth, time):
    """
    Builds the "network_resource_config" statement, prints it and returns it.

    The slice_id and network_id tags are separated by a comma (the separator was
    previously missing, which fused both tags together).

    :param slice_id: value written (quoted) as the slice_id tag
    :param network_id: value written (quoted) as the network_id tag
    :param bandwidth: value of the bandwidth field
    :param time: the timestamp (converted with _getNSTime)
    :return: the generated statement
    """
    # Meta tag first, then the network tag - every tag is introduced by a comma
    tag_set = ',slice_id='+quote_wrap(slice_id)
    tag_set += ',network_id='+quote_wrap(network_id)

    field_set = 'bandwidth='+str(bandwidth)

    # measurement, tags, fields and time
    result = 'network_resource_config' + tag_set + ' ' + field_set + ' ' + str(_getNSTime(time))
    print(result)
    return result


# Formats network interface config
def generate_network_interface_config(slice_id, comp_node_id, port_id, rx_constraint, tx_constraint, time):
    """
    Builds the "network_interface_config" statement, prints it and returns it.

    The slice_id tag is written as 'slice_id=<value>' (the '=' was previously missing).

    :param slice_id: value written (quoted) as the slice_id tag
    :param comp_node_id: value written (quoted) as the comp_node_id tag
    :param port_id: value written (quoted) as the port_id tag
    :param rx_constraint: value of the rx_constraint field
    :param tx_constraint: value of the tx_constraint field
    :param time: the timestamp (converted with _getNSTime)
    :return: the generated statement
    """
    # Meta tag first, then the interface tags
    tag_set = ',slice_id='+quote_wrap(slice_id)
    tag_set += ',comp_node_id='+quote_wrap(comp_node_id)
    tag_set += ',port_id='+quote_wrap(port_id)

    field_set = 'rx_constraint='+str(rx_constraint)
    field_set += ',tx_constraint='+str(tx_constraint)

    # measurement, tags, fields and time
    result = 'network_interface_config' + tag_set + ' ' + field_set + ' ' + str(_getNSTime(time))
    print(result)
    return result


# Format SF instance config
def generate_sf_instance_surrogate_config(loc, sfc, sfc_i, sf_package, sf_i, cpus, mem, storage, time):
    """
    Builds the "sf_instance_surrogate_config" statement, prints it and returns it.

    Every tag is written as '<key>=<value>' (the '=' was previously missing for all tags).

    :param loc: value written (quoted) as the location tag
    :param sfc: value written (quoted) as the sfc tag
    :param sfc_i: value written (quoted) as the sfc_i tag
    :param sf_package: value written (quoted) as the sf_package tag
    :param sf_i: value written (quoted) as the sf_i tag
    :param cpus: value of the cpus field
    :param mem: value of the memory field
    :param storage: value of the storage field
    :param time: the timestamp (converted with _getNSTime)
    :return: the generated statement
    """
    # Meta tags, in the order they appear in the statement
    tags = (
        ('location', loc),
        ('sfc', sfc),
        ('sfc_i', sfc_i),
        ('sf_package', sf_package),
        ('sf_i', sf_i),
    )
    tag_set = ''.join(',' + key + '=' + quote_wrap(value) for key, value in tags)

    field_set = 'cpus='+str(cpus)
    field_set += ',memory='+str(mem)
    field_set += ',storage='+str(storage)

    # measurement, tags, fields and time
    result = 'sf_instance_surrogate_config' + tag_set + ' ' + field_set + ' ' + str(_getNSTime(time))
    print(result)
    return result


# Formats context container as part of other line protocol generators
def service_function_measurement(measurement, service_function_context):
    """
    Appends the service function context tags to a measurement name, for use by other line protocol generators.

    Every tag is written as '<key>=<value>' (the '=' was previously missing for all tags).

    :param measurement: the measurement name the tags are appended to
    :param service_function_context: context container exposing the attributes sfc, sfc_i, sf_package and sf_i
    :return: the measurement name followed by the sfc, sfc_i, sf_package and sf_i tags
    """
    tags = (
        ('sfc', service_function_context.sfc),
        ('sfc_i', service_function_context.sfc_i),
        ('sf_package', service_function_context.sf_package),
        ('sf_i', service_function_context.sf_i),
    )

    # every tag is introduced by a comma and written as key=quoted value
    return measurement + ''.join(',' + key + '=' + quote_wrap(value) for key, value in tags)