From d7fe5469466bd937c44106c7800bdf271b0d158e Mon Sep 17 00:00:00 2001 From: Nick Moore Date: Thu, 13 Jul 2023 13:45:36 +1000 Subject: [PATCH] Multiprocessing! Works. --- countess/core/pipeline.py | 62 +++++++++++++++++++++++++++++++++++++-- countess/gui/main.py | 3 +- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/countess/core/pipeline.py b/countess/core/pipeline.py index ca5026f..c658d1f 100644 --- a/countess/core/pipeline.py +++ b/countess/core/pipeline.py @@ -1,12 +1,63 @@ from dataclasses import dataclass, field from typing import Any, Optional +from queue import Empty +import multiprocessing +import time +import psutil + from countess.core.logger import Logger from countess.core.plugins import BasePlugin, get_plugin_classes PRERUN_ROW_LIMIT = 100000 +class MultiprocessingProxy: + + def __init__(self, iterator, name): + self.iterator = iterator + self.process = None + self.queue = multiprocessing.Queue(maxsize=3) + self.name = name + + def target(self): + for x in self.iterator: + self.queue.put(x) + + # Throttle back if system memory is running + # low. + # XXX risk of deadlocks, of course. + while psutil.virtual_memory().percent > 75: + time.sleep(1) + + self.queue.close() + + def __iter__(self): + self.process = multiprocessing.Process(target=self.target) + self.process.start() + return MultiprocessingProxyIterator(self.process, self.queue, self.name) + +class MultiprocessingProxyIterator: + + def __init__(self, process, queue, name): + self.process = process + self.queue = queue + self.name = name + self.count = 0 + + def __next__(self): + while True: + try: + msg = self.queue.get(timeout=2) + self.count += 1 + return msg + except Empty: + if not self.process.is_alive(): + break + raise StopIteration + + + @dataclass class PipelineNode: name: str @@ -37,12 +88,17 @@ def execute(self, logger: Logger, row_limit: Optional[int] = None): inputs = {p.name: p.result for p in self.parent_nodes} self.result = [] if self.plugin: - print(f"EXECUTE {self.name} {self.plugin} {inputs}") try: self.result = self.plugin.process_inputs(inputs, logger, row_limit) - if not isinstance(self.result, (bytes, str)) and (row_limit is not None or len(self.child_nodes) > 1): - # XXX freeze to handle fan-out and reloading. + if isinstance(self.result, (bytes, str)): + pass + elif row_limit is not None: self.result = list(self.result) + #if not isinstance(self.result, (bytes, str)) and (row_limit is not None or len(self.child_nodes) > 1): + # XXX freeze to handle fan-out and reloading. + # self.result = list(self.result) + elif hasattr(self.result, '__iter__'): + self.result = MultiprocessingProxy(self.result, self.name) except Exception as exc: # pylint: disable=broad-exception-caught logger.exception(exc) diff --git a/countess/gui/main.py b/countess/gui/main.py index aff8a92..93cdfdb 100644 --- a/countess/gui/main.py +++ b/countess/gui/main.py @@ -146,7 +146,8 @@ def show_notes_widget(self, notes=""): def show_preview_subframe(self): if self.preview_subframe: self.preview_subframe.destroy() - if isinstance(self.node.result, str): + + if isinstance(self.node.result, (str, bytes)): self.preview_subframe = tk.Frame(self.frame) self.preview_subframe.rowconfigure(1, weight=1) n_lines = len(self.node.result.splitlines())