From 151ce92c41bd527a8f2c63962b29e67c2f0f64c0 Mon Sep 17 00:00:00 2001 From: Emilio Reyes Date: Sun, 4 Apr 2021 10:52:03 -0700 Subject: [PATCH] Update reference to mpmq Signed-off-by: Emilio Reyes --- README.md | 2 + build.py | 4 +- requirements.txt | 1 + src/main/python/mpcurses/__init__.py | 3 - src/main/python/mpcurses/handler.py | 90 ------- src/main/python/mpcurses/mpcontroller.py | 224 ---------------- src/main/python/mpcurses/mpcurses.py | 8 +- src/unittest/python/test_handler.py | 137 ---------- src/unittest/python/test_mpcontroller.py | 323 ----------------------- src/unittest/python/test_mpcurses.py | 4 +- 10 files changed, 11 insertions(+), 785 deletions(-) delete mode 100644 src/main/python/mpcurses/handler.py delete mode 100644 src/main/python/mpcurses/mpcontroller.py delete mode 100644 src/unittest/python/test_handler.py delete mode 100644 src/unittest/python/test_mpcontroller.py diff --git a/README.md b/README.md index 9b8cd0b..9ebc20f 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ The main features are: Refer to documentation here: https://soda480.github.io/mpcurses/ + Mpcurses is a subclass of `mpmq`, see [mpmq](https://pypi.org/project/mpmq/) for more information. + ### Installation ### ```bash pip install mpcurses diff --git a/build.py b/build.py index d8cfe48..5b232dc 100644 --- a/build.py +++ b/build.py @@ -30,7 +30,7 @@ authors = [Author('Emilio Reyes', 'emilio.reyes@intel.com')] summary = 'Mpcurses is an abstraction of the Python curses and multiprocessing libraries providing function execution and runtime visualization capabilities' url = 'https://github.com/soda480/mpcurses' -version = '0.3.0' +version = '0.3.1' default_task = [ 'clean', 'analyze', @@ -71,6 +71,6 @@ def set_properties(project): 'Topic :: Software Development :: Libraries :: Python Modules', 'Topic :: System :: Networking', 'Topic :: System :: Systems Administration']) - project.set_property('radon_break_build_average_complexity_threshold', 3.6) + project.set_property('radon_break_build_average_complexity_threshold', 4) project.set_property('radon_break_build_complexity_threshold', 14) project.set_property('bandit_break_build', True) diff --git a/requirements.txt b/requirements.txt index e69de29..0faf396 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1 @@ +mpmq diff --git a/src/main/python/mpcurses/__init__.py b/src/main/python/mpcurses/__init__.py index ed9ff6e..0f19836 100644 --- a/src/main/python/mpcurses/__init__.py +++ b/src/main/python/mpcurses/__init__.py @@ -13,7 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from .handler import queue_handler from .mpcurses import MPcurses -from .mpcontroller import MPcontroller diff --git a/src/main/python/mpcurses/handler.py b/src/main/python/mpcurses/handler.py deleted file mode 100644 index 69bbb28..0000000 --- a/src/main/python/mpcurses/handler.py +++ /dev/null @@ -1,90 +0,0 @@ - -# Copyright (c) 2021 Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -from logging import Handler - -logger = logging.getLogger(__name__) - - -class QueueHandler(Handler): - """ subclass Handler enabling log messages to be sent to message queue - """ - - def __init__(self, message_queue, offset): - super(QueueHandler, self).__init__() - self.message_queue = message_queue - self.offset = offset - - def emit(self, record): - message = record.msg - - if record.levelno >= 40: - message = f'ERROR: {message}' - elif record.levelno >= 30: - message = f'WARN: {message}' - elif record.levelno == 20: - message = f'INFO: {message}' - - message = f'#{self.offset}-{message}' - self.message_queue.put(message) - - -def queue_handler(function): - """ adds QueueHandler to rootLogger in order to send log messages to a message queue - """ - - def _queue_handler(*args, **kwargs): - """ internal decorator for message queue handler - """ - root_logger = logging.getLogger() - - offset = kwargs.pop('offset', 0) - message_queue = kwargs.pop('message_queue', None) - result_queue = kwargs.pop('result_queue', None) - result = None - if message_queue: - logger.debug(f"configuring message queue log handler for '{function.__name__}' offset {offset}") - handler = QueueHandler(message_queue, offset) - log_formatter = logging.Formatter('%(asctime)s %(processName)s %(name)s [%(funcName)s] %(levelname)s %(message)s') - handler.setFormatter(log_formatter) - root_logger.addHandler(handler) - root_logger.setLevel(logging.DEBUG) - - try: - result = function(*args, **kwargs) - return result - - except Exception as exception: - result = exception - logger.error(str(exception), exc_info=True) - # log control message that an error occurred - logger.debug('ERROR') - - finally: - # add result to result queue with offset index - if result_queue: - logger.debug(f"adding '{function.__name__}' offset {offset} result to result queue") - result_queue.put({ - offset: result - }) - logger.debug(f'execution of {function.__name__} offset {offset} ended') - # log control message that method completed - logger.debug('DONE') - if message_queue: - root_logger.removeHandler(handler) - - return _queue_handler diff --git a/src/main/python/mpcurses/mpcontroller.py b/src/main/python/mpcurses/mpcontroller.py deleted file mode 100644 index e7bed98..0000000 --- a/src/main/python/mpcurses/mpcontroller.py +++ /dev/null @@ -1,224 +0,0 @@ - -# Copyright (c) 2021 Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import re -import sys -import logging -from multiprocessing import Queue -from multiprocessing import Process -from queue import Queue as SimpleQueue -from queue import Empty - -from mpcurses.handler import queue_handler - -logger = logging.getLogger(__name__) - - -class NoActiveProcesses(Exception): - """ Raise when NoActiveProcesses is used to signal end - """ - pass - - -class MPcontroller(): - """ multi-processing (MP) controller - execute function across multiple processes - queue function execution - queue function log messages to thread-safe message queue - process messages from log message queue - maintain result of all executed functions - terminate execution using keyboard interrupt - """ - def __init__(self, function, *, process_data=None, shared_data=None, processes_to_start=None): - """ MPcontroller constructor - """ - logger.debug('executing MPcontroller constructor') - logger.debug(f'decorating function {function.__name__} with queue_handler') - self.function = queue_handler(function) - self.process_data = [{}] if process_data is None else process_data - self.shared_data = {} if shared_data is None else shared_data - self.active_processes = {} - self.message_queue = Queue() - self.result_queue = Queue() - self.process_queue = SimpleQueue() - self.completed_processes = 0 - self.processes_to_start = processes_to_start if processes_to_start else len(self.process_data) - - def populate_process_queue(self): - """ populate process queue from process data offset - """ - logger.debug('populating the process queue') - for offset, data in enumerate(self.process_data): - item = (offset, data) - logger.debug(f'adding {item} to the process queue') - self.process_queue.put(item) - logger.debug(f'added {self.process_queue.qsize()} items to the process queue') - - def start_processes(self): - """ start processes - """ - self.populate_process_queue() - - logger.debug(f'there are {self.process_queue.qsize()} items in the process queue') - logger.debug(f'starting {self.processes_to_start} background processes') - for _ in range(self.processes_to_start): - if self.process_queue.empty(): - logger.debug('the process queue is empty - no more processes need to be started') - break - self.start_next_process() - logger.info(f'started {len(self.active_processes)} background processes') - - def start_next_process(self): - """ start next process in the process queue - """ - process_queue_data = self.process_queue.get() - offset = process_queue_data[0] - process_data = process_queue_data[1] - process = Process( - target=self.function, - args=(process_data, self.shared_data), - kwargs={ - 'message_queue': self.message_queue, - 'offset': offset, - 'result_queue': self.result_queue}) - process.start() - logger.info(f'started background process at offset {offset} with process id {process.pid}') - # update active_processes dictionary with process meta-data for the process offset - self.active_processes[str(offset)] = process - - def terminate_processes(self): - """ terminate all active processes - """ - for offset, process in self.active_processes.items(): - logger.info(f'terminating process at offset {offset} with process id {process.pid}') - process.terminate() - - def purge_process_queue(self): - """ purge process queue - """ - logger.info('purging all items from the to process queue') - while not self.process_queue.empty(): - logger.info(f'purged {self.process_queue.get()} from the to process queue') - - def remove_active_process(self, offset): - """ remove active process at offset - """ - process = self.active_processes.pop(offset, None) - process_id = process.pid if process else '-' - logger.info(f'process at offset {offset} process id {process_id} has completed') - - def update_result(self): - """ update process data with result - """ - logger.debug('updating process data with result from result queue') - while True: - try: - result_data = self.result_queue.get(False) - for offset, result in result_data.items(): - logger.debug(f'adding result of process at offset {offset} to process data') - self.process_data[int(offset)]['result'] = result - except Empty: - logger.debug('result queue is empty') - break - - def active_processes_empty(self): - """ return True if active processes is empty else False - method added to facilitate unit testing - """ - # no active processes means its empty - return not self.active_processes - - def get_message(self): - """ return message from top of message queue - """ - message = self.message_queue.get(False) - match = re.match(r'^#(?P\d+)-(?PDONE|ERROR)$', message) - if match: - return { - 'offset': match.group('offset'), - 'control': match.group('control'), - 'message': message - } - return { - 'offset': None, - 'control': None, - 'message': message - } - - def process_control_message(self, offset, control): - """ process control message - """ - if control == 'DONE': - self.remove_active_process(offset) - if self.process_queue.empty(): - logger.info('the to process queue is empty') - if self.active_processes_empty(): - raise NoActiveProcesses() - else: - self.start_next_process() - else: - logger.info(f'error detected for process at offset {offset}') - self.purge_process_queue() - - def process_non_control_message(self, offset, message): - """ process non-control message - to be overriden by child class - """ - pass - - def run(self): - """ run without screen - """ - self.start_processes() - - while True: - try: - message = self.get_message() - if message['control']: - self.process_control_message(message['offset'], message['control']) - else: - self.process_non_control_message(message['offset'], message['message']) - - except NoActiveProcesses: - logger.info('there are no more active processses - quitting') - break - - except Empty: - pass - - def execute_run(self): - """ wraps call to run - """ - self.run() - - def final(self): - """ called in finally block - """ - pass - - def execute(self): - """ public execute api - """ - try: - self.execute_run() - self.update_result() - - except KeyboardInterrupt: - logger.info('Keyboard Interrupt signal received - killing all active processes') - self.terminate_processes() - sys.exit(-1) - - finally: - self.final() diff --git a/src/main/python/mpcurses/mpcurses.py b/src/main/python/mpcurses/mpcurses.py index 2081019..b2ea722 100644 --- a/src/main/python/mpcurses/mpcurses.py +++ b/src/main/python/mpcurses/mpcurses.py @@ -31,8 +31,8 @@ from .screen import update_screen_status from .screen import blink -from .mpcontroller import MPcontroller -from .mpcontroller import NoActiveProcesses +from mpmq import MPmq +from mpmq.mpmq import NoActiveProcesses logger = logging.getLogger(__name__) @@ -69,8 +69,8 @@ def pop(self, *args): return value -class MPcurses(MPcontroller): - """ a subclass of mpcurses.MPcontroller providing multi-processing (MP) capabilities for a curses screen +class MPcurses(MPmq): + """ a subclass of MPmq providing multi-processing (MP) capabilities for a curses screen """ def __init__(self, *args, **kwargs): """ MPcurses constructor diff --git a/src/unittest/python/test_handler.py b/src/unittest/python/test_handler.py deleted file mode 100644 index fe20a26..0000000 --- a/src/unittest/python/test_handler.py +++ /dev/null @@ -1,137 +0,0 @@ - -# Copyright (c) 2021 Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -from mock import patch -from mock import call -from mock import Mock -from mock import MagicMock - -from mpcurses.handler import QueueHandler -from mpcurses.handler import queue_handler - -import sys -import logging -logger = logging.getLogger(__name__) - -consoleHandler = logging.StreamHandler(sys.stdout) -logFormatter = logging.Formatter("%(asctime)s %(threadName)s %(name)s [%(funcName)s] %(levelname)s %(message)s") -consoleHandler.setFormatter(logFormatter) -rootLogger = logging.getLogger() -rootLogger.addHandler(consoleHandler) -rootLogger.setLevel(logging.DEBUG) - - -class TestHandler(unittest.TestCase): - - def setUp(self): - """ - """ - pass - - def tearDown(self): - """ - """ - pass - - @patch('mpcurses.handler.logging.Formatter') - @patch('mpcurses.handler.logging.getLogger') - @patch('mpcurses.handler.QueueHandler') - def test__queue_handler_Should_AddAndRemoveQueueHandler_When_DecoratedFunctionIsPassedMessageQueue(self, queue_handler_class, get_logger_mock, *patches): - root_logger_mock = Mock() - get_logger_mock.return_value = root_logger_mock - - queue_handler_object = Mock() - queue_handler_class.return_value = queue_handler_object - - function_mock = Mock(__name__='fn1') - function_mock.return_value = 'return' - message_queue_mock = Mock() - result = queue_handler(function_mock)(message_queue=message_queue_mock, offset=3) - self.assertEqual(result, function_mock.return_value) - - queue_handler_object.setFormatter.assert_called() - root_logger_mock.addHandler.assert_called_with(queue_handler_object) - root_logger_mock.setLevel.assert_called_with(logging.DEBUG) - root_logger_mock.removeHandler.assert_called_with(queue_handler_object) - - def test__queue_handler_Should_AddResultToResultQueue_When_DecoratedFunctionIsPassedResultQueue(self, *patches): - function_mock = Mock(__name__='fn1') - function_mock.return_value = 'function return value' - result_queue_mock = Mock() - result = queue_handler(function_mock)(offset=3, result_queue=result_queue_mock) - self.assertEqual(result, function_mock.return_value) - result_queue_mock.put.assert_called_once_with({3: function_mock.return_value}) - - def test__queue_handler_Should_AddDoneToMessageQueue_When_DecoratedFunctionIsPassedMessageQueueAndCompletes(self, *patches): - function_mock = Mock(__name__='fn1') - function_mock.return_value = 'function return value' - message_queue_mock = Mock() - result = queue_handler(function_mock)(offset=3, message_queue=message_queue_mock) - self.assertEqual(result, function_mock.return_value) - self.assertTrue(call('#3-execution of fn1 offset 3 ended') in message_queue_mock.put.mock_calls) - self.assertTrue(call('#3-DONE') in message_queue_mock.put.mock_calls) - - def test__queue_handler_Should_AddErrorMessagesToMessageQueue_When_FunctionThrowsException(self, *patches): - function_mock = Mock(__name__='fn1') - function_mock.side_effect = Exception('function exception') - message_queue_mock = Mock() - queue_handler(function_mock)(offset=3, message_queue=message_queue_mock) - call1 = call('#3-ERROR: function exception') - call2 = call('#3-ERROR') - call4 = call('#3-execution of fn1 offset 3 ended') - call3 = call('#3-DONE') - self.assertEqual(message_queue_mock.put.mock_calls, [call1, call2, call4, call3]) - - def test__queue_handler_Should_AddExceptionToResultQueue_When_FunctionThrowsException(self, *patches): - function_mock = Mock(__name__='fn1') - function_mock.side_effect = Exception('function exception') - message_queue_mock = Mock() - result_queue_mock = Mock() - queue_handler(function_mock)(offset=3, message_queue=message_queue_mock, result_queue=result_queue_mock) - result_queue_mock.put.assert_called_once_with({3: function_mock.side_effect}) - - @patch('mpcurses.handler.Handler') - def test__QueueHandler_Should_PutInfoMessageToMessageQueue_When_EmitInfoRecord(self, *patches): - message_queue_mock = Mock() - process_number = 1 - queue_handler_object = QueueHandler(message_queue_mock, process_number) - - record_mock = Mock(msg='some message', levelno=20) - - queue_handler_object.emit(record_mock) - message_queue_mock.put.assert_called_with('#1-INFO: some message') - - @patch('mpcurses.handler.Handler') - def test__QueueHandler_Should_PutInfoMessageToMessageQueue_When_EmitWarnRecord(self, *patches): - message_queue_mock = Mock() - process_number = 1 - queue_handler_object = QueueHandler(message_queue_mock, process_number) - - record_mock = Mock(msg='some message', levelno=30) - - queue_handler_object.emit(record_mock) - message_queue_mock.put.assert_called_with('#1-WARN: some message') - - @patch('mpcurses.handler.Handler') - def test__QueueHandler_Should_PutErrorMessageToMessageQueue_When_EmitErrorRecord(self, *patches): - message_queue_mock = Mock() - process_number = 1 - queue_handler_object = QueueHandler(message_queue_mock, process_number) - - record_mock = Mock(msg='some message', levelno=40) - - queue_handler_object.emit(record_mock) - message_queue_mock.put.assert_called_with('#1-ERROR: some message') diff --git a/src/unittest/python/test_mpcontroller.py b/src/unittest/python/test_mpcontroller.py deleted file mode 100644 index 38f3f6b..0000000 --- a/src/unittest/python/test_mpcontroller.py +++ /dev/null @@ -1,323 +0,0 @@ - -# Copyright (c) 2021 Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -from mock import patch -from mock import call -from mock import Mock -from mock import MagicMock - -from queue import Empty - -from mpcurses.mpcontroller import MPcontroller -from mpcurses.mpcontroller import NoActiveProcesses -from mpcurses.handler import queue_handler - -import sys -import logging -logger = logging.getLogger(__name__) - - -class TestMPcontroller(unittest.TestCase): - - def setUp(self): - """ - """ - pass - - def tearDown(self): - """ - """ - pass - - def test__init_Should_SetDefaults_When_Called(self, *patches): - client = MPcontroller(function=Mock(__name__='mockfunc')) - self.assertEqual(client.process_data, [{}]) - self.assertEqual(client.shared_data, {}) - self.assertEqual(client.processes_to_start, 1) - - @patch('mpcurses.mpcontroller.queue_handler') - def test__init_Should_SetDefaults_When_FunctionNotWrapped(self, queue_handler_patch, *patches): - function_mock = Mock(__name__='mockfunc') - client = MPcontroller(function=function_mock) - self.assertEqual(client.process_data, [{}]) - self.assertEqual(client.shared_data, {}) - self.assertEqual(client.processes_to_start, 1) - self.assertEqual(client.function, queue_handler_patch(function_mock)) - - def test__populate_process_queue_Should_AddToProcessQueue_When_Called(self, *patches): - process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - client.populate_process_queue() - self.assertEqual(client.process_queue.qsize(), 3) - - @patch('mpcurses.MPcontroller.start_next_process') - def test__start_processes_Should_CallStartNextProcess_When_Called(self, start_next_process_patch, *patches): - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] - client = MPcontroller(function=function_mock, process_data=process_data, processes_to_start=2) - client.start_processes() - self.assertEqual(len(start_next_process_patch.mock_calls), 2) - - @patch('mpcurses.MPcontroller.populate_process_queue') - @patch('mpcurses.MPcontroller.start_next_process') - def test__start_processes_Should_CallStartNextProcess_When_ProcessesToStartGreaterThanProcessQueueSize(self, start_next_process_patch, *patches): - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] - client = MPcontroller(function=function_mock, process_data=process_data) - client.start_processes() - self.assertEqual(len(start_next_process_patch.mock_calls), 0) - - @patch('mpcurses.mpcontroller.queue_handler') - @patch('mpcurses.mpcontroller.Process') - def test__start_next_process_Should_CallExpected_When_Called(self, process_patch, queue_handler_mock, *patches): - process_mock = Mock() - process_patch.return_value = process_mock - - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] - client = MPcontroller(function=function_mock, process_data=process_data, shared_data='--shared-data--') - client.populate_process_queue() - client.start_next_process() - - process_patch.assert_called_once_with( - target=queue_handler_mock(function_mock), - args=({'range': '0-1'}, '--shared-data--'), - kwargs={ - 'message_queue': client.message_queue, - 'offset': 0, - 'result_queue': client.result_queue - }) - - def test__terminate_processes_Should_CallExpected_When_Called(self, *patches): - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] - client = MPcontroller(function=function_mock, process_data=process_data, shared_data='--shared-data--') - process1_mock = Mock() - process2_mock = Mock() - client.active_processes = {'0': process1_mock, '1': process2_mock} - client.terminate_processes() - process1_mock.terminate.assert_called_once_with() - process2_mock.terminate.assert_called_once_with() - - def test__purge_process_queue_Should_PurgeProcessQueue_When_Called(self, *patches): - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] - client = MPcontroller(function=function_mock, process_data=process_data) - client.populate_process_queue() - self.assertEqual(client.process_queue.qsize(), 3) - client.purge_process_queue() - self.assertTrue(client.process_queue.empty()) - - @patch('mpcurses.mpcontroller.logger') - def test__remove_active_process_Should_CallExpected_When_Called(self, logger_patch, *patches): - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] - client = MPcontroller(function=function_mock, process_data=process_data) - process_mock = Mock(pid=121372) - client.active_processes['0'] = process_mock - client.remove_active_process('0') - logger_patch.info.assert_called_once_with('process at offset 0 process id 121372 has completed') - - def test__update_result_Should_CallExpected_When_Called(self, *patches): - result_queue_mock = Mock() - result_queue_mock.get.side_effect = [ - {'0': '--result0--'}, - {'1': '--result1--'}, - {'2': '--result2--'}, - Empty('empty') - ] - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] - client = MPcontroller(function=function_mock, process_data=process_data) - client.result_queue = result_queue_mock - client.update_result() - expected_process_data = [{'range': '0-1', 'result': '--result0--'}, {'range': '2-3', 'result': '--result1--'}, {'range': '4-5', 'result': '--result2--'}] - self.assertEqual(client.process_data, expected_process_data) - - def test__active_processes_empty_Should_ReturnExpected_When_Called(self, *patches): - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}] - client = MPcontroller(function=function_mock, process_data=process_data) - client.active_processes = {'1': True} - self.assertFalse(client.active_processes_empty()) - client.active_processes = {} - self.assertTrue(client.active_processes_empty()) - - def test__get_message_Should_ReturnExpected_When_ControlDone(self, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - message_queue_mock = Mock() - message_queue_mock.get.return_value = '#0-DONE' - client.message_queue = message_queue_mock - - result = client.get_message() - expected_result = { - 'offset': '0', - 'control': 'DONE', - 'message': '#0-DONE' - } - self.assertEqual(result, expected_result) - - def test__get_message_Should_ReturnExpected_When_ControlError(self, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - message_queue_mock = Mock() - message_queue_mock.get.return_value = '#3-ERROR' - client.message_queue = message_queue_mock - - result = client.get_message() - expected_result = { - 'offset': '3', - 'control': 'ERROR', - 'message': '#3-ERROR' - } - self.assertEqual(result, expected_result) - - def test__get_message_Should_ReturnExpected_When_NotControlMessage(self, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - message_queue_mock = Mock() - message_queue_mock.get.return_value = '#4-This is a log message' - client.message_queue = message_queue_mock - - result = client.get_message() - expected_result = { - 'offset': None, - 'control': None, - 'message': '#4-This is a log message' - } - self.assertEqual(result, expected_result) - - @patch('mpcurses.MPcontroller.remove_active_process') - @patch('mpcurses.MPcontroller.active_processes_empty', return_value=True) - def test__process_control_message_Should_RaiseNoActiveProcesses_When_ControlDoneAndProcessQueueEmptyAndNoActiveProcesses(self, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - process_queue_mock = Mock() - process_queue_mock.empty.return_value = True - client.process_queue = process_queue_mock - - with self.assertRaises(NoActiveProcesses): - client.process_control_message('0', 'DONE') - - @patch('mpcurses.MPcontroller.active_processes_empty', return_value=False) - @patch('mpcurses.MPcontroller.remove_active_process') - @patch('mpcurses.MPcontroller.start_next_process') - def test__process_control_message_Should_StartNextProcess_When_ControlDoneAndProcessQueueNotEmpty(self, start_next_process_patch, remove_active_process_patch, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - process_queue_mock = Mock() - process_queue_mock.empty.return_value = False - client.process_queue = process_queue_mock - - client.process_control_message('0', 'DONE') - start_next_process_patch.assert_called_once_with() - remove_active_process_patch.assert_called_once_with('0') - - @patch('mpcurses.MPcontroller.purge_process_queue') - def test__process_control_message_Should_PurgeProcessQueue_When_ControlError(self, purge_process_queue_patch, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - client.process_control_message('0', 'ERROR') - purge_process_queue_patch.assert_called_once_with() - - @patch('mpcurses.MPcontroller.remove_active_process') - @patch('mpcurses.MPcontroller.active_processes_empty', return_value=False) - def test__process_control_message_Should_DoNothing_When_ControlDoneAndProcessQueueEmptyAndActiveProcesses(self, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - process_queue_mock = Mock() - process_queue_mock.empty.return_value = True - client.process_queue = process_queue_mock - - client.process_control_message('0', 'DONE') - - def test__process_non_control_message_Should_DoNothing_When_Called(self, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - client.process_non_control_message(None, 'message') - - @patch('mpcurses.MPcontroller.start_processes') - @patch('mpcurses.mpcontroller.logger') - @patch('mpcurses.MPcontroller.get_message') - def test__run_Should_CallExpected_When_EmptyAndNoActiveProcesses(self, get_message_patch, logger_patch, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - get_message_patch.side_effect = [ - Empty('empty'), - NoActiveProcesses() - ] - client.run() - logger_patch.info.assert_called_once_with('there are no more active processses - quitting') - - @patch('mpcurses.MPcontroller.start_processes') - @patch('mpcurses.MPcontroller.process_non_control_message') - @patch('mpcurses.MPcontroller.process_control_message') - @patch('mpcurses.MPcontroller.get_message') - def test__run_Should_CallExpected_When_Called(self, get_message_patch, process_control_message_patch, process_non_control_message_patch, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - - get_message_patch.side_effect = [ - {'offset': None, 'control': None, 'message': '#0-this is message1'}, - {'offset': None, 'control': None, 'message': '#0-this is message2'}, - {'offset': '0', 'control': 'DONE', 'message': '#0-DONE'}, - NoActiveProcesses() - ] - client.run() - process_control_message_patch.assert_called_once_with('0', 'DONE') - self.assertTrue(call(None, '#0-this is message1') in process_non_control_message_patch.mock_calls) - - @patch('mpcurses.MPcontroller.run') - def test__execute_run_Should_CallExepcted_When_Called(self, run_patch, *patches): - process_data = [{'range': '0-1'}] - client = MPcontroller(function=Mock(__name__='mockfunc'), process_data=process_data) - client.execute_run() - run_patch.assert_called_once_with() - - @patch('mpcurses.MPcontroller.terminate_processes') - @patch('mpcurses.mpcontroller.sys') - @patch('mpcurses.MPcontroller.execute_run') - def test__execute_Should_CallTerminateProcesses_When_KeyboardInterrupt(self, execute_run_patch, sys_patch, terminate_processes_patch, *patches): - execute_run_patch.side_effect = [ - KeyboardInterrupt('keyboard interrupt') - ] - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}] - client = MPcontroller(function=function_mock, process_data=process_data) - client.execute() - terminate_processes_patch.assert_called_once_with() - sys_patch.exit.assert_called_once_with(-1) - - @patch('mpcurses.MPcontroller.final') - @patch('mpcurses.MPcontroller.update_result') - @patch('mpcurses.MPcontroller.execute_run') - def test__execute_Should_CallExpected_When_Called(self, execute_run_patch, update_result_patch, final_patch, *patches): - function_mock = Mock(__name__='mockfunc') - process_data = [{'range': '0-1'}] - client = MPcontroller(function=function_mock, process_data=process_data) - client.execute() - execute_run_patch.assert_called_once_with() - update_result_patch.assert_called_once_with() - final_patch.assert_called_once_with() diff --git a/src/unittest/python/test_mpcurses.py b/src/unittest/python/test_mpcurses.py index f36fd51..d280e18 100644 --- a/src/unittest/python/test_mpcurses.py +++ b/src/unittest/python/test_mpcurses.py @@ -22,9 +22,9 @@ from queue import Empty from mpcurses.mpcurses import MPcurses -from mpcurses.mpcurses import NoActiveProcesses +from mpmq.mpmq import NoActiveProcesses from mpcurses.mpcurses import OnDict -from mpcurses.handler import queue_handler +from mpmq.handler import queue_handler import sys import logging