diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index fad35bb..eeefb59 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -14,9 +14,9 @@ jobs: matrix: version: ['3.7', '3.8', '3.9', '3.10'] name: Build Python Docker images - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: build mpmq ${{ matrix.version }} image run: docker image build --build-arg PYTHON_VERSION=${{ matrix.version }} -t mpmq:${{ matrix.version }} . diff --git a/README.md b/README.md index 41729be..d42e452 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # mpmq [![GitHub Workflow Status](https://github.com/soda480/mpmq/workflows/build/badge.svg)](https://github.com/soda480/mpmq/actions) [![vulnerabilities](https://img.shields.io/badge/vulnerabilities-None-brightgreen)](https://pypi.org/project/bandit/) -[![coverage](https://img.shields.io/badge/coverage-100%25-brightgreen)](https://pybuilder.io/) +[![coverage](https://img.shields.io/badge/coverage-99%25-brightgreen)](https://pybuilder.io/) [![complexity](https://img.shields.io/badge/complexity-A-brightgreen)](https://radon.readthedocs.io/en/latest/api.html#module-radon.complexity) [![PyPI version](https://badge.fury.io/py/mpmq.svg)](https://badge.fury.io/py/mpmq) [![python](https://img.shields.io/badge/python-3.7%20%7C%203.8%20%7C%203.9%20%7C%203.10-teal)](https://www.python.org/downloads/) diff --git a/build.py b/build.py index 57314b1..b1e2df6 100644 --- a/build.py +++ b/build.py @@ -30,7 +30,7 @@ authors = [Author('Emilio Reyes', 'emilio.reyes@intel.com')] summary = 'Mpmq is an abstraction of the Python multiprocessing library providing execution pooling and message queuing capabilities.' url = 'https://github.com/soda480/mpmq' -version = '0.3.2' +version = '0.4.0' default_task = [ 'clean', 'analyze', diff --git a/src/main/python/mpmq/mpmq.py b/src/main/python/mpmq/mpmq.py index dfa1949..eef03d1 100644 --- a/src/main/python/mpmq/mpmq.py +++ b/src/main/python/mpmq/mpmq.py @@ -59,6 +59,8 @@ def __init__(self, function, *, process_data=None, shared_data=None, processes_t self.process_queue = SimpleQueue() self.processes_to_start = processes_to_start if processes_to_start else len(self.process_data) self.timeout = timeout if timeout else TIMEOUT + self.active_processes = 0 + self.completed_processes = 0 def populate_process_queue(self): """ populate process queue from process data offset @@ -82,8 +84,10 @@ def start_processes(self): logger.debug('the process queue is empty - no more processes need to be started') break self.start_next_process() - active_processes = sum(meta['active'] for _, meta in self.processes.items()) - logger.info(f'started {active_processes} background processes') + logger.info(f'started {self.active_processes} background processes') + + def on_start_process(self): + pass def start_next_process(self): """ start next process in the process queue @@ -103,20 +107,21 @@ def start_next_process(self): 'process': process, 'start_time': datetime.datetime.now(), 'stop_time': None, - 'duration': None, - 'active': True + 'duration': None } + self.active_processes += 1 + self.on_start_process() def terminate_processes(self): """ terminate all active processes """ for offset, meta in self.processes.items(): process = meta['process'] - if not meta['active']: + if not meta['process'].is_alive(): continue logger.info(f"terminating process at offset:{offset} with id:{process.pid} name:{process.name}") process.terminate() - meta['active'] = False + self.active_processes = 0 def purge_process_queue(self): """ purge process queue @@ -134,6 +139,9 @@ def get_duration(start_time, stop_time): duration = str(datetime.datetime.strptime(stop, '%H:%M:%S') - datetime.datetime.strptime(start, '%H:%M:%S')) return duration + def on_complete_process(self): + pass + def complete_process(self, offset): """ complete the process at offset """ @@ -144,7 +152,9 @@ def complete_process(self, offset): meta['duration'] = self.get_duration(meta['start_time'], meta['stop_time']) logger.info(f"joining process at offset:{offset} with id:{process.pid} name:{process.name}") process.join(self.timeout) - meta['active'] = False + self.active_processes -= 1 + self.completed_processes += 1 + self.on_complete_process() def get_results(self): """ return results of function execution from all processes @@ -170,15 +180,14 @@ def get_message(self): """ message = self.message_queue.get(False) match = re.match(r'^#(?P\d+)-(?PDONE|ERROR)$', message) + offset = None + control = None if match: - return { - 'offset': int(match.group('offset')), - 'control': match.group('control'), - 'message': message - } + offset = int(match.group('offset')) + control = match.group('control') return { - 'offset': None, - 'control': None, + 'offset': offset, + 'control': control, 'message': message } @@ -189,10 +198,9 @@ def process_control_message(self, offset, control): self.complete_process(offset) if self.process_queue.empty(): logger.info('the to process queue is empty') - active_processes = sum(meta['active'] for _, meta in self.processes.items()) - if not active_processes: + if not self.active_processes: raise NoActiveProcesses() - logger.debug(f'there are {active_processes} background processes still alive') + logger.debug(f'there are {self.active_processes} background processes still alive') else: self.start_next_process() else: diff --git a/src/unittest/python/test_mpmq.py b/src/unittest/python/test_mpmq.py index 5fce721..3b4d2df 100644 --- a/src/unittest/python/test_mpmq.py +++ b/src/unittest/python/test_mpmq.py @@ -82,9 +82,10 @@ def test__start_processes_Should_CallStartNextProcess_When_ProcessesToStartGreat client.start_processes() self.assertEqual(len(start_next_process_patch.mock_calls), 0) + @patch('mpmq.MPmq.on_start_process') @patch('mpmq.mpmq.QueueHandlerDecorator') @patch('mpmq.mpmq.Process') - def test__start_next_process_Should_CallExpected_When_Called(self, process_patch, queue_handler_mock, *patches): + def test__start_next_process_Should_CallExpected_When_Called(self, process_patch, queue_handler_mock, on_start_process_patch, *patches): process_mock = Mock() process_patch.return_value = process_mock @@ -102,6 +103,7 @@ def test__start_next_process_Should_CallExpected_When_Called(self, process_patch 'offset': 0, 'result_queue': client.result_queue }) + on_start_process_patch.assert_called_once_with() def test__terminate_processes_Should_CallExpected_When_Called(self, *patches): function_mock = Mock(__name__='mockfunc') @@ -122,17 +124,19 @@ def test__purge_process_queue_Should_PurgeProcessQueue_When_Called(self, *patche client.purge_process_queue() self.assertTrue(client.process_queue.empty()) + @patch('mpmq.MPmq.on_complete_process') @patch('mpmq.mpmq.datetime') @patch('mpmq.MPmq.get_duration') - def test__complete_process_Should_CallExpected_When_Called(self, get_duration_patch, datetime_patch, *patches): + def test__complete_process_Should_CallExpected_When_Called(self, get_duration_patch, datetime_patch, on_complete_process_patch, *patches): function_mock = Mock(__name__='mockfunc') process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}] client = MPmq(function=function_mock, process_data=process_data) process_mock = Mock(pid=121372, name='Process-1') process_mock.name = 'Process-1' - client.processes['0'] = {'process': process_mock, 'start_time': '--time--', 'stop_time': None, 'duration': None, 'active': True} + client.processes['0'] = {'process': process_mock, 'start_time': '--time--', 'stop_time': None, 'duration': None} client.complete_process('0') - self.assertEqual(client.processes['0'], {'process': process_mock, 'start_time': '--time--', 'stop_time': datetime_patch.datetime.now.return_value, 'duration': get_duration_patch.return_value, 'active': False}) + self.assertEqual(client.processes['0'], {'process': process_mock, 'start_time': '--time--', 'stop_time': datetime_patch.datetime.now.return_value, 'duration': get_duration_patch.return_value}) + on_complete_process_patch.assert_called_once_with() def test__get_results_Should_CallExpected_When_Called(self, *patches): result_queue_mock = Mock() @@ -236,7 +240,7 @@ def test__process_control_message_Should_PurgeProcessQueue_When_ControlError(sel def test__process_control_message_Should_DoNothing_When_ControlDoneAndProcessQueueEmptyAndActiveProcesses(self, *patches): process_data = [{'range': '0-1'}] client = MPmq(function=Mock(__name__='mockfunc'), process_data=process_data) - + client.active_processes = 1 process_queue_mock = Mock() process_queue_mock.empty.return_value = True client.process_queue = process_queue_mock