From e8a8f2c098229e33902cc89ecb2e8ed128fc4c5a Mon Sep 17 00:00:00 2001 From: Nick Moore Date: Tue, 12 Sep 2023 14:52:09 +1000 Subject: [PATCH] Progress meters --- countess/core/pipeline.py | 28 ++++++++++++++++------- countess/gui/main.py | 48 +++++++++++++++++++++++++++------------ 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/countess/core/pipeline.py b/countess/core/pipeline.py index d0557da..0ea9cec 100644 --- a/countess/core/pipeline.py +++ b/countess/core/pipeline.py @@ -10,20 +10,23 @@ PRERUN_ROW_LIMIT = 100000 -def multi_iterator_map(function, values, args) -> Iterable: +def multi_iterator_map(function, values, args, progress_cb=None) -> Iterable: """Pretty much equivalent to: interleave_longest(function(v, *args) for v in values) but runs in multiple processes using a queue to organize `values` and another queue to organize the returned values.""" - nproc = min(((cpu_count() or 1) + 1, len(values))) + nproc = min(((cpu_count() or 1), len(values))) queue1: Queue = Queue() queue2: Queue = Queue(maxsize=nproc) for v in values: queue1.put(v) + if progress_cb: + progress_cb(0) + def __target(): try: while True: @@ -37,11 +40,15 @@ def __target(): for p in processes: p.start() - while any(p.is_alive() for p in processes): + while alive_count := sum(1 if p.is_alive() else 0 for p in processes): try: yield queue2.get(timeout=1) except Empty: - pass + if progress_cb: + progress_cb((100 * (len(values) - queue1.qsize() - alive_count)) // len(values)) + + if progress_cb: + progress_cb(100) @dataclass @@ -76,6 +83,8 @@ def process_parent_iterables(self, logger): iters_dict = {p.name: iter(p.result) for p in self.parent_nodes} while iters_dict: + logger.progress(self.name, None) + for name, it in list(iters_dict.items()): try: yield from self.plugin.process(next(it), name, logger) @@ -84,13 +93,15 @@ def process_parent_iterables(self, logger): yield from self.plugin.finished(name, logger) yield from self.plugin.finalize(logger) + logger.progress(self.name, 100) + def execute(self, logger: Logger, row_limit: Optional[int] = None): assert row_limit is None or isinstance(row_limit, int) if self.plugin is None: self.result = [] return - elif self.result and not self.is_dirty: + elif row_limit is not None and self.result and not self.is_dirty: return elif isinstance(self.plugin, FileInputPlugin): num_files = self.plugin.num_files() @@ -100,7 +111,10 @@ def execute(self, logger: Logger, row_limit: Optional[int] = None): row_limit_each_file = row_limit // num_files if row_limit is not None else None self.result = multi_iterator_map( - self.plugin.load_file, range(0, num_files), args=(logger, row_limit_each_file) + self.plugin.load_file, + range(0, num_files), + args=(logger, row_limit_each_file), + progress_cb=lambda p: logger.progress(self.name, p), ) elif isinstance(self.plugin, ProcessPlugin): self.plugin.prepare([p.name for p in self.parent_nodes], row_limit) @@ -126,13 +140,11 @@ def prerun(self, logger: Logger, row_limit=PRERUN_ROW_LIMIT): assert isinstance(logger, Logger) if self.is_dirty and self.plugin: - logger.progress("Start") for parent_node in self.parent_nodes: parent_node.prerun(logger, row_limit) self.load_config(logger) self.execute(logger, row_limit) self.is_dirty = False - logger.progress("Done") def mark_dirty(self): self.is_dirty = True diff --git a/countess/gui/main.py b/countess/gui/main.py index cd8872b..9bbcd3c 100644 --- a/countess/gui/main.py +++ b/countess/gui/main.py @@ -1,3 +1,4 @@ +import multiprocessing import re import sys import threading @@ -5,12 +6,11 @@ import webbrowser from tkinter import filedialog, messagebox, ttk from typing import Optional -import multiprocessing + import psutil from countess import VERSION from countess.core.config import export_config_graphviz, read_config, write_config -from countess.core.logger import ConsoleLogger from countess.core.pipeline import PipelineGraph from countess.core.plugins import get_plugin_classes from countess.gui.config import PluginConfigurator @@ -218,10 +218,11 @@ def config_change_task_callback_2(self): self.config_canvas.yview_moveto(pos1) self.config_scrollbar.set(pos1, pos2) - if self.logger.count == 0: + if self.logger_subframe.count == 0: self.logger_subframe.grid_forget() - else: - self.logger.progress_hide() + + # else: + # self.logger_subframe.progress_hide() def choose_plugin(self, plugin_class): self.node.plugin = plugin_class() @@ -269,26 +270,43 @@ def __init__(self, graph): self.logger_frame = LoggerFrame(self.toplevel) self.logger_frame.grid(row=1, column=0, stick=tk.NSEW) - self.logger = self.logger_frame.get_logger('') + self.logger = self.logger_frame.get_logger("") + self.logger.info("Started") - tk.Button(self.toplevel, text="Stop", command=self.stop).grid(row=2, column=0, sticky=tk.EW) + self.button = tk.Button(self.toplevel, text="Stop", command=self.on_button) + self.button.grid(row=2, column=0, sticky=tk.EW) self.process = multiprocessing.Process(target=self.subproc) self.process.start() + self.poll() def subproc(self): self.graph.run(self.logger) def poll(self): - if self.process.is_alive(): - self.logger.poll() - self.logger_frame.after(1000, self.poll) - - def stop(self): - for p in psutil.Process(self.process.pid).children(recursive=True): - p.terminate() - self.process.terminate() + if self.process: + if self.process.is_alive(): + self.logger_frame.after(1000, self.poll) + else: + self.logger.info("Finished") + self.process = None + self.button["text"] = "Close" + self.logger_frame.poll() + + def on_button(self): + if self.process: + for p in psutil.Process(self.process.pid).children(recursive=True): + p.terminate() + self.process.terminate() + self.button["text"] = "Close" + self.process = None + + self.logger.info("Stopped") + self.logger_frame.poll() + + else: + self.toplevel.destroy() class MainWindow: