Skip to content
Snippets Groups Projects
Commit c68a16cd authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Merge branch 'aggregation' into docs

parents 90c88ca7 06d716eb
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
from threading import Thread, Event
import clmctest.monitoring.LineProtocolGenerator as lp
class Aggregator(Thread):
    """
    A class used to perform the aggregation feature of the CLMC - aggregation of network and media service measurements.
    Currently implemented as a thread, so that the implementation can be tested using pytest.
    """

    PATH_SEPARATOR = "---"  # currently, the path id is assumed to be in the format "SourceEndpointID---TargetEndpointID"
    REPORT_PERIOD = 5  # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'E2EMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://203.0.113.100:8086'  # default database URL the aggregator uses

    def __init__(self, database=DATABASE, database_url=DATABASE_URL):
        """
        Constructs an Aggregator instance.

        :param database: database name to use
        :param database_url: database url to use
        """

        super(Aggregator, self).__init__()  # call the constructor of the thread

        # initialise a database client using the database url and the database name
        url_object = urlparse(database_url)
        self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database, timeout=10)

        self.db_url = database_url
        self.db_name = database

        # a stop flag event object used to handle the killing of the thread
        self._stop_flag = Event()

    def stop(self):
        """
        A method used to stop the thread.
        """

        self._stop_flag.set()

    def run(self):
        """
        Performs the functionality of the aggregator - query data from both measurements, merge that data and post it back to influx every 5 seconds.
        """

        current_time = int(time())
        while True:
            if self._stop_flag.is_set():
                break

            boundary_time = current_time - Aggregator.REPORT_PERIOD

            # influx timestamps are in nanoseconds, hence the conversion from seconds
            boundary_time_nano = boundary_time * 1000000000
            current_time_nano = current_time * 1000000000

            # query the network delays and group them by path ID
            network_delays = {}
            result = self.db_client.query(
                'SELECT mean(delay) as "Dnet" FROM "E2EMetrics"."autogen".network_delays WHERE time >= {0} and time < {1} GROUP BY path'.format(
                    boundary_time_nano, current_time_nano))
            for item in result.items():
                metadata, result_points = item
                # measurement = metadata[0]
                tags = metadata[1]
                network_delays[tags['path']] = next(result_points)['Dnet']

            # query the service delays and group them by FQDN, service function instance and endpoint
            service_delays = {}
            result = self.db_client.query(
                'SELECT mean(response_time) as "Dresponse" FROM "E2EMetrics"."autogen".service_delays WHERE time >= {0} and time < {1} GROUP BY FQDN, sf_instance, endpoint'.format(
                    boundary_time_nano, current_time_nano))
            for item in result.items():
                metadata, result_points = item
                # measurement = metadata[0]
                tags = metadata[1]
                service_delays[tags['endpoint']] = (next(result_points)['Dresponse'], tags['FQDN'], tags['sf_instance'])

            # for each path identifier check if there is a media service delay report for the target endpoint - if so, generate an e2e_delay measurement
            for path in network_delays:
                # check if target endpoint is reported in service delays, that is there is a media service instance running on target endpoint
                target_endpoint = self.get_target_endpoint(path)
                if target_endpoint not in service_delays:
                    # if not continue with the other path IDs
                    continue

                e2e_arguments = {"path_id_f": None, "path_id_r": None, "fqdn": None, "sf_instance": None, "delay_path_f": None, "delay_path_r": None,
                                 "delay_service": None, "time": boundary_time}

                e2e_arguments['path_id_f'] = path
                e2e_arguments['delay_path_f'] = network_delays[path]

                # reverse the path ID to get the network delay for the reversed path
                reversed_path = self.reverse_path_id(path)
                assert reversed_path in network_delays  # an assertion is made here, since reversed path should always be reported as well
                e2e_arguments['path_id_r'] = reversed_path
                e2e_arguments['delay_path_r'] = network_delays[reversed_path]

                service_delay = service_delays[target_endpoint]
                response_time, fqdn, sf_instance = service_delay
                e2e_arguments['delay_service'] = response_time
                e2e_arguments['fqdn'] = fqdn
                # NOTE: fixed a misspelled key ('sf_isntnace') which would have left 'sf_instance' unset
                e2e_arguments['sf_instance'] = sf_instance

                # only report if all contributing parts were found for this reporting period
                # NOTE: fixed the completeness check - the original tested membership in .items() (tuples), which can never contain None
                if None not in e2e_arguments.values():
                    self.db_client.write_points(
                        lp.generate_e2e_delay_report(e2e_arguments['path_id_f'], e2e_arguments['path_id_r'], e2e_arguments['fqdn'], e2e_arguments['sf_instance'],
                                                     e2e_arguments['delay_path_f'], e2e_arguments['delay_path_r'], e2e_arguments['delay_service'], e2e_arguments['time']))

            old_timestamp = current_time
            # wait until one report period has elapsed
            # NOTE: '<' instead of '!=' so that a clock jump past the target time cannot cause an infinite loop;
            # the period is taken from REPORT_PERIOD instead of a hard-coded 5
            while current_time < old_timestamp + Aggregator.REPORT_PERIOD:
                sleep(1)
                current_time = int(time())

    @staticmethod
    def reverse_path_id(path):
        """
        Reverses a path identifier.

        :param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID"
        :return: the reversed path ID, e.g. "TargetEndpointID---SourceEndpointID"
        """

        source_endpoint, target_endpoint = path.split(Aggregator.PATH_SEPARATOR)
        return "{0}{1}{2}".format(target_endpoint, Aggregator.PATH_SEPARATOR, source_endpoint)

    @staticmethod
    def get_target_endpoint(path):
        """
        Get a target endpoint by parsing a path identifier.

        :param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID"
        :return: the target endpoint retrieved from the path identifier.
        """

        return path.split(Aggregator.PATH_SEPARATOR)[1]
if __name__ == '__main__':
    # standalone entry point: run the aggregator thread against the default database
    aggregator = Aggregator()
    aggregator.start()
...@@ -37,8 +37,8 @@ class Simulator(object): ...@@ -37,8 +37,8 @@ class Simulator(object):
Simulator used to generate E2E measurements. Simulator used to generate E2E measurements.
""" """
DATABASE = 'E2EMetrics' DATABASE = 'E2EMetrics' # default database name
DATABASE_URL = 'http://203.0.113.100:8086' DATABASE_URL = 'http://203.0.113.100:8086' # default database url
TICK = 1 # a simulation tick represents 1s TICK = 1 # a simulation tick represents 1s
SIMULATION_LENGTH = 120 # simulation time in seconds SIMULATION_LENGTH = 120 # simulation time in seconds
...@@ -66,37 +66,29 @@ class Simulator(object): ...@@ -66,37 +66,29 @@ class Simulator(object):
self.db_client.drop_database(self.db_name) self.db_client.drop_database(self.db_name)
self.db_client.create_database(self.db_name) self.db_client.create_database(self.db_name)
self._set_continuous_query()
def _set_continuous_query(self, sample_period=5, period='s'):
sample_period = "{0}{1}".format(sample_period, period)
query = 'CREATE CONTINUOUS QUERY aggregate_query ON {0} BEGIN SELECT mean("delay") as "Dnet", mean("response_time") as "Dresponse" INTO "E2EMetrics"."autogen"."e2e_delays" FROM "E2EMetrics"."autogen"."network_delays", "E2EMetrics"."autogen"."service_delays" GROUP BY time({1}) END'\
.format(self.db_name, sample_period)
self.db_client.query(query)
def run(self): def run(self):
""" """
Runs the simulation. Runs the simulation.
""" """
# all network delays start from 1 # all network delays start from 1ms, the dictionary stores the information to report
ip_endpoints = [ ip_endpoints = [
{'agent_id': 'endpoint1.ms-A.ict-flame.eu', {'agent_id': 'endpoint1.ms-A.ict-flame.eu',
'paths': [{ 'paths': [{
'target': 'endpoint2.ms-A.ict-flame.eu', 'target': 'endpoint2.ms-A.ict-flame.eu',
'path_id': 'endpoint1-endpoint2', 'path_id': 'endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu',
'network_delay': 1 'network_delay': 1
}]}, }]},
{'agent_id': 'endpoint2.ms-A.ict-flame.eu', {'agent_id': 'endpoint2.ms-A.ict-flame.eu',
'paths': [{ 'paths': [{
'target': 'endpoint1.ms-A.ict-flame.eu', 'target': 'endpoint1.ms-A.ict-flame.eu',
'path_id': 'endpoint2-endpoint1', 'path_id': 'endpoint2.ms-A.ict-flame.eu---endpoint1.ms-A.ict-flame.eu',
'network_delay': 1 'network_delay': 1
}]} }]}
] ]
# current time in nanoseconds (to test the continuous query we write influx data points related to future time), so we start from the current time # current time in seconds (to test the aggregation we write influx data points related to future time), so we start from the current time
start_time = int(time.time()) start_time = int(time.time())
sim_time = start_time sim_time = start_time
...@@ -128,7 +120,8 @@ class Simulator(object): ...@@ -128,7 +120,8 @@ class Simulator(object):
# measure service response time every 5 seconds # measure service response time every 5 seconds
if i % sample_period_media == 0: if i % sample_period_media == 0:
self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, sim_time)) self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, "ms-A.ict-flame.eu",
"test-sf-clmc-agent-build_INSTANCE", "endpoint2.ms-A.ict-flame.eu", sim_time))
# increase/decrease the delay in every sample report (min delay is 10) # increase/decrease the delay in every sample report (min delay is 10)
mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)])) mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)]))
...@@ -136,6 +129,9 @@ class Simulator(object): ...@@ -136,6 +129,9 @@ class Simulator(object):
# increase the time by one simulation tick # increase the time by one simulation tick
sim_time += self.TICK sim_time += self.TICK
end_time = sim_time
print("Simulation finished. Start time: {0}, End time: {1}".format(start_time, end_time))
if __name__ == "__main__": if __name__ == "__main__":
Simulator().run() Simulator().run()
...@@ -29,6 +29,39 @@ import uuid ...@@ -29,6 +29,39 @@ import uuid
from random import randint from random import randint
def generate_e2e_delay_report(path_id_f, path_id_r, fqdn, sf_instance, delay_path_f, delay_path_r, delay_service, time):
"""
Generates a combined averaged measurement about the e2e delay and its contributing parts
:param path_id_f: The forward path identifier, e.g. endpoint1---endpoint2
:param path_id_r: The reverse path identifier, e.g. endpoint2---endpoint1
:param fqdn: FQDN of the media service
:param sf_instance: service function instance
:param delay_path_f: Path delay (Forward direction)
:param delay_path_r: Path delay (Reverse direction)
:param delay_service: the media service response time
:param time: measurement time
:return: a list of dict-formatted reports to post on influx
"""
result = [{"measurement": "e2e_delays",
"tags": {
"pathID_F": path_id_f,
"pathID_R": path_id_r,
"FQDN": fqdn,
"sf_instance": sf_instance
},
"fields": {
"D_path_F": float(delay_path_f),
"D_path_R": float(delay_path_r),
"D_service": float(delay_service)
},
"time": _getNSTime(time)
}]
return result
def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time): def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time):
""" """
Generates a platform measurement about the network delay between two specific endpoints. Generates a platform measurement about the network delay between two specific endpoints.
...@@ -56,16 +89,24 @@ def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e ...@@ -56,16 +89,24 @@ def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e
return result return result
def generate_service_delay_report(response_time, time): def generate_service_delay_report(response_time, fqdn, sf_instance, endpoint, time):
""" """
Generates a service measurement about the media service response time. Generates a service measurement about the media service response time.
:param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service) :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service)
:param fqdn: FQDN of the media service
:param sf_instance: service function instance
:param endpoint: endpoint ID
:param time: the measurement time :param time: the measurement time
:return: a list of dict-formatted reports to post on influx :return: a list of dict-formatted reports to post on influx
""" """
result = [{"measurement": "service_delays", result = [{"measurement": "service_delays",
"tags": {
"FQDN": fqdn,
"sf_instance": sf_instance,
"endpoint": endpoint
},
"fields": { "fields": {
"response_time": response_time, "response_time": response_time,
}, },
...@@ -171,6 +212,7 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta ...@@ -171,6 +212,7 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta
return result return result
def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ): def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ):
""" """
generates a measurement line for a media component configuration state generates a measurement line for a media component configuration state
......
...@@ -28,7 +28,7 @@ import pkg_resources ...@@ -28,7 +28,7 @@ import pkg_resources
from influxdb import InfluxDBClient from influxdb import InfluxDBClient
from clmctest.monitoring.StreamingSim import Sim from clmctest.monitoring.StreamingSim import Sim
from clmctest.monitoring.E2ESim import Simulator from clmctest.monitoring.E2ESim import Simulator
from clmctest.monitoring.E2EAggregator import Aggregator
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
...@@ -61,6 +61,12 @@ def influx_db(streaming_sim_config, request): ...@@ -61,6 +61,12 @@ def influx_db(streaming_sim_config, request):
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def simulator(streaming_sim_config): def simulator(streaming_sim_config):
"""
A fixture to obtain a simulator instance with the configuration parameters.
:param streaming_sim_config: the configuration object
:return: an instance of the simulator
"""
influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"
influx_db_name = streaming_sim_config['hosts'][1]['database_name'] influx_db_name = streaming_sim_config['hosts'][1]['database_name']
...@@ -76,7 +82,27 @@ def simulator(streaming_sim_config): ...@@ -76,7 +82,27 @@ def simulator(streaming_sim_config):
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def e2e_simulator(streaming_sim_config): def e2e_simulator(streaming_sim_config):
"""
A fixture to obtain a simulator instance with the configuration parameters.
:param streaming_sim_config: the configuration object
:return: an instance of the E2E simulator
"""
influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086" influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"
return Simulator(database_url=influx_url) return Simulator(database_url=influx_url)
@pytest.fixture(scope="module")
def e2e_aggregator(streaming_sim_config):
"""
A fixture to obtain an instance of the Aggregator class with the configuration parameters.
:param streaming_sim_config: the configuration object
:return: an instance of the Aggregator class
"""
influx_url = "http://" + streaming_sim_config['hosts'][0]['ip_address'] + ":8086"
return Aggregator(database_url=influx_url)
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
""" """
import pytest import pytest
import random
import time import time
...@@ -32,7 +33,19 @@ class TestE2ESimulation(object): ...@@ -32,7 +33,19 @@ class TestE2ESimulation(object):
""" """
@pytest.fixture(scope='class', autouse=True) @pytest.fixture(scope='class', autouse=True)
def run_simulator(self, e2e_simulator): def run_simulator(self, e2e_simulator, e2e_aggregator):
"""
A fixture, which runs the simulation before running the tests.
:param e2e_simulator: the simulator for the end-to-end data
:param e2e_aggregator: the aggregator which merges the network and service measurements
"""
random.seed(0) # Seed random function so we can reliably test for average queries
print("Starting aggregator...")
e2e_aggregator.start()
print("Running simulation, please wait...") print("Running simulation, please wait...")
e2e_simulator.run() e2e_simulator.run()
...@@ -40,13 +53,20 @@ class TestE2ESimulation(object): ...@@ -40,13 +53,20 @@ class TestE2ESimulation(object):
time.sleep(e2e_simulator.SIMULATION_LENGTH) # wait for data to finish arriving at the INFLUX database time.sleep(e2e_simulator.SIMULATION_LENGTH) # wait for data to finish arriving at the INFLUX database
print("... simulation data fixture finished") print("... simulation data fixture finished")
print("... stopping aggregator")
e2e_aggregator.stop()
@pytest.mark.parametrize("query, expected_result", [ @pytest.mark.parametrize("query, expected_result", [
('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"', ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"',
{"time": "1970-01-01T00:00:00Z", "count_delay": 120}), {"time": "1970-01-01T00:00:00Z", "count_delay": 120}),
('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"', ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"',
{"time": "1970-01-01T00:00:00Z", "count_response_time": 24}), {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}),
('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"', ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
{"time": "1970-01-01T00:00:00Z", "count_Dnet": 24, "count_Dresponse": 24}), {"time": "1970-01-01T00:00:00Z", "count_D_path_F": 24, "count_D_path_R": 24, "count_D_service": 24}),
('SELECT mean(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
{"time": "1970-01-01T00:00:00Z", "mean_D_path_F": 13.159722222222223, "mean_D_path_R": 3.256944444444444, "mean_D_service": 32.791666666666664}),
]) ])
def test_simulation(self, influx_db, query, expected_result): def test_simulation(self, influx_db, query, expected_result):
""" """
......
<!--
// © University of Southampton IT Innovation Centre, 2017
//
// Copyright in this software belongs to University of Southampton
// IT Innovation Centre of Gamma House, Enterprise Road,
// Chilworth Science Park, Southampton, SO16 7NS, UK.
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Nikolay Stanchev
// Created Date : 27-04-2018
// Created for Project : FLAME
-->
## **Flame CLMC - Network and Media Service measurements aggregation**
### **Idea**
The idea is to aggregate platform measurement points with media service measurement points and obtain a third measurement from which we can easily
understand both end-to-end and round-trip performance of a media service. This is achieved by having a python script running in the background and aggregating
the data from both measurements on a given sample period, e.g. every 10 seconds. The script then posts the aggregated data back to Influx in a new measurement.
### **Assumptions**
* Network measurement - assumption is that we have a measurement for the network link delays, called **network_delays**, providing the following information:
| path (tag) | delay | time |
| --- | --- | --- |
| path identifier | e2e delay for the given path | time of measurement |
Here, the **path** tag value is the identifier of the path between two nodes in the network topology obtained from FLIPS. The assumption is that those identifiers
will be structured in such a way that we can obtain the source and target endpoint IDs from the path identifier itself. For example:
**endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu**
We can easily split the string on **'---'** and, thus, find the source endpoint is **endpoint1.ms-A.ict-flame.eu**, while the target endpoint is
**endpoint2.ms-A.ict-flame.eu**.
The delay field value is the network end-to-end delay in milliseconds for the path identified in the tag value.
* A response will traverse the same network path as the request, but in reverse direction.
* Media service measurement - assumption is that we have a measurement for media services' response time, called **service_delays**, providing the following information:
| FQDN (tag) | sf_instance (tag) | endpoint (tag) | response_time | time |
| --- | --- | --- | --- | --- |
| media service FQDN | ID of the service function instance | endpoint identifier | response time for the media service (s) | time of measurement |
Here, the **FQDN**, **sf_instance** and **endpoint** tag values identify a unique response time measurement. The response time field value is the
response time (measured in seconds) for the media service only, and it does not take into account any of the network measurements.
### **Goal**
The ultimate goal is to populate a new measurement, called **e2e_delays**, which will be provided with the following information:
| pathID_F (tag) | pathID_R (tag) | FQDN (tag) | sf_instance (tag) | D_path_F | D_path_R | D_service | time |
| --- | --- | --- | --- | --- | --- | --- | --- |
* *pathID_F* - tag used to identify the path in forward direction, e.g. **endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu**
* *pathID_R* - tag used to identify the path in reverse direction, e.g. **endpoint2.ms-A.ict-flame.eu---endpoint1.ms-A.ict-flame.eu**
* *FQDN* - tag used to identify the media service
* *sf_instance* - tag used to identify the media service function instance
* *D_path_F* - network delay for path in forward direction
* *D_path_R* - network delay for path in reverse direction
* *D_service* - media service response time
Then we can easily query on this measurement to obtain different performance indicators, such as end-to-end overall delays,
round-trip response time or any of the contributing parts in those performance indicators.
### **Aggregation script**
What the aggregation script does is very similar to the functionality of a continuous query. Given a sample report period, e.g. 10s,
the script executes at every 10-second-period querying the averaged data for the last 10 seconds. The executed queries are:
* Network delays query - to obtain the network delay values and group them by their **path** identifier:
```
SELECT mean(delay) as "Dnet" FROM "E2EMetrics"."autogen".network_delays WHERE time >= now() - 10s and time < now() GROUP BY path
```
* Media service response time query - to obtain the response time values of the media service instances and group them by **FQDN**, **sf_instance** and **endpoint** identifiers:
```
SELECT mean(response_time) as "Dresponse" FROM "E2EMetrics"."autogen".service_delays WHERE time >= now() - 10s and time < now() GROUP BY FQDN, sf_instance, endpoint
```
The results of the queries are then matched against each other on endpoint ID: on every match of the **endpoint** tag of the **service_delays** measurement with
the target endpoint ID of the **network_delays** measurement, the rows are combined to obtain an **e2e_delay** measurement row, which is posted back to influx.
Example:
* Result from first query:
```
name: network_delays
tags: path=endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu
time Dnet
---- ----
1524833145975682287 9.2
name: network_delays
tags: path=endpoint2.ms-A.ict-flame.eu---endpoint1.ms-A.ict-flame.eu
time Dnet
---- ----
1524833145975682287 10.3
```
* Result from second query
```
name: service_delays
tags: FQDN=ms-A.ict-flame.eu, endpoint=endpoint2.ms-A.ict-flame.eu, sf_instance=test-sf-clmc-agent-build_INSTANCE
time Dresponse
---- ---------
1524833145975682287 11
```
The script will parse the path identifier **endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu** and find the target endpoint being
**endpoint2.ms-A.ict-flame.eu**. Then the script checks if there is service delay measurement row matching this endpoint. Since there is one,
those values will be merged, so the result will be a row like this:
| pathID_F (tag) | pathID_R (tag) | FQDN (tag) | sf_instance (tag) | D_path_F | D_path_R | D_service | time |
| --- | --- | --- | --- | --- | --- | --- | --- |
| endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu | endpoint2.ms-A.ict-flame.eu---endpoint1.ms-A.ict-flame.eu | ms-A.ict-flame.eu | test-sf-clmc-agent-build_INSTANCE | 9.2 | 10.3 | 11 | 1524833145975682287 |
Here, another assumption is made that we can reverse the path identifier of a network delay row and that the reverse path delay would also
be reported in the **network_delays** measurement.
The resulting row would then be posted back to influx in the **e2e_delays** measurement.
### **Reasons why we cannot simply use a continuous query to do the job of the script**
* Influx is very limited in merging measurements functionality. When doing a **select into** from multiple measurements, e.g.
*SELECT * INTO measurement0 FROM measurement1, measurement2*
influx will try to merge the data on matching time stamps and tag values (if there are any tags). If the two measurements
differ in tags, then we get rows with missing data.
* When doing a continuous query, we cannot perform any kind of manipulations on the data, which disables us on choosing which
rows to merge together.
* Continuous queries were not meant to be used for merging measurements. The main use case the developers provide is for
downsampling the data in one measurement.
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment