From 8a46aba7449ff88345bc521a52eb6bb94f67d1c9 Mon Sep 17 00:00:00 2001
From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk>
Date: Wed, 28 Mar 2018 11:27:51 +0100
Subject: [PATCH] [ Issue #61 ] - Endpoint configuration monitoring update

---
 clmctest/monitoring/LineProtocolGenerator.py |  79 +++++-----
 clmctest/monitoring/StreamingSim.py          |  37 +++--
 clmctest/monitoring/conftest.py              |   3 +-
 clmctest/monitoring/test_endpoint_config.py  | 143 -------------------
 clmctest/monitoring/test_simresults.py       |  14 +-
 5 files changed, 67 insertions(+), 209 deletions(-)
 delete mode 100644 clmctest/monitoring/test_endpoint_config.py

diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py
index 692405e..a286412 100644
--- a/clmctest/monitoring/LineProtocolGenerator.py
+++ b/clmctest/monitoring/LineProtocolGenerator.py
@@ -23,23 +23,6 @@ def generate_network_report(recieved_bytes, sent_bytes, time):
     return result
 
 
-# Formats VM config
-def generate_vm_config(state, cpu, mem, storage, time):
-    result = [{"measurement": "vm_res_alloc",
-               "tags": {
-                   "vm_state": state
-               },
-               "fields": {
-                   "cpu": cpu,
-                   "memory": mem,
-                   "storage": storage
-               },
-               "time": _getNSTime(time)
-               }]
-
-    return result
-
-
 # Reports cpu usage, scaling on requests
 def generate_cpu_report(cpu_usage, cpu_active_time, cpu_idle_time, time):
     result = [{"measurement": "cpu_usage",
@@ -87,41 +70,33 @@ def generate_ipendpoint_route(resource, requests, latency, time):
     return result
 
 
-def generate_endpoint_config(state, cpu, mem, storage, statePeriod, time):
+def generate_endpoint_config(cpu, mem, storage, time, **kwargs):
     """
-    generates a measurement
+    generates a measurement for a VM configuration states
 
     :param cpu: the number of CPUs of VM endpoint
     :param mem: memory of VM endpoint
     :param storage: storage capacity of VM endpoint
-    :param state: the state that was monitored (e.g. placing, booting or connecting)
-    :param statePeriod: the period of time, the IP endpoint was in this state
     :param time: time of measurement
-    :return: dictionary object representing the data to post to influx
+    :param kwargs: 'pythonic-style' keyword arguments used to store the state as a key and it's respective state period (in seconds) as value
+    :return: dictionary object representing the data to post on influx
     """
 
-    result = [{"measurement": "endpointConfig",
-               "tags":
-                   {
-                    "state": state
-                    },
-               "fields":
-                   {"cpus": cpu,
-                    "memory": mem,
-                    "storage": storage,
-                    "statePeriod": statePeriod,
-                    },
-               "time": _getNSTime(time)}]
+    # lambda function to validate whether a state is given as a key in the keyword arguments dictionary
+    validate = lambda key: kwargs.get(key) if key in kwargs else 0.0
 
-    return result
+    # generate and validate the state values
+    fields = {"cpus": cpu, "memory": mem, "storage": storage}   # NOTE: Do we need these fields ?
+    for state in ("placing", "booting", "connecting"):
+        fields[state] = validate(state)
+        fields[("avg_{0}".format(state))] = validate("avg_{0}".format(state))
 
+    result = [{"measurement": "endpoint_config",
+               "fields": fields,
+               "time": _getNSTime(time)}]
 
-# InfluxDB likes to have time-stamps in nanoseconds
-def _getNSTime(time):
-    # Convert to nano-seconds
-    timestamp = int(1000000000*time)
+    return result
 
-    return timestamp
 
 def generate_mc_service_config( mcMeasurement, stateTimeStats, time ):
 
@@ -171,6 +146,13 @@ def validate_state_time_stats( stateTimeStats ):
     return stateTimeStats
 
 
+# InfluxDB likes to have time-stamps in nanoseconds
+def _getNSTime(time):
+    # Convert to nano-seconds
+    timestamp = int(1000000000*time)
+
+    return timestamp
+
 
 # DEPRECATED
 # ____________________________________________________________________________
@@ -182,6 +164,23 @@ def quote_wrap(string):
     return "\"" + string + "\""
 
 
+# Formats VM config
+def generate_vm_config(state, cpu, mem, storage, time):
+    result = [{"measurement": "vm_res_alloc",
+               "tags": {
+                   "vm_state": state
+               },
+               "fields": {
+                   "cpu": cpu,
+                   "memory": mem,
+                   "storage": storage
+               },
+               "time": _getNSTime(time)
+               }]
+
+    return result
+
+
 def _generateClientRequest(cReq, id, time):
     # Tags first
     result = 'sid="' + str(id) + '",' + cReq
diff --git a/clmctest/monitoring/StreamingSim.py b/clmctest/monitoring/StreamingSim.py
index 242fe83..d7d8bd1 100644
--- a/clmctest/monitoring/StreamingSim.py
+++ b/clmctest/monitoring/StreamingSim.py
@@ -3,9 +3,9 @@
 import clmctest.monitoring.LineProtocolGenerator as lp
 import time
 import urllib.parse
-import pytest
 import random
-import sys, getopt
+import sys
+import getopt
 from influxdb import InfluxDBClient
 
 # Simulation parameters
@@ -89,8 +89,7 @@ class Sim(object):
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
             delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0],
-                                             config_delay_dist['placing'][0] * config_delay_dist['placing'][1],
-                                             'placing', 'placed')
+                                             config_delay_dist['placing'][0] * config_delay_dist['placing'][1], 'placing')
             max_delay = max(delay_time, max_delay)
         sim_time += max_delay
 
