Skip to content
Snippets Groups Projects
Commit 9282951b authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Updated the aggregator the CLMC service uses

parent 01d1df5f
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,7 @@
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 15-05-2018
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
......@@ -32,13 +32,13 @@ import sys
class Aggregator(object):
"""
A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process
A class used to perform the aggregation feature of the CLMC - aggregating network and media service measurements. Implemented as a separate process.
"""
REPORT_PERIOD = 5 # default report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
DATABASE = 'E2EMetrics' # default database the aggregator uses
DATABASE_URL = 'http://172.40.231.51:8086' # default database URL the aggregator uses
RETRY_PERIOD = 5
DATABASE_URL = 'http://203.0.113.100:8086' # default database URL the aggregator uses
RETRY_PERIOD = 5 # number of seconds to wait before retrying connection/posting data to Influx
def __init__(self, database_name=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
"""
......@@ -51,7 +51,6 @@ class Aggregator(object):
# initialise a database client using the database url and the database name
url_object = urlparse(database_url)
while True:
try:
self.db_client = InfluxDBClient(host=url_object.hostname, port=url_object.port, database=database_name, timeout=10)
......@@ -63,6 +62,10 @@ class Aggregator(object):
self.db_name = database_name
self.report_period = report_period
# a cache-like dictionaries to store the last reported values, which can be used to fill in missing values
self.network_cache = {}
self.service_cache = {}
def run(self):
"""
Performs the functionality of the aggregator - query data from both measurements merge that data and post it back in influx every 5 seconds.
......@@ -82,7 +85,7 @@ class Aggregator(object):
while True:
try:
result = self.db_client.query(
'SELECT mean(delay) as "net_delay" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
'SELECT mean(latency) as "net_latency", mean(bandwidth) as "net_bandwidth" FROM "E2EMetrics"."autogen"."network_delays" WHERE time >= {0} and time < {1} GROUP BY path, source, target'.format(
boundary_time_nano, current_time_nano))
break
except:
......@@ -93,14 +96,18 @@ class Aggregator(object):
# measurement = metadata[0]
tags = metadata[1]
network_delays[(tags['path'], tags['source'], tags['target'])] = next(result_points)['net_delay']
result = next(result_points)
network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth']
self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth']
# query the service delays and group them by endpoint, service function instance and sfr
service_delays = {}
while True:
try:
result = self.db_client.query('SELECT mean(response_time) as "response_time" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(boundary_time_nano, current_time_nano))
result = self.db_client.query(
'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "E2EMetrics"."autogen"."service_delays" WHERE time >= {0} and time < {1} GROUP BY endpoint, sf_instance, sfr'.format(
boundary_time_nano, current_time_nano))
break
except:
sleep(self.RETRY_PERIOD)
......@@ -109,50 +116,62 @@ class Aggregator(object):
metadata, result_points = item
# measurement = metadata[0]
tags = metadata[1]
service_delays[tags['sfr']] = (next(result_points)['response_time'], tags['endpoint'], tags['sf_instance'])
result = next(result_points)
service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance'])
self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance'])
# for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
for path in network_delays:
# check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr
path_id, source, target = path
if target not in service_delays:
if target not in service_delays and target not in self.service_cache:
# if not continue with the other network path reports
continue
e2e_arguments = {"path_ID": None, "source_SFR": None, "target_SFR": None, "endpoint": None, "sf_instance": None, "delay_forward": None, "delay_reverse": None,
"delay_service": None, "time": boundary_time}
"delay_service": None, "avg_request_size": None, "avg_response_size": None, "avg_bandwidth": None, "time": boundary_time}
e2e_arguments['path_ID'] = path_id
e2e_arguments['delay_forward'] = network_delays[path]
e2e_arguments['source_SFR'] = source
e2e_arguments['target_SFR'] = target
e2e_arguments['delay_forward'] = network_delays[path][0]
e2e_arguments['avg_bandwidth'] = network_delays[path][1]
# reverse the path ID to get the network delay for the reversed path
reversed_path = (path_id, target, source)
assert reversed_path in network_delays # reversed path must always be reported with the forward one - if there is network path A-B, there is also network path B-A
e2e_arguments['delay_reverse'] = network_delays[reversed_path]
if reversed_path in network_delays or reversed_path in self.network_cache:
# get the reverse delay, use the latest value if reported or the cache value
e2e_arguments['delay_reverse'] = network_delays.get(reversed_path, self.network_cache.get(reversed_path))[0]
else:
e2e_arguments['delay_reverse'] = None
# get the response time of the media component connected to the target SFR
service_delay = service_delays[target]
response_time, endpoint, sf_instance = service_delay
service_delay = service_delays.get(target, self.service_cache.get(target))
response_time, request_size, response_size, endpoint, sf_instance = service_delay
# put these points in the e2e arguments dictionary
e2e_arguments['delay_service'] = response_time
e2e_arguments['avg_request_size'] = request_size
e2e_arguments['avg_response_size'] = response_size
e2e_arguments['endpoint'] = endpoint
e2e_arguments['sf_instance'] = sf_instance
# if all the arguments of the e2e delay measurements were reported, then generate and post to Influx an E2E measurement row
if None not in e2e_arguments.items():
if None not in e2e_arguments.values():
while True:
try:
self.db_client.write_points(
generate_e2e_delay_report(e2e_arguments['path_ID'], e2e_arguments['source_SFR'], e2e_arguments['target_SFR'], e2e_arguments['endpoint'],
e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'], e2e_arguments['delay_service'],
e2e_arguments['sf_instance'], e2e_arguments['delay_forward'], e2e_arguments['delay_reverse'],
e2e_arguments['delay_service'],
e2e_arguments["avg_request_size"], e2e_arguments['avg_response_size'], e2e_arguments['avg_bandwidth'],
e2e_arguments['time']))
break
except:
sleep(self.RETRY_PERIOD)
old_timestamp = current_time
# wait until {REPORT_PERIOD} seconds have passed
# wait until {report_period} seconds have passed
while current_time < old_timestamp + self.report_period:
sleep(1)
current_time = int(time())
......
......@@ -98,9 +98,9 @@ def validate_action_content(content):
return content
def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, time):
def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth, time):
"""
Generates a combined averaged measurement about the e2e delay and its contributing parts.
Generates a combined averaged measurement about the e2e delay and its contributing parts
:param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
:param source_sfr: source service router
......@@ -110,6 +110,9 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst
:param delay_forward: Path delay (Forward direction)
:param delay_reverse: Path delay (Reverse direction)
:param delay_service: the media service component response time
:param avg_request_size: averaged request size
:param avg_response_size: averaged response size
:param avg_bandwidth: averaged bandwidth
:param time: measurement timestamp
:return: a list of dict-formatted reports to post on influx
"""
......@@ -125,7 +128,10 @@ def generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_inst
"fields": {
"delay_forward": float(delay_forward),
"delay_reverse": float(delay_reverse),
"delay_service": float(delay_service)
"delay_service": float(delay_service),
"avg_request_size": float(avg_request_size),
"avg_response_size": float(avg_response_size),
"avg_bandwidth": float(avg_bandwidth)
},
"time": int(1000000000*time)
}]
......
Loading...
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment