Commit 3ab13630 authored by Nikolay Stanchev

Updated aggregator to reflect the latest decisions made.

parent ef219637
@@ -59,6 +59,10 @@ class Aggregator(Thread):
         # a stop flag event object used to handle the killing of the thread
         self._stop_flag = Event()
 
+        # a cache-like dictionaries to store the last reported values, which can be used to fill in missing values
+        self.network_cache = {}
+        self.service_cache = {}
+
     def stop(self):
         """
         A method used to stop the thread.
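
The two dictionaries added here act as a last-known-value store: every query window refreshes them, and when a later window returns no row for a given key the cached tuple fills the gap. A minimal sketch of that fallback pattern, with illustrative names rather than the aggregator's own:

# Illustrative sketch of the cache fallback idea (names are made up for this example).
def lookup_with_cache(latest, cache, key):
    """Prefer the value from the latest query window; otherwise fall back to the cache."""
    if key in latest:
        cache[key] = latest[key]             # refresh the cache with the newest report
    return latest.get(key, cache.get(key))   # may still be None if the key was never reported

network_cache = {}  # e.g. last (latency, bandwidth) per (path, source, target)
value = lookup_with_cache({("p1", "SR1", "SR2"): (5, 100)}, network_cache, ("p1", "SR1", "SR2"))
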
@@ -84,57 +88,69 @@ class Aggregator(Thread):
             # query the network delays and group them by path ID
             network_delays = {}
             result = self.db_client.query(
-                'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
+                'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
                     boundary_time_nano, current_time_nano))
             for item in result.items():
                 metadata, result_points = item
                 # measurement = metadata[0]
                 tags = metadata[1]
-                network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay']
+                result = next(result_points)
+                network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth']
+                self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth']
 
             # query the service delays and group them by endpoint, service function instance and sfr
             service_delays = {}
-            result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
+            result = self.db_client.query('SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
             for item in result.items():
                 metadata, result_points = item
                 # measurement = metadata[0]
                 tags = metadata[1]
-                service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance'])
+                result = next(result_points)
+                service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance'])
+                self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance'])
 
             # for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
             for path in network_delays:
                 # check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr
                 path_id, source, target = path
-                if target not in service_delays:
+                if target not in service_delays and target not in self.service_cache:
                     # if not continue with the other network path reports
                     continue
 
                 e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
-                                 "delay_service": None, "time": boundary_time}
+                                 "delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time}
 
                 e2e_arguments['path_ID'] = path_id
-                e2e_arguments['delay_forward'] = network_delays[path]
+                e2e_arguments['source_SFR'] = source
+                e2e_arguments['target_SFR'] = target
+                e2e_arguments['delay_forward'] = network_delays[path][0]
+                e2e_arguments['avg_bandwidth'] = network_delays[path][1]
 
                 # reverse the path ID to get the network delay for the reversed path
                 reversed_path = (path_id, target, source)
-                assert reversed_path in network_delays  # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A
-                e2e_arguments['delay_reverse'] = network_delays[reversed_path]
+                if reversed_path in network_delays or reversed_path in self.network_cache:
+                    # get the reverse delay, use the latest value if reported or the cache value
+                    e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0]
+                else:
+                    e2e_arguments['delay_reverse'] = None
 
                 # get the response time of the media component connected to the target SFR
-                service_delay = service_delays[target]
-                response_time, endpoint, sf_instance = service_delay
+                service_delay = service_delays.get(target, self.service_cache.get(target))
+                response_time, request_size, response_size, endpoint, sf_instance = service_delay
                 # put these points in the e2e arguments dictionary
                 e2e_arguments['delay_service'] = response_time
+                e2e_arguments['avg_request_size'] = request_size
+                e2e_arguments['avg_response_size'] = response_size
                 e2e_arguments['endpoint'] = endpoint
                 e2e_arguments['sf_instance'] = sf_instance
 
                 # if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row
-                if None not in e2e_arguments.items():
+                if None not in e2e_arguments.values():
                     self.db_client.write_points(
                         lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
                                                      e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
-                                                     e2e_arguments['time']))
+                                                     e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'], e2e_arguments['time']))
 
             old_timestamp = current_time
             # wait until {REPORT_PERIOD} seconds have passed
...
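
A small but important fix in this hunk is the completeness check before posting the E2E row: the old expression "None not in e2e_arguments.items()" compares None against (key, value) tuples and therefore always passes, while ".values()" inspects the values themselves. A quick illustration of the difference:

# Why the .items() check never caught missing fields, while .values() does.
e2e_arguments = {"path_ID": "p1", "delay_reverse": None}
print(None not in e2e_arguments.items())   # True  - tuples are never None, so the gap goes unnoticed
print(None not in e2e_arguments.values())  # False - the unfilled field is detected and the report is skipped
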
@@ -29,18 +29,21 @@ import uuid
 from random import randint
 
 
-def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, time):
+def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time):
     """
     Generates a combined averaged measurement about the e2e delay and its contributing parts
 
-    :param path_ID: The path identifier, which is a bidirectional path ID for the request and the response path
-    :param source_SFR: source service router
-    :param target_SFR: target service router
+    :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
+    :param source_sfr: source service router
+    :param target_sfr: target service router
     :param endpoint: endpoint of the media component
     :param sf_instance: service function instance (media component)
     :param delay_forward: Path delay (Forward direction)
     :param delay_reverse: Path delay (Reverse direction)
     :param delay_service: the media service component response time
+    :param avg_request_size: averaged request size
+    :param avg_response_size: averaged response size
+    :param avg_bandwidth: averaged bandwidth
     :param time: measurement timestamp
     :return: a list of dict-formatted reports to post on influx
     """
@@ -56,7 +59,10 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst
         "fields": {
             "delay_forward": float(delay_forward),
             "delay_reverse": float(delay_reverse),
-            "delay_service": float(delay_service)
+            "delay_service": float(delay_service),
+            "avg_request_size": float(avg_request_size),
+            "avg_response_size": float(avg_response_size),
+            "avg_bandwidth": float(avg_bandwidth)
         },
         "time": _getNSTime(time)
     }]
...
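
For reference, a call to the updated report generator now carries the three extra averages. The values below are made up for illustration, and db_client stands in for whichever InfluxDB client the caller holds (the aggregator uses self.db_client):

# Hypothetical call with made-up values, mirroring the new signature.
report = generate_e2e_delay_report(
    path_id=12, source_sfr="SR1", target_sfr="SR2",
    endpoint="/test_endpoint", sf_instance="sf_test.instance",
    delay_forward=10, delay_reverse=15, delay_service=20,
    avg_request_size=1024, avg_response_size=128, avg_bandwidth=1000,
    time=1528385860)
db_client.write_points(report)  # the function returns a list of dict-formatted points, as the docstring states
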
@@ -56,17 +56,18 @@ class TestE2ESimulation(object):
         print("... stopping aggregator")
         e2e_aggregator.stop()
 
     @pytest.mark.parametrize("query, expected_result", [
         ('SELECT count(*) FROM "E2EMetrics"."autogen"."network_delays"',
          {"time": "1970-01-01T00:00:00Z", "count_latency": 120, "count_bandwidth": 120}),
         ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"',
          {"time": "1970-01-01T00:00:00Z", "count_response_time": 24, "count_request_size": 24, "count_response_size": 24}),
         ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
-         {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 24, "count_delay_reverse": 24, "count_delay_service": 24}),
+         {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 38, "count_delay_reverse": 38, "count_delay_service": 38,
+          "count_avg_request_size": 38, "count_avg_response_size": 38, "count_avg_bandwidth": 38}),
         ('SELECT mean(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
-         {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 13.159722222222223, "mean_delay_reverse": 3.256944444444444, "mean_delay_service": 32.791666666666664}),
+         {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 8.010964912280702, "mean_delay_reverse": 12.881578947368423, "mean_delay_service": 23.42105263157895,
+          'mean_avg_request_size': 10485760, 'mean_avg_response_size': 1024, 'mean_avg_bandwidth': 104857600}),
     ])
     def test_simulation(self, influx_db, query, expected_result):
         """
...
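
The body of test_simulation is not part of this diff; presumably it runs each parametrized query against the test InfluxDB instance and compares the single aggregated point with expected_result, roughly along these lines (an assumed sketch, not the file's actual code):

# Assumed shape of the test body: run the query, take the one aggregated point, compare.
def test_simulation(self, influx_db, query, expected_result):
    query_result = influx_db.query(query)
    actual_result = next(query_result.get_points())
    assert actual_result == expected_result, "E2E simulation test failure"
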