From e1556eedc87f3886931a8f8c6379af79c0461372 Mon Sep 17 00:00:00 2001
From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk>
Date: Tue, 25 Sep 2018 13:10:12 +0100
Subject: [PATCH] Converts threshold template into stream and batch versions

---
 .../alertsapi/alerts_specification_schema.py  |  4 +-
 src/service/clmcservice/alertsapi/tests.py    | 34 +++++++--
 .../clmcservice/alertsapi/utilities.py        | 70 ++++++++++++++-----
 src/service/clmcservice/alertsapi/views.py    | 27 ++++---
 ...ate.tick => threshold-batch-template.tick} |  0
 .../TICKscript/threshold-stream-template.tick | 28 ++++++++
 .../valid/alerts_test_config-1.yaml           |  4 +-
 .../valid/alerts_test_config-4.yaml           |  6 +-
 8 files changed, 133 insertions(+), 40 deletions(-)
 rename src/service/resources/TICKscript/{threshold-template.tick => threshold-batch-template.tick} (100%)
 create mode 100644 src/service/resources/TICKscript/threshold-stream-template.tick

diff --git a/src/service/clmcservice/alertsapi/alerts_specification_schema.py b/src/service/clmcservice/alertsapi/alerts_specification_schema.py
index 6faa517..e2365bf 100644
--- a/src/service/clmcservice/alertsapi/alerts_specification_schema.py
+++ b/src/service/clmcservice/alertsapi/alerts_specification_schema.py
@@ -49,7 +49,7 @@ INFLUX_QL_FUNCTIONS = (
 TICK_SCRIPT_TEMPLATES = ("threshold", "relative", "deadman")
 
 # Allowed comparison operators and their logical values
-COMPARISON_OPERATORS = {"lt": "<", "gt": ">", "lte": "<=", "gte": ">=", "eq": "=", "neq": "<>"}
+COMPARISON_OPERATORS = {"lt": "<", "gt": ">", "lte": "<=", "gte": ">=", "eq": "==", "neq": "!="}
 
 # Regular expression for validating http handlers
 URL_REGEX = re.compile(
@@ -84,7 +84,7 @@ ALERTS_SPECIFICATION_SCHEMA = Schema({
                             "metric": And(str, lambda s: len(s.split('.', 1)) == 2),
                             "condition": {
                                 "threshold": Or(int, float),
-                                "granularity": int,
+                                "granularity": And(int, lambda p: p > 0),
                                 Optional("aggregation_method"): And(str, lambda s: s in INFLUX_QL_FUNCTIONS),
                                 Optional("resource_type"): {
                                     And(str, lambda s: s in CLMC_INFORMATION_MODEL_GLOBAL_TAGS): str
diff --git a/src/service/clmcservice/alertsapi/tests.py b/src/service/clmcservice/alertsapi/tests.py
index 09e4d61..c142707 100644
--- a/src/service/clmcservice/alertsapi/tests.py
+++ b/src/service/clmcservice/alertsapi/tests.py
@@ -26,7 +26,6 @@
 # Python standard libs
 from os import listdir
 from os.path import isfile, join, dirname
-from urllib.parse import urlparse
 
 # PIP installed libs
 import pytest
@@ -208,13 +207,14 @@ class TestAlertsConfigurationAPI(object):
             assert "triggers_action_errors" not in clmc_service_response, "Unexpected error was returned for handlers specification"
 
             # traverse through all alert IDs and check that they are created within Kapacitor
-            for alert_id in alert_ids:
+            for alert_id, alert_type in alert_ids:
                 kapacitor_response = get("http://localhost:9092/kapacitor/v1/tasks/{0}".format(alert_id))
                 assert kapacitor_response.status_code == 200, "Alert with ID {0} was not created - test file {1}.".format(alert_id, alerts_test_file)
                 kapacitor_response_json = kapacitor_response.json()
                 assert "link" in kapacitor_response_json, "Incorrect response from kapacitor for alert with ID {0} - test file {1}".format(alert_id, alerts_test_file)
                 assert kapacitor_response_json["status"] == "enabled", "Alert with ID {0} was created but is disabled - test file {1}".format(alert_id, alerts_test_file)
                 assert kapacitor_response_json["executing"], "Alert with ID {0} was created and is enabled, but is not executing - test file {1}".format(alert_id, alerts_test_file)
+                assert kapacitor_response_json["type"] == alert_type,  "Alert with ID {0} was created with the wrong type - test file {1}".format(alert_id, alerts_test_file)
 
             # check that all topic IDs were registered within Kapacitor
             topic_ids = list(topic_handlers.keys())
@@ -284,23 +284,45 @@ def extract_alert_spec_data(alert_spec):
         policy_id = policy.name
         for trigger in policy.triggers:
             trigger_id = trigger.name
+            event_type = trigger.trigger_tpl["event_type"]
+            alert_period_integer = trigger.trigger_tpl["condition"]["granularity"]
 
             topic_id = "{0}\n{1}\n{2}\n{3}".format(sfc, sfc_instance, policy_id, trigger_id)
             topic_id = AlertsConfigurationAPI.get_hash(topic_id)
             topic_handlers[topic_id] = []
 
             alert_id = topic_id
-            alert_ids.append(alert_id)
+            alert_type = get_alert_type(event_type, alert_period_integer)
+            alert_ids.append((alert_id, alert_type))
 
             for handler_url in trigger.trigger_tpl["action"]["implementation"]:
-                handler_host = urlparse(handler_url).hostname
-                handler_id = "{0}\n{1}\n{2}\n{3}\n{4}".format(sfc, sfc_instance, policy_id, trigger_id, handler_host)
+                handler_id = "{0}\n{1}\n{2}\n{3}\n{4}".format(sfc, sfc_instance, policy_id, trigger_id, handler_url)
                 handler_id = AlertsConfigurationAPI.get_hash(handler_id)
                 topic_handlers[topic_id].append((handler_id, handler_url))
 
     return sfc, sfc_instance, alert_ids, topic_handlers
 
 
+def get_alert_type(event_type, alert_period):
+    """
+    Retrieve the alert type (stream ot batch) based on the event type and alert period.
+
+    :param event_type: event type, e.g. threshold, relative, deadman, etc.
+    :param alert_period: the alert period
+
+    :return: "batch" or "stream"
+    """
+
+    if event_type in AlertsConfigurationAPI.DUAL_VERSION_TEMPLATES:
+        if alert_period <= AlertsConfigurationAPI.STREAM_PERIOD_LIMIT:
+            return "stream"
+        else:
+            return "batch"
+    else:
+        events = {"relative": "batch", "deadman": "stream"}
+        return events[event_type]
+
+
 def clear_kapacitor_alerts(alert_ids, topic_handlers):
     """
     A utility function to clean up Kapacitor from the configured alerts, topics and handlers.
@@ -309,7 +331,7 @@ def clear_kapacitor_alerts(alert_ids, topic_handlers):
     :param topic_handlers: the dictionary of topic and handlers to delete
     """
 
-    for alert_id in alert_ids:
+    for alert_id, _ in alert_ids:
         kapacitor_response = delete("http://localhost:9092/kapacitor/v1/tasks/{0}".format(alert_id))  # delete alert
         assert kapacitor_response.status_code == 204
 
diff --git a/src/service/clmcservice/alertsapi/utilities.py b/src/service/clmcservice/alertsapi/utilities.py
index a2eec79..5a2f3af 100644
--- a/src/service/clmcservice/alertsapi/utilities.py
+++ b/src/service/clmcservice/alertsapi/utilities.py
@@ -135,22 +135,6 @@ class TICKScriptTemplateFiller:
     A utility class used for TICK script templates filtering.
     """
 
-    # a class variable used to hold the comparison operator used to build the where clause in TICK script templates,
-    # these differ if the where clause is built as a string opposed to when it is build as a lambda
-    _TEMPLATE_COMPARISON_OPERATOR = {"threshold": "=", "relative": "=", "deadman": "=="}
-
-    @staticmethod
-    def get_comparison_operator(template_type):
-        """
-        Get the correct comparison operator depending on the template type, if template type not recognized, return "=="
-
-        :param template_type: one of the template types, that are created within kapacitor
-
-        :return: the comparison operator that should be used in the template to build the where clause
-        """
-
-        return TICKScriptTemplateFiller._TEMPLATE_COMPARISON_OPERATOR.get(template_type, "==")
-
     @staticmethod
     def fill_template_vars(template_type, **kwargs):
         """
@@ -168,8 +152,8 @@ class TICKScriptTemplateFiller:
         return fill_function(**kwargs)
 
     @staticmethod
-    def _fill_threshold_template_vars(db=None, measurement=None, field=None, influx_function=None, critical_value=None,
-                                      comparison_operator=None, alert_period=None, topic_id=None, where_clause=None, **kwargs):
+    def _fill_threshold_batch_template_vars(db=None, measurement=None, field=None, influx_function=None, critical_value=None,
+                                            comparison_operator=None, alert_period=None, topic_id=None, where_clause=None, **kwargs):
         """
         Creates a dictionary object ready to be posted to kapacitor to create a "threshold" task from template.
 
@@ -227,6 +211,55 @@ class TICKScriptTemplateFiller:
 
         return template_vars
 
+    @staticmethod
+    def _fill_threshold_stream_template_vars(db=None, measurement=None, field=None, critical_value=None,
+                                             comparison_operator=None, topic_id=None, where_clause=None, **kwargs):
+        """
+        Creates a dictionary object ready to be posted to kapacitor to create a "threshold" task from template.
+
+        :param db: db name
+        :param measurement: measurement name
+        :param field: field name
+        :param influx_function: influx function to use for querying
+        :param critical_value: critical value to compare with
+        :param comparison_operator: type of comparison
+        :param alert_period: alert period to query influx
+        :param topic_id: topic identifier
+        :param where_clause: (OPTIONAL) argument for filtering the influx query by tag values
+
+        :return: a dictionary object ready to be posted to kapacitor to create a "threshold" task from template.
+        """
+
+        comparison_lambda = '"{0}" {1} {2}'.format(field, comparison_operator, critical_value)  # build up lambda string, e.g. "real_value" >= 10
+
+        template_vars = {
+            "db": {
+                "type": "string",
+                "value": db
+            },
+            "measurement": {
+                "type": "string",
+                "value": measurement
+            },
+            "comparisonLambda": {
+                "type": "lambda",
+                "value": comparison_lambda
+            },
+            "topicID": {
+                "type": "string",
+                "value": topic_id
+            }
+        }
+
+        if where_clause is not None:
+            where_clause = where_clause.replace("=", "==")  # stream templates use a lambda function, which requires "==" as comparison operator
+            template_vars["whereClause"] = {
+                "type": "lambda",
+                "value": where_clause
+            }
+
+        return template_vars
+
     @staticmethod
     def _fill_relative_template_vars(db=None, measurement=None, field=None, influx_function=None, critical_value=None, comparison_operator=None,
                                      alert_period=None, topic_id=None, where_clause=None, **kwargs):
@@ -326,6 +359,7 @@ class TICKScriptTemplateFiller:
         }
 
         if where_clause is not None:
+            where_clause = where_clause.replace("=", "==")  # stream templates use a lambda function, which requires "==" as comparison operator
             template_vars["whereClause"] = {
                 "type": "lambda",
                 "value": where_clause
diff --git a/src/service/clmcservice/alertsapi/views.py b/src/service/clmcservice/alertsapi/views.py
index 44183f1..027302e 100644
--- a/src/service/clmcservice/alertsapi/views.py
+++ b/src/service/clmcservice/alertsapi/views.py
@@ -24,7 +24,6 @@
 
 # Python standard libs
 import logging
-from urllib.parse import urlparse
 from hashlib import sha256
 
 # PIP installed libs
@@ -48,6 +47,10 @@ class AlertsConfigurationAPI(object):
     A class-based view for configuring alerts within CLMC.
     """
 
+    STREAM_PERIOD_LIMIT = 60  # if alert period is <= 60 seconds, then a stream template is used, otherwise use batch
+
+    DUAL_VERSION_TEMPLATES = {"threshold"}  # this set defines all template types that are written in two versions (stream and batch)
+
     def __init__(self, request):
         """
         Initialises the instance of the view with the request argument.
@@ -214,7 +217,8 @@ class AlertsConfigurationAPI(object):
 
                 condition = trigger.trigger_tpl["condition"]
                 critical_value = float(condition["threshold"])
-                alert_period = "{0}s".format(condition["granularity"])
+                alert_period_integer = condition["granularity"]
+                alert_period = "{0}s".format(alert_period_integer)
                 influx_function = condition.get("aggregation_method", "mean")  # if not specified, use "mean"
 
                 # check for tag filtering
@@ -225,12 +229,9 @@ class AlertsConfigurationAPI(object):
                     tags["flame_sfc"] = sfc
                     tags["flame_sfci"] = sfc_instance
 
-                    # NOTE: if the template has its where clause defined as lambda (stream templates), then use "==" as comparison operator,
-                    #       else if the template's where clause is defined as a string (batch templates), then use "=" as comparison operator
-                    filter_comparison_operator = TICKScriptTemplateFiller.get_comparison_operator(event_type)  # retrieves the correct comparison operator to use for building the where clause
-
                     # build up the where clause from the tags dictionary
-                    where_clause = " AND ".join(map(lambda tag_name: '"{0}"{1}\'{2}\''.format(tag_name, filter_comparison_operator, tags[tag_name]), tags))
+                    where_filters_list = map(lambda tag_name: '"{0}"=\'{1}\''.format(tag_name, tags[tag_name]), tags)
+                    where_clause = " AND ".join(where_filters_list)
 
                 comparison_operator = COMPARISON_OPERATORS[condition.get("comparison_operator", "gte")]  # if not specified, use "gte" (>=)
 
@@ -239,6 +240,15 @@ class AlertsConfigurationAPI(object):
                 topic_id = self.get_hash(topic_id)
                 alert_id = topic_id
 
+                # check whether the template needs to be a stream or a batch
+                if event_type in self.DUAL_VERSION_TEMPLATES:
+                    if alert_period_integer <= self.STREAM_PERIOD_LIMIT:
+                        template_id = "{0}-stream-template".format(event_type)
+                        event_type = "{0}_stream".format(event_type)
+                    else:
+                        template_id = "{0}-batch-template".format(event_type)
+                        event_type = "{0}_batch".format(event_type)
+
                 # built up the template vars dictionary depending on the event type (threshold, relative, etc.)
                 # all extracted properties from the trigger are passed, the TICKScriptTemplateFiller entry point then forwards those to the appropriate function for template filling
                 template_vars = TICKScriptTemplateFiller.fill_template_vars(event_type, db=db, measurement=measurement, field=field, influx_function=influx_function,
@@ -292,8 +302,7 @@ class AlertsConfigurationAPI(object):
 
         kapacitor_api_handlers_url = "http://{0}:{1}/kapacitor/v1/alerts/topics/{2}/handlers".format(kapacitor_host, kapacitor_port, topic_id)
         for http_handler_url in http_handlers:
-            http_handler_host = urlparse(http_handler_url).hostname
-            handler_id = "{0}\n{1}\n{2}\n{3}\n{4}".format(sfc, sfc_i, policy_id, event_id, http_handler_host)
+            handler_id = "{0}\n{1}\n{2}\n{3}\n{4}".format(sfc, sfc_i, policy_id, event_id, http_handler_url)
             handler_id = self.get_hash(handler_id)
             kapacitor_http_request_body = fill_http_post_handler_vars(handler_id, http_handler_url)
             response = post(kapacitor_api_handlers_url, json=kapacitor_http_request_body)
diff --git a/src/service/resources/TICKscript/threshold-template.tick b/src/service/resources/TICKscript/threshold-batch-template.tick
similarity index 100%
rename from src/service/resources/TICKscript/threshold-template.tick
rename to src/service/resources/TICKscript/threshold-batch-template.tick
diff --git a/src/service/resources/TICKscript/threshold-stream-template.tick b/src/service/resources/TICKscript/threshold-stream-template.tick
new file mode 100644
index 0000000..f0800dd
--- /dev/null
+++ b/src/service/resources/TICKscript/threshold-stream-template.tick
@@ -0,0 +1,28 @@
+var db string  // database per service function chain, so db is named after sfc
+
+var rp = 'autogen'  // default value for the retention policy
+
+var measurement string
+
+var whereClause = lambda: TRUE  // default value is a function which returns TRUE, hence no filtering of the query result
+
+var messageValue = 'TRUE'  // default value is TRUE, as this is what SFEMC expects as a notification for an event rule
+
+var comparisonLambda lambda  // comparison function e.g. "real_value" > 40
+
+var topicID string
+
+
+stream
+    | from()
+        .database(db)
+        .retentionPolicy(rp)
+        .measurement(measurement)
+        .where(whereClause)
+    | alert()
+        .id(topicID)
+        .details('db=' + db + ',measurement=' + measurement)
+        .crit(comparisonLambda)
+        .message(messageValue)
+        .topic(topicID)
+        .noRecoveries()
diff --git a/src/service/resources/tosca/test-data/clmc-validator/valid/alerts_test_config-1.yaml b/src/service/resources/tosca/test-data/clmc-validator/valid/alerts_test_config-1.yaml
index eba786b..0114a35 100644
--- a/src/service/resources/tosca/test-data/clmc-validator/valid/alerts_test_config-1.yaml
+++ b/src/service/resources/tosca/test-data/clmc-validator/valid/alerts_test_config-1.yaml
@@ -21,11 +21,11 @@ topology_template:
             metric: network.latency
             condition:
               threshold: 45
-              granularity: 120
+              granularity: 30
               aggregation_method: mean
               resource_type:
                 flame_location: watershed
-              comparison_operator: gt
+              comparison_operator: eq
             action:
               implementation:
                 - http://sfemc.flame.eu/notify
diff --git a/src/service/resources/tosca/test-data/clmc-validator/valid/alerts_test_config-4.yaml b/src/service/resources/tosca/test-data/clmc-validator/valid/alerts_test_config-4.yaml
index 87e3140..13808f9 100644
--- a/src/service/resources/tosca/test-data/clmc-validator/valid/alerts_test_config-4.yaml
+++ b/src/service/resources/tosca/test-data/clmc-validator/valid/alerts_test_config-4.yaml
@@ -21,12 +21,12 @@ topology_template:
             metric: network.latency
             condition:
               threshold: 45
-              granularity: 120
+              granularity: 45
               aggregation_method: median
               resource_type:
                 flame_location: watershed
                 flame_server: watershed
-              comparison_operator: gt
+              comparison_operator: neq
             action:
               implementation:
                 - http://sfemc.flame.eu/notify
@@ -43,7 +43,7 @@ topology_template:
               granularity: 60
               aggregation_method: first
               # resource type missing - optional, so it is valid
-              comparison_operator: lt
+              comparison_operator: lte
             action:
               implementation:
                 - http://sfemc.flame.eu/notify
-- 
GitLab