From e3bfce580d9deb07f2ea5ef92972d8d54d43a0e2 Mon Sep 17 00:00:00 2001
From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk>
Date: Wed, 4 Apr 2018 09:53:24 +0100
Subject: [PATCH] [Issue #61] - normalised all reporting periods for endpoint
 configuration states to 1 second

---
 clmctest/monitoring/StreamingSim.py    | 137 +++++++++++++------------
 clmctest/monitoring/conftest.py        |   2 +-
 clmctest/monitoring/test_simresults.py |  67 +++---------
 3 files changed, 88 insertions(+), 118 deletions(-)

diff --git a/clmctest/monitoring/StreamingSim.py b/clmctest/monitoring/StreamingSim.py
index ba9dbbe..88e85fb 100644
--- a/clmctest/monitoring/StreamingSim.py
+++ b/clmctest/monitoring/StreamingSim.py
@@ -73,63 +73,56 @@ class Sim(object):
                          'segment_size': 2, 'video_bit_rate': 80, 'packet_size': 1500}
                         ]
 
-        # Simulate configuration of the ip endpoints
-        # endpoint state->mu, sigma, secs normal distribution
-        config_delay_dist = {"unplaced": [1, 0.68], "placing": [10, 0.68], "placed": [1, 0.68], "booting": [10, 0.68],  "booted": [2, 0.68],
-                             "connecting": [10, 0.68], "connected": [8, 0.68]}
-
         print("\nSimulation started. Generating data...")
 
+        # Simulate configuration of the ip endpoints
+
         # Move endpoints from state unplaced to state placing
-        max_delay = 0
+        # reports: 1, unplaced: 0.7s, placing: 0.3s
         for ip_endpoint in ip_endpoints:
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['unplaced'][0],
-                                             config_delay_dist['unplaced'][0] * config_delay_dist['unplaced'][1], 0.7,
-                                             'unplaced', 'placing')
-            max_delay = max(delay_time, max_delay)
-        sim_time += max_delay
+            self._changeVMState(agent_db_client, sim_time, ip_endpoint, 10, 7, 'unplaced', 'placing')
+        sim_time += TICK_TIME
 
         # Place endpoints
-        max_delay = 0
+        # reports: 10, placing: 9.1s, placed: 0.9s
         for ip_endpoint in ip_endpoints:
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placing'][0],
-                                             config_delay_dist['placing'][0] * config_delay_dist['placing'][1], 0.7,
-                                             'placing', 'placed')
-            max_delay = max(delay_time, max_delay)
-        sim_time += max_delay
+
+            for i in range(9):
+                self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "placing")
+
+            self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 9, 'placing', 'placed')
+        sim_time += 10 * TICK_TIME
 
         # Move endpoints from state placed to state booting
-        max_delay = 0
+        # reports: 1, placed: 0.8s, booting: 0.2s
         for ip_endpoint in ip_endpoints:
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['placed'][0],
-                                             config_delay_dist['placed'][0] * config_delay_dist['placed'][1], 0.7,
-                                             'placed', 'booting')
-            max_delay = max(delay_time, max_delay)
-        sim_time += max_delay
+            self._changeVMState(agent_db_client, sim_time, ip_endpoint, 10, 8, 'placed', 'booting')
+        sim_time += TICK_TIME
 
         # Boot endpoints
-        max_delay = 0
+        # reports: 10, booting: 9.4s, booted: 0.6s
         for ip_endpoint in ip_endpoints:
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booting'][0],
-                                             config_delay_dist['booting'][0] * config_delay_dist['booting'][1], 0.7,
-                                             'booting', 'booted')
-            max_delay = max(delay_time, max_delay)
-        sim_time += max_delay
+
+            for i in range(9):
+                self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "booting")
+
+            self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 4, 'booting', 'booted')
+        sim_time += 10*TICK_TIME
 
         # move mpegdash_service media component state from 'stopped' to 'starting' 
         # Total reports = 1, (0.2 seconds in 'stopped', 0.8 seconds in 'starting')
         for ip_endpoint in ip_endpoints:
             agent_url    = urllib.parse.urlparse(ip_endpoint["agent_url"])
             influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            self._changeMCState( influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting' )
+            self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting')
         
         sim_time += TICK_TIME
 
@@ -147,26 +140,25 @@ class Sim(object):
         sim_time += 5 * TICK_TIME
 
         # Move endpoints from state booted to state connecting
-        max_delay = 0
+        # reports: 2, booted: 1.5s, connecting: 0.5s
         for ip_endpoint in ip_endpoints:
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['booted'][0],
-                                             config_delay_dist['booted'][0] * config_delay_dist['booted'][1], 0.7,
-                                             'booted', 'connecting')
-            max_delay = max(delay_time, max_delay)
-        sim_time += max_delay
+            self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "booted")
+            self._changeVMState(agent_db_client, sim_time + (1*TICK_TIME), ip_endpoint, 10, 5, 'booted', 'connecting')
+        sim_time += 2*TICK_TIME
 
         # Connect endpoints
-        max_delay = 0
+        # reports: 10, connecting: 9.7s, connected: 0.3s
         for ip_endpoint in ip_endpoints:
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connecting'][0],
-                                             config_delay_dist['connecting'][0] * config_delay_dist['connecting'][1], 0.7,
-                                             'connecting', 'connected')
-            max_delay = max(delay_time, max_delay)
-        sim_time += max_delay
+
+            for i in range(9):
+                self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connecting")
+
+            self._changeVMState(agent_db_client, sim_time + (9*TICK_TIME), ip_endpoint, 10, 7, 'connecting', 'connected')
+        sim_time += 10*TICK_TIME
 
         request_arrival_rate_inc = DEFAULT_REQUEST_RATE_INC
         inc_period_count = 0
@@ -233,8 +225,7 @@ class Sim(object):
                 ip_endpoint['request_queue'] -= int(requests_processed)
 
                 # update endpoint state (continuously 'connected')
-                agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time,
-                                                                         **{'connected': float(TICK_TIME), 'avg_connected': float(TICK_TIME)}))
+                self._writeVMSingleState(agent_db_client, sim_time, ip_endpoint, "connected")
 
                 # update mpegdash_service media component state (continuously 'running')
                 self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time)
@@ -244,15 +235,16 @@ class Sim(object):
         # Simulate tear-down of media components and endpoints
 
         # remove endpoints
-        max_delay = 0
+        # reports: 5, connected: 4.7s, unplaced: 0.3s
         for ip_endpoint in ip_endpoints:
             agent_url = urllib.parse.urlparse(ip_endpoint["agent_url"])
             agent_db_client = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            delay_time = self._changeVMState(agent_db_client, sim_time, ip_endpoint, config_delay_dist['connected'][0],
-                                             config_delay_dist['connected'][0] * config_delay_dist['connected'][1], 0.7,
-                                             'connected', 'unplaced')
-            max_delay = max(delay_time, max_delay)
-        sim_time += max_delay
+
+            for i in range(4):
+                self._writeVMSingleState(agent_db_client, sim_time + (i * TICK_TIME), ip_endpoint, "connected")
+
+            self._changeVMState(agent_db_client, sim_time + (4*TICK_TIME), ip_endpoint, 10, 7, 'connected', 'unplaced')
+        sim_time += 5 * TICK_TIME
 
         # move mpegdash_service media component state from 'running' to 'stopping'
         # Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping')
@@ -303,34 +295,49 @@ class Sim(object):
         return response_delay
 
     @staticmethod
