Skip to content

Commit

Permalink
Allow using tuples of Paths to represent units of work
Browse files Browse the repository at this point in the history
  • Loading branch information
basnijholt committed Dec 22, 2023
1 parent 2672e30 commit b4d0b54
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 45 deletions.
119 changes: 75 additions & 44 deletions adaptive_scheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ async def sleep_unless_task_is_done(


def _update_progress_for_paths(
paths_dict: dict[str, set[Path]],
paths_dict: dict[str, set[Path | tuple[Path, ...]]],
progress: Progress,
total_task: TaskID | None,
task_ids: dict[str, TaskID],
Expand All @@ -1204,23 +1204,57 @@ def _update_progress_for_paths(
total_processed = 0
for key, paths in paths_dict.items():
to_discard = set()
for path in paths:
if path.exists():
for path_unit in paths:
# Check if the path_unit is a Path or a tuple of Paths
paths_to_check = [path_unit] if isinstance(path_unit, Path) else path_unit

# Progress only if all paths in the path_unit exist
if all(path.exists() for path in paths_to_check):
progress.update(task_ids[key], advance=1)
if total_task is not None:
progress.update(total_task, advance=1)
total_processed += 1
to_discard.add(path)
for path in to_discard:
paths_dict[key].discard(path)
to_discard.add(path_unit)

for path_unit in to_discard:
paths_dict[key].discard(path_unit)

return total_processed


def _remove_completed_paths(
paths_dict: dict[str, set[Path | tuple[Path, ...]]],
) -> tuple[dict[str, set[Path | tuple[Path, ...]]], dict[str, int]]:
n_completed = {}
new_paths_dict = {}

for key, paths in paths_dict.items():
completed_count = 0
new_paths = set()

for path_unit in paths:
# Check if it's a single Path or a tuple of Paths
paths_to_check = [path_unit] if isinstance(path_unit, Path) else path_unit

# Check if all paths in the path_unit exist
if all(path.exists() for path in paths_to_check):
completed_count += 1
else:
new_paths.add(path_unit)

n_completed[key] = completed_count
if new_paths:
new_paths_dict[key] = new_paths

# Use the updated paths_dict
paths_dict = new_paths_dict
return paths_dict, n_completed


async def _track_file_creation_progress(
paths_dict: dict[str, set[Path]],
paths_dict: dict[str, set[Path | tuple[Path, ...]]],
progress: Progress,
interval: int = 1,
interval: float = 1,
) -> None:
"""Asynchronously track and update the progress of file creation.
Expand All @@ -1233,36 +1267,29 @@ async def _track_file_creation_progress(
interval
The time interval (in seconds) at which to update the progress.
"""
# Count the number of files that already exist
n_completed = {
key: sum(path.exists() for path in paths) for key, paths in paths_dict.items()
}
# Remove the paths that already exist
paths_dict = {
key: {p for p in paths if not p.exists()}
for key, paths in paths_dict.items()
if paths
}
total_files = sum(len(paths) for paths in paths_dict.values())
total_files = sum(
len(paths) for paths in paths_dict.values()
) # before updating paths_dict
add_total_progress = len(paths_dict) > 1
paths_dict, n_completed = _remove_completed_paths(paths_dict)
total_done = sum(n_completed.values())
task_ids: dict[str, TaskID] = {}

# Add a total progress bar only if there are multiple entries in the dictionary
add_total_progress = len(paths_dict) > 1
total_task = (
progress.add_task(
"[cyan bold underline]Total",
total=total_files + total_done,
total=total_files,
completed=total_done,
)
if add_total_progress
else None
)
for key, paths in paths_dict.items():
n_done = n_completed.get(key, 0)
for key, n_done in n_completed.items():
n_remaining = len(paths_dict.get(key, []))
task_ids[key] = progress.add_task(
f"[green]{key}",
total=len(paths) + n_done,
total=n_remaining + n_done,
completed=n_done,
)
try:
Expand All @@ -1289,41 +1316,45 @@ async def _track_file_creation_progress(


def track_file_creation_progress(
paths_dict: dict[str, set[Path]],
paths_dict: dict[str, set[Path | tuple[Path, ...]]],
interval: int = 1,
) -> asyncio.Task:
"""Initialize and asynchronously track the progress of file creation.
This function sets up an asynchronous monitoring system that periodically
checks for the existence of specified files. For each file in the provided
dictionary, it updates individual and, if applicable, total progress bars to
reflect the current state of file creation. The tracking occurs at regular
intervals, specified by the user.
The function is designed to be used in environments where files are expected
to be created over time, and there is a need to visually and quantitatively
track this process. It leverages the `rich` library's progress bar for a
clear and interactive display.
checks for the existence of specified files or groups of files. Each item
in the provided dictionary can be a single file (Path object) or a group
of files (tuple of Path objects). The progress is updated for each file or
group of files only when all files in the group exist. This allows tracking
of complex file creation processes where multiple files together constitute
a single unit of work.
The tracking occurs at regular intervals, specified by the user, and updates
individual and, if applicable, total progress bars to reflect the current
state of file creation. It is particularly useful in environments where files
are expected to be created over time and need to be monitored collectively.
Parameters
----------
paths_dict
A dictionary with keys representing categories and values being sets
of file paths to monitor.
interval
The time interval (in seconds) at which to update the progress.
paths_dict : dict[str, set[Union[Path, Tuple[Path, ...]]]]
A dictionary with keys representing categories and values being sets of
file paths (Path objects) or groups of file paths (tuples of Path objects)
to monitor.
interval : int
The time interval (in seconds) at which the progress is updated.
Returns
-------
The task that is tracking the progress.
asyncio.Task
The asyncio Task object that is tracking the file creation progress.
Examples
--------
>>> paths_dict = {
... "docs": {Path("docs/environment.yml"), Path("yolo")},
... "example2": {Path("/path/to/file3"), Path("/path/to/file4")},
... }
>>> track_file_creation_progress(paths_dict)
"docs": {Path("docs/environment.yml"), (Path("doc1.md"), Path("doc2.md"))},
"example2": {Path("/path/to/file3"), Path("/path/to/file4")},
}
>>> task = track_file_creation_progress(paths_dict)
"""
get_console().clear_live() # avoid LiveError, only 1 live render allowed at a time
columns = (*Progress.get_default_columns(), TimeElapsedColumn())
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ ignore = [
]

[tool.ruff.per-file-ignores]
"tests/*" = ["SLF001"]
"tests/*" = ["SLF001", "PLR2004"]
"tests/test_examples.py" = ["E501"]
".github/*" = ["INP001"]

Expand Down
135 changes: 135 additions & 0 deletions tests/test_utils_file_creation_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Test the file creation progress tracking utilities."""
from __future__ import annotations

import asyncio
import contextlib
from typing import TYPE_CHECKING
from unittest.mock import Mock

import pytest
from rich.progress import Progress, TaskID

from adaptive_scheduler.utils import (
_remove_completed_paths,
_track_file_creation_progress,
_update_progress_for_paths,
)

if TYPE_CHECKING:
from pathlib import Path


def test_update_progress_for_paths(tmp_path: Path) -> None:
    """Units whose files all exist are counted and dropped; others stay queued."""
    # "file2" is intentionally never created.
    create_test_files(tmp_path, ["file1", "file3", "file4"])

    paths_dict: dict[str, set[Path | tuple[Path, ...]]] = {
        "category1": {tmp_path / "file1", tmp_path / "file2"},
        "category2": {(tmp_path / "file3", tmp_path / "file4")},
    }

    # One mocked task id doubles as the total task and as both category tasks.
    fake_progress = Mock(spec=Progress)
    fake_task_id = Mock(spec=TaskID)
    n_processed = _update_progress_for_paths(
        paths_dict,
        fake_progress,
        fake_task_id,
        {"category1": fake_task_id, "category2": fake_task_id},
    )

    # "file1" and the ("file3", "file4") group are the two finished units.
    assert n_processed == 2
    # Only the missing "file2" is still being tracked in category1 ...
    assert len(paths_dict["category1"]) == 1
    # ... while the completed group has been removed from category2.
    assert len(paths_dict["category2"]) == 0
    fake_progress.update.assert_called()


def create_test_files(tmp_path: Path, file_names: list[str]) -> None:
    """Create an empty file below ``tmp_path`` for each entry of ``file_names``.

    Parameters
    ----------
    tmp_path
        Directory in which the files are created.
    file_names
        Names of the files, relative to ``tmp_path``. A name may contain
        sub-directories (e.g. ``"sub/file"``); missing parents are created.
    """
    for name in file_names:
        target = tmp_path / name
        # ``touch`` fails when the parent directory is missing, so make sure
        # nested names work as well as flat ones.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.touch()


def test_remove_completed_paths(tmp_path: Path) -> None:
    """Completed units are tallied per category and left out of the result."""
    # "file2" is never created, so it is the only unit expected to remain.
    create_test_files(tmp_path, ["file1", "file3", "file4"])

    paths_dict: dict[str, set[Path | tuple[Path, ...]]] = {
        "category1": {tmp_path / "file1", tmp_path / "file2"},
        "category2": {(tmp_path / "file3", tmp_path / "file4")},
    }

    remaining, counts = _remove_completed_paths(paths_dict)

    # One finished unit per category: "file1" and the ("file3", "file4") group.
    assert counts == {"category1": 1, "category2": 1}
    # category2 has nothing left, so its key is absent from the result.
    assert remaining == {"category1": {tmp_path / "file2"}}


@pytest.mark.asyncio()
async def test_track_file_creation_progress(tmp_path: Path) -> None:
    """Progress bars advance only once every file of a unit of work exists."""
    poll_wait = 0.05  # long enough for several tracker iterations at 1e-3 s

    def check_bar(index: int, label: str, *, total: int, completed: int) -> None:
        # Inspect the bar with the given task index on the progress instance.
        bar = progress._tasks[index]
        assert label in bar.description
        assert bar.total == total
        assert bar.completed == completed

    # Only "file1" exists up front.
    create_test_files(tmp_path, ["file1"])

    paths_dict: dict[str, set[Path | tuple[Path, ...]]] = {
        "category1": {tmp_path / "file1", tmp_path / "file2"},
        "category2": {(tmp_path / "file3", tmp_path / "file4")},
    }

    progress = Progress(auto_refresh=False)
    task = asyncio.create_task(
        _track_file_creation_progress(paths_dict, progress, interval=1e-3),
    )

    # Give the tracker time to set up its bars.
    await asyncio.sleep(poll_wait)

    progress.stop()
    check_bar(0, "Total", total=3, completed=1)
    check_bar(1, "category1", total=2, completed=1)
    check_bar(2, "category2", total=1, completed=0)

    # Half of the category2 group is not enough to count as done.
    create_test_files(tmp_path, ["file3"])
    await asyncio.sleep(poll_wait)
    check_bar(2, "category2", total=1, completed=0)

    # With the second file present the group is complete.
    create_test_files(tmp_path, ["file4"])
    await asyncio.sleep(poll_wait)
    check_bar(2, "category2", total=1, completed=1)

    # The last missing single file finishes category1.
    create_test_files(tmp_path, ["file2"])
    await asyncio.sleep(poll_wait)
    check_bar(1, "category1", total=2, completed=2)

    # All three units are now reflected in the total bar.
    check_bar(0, "Total", total=3, completed=3)

    # Shut down the display and the background task.
    progress.stop()
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task

Check notice

Code scanning / CodeQL

Statement has no effect Note test

This statement has no effect.

0 comments on commit b4d0b54

Please sign in to comment.