Skip to content
Snippets Groups Projects
Commit cac1853d authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

Implements unit test for aggregator

parent 553fec8c
No related branches found
No related tags found
No related merge requests found
......@@ -144,6 +144,7 @@ echo "----> Running tox"
# Run the test suite via tox and capture its full output for inspection.
TOX_OUTPUT="$(tox)"
# check if tox output contains the 'congratulations :)' bit for tests passed
if [[ $TOX_OUTPUT != *"congratulations :)"* ]]; then
    # quote the expansion so multi-line output is printed verbatim
    # (unquoted it is word-split and glob-expanded onto one line)
    echo "$TOX_OUTPUT"
    echo "CLMC service unit tests failed."
    exit 1
fi
......
......@@ -22,7 +22,7 @@
## Created for Project : FLAME
"""
from threading import Event
from threading import Thread, Event
from influxdb import InfluxDBClient
from time import time, sleep
from urllib.parse import urlparse
......@@ -106,28 +106,43 @@ class Aggregator(object):
# measurement = metadata[0]
tags = metadata[1]
result = next(result_points)
network_delays[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth']
self.network_cache[(tags['path'], tags['source'], tags['target'])] = result['net_latency'], result['net_bandwidth']
result_point = next(result_points)
network_delays[(tags['path'], tags['source'], tags['target'])] = result_point['net_latency'], result_point['net_bandwidth']
self.network_cache[(tags['path'], tags['source'], tags['target'])] = result_point['net_latency'], result_point['net_bandwidth']
# query the service delays and group them by endpoint, service function instance and sfr
service_delays = {}
result = self.db_client.query(
'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(self.db_name,
boundary_time_nano, current_time_nano))
'SELECT mean(response_time) as "response_time", mean(request_size) as "request_size", mean(response_size) as "response_size" FROM "{0}"."autogen"."service_delays" WHERE time >= {1} and time < {2} GROUP BY endpoint, sf_instance, sfr'.format(
self.db_name, boundary_time_nano, current_time_nano))
for item in result.items():
metadata, result_points = item
# measurement = metadata[0]
tags = metadata[1]
result = next(result_points)
service_delays[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance'])
self.service_cache[tags['sfr']] = (result['response_time'], result['request_size'], result['response_size'], tags['endpoint'], tags['sf_instance'])
result_point = next(result_points)
service_delays[tags['sfr']] = (result_point['response_time'], result_point['request_size'], result_point['response_size'], tags['endpoint'], tags['sf_instance'])
self.service_cache[tags['sfr']] = (result_point['response_time'], result_point['request_size'], result_point['response_size'], tags['endpoint'], tags['sf_instance'])
# for each network path check if there is a media service delay report for the target sfr - if so, generate an e2e_delay measurement
for path in network_delays:
# check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr
path_id, source, target = path
# check if we have a reverse path without a forward path for a potential aggregated row - e.g. SR3 to SR1 network row with service on SR3 and no row from SR1 to SR3
if (source in service_delays or source in self.service_cache) and (path_id, target, source) not in network_delays and (path_id, target, source) in self.network_cache:
# hence search for the forward path in the cache
forward_path = self.network_cache.get((path_id, target, source))
reverse_path = network_delays.get((path_id, source, target))
forward_delay = forward_path[0]
avg_bandwidth = forward_path[1]
reverse_delay = reverse_path[0]
service_delay = service_delays.get(source, self.service_cache.get(source))
response_time, request_size, response_size, endpoint, sf_instance = service_delay
self.db_client.write_points(
generate_e2e_delay_report(path_id, target, source, endpoint, sf_instance, forward_delay, reverse_delay, response_time,
request_size, response_size, avg_bandwidth, boundary_time))
# check if target sfr is reported in service delays, in other words - if there is a media service instance being connected to target sfr
if target not in service_delays and target not in self.service_cache:
# if not continue with the other network path reports
continue
......@@ -180,6 +195,54 @@ class Aggregator(object):
self.log.info("Aggregator stopped running.")
class AggregatorThread(Thread):
    """
    A utility class used to wrap around the Aggregator class and return a Thread instance, which can then be used for testing (provides start and stop methods)
    """

    REPORT_PERIOD = 5  # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'CLMCMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://172.40.231.51:8086'  # default database URL the aggregator uses

    def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
        """
        Constructs an AggregatorThread instance, wrapping an Aggregator object.

        :param database: database name to use
        :param database_url: database url to use
        :param report_period: aggregation report period in seconds
        """

        super(AggregatorThread, self).__init__()  # call the constructor of the thread
        self.aggregator = Aggregator(database_name=database, database_url=database_url, report_period=report_period)

    def stop(self):
        """
        A method used to stop the thread (delegates to the wrapped aggregator's stop flag).
        """

        self.aggregator.stop()

    def set_event_lock(self, event):
        """
        Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing).

        :param event: the event lock object
        """

        # plain attribute assignment instead of setattr - same effect, clearer intent
        self.event = event

    def run(self):
        """
        The method to execute when the thread starts - signals the event (if one was set) and runs the aggregator loop.
        """

        if hasattr(self, 'event'):
            self.event.set()

        self.aggregator.run()
if __name__ == '__main__':
# initialise a file logger, only when module's main method is run (NOT when aggregator class is imported somewhere else)
log = logging.getLogger('aggregator')
......
from clmcservice.utilities import generate_e2e_delay_report
"""
A python module which provides auxiliary functions to mimic the behaviour of an InfluxDBClient when unit testing the aggregator.
"""
class MockResultSet(object):
    """
    Mimics the behaviour of the influx library's ResultSet: it simply stores a group
    of data points and exposes them through an items() method (the only part of the
    ResultSet interface the aggregator relies on).
    """

    def __init__(self, points):
        """
        Store the given collection of data points on the mock result set.

        :param points: the collected points
        """

        self.points = points

    def items(self):
        """
        Retrieve the stored data points, exactly as an influx ResultSet would.

        :return: the collected data points
        """

        return self.points
# The following are network-related auxiliary functions to generate test data.
def _network_result_point(net_latency, net_bandwidth):
"""
Returns a generator, which yields one data point representing a network measurement (fields only)
:param net_latency: the reported network latency
:param net_bandwidth: the reported network bandwidth.
:return: a generator object with one element (same behaviour is used in the influxdb library even when only one point is returned from the query)
"""
yield {"net_latency": net_latency, "net_bandwidth": net_bandwidth}
def _network_tags(path, source, target):
"""
Returns a dictionary representing a network measurement (tags only)
:param path: the path identifier
:param source: the source service router
:param target: the target service router
:return: a dictionary with those values
"""
return {"path": path, "source": source, "target": target}
def _network_metadata(measurement, path, source, target):
    """
    Returns an influxdb-styled metadata about a network measurement.

    :param measurement: the measurement table name
    :param path: the path identifier
    :param source: the source service router
    :param target: the target service router
    :return: a tuple with the first element being the measurement name and the second element being a dictionary with the network measurement tag values
    """

    tags = _network_tags(path, source, target)
    return measurement, tags
def network_result_item(measurement, path, source, target, net_latency, net_bandwidth):
    """
    Returns a full influxdb-styled network measurement item - with tag and field values.

    :param measurement: the measurement table name
    :param path: the path identifier
    :param source: the source service router
    :param target: the target service router
    :param net_latency: the reported network latency
    :param net_bandwidth: the reported network bandwidth.
    :return: a tuple with the first element being the result metadata (measurement name and tags) and the second element being the data field points
    """

    metadata = _network_metadata(measurement, path, source, target)
    field_points = _network_result_point(net_latency, net_bandwidth)
    return metadata, field_points
# The following are service-related auxiliary functions to generate test data.
def _service_result_point(response_time, request_size, response_size):
"""
Returns a generator, which yields one data point representing a service measurement (fields only)
:param response_time: the response time of the service
:param request_size: the averaged request size of the service
:param response_size: the averaged response size of the service
:return: a generator object with one element (same behaviour is used in the influxdb library even when only one point is returned from the query)
"""
yield {"response_time": response_time, "request_size": request_size, "response_size": response_size}
def _service_tags(sfr, endpoint, sf_instance):
"""
Returns a dictionary representing a service measurement (tags only)
:param sfr: the service router to which the service's endpoint is connected to
:param endpoint: the endpoint the service is being deployed on
:param sf_instance: the service function instance (FQDN)
:return: a dictionary with those values
"""
return {"sfr": sfr, "endpoint": endpoint, "sf_instance": sf_instance}
def _service_metadata(measurement, sfr, endpoint, sf_instance):
    """
    Returns an influxdb-styled metadata about a service measurement.

    :param measurement: the measurement table name
    :param sfr: the service router to which the service's endpoint is connected to
    :param endpoint: the endpoint the service is being deployed on
    :param sf_instance: the service function instance (FQDN)
    :return: a tuple with the first element being the measurement name and the second element being a dictionary with the service measurement tag values
    """

    tags = _service_tags(sfr, endpoint, sf_instance)
    return measurement, tags
def service_result_item(measurement, sfr, endpoint, sf_instance, response_time, request_size, response_size):
    """
    Returns a full influxdb-styled service measurement item - with tag and field values.

    :param measurement: the measurement table name
    :param sfr: the service router to which the service's endpoint is connected to
    :param endpoint: the endpoint the service is being deployed on
    :param sf_instance: the service function instance (FQDN)
    :param response_time: the response time of the service
    :param request_size: the averaged request size of the service
    :param response_size: the averaged response size of the service
    :return: a tuple with the first element being the result metadata (measurement name and tags) and the second element being the data field points
    """

    metadata = _service_metadata(measurement, sfr, endpoint, sf_instance)
    field_points = _service_result_point(response_time, request_size, response_size)
    return metadata, field_points
# The following are auxiliary functions for generating an e2e row used in the unit testing of the aggregator.
def drop_timestamp(d):
    """
    Drops the time stamp from a dictionary-represented influx result item object.

    :param d: the dictionary object representing a measurement row from influx
    :return: the same dictionary (mutated in place) with no timestamp
    """

    # pop with a default so a row that never carried a 'time' key is returned
    # unchanged instead of raising KeyError
    d.pop('time', None)
    return d
def _generate_e2e_row(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse, delay_service, avg_request_size, avg_response_size, avg_bandwidth):
    """
    Generates a combined averaged measurement about the e2e delay and its contributing parts with default timestamp (set as 0)

    :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
    :param source_sfr: source service router
    :param target_sfr: target service router
    :param endpoint: endpoint of the media component
    :param sf_instance: service function instance (media component)
    :param delay_forward: Path delay (Forward direction)
    :param delay_reverse: Path delay (Reverse direction)
    :param delay_service: the media service component response time
    :param avg_request_size: averaged request size
    :param avg_response_size: averaged response size
    :param avg_bandwidth: averaged bandwidth
    :return: a list of dict-formatted reports to post on influx
    """

    # delegate to the real report generator with a fixed timestamp of 0
    reports = generate_e2e_delay_report(path_id, source_sfr, target_sfr, endpoint, sf_instance,
                                        delay_forward, delay_reverse, delay_service,
                                        avg_request_size, avg_response_size, avg_bandwidth, 0)
    return reports[0]
def generate_e2e_no_timestamp_row(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse,
                                  delay_service, avg_request_size, avg_response_size, avg_bandwidth):
    """
    Generates a combined averaged measurement about the e2e delay and its contributing parts (with no timestamp, used for testing)

    :param path_id: The path identifier, which is a bidirectional path ID for the request and the response path
    :param source_sfr: source service router
    :param target_sfr: target service router
    :param endpoint: endpoint of the media component
    :param sf_instance: service function instance (media component)
    :param delay_forward: Path delay (Forward direction)
    :param delay_reverse: Path delay (Reverse direction)
    :param delay_service: the media service component response time
    :param avg_request_size: averaged request size
    :param avg_response_size: averaged response size
    :param avg_bandwidth: averaged bandwidth
    :return: a list of dict-formatted reports to post on influx
    """

    # build the zero-timestamp row first, then strip the timestamp for comparison purposes
    row = _generate_e2e_row(path_id, source_sfr, target_sfr, endpoint, sf_instance, delay_forward, delay_reverse,
                            delay_service, avg_request_size, avg_response_size, avg_bandwidth)
    return drop_timestamp(row)
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 04-06-2018
## Created for Project : FLAME
"""
from collections import OrderedDict
from threading import Event
from unittest import mock
from time import sleep
from clmcservice.aggregation.aggregator import AggregatorThread
from clmcservice.aggregation.influx_data_interface import MockResultSet, network_result_item, service_result_item, drop_timestamp, generate_e2e_no_timestamp_row
class TestAggregation(object):
    """
    Unit test for the aggregation process: runs an AggregatorThread against a mocked
    InfluxDB client that replays scripted network/service query results, then compares
    the e2e rows the aggregator writes against the expected rows.
    """

    # names of the attributes set dynamically on the test instance via setattr/getattr
    ACTUAL_RESULTS = "actual_aggregated_results"
    EXPECTED_RESULTS = "expected_aggregated_results"
    FINISHED = "finished_event"

    def points_generator(self, network_items, service_items):
        """
        Yields alternating network and service MockResultSet objects, one pair per
        aggregation cycle; sets the 'finished' event just before yielding the last
        service result set so the test knows all scripted data has been consumed.

        :param network_items: sequence of per-cycle network data point tuples
        :param service_items: sequence of per-cycle service data point tuples (same length)
        """
        assert len(network_items) == len(service_items), "The data points generator must receive the same number of network items as the number of service items"
        index = 0
        while not getattr(self, self.FINISHED).is_set():
            items = network_items[index]
            yield MockResultSet(items)
            items = service_items[index]
            # before yielding the service data points, check if both sets of data points are enumerated
            if index == len(network_items)-1:
                getattr(self, self.FINISHED).set()
            yield MockResultSet(items)
            index += 1

    def setup_mock_db_client(self, mock_class):
        """
        Wires the mocked InfluxDBClient: query() replays the scripted result sets in
        order, write_points() records each written row (timestamp dropped) for later
        comparison against the expected aggregated rows.

        :param mock_class: the mocked InfluxDBClient instance to configure
        """
        setattr(self, self.ACTUAL_RESULTS, [])
        # the e2e rows the aggregator is expected to derive from the scripted data below
        setattr(self, self.EXPECTED_RESULTS, [
            generate_e2e_no_timestamp_row(path_id="SR1-SR3", source_sfr="SR1", target_sfr="SR3", endpoint="endpoint1", sf_instance="ms1.flame.org", delay_forward=10,
                                          delay_reverse=15, delay_service=10, avg_request_size=1024, avg_response_size=8, avg_bandwidth=1200),
            generate_e2e_no_timestamp_row(path_id="SR1-SR3", source_sfr="SR1", target_sfr="SR3", endpoint="endpoint1", sf_instance="ms1.flame.org", delay_forward=5,
                                          delay_reverse=25, delay_service=40, avg_request_size=16, avg_response_size=2048, avg_bandwidth=1200),
            generate_e2e_no_timestamp_row(path_id="SR1-SR2", source_sfr="SR1", target_sfr="SR2", endpoint="endpoint2", sf_instance="ms2.flame.org", delay_forward=15,
                                          delay_reverse=35, delay_service=60, avg_request_size=32, avg_response_size=1024, avg_bandwidth=1200),
            generate_e2e_no_timestamp_row(path_id="SR4-SR5", source_sfr="SR4", target_sfr="SR5", endpoint="endpoint5", sf_instance="ms5.flame.org", delay_forward=11,
                                          delay_reverse=25, delay_service=50, avg_request_size=2048, avg_response_size=32, avg_bandwidth=1200),
            generate_e2e_no_timestamp_row(path_id="SR1-SR2", source_sfr="SR1", target_sfr="SR2", endpoint="endpoint2", sf_instance="ms2.flame.org", delay_forward=12,
                                          delay_reverse=5, delay_service=60, avg_request_size=32, avg_response_size=1024, avg_bandwidth=1200),
            generate_e2e_no_timestamp_row(path_id="SR1-SR3", source_sfr="SR1", target_sfr="SR3", endpoint="endpoint1", sf_instance="ms1.flame.org", delay_forward=16,
                                          delay_reverse=25, delay_service=40, avg_request_size=16, avg_response_size=2048, avg_bandwidth=1200),
            generate_e2e_no_timestamp_row(path_id="SR10-SR12", source_sfr="SR12", target_sfr="SR10", endpoint="endpoint10", sf_instance="ms4.flame.org", delay_forward=22,
                                          delay_reverse=3, delay_service=75, avg_request_size=1024, avg_response_size=64, avg_bandwidth=1200),
            generate_e2e_no_timestamp_row(path_id="SR14-SR15", source_sfr="SR14", target_sfr="SR15", endpoint="endpoint15", sf_instance="ms2.flame.org", delay_forward=24,
                                          delay_reverse=27, delay_service=105, avg_request_size=1024, avg_response_size=128, avg_bandwidth=1200),
            generate_e2e_no_timestamp_row(path_id="SR14-SR15", source_sfr="SR15", target_sfr="SR14", endpoint="endpoint14", sf_instance="ms1.flame.org", delay_forward=27,
                                          delay_reverse=24, delay_service=85, avg_request_size=32, avg_response_size=64, avg_bandwidth=1200),
        ])
        setattr(self, self.FINISHED, Event())
        # scripted per-cycle query results; one network tuple + one service tuple per aggregation cycle
        mock_points = self.points_generator(
            network_items=[
                (
                    network_result_item("network_delays", "SR1-SR3", "SR1", "SR3", 10, 1200),
                    network_result_item("network_delays", "SR1-SR3", "SR3", "SR1", 15, 1200),
                    network_result_item("network_delays", "SR1-SR33", "SR1", "SR33", 15, 1200),
                    network_result_item("network_delays", "SR2-SR11", "SR11", "SR2", 15, 1200)
                ),
                (
                    network_result_item("network_delays", "SR1-SR3", "SR1", "SR3", 5, 1200),
                    network_result_item("network_delays", "SR1-SR3", "SR3", "SR1", 25, 1200),
                    network_result_item("network_delays", "SR1-SR2", "SR1", "SR2", 15, 1200),
                    network_result_item("network_delays", "SR1-SR2", "SR2", "SR1", 35, 1200)
                ),
                (
                    network_result_item("network_delays", "SR4-SR5", "SR4", "SR5", 5, 1200),
                    network_result_item("network_delays", "SR4-SR5", "SR5", "SR4", 25, 1200),
                ),
                # an empty cycle - no network reports in this period
                (),
                (
                    network_result_item("network_delays", "SR4-SR5", "SR4", "SR5", 5, 1200),
                    network_result_item("network_delays", "SR4-SR5", "SR5", "SR4", 25, 1200),
                    network_result_item("network_delays", "SR0-SR1", "SR0", "SR1", 5, 1200),
                    network_result_item("network_delays", "SR0-SR1", "SR1", "SR0", 25, 1200),
                    network_result_item("network_delays", "SR10-SR12", "SR10", "SR12", 11, 1200),
                    network_result_item("network_delays", "SR10-SR12", "SR12", "SR10", 22, 1200),
                    network_result_item("network_delays", "SR14-SR15", "SR14", "SR15", 24, 1200),
                    network_result_item("network_delays", "SR14-SR15", "SR15", "SR14", 26, 1200),
                ),
                (
                    network_result_item("network_delays", "SR4-SR5", "SR4", "SR5", 11, 1200),
                    network_result_item("network_delays", "SR1-SR2", "SR1", "SR2", 12, 1200),
                    network_result_item("network_delays", "SR1-SR2", "SR2", "SR1", 5, 1200),
                    network_result_item("network_delays", "SR1-SR3", "SR1", "SR3", 16, 1200),
                    network_result_item("network_delays", "SR10-SR12", "SR10", "SR12", 3, 1200),
                    network_result_item("network_delays", "SR14-SR15", "SR15", "SR14", 27, 1200),
                )
            ],
            service_items=[
                (
                    service_result_item("service_delays", "SR3", "endpoint1", "ms1.flame.org", 10, 1024, 8),
                    service_result_item("service_delays", "SR33", "endpoint33", "ms2.flame.org", 20, 4096, 8),
                    service_result_item("service_delays", "SR11", "endpoint11", "ms3.flame.org", 30, 1024, 8),
                ),
                (
                    service_result_item("service_delays", "SR3", "endpoint1", "ms1.flame.org", 40, 16, 2048),
                    service_result_item("service_delays", "SR2", "endpoint2", "ms2.flame.org", 60, 32, 1024)
                ),
                (
                    service_result_item("service_delays", "SR6", "endpoint6", "ms1.flame.org", 60, 1024, 8),
                    service_result_item("service_delays", "SR7", "endpoint7", "ms1.flame.org", 70, 1024, 8),
                ),
                (
                    service_result_item("service_delays", "SR6", "endpoint6", "ms1.flame.org", 65, 2048, 16),
                    service_result_item("service_delays", "SR8", "endpoint8", "ms2.flame.org", 75, 2048, 16),
                    service_result_item("service_delays", "SR9", "endpoint9", "ms3.flame.org", 25, 2048, 16),
                ),
                # an empty cycle - no service reports in this period
                (),
                (
                    service_result_item("service_delays", "SR5", "endpoint5", "ms5.flame.org", 50, 2048, 32),
                    service_result_item("service_delays", "SR10", "endpoint10", "ms4.flame.org", 75, 1024, 64),
                    service_result_item("service_delays", "SR15", "endpoint15", "ms2.flame.org", 105, 1024, 128),
                    service_result_item("service_delays", "SR14", "endpoint14", "ms1.flame.org", 85, 32, 64),
                )
            ]
        )
        # each query() call consumes the next scripted result set, regardless of the query text
        mock_class.query = lambda query: next(mock_points)
        # record every written row with its timestamp stripped, for order-insensitive comparison
        mock_class.write_points = lambda points: getattr(self, self.ACTUAL_RESULTS).append(drop_timestamp(points[0]))

    @mock.patch('clmcservice.aggregation.aggregator.InfluxDBClient', autospec=True)
    def test_aggregator(self, MockDBClient):
        """
        Runs the aggregator thread against the mocked DB client until all scripted data
        is consumed, then asserts the written rows match the expected rows (compared
        sorted by path_ID tag, since write order is not guaranteed).

        :param MockDBClient: the patched InfluxDBClient class injected by mock.patch
        """
        self.setup_mock_db_client(MockDBClient.return_value)
        t = AggregatorThread(report_period=2)
        t.start()
        # wait until the points generator signals that all scripted cycles were replayed
        while not getattr(self, self.FINISHED).is_set():
            sleep(1)
        t.stop()
        expected_results = getattr(self, self.EXPECTED_RESULTS)
        actual_results = getattr(self, self.ACTUAL_RESULTS)
        assert type(actual_results) is list
        assert type(expected_results) is list
        assert len(actual_results) == len(expected_results), "Actual and expected result differ in length."
        assert sorted(actual_results, key=lambda k: k['tags']['path_ID']) == sorted(expected_results, key=lambda k: k['tags']['path_ID']), \
            "Test failure - aggregation process returns incorrect results."
......@@ -179,7 +179,7 @@ class AggregatorController(object):
:return: the process object of the started aggregator script
"""
dir_path = os.path.dirname(os.path.realpath(__file__))
dir_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'aggregation')
python_interpreter = sys.executable
command = [python_interpreter, 'aggregator.py', '--period', str(config.get('aggregator_report_period')), '--database',
config.get('aggregator_database_name'), '--url', config.get('aggregator_database_url')]
......
#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Nikolay Stanchev
## Created Date : 25-04-2018
## Created for Project : FLAME
"""
from threading import Thread
from clmcservice.aggregator import Aggregator
class TestAggregator(Thread):
    """
    A utility class used to wrap around the Aggregator class and return a Thread instance, which can then be used for testing (provides start and stop methods)
    """

    REPORT_PERIOD = 5  # currently, report period is 5s, that is every 5 seconds the mean delay values for the last 5 seconds are aggregated
    DATABASE = 'CLMCMetrics'  # default database the aggregator uses
    DATABASE_URL = 'http://172.40.231.51:8086'  # default database URL the aggregator uses

    def __init__(self, database=DATABASE, database_url=DATABASE_URL, report_period=REPORT_PERIOD):
        """
        Constructs a TestAggregator thread wrapping an Aggregator instance.

        :param database: database name to use
        :param database_url: database url to use
        :param report_period: aggregation report period in seconds
        """

        super(TestAggregator, self).__init__()  # call the constructor of the thread
        self.aggregator = Aggregator(database_name=database, database_url=database_url, report_period=report_period)

    def stop(self):
        """
        A method used to stop the thread (delegates to the wrapped aggregator's stop flag).
        """

        self.aggregator.stop()

    def set_event_lock(self, event):
        """
        Auxiliary method to set a thread-safe event lock object to the aggregator (used for testing).

        :param event: the event lock object
        """

        # plain attribute assignment instead of setattr - same effect, clearer intent
        self.event = event

    def run(self):
        """
        The method to execute when the thread starts - signals the event (if one was set) and runs the aggregator loop.
        """

        if hasattr(self, 'event'):
            self.event.set()

        self.aggregator.run()
......@@ -28,7 +28,7 @@ import pkg_resources
from influxdb import InfluxDBClient
from clmctest.monitoring.StreamingSim import Sim
from clmctest.monitoring.E2ESim import Simulator
from clmctest.monitoring.E2ETestAggregatorThread import TestAggregator
from clmcservice.aggregation.aggregator import AggregatorThread
@pytest.fixture(scope="module")
......@@ -109,4 +109,4 @@ def e2e_aggregator(streaming_sim_config):
influx_url = "http://" + streaming_sim_config[0]['ip_address'] + ":8086"
return TestAggregator(database_url=influx_url)
return AggregatorThread(database_url=influx_url)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment