# LineProtocolGenerator.py
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Michael Boniface
## Created Date : 02-02-2018
## Created for Project : FLAME
"""
# line protocol
# Method to create a full InfluxDB request statement (based on partial statement from client)
import uuid
from random import randint
def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time):
    """
    Generates a combined averaged measurement about the e2e delay and its contributing parts.

    :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
    :param source_sfr: source service router
    :param target_sfr: target service router
    :param endpoint: endpoint of the media component
    :param sf_instance: service function instance (media component)
    :param delay_forward: Path delay (Forward direction)
    :param delay_reverse: Path delay (Reverse direction)
    :param delay_service: the media service component response time
    :param avg_request_size: averaged request size
    :param avg_response_size: averaged response size
    :param avg_bandwidth: averaged bandwidth
    :param time: measurement timestamp
    :return: a list of dict-formatted reports to post on influx
    """
    tags = {
        "path_ID": path_id,
        "source_SFR": source_sfr,
        "target_SFR": target_sfr,
        "endpoint": endpoint,
        "sf_instance": sf_instance,
    }
    # all contributing measurements are stored as floats
    fields = {
        "delay_forward": float(delay_forward),
        "delay_reverse": float(delay_reverse),
        "delay_service": float(delay_service),
        "avg_request_size": float(avg_request_size),
        "avg_response_size": float(avg_response_size),
        "avg_bandwidth": float(avg_bandwidth),
    }
    return [{"measurement": "e2e_delays",
             "tags": tags,
             "fields": fields,
             "time": _getNSTime(time)}]
def generate_network_delay_report(path_id, source_sfr, target_sfr, latency, bandwidth, time):
    """
    Generates a platform measurement about the network delay between two specific service routers.

    :param path_id: the identifier of the path between the two service routers
    :param source_sfr: the source service router
    :param target_sfr: the target service router
    :param latency: the e2e network delay for traversing the path between the two service routers
    :param bandwidth: the bandwidth of the path (minimum of bandwidths of the links it is composed of)
    :param time: the measurement timestamp
    :return: a list of dict-formatted reports to post on influx
    """
    tags = {"path": path_id, "source": source_sfr, "target": target_sfr}
    fields = {"latency": latency, "bandwidth": bandwidth}
    return [{"measurement": "network_delays",
             "tags": tags,
             "fields": fields,
             "time": _getNSTime(time)}]
def generate_service_delay_report(endpoint, sf_instance, sfr, response_time, request_size, response_size, time):
    """
    Generates a service measurement about the media service response time.

    :param endpoint: endpoint of the media component
    :param sf_instance: service function instance
    :param sfr: the service function router that connects the endpoint of the SF instance to the FLAME network
    :param response_time: the media service response time (this is not the response time for the whole round-trip, but only for the processing part of the media service component)
    :param request_size: the size of the request received by the service in Bytes
    :param response_size: the size of the response received by the service in Bytes
    :param time: the measurement timestamp
    :return: a list of dict-formatted reports to post on influx
    """
    tags = {"endpoint": endpoint, "sf_instance": sf_instance, "sfr": sfr}
    fields = {
        "response_time": response_time,
        "request_size": request_size,
        "response_size": response_size,
    }
    return [{"measurement": "service_delays",
             "tags": tags,
             "fields": fields,
             "time": _getNSTime(time)}]
# Reports TX and RX, scaling on requested quality
def generate_network_report(recieved_bytes, sent_bytes, time):
    """
    Generates a net_port_io measurement of bytes received/sent on the port.

    NOTE(review): the misspelled parameter name 'recieved_bytes' is kept
    unchanged so existing keyword-argument callers keep working.

    :param recieved_bytes: bytes received on the port (RX)
    :param sent_bytes: bytes sent on the port (TX)
    :param time: measurement timestamp
    :return: a list with one dict-formatted report to post on influx
    """
    io_fields = {
        "RX_BYTES_PORT_M": recieved_bytes,
        "TX_BYTES_PORT_M": sent_bytes,
    }
    return [{"measurement": "net_port_io",
             "tags": {"port_id": "enps03"},
             "fields": io_fields,
             "time": _getNSTime(time)}]
# Reports cpu usage, scaling on requests
def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time):
    """
    Generates a cpu_usage measurement report.

    :param cpu_usage: overall CPU usage value
    :param cpu_active_time: time the CPU spent active in the sampling period
    :param cpu_idle_time: time the CPU spent idle in the sampling period
    :param time: measurement timestamp
    :return: a list with one dict-formatted report to post on influx
    """
    usage_fields = {
        "cpu_usage": cpu_usage,
        "cpu_active_time": cpu_active_time,
        "cpu_idle_time": cpu_idle_time,
    }
    return [{"measurement": "cpu_usage",
             "fields": usage_fields,
             "time": _getNSTime(time)}]
# Reports response times, scaling on number of requests
def generate_mpegdash_report(resource, requests, avg_response_time, peak_response_time, time):
    """
    Generates an mpegdash_service measurement report.

    :param resource: the content navigation resource (stored in tag 'cont_nav')
    :param requests: number of requests in the sampling period
    :param avg_response_time: average response time over the period
    :param peak_response_time: peak response time over the period
    :param time: measurement timestamp
    :return: a list with one dict-formatted report to post on influx
    """
    report_fields = {
        "requests": requests,
        "avg_response_time": avg_response_time,
        "peak_response_time": peak_response_time,
    }
    return [{"measurement": "mpegdash_service",
             "tags": {"cont_nav": resource},
             "fields": report_fields,
             "time": _getNSTime(time)}]
# ipendpoint_route,ipendpoint_id,cont_nav=FQDN HTTP_REQUESTS_FQDN_M, NETWORK_FQDN_LATENCY timestamp
def generate_ipendpoint_route(resource, requests, latency, time):
    """
    Generates an ipendpoint_route measurement report.

    :param resource: the content navigation resource (stringified into tag 'cont_nav')
    :param requests: HTTP requests per FQDN in the sampling period
    :param latency: network FQDN latency
    :param time: measurement timestamp
    :return: a list with one dict-formatted report to post on influx
    """
    route_fields = {
        "http_requests_fqdn_m": requests,
        "network_fqdn_latency": latency,
    }
    return [{"measurement": "ipendpoint_route",
             "tags": {"cont_nav": str(resource)},
             "fields": route_fields,
             "time": _getNSTime(time)}]
def generate_endpoint_config(time, cpu, mem, storage, current_state, current_state_time, **kwargs):
    """
    Generates a measurement for a VM configuration states.

    :param time: time of measurement
    :param cpu: the number of CPUs of VM endpoint
    :param mem: memory of VM endpoint
    :param storage: storage capacity of VM endpoint
    :param current_state: the current state the endpoint is in (TAG)
    :param current_state_time: the part of the sampling period the endpoint was in the current state
    :param kwargs: 'python-style' keyword arguments used to store the state as a key and it's respective state period (in seconds) as value
    :return: dictionary object representing the data to post on influx
    """
    fields = {"cpus": cpu, "memory": mem, "storage": storage, "current_state_time": current_state_time}  # NOTE: Do we need the cpus, memory and storage fields ?
    # For each known lifecycle state record the summed time ('<state>_sum') and
    # mean state time ('<state>_mst'), defaulting to 0.0 when the caller did not
    # supply them (dict.get replaces the original lambda + double lookup).
    for state in ("unplaced", "placing", "placed", "booting", "booted", "connecting", "connected"):
        fields["{0}_sum".format(state)] = kwargs.get(state, 0.0)
        fields["{0}_mst".format(state)] = kwargs.get("{0}_mst".format(state), 0.0)
    result = [{"measurement": "endpoint_config",
               "tags": {
                   "current_state": current_state
               },
               "fields": fields,
               "time": _getNSTime(time)}]
    return result
def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ):
    """
    Generates a measurement line for a media component configuration state.

    :param time: timestamp for the measurement
    :param mcMeasurement: measurement label
    :param current_state: the current state of the service configuration
    :param current_state_time: the current length of time in the current state
    :param config_state_values: dictionary of media component configuration states (summed time and mean average over the sampling period)
        - stopped, starting, running, stopping [use '_sum' and '_mst' for sum and average respectively]
    :return: a list with one dict-formatted report to post on influx
    """
    # expected state keys; any the caller omits default to 0.0
    expected_keys = ("stopped_sum", "stopped_mst",
                     "starting_sum", "starting_mst",
                     "running_sum", "running_mst",
                     "stopping_sum", "stopping_mst")
    fields = {"current_state_time": current_state_time}
    for state_key in expected_keys:
        fields[state_key] = config_state_values.get(state_key, 0.0)
    return [{"measurement": mcMeasurement,
             "tags": {"current_state": current_state},
             "fields": fields,
             "time": _getNSTime(time)}]
# InfluxDB likes to have time-stamps in nanoseconds
def _getNSTime(time):
# Convert to nano-seconds
timestamp = int(1000000000*time)
return timestamp
# DEPRECATED
# ____________________________________________________________________________
# DEPRECATED: old structure, not part of new spec
# Influx needs strings to be quoted, this provides a utility interface to do this
def quote_wrap(string):
    """Return the given string wrapped in double quotes."""
    return ''.join(('"', string, '"'))
# Formats VM config
def generate_vm_config(state, cpu, mem, storage, time):
    """
    DEPRECATED: old structure, not part of new spec.
    Generates a vm_res_alloc measurement report.

    :param state: the VM state (tag 'vm_state')
    :param cpu: allocated CPU
    :param mem: allocated memory
    :param storage: allocated storage
    :param time: measurement timestamp
    :return: a list with one dict-formatted report to post on influx
    """
    alloc_fields = {"cpu": cpu, "memory": mem, "storage": storage}
    return [{"measurement": "vm_res_alloc",
             "tags": {"vm_state": state},
             "fields": alloc_fields,
             "time": _getNSTime(time)}]
def _generateClientRequest(cReq, id, time):
    """
    DEPRECATED: old structure, not part of new spec.
    Builds a 'request' line-protocol statement from a partial client statement.

    :param cReq: partial line-protocol tag string supplied by the client
    :param id: session identifier (stored as tag 'sid')
    :param time: measurement timestamp
    :return: the full line-protocol string
    """
    # tags first; no additional fields yet, then timestamp and measurement name
    tag_section = 'sid="' + str(id) + '",' + cReq
    return 'request,' + tag_section + ' ' + str(_getNSTime(time))
# Method to create a full InfluxDB response statement
# DEPRECATED: old structure, not part of new spec
def _generateServerResponse(reqID, quality, time, cpuUsage, qualityDifference):
    """
    Builds a 'response' line-protocol statement.

    :param reqID: identifier of the request being answered
    :param quality: delivered quality
    :param time: measurement timestamp
    :param cpuUsage: CPU usage while serving the request
    :param qualityDifference: difference between requested and delivered quality
    :return: the full line-protocol string (with a random uuid4 'index' field)
    """
    field_parts = (
        'quality=' + str(quality),
        'cpuUsage=' + str(cpuUsage),
        'qualityDifference=' + str(qualityDifference),
        'requestID="' + str(reqID) + '"',
        'index="' + str(uuid.uuid4()) + '"',
    )
    return 'response ' + ','.join(field_parts) + ' ' + str(_getNSTime(time))
# Formats server config
def _generateServerConfig(ID, location, cpu, mem, storage, time):
    """
    DEPRECATED: old structure, not part of new spec.
    Builds a 'host_resource' line-protocol statement.

    :param ID: slice identifier (tag 'slice_id')
    :param location: host location (tag)
    :param cpu: CPU count field
    :param mem: memory field (quoted)
    :param storage: storage field (quoted)
    :param time: measurement timestamp
    :return: the formatted line-protocol string (also printed for debugging)
    """
    tag_section = ',slice_id=' + quote_wrap(ID) + ',location=' + quote_wrap(location)
    field_section = 'cpu=' + str(cpu) + ',memory=' + quote_wrap(mem) + ',storage=' + quote_wrap(storage)
    result = 'host_resource' + tag_section + ' ' + field_section + ' ' + str(_getNSTime(time))
    print(result)
    return result
# Format port config
def _configure_port(port_id, state, rate, time):
    """
    DEPRECATED: old structure, not part of new spec.
    Builds a 'net_port_config' line-protocol statement.

    :param port_id: port suffix; prefixed with 'enps' into field 'port_id'
    :param state: the port state (quoted field)
    :param rate: the tx constraint (quoted field)
    :param time: measurement timestamp
    :return: the formatted line-protocol string (also printed for debugging)
    """
    # metric
    result = 'net_port_config '
    # Fields
    result += 'port_id=' + quote_wrap('enps' + port_id)
    result += ',port_state=' + quote_wrap(state)
    result += ',tx_constraint=' + quote_wrap(rate)
    # Time — exactly one space separates the field set from the timestamp
    # (the original appended two spaces, which is invalid line protocol)
    result += ' ' + str(_getNSTime(time))
    print(result)
    return result
# Format service function config
def _configure_service_function(state, max_connected_clients):
    """
    DEPRECATED: old structure, not part of new spec.
    Builds an 'mpegdash_service_config' line-protocol statement.

    :param state: the service state (tag 'service_state', quoted)
    :param max_connected_clients: field value for the client limit
    :return: the formatted line-protocol string (no timestamp appended)
    """
    tag_section = ',service_state=' + quote_wrap(state)
    field_section = 'max_connected_clients=' + str(max_connected_clients)
    return 'mpegdash_service_config' + tag_section + ' ' + field_section
# Reports memory usage, scaling on requests
def generate_mem_report(requests, total_mem, time):
    """
    Builds a 'mem' line-protocol statement with a randomised used/available split.

    :param requests: request count; used percent is random in [0, min(100, 5*requests)]
    :param total_mem: total memory field value
    :param time: measurement timestamp
    :return: the formatted line-protocol string (also printed for debugging)
    """
    used = randint(0, min(100, 5 * requests))
    field_parts = (
        'available_percent=' + str(100 - used),
        'used_percent=' + str(used),
        'total=' + str(total_mem),
    )
    result = 'mem ' + ','.join(field_parts) + ' ' + str(_getNSTime(time))
    print(result)
    return result
# Formats compute node config
def generate_compute_node_config(slice_id, location, node_id, cpus, mem, storage, time):
    """
    DEPRECATED: old structure, not part of new spec.
    Builds a 'compute_node_config' line-protocol statement.

    :param slice_id: slice identifier (CommonContext tag 'slice_id')
    :param location: node location tag
    :param node_id: compute node identifier tag
    :param cpus: CPU count field
    :param mem: memory field
    :param storage: storage field
    :param time: measurement timestamp
    :return: the formatted line-protocol string (also printed for debugging)
    """
    # Measurement
    result = 'compute_node_config'
    # CommonContext Tag (the original emitted a misspelled 'slide_id' key)
    result += ',slice_id='+quote_wrap(slice_id)
    # Tag
    result += ',location='+quote_wrap(location)
    result += ',comp_node_id='+quote_wrap(node_id)
    result += ' '
    # field
    result += 'cpus='+str(cpus)
    result += ',memory='+str(mem)
    result += ',storage='+str(storage)
    result += ' '
    # Time
    result += str(_getNSTime(time))
    print(result)
    return result
# Formats network resource config
def generate_network_resource_config(slice_id, network_id, bandwidth, time):
    """
    DEPRECATED: old structure, not part of new spec.
    Builds a 'network_resource_config' line-protocol statement.

    :param slice_id: slice identifier (meta tag)
    :param network_id: network identifier tag
    :param bandwidth: bandwidth field value
    :param time: measurement timestamp
    :return: the formatted line-protocol string (also printed for debugging)
    """
    # Measurement
    result = 'network_resource_config'
    # Meta Tag
    result += ',slice_id='+quote_wrap(slice_id)
    # Tag (the original omitted the leading comma, fusing this tag onto the
    # previous value and producing an invalid tag set)
    result += ',network_id='+quote_wrap(network_id)
    result += ' '
    # field
    result += 'bandwidth='+str(bandwidth)
    result += ' '
    # Time
    result += str(_getNSTime(time))
    print(result)
    return result
# Formats network interface config
def generate_network_interface_config(slice_id, comp_node_id, port_id, rx_constraint, tx_constraint, time):
    """
    DEPRECATED: old structure, not part of new spec.
    Builds a 'network_interface_config' line-protocol statement.

    :param slice_id: slice identifier (meta tag)
    :param comp_node_id: compute node identifier tag
    :param port_id: port identifier tag
    :param rx_constraint: RX constraint field
    :param tx_constraint: TX constraint field
    :param time: measurement timestamp
    :return: the formatted line-protocol string (also printed for debugging)
    """
    # Measurement
    result = 'network_interface_config'
    # Meta Tag (the original omitted the '=' after 'slice_id', producing an
    # invalid key/value pair)
    result += ',slice_id='+quote_wrap(slice_id)
    # Tags
    result += ',comp_node_id='+quote_wrap(comp_node_id)
    result += ',port_id='+quote_wrap(port_id)
    result += ' '
    # field
    result += 'rx_constraint='+str(rx_constraint)
    result += ',tx_constraint='+str(tx_constraint)
    result += ' '
    # Time
    result += str(_getNSTime(time))
    print(result)
    return result
# Format SF instance config
def generate_sf_instance_surrogate_config(loc, sfc, sfc_i, sf_package, sf_i, cpus, mem, storage, time):
    """
    DEPRECATED: old structure, not part of new spec.
    Builds a 'sf_instance_surrogate_config' line-protocol statement.

    :param loc: instance location (meta tag)
    :param sfc: service function chain identifier (meta tag)
    :param sfc_i: service function chain instance identifier (meta tag)
    :param sf_package: service function package (meta tag)
    :param sf_i: service function instance identifier (meta tag)
    :param cpus: CPU count field
    :param mem: memory field
    :param storage: storage field
    :param time: measurement timestamp
    :return: the formatted line-protocol string (also printed for debugging)
    """
    # Measurement
    result = 'sf_instance_surrogate_config'
    # Meta Tags (the original omitted the '=' after every tag key, producing
    # invalid key/value pairs)
    result += ',location='+quote_wrap(loc)
    result += ',sfc='+quote_wrap(sfc)
    result += ',sfc_i='+quote_wrap(sfc_i)
    result += ',sf_package='+quote_wrap(sf_package)
    result += ',sf_i='+quote_wrap(sf_i)
    result += ' '
    # field
    result += 'cpus='+str(cpus)
    result += ',memory='+str(mem)
    result += ',storage='+str(storage)
    result += ' '
    # Time
    result += str(_getNSTime(time))
    print(result)
    return result
# Formats context container as part of other line protocol generators
def service_function_measurement(measurement, service_function_context):
    """
    Appends the service-function context tag set to a measurement name.

    :param measurement: the measurement name to tag
    :param service_function_context: object exposing sfc, sfc_i, sf_package and sf_i attributes
    :return: '<measurement>,sfc=...,sfc_i=...,sf_package=...,sf_i=...'
    """
    # each tag must be a 'key=value' pair; the original omitted the '=' signs,
    # producing invalid line protocol
    result = measurement
    result += ',sfc='+quote_wrap(service_function_context.sfc)
    result += ',sfc_i='+quote_wrap(service_function_context.sfc_i)
    result += ',sf_package='+quote_wrap(service_function_context.sf_package)
    result += ',sf_i='+quote_wrap(service_function_context.sf_i)
    return result