diff --git a/test/streaming/conftest.py b/test/streaming/conftest.py
index 854ce0b3bc50250026ab72f072258265afd2a8ca..77e0f1d2d5f50a2a13d7918d24b155563a377436 100644
--- a/test/streaming/conftest.py
+++ b/test/streaming/conftest.py
@@ -4,7 +4,7 @@
 import pytest
 import yaml
 
-@pytest.fixture(scope="module", params=[{'config': {'rspec': 'test/streaming/rspec.yml'}}])
+@pytest.fixture(scope="module", params=[{'config': {'rspec': '/vagrant/test/streaming/rspec.yml'}}])
 def streaming_config(request):
     """
     Reads the service configuration deployed for the streaming simulation test.
diff --git a/test/streaming/manual.md b/test/streaming/manual.md
index 9480d4ac45c1955e928ad626b9319e15bcd0434b..aeffb93a989e61806486bb425eb25b8b835f3949 100644
--- a/test/streaming/manual.md
+++ b/test/streaming/manual.md
@@ -49,6 +49,16 @@ This test currently just generates the load and does not have any assertions. It
 
 And then point your browser to the Chronograf dashboard:
 
 `http://localhost:8888`
 
+### Run the automated PyTests
+
+SSH into the clmc-service VM:
+
+`vagrant --fixture=streaming ssh clmc-service`
+
+Run the automated tests written in pytest:
+
+`pytest -s /vagrant/test/streaming/`
+
 ### Manual test
diff --git a/test/streaming/test_streaming.py b/test/streaming/test_streaming.py
new file mode 100644
index 0000000000000000000000000000000000000000..a35ef3760200a0b3b35d878bb492dcf6e66b4fbf
--- /dev/null
+++ b/test/streaming/test_streaming.py
@@ -0,0 +1,139 @@
+#!/usr/bin/python3
+
+from threading import Thread
+from time import sleep
+from queue import Queue
+from xml.etree import ElementTree
+from urllib.parse import urljoin
+from os.path import isfile
+from os import remove, system
+import pytest
+import requests
+
+
+class TestStreamingAlerts(object):
+    """
+    A testing class used to group all the tests related to the streaming scenario.
+    """
+
+    @pytest.mark.parametrize("log", ["/tmp/RPSLoad.log"])
+    def test_alerts(self, log, streaming_url, streaming_manifest):
+        """
+        This test case generates streaming requests to the server to ensure an alert is triggered and then checks the log file for this alert.
+        Different logs can be tested by appending to the list of parameters in the pytest decorator.
+
+        :param log: the path of the log file that is under test
+        :param streaming_url: the fixture providing the streaming url for this test case
+        :param streaming_manifest: the fixture providing the root of the XML streaming manifest
+        """
+
+        try:
+            if isfile(log):
+                remove(log)  # delete the log file if it exists from previous tests
+        except PermissionError:
+            system("sudo rm {0}".format(log))  # handles the case of running on Linux, where permission is required to delete the old log file
+
+        segments = streaming_manifest.findall(".//{urn:mpeg:DASH:schema:MPD:2011}SegmentURL")
+
+        threads_num = 30
+        threads_queue = Queue(maxsize=threads_num)  # a synchronized queue is used to track whether all the threads have finished execution
+        threads = [StreamingThread(streaming_url, segments, threads_queue) for _ in range(threads_num)]
+        for t in threads:
+            t.start()
+
+        alert_created = False
+        while True:
+            # while the threads are executing, check every 2.5 seconds whether the alert log has been created or all threads have finished
+            sleep(2.5)
+            if isfile(log):
+                for t in threads:  # stop all running threads as soon as the log file has been created
+                    t.stop()
+                alert_created = True
+
+            if threads_queue.full():
+                break
+
+        assert alert_created, "Alerts test failed: no log file was created to indicate a triggered alert."
+
+        print("\nSuccessfully passed alert creation test.\n")
+
+    @staticmethod
+    @pytest.fixture(scope="class", params=[{"server": "http://192.168.50.11", "video": "/test_video/stream.mpd"}])
+    def streaming_url(request):
+        """
+        A fixture with class scope - used only in the scope of the testing class.
+
+        :param request: the parameters for this fixture - server URL and relative video URL
+        :return: the combined URL for the video used for streaming
+        """
+
+        return urljoin(request.param["server"], request.param["video"])
+
+    @staticmethod
+    @pytest.fixture(scope="class")
+    def streaming_manifest(streaming_url):
+        """
+        A fixture to download the manifest file for the streamed video and parse the downloaded XML content.
+
+        :param streaming_url: the fixture which provides the streaming url
+        :return: an XML root node object
+        """
+
+        manifest_xml = requests.get(streaming_url).text
+        root = ElementTree.fromstring(manifest_xml)
+        return root
+
+
+class StreamingThread(Thread):
+
+    def __init__(self, url, segments, queue):
+        """
+        Subclassing the Thread class to create a custom streaming thread.
+
+        :param url: the streaming url
+        :param segments: the list of SegmentURL XML nodes
+        :param queue: an auxiliary parameter used to indicate when this thread has finished execution
+        """
+
+        super(StreamingThread, self).__init__()
+        self.running = False
+        self.url = url
+        self.segments = segments
+        self.queue = queue
+        self._test_finished = False  # a flag to indicate whether the thread should stop running
+
+    def stop(self):
+        """
+        Signal this thread to stop downloading segments and finish its execution.
+        """
+
+        self._test_finished = True
+
+    def run(self):
+        """
+        Simulates actual streaming by downloading audio/video segments from the server using a requests session,
+        which keeps the underlying connection open while the segments are being fetched.
+        """
+
+        size = len(self.segments)
+        size = size if size % 2 == 0 else size - 1  # use an even number of segments so that audio and video segments can be paired
+
+        s = requests.Session()
+
+        for i in range(size // 2):
+            segment_audio = self.segments[i]  # the first half of the segment list is assumed to hold the audio segments
+            segment_video = self.segments[size // 2 + i]  # the second half is assumed to hold the video segments
+            segment_audio_url = segment_audio.attrib.get('media')
+            segment_video_url = segment_video.attrib.get('media')
+
+            s.get(urljoin(self.url, segment_audio_url))
+            s.get(urljoin(self.url, segment_video_url))
+
+            # stop downloading if the test has already detected the alert and stopped this thread
+            if self._test_finished:
+                break
+
+            # a small timeout to mimic the behaviour of real streaming
+            sleep(2.5)
+
+        self.queue.put(True)
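For reference (not part of the change set above): the `test_alerts` docstring notes that additional alert logs can be covered simply by extending the `parametrize` list. A minimal, self-contained sketch of that pattern follows; the second log path is purely hypothetical and does not correspond to an existing alert.

```python
import pytest

# Illustration of the parametrization pattern described in the test_alerts docstring:
# every path added to the list below produces one extra, independent run of the test.
# "/tmp/AnotherAlert.log" is an invented placeholder, not a real alert log file.
ALERT_LOGS = ["/tmp/RPSLoad.log", "/tmp/AnotherAlert.log"]


@pytest.mark.parametrize("log", ALERT_LOGS)
def test_alert_log_path_is_absolute(log):
    # trivial stand-in assertion; the real test_alerts streams video segments
    # and then polls for the log file instead
    assert log.startswith("/")
```

Running `pytest -s` against a file containing this sketch executes the check once per configured log path, which is how the real test would fan out over multiple alert logs.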