@@ -100,8 +99,7 @@ class Sim(object):
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
             delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0],
-                                             config_delay_dist['booting'][0] * config_delay_dist['booting'][1],
-                                             'booting', 'booted')
+                                             config_delay_dist['booting'][0] * config_delay_dist['booting'][1], 'booting')
             max_delay = max(delay_time, max_delay)
         sim_time += max_delay
 
@@ -124,8 +122,7 @@ class Sim(object):
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
             delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0],
-                                             config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1],
-                                             'connecting', 'connected')
+                                             config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], 'connecting')
             max_delay = max(delay_time, max_delay)
         sim_time += max_delay
 
@@ -244,23 +241,22 @@ class Sim(object):
         return response_delay
 
     @staticmethod
-    def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state, next_state):
+    def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, transition_state):
         """
         Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables.
-        :param sim_time:
-        :param ip_endpoint:
-        :param mu:
-        :param sigma:
-        :param transition_state:
-        :param next_state:
+
+        :param sim_time: current simulation time
+        :param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.)
+        :param mu: mean value
+        :param sigma: standard deviation
         :return: the delay time
         """
 
-        agent_db_client.write_points(lp.generate_vm_config(transition_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time))
-
-        delay_time = random.normalvariate(mu, sigma)
-
-        agent_db_client.write_points(lp.generate_vm_config(next_state, ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time + delay_time))
+        # assert transition time is at least 0
+        delay_time = max(random.normalvariate(mu, sigma), 0.0)
+        # transition state and state period are passed to the generate report function as keyword arguments
+        agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time,
+                                                                 **{transition_state: delay_time, 'avg_{0}'.format(transition_state): delay_time}))
 
         return delay_time
 
@@ -314,6 +310,7 @@ def run_simulation(generate=True, sTime=3600):
     else:
         simulator.db_client.drop_database(simulator.influx_db_name)
 
+
 if __name__ == "__main__":
     """
     The main entry for this module. Code here is executed only if the StreamingSim.py file is executed, 
diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py
index ad4d96b..6d59641 100644
--- a/clmctest/monitoring/conftest.py
+++ b/clmctest/monitoring/conftest.py
@@ -12,7 +12,6 @@ def streaming_sim_config():
     """
     Reads the service configuration deployed for the streaming simulation test.
 
-    :param request: access the parameters of the fixture
     :return: the python object representing the read YAML file
     """
     rspec = pkg_resources.resource_filename('clmctest.monitoring', 'rspec.yml')
@@ -22,6 +21,7 @@ def streaming_sim_config():
         data_loaded = yaml.load(stream)
     return data_loaded
 
+
 @pytest.fixture(scope="module")
 def streaming_sim_params(streaming_sim_config):
     """
@@ -51,6 +51,7 @@ def get_db_client(streaming_sim_config, request):
 
     return InfluxDBClient(host=streaming_sim_config['hosts'][0]['ip_address'], port=8086, database=request.param['database'], timeout=10)
 
