Skip to content
Snippets Groups Projects
Commit ef85b8f4 authored by Nikolay Stanchev's avatar Nikolay Stanchev
Browse files

[ Issue #56 ] - refactored the system-dependent ping testing for the streaming package

parent 14773a59
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,7 @@ import yaml
from influxdb import InfluxDBClient
@pytest.fixture(scope="module", params=[{'config1': {'rspec': 'test/streaming-sim/rspec.yml'}}])
@pytest.fixture(scope="module", params=[{'config': {'rspec': 'test/streaming-sim/rspec.yml'}}])
def streaming_sim_config(request):
"""
Reads the service configuration deployed for the streaming simulation test.
......@@ -14,7 +14,7 @@ def streaming_sim_config(request):
:return: the python object representing the read YAML file
"""
with open(request.param['config1']['rspec'], 'r') as stream:
with open(request.param['config']['rspec'], 'r') as stream:
data_loaded = yaml.load(stream)
return data_loaded
......
......@@ -15,6 +15,8 @@ def test_service_names(streaming_sim_config):
assert streaming_sim_config['hosts'][1]['name'] == 'ipendpoint1', "Invalid service name: {0}".format(streaming_sim_config['hosts'][1]['name'])
assert streaming_sim_config['hosts'][2]['name'] == 'ipendpoint2', "Invalid service name: {0}".format(streaming_sim_config['hosts'][2]['name'])
print("\nSuccessfully passed service names configuration test\n")
def test_ping(streaming_sim_config):
"""
......
......@@ -3,9 +3,16 @@
import pytest
import yaml
@pytest.fixture(scope="module")
def streaming_config():
"""Returns the service configuration deployed for the streaming test. In future this needs to be a parameterised fixture shared with other rspec.yml based tests"""
with open("test/streaming/rspec.yml", 'r') as stream:
@pytest.fixture(scope="module", params=[{'config': {'rspec': 'test/streaming/rspec.yml'}}])
def streaming_config(request):
"""
Reads the service configuration deployed for the streaming simulation test.
:param request: access the parameters of the fixture
:return: the python object representing the read YAML file
"""
with open(request.param['config']['rspec'], 'r') as stream:
data_loaded = yaml.load(stream)
return data_loaded
\ No newline at end of file
return data_loaded
#!/usr/bin/python3
import pytest
import os
from subprocess import run
from platform import system
def test_service_names(streaming_config):
print(streaming_config['hosts'][0]['name'])
assert streaming_config['hosts'][0]['name'] == 'clmc-service'
assert streaming_config['hosts'][1]['name'] == 'nginx1'
assert streaming_config['hosts'][2]['name'] == 'nginx2'
assert streaming_config['hosts'][3]['name'] == 'loadtest-streaming'
"""
Tests the service names in the configuration.
:param streaming_config: the configuration fixture collected from conftest.py
"""
assert streaming_config['hosts'][0]['name'] == 'clmc-service', "Invalid service name: {0}".format(streaming_config['hosts'][0]['name'])
assert streaming_config['hosts'][1]['name'] == 'nginx1', "Invalid service name: {0}".format(streaming_config['hosts'][1]['name'])
assert streaming_config['hosts'][2]['name'] == 'nginx2', "Invalid service name: {0}".format(streaming_config['hosts'][2]['name'])
assert streaming_config['hosts'][3]['name'] == 'loadtest-streaming', "Invalid service name: {0}".format(streaming_config['hosts'][3]['name'])
print("\nSuccessfully passed service names configuration test\n")
def test_ping(streaming_config):
"""This test will only run on linux"""
for x in streaming_config['hosts']:
print(x['ip_address'])
response = os.system("ping -c 1 " + x['ip_address'])
assert response == 0
"""
Pings each service to test for liveliness
:param streaming_config: the configuration fixture collected from conftest.py
"""
print("\n") # blank line printed for formatting purposes
ping_count = 1
system_dependent_param = "-n" if system().lower() == "windows" else "-c"
for service in streaming_config['hosts']:
command = ["ping", system_dependent_param, str(ping_count), service['ip_address']]
assert run(command).returncode == 0, "Service ping test failed for {0}".format(service['name'])
print("\nSuccessfully passed ping test for service: {0}\n".format(service['name']))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment