Skip to content
Snippets Groups Projects
Commit f33d0e47 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

[ Issue #57 ] - simple streaming test written in pytest

parent 627d4aaf
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@ import pytest
import yaml
@pytest.fixture(scope="module", params=[{'config': {'rspec': 'test/streaming/rspec.yml'}}])
@pytest.fixture(scope="module", params=[{'config': {'rspec': '/vagrant/test/streaming/rspec.yml'}}])
def streaming_config(request):
"""
Reads the service configuration deployed for the streaming simulation test.
......
......@@ -49,6 +49,16 @@ This test currently just generates the load and does not have any assertions. It
And then point your browser to the Chronograf dashboard:
### Run the automated PyTests
SSH into the clmc-service VM:
`vagrant --fixture=streaming ssh clmc-service`
Run the automated tests written in pytest:
`pytest -s /vagrant/test/streaming/`
`http://localhost:8888`
### Manual test
......
#!/usr/bin/python3
from threading import Thread
from time import sleep
from queue import Queue
from xml.etree import ElementTree
from urllib.parse import urljoin
from os.path import isfile
from os import remove, system
import pytest
import requests
class TestStreamingAlerts(object):
"""
A testing class used to group all the tests related to the streaming scenario.
"""
@pytest.mark.parametrize("log", ["/tmp/RPSLoad.log"])
def test_alerts(self, log, streaming_url, streaming_manifest):
"""
This test case generates some streaming requests to the server to ensure an alert is triggered and then tests the log file for this alert. Different logs can be tested by
appending to the list of parameters in the pytest decorator
:param log: the path of the log file that is under test
:param streaming_url: the fixture providing the streaming url for this test case
:param streaming_manifest: the fixture providing the root of the XML streaming manifest
"""
try:
if isfile(log):
remove(log) # delete log file if existing from previous tests
except PermissionError:
system("sudo rm {0}".format(log)) # handles the case for running on linux where permission will be required to delete the old log file
segments = streaming_manifest.findall(".//{urn:mpeg:DASH:schema:MPD:2011}SegmentURL")
threads_num = 30
threads_queue = Queue(maxsize=threads_num) # a synchronized queue is used to track if all the threads has finished execution
threads = [StreamingThread(streaming_url, segments, threads_queue) for _ in range(threads_num)]
for t in threads:
t.start()
alert_created = False
while True:
# loop while threads are execution and do a check every 2.5 seconds to check if either alert log has been created or threads have finished execution
sleep(2.5)
if isfile(log):
for t in threads: # kill all running threads in case log file is created beforehand
t.stop()
alert_created = True
if threads_queue.full():
break
assert alert_created, "Alerts test failed: no log file is created indicating a triggered alert."
print("\nSuccessfully passed alert creation test.\n")
@staticmethod
@pytest.fixture(scope="class", params=[{"server": "http://192.168.50.11", "video": "/test_video/stream.mpd"}])
def streaming_url(request):
"""
A fixture with class scope - used only in the scope of the testing class.
:param request: the parameters for this fixture - server url and video relative url
:return: the combined URL for the video used for streaming
"""
return urljoin(request.param["server"], request.param["video"])
@staticmethod
@pytest.fixture(scope="class")
def streaming_manifest(streaming_url):
"""
A fixture to download the manifest file for the streamed video and parse the downloaded XML content
:param streaming_url: the fixture which provides the streaming url
:return: an XML root node object
"""
manifest_xml = requests.get(streaming_url).text
root = ElementTree.fromstring(manifest_xml)
return root
class StreamingThread(Thread):
def __init__(self, url, segments, queue):
"""
Subclassing the Thread class to create a custom streaming thread.
:param url: the streaming url
:param segments: the list of SegmentURL XML nodes
:param queue: an auxiliary parameter used to indicate when this thread has finished execution
"""
super(StreamingThread, self).__init__()
self.running = False
self.url = url
self.segments = segments
self.queue = queue
self._test_finished = False # a flag to indicate whether the thread should stop running
def stop(self):
"""
Kill this thread and suspend its execution.
"""
self._test_finished = True
def run(self):
"""
A function, which simulates an actual streaming by downloading different audio/video segments from the server using a request session,
which leaves the connection open until executing.
"""
size = len(self.segments)
size = size if size % 2 == 0 else size - 1
s = requests.session()
for i in range(0, int(size / 2), 1):
segment_audio = self.segments[0]
segment_video = self.segments[int(size / 2) + i]
segment_audio_url = segment_audio.attrib.get('media')
segment_video_url = segment_video.attrib.get('media')
s.get(urljoin(self.url, segment_audio_url))
s.get(urljoin(self.url, segment_video_url))
# check if thread is killed in case the test has already succeeded
if self._test_finished:
break
# a small time out to mimic the behaviour of a real streaming
sleep(2.5)
self.queue.put(True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment