Skip to content

Commit

Permalink
Merge pull request #25 from soda480/0.2.3
Browse files Browse the repository at this point in the history
Add get_process_data capability
  • Loading branch information
soda480 authored Mar 1, 2021
2 parents 4bda02f + e2f54ab commit 316d897
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 273 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,17 @@ MPcurses(
Executing the code above results in the following:
![example](https://raw.githubusercontent.com/soda480/mpcurses/master/docs/images/example-multi.gif)

Serveral [examples](/examples) are included to help introduce the mpcurses library. Note the functions contained in all the examples are Python functions that have no context about multiprocessing or curses, they simply perform a function on a given dataset. Mpcurses takes care of setting up the multiprocessing, configuring the curses screen and maintaining the thread-safe queues that are required for inter-process communication.
Serveral [examples](https://github.com/soda480/mpcurses/tree/master/examples) are included to help introduce the mpcurses library. Note the functions contained in all the examples are Python functions that have no context about multiprocessing or curses, they simply perform a function on a given dataset. Mpcurses takes care of setting up the multiprocessing, configuring the curses screen and maintaining the thread-safe queues that are required for inter-process communication.

#### [example1](/examples/example1.py)
#### [example1](https://github.com/soda480/mpcurses/blob/master/examples/example1.py)
Execute a function that processes a list of random items. The screen maintains indicators showing the number of items that have been processed. Two lists are maintained displaying the items that had errors and warnings.
![example1](https://raw.githubusercontent.com/soda480/mpcurses/master/docs/images/example1.gif)

#### [example2](/examples/example2.py)
#### [example2](https://github.com/soda480/mpcurses/blob/master/examples/example2.py)
Execute a function that processes a list of random items. Execution is scaled across three processes where each is responsible for processing items for a particular group. The screen maintains indicators displaying the items that had errors and warnings for each group.
![example2](https://raw.githubusercontent.com/soda480/mpcurses/master/docs/images/example2.gif)

#### [example3](/examples/example3.py)
#### [example3](https://github.com/soda480/mpcurses/blob/master/examples/example3.py)
Execute a function that calculates prime numbers for a set range of integers. Execution is scaled across 10 different processes where each process computes the primes on a different set of numbers. For example, the first process computes primes for the set 1-10K, second process 10K-20K, third process 20K-30K, etc. The screen keeps track of the number of prime numbers encountered for each set and maintains a progress bar for each process.
![example3](https://raw.githubusercontent.com/soda480/mpcurses/master/docs/images/example3.gif)

Expand Down
2 changes: 1 addition & 1 deletion build.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
Author('Emilio Reyes', 'emilio.reyes@intel.com')]
summary = 'Mpcurses is an abstraction of the Python curses and multiprocessing libraries providing function execution and runtime visualization capabilities'
url = 'https://github.com/soda480/mpcurses'
version = '0.2.2'
version = '0.2.3'
default_task = [
'clean',
'analyze',
Expand Down
32 changes: 25 additions & 7 deletions examples/example5.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,35 @@ def get_hex():
return uuid.uuid4().hex.upper()


def get_servers(bays):
def get_servers(bays=None):
""" getting server data from enclosure
"""
servers = []
for bay in bays:
servers.append({
'bay': str(bay).zfill(2),
'firmware version': get_current_firmware(),
'servername': 'srv{}.company.com'.format(get_hex()[0:6]),
})
sleep(5)
return servers

def get_servers2(**kwargs):
""" getting server data from enclosure if being passed as get_process_data value to MPcurses then should accept **kwargs as argument
"""
bays = range(1, 17)
servers = []
for bay in bays:
servers.append({
'bay': str(bay).zfill(2),
'firmware version': get_current_firmware(),
'servername': 'srv{}.company.com'.format(get_hex()[0:6]),
})
sleep(5)
return servers



def simulate_error(value, message):
if value > 9:
raise Exception(message)
Expand Down Expand Up @@ -96,15 +114,15 @@ def get_current_firmware():
def main():
""" main program
"""
bays = range(1,17)
process_data = get_servers(bays)
MPcurses(
mpcurses = MPcurses(
function=update_firmware,
process_data=process_data,
get_process_data=get_servers,
processes_to_start=5,
screen_layout=get_screen_layout()).execute()
screen_layout=get_screen_layout(),
shared_data={'bays': range(1, 17)})
mpcurses.execute()

if any([process['result'] for process in process_data]):
if any([process['result'] for process in mpcurses.process_data]):
sys.exit(-1)


Expand Down
1 change: 0 additions & 1 deletion src/main/python/mpcurses/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@


from .handler import queue_handler
from .process import execute
from .mpcurses import MPcurses
109 changes: 64 additions & 45 deletions src/main/python/mpcurses/mpcurses.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
from queue import Empty

from .screen import initialize_screen
from .screen import initialize_screen_offsets
from .screen import finalize_screen
from .screen import update_screen
from .screen import echo_to_screen
from .screen import refresh_screen
from .screen import validate_screen_layout
from .screen import validate_screen_layout_processes
from .screen import update_screen_status
from .screen import blink
from .handler import queue_handler
Expand Down Expand Up @@ -77,44 +78,47 @@ def pop(self, *args):
class MPcurses():
""" mpcurses process pool
"""
def __init__(self, function, *, process_data=None, shared_data=None, processes_to_start=None, screen_layout=None, init_messages=None, setup_process_queue=True):
def __init__(self, function, *, process_data=None, shared_data=None, processes_to_start=None, screen_layout=None, init_messages=None, get_process_data=None):
""" MPCstate constructor
"""
if getattr(function, '__name__', None) == '_queue_handler':
# enable backwards compatibility for use cases where
# function was already decorated with queue_handler
# NOTE: this does not work for functions with multiple decorators
logger.debug('function is already decorated with queue_handler')
self.function = function
else:
logger.debug(f'decorating function {function.__name__} with queue_handler')
self.function = queue_handler(function)
if process_data and get_process_data:
raise ValueError('process_data and get_process_data values cannot both be set')

self.process_data = [{}] if process_data is None else process_data
if get_process_data and not callable(get_process_data):
raise ValueError('get_process_data value must be a callable function')

self.shared_data = {} if shared_data is None else shared_data
if get_process_data and not screen_layout:
raise ValueError('get_process_data can only be set if screen_layout value is provided')

self.processes_to_start = len(self.process_data) if processes_to_start is None else processes_to_start
logger.debug(f'decorating function {function.__name__} with queue_handler')
self.function = queue_handler(function)

self.active_processes = OnDict(on_change=self.on_state_change)
self.screen_layout = screen_layout

self.processes_to_start = processes_to_start

self.get_process_data = get_process_data

if self.get_process_data:
self.process_data = None
else:
# things we can set when get_process_data is not specified
self.process_data = [{}] if process_data is None else process_data
if not self.processes_to_start:
self.processes_to_start = len(self.process_data)

self.shared_data = {} if shared_data is None else shared_data

self.process_data_offset = [(self.process_data.index(item), item) for item in self.process_data]
self.active_processes = OnDict(on_change=self.on_state_change)

self.message_queue = Queue()

self.result_queue = Queue()

self.process_queue = SimpleQueue()

if screen_layout:
validate_screen_layout(len(self.process_data), self.processes_to_start, screen_layout)
self.screen_layout = screen_layout

self.init_messages = [] if init_messages is None else init_messages

if setup_process_queue:
self.setup_process_queue()

self.completed_processes = 0

self.screen = None
Expand All @@ -129,15 +133,6 @@ def __init__(self, function, *, process_data=None, shared_data=None, processes_t
if self.blink_screen:
self.blink_queue = Queue()

def setup_process_queue(self):
""" return queue containing data for each process
"""
logger.debug('populating the process queue')
for item in self.process_data_offset:
logger.debug(f'adding {item} to the process queue')
self.process_queue.put(item)
logger.debug(f'added {self.process_queue.qsize()} items to the process queue')

def start_blink_process(self):
""" start blink process
"""
Expand All @@ -156,9 +151,21 @@ def stop_blink_process(self):
self.blink_process.terminate()
logger.debug('terminated blink process')

def populate_process_queue(self):
""" populate process queue from process data offset
"""
logger.debug('populating the process queue')
for offset, data in enumerate(self.process_data):
item = (offset, data)
logger.debug(f'adding {item} to the process queue')
self.process_queue.put(item)
logger.debug(f'added {self.process_queue.qsize()} items to the process queue')

def start_processes(self):
""" start processes
"""
self.populate_process_queue()

logger.debug(f'there are {self.process_queue.qsize()} items in the process queue')
logger.debug(f'starting {self.processes_to_start} background processes')
for _ in range(self.processes_to_start):
Expand All @@ -180,8 +187,7 @@ def start_next_process(self):
kwargs={
'message_queue': self.message_queue,
'offset': offset,
'result_queue': self.result_queue
})
'result_queue': self.result_queue})
# logger.debug(f'starting background process at offset {offset} with data {process_data}')
process.start()
logger.info(f'started background process at offset {offset} with process id {process.pid}')
Expand Down Expand Up @@ -240,22 +246,39 @@ def on_state_change(self, process_completed=True):
queued=self.process_queue.qsize(),
completed=self.completed_processes)

def execute_get_process_data(self):
""" execute get_process_data function
"""
if self.get_process_data:
doc = self.get_process_data.__doc__
update_screen_status(self.screen, 'get-process-data', self.screen_layout['_screen'], data=doc)
kwargs = self.shared_data if self.shared_data else {}
self.process_data = self.get_process_data(**kwargs)
update_screen_status(self.screen, 'get-process-data', self.screen_layout['_screen'])
if not self.processes_to_start:
self.processes_to_start = len(self.process_data)

def setup_screen(self):
""" setup screen
"""
initialize_screen(self.screen, self.screen_layout, len(self.process_data_offset))
initialize_screen(self.screen, self.screen_layout)

self.execute_get_process_data()

# initialize screen with offsets
initialize_screen_offsets(self.screen, self.screen_layout, len(self.process_data), self.processes_to_start)

# update screen with all initialization messages if they were provided
for message in self.init_messages:
update_screen(message, self.screen, self.screen_layout)

# echo all process data to screen
for index, data in enumerate(self.process_data_offset):
echo_to_screen(self.screen, data[1], self.screen_layout, offset=index)

# echo shared data to screen
echo_to_screen(self.screen, self.shared_data, self.screen_layout)

# echo all process data to screen
for offset, data in enumerate(self.process_data):
echo_to_screen(self.screen, data, self.screen_layout, offset=offset)

self.start_blink_process()

def teardown_screen(self):
Expand Down Expand Up @@ -292,10 +315,7 @@ def get_message(self):
# if blink is enabled then process blink message first
blink_message = self.get_blink_message()
if blink_message:
update_screen_status(
self.screen,
blink_message,
self.screen_layout['_screen'])
update_screen_status(self.screen, blink_message, self.screen_layout['_screen'])

offset = None
control = None
Expand All @@ -307,8 +327,7 @@ def get_message(self):
return {
'offset': offset,
'control': control,
'message': message
}
'message': message}

def process_control_message(self, offset, control):
""" process control message
Expand Down
48 changes: 0 additions & 48 deletions src/main/python/mpcurses/process.py

This file was deleted.

Loading

0 comments on commit 316d897

Please sign in to comment.