Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize track_file_creation_progress for existing files #194

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 23 additions & 29 deletions adaptive_scheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,52 +1201,44 @@ def _update_progress_for_paths(
task_ids: dict[str, TaskID],
) -> int:
"""Update progress bars for each set of paths."""
total_processed = 0
for key, paths in paths_dict.items():
to_discard = set()
for path_unit in paths:
# Check if the path_unit is a Path or a tuple of Paths
paths_to_check = [path_unit] if isinstance(path_unit, Path) else path_unit

# Progress only if all paths in the path_unit exist
if all(path.exists() for path in paths_to_check):
progress.update(task_ids[key], advance=1)
if total_task is not None:
progress.update(total_task, advance=1)
total_processed += 1
to_discard.add(path_unit)

for path_unit in to_discard:
paths_dict[key].discard(path_unit)

return total_processed
n_completed = _remove_completed_paths(paths_dict)
total_completed = sum(n_completed.values())
for key, n_done in n_completed.items():
progress.update(task_ids[key], advance=n_done)
if total_task is not None:
progress.update(total_task, advance=total_completed)
return total_completed


def _remove_completed_paths(
paths_dict: dict[str, set[Path | tuple[Path, ...]]],
) -> tuple[dict[str, set[Path | tuple[Path, ...]]], dict[str, int]]:
) -> dict[str, int]:
n_completed = {}
new_paths_dict = {}

for key, paths in paths_dict.items():
completed_count = 0
new_paths = set()
to_discard = set()
to_add = set()

for path_unit in paths:
# Check if it's a single Path or a tuple of Paths
paths_to_check = [path_unit] if isinstance(path_unit, Path) else path_unit

# Check if all paths in the path_unit exist
if all(path.exists() for path in paths_to_check):
if all(p.exists() for p in paths_to_check):
completed_count += 1
else:
new_paths.add(path_unit)
to_discard.add(path_unit)
elif isinstance(path_unit, tuple):
exists = {p for p in path_unit if p.exists()}
if any(exists):
to_discard.add(path_unit)
to_add.add(tuple(p for p in path_unit if p not in exists))

n_completed[key] = completed_count
if new_paths:
new_paths_dict[key] = new_paths
paths_dict[key] -= to_discard
paths_dict[key] |= to_add

return new_paths_dict, n_completed
return n_completed


async def _track_file_creation_progress(
Expand All @@ -1270,7 +1262,7 @@ async def _track_file_creation_progress(
# create total_files and add_total_progress before updating paths_dict
total_files = sum(len(paths) for paths in paths_dict.values())
add_total_progress = len(paths_dict) > 1
paths_dict, n_completed = _remove_completed_paths(paths_dict)
n_completed = _remove_completed_paths(paths_dict) # updates paths_dict in-place
total_done = sum(n_completed.values())
task_ids: dict[str, TaskID] = {}

Expand Down Expand Up @@ -1320,6 +1312,8 @@ def track_file_creation_progress(
) -> asyncio.Task:
"""Initialize and asynchronously track the progress of file creation.

WARNING: This function modifies the provided dictionary in-place.

This function sets up an asynchronous monitoring system that periodically
checks for the existence of specified files or groups of files. Each item
in the provided dictionary can be a single file (Path object) or a group
Expand Down
6 changes: 4 additions & 2 deletions tests/test_utils_file_creation_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ def test_remove_completed_paths(tmp_path: Path) -> None:
"category2": {(tmp_path / "file3", tmp_path / "file4")},
}

new_paths_dict, n_completed = _remove_completed_paths(paths_dict)
n_completed = _remove_completed_paths(paths_dict)

assert n_completed == {"category1": 1, "category2": 1}
assert new_paths_dict == {"category1": {tmp_path / "file2"}}
assert paths_dict == {"category1": {tmp_path / "file2"}, "category2": set()}


@pytest.mark.asyncio()
Expand Down Expand Up @@ -107,13 +107,15 @@ async def test_track_file_creation_progress(tmp_path: Path) -> None:
assert "category2" in progress._tasks[2].description
assert progress._tasks[2].total == 1
assert progress._tasks[2].completed == 0
assert paths_dict["category2"] == {(tmp_path / "file4",)}

# Create the other file of category2, should now be completed
create_test_files(tmp_path, ["file4"])
await asyncio.sleep(0.05)
assert "category2" in progress._tasks[2].description
assert progress._tasks[2].total == 1
assert progress._tasks[2].completed == 1
assert paths_dict["category2"] == set()

# Create the other file of category1, should now be completed
create_test_files(tmp_path, ["file2"])
Expand Down
Loading