Skip to content

Commit

Permalink
update logging & remove prints
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzoic committed Aug 13, 2024
1 parent 38ab26d commit b97ef47
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 41 deletions.
23 changes: 11 additions & 12 deletions countess/core/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,31 @@
import logging.handlers
import multiprocessing
import sys
import time

from .config import read_config

logging_queue: multiprocessing.Queue = multiprocessing.Queue()
logging.getLogger().addHandler(logging.handlers.QueueHandler(logging_queue))
logging.getLogger().setLevel(logging.INFO)

logging.handlers.QueueListener(logging_queue, logging.StreamHandler())

start_time = time.time()


def process_ini(config_filename):
def process_ini(config_filename) -> None:
graph = read_config(config_filename)
graph.run()


def run(argv):
def run(argv) -> None:
for config_filename in argv:
process_ini(config_filename)


def main():
def main() -> None:
logging_queue: multiprocessing.Queue = multiprocessing.Queue()
logging.getLogger().addHandler(logging.handlers.QueueHandler(logging_queue))
logging.getLogger().setLevel(logging.INFO)
logging_handler = logging.handlers.QueueListener(logging_queue, logging.StreamHandler())
logging_handler.start()

run(sys.argv[1:])

logging_handler.stop()


if __name__ == "__main__":
main() # pragma: no cover
9 changes: 0 additions & 9 deletions countess/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,12 @@
import io
import os.path
import re
import sys
from configparser import ConfigParser

from countess.core.pipeline import PipelineGraph, PipelineNode
from countess.core.plugins import load_plugin


def default_progress_callback(n, a, b, s=""):
print(f"{n:40s} {a:4d}/{b:4d} {s}")


def default_output_callback(output):
sys.stderr.write(repr(output))


def read_config_dict(name: str, base_dir: str, config_dict: dict) -> PipelineNode:
if "_module" in config_dict:
module_name = config_dict["_module"]
Expand Down
31 changes: 21 additions & 10 deletions countess/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def queue_output(self, result):
# XXX can we do this out-of-order if any queues are full?
for queue in self.output_queues:
queue.put(data)
logger.info("%s: %d/%d", self.name, self.counter_out, self.counter_in)

def finish_output(self):
for queue in self.output_queues:
Expand All @@ -139,6 +140,8 @@ def run_thread(self, row_limit: Optional[int] = None):
"""For each PipelineNode, this is run in its own thread."""
assert isinstance(self.plugin, (ProcessPlugin, FileInputPlugin))

logger.info("%s: 0%%", self.name)

self.plugin.prepare([node.name for node in self.parent_nodes], row_limit)

if len(self.parent_nodes) == 1:
Expand Down Expand Up @@ -177,6 +180,8 @@ def run_thread(self, row_limit: Optional[int] = None):
self.queue_output(self.plugin.finalize())
self.finish_output()

logger.info("%s: 100%%", self.name)

def load_config(self):
assert isinstance(self.plugin, BasePlugin)
if self.config:
Expand All @@ -203,10 +208,17 @@ def prerun(self, row_limit=PRERUN_ROW_LIMIT):
for data_in in parent_node.result:
self.plugin.preprocess(data_in, parent_node.name)
for data_in in parent_node.result:
self.result += list(self.plugin.process(data_in, parent_node.name))
self.result += list(self.plugin.finished(parent_node.name))
self.result += list(self.plugin.finalize())
for data_out in self.plugin.process(data_in, parent_node.name):
self.result.append(data_out)
logger.info("%s: %s/0", self.name, len(self.result))
for data_out in self.plugin.finished(parent_node.name):
self.result.append(data_out)
logger.info("%s: %s/0", self.name, len(self.result))
for data_out in self.plugin.finalize():
self.result.append(data_out)
logger.info("%s: %s/0", self.name, len(self.result))
self.is_dirty = False
logger.info("%s: 100%%", self.name)

def mark_dirty(self):
self.is_dirty = True
Expand Down Expand Up @@ -298,22 +310,21 @@ def traverse_nodes_backwards(self):

def run(self):
threads_and_nodes = []
logger.info("Starting")
start_time = time.time()
for node in self.traverse_nodes_backwards():
node.load_config()
threads_and_nodes.append((Thread(target=node.run_thread), node))

for thread, _ in threads_and_nodes:
thread.start()

while True:
print("------------------")
for thread, node in threads_and_nodes[::-1]:
if thread.is_alive():
print("%-40s %d %d" % (node.name, node.counter_in, node.counter_out))
if not any(t.is_alive() for t, _ in threads_and_nodes):
break
while any(t.is_alive() for t, _ in threads_and_nodes):
logger.info("Elapsed time: %d", time.time() - start_time)
time.sleep(10)

logger.info("Finished, elapsed time: %d", time.time() - start_time)

def reset(self):
for node in self.nodes:
node.result = None
Expand Down
7 changes: 5 additions & 2 deletions countess/core/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,15 @@ def prepare(self, sources: List[str], row_limit: Optional[int] = None):

def finalize(self) -> Iterable:
num_files = self.num_files()
logger.info("%s: 0%%", self.name)
if num_files > 1:
row_limit_per_file = self.row_limit // num_files if self.row_limit else None
yield from multiprocess_map(self.load_file, range(0, num_files), row_limit_per_file)
for n, r in enumerate(multiprocess_map(self.load_file, range(0, num_files), row_limit_per_file)):
logger.info("%s: %d%%", self.name, int(100 * n / num_files))
yield r
elif num_files == 1:
yield from self.load_file(0, self.row_limit)
logger.info("%s: 100%%", self.name)


class PandasProcessPlugin(ProcessPlugin):
Expand Down Expand Up @@ -404,7 +408,6 @@ def process_dataframe(self, dataframe: pd.DataFrame) -> Optional[pd.DataFrame]:
dataframe_merged.reset_index("__tmpidx", drop=True, inplace=True)

except Exception as exc: # pylint: disable=broad-exception-caught
print(repr(exc))
logger.warning("Exception", exc_info=exc)
return None

Expand Down
45 changes: 41 additions & 4 deletions countess/gui/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@
from countess.gui.mini_browser import MiniBrowserFrame
from countess.gui.tabular import TabularDataFrame
from countess.gui.tree import FlippyCanvas, GraphWrapper
from countess.gui.widgets import ResizingFrame, ask_open_filename, ask_saveas_filename, get_icon, info_button
from countess.gui.widgets import (
LabeledProgressbar,
ResizingFrame,
ask_open_filename,
ask_saveas_filename,
get_icon,
info_button,
)
from countess.utils.pandas import concat_dataframes

# import faulthandler
Expand Down Expand Up @@ -341,21 +348,51 @@ def emit(self, record: logging.LogRecord) -> None:


class LoggerFrame(tk.Frame):
def __init__(self, *a, **k):
def __init__(self, *a, **k) -> None:
super().__init__(*a, **k)
self.count = 0
self.text = tk.Text(self)
self.text.grid(sticky=tk.NSEW)
self.progress_bars: dict[str, LabeledProgressbar] = {}

# Start a QueueListener in its own thread,
# logging_callback gets called for each record
# received.
logging.handlers.QueueListener(logging_queue, _CallbackLoggingHandler(self.logging_callback)).start()

self.columnconfigure(0, weight=1)
self.rowconfigure(0, weight=1)
# self.rowconfigure(1, weight=0)

self.pbars_done : list[LabeledProgressbar] = []
self.hide_event()

def logging_callback(self, record: logging.LogRecord) -> None:
message = record.getMessage()
self.count += 1
self.text.insert(tk.END, message + "\n")
if m := re.match(r"(.*): (\d+)(%|/\d+)", message):
name, n1, n2 = m.groups()
try:
pbar = self.progress_bars[name]
except KeyError:
pbar = LabeledProgressbar(self, value=0)
self.progress_bars[name] = pbar
pbar.grid(sticky=tk.EW, row=len(self.progress_bars), column=0)
if n2 == "%":
pbar.progress_update(message, int(n1))
else:
pbar.progress_update(message)
else:
self.text.insert(tk.END, message + "\n")
self.count += 1

def hide_event(self):
for pbar in self.pbars_done:
pbar.grid_forget()
self.pbars_done = []
for pbar in self.progress_bars.values():
if pbar.done:
self.pbars_done.append(pbar)
self.after(5000, self.hide_event)


class MainWindow:
Expand Down
11 changes: 9 additions & 2 deletions countess/gui/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,14 @@ def __init__(self, master, *args, **kwargs):
self.style.configure(self.style_name, background="green")

kwargs["style"] = self.style_name
self.done = False
super().__init__(master, *args, **kwargs)

def update_label(self, s):
self.style.configure(self.style_name, text=s)
def progress_update(self, text: str, percentage: Optional[int] = None):
if percentage is None:
self.config(mode="indeterminate")
self.step(5)
else:
self.done = percentage == 100
self.config(mode="determinate", value=percentage)
self.style.configure(self.style_name, text=text)
7 changes: 5 additions & 2 deletions countess/utils/parallel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import gc
import logging
import threading
import time
from multiprocessing import Process, Queue, Value
Expand All @@ -18,6 +19,8 @@
V = TypeVar("V")
P = ParamSpec("P")

logger = logging.getLogger(__name__)


def multiprocess_map(
function: Callable[Concatenate[V, P], Iterable[D]], values: Iterable[V], *args: P.args, **kwargs: P.kwargs
Expand Down Expand Up @@ -61,8 +64,8 @@ def __process():
# Prevent processes from using up all
# available memory while waiting
# XXX this is probably a bad idea
while psutil.virtual_memory().percent > 75:
print(f"{getpid()} LOW MEMORY {psutil.virtual_memory().percent}")
while psutil.virtual_memory().percent > 90:
logger.warning("PID %d LOW MEMORY alert %f%%", getpid(), psutil.virtual_memory().percent)
time.sleep(1)

data_in = input_queue.get(timeout=1)
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ version = { attr = "countess.VERSION" }
readme = { file = "README.md", content-type="text/markdown" }

[tool.pylint]
load-plugins = "pylint.extensions.bad_builtin"
bad-functions = ['print']
disable = [
"consider-using-f-string",
"consider-using-with",
Expand Down

0 comments on commit b97ef47

Please sign in to comment.