Skip to content

Commit

Permalink
feat: release 0.4.0 (#11)
Browse files Browse the repository at this point in the history
* ci: update build workflow
* feat: add on start and stop process hooks
* fix: simplify get_message function
* build: update build version
---------
Signed-off-by: Emilio Reyes <soda480@gmail.com>
  • Loading branch information
soda480 authored Sep 21, 2024
1 parent 2fb6831 commit 6131879
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 26 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ jobs:
matrix:
version: ['3.7', '3.8', '3.9', '3.10']
name: Build Python Docker images
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: build mpmq ${{ matrix.version }} image
run:
docker image build --build-arg PYTHON_VERSION=${{ matrix.version }} -t mpmq:${{ matrix.version }} .
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# mpmq
[![GitHub Workflow Status](https://github.com/soda480/mpmq/workflows/build/badge.svg)](https://github.com/soda480/mpmq/actions)
[![vulnerabilities](https://img.shields.io/badge/vulnerabilities-None-brightgreen)](https://pypi.org/project/bandit/)
[![coverage](https://img.shields.io/badge/coverage-100%25-brightgreen)](https://pybuilder.io/)
[![coverage](https://img.shields.io/badge/coverage-99%25-brightgreen)](https://pybuilder.io/)
[![complexity](https://img.shields.io/badge/complexity-A-brightgreen)](https://radon.readthedocs.io/en/latest/api.html#module-radon.complexity)
[![PyPI version](https://badge.fury.io/py/mpmq.svg)](https://badge.fury.io/py/mpmq)
[![python](https://img.shields.io/badge/python-3.7%20%7C%203.8%20%7C%203.9%20%7C%203.10-teal)](https://www.python.org/downloads/)
Expand Down
2 changes: 1 addition & 1 deletion build.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
authors = [Author('Emilio Reyes', 'emilio.reyes@intel.com')]
summary = 'Mpmq is an abstraction of the Python multiprocessing library providing execution pooling and message queuing capabilities.'
url = 'https://github.com/soda480/mpmq'
version = '0.3.2'
version = '0.4.0'
default_task = [
'clean',
'analyze',
Expand Down
42 changes: 25 additions & 17 deletions src/main/python/mpmq/mpmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def __init__(self, function, *, process_data=None, shared_data=None, processes_t
self.process_queue = SimpleQueue()
self.processes_to_start = processes_to_start if processes_to_start else len(self.process_data)
self.timeout = timeout if timeout else TIMEOUT
self.active_processes = 0
self.completed_processes = 0

def populate_process_queue(self):
""" populate process queue from process data offset
Expand All @@ -82,8 +84,10 @@ def start_processes(self):
logger.debug('the process queue is empty - no more processes need to be started')
break
self.start_next_process()
active_processes = sum(meta['active'] for _, meta in self.processes.items())
logger.info(f'started {active_processes} background processes')
logger.info(f'started {self.active_processes} background processes')

def on_start_process(self):
pass

def start_next_process(self):
""" start next process in the process queue
Expand All @@ -103,20 +107,21 @@ def start_next_process(self):
'process': process,
'start_time': datetime.datetime.now(),
'stop_time': None,
'duration': None,
'active': True
'duration': None
}
self.active_processes += 1
self.on_start_process()

def terminate_processes(self):
""" terminate all active processes
"""
for offset, meta in self.processes.items():
process = meta['process']
if not meta['active']:
if not meta['process'].is_alive():
continue
logger.info(f"terminating process at offset:{offset} with id:{process.pid} name:{process.name}")
process.terminate()
meta['active'] = False
self.active_processes = 0

def purge_process_queue(self):
""" purge process queue
Expand All @@ -134,6 +139,9 @@ def get_duration(start_time, stop_time):
duration = str(datetime.datetime.strptime(stop, '%H:%M:%S') - datetime.datetime.strptime(start, '%H:%M:%S'))
return duration

def on_complete_process(self):
pass

def complete_process(self, offset):
""" complete the process at offset
"""
Expand All @@ -144,7 +152,9 @@ def complete_process(self, offset):
meta['duration'] = self.get_duration(meta['start_time'], meta['stop_time'])
logger.info(f"joining process at offset:{offset} with id:{process.pid} name:{process.name}")
process.join(self.timeout)
meta['active'] = False
self.active_processes -= 1
self.completed_processes += 1
self.on_complete_process()

def get_results(self):
""" return results of function execution from all processes
Expand All @@ -170,15 +180,14 @@ def get_message(self):
"""
message = self.message_queue.get(False)
match = re.match(r'^#(?P<offset>\d+)-(?P<control>DONE|ERROR)$', message)
offset = None
control = None
if match:
return {
'offset': int(match.group('offset')),
'control': match.group('control'),
'message': message
}
offset = int(match.group('offset'))
control = match.group('control')
return {
'offset': None,
'control': None,
'offset': offset,
'control': control,
'message': message
}

Expand All @@ -189,10 +198,9 @@ def process_control_message(self, offset, control):
self.complete_process(offset)
if self.process_queue.empty():
logger.info('the to process queue is empty')
active_processes = sum(meta['active'] for _, meta in self.processes.items())
if not active_processes:
if not self.active_processes:
raise NoActiveProcesses()
logger.debug(f'there are {active_processes} background processes still alive')
logger.debug(f'there are {self.active_processes} background processes still alive')
else:
self.start_next_process()
else:
Expand Down
14 changes: 9 additions & 5 deletions src/unittest/python/test_mpmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ def test__start_processes_Should_CallStartNextProcess_When_ProcessesToStartGreat
client.start_processes()
self.assertEqual(len(start_next_process_patch.mock_calls), 0)

@patch('mpmq.MPmq.on_start_process')
@patch('mpmq.mpmq.QueueHandlerDecorator')
@patch('mpmq.mpmq.Process')
def test__start_next_process_Should_CallExpected_When_Called(self, process_patch, queue_handler_mock, *patches):
def test__start_next_process_Should_CallExpected_When_Called(self, process_patch, queue_handler_mock, on_start_process_patch, *patches):
process_mock = Mock()
process_patch.return_value = process_mock

Expand All @@ -102,6 +103,7 @@ def test__start_next_process_Should_CallExpected_When_Called(self, process_patch
'offset': 0,
'result_queue': client.result_queue
})
on_start_process_patch.assert_called_once_with()

def test__terminate_processes_Should_CallExpected_When_Called(self, *patches):
function_mock = Mock(__name__='mockfunc')
Expand All @@ -122,17 +124,19 @@ def test__purge_process_queue_Should_PurgeProcessQueue_When_Called(self, *patche
client.purge_process_queue()
self.assertTrue(client.process_queue.empty())

@patch('mpmq.MPmq.on_complete_process')
@patch('mpmq.mpmq.datetime')
@patch('mpmq.MPmq.get_duration')
def test__complete_process_Should_CallExpected_When_Called(self, get_duration_patch, datetime_patch, *patches):
def test__complete_process_Should_CallExpected_When_Called(self, get_duration_patch, datetime_patch, on_complete_process_patch, *patches):
function_mock = Mock(__name__='mockfunc')
process_data = [{'range': '0-1'}, {'range': '2-3'}, {'range': '4-5'}]
client = MPmq(function=function_mock, process_data=process_data)
process_mock = Mock(pid=121372, name='Process-1')
process_mock.name = 'Process-1'
client.processes['0'] = {'process': process_mock, 'start_time': '--time--', 'stop_time': None, 'duration': None, 'active': True}
client.processes['0'] = {'process': process_mock, 'start_time': '--time--', 'stop_time': None, 'duration': None}
client.complete_process('0')
self.assertEqual(client.processes['0'], {'process': process_mock, 'start_time': '--time--', 'stop_time': datetime_patch.datetime.now.return_value, 'duration': get_duration_patch.return_value, 'active': False})
self.assertEqual(client.processes['0'], {'process': process_mock, 'start_time': '--time--', 'stop_time': datetime_patch.datetime.now.return_value, 'duration': get_duration_patch.return_value})
on_complete_process_patch.assert_called_once_with()

def test__get_results_Should_CallExpected_When_Called(self, *patches):
result_queue_mock = Mock()
Expand Down Expand Up @@ -236,7 +240,7 @@ def test__process_control_message_Should_PurgeProcessQueue_When_ControlError(sel
def test__process_control_message_Should_DoNothing_When_ControlDoneAndProcessQueueEmptyAndActiveProcesses(self, *patches):
process_data = [{'range': '0-1'}]
client = MPmq(function=Mock(__name__='mockfunc'), process_data=process_data)

client.active_processes = 1
process_queue_mock = Mock()
process_queue_mock.empty.return_value = True
client.process_queue = process_queue_mock
Expand Down

0 comments on commit 6131879

Please sign in to comment.