-    def _changeVMState(agent_db_client, sim_time, ip_endpoint, mu, sigma, trans_ratio, transition_state, next_state):
+    def _writeVMSingleState(agent_db_client, sim_time, ip_endpoint, state):
+        """
+        Write a single state as a sample over TICK_TIME.
+
+        :param agent_db_client: agent used to send metric data to CLMC
+        :param sim_time:  current simulation time
+        :param ip_endpoint: dict with info for ip endpoint
+        :param state: state that's being reported
+        """
+
+        state_stats = {
+            state: float(TICK_TIME),
+            'avg_{0}'.format(state): float(TICK_TIME)
+        }
+
+        agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, **state_stats))
+
+    @staticmethod
+    def _changeVMState(agent_db_client, sim_time, ip_endpoint, sample_count, trans_sample_count, transition_state, next_state):
         """
         Send influx data to change VM state. Declared as static method since it doesn't need access to any instance variables.
 
         :param sim_time: current simulation time
         :param ip_endpoint: endpoint configuration (cpu, memory, storage, etc.)
-        :param mu: mean value
-        :param sigma: standard deviation
-        :param trans_ratio: the part of the total delay time for transition state
+        :param sample_count: - the total number of samples in the reporting period (TICK_TIME)
+        :param trans_sample_count: - the number of samples in the transition state
         :param transition_state: name of transition state
         :param next_state: name of next state
         :return: the delay time
         """
 
-        # assert total delay time is at least 1 second
-        total_delay_time = max(random.normalvariate(mu, sigma), 1.0)
+        state_stats = {}
 
-        # part of the total delay time is the transition state period
-        transition_time = trans_ratio*total_delay_time
-        # the rest of the delay is the next state period
-        next_state_time = total_delay_time - transition_time
+        # period of transition state is part of the total sample count multiplied by the reporting period (TICK_TIME)
+        state_stats[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count
+        state_stats["avg_" + transition_state] = state_stats[transition_state] / float(TICK_TIME)
 
-        # transition state and state period are passed to the generate report function as keyword arguments
-        agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time,
-                                                                 **{transition_state: transition_time, 'avg_{0}'.format(transition_state): transition_time,
-                                                                    next_state: next_state_time, 'avg_{0}'.format(next_state): next_state_time}))
+        # the rest of the period is reported as time for the next state
+        state_stats[next_state] = float(TICK_TIME) - state_stats[transition_state]
+        state_stats["avg_" + next_state] = state_stats[next_state] / float(TICK_TIME)
 
-        return total_delay_time
+        # transition state and state period are passed to the generate report function as keyword arguments
+        agent_db_client.write_points(lp.generate_endpoint_config(ip_endpoint['cpu'], ip_endpoint['mem'], ip_endpoint['storage'], sim_time, **state_stats))
 
     @staticmethod
     def _writeMCSingleState(influxClient, measurement, state, sim_time):
diff --git a/clmctest/monitoring/conftest.py b/clmctest/monitoring/conftest.py
index 8d1ed62..9f3c42c 100644
--- a/clmctest/monitoring/conftest.py
+++ b/clmctest/monitoring/conftest.py
@@ -15,7 +15,7 @@ def streaming_sim_config():
     :return: the python object representing the read YAML file
     """
     rspec = pkg_resources.resource_filename('clmctest.monitoring', 'rspec.yml')
-    print("rspec file: {0}".format(rspec))
+    print("\nrspec file: {0}".format(rspec))
 
     with open(rspec, 'r') as stream:
         data_loaded = yaml.load(stream)
diff --git a/clmctest/monitoring/test_simresults.py b/clmctest/monitoring/test_simresults.py
index cde8c04..02d3913 100644
--- a/clmctest/monitoring/test_simresults.py
+++ b/clmctest/monitoring/test_simresults.py
@@ -32,24 +32,31 @@ class TestSimulation(object):
          {"time": "1970-01-01T00:00:00Z", "count_RX_BYTES_PORT_M": 7200, "count_TX_BYTES_PORT_M": 7200}),
 
         ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
-         {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3607, "count_avg_unplaced": 3607, "count_placing": 3607, "count_avg_placing": 3607, "count_placed": 3607, "count_avg_placed": 3607, "count_booting": 3607, "count_avg_booting": 3607, "count_booted": 3607,
-          "count_avg_booted": 3607, "count_connecting": 3607, "count_avg_connecting": 3607, "count_connected": 3607, "count_avg_connected": 3607, "count_cpus": 3607, "count_memory": 3607, "count_storage": 3607}),
+         {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639,
+          "count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
         ('SELECT count(*) FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
-         {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3607, "count_avg_unplaced": 3607, "count_placing": 3607, "count_avg_placing": 3607, "count_placed": 3607, "count_avg_placed": 3607, "count_booting": 3607, "count_avg_booting": 3607, "count_booted": 3607,
-          "count_avg_booted": 3607, "count_connecting": 3607, "count_avg_connecting": 3607, "count_connected": 3607, "count_avg_connected": 3607, "count_cpus": 3607, "count_memory": 3607, "count_storage": 3607}),
+         {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639,
+          "count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
 
         ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
          {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
         ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
          {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
 
-        ('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <>0',
+        ('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0',
+         {"time": "1970-01-01T00:00:00Z", "avg_placing": 0.9272727272727274}),
+        ('SELECT mean(avg_booting) as "avg_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_booting <> 0',
+         {"time": "1970-01-01T00:00:00Z", "avg_booting": 0.8727272727272727}),
+        ('SELECT mean(avg_connecting) as "avg_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connecting <> 0',
+         {"time": "1970-01-01T00:00:00Z", "avg_connecting":  0.9272727272727272}),
+
+        ('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <> 0',
          {"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}),
-        ('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <>0',
+        ('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <> 0',
          {"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}),
-        ('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <>0',
+        ('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <> 0',
          {"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}),
-        ('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <>0',
+        ('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <> 0',
          {"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55})
     ])
     def test_simulation(self, influx_db, query, expected_result):
@@ -76,47 +83,3 @@ class TestSimulation(object):
         assert expected_result == actual_result, "Simulation test failure"
 
         print("Successfully passed test for the following query: {0}".format(query))
-
-    @pytest.mark.parametrize("query, field", [
-        ('SELECT mean("placing") as "mean_transition_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" where "placing" <> 0 and "placed" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'',
-         'mean_transition_placing'),
-        ('SELECT mean("placing") as "mean_target_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" where "placing" <> 0 and "unplaced" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'',
-         'mean_target_placing'),
-        ('SELECT mean("booting") as "mean_transition_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "booting" <> 0 and "booted" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache1\'',
-         'mean_transition_booting'),
-        ('SELECT mean("booting") as "mean_target_booting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "booting" <> 0 and "placed" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'',
-         'mean_target_booting'),
-        ('SELECT mean("connecting") as "mean_transition_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "connecting" <> 0 and "connected" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'',
-         'mean_transition_connecting'),
-        ('SELECT mean("connecting") as "mean_target_connecting" FROM "CLMCMetrics"."autogen"."endpoint_config" where "connecting" <> 0 and "booted" <> 0 and "ipendpoint"=\'adaptive_streaming_I1_apache2\'',
-         'mean_target_connecting'),
-    ])
-    def test_mean_config_queries(self, influx_db, query, field):
-        """
-        Test queries for mean values in the configuration states model.
-
-        - 'mean_transition_{state}' - we want to know the mean time spent on the given state in cases where this was the actual transition state.
-            e.g. 'mean_transition_placing' - refers to the mean time spent on state 'placing' in transitions such as 'placing' -> 'placed'
-
-        - 'mean_target_{state}' - we want to know the mean time spent on the given state in cases where this was the actual target state
-            e.g. 'mean_target_placing' - refers to the mean time spent on state 'placing' in transitions such as 'unplaced' -> 'placing'
-
-        :param influx_db: influx db client
-        :param query: query under test
-        :param field: the field id to fetch
-        """
-
-        # pytest automatically goes through all queries under test, declared in the parameters decorator
-        print("\n")  # prints a blank line for formatting purposes
-
-        # the raise_errors=False argument is given so that we could actually test that the DB didn't return any errors instead of raising an exception
-        query_result = influx_db.query(query, raise_errors=False)
-
-        # test the error attribute of the result is None, that is no error is returned from executing the DB query
-        assert query_result.error is None, "An error was encountered while executing query {0}.".format(query)
-
-        # get the dictionary of result points; the next() function just gets the first element of the query results generator (we only expect one item in the generator)
-        result = next(query_result.get_points()).get(field)
-        assert float(result) >= 0.0, "Test failure. Reported mean values cannot be negative."
-
-        print("Successfully passed test for the following query: {0}".format(query))
-- 
GitLab