+
 @pytest.fixture(scope='module')
 def run_simulation_fixture(streaming_sim_params):
     """
diff --git a/clmctest/monitoring/test_endpoint_config.py b/clmctest/monitoring/test_endpoint_config.py
deleted file mode 100644
index 0d0a09d..0000000
--- a/clmctest/monitoring/test_endpoint_config.py
+++ /dev/null
@@ -1,143 +0,0 @@
-#!/usr/bin/python3
-
-import pytest
-from influxdb import InfluxDBClient
-import clmctest.monitoring.LineProtocolGenerator as lp
-import random
-import time
-
-
-"""
-Tests the monitoring of endpoints' configuration states - currently based on model with three states (placing -> booting -> connecting)
-
-Line Protocol report format:
- 
-endpointConfig <global tags>,<state> <statePeriod=milliseconds>,<cpus=numberOfCPUs>, <memory=memoryOfVM>, <storage=storageCapacityOfVM> time
-"""
-
-
-# Random initialization of state periods
-state_delays = [{'placing': random.randint(7500, 12500), 'booting': random.randint(7500, 12500), 'connecting': random.randint(7500, 12500)},
-                {'placing': random.randint(7500, 12500), 'booting': random.randint(7500, 12500), 'connecting': random.randint(7500, 12500)}]
-
-measurement_name = 'endpointConfig'  # measurement name for configuration of media components' endpoints
-
-
-@pytest.mark.parametrize("query, result", [
-    ('SELECT "state", "statePeriod", "cpus", "memory", "storage" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'',
-     {'placing': {"state": "placing", "statePeriod": state_delays[0]['placing'], "cpus": 1, "memory": 2048, "storage": '10GB'},
-      'booting': {"state": "booting", "statePeriod": state_delays[0]['booting'], "cpus": 1, "memory": 2048, "storage": '10GB'},
-      'connecting': {"state": "connecting", "statePeriod": state_delays[0]['connecting'], "cpus": 1, "memory": 2048, "storage": '10GB'}}),
-    ('SELECT "state", "statePeriod", "cpus", "memory", "storage" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'' ,
-     {'placing': {"state": "placing", "statePeriod": state_delays[1]['placing'], "cpus": 1, "memory": 2048, "storage": '10GB'},
-      'booting': {"state": "booting", "statePeriod": state_delays[1]['booting'], "cpus": 1, "memory": 2048, "storage": '10GB'},
-      'connecting': {"state": "connecting", "statePeriod": state_delays[1]['connecting'], "cpus": 1, "memory": 2048, "storage": '10GB'}})
-])
-def test_endpoint_config(query, result, get_db_client):
-    """
-    Test endpoint configuration state measurements in general.
-
-    :param query: query under test for the endpoint configuration state
-    :param result: expected result of executed query
-    :param get_db_client: the InfluxDB client fixture from conftest.py
-    """
-
-    print("\n")  # blank line for formatting purposes
-
-    query_result = get_db_client.query(query, raise_errors=False)
-    # test the error attribute of the result is None, that is no error is returned from executing the DB query
-    assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
-    measurements = query_result.get_points()
-
-    assert all(map(lambda measurement: compare(measurement, result), measurements)), "Comparison failure for query:\n{0}".format(query)
-
-    print("Successfully passed test for query:\n{0}".format(query))
-
-
-@pytest.mark.parametrize("query, result", [
-    ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'placing\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'',
-     {"mean_statePeriod": state_delays[0]['placing']}),
-    ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'booting\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'',
-     {"mean_statePeriod": state_delays[0]['booting']}),
-    ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'connecting\' and ipendpoint=\'adaptive_streaming_I1_apache1\' and sf=\'adaptive_streaming\'',
-     {"mean_statePeriod": state_delays[0]['connecting']}),
-    ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'placing\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'',
-     {"mean_statePeriod": state_delays[1]['placing']}),
-    ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'booting\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'',
-     {"mean_statePeriod": state_delays[1]['booting']}),
-    ('SELECT MEAN("statePeriod") as "mean_statePeriod" FROM "CLMCMetrics"."autogen"."endpointConfig" WHERE state=\'connecting\' and ipendpoint=\'adaptive_streaming_I1_apache2\' and sf=\'adaptive_streaming\'',
-     {"mean_statePeriod": state_delays[1]['connecting']})
-])
-def test_mean_config_periods(query, result, get_db_client):
-    """
-    :param query: query under test for the endpoint configuration state
-    :param result: expected result of executed query
-    :param get_db_client: the InfluxDB client fixture from conftest.py
-    """
-
-    print("\n")  # blank line for formatting purposes
-
-    query_result = get_db_client.query(query, raise_errors=False)
-    # test the error attribute of the result is None, that is no error is returned from executing the DB query
-    assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
-
-    measurement = next(query_result.get_points())
-    measurement.pop('time')
-
-    assert measurement == result, "Comparison failure for query of mean state period:\n{0}".format(query)
-
-    print("Successfully passed test for query of mean state period:\n{0}".format(query))
-
-
-@pytest.fixture(scope='module', autouse=True)
-def generate_states_data(streaming_sim_config, get_db_client):
-    """
-    Sends configuration state data to influx for the two mc endpoints
-
-    :param streaming_sim_config: the configuration fixture from conftest.py
-    :param get_db_client: the influx db client
-    """
-
-    global state_delays
-    global measurement_name
-
-    if measurement_name in (measurement['name'] for measurement in get_db_client.get_list_measurements()):
-        get_db_client.drop_measurement(measurement_name)  # clear DB measurement from previous test if exists
-
-    time.sleep(2)
-
-    for i in range(1, 3):
-        measurement_time = 0
-        for state in ('placing', 'booting', 'connecting'):
-            epURL = streaming_sim_config['hosts'][i]['ip_address']   # endpoint URL
-            endpointID = streaming_sim_config['hosts'][i]['ipendpoint_id']
-            cpu = streaming_sim_config['hosts'][i]['cpus']
-            mem = streaming_sim_config['hosts'][i]['memory']
-            disk = streaming_sim_config['hosts'][i]['disk']
-
-            db_name = streaming_sim_config['hosts'][i]['database_name']
-            db_client = InfluxDBClient(host=epURL, port=8186, database=db_name, timeout=10)
-
-            period = state_delays[i-1].get(state)
-            print("Endpoint - {0}, state - {1}, period - {2}ms".format(endpointID, state, period))
-
-            db_client.write_points(lp.generate_endpoint_config(state, cpu, mem, disk, period, measurement_time))
-            measurement_time += period
-
-    time.sleep(10)
-
-
-def compare(actual_measurement, measurement_dict):
-    """
-    Auxiliary function to check whether the measurements are the same, excluding the time stamp field
-
-    :param actual_measurement: the actual measurement from Influx (the one with a time stamp
-    :param measurement_dict: the dictionary of expected measurements
-    :return: True for equality and False otherwise
-    """
-
-    actual_measurement.pop('time')
-    # get the measurement state we are comparing and fetch it from the dictionary of expected measurements
-    expected_measurement = measurement_dict.get(actual_measurement.get('state'))
-
-    return actual_measurement == expected_measurement
diff --git a/clmctest/monitoring/test_simresults.py b/clmctest/monitoring/test_simresults.py
index 73d984a..0277167 100644
--- a/clmctest/monitoring/test_simresults.py
+++ b/clmctest/monitoring/test_simresults.py
@@ -17,13 +17,17 @@ class TestSimulation(object):
          {"time": "1970-01-01T00:00:00Z", "count_avg_response_time": 7200, "count_peak_response_time": 7200, "count_requests": 7200}),
         ('SELECT count(*) FROM "CLMCMetrics"."autogen"."net_port_io"',
          {"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}),
-        ('SELECT count(*) FROM "CLMCMetrics"."autogen"."vm_res_alloc"',
-         {"time": "1970-01-01T00:00:00Z", "count_cpu": 12, "count_memory": 12, "count_storage": 12}),
-         
+
+        ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
+         {"time": "1970-01-01T00:00:00Z", "count_placing": 3, "count_avg_placing": 3, "count_booting": 3, "count_avg_booting": 3, "count_connecting": 3, "count_avg_connecting": 3, "count_cpus": 3, "count_memory": 3, "count_storage": 3}),
+        ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
+         {"time": "1970-01-01T00:00:00Z", "count_placing": 3, "count_avg_placing": 3, "count_booting": 3, "count_avg_booting": 3, "count_connecting": 3, "count_avg_connecting": 3,
+          "count_cpus": 3, "count_memory": 3, "count_storage": 3}),
+
         ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
-         {"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3602, "count_avg_starting" : 3602, "count_avg_stopped" : 3602, "count_avg_stopping" : 3602, "count_running" : 3602, "count_starting" : 3602, "count_stopped" : 3602, "count_stopping" : 3602}),
+         {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3602, "count_avg_starting": 3602, "count_avg_stopped": 3602, "count_avg_stopping": 3602, "count_running": 3602, "count_starting": 3602, "count_stopped": 3602, "count_stopping": 3602}),
         ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
-         {"time" : "1970-01-01T00:00:00Z", "count_avg_running" : 3602, "count_avg_starting" : 3602, "count_avg_stopped" : 3602, "count_avg_stopping" : 3602, "count_running" : 3602, "count_starting" : 3602, "count_stopped" : 3602, "count_stopping" : 3602}),
+         {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3602, "count_avg_starting": 3602, "count_avg_stopped": 3602, "count_avg_stopping": 3602, "count_running": 3602, "count_starting": 3602, "count_stopped": 3602, "count_stopping": 3602}),
     ])
     def test_simulation(self, query, expected_result, get_db_client, run_simulation_fixture):
         """
-- 
GitLab