From f33d0e47c69b91c1de46e13bcd44a39285f87b7b Mon Sep 17 00:00:00 2001 From: Nikolay Stanchev <ns17@it-innovation.soton.ac.uk> Date: Mon, 19 Mar 2018 13:37:40 +0000 Subject: [PATCH] [ Issue #57 ] - simple streaming test written in pytest --- test/streaming/conftest.py | 2 +- test/streaming/manual.md | 10 +++ test/streaming/test_streaming.py | 139 +++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 test/streaming/test_streaming.py diff --git a/test/streaming/conftest.py b/test/streaming/conftest.py index 854ce0b..77e0f1d 100644 --- a/test/streaming/conftest.py +++ b/test/streaming/conftest.py @@ -4,7 +4,7 @@ import pytest import yaml -@pytest.fixture(scope="module", params=[{'config': {'rspec': 'test/streaming/rspec.yml'}}]) +@pytest.fixture(scope="module", params=[{'config': {'rspec': '/vagrant/test/streaming/rspec.yml'}}]) def streaming_config(request): """ Reads the service configuration deployed for the streaming simulation test. diff --git a/test/streaming/manual.md b/test/streaming/manual.md index 9480d4a..aeffb93 100644 --- a/test/streaming/manual.md +++ b/test/streaming/manual.md @@ -49,6 +49,16 @@ This test currently just generates the load and does not have any assertions. It And then point your browser to the Chronograf dashboard: +### Run the automated PyTests + +SSH into the clmc-service VM: + +`vagrant --fixture=streaming ssh clmc-service` + +Run the automated tests written in pytest: + +`pytest -s /vagrant/test/streaming/` + `http://localhost:8888` ### Manual test diff --git a/test/streaming/test_streaming.py b/test/streaming/test_streaming.py new file mode 100644 index 0000000..a35ef37 --- /dev/null +++ b/test/streaming/test_streaming.py @@ -0,0 +1,139 @@ +#!/usr/bin/python3 + +from threading import Thread +from time import sleep +from queue import Queue +from xml.etree import ElementTree +from urllib.parse import urljoin +from os.path import isfile +from os import remove, system +import pytest +import requests + + +class TestStreamingAlerts(object): + """ + A testing class used to group all the tests related to the streaming scenario. + """ + + @pytest.mark.parametrize("log", ["/tmp/RPSLoad.log"]) + def test_alerts(self, log, streaming_url, streaming_manifest): + """ + This test case generates some streaming requests to the server to ensure an alert is triggered and then tests the log file for this alert. Different logs can be tested by + appending to the list of parameters in the pytest decorator + + :param log: the path of the log file that is under test + :param streaming_url: the fixture providing the streaming url for this test case + :param streaming_manifest: the fixture providing the root of the XML streaming manifest + """ + + try: + if isfile(log): + remove(log) # delete log file if existing from previous tests + except PermissionError: + system("sudo rm {0}".format(log)) # handles the case for running on linux where permission will be required to delete the old log file + + segments = streaming_manifest.findall(".//{urn:mpeg:DASH:schema:MPD:2011}SegmentURL") + + threads_num = 30 + threads_queue = Queue(maxsize=threads_num) # a synchronized queue is used to track if all the threads has finished execution + threads = [StreamingThread(streaming_url, segments, threads_queue) for _ in range(threads_num)] + for t in threads: + t.start() + + alert_created = False + while True: + # loop while threads are execution and do a check every 2.5 seconds to check if either alert log has been created or threads have finished execution + sleep(2.5) + if isfile(log): + for t in threads: # kill all running threads in case log file is created beforehand + t.stop() + alert_created = True + + if threads_queue.full(): + break + + assert alert_created, "Alerts test failed: no log file is created indicating a triggered alert." + + print("\nSuccessfully passed alert creation test.\n") + + @staticmethod + @pytest.fixture(scope="class", params=[{"server": "http://192.168.50.11", "video": "/test_video/stream.mpd"}]) + def streaming_url(request): + """ + A fixture with class scope - used only in the scope of the testing class. + + :param request: the parameters for this fixture - server url and video relative url + :return: the combined URL for the video used for streaming + """ + + return urljoin(request.param["server"], request.param["video"]) + + @staticmethod + @pytest.fixture(scope="class") + def streaming_manifest(streaming_url): + """ + A fixture to download the manifest file for the streamed video and parse the downloaded XML content + + :param streaming_url: the fixture which provides the streaming url + :return: an XML root node object + """ + + manifest_xml = requests.get(streaming_url).text + root = ElementTree.fromstring(manifest_xml) + return root + + +class StreamingThread(Thread): + + def __init__(self, url, segments, queue): + """ + Subclassing the Thread class to create a custom streaming thread. + + :param url: the streaming url + :param segments: the list of SegmentURL XML nodes + :param queue: an auxiliary parameter used to indicate when this thread has finished execution + """ + + super(StreamingThread, self).__init__() + self.running = False + self.url = url + self.segments = segments + self.queue = queue + self._test_finished = False # a flag to indicate whether the thread should stop running + + def stop(self): + """ + Kill this thread and suspend its execution. + """ + + self._test_finished = True + + def run(self): + """ + A function, which simulates an actual streaming by downloading different audio/video segments from the server using a request session, + which leaves the connection open until executing. + """ + + size = len(self.segments) + size = size if size % 2 == 0 else size - 1 + + s = requests.session() + + for i in range(0, int(size / 2), 1): + segment_audio = self.segments[0] + segment_video = self.segments[int(size / 2) + i] + segment_audio_url = segment_audio.attrib.get('media') + segment_video_url = segment_video.attrib.get('media') + + s.get(urljoin(self.url, segment_audio_url)) + s.get(urljoin(self.url, segment_video_url)) + + # check if thread is killed in case the test has already succeeded + if self._test_finished: + break + + # a small time out to mimic the behaviour of a real streaming + sleep(2.5) + + self.queue.put(True) -- GitLab