diff --git a/test/streaming/test_rule1.json b/test/streaming/test_rule1.json
new file mode 100644
index 0000000000000000000000000000000000000000..17d4cdec27f49226968e54cb0f5cb3e794eddd33
--- /dev/null
+++ b/test/streaming/test_rule1.json
@@ -0,0 +1,9 @@
+{
+    "id" : "TestRule1",
+    "type" : "batch",
+    "dbrps" : [{"db": "CLMCMetrics", "rp" : "autogen"}],
+
+    "script" : "var ruleData = batch\r\n |query(''' SELECT mean(\"handled\") AS \"mean_handled\" FROM \"CLMCMetrics\".\"autogen\".\"nginx\" WHERE \"ipendpoint\"='adaptive_streaming_I1_nginx1' ''')\r\n .period(5s)\r\n .every(5s)\r\n\r\nvar ruleAlert = ruleData\r\n |alert()\r\n .id('{{ .Name }}\/adaptive_streaming_I1_nginx1')\r\n .message('{{ .ID }} is {{ .Level }} Mean handled connections: {{ index .Fields \"mean_handled\" }}')\r\n .warn(lambda: \"mean_handled\" > 10)\r\n .log( '\/tmp\/TestRule1.log' )",
+
+    "status" : "enabled"
+}
\ No newline at end of file
diff --git a/test/streaming/test_rule2.json b/test/streaming/test_rule2.json
new file mode 100644
index 0000000000000000000000000000000000000000..c9adb8401df6662ba14882f3316c84b27e9aa50c
--- /dev/null
+++ b/test/streaming/test_rule2.json
@@ -0,0 +1,9 @@
+{
+    "id" : "TestRule2",
+    "type" : "batch",
+    "dbrps" : [{"db": "CLMCMetrics", "rp" : "autogen"}],
+
+    "script" : "var ruleData = batch\r\n |query(''' SELECT mean(\"waiting\") AS \"mean_waiting\" FROM \"CLMCMetrics\".\"autogen\".\"nginx\" WHERE \"ipendpoint\"='adaptive_streaming_I1_nginx1' ''')\r\n .period(5s)\r\n .every(5s)\r\n\r\nvar ruleAlert = ruleData\r\n |alert()\r\n .id('{{ .Name }}\/adaptive_streaming_I1_nginx1')\r\n .message('{{ .ID }} is {{ .Level }} Mean waiting connections: {{ index .Fields \"mean_waiting\" }}')\r\n .warn(lambda: \"mean_waiting\" > 10)\r\n .log( '\/tmp\/TestRule2.log' )",
+
+    "status" : "enabled"
+}
\ No newline at end of file
diff --git a/test/streaming/test_streaming.py b/test/streaming/test_streaming.py
index c1b7f77190e144a8c77a63c9f83d63e62d1fd1ad..aa8a96916e386d273eb5c4b7f85d7ecbc9a6063d 100644
--- a/test/streaming/test_streaming.py
+++ b/test/streaming/test_streaming.py
@@ -21,6 +21,8 @@ class TestStreamingAlerts(object):
 
     @pytest.mark.parametrize("rule, log", [
         ("rules.json", "/tmp/RPSLoad.log"),
+        ("test_rule1.json", "/tmp/TestRule1.log"),
+        ("test_rule2.json", "/tmp/TestRule2.log"),
     ])
     def test_alerts(self, rule, log, streaming_url, streaming_manifest):
         """
@@ -38,14 +40,10 @@
         :param streaming_manifest: the fixture providing the root of the XML streaming manifest
         """
 
-        kapacitor_setter = self.kapacitor_setting(rule)
+        kapacitor_setter = self.kapacitor_setting(rule, log)
         next(kapacitor_setter)  # Setup the test rule
 
-        try:
-            if isfile(log):
-                remove(log)  # delete log file if existing from previous tests
-        except PermissionError:
-            system("sudo rm {0}".format(log))  # handles the case for running on linux where permission will be required to delete the old log file
+        print("Testing alert creation for rule: {0}".format(rule))
 
         segments = streaming_manifest.findall(".//{urn:mpeg:DASH:schema:MPD:2011}SegmentURL")
 
@@ -56,9 +54,11 @@
 
             t.start()
 
         alert_created = False
+        counter = 0
+        time_delay = 2.5
         while True:  # loop while threads are execution and do a check every 2.5 seconds to check if either alert log has been created or threads have finished execution
-            sleep(2.5)
+            sleep(time_delay)
             if isfile(log):
                 for t in threads:  # kill all running threads in case log file is created beforehand
                     t.stop()
@@ -67,32 +67,53 @@ class TestStreamingAlerts(object):
             if threads_queue.full():
                 break
-        assert alert_created, "Alerts test failed: no log file is created indicating a triggered alert."
+            counter += time_delay  # the counter tracks the elapsed time; for the rules under test a 20-second time frame is usually enough
+            if counter >= 8*time_delay:
+                break
+
+        assert alert_created, "Alerts test failed: no log file is created indicating a triggered alert for rule {0}.".format(rule)
 
-        print("\nSuccessfully passed alert creation test.\n")
+        print("Successfully passed alert creation test for rule: {0}.".format(rule))
 
         next(kapacitor_setter)  # Teardown the test rule
 
-    def kapacitor_setting(self, rule):
+    def kapacitor_setting(self, rule, log):
         """
         A generator function used to provide setUp/tearDown actions for a particular kapacitor rule.
         On setUp rule is initialized, on tearDown rule is deleted. Interleaving is achieved using the generator pattern.
 
         :param rule: the name of the json file for the rule under test
+        :param log: the absolute path of the log file that is being tested
         """
 
+        # remove the log file if it is left over from a previous test
+        try:
+            if isfile(log):
+                remove(log)  # delete log file if existing from previous tests
+        except PermissionError:
+            system("sudo rm {0}".format(log))  # handles the case on linux where elevated permissions are required to delete the old log file
+
         # Initialization of the kapacitor rule - Test setUp (UnitTest style)
         with open(join(dirname(__file__), rule), "r") as rule_file:
             data = "".join(line.strip() for line in rule_file.readlines())
         rule_data = json.loads(data)
 
         requests.delete(url=urljoin(self.kapacitor_url + "/", rule_data.get("id")))  # delete in case of a task with the same ID already set in the kapacitor
 
-        requests.post(url=self.kapacitor_url, data=data, headers={"Content-Type": "application/json"})
+        r = requests.post(url=self.kapacitor_url, data=data, headers={"Content-Type": "application/json"})
+        assert r.status_code == 200, "Couldn't create alert rule {0}".format(rule)
+        print("\nSuccessfully created test rule {0}".format(rule))
 
         yield
 
         # Deleting the kapacitor rule used for testing - Test tearDown (UnitTest style)
         requests.delete(url=urljoin(self.kapacitor_url + "/", rule_data.get("id")))
+
+        # clean up the log file if it was created during the test
+        try:
+            if isfile(log):
+                remove(log)  # delete log file if existing from previous tests
+        except PermissionError:
+            system("sudo rm {0}".format(log))  # handles the case on linux where elevated permissions are required to delete the old log file
 
         yield
 
     @staticmethod
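Note for reviewers: the "script" values in the new rule files are JSON-escaped TICKscript. Decoded (with the \r\n sequences unescaped; indentation is approximate since it is collapsed in the JSON string), test_rule1.json defines roughly the following batch task — test_rule2.json is identical except that it aggregates the "waiting" field as "mean_waiting" and logs to /tmp/TestRule2.log:

var ruleData = batch
    |query(''' SELECT mean("handled") AS "mean_handled" FROM "CLMCMetrics"."autogen"."nginx" WHERE "ipendpoint"='adaptive_streaming_I1_nginx1' ''')
        .period(5s)
        .every(5s)

var ruleAlert = ruleData
    |alert()
        .id('{{ .Name }}/adaptive_streaming_I1_nginx1')
        .message('{{ .ID }} is {{ .Level }} Mean handled connections: {{ index .Fields "mean_handled" }}')
        .warn(lambda: "mean_handled" > 10)
        .log('/tmp/TestRule1.log')

Each task queries the nginx measurement in a 5-second window every 5 seconds and raises a WARN-level alert once the mean exceeds 10, writing the alert to the log file that test_alerts polls for.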
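Assuming the project's standard pytest setup (the streaming_url and streaming_manifest fixtures already exist in this module; the exact invocation below is an assumption, not taken from the repo), the new parametrized cases should be selectable by their parametrize ids with something like:

pytest test/streaming/test_streaming.py -k "test_rule1 or test_rule2"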