#!/usr/bin/python3
"""
## © University of Southampton IT Innovation Centre, 2018
##
## Copyright in this software belongs to University of Southampton
## IT Innovation Centre of Gamma House, Enterprise Road,
## Chilworth Science Park, Southampton, SO16 7NS, UK.
##
## This software may not be used, sold, licensed, transferred, copied
## or reproduced in whole or in part in any manner or form or in or
## on any media by any person other than in accordance with the terms
## of the Licence Agreement supplied with the software, or otherwise
## without the prior written consent of the copyright owners.
##
## This software is distributed WITHOUT ANY WARRANTY, without even the
## implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
## PURPOSE, except where stated in the Licence Agreement supplied with
## the software.
##
## Created By : Michael Boniface
## Created Date : 29-04-2018
## Created for Project : FLAME
"""

import pytest
import time
import random
import logging
import sys

from config_collector import ConfigCollector

# Positions of the state name and the time value inside one sample entry.
STATE_INDEX = 0
TIME_INDEX = 1

# Canned sample sets; every entry is a [state, time] pair.
samples = [
    [['active', 0], ['active', 2]],
    [['active', 0], ['active', 2], ['active', 4]],
    [['active', 0], ['failed', 2]],
    [['active', 0], ['active', 2], ['inactive', 4],
     ['active', 6], ['failed', 8], ['inactive', 10]],
    [['active', 0], ['inactive', 2], ['failed', 4],
     ['active', 6], ['inactive', 8], ['failed', 10]],
]


def get_sample_test():
    """Return the next canned sample as a ``(state, timestamp)`` tuple.

    The state is read from ``samples[sample_set][current_index]``; the
    timestamp is the current wall-clock time, not the time stored in the
    sample entry.  The module-level ``current_index`` cursor is then moved
    forward by one, wrapping back to 0 after the last entry of the set.
    """
    global sample_set
    global current_index

    active_set = samples[sample_set]
    sample = (active_set[current_index][STATE_INDEX], time.time())

    # Advance the cursor only after the sample has been taken.
    last_position = len(active_set) - 1
    current_index = current_index + 1 if current_index < last_position else 0
    return sample


def write_output(report):
    """Print ``report`` to stdout; used as the collector's output callback."""
    message = "Writing report output {0}".format(report)
    print(message)


# Cursor state shared between the tests and get_sample_test().
sample_set = 0
current_index = 0


def _run_collector(set_index, collector_args, run_seconds):
    """Run a ConfigCollector against one sample set and return it stopped.

    Selects ``samples[set_index]`` and rewinds the cursor, builds the
    collector with ``collector_args`` appended after the two callbacks,
    lets it run for ``run_seconds`` of wall-clock time, stops it and prints
    its current report.

    NOTE(review): the two numeric constructor arguments are presumably the
    sample period and the report period in seconds -- confirm against
    ConfigCollector.
    """
    global sample_set
    global current_index

    sample_set = set_index
    current_index = 0

    collector = ConfigCollector(get_sample_test, write_output, *collector_args)
    collector.start()
    time.sleep(run_seconds)
    collector.stop()
    print("Current report: {0}".format(str(collector.current_report)))
    return collector


def _assert_rounded_fields(collector, expected_fields):
    """Assert each listed report field equals its expected value once rounded.

    ``expected_fields`` is a sequence of ``(field_name, expected_int)``
    pairs, checked in order.  Values are rounded to the nearest integer
    because they come from real elapsed time.
    """
    for field_name, expected_value in expected_fields:
        actual = collector.current_report['fields'][field_name]
        assert int(round(actual)) == expected_value


def test_agg():
    """Check ``agg_samples`` against every canned sample set.

    Each case gives the index into ``samples``, the two extra positional
    arguments passed to ``agg_samples``, the expected ``fields`` entries (in
    the order they are asserted) and the expected ``time`` of the report.
    In every case the expected ``time`` equals the last argument passed.
    """
    cases = (
        (0, (10, 12),
         (('current_state', 'active'),
          ('current_state_time', 12),
          ('active_sum', 12),
          ('active_count', 1)),
         12),
        (1, (10, 14),
         (('current_state', 'active'),
          ('current_state_time', 14),
          ('active_sum', 14),
          ('active_count', 1)),
         14),
        (2, (8, 10),
         (('current_state', 'failed'),
          ('current_state_time', 0),
          ('active_sum', 2),
          ('active_count', 1),
          ('failed_sum', 0),
          ('failed_count', 1)),
         10),
        (3, (2, 12),
         (('current_state', 'inactive'),
          ('current_state_time', 0),
          ('active_sum', 6),
          ('active_count', 2),
          ('inactive_sum', 2),
          ('inactive_count', 2),
          ('failed_sum', 2),
          ('failed_count', 1)),
         12),
        (4, (4, 14),
         (('current_state', 'failed'),
          ('current_state_time', 0),
          ('active_sum', 4),
          ('active_count', 2),
          ('inactive_sum', 4),
          ('inactive_count', 2),
          ('failed_sum', 2),
          ('failed_count', 2)),
         14),
    )

    for set_index, agg_args, expected_fields, expected_time in cases:
        # A fresh collector per case, exactly as each scenario requires.
        collector = ConfigCollector(get_sample_test, write_output)
        report = collector.agg_samples(samples[set_index], *agg_args)

        for field_name, expected_value in expected_fields:
            assert report['fields'][field_name] == expected_value
        assert report['time'] == expected_time


def test_one_period_collection():
    """Run the all-active sample set through one reporting period."""
    # one reporting period
    collector = _run_collector(1, (2, 6), 8)

    assert collector.current_report['fields']['current_state'] == 'active'
    _assert_rounded_fields(collector, (
        ('current_state_time', 6),
        ('active_sum', 6),
        ('active_count', 1),
    ))


def test_multi_period_single_state_collection():
    """Run the all-active sample set across two reporting periods."""
    # two reporting periods
    collector = _run_collector(1, (1, 3), 7)

    assert collector.current_report['fields']['current_state'] == 'active'
    _assert_rounded_fields(collector, (
        ('current_state_time', 6),
        ('active_sum', 6),
        ('active_count', 1),
    ))


# [['active', 0], ['inactive', 2], ['failed', 4], ['active', 6], ['inactive', 8], ['failed', 10]]
def test_multi_period_multi_state_collection():
    """Run the mixed-state sample set and check the per-state totals."""
    # 6 samples and 2 reporting periods
    collector = _run_collector(4, (2, 10), 13)

    assert collector.current_report['fields']['current_state'] == 'failed'
    _assert_rounded_fields(collector, (
        ('current_state_time', 0),
        ('active_sum', 4),
        ('active_count', 2),
        ('inactive_sum', 4),
        ('inactive_count', 2),
        ('failed_sum', 2),
        ('failed_count', 2),
    ))