From f7801c238e91fdf8e33c3f44d5c2b4d666d593ab Mon Sep 17 00:00:00 2001
From: Simon Crowle <sgc@it-innovation.soton.ac.uk>
Date: Mon, 9 Apr 2018 13:39:33 +0100
Subject: [PATCH] Refactors media configuration state monitoring to align with
 latest state monitoring model

See

https://gitlab.it-innovation.soton.ac.uk/FLAME/flame-clmc/blob/mediaComponentConfig/docs/monitoring.md#user-content-configuration-status-modelling-and-monitoring

for documentation
---
 clmctest/monitoring/LineProtocolGenerator.py |  73 +++++-------
 clmctest/monitoring/StreamingSim.py          | 118 +++++++++++--------
 clmctest/monitoring/test_simresults.py       |  33 ++++--
 3 files changed, 121 insertions(+), 103 deletions(-)

diff --git a/clmctest/monitoring/LineProtocolGenerator.py b/clmctest/monitoring/LineProtocolGenerator.py
index fd109cf..50700fe 100644
--- a/clmctest/monitoring/LineProtocolGenerator.py
+++ b/clmctest/monitoring/LineProtocolGenerator.py
@@ -104,54 +104,43 @@ def generate_endpoint_config(time, cpu, mem, storage, current_state, current_sta
 
     return result
 
+def generate_mc_service_config( time, mcMeasurement, current_state, current_state_time, config_state_values ):
+    """
+    generates a measurement line for a media component configuration state
+
+    : time                - timestamp for the measurement
+    : mcMeasurement       - measurement label
+    : current_state       - the current state of the service configuration
+    : current_state_time  - the current length of time in the current state
+    : config_state_values - dictionary of media component configuration states (summed time and mean average over the sampling period)
+    :                     - stopped, starting, running, stopping [use '_sum' and '_mst' for sum and average respectively]
+    """
 
-def generate_mc_service_config( mcMeasurement, stateTimeStats, time ):
-
-    validStats = validate_state_time_stats( stateTimeStats )
-
-    result = [{ "measurement" : mcMeasurement,
-                "fields" :
-                { "stopped"      : validStats['stopped'],
-                  "avg_stopped"  : validStats['avg_stopped'],
-                  "starting"     : validStats['starting'],
-                  "avg_starting" : validStats['avg_starting'],
-                  "running"      : validStats['running'],
-                  "avg_running"  : validStats['avg_running'],
-                  "stopping"     : validStats['stopping'],
-                  "avg_stopping" : validStats['avg_stopping']
-                },
-                "time" : _getNSTime(time) }]
-
-    return result
-
-def validate_state_time_stats( stateTimeStats ):
-
-    if ( not 'stopped' in stateTimeStats ):
-        stateTimeStats['stopped'] = 0.0
-
-    if ( not 'avg_stopped' in stateTimeStats ):
-        stateTimeStats['avg_stopped'] = 0.0
-    
-    if ( not 'starting' in stateTimeStats ):
-        stateTimeStats['starting'] = 0.0
-
-    if ( not 'avg_starting' in stateTimeStats ):
-        stateTimeStats['avg_starting'] = 0.0
-
-    if ( not 'running' in stateTimeStats ):
-        stateTimeStats['running'] = 0.0
+    # define state value validation function (substituting 0.0 where a key is not supplied)
+    validate_f = lambda key: config_state_values.get(key, 0.0)
 
-    if ( not 'avg_running' in stateTimeStats ):
-        stateTimeStats['avg_running'] = 0.0
+    # define expected keys
+    state_keys = [ "stopped_sum",  "stopped_mst", 
+                   "starting_sum", "starting_mst",
+                   "running_sum",  "running_mst",
+                   "stopping_sum", "stopping_mst" ]
 
-    if ( not 'stopping' in stateTimeStats ):
-        stateTimeStats['stopping'] = 0.0
+    # define current state time first
+    fields = {}
+    fields["current_state_time"] = current_state_time
 
-    if ( not 'avg_stopping' in stateTimeStats ):
-        stateTimeStats['avg_stopping'] = 0.0
+    # then add in validated state values
+    for key in state_keys :
+        fields[key] = validate_f(key)
 
-    return stateTimeStats
+    # compose result
+    result = [{ "measurement" : mcMeasurement,
+                "tags"        : { "current_state" : current_state },
+                "fields"      : fields,
+                "time"        : _getNSTime(time)
+             }]
 
+    return result
 
 # InfluxDB likes to have time-stamps in nanoseconds
 def _getNSTime(time):
diff --git a/clmctest/monitoring/StreamingSim.py b/clmctest/monitoring/StreamingSim.py
index 5f8fd2f..2348eb0 100644
--- a/clmctest/monitoring/StreamingSim.py
+++ b/clmctest/monitoring/StreamingSim.py
@@ -76,6 +76,9 @@ class Sim(object):
         endpoint_states_config = {self.agent1_url: {"current_state": "unplaced", "current_state_time": 0},
                                   self.agent2_url: {"current_state": "unplaced", "current_state_time": 0}}
 
+        mc_curr_states = { self.agent1_url : { "current_state": "stopped", "current_state_time": 0 },
+                           self.agent2_url : { "current_state": "stopped", "current_state_time": 0 } }
+
         print("\nSimulation started. Generating data...")
 
         # Simulate configuration of the ip endpoints
@@ -146,25 +149,40 @@ class Sim(object):
             endpoint_states_config[agent]["current_state_time"] = 0.6
         sim_time += 10*TICK_TIME
 
-        # move mpegdash_service media component state from 'stopped' to 'starting' 
-        # Total reports = 1, (0.2 seconds in 'stopped', 0.8 seconds in 'starting')
+        # move mpegdash media component state from 'stopped' to 'starting' 
+        # Total reports = 1, (completed: 0.2 seconds in 'stopped', current: 0.8 seconds in 'starting')
         for ip_endpoint in ip_endpoints:
-            agent_url    = urllib.parse.urlparse(ip_endpoint["agent_url"])
+            agent = ip_endpoint["agent_url"]
+            agent_url = urllib.parse.urlparse(agent)
             influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 2, 'stopped', 'starting')
+
+            completed_states = ( ('stopped', 0.2, 1 ), )
+            mc_curr_states[agent]["current_state"] = 'starting'
+            mc_curr_states[agent]["current_state_time"] = 0.8
+
+            self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'starting', 0.8 )
         
         sim_time += TICK_TIME
 
-        # move mpegdash_service media component state from 'starting' to 'running'
-        # Total reports = 5, (4.7 seconds in 'starting', 0.3 seconds in 'running')
+        # move mpegdash media component state from 'starting' to 'running'
+        # Total reports = 5, (4 incomplete 'starting', completed: 0.8 + 4 + 0.7 seconds in 'starting', current: 0.3 seconds in 'running')
         for ip_endpoint in ip_endpoints:
-            agent_url    = urllib.parse.urlparse(ip_endpoint["agent_url"])
+            agent = ip_endpoint["agent_url"]
+            agent_url = urllib.parse.urlparse(agent)
             influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
 
+            # 4 seconds of starting (NOTE(review): the loop below sets and reports 'running', not 'starting' - confirm intent)
             for i in range(0, 4):
-                self._writeMCSingleState(influxClient, 'mpegdash_service_config', 'starting', sim_time + (i * TICK_TIME))
+                mc_curr_states[agent]["current_state"] = 'running'
+                mc_curr_states[agent]["current_state_time"] += TICK_TIME
+                self._writeMCSingleState( influxClient, sim_time +(i * TICK_TIME), 'mpegdash_mc_config', 'running', mc_curr_states[agent]["current_state_time"] )
+
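+            # Switch over to running (NOTE(review): written at sim_time, the same timestamp as the first write in the loop above - confirm)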
+            completed_states = ( ('starting', 0.8 + 4 + 0.7, 1 ), )
+            mc_curr_states[agent]["current_state"] = 'running'
+            mc_curr_states[agent]["current_state_time"] = 0.3
 
-            self._changeMCState(influxClient, sim_time + (4 * TICK_TIME), 'mpegdash_service_config', 10, 7, 'starting', 'running')
+            self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'running', 0.3 )
 
         sim_time += 5 * TICK_TIME
 
@@ -277,8 +295,12 @@ class Sim(object):
                 # update current state time in the config dictionary
                 endpoint_states_config[agent]["current_state_time"] += 1
 
-                # update mpegdash_service media component state (continuously 'running')
-                self._writeMCSingleState(agent_db_client, 'mpegdash_service_config', 'running', sim_time)
+                # update mpegdash media component state (continuously 'running')
+                mc_curr_states[agent]["current_state"] = 'running'
+                mc_curr_states[agent]["current_state_time"] += TICK_TIME
+
+                # Just write the current state out here (it will be summed and averaged at the end)
+                self._writeMCSingleState( agent_db_client, sim_time, 'mpegdash_mc_config', 'running', mc_curr_states[agent]["current_state_time"] )
                 
             sim_time += TICK_TIME
 
@@ -304,23 +326,33 @@ class Sim(object):
             endpoint_states_config[agent]["current_state_time"] = 0.3
         sim_time += 5 * TICK_TIME
 
-        # move mpegdash_service media component state from 'running' to 'stopping'
-        # Total reports = 2, (1.8 seconds in 'running', 0.2 seconds in 'stopping')
+        # move mpegdash media component state from 'running' to 'stopping'
+        # Total reports = 1 (covering 2 ticks), ( completed: n+1.8 seconds in 'running', current: 0.2 seconds in 'stopping')
         for ip_endpoint in ip_endpoints:
-            agent_url    = urllib.parse.urlparse(ip_endpoint["agent_url"])
+            agent = ip_endpoint["agent_url"]
+            agent_url = urllib.parse.urlparse(agent)
             influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
             
-            self._writeMCSingleState( influxClient, 'mpegdash_service_config', 'running', sim_time )
-            self._changeMCState( influxClient, sim_time + TICK_TIME, 'mpegdash_service_config', 10, 8, 'running', 'stopping' )
+            completed_states = ( ('running', mc_curr_states[ip_endpoint["agent_url"]]["current_state_time"] +1.8, 1), )
+            mc_curr_states[agent]["current_state"] = 'stopping'
+            mc_curr_states[agent]["current_state_time"] = 0.2
+
+            self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'stopping', 0.2 )
         
         sim_time += 2 * TICK_TIME
 
-        # move mpegdash_service media component state from 'stopping' to 'stopped'
+        # move mpegdash media component state from 'stopping' to 'stopped'
         # Total reports = 1, (0.9 seconds in 'stopping', 0.1 seconds in 'stopped')
         for ip_endpoint in ip_endpoints:
-            agent_url    = urllib.parse.urlparse(ip_endpoint["agent_url"])
+            agent = ip_endpoint["agent_url"]
+            agent_url = urllib.parse.urlparse(agent)
             influxClient = InfluxDBClient(host=agent_url.hostname, port=agent_url.port, database=self.influx_db_name, timeout=10)
-            self._changeMCState(influxClient, sim_time, 'mpegdash_service_config', 10, 9, 'stopping', 'stopped')
+            
+            completed_states = ( ('stopping', mc_curr_states[ip_endpoint["agent_url"]]["current_state_time"] +0.9, 1), )
+            mc_curr_states[agent]["current_state"] = 'stopped'
+            mc_curr_states[agent]["current_state_time"] = 0.1
+
+            self._changeMCState( influxClient, sim_time, 'mpegdash_mc_config', completed_states, 'stopped', 0.1 )
         
         sim_time += TICK_TIME
 
@@ -397,50 +429,38 @@ class Sim(object):
                                                                  next_state, next_state_time, **state_stats))
 
     @staticmethod
-    def _writeMCSingleState(influxClient, measurement, state, sim_time):
+    def _writeMCSingleState(influxClient, sim_time, measurement, curr_state, curr_state_time):
         """
-        
         Write a single state as a sample over TICK_TIME
 
-        : influxClient - agent used to send metric data to CLMC
-        : measurement  - name of influx measurement set
-        : state        - state to be declared
-        : sim_time     - time stamp for this measurement
-
+        : influxClient    - agent used to send metric data to CLMC
+        : sim_time        - time stamp for this measurement
+        : measurement     - name of influx measurement set
+        : curr_state      - the current state
+        : curr_state_time - length of time in the current state
         """
 
-        state_stats = {}
-        state_stats[state] = float(TICK_TIME)
-        state_stats['avg_' + state] = float(TICK_TIME)
-        influxClient.write_points(lp.generate_mc_service_config(measurement, state_stats, sim_time))
+        influxClient.write_points( lp.generate_mc_service_config(sim_time, measurement, curr_state, curr_state_time, {}) )
 
     @staticmethod
-    def _changeMCState(influxClient, sim_time, mc_measurement, sample_count, trans_sample_count, transition_state, next_state):
+    def _changeMCState(influxClient, sim_time, measurement, completed_states, curr_state, curr_state_time):
         """
         Send INFLUX data indicating the time taken to transition to a new state
 
-        : influxClient - agent used to send metric data to CLMC
-        : sim_time - simulation time at start of state changing period
-        : mc_measurement -  measurement name
-        : sample_count - the total number of samples in the reporting period (TICK_TIME)
-        : trans_sample_count - the number of samples in the transition state
-        : transition_state - the state being exited
-        : next_state - the state being entered
-
+        : influxClient     - agent used to send metric data to CLMC
+        : sim_time         - simulation time at start of state changing period
+        : measurement      -  measurement name
+        : completed_states - sequence of completed states, each a tuple of (state name, state time sum, count)
+        : curr_state       - the current state 
+        : curr_state_time  - the current state's running time
         """
 
         mc_states = {}
+        for state, state_sum, state_count in completed_states:
+            mc_states[ state +"_sum" ] = state_sum
+            mc_states[ state +"_mst" ] = state_sum / state_count
 
-        # Report total time in transition and its average of the reporting period
-        mc_states[transition_state] = (float(TICK_TIME) / sample_count) * trans_sample_count
-        mc_states["avg_" + transition_state] = mc_states[transition_state] / float(TICK_TIME)
-
-        # Use the time remaining as the length for the time in the next state
-        mc_states[next_state] = float(TICK_TIME) - mc_states[transition_state]
-        mc_states["avg_" + next_state] = mc_states[next_state] / float(TICK_TIME)
-
-        influxClient.write_points(lp.generate_mc_service_config(mc_measurement, mc_states, sim_time))
-
+        influxClient.write_points( lp.generate_mc_service_config(sim_time, measurement, curr_state, curr_state_time, mc_states) )
 
 def run_simulation(generate=True, sTime=3600):
     """
diff --git a/clmctest/monitoring/test_simresults.py b/clmctest/monitoring/test_simresults.py
index a18f282..4112093 100644
--- a/clmctest/monitoring/test_simresults.py
+++ b/clmctest/monitoring/test_simresults.py
@@ -38,10 +38,10 @@ class TestSimulation(object):
          {"time": "1970-01-01T00:00:00Z", "count_unplaced": 3639, "count_avg_unplaced": 3639, "count_placing": 3639, "count_avg_placing": 3639, "count_placed": 3639, "count_avg_placed": 3639, "count_booting": 3639, "count_avg_booting": 3639, "count_booted": 3639,
           "count_avg_booted": 3639, "count_connecting": 3639, "count_avg_connecting": 3639, "count_connected": 3639, "count_avg_connected": 3639, "count_cpus": 3639, "count_memory": 3639, "count_storage": 3639}),
 
-        ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
-         {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
-        ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
-         {"time": "1970-01-01T00:00:00Z", "count_avg_running": 3609, "count_avg_starting": 3609, "count_avg_stopped": 3609, "count_avg_stopping": 3609, "count_running": 3609, "count_starting": 3609, "count_stopped": 3609, "count_stopping": 3609}),
+        ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache1\'',
+         {"time": "1970-01-01T00:00:00Z", "count_current_state_time": 3607, "count_running_mst": 3607, "count_running_sum": 3607, "count_starting_mst": 3607, "count_starting_sum": 3607, "count_stopped_mst": 3607, "count_stopped_sum": 3607, "count_stopping_mst": 3607, "count_stopping_sum": 3607}),
+        ('SELECT count(*) FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE ipendpoint=\'adaptive_streaming_I1_apache2\'',
+         {"time": "1970-01-01T00:00:00Z", "count_current_state_time": 3607, "count_running_mst": 3607, "count_running_sum": 3607, "count_starting_mst": 3607, "count_starting_sum": 3607, "count_stopped_mst": 3607, "count_stopped_sum": 3607, "count_stopping_mst": 3607, "count_stopping_sum": 3607}),
 
         ('SELECT mean(avg_placing) as "avg_placing" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_placing <> 0 and ipendpoint=\'adaptive_streaming_I1_apache1\'',
          {"time": "1970-01-01T00:00:00Z", "avg_placing": 9.4}),
@@ -60,15 +60,24 @@ class TestSimulation(object):
         ('SELECT mean(avg_connected) as "avg_connected" FROM "CLMCMetrics"."autogen"."endpoint_config" WHERE avg_connected <> 0 and ipendpoint=\'adaptive_streaming_I1_apache2\'',
          {"time": "1970-01-01T00:00:00Z", "avg_connected": 3605.0}),
 
-        ('SELECT mean(avg_stopped) as "avg_stopped" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopped <> 0',
-         {"time": "1970-01-01T00:00:00Z", "avg_stopped": 0.15}),
-        ('SELECT mean(avg_starting) as "avg_starting" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_starting <> 0',
-         {"time": "1970-01-01T00:00:00Z", "avg_starting": 0.9166666666666666}),
-        ('SELECT mean(avg_running) as "avg_running" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_running <> 0',
-         {"time": "1970-01-01T00:00:00Z", "avg_running": 0.9997502081598669}),
-        ('SELECT mean(avg_stopping) as "avg_stopping" FROM "CLMCMetrics"."autogen"."mpegdash_service_config" WHERE avg_stopping <> 0',
-         {"time": "1970-01-01T00:00:00Z", "avg_stopping": 0.55})
+        ('SELECT mean(stopped_sum) as "stopped_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopped_sum <> 0',
+         {"time": "1970-01-01T00:00:00Z", "stopped_sum": 0.2}),
+        ('SELECT mean(stopped_mst) as "stopped_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopped_mst <> 0',
+         {"time": "1970-01-01T00:00:00Z", "stopped_mst": 0.2}),
+        ('SELECT mean(starting_sum) as "starting_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE starting_sum <> 0',
+         {"time": "1970-01-01T00:00:00Z", "starting_sum": 5.5}),
+        ('SELECT mean(starting_mst) as "starting_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE starting_mst <> 0',
+         {"time": "1970-01-01T00:00:00Z", "starting_mst": 5.5}),
+        ('SELECT mean(running_sum) as "running_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE running_sum <> 0',
+         {"time": "1970-01-01T00:00:00Z", "running_sum": 3602.1000000000004}),
+        ('SELECT mean(running_mst) as "running_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE running_mst <> 0',
+         {"time": "1970-01-01T00:00:00Z", "running_mst": 3602.1000000000004}),
+        ('SELECT mean(stopping_sum) as "stopping_sum" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopping_sum <> 0',
+         {"time": "1970-01-01T00:00:00Z", "stopping_sum": 1.1}),
+        ('SELECT mean(stopping_mst) as "stopping_mst" FROM "CLMCMetrics"."autogen"."mpegdash_mc_config" WHERE stopping_mst <> 0',
+         {"time": "1970-01-01T00:00:00Z", "stopping_mst": 1.1}),
     ])
+    
     def test_simulation(self, influx_db, query, expected_result):
         """
         This is the entry point of the test. This method will be found and executed when the module is ran using pytest
-- 
GitLab