Skip to content

Commit

Permalink
Multiprocessing! Works.
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzoic committed Jul 13, 2023
1 parent f0768d7 commit d7fe546
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
62 changes: 59 additions & 3 deletions countess/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,63 @@
from dataclasses import dataclass, field
from typing import Any, Optional

from queue import Empty
import multiprocessing
import time
import psutil

from countess.core.logger import Logger
from countess.core.plugins import BasePlugin, get_plugin_classes

PRERUN_ROW_LIMIT = 100000


class MultiprocessingProxy:
    """Iterable which runs the wrapped iterator in a separate process.

    Every call to ``__iter__`` starts a ``multiprocessing.Process`` that
    iterates ``iterator`` and puts each item onto a bounded
    ``multiprocessing.Queue``.  The ``MultiprocessingProxyIterator``
    returned to the caller reads the items back from that queue.

    Attributes:
        iterator: the iterable consumed by the producer process.
        process: the most recently started producer process, or ``None``
            if the proxy has not been iterated yet.
        queue: the most recently created queue.
        name: label passed through to the iterators this proxy creates.
    """

    def __init__(self, iterator, name):
        self.iterator = iterator
        self.process = None
        # Kept so `queue` is always a Queue and `target()` can still be
        # called without arguments; `__iter__` replaces it per iteration.
        self.queue = multiprocessing.Queue(maxsize=3)
        self.name = name

    def target(self, queue=None):
        """Producer loop: push every item of ``iterator`` onto ``queue``.

        Args:
            queue: queue to write to.  Defaults to ``self.queue`` so that
                existing zero-argument calls keep working.
        """
        if queue is None:
            queue = self.queue
        try:
            for x in self.iterator:
                queue.put(x)

                # Throttle back if system memory is running
                # low.
                # XXX risk of deadlocks, of course.
                while psutil.virtual_memory().percent > 75:
                    time.sleep(1)
        finally:
            # Signal that this process will put no more data, even if the
            # wrapped iterator raised part-way through.
            queue.close()

    def __iter__(self):
        """Start a producer process and return an iterator over its output."""
        # A fresh queue per iteration: if several consumers iterate the
        # same proxy at once (fan-out), sharing one queue would interleave
        # the producers' output and split it arbitrarily between consumers.
        queue = multiprocessing.Queue(maxsize=3)
        process = multiprocessing.Process(target=self.target, args=(queue,))
        self.queue = queue
        self.process = process
        process.start()
        return MultiprocessingProxyIterator(process, queue, self.name)

class MultiprocessingProxyIterator:
    """Iterator which reads items a producer process puts onto a queue.

    Iteration ends once the queue is empty and the producer process is no
    longer alive.

    Args:
        process: the producer; only its ``is_alive()`` method is used.
        queue: the queue to read from; must raise ``queue.Empty`` from
            ``get(timeout=...)`` and ``get_nowait()`` when no item is
            available.
        name: label for this iterator (stored, not otherwise used here).
        poll_timeout: seconds to wait for an item before checking whether
            the producer is still alive.

    Attributes:
        count: number of items returned so far.
    """

    def __init__(self, process, queue, name, poll_timeout=2):
        self.process = process
        self.queue = queue
        self.name = name
        self.count = 0
        self.poll_timeout = poll_timeout
        self._exhausted = False

    def __iter__(self):
        # Required by the iterator protocol so the object can be passed
        # to list(), itertools, nested for-loops, etc.
        return self

    def __next__(self):
        """Return the next item, or raise ``StopIteration`` at end of stream."""
        if self._exhausted:
            # Stay exhausted without waiting for another timeout.
            raise StopIteration

        while True:
            try:
                msg = self.queue.get(timeout=self.poll_timeout)
            except Empty:
                if self.process.is_alive():
                    continue
                # The producer may have put its last items and exited
                # between the timeout above and the liveness check, so
                # look once more before declaring the stream finished.
                try:
                    msg = self.queue.get_nowait()
                except Empty:
                    self._exhausted = True
                    raise StopIteration from None
            self.count += 1
            return msg



@dataclass
class PipelineNode:
name: str
Expand Down Expand Up @@ -37,12 +88,17 @@ def execute(self, logger: Logger, row_limit: Optional[int] = None):
inputs = {p.name: p.result for p in self.parent_nodes}
self.result = []
if self.plugin:
print(f"EXECUTE {self.name} {self.plugin} {inputs}")
try:
self.result = self.plugin.process_inputs(inputs, logger, row_limit)
if not isinstance(self.result, (bytes, str)) and (row_limit is not None or len(self.child_nodes) > 1):
# XXX freeze to handle fan-out and reloading.
if isinstance(self.result, (bytes, str)):
pass
elif row_limit is not None:
self.result = list(self.result)
#if not isinstance(self.result, (bytes, str)) and (row_limit is not None or len(self.child_nodes) > 1):
# XXX freeze to handle fan-out and reloading.
# self.result = list(self.result)
elif hasattr(self.result, '__iter__'):
self.result = MultiprocessingProxy(self.result, self.name)
except Exception as exc: # pylint: disable=broad-exception-caught
logger.exception(exc)

Expand Down
3 changes: 2 additions & 1 deletion countess/gui/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def show_notes_widget(self, notes=""):
def show_preview_subframe(self):
if self.preview_subframe:
self.preview_subframe.destroy()
if isinstance(self.node.result, str):

if isinstance(self.node.result, (str, bytes)):
self.preview_subframe = tk.Frame(self.frame)
self.preview_subframe.rowconfigure(1, weight=1)
n_lines = len(self.node.result.splitlines())
Expand Down

0 comments on commit d7fe546

Please sign in to comment.