Skip to content

Commit

Permalink
Progress meters
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzoic committed Sep 12, 2023
1 parent b308ecc commit e8a8f2c
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
28 changes: 20 additions & 8 deletions countess/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@
PRERUN_ROW_LIMIT = 100000


def multi_iterator_map(function, values, args) -> Iterable:
def multi_iterator_map(function, values, args, progress_cb=None) -> Iterable:
"""Pretty much equivalent to:
interleave_longest(function(v, *args) for v in values)
but runs in multiple processes using a queue
to organize `values` and another queue to organize the
returned values."""

nproc = min(((cpu_count() or 1) + 1, len(values)))
nproc = min(((cpu_count() or 1), len(values)))
queue1: Queue = Queue()
queue2: Queue = Queue(maxsize=nproc)

for v in values:
queue1.put(v)

if progress_cb:
progress_cb(0)

def __target():
try:
while True:
Expand All @@ -37,11 +40,15 @@ def __target():
for p in processes:
p.start()

while any(p.is_alive() for p in processes):
while alive_count := sum(1 if p.is_alive() else 0 for p in processes):
try:
yield queue2.get(timeout=1)
except Empty:
pass
if progress_cb:
progress_cb((100 * (len(values) - queue1.qsize() - alive_count)) // len(values))

if progress_cb:
progress_cb(100)


@dataclass
Expand Down Expand Up @@ -76,6 +83,8 @@ def process_parent_iterables(self, logger):

iters_dict = {p.name: iter(p.result) for p in self.parent_nodes}
while iters_dict:
logger.progress(self.name, None)

for name, it in list(iters_dict.items()):
try:
yield from self.plugin.process(next(it), name, logger)
Expand All @@ -84,13 +93,15 @@ def process_parent_iterables(self, logger):
yield from self.plugin.finished(name, logger)
yield from self.plugin.finalize(logger)

logger.progress(self.name, 100)

def execute(self, logger: Logger, row_limit: Optional[int] = None):
assert row_limit is None or isinstance(row_limit, int)

if self.plugin is None:
self.result = []
return
elif self.result and not self.is_dirty:
elif row_limit is not None and self.result and not self.is_dirty:
return
elif isinstance(self.plugin, FileInputPlugin):
num_files = self.plugin.num_files()
Expand All @@ -100,7 +111,10 @@ def execute(self, logger: Logger, row_limit: Optional[int] = None):

row_limit_each_file = row_limit // num_files if row_limit is not None else None
self.result = multi_iterator_map(
self.plugin.load_file, range(0, num_files), args=(logger, row_limit_each_file)
self.plugin.load_file,
range(0, num_files),
args=(logger, row_limit_each_file),
progress_cb=lambda p: logger.progress(self.name, p),
)
elif isinstance(self.plugin, ProcessPlugin):
self.plugin.prepare([p.name for p in self.parent_nodes], row_limit)
Expand All @@ -126,13 +140,11 @@ def prerun(self, logger: Logger, row_limit=PRERUN_ROW_LIMIT):
assert isinstance(logger, Logger)

if self.is_dirty and self.plugin:
logger.progress("Start")
for parent_node in self.parent_nodes:
parent_node.prerun(logger, row_limit)
self.load_config(logger)
self.execute(logger, row_limit)
self.is_dirty = False
logger.progress("Done")

def mark_dirty(self):
self.is_dirty = True
Expand Down
48 changes: 33 additions & 15 deletions countess/gui/main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import multiprocessing
import re
import sys
import threading
import tkinter as tk
import webbrowser
from tkinter import filedialog, messagebox, ttk
from typing import Optional
import multiprocessing

import psutil

from countess import VERSION
from countess.core.config import export_config_graphviz, read_config, write_config
from countess.core.logger import ConsoleLogger
from countess.core.pipeline import PipelineGraph
from countess.core.plugins import get_plugin_classes
from countess.gui.config import PluginConfigurator
Expand Down Expand Up @@ -218,10 +218,11 @@ def config_change_task_callback_2(self):
self.config_canvas.yview_moveto(pos1)
self.config_scrollbar.set(pos1, pos2)

if self.logger.count == 0:
if self.logger_subframe.count == 0:
self.logger_subframe.grid_forget()
else:
self.logger.progress_hide()

# else:
# self.logger_subframe.progress_hide()

def choose_plugin(self, plugin_class):
self.node.plugin = plugin_class()
Expand Down Expand Up @@ -269,26 +270,43 @@ def __init__(self, graph):

self.logger_frame = LoggerFrame(self.toplevel)
self.logger_frame.grid(row=1, column=0, stick=tk.NSEW)
self.logger = self.logger_frame.get_logger('')
self.logger = self.logger_frame.get_logger("")
self.logger.info("Started")

tk.Button(self.toplevel, text="Stop", command=self.stop).grid(row=2, column=0, sticky=tk.EW)
self.button = tk.Button(self.toplevel, text="Stop", command=self.on_button)
self.button.grid(row=2, column=0, sticky=tk.EW)

self.process = multiprocessing.Process(target=self.subproc)
self.process.start()

self.poll()

def subproc(self):
self.graph.run(self.logger)

def poll(self):
if self.process.is_alive():
self.logger.poll()
self.logger_frame.after(1000, self.poll)

def stop(self):
for p in psutil.Process(self.process.pid).children(recursive=True):
p.terminate()
self.process.terminate()
if self.process:
if self.process.is_alive():
self.logger_frame.after(1000, self.poll)
else:
self.logger.info("Finished")
self.process = None
self.button["text"] = "Close"
self.logger_frame.poll()

def on_button(self):
if self.process:
for p in psutil.Process(self.process.pid).children(recursive=True):
p.terminate()
self.process.terminate()
self.button["text"] = "Close"
self.process = None

self.logger.info("Stopped")
self.logger_frame.poll()

else:
self.toplevel.destroy()


class MainWindow:
Expand Down

0 comments on commit e8a8f2c

Please sign in to comment.