Skip to content
Snippets Groups Projects
Commit e8ecb471 authored by Nikolay Stanchev
Browse files

aggregation script and simulator - updated

parent 06d716eb
No related branches found
No related tags found
No related merge requests found
...@@ -35,7 +35,6 @@ class Aggregator(Thread): ...@@ -35,7 +35,6 @@ class Aggregator(Thread):
so that the implementation can be tested using pytest. so that the implementation can be tested using pytest.
""" """
PATH_SEPARATOR = "---" # currently, the path id is assumed to be in the format "SourceEndpointID---TargetEndpointID"
REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated REPORT_PERIOD = 5 # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
DATABASE = 'E2EMetrics' # default database the aggregator uses DATABASE = 'E2EMetrics' # default database the aggregator uses
DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses
...@@ -48,7 +47,7 @@ class Aggregator(Thread): ...@@ -48,7 +47,7 @@ class Aggregator(Thread):
:param database_url: database url to use :param database_url: database url to use
""" """
super(Aggregator, self).__init__() # call the constructor of the tread super(Aggregator, self).__init__() # call the constructor of the thread
# initialise a database client using the database url and the database name # initialise a database client using the database url and the database name
url_object = urlparse(database_url) url_object = urlparse(database_url)
...@@ -85,84 +84,61 @@ class Aggregator(Thread): ...@@ -85,84 +84,61 @@ class Aggregator(Thread):
# query the network delays and group them by path ID # query the network delays and group them by path ID
network_delays = {} network_delays = {}
result = self.db_client.query( result = self.db_client.query(
'SELECT mean(delay) as "Dnet" FROM "E2EMetrics"."autogen".network_delays WHERE time >= {0} and time < {1} GROUP BY path'.format( 'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
boundary_time_nano, current_time_nano)) boundary_time_nano, current_time_nano))
for item in result.items(): for item in result.items():
metadata, result_points = item metadata, result_points = item
# measurement = metadata[0] # measurement = metadata[0]
tags = metadata[1] tags = metadata[1]
network_delays[tags['path']] = next(result_points)['Dnet'] network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay']
# query the service delays and group them by FQDN, service function instance and endpoint # query the service delays and group them by FQDN, service function instance and sfr
service_delays = {} service_delays = {}
result = self.db_client.query( result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY FQDN, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
'SELECT mean(response_time) as "Dresponse" FROM "E2EMetrics"."autogen".service_delays WHERE time >= {0} and time < {1} GROUP BY FQDN, sf_instance, endpoint'.format(
boundary_time_nano, current_time_nano))
for item in result.items(): for item in result.items():
metadata, result_points = item metadata, result_points = item
# measurement = metadata[0] # measurement = metadata[0]
tags = metadata[1] tags = metadata[1]
service_delays[tags['endpoint']] = (next(result_points)['Dresponse'], tags['FQDN'], tags['sf_instance']) service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['FQDN'], tags['sf_instance'])
# for each path identifier check if there is a media service delay report for the target endpoint - if so, generate an e2e_delay measurement # for each path identifier check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
for path in network_delays: for path in network_delays:
# check if target endpoint is reported in service delays, that is there is a media service instance running on target endpoint # check if target sfr is reported in service delays, that is there is a media service instance being connected to target sfr
target_endpoint = self.get_target_endpoint(path) path_id, source, target = path
if target_endpoint not in service_delays: if target not in service_delays:
# if not continue with the other path IDs # if not continue with the other path reports
continue continue
e2e_arguments = {"path_id_f": None, "path_id_r": None, "fqdn": None, "sf_instance": None, "delay_path_f": None, "delay_path_r": None, e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "fqdn": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
"delay_service": None, "time": boundary_time} "delay_service": None, "time": boundary_time}
e2e_arguments['path_id_f'] = path e2e_arguments['path_ID'] = path_id
e2e_arguments['delay_path_f'] = network_delays[path] e2e_arguments['delay_forward'] = network_delays[path]
# reverse the path ID to get the network delay for the reversed path # reverse the path ID to get the network delay for the reversed path
reversed_path = self.reverse_path_id(path) reversed_path = (path_id, target, source)
assert reversed_path in network_delays # an assertion is made here, since reversed path should always be reported as well assert reversed_path in network_delays # an assertion is made here, since reversed path must always be reported as well
e2e_arguments['path_id_r'] = reversed_path e2e_arguments['delay_reverse'] = network_delays[reversed_path]
e2e_arguments['delay_path_r'] = network_delays[reversed_path]
service_delay = service_delays[target_endpoint] # get the response time of the media component connected to the target SFR
service_delay = service_delays[target]
response_time, fqdn, sf_instance = service_delay response_time, fqdn, sf_instance = service_delay
e2e_arguments['delay_service'] = response_time e2e_arguments['delay_service'] = response_time
e2e_arguments['fqdn'] = fqdn e2e_arguments['fqdn'] = fqdn
e2e_arguments['sf_isntnace'] = sf_instance e2e_arguments['sf_instance'] = sf_instance
if None not in e2e_arguments.items(): if None not in e2e_arguments.items():
self.db_client.write_points( self.db_client.write_points(
lp.generate_e2e_delay_report(e2e_arguments['path_id_f'], e2e_arguments['path_id_r'], e2e_arguments['fqdn'], e2e_arguments['sf_isntnace'], lp.generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['fqdn'],
e2e_arguments['delay_path_f'], e2e_arguments['delay_path_r'], e2e_arguments['delay_service'], e2e_arguments['time'])) e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
e2e_arguments['time']))
old_timestamp = current_time old_timestamp = current_time
while current_time != old_timestamp + 5: while current_time != old_timestamp + 5:
sleep(1) sleep(1)
current_time = int(time()) current_time = int(time())
@staticmethod
def reverse_path_id(path):
"""
Reverses a path identifier.
:param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID"
:return: the reversed path ID, e.g. "TargetEndpointID---SourceEndpointID"
"""
source_endpoint, target_endpoint = path.split(Aggregator.PATH_SEPARATOR)
return "{0}{1}{2}".format(target_endpoint, Aggregator.PATH_SEPARATOR, source_endpoint)
@staticmethod
def get_target_endpoint(path):
"""
Get a target endpoint by parsing a path identifier.
:param path: the path ID assumed to be in format "SourceEndpointID---TargetEndpointID"
:return: the target endpoint retrieved from the path identifier.
"""
return path.split(Aggregator.PATH_SEPARATOR)[1]
if __name__ == '__main__': if __name__ == '__main__':
Aggregator().start() Aggregator().start()
...@@ -73,19 +73,15 @@ class Simulator(object): ...@@ -73,19 +73,15 @@ class Simulator(object):
""" """
# all network delays start from 1ms, the dictionary stores the information to report # all network delays start from 1ms, the dictionary stores the information to report
ip_endpoints = [ paths = [
{'agent_id': 'endpoint1.ms-A.ict-flame.eu', {'target': 'SR3',
'paths': [{ 'source': 'SR1',
'target': 'endpoint2.ms-A.ict-flame.eu', 'path_id': 'SR1---SR3',
'path_id': 'endpoint1.ms-A.ict-flame.eu---endpoint2.ms-A.ict-flame.eu', 'network_delay': 1},
'network_delay': 1 {'target': 'SR1',
}]}, 'source': 'SR3',
{'agent_id': 'endpoint2.ms-A.ict-flame.eu', 'path_id': 'SR1---SR3',
'paths': [{ 'network_delay': 1}
'target': 'endpoint1.ms-A.ict-flame.eu',
'path_id': 'endpoint2.ms-A.ict-flame.eu---endpoint1.ms-A.ict-flame.eu',
'network_delay': 1
}]}
] ]
# current time in seconds (to test the aggregation we write influx data points related to future time), so we start from the current time # current time in seconds (to test the aggregation we write influx data points related to future time), so we start from the current time
...@@ -98,22 +94,18 @@ class Simulator(object): ...@@ -98,22 +94,18 @@ class Simulator(object):
sample_period_media = 5 # sample period for reporting media service delays (measured in seconds) - service measurements reported every 5 seconds sample_period_media = 5 # sample period for reporting media service delays (measured in seconds) - service measurements reported every 5 seconds
for i in range(0, self.SIMULATION_LENGTH): for i in range(0, self.SIMULATION_LENGTH):
# measure net delay every 2 seconds for endpoint 1 (generates on tick 0, 2, 4, 6, 8, 10.. etc.) # measure net delay every 2 seconds for path SR1-SR3 (generates on tick 0, 2, 4, 6, 8, 10.. etc.)
if i % sample_period_net == 0: if i % sample_period_net == 0:
endpoint = ip_endpoints[0] path = paths[0]
paths = endpoint['paths'] self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['network_delay'], sim_time))
for path in paths:
self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], endpoint['agent_id'], path['target'], path['network_delay'], sim_time))
# increase/decrease the delay in every sample report (min delay is 1) # increase/decrease the delay in every sample report (min delay is 1)
path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3))
# measure net delay every 2 seconds for endpoint 2 (generates on tick 1, 3, 5, 7, 9, 11.. etc.) # measure net delay every 2 seconds for the reverse direction SR3-SR1 of path SR1---SR3 (generates on tick 1, 3, 5, 7, 9, 11.. etc.)
if (i+1) % sample_period_net == 0: if (i+1) % sample_period_net == 0:
endpoint = ip_endpoints[1] path = paths[1]
paths = endpoint['paths'] self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], path['source'], path['target'], path['network_delay'], sim_time))
for path in paths:
self.db_client.write_points(lp.generate_network_delay_report(path['path_id'], endpoint['agent_id'], path['target'], path['network_delay'], sim_time))
# increase/decrease the delay in every sample report (min delay is 1) # increase/decrease the delay in every sample report (min delay is 1)
path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3)) path['network_delay'] = max(1, path['network_delay'] + random.randint(-3, 3))
...@@ -121,7 +113,7 @@ class Simulator(object): ...@@ -121,7 +113,7 @@ class Simulator(object):
# measure service response time every 5 seconds # measure service response time every 5 seconds
if i % sample_period_media == 0: if i % sample_period_media == 0:
self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, "ms-A.ict-flame.eu", self.db_client.write_points(lp.generate_service_delay_report(mean_delay_seconds_media, "ms-A.ict-flame.eu",
"test-sf-clmc-agent-build_INSTANCE", "endpoint2.ms-A.ict-flame.eu", sim_time)) "test-sf-clmc-agent-build_INSTANCE", "SR3", sim_time))
# increase/decrease the delay in every sample report (min delay is 10) # increase/decrease the delay in every sample report (min delay is 10)
mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)])) mean_delay_seconds_media = max(10, mean_delay_seconds_media + random.choice([random.randint(10, 20), random.randint(-20, -10)]))
......
...@@ -29,32 +29,34 @@ import uuid ...@@ -29,32 +29,34 @@ import uuid
from random import randint from random import randint
def generate_e2e_delay_report(path_id_f, path_id_r, fqdn, sf_instance, delay_path_f, delay_path_r, delay_service, time): def generate_e2e_delay_report(path_id, source_sfr, target_sfr, fqdn, sf_instance, delay_forward, delay_reverse, delay_service, time):
""" """
Generates a combined averaged measurement about the e2e delay and its contributing parts Generates a combined averaged measurement about the e2e delay and its contributing parts
:param path_id_f: The forward path identifier, e.g. endpoint1---endpoint2 :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
:param path_id_r: The reverse path identifier, e.g. endpoint2---endpoint1 :param source_sfr: source service router
:param target_sfr: target service router
:param fqdn: FQDN of the media service :param fqdn: FQDN of the media service
:param sf_instance: service function instance :param sf_instance: service function instance (media component)
:param delay_path_f: Path delay (Forward direction) :param delay_forward: Path delay (Forward direction)
:param delay_path_r: Path delay (Reverse direction) :param delay_reverse: Path delay (Reverse direction)
:param delay_service: the media service response time :param delay_service: the media service component response time
:param time: measurement time :param time: measurement timestamp
:return: a list of dict-formatted reports to post on influx :return: a list of dict-formatted reports to post on influx
""" """
result = [{"measurement": "e2e_delays", result = [{"measurement": "e2e_delays",
"tags": { "tags": {
"pathID_F": path_id_f, "path_ID": path_id,
"pathID_R": path_id_r, "source_SFR": source_sfr,
"target_SFR": target_sfr,
"FQDN": fqdn, "FQDN": fqdn,
"sf_instance": sf_instance "sf_instance": sf_instance
}, },
"fields": { "fields": {
"D_path_F": float(delay_path_f), "delay_forward": float(delay_forward),
"D_path_R": float(delay_path_r), "delay_reverse": float(delay_reverse),
"D_service": float(delay_service) "delay_service": float(delay_service)
}, },
"time": _getNSTime(time) "time": _getNSTime(time)
}] }]
...@@ -62,23 +64,23 @@ def generate_e2e_delay_report(path_id_f, path_id_r, fqdn, sf_instance, delay_pat ...@@ -62,23 +64,23 @@ def generate_e2e_delay_report(path_id_f, path_id_r, fqdn, sf_instance, delay_pat
return result return result
def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e_delay, time): def generate_network_delay_report(path_id, source_sfr, target_sfr, e2e_delay, time):
""" """
Generates a platform measurement about the network delay between two specific endpoints. Generates a platform measurement about the network delay between two specific service routers.
:param path_id: the identifier of the path between the two endpoints (TODO can we derive source_endpoint and target_endpoint from this path) :param path_id: the identifier of the path between the two service routers
:param source_endpoint: the source endpoint :param source_sfr: the source service router
:param target_endpoint: the target endpoint :param target_sfr: the target service router
:param e2e_delay: the e2e network delay for traversing the path (source_endpoint) -> (target_endpoint) (NOT round-trip-time) :param e2e_delay: the e2e network delay for traversing the path between the two service routers
:param time: the measurement time :param time: the measurement timestamp
:return: a list of dict-formatted reports to post on influx :return: a list of dict-formatted reports to post on influx
""" """
result = [{"measurement": "network_delays", result = [{"measurement": "network_delays",
"tags": { "tags": {
"path": path_id, "path": path_id,
"source_endpoint": source_endpoint, # TODO source and target endpoint tags might have to be removed if they can be derived from path ID "source": source_sfr,
"target_endpoint": target_endpoint "target": target_sfr
}, },
"fields": { "fields": {
"delay": e2e_delay "delay": e2e_delay
...@@ -89,15 +91,15 @@ def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e ...@@ -89,15 +91,15 @@ def generate_network_delay_report(path_id, source_endpoint, target_endpoint, e2e
return result return result
def generate_service_delay_report(response_time, fqdn, sf_instance, endpoint, time): def generate_service_delay_report(response_time, fqdn, sf_instance, sfr, time):
""" """
Generates a service measurement about the media service response time. Generates a service measurement about the media service response time.
:param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service) :param response_time: the media service response time (This is not the response time for the whole round-trip, but only for the processing part of the media service component)
:param fqdn: FQDN of the media service :param fqdn: FQDN of the media service
:param sf_instance: service function instance :param sf_instance: service function instance
:param endpoint: endpoint ID :param sfr: the service function router that connects the endpoint of the SF instance to the FLAME network
:param time: the measurement time :param time: the measurement timestamp
:return: a list of dict-formatted reports to post on influx :return: a list of dict-formatted reports to post on influx
""" """
...@@ -105,7 +107,7 @@ def generate_service_delay_report(response_time, fqdn, sf_instance, endpoint, ti ...@@ -105,7 +107,7 @@ def generate_service_delay_report(response_time, fqdn, sf_instance, endpoint, ti
"tags": { "tags": {
"FQDN": fqdn, "FQDN": fqdn,
"sf_instance": sf_instance, "sf_instance": sf_instance,
"endpoint": endpoint "sfr": sfr
}, },
"fields": { "fields": {
"response_time": response_time, "response_time": response_time,
......
...@@ -63,10 +63,10 @@ class TestE2ESimulation(object): ...@@ -63,10 +63,10 @@ class TestE2ESimulation(object):
('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"', ('SELECT count(*) FROM "E2EMetrics"."autogen"."service_delays"',
{"time": "1970-01-01T00:00:00Z", "count_response_time": 24}), {"time": "1970-01-01T00:00:00Z", "count_response_time": 24}),
('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"', ('SELECT count(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
{"time": "1970-01-01T00:00:00Z", "count_D_path_F": 24, "count_D_path_R": 24, "count_D_service": 24}), {"time": "1970-01-01T00:00:00Z", "count_delay_forward": 24, "count_delay_reverse": 24, "count_delay_service": 24}),
('SELECT mean(*) FROM "E2EMetrics"."autogen"."e2e_delays"', ('SELECT mean(*) FROM "E2EMetrics"."autogen"."e2e_delays"',
{"time": "1970-01-01T00:00:00Z", "mean_D_path_F": 13.159722222222223, "mean_D_path_R": 3.256944444444444, "mean_D_service": 32.791666666666664}), {"time": "1970-01-01T00:00:00Z", "mean_delay_forward": 13.159722222222223, "mean_delay_reverse": 3.256944444444444, "mean_delay_service": 32.791666666666664}),
]) ])
def test_simulation(self, influx_db, query, expected_result): def test_simulation(self, influx_db, query, expected_result):
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment