Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Nodes] Add Prebatch setting to ParallelMapper #1417

Merged
merged 7 commits into from
Jan 2, 2025
Merged

Conversation

andrewkho
Copy link
Contributor

@andrewkho andrewkho commented Dec 26, 2024

When ParallelMapper is used for very cheap operations, the overhead of sending items over queues can quickly add up. This is a nice parameter to be able to tune.

Fixes #1415

A few notes about the implementation:

  • I chose to compose 3 nodes (Batcher, ParallelMapper, Unbatcher) into one to implement this. This is the first time we're composing BaseNodes with other BaseNodes. This will require us to figure out graph-traversal for these options (see footnote).
  • this required us to have _ParallelMapperIter implement BaseNode, however getting reset to work correctly is going to be a bigger problem, so for now, just created an intermediate class with basically the current implementation of ParallelMapper, and this allows us to use torchdata.nodes composition to get things working easily.

Test Plan:

  • Unit tests
  • Ran a simple script to test this, output:
python examples/nodes/test_prebatch.py
[9999400009, 9999600004, 9999800001]
baseline: dt=3.0651697060093284s
[9999400009, 9999600004, 9999800001]
prebatch=16: dt=0.454918147996068s
[9999400009, 9999600004, 9999800001]
prebatch=256: dt=0.13740589004009962s
[9999400009, 9999600004, 9999800001]
prebatch=1024: dt=0.22711888700723648s

test script:

import time
import torchdata.nodes as tn


def run(prebatch):
    node = tn.IterableWrapper(range(100000))
    node = tn.ParallelMapper(node, map_fn=lambda x: x**2, prebatch=prebatch, method="thread", num_workers=8)
    loader = tn.Loader(node)
    x = list(loader)
    print(x[-3:])


if __name__ == "__main__":
    t0 = time.perf_counter()
    run(None)
    dt = time.perf_counter() - t0
    print(f"baseline: {dt=}s")

    for prebatch in (16, 256, 1024):
        t0 = time.perf_counter()
        run(prebatch)
        dt = time.perf_counter() - t0
        print(f"{prebatch=}: {dt=}s")

Footnote: Example of where this is a problem: In the ParallelMapper case here, traversing the dag with reflection (eg using instance.dict and checking for baseNode instances) would generate two sinks for the source, since self.source points to it, and self._it would eventally point to it as well. One way we could handle this is with an optional "get_source/get_parent" method on BaseNode, which returns the instance of where graph traversal should begin, and in this case it would return self._it, not self.source.

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Dec 26, 2024
Copy link

pytorch-bot bot commented Dec 26, 2024

🔗 Helpful Links

🧪 See artifacts and rendered test results at hud.pytorch.org/pr/pytorch/data/1417

Note: Links to docs will display an error until the docs builds have been completed.

✅ No Failures

As of commit 6a99917 with merge base 88c7b96 (image):
💚 Looks good so far! There are no failures yet. 💚

This comment was automatically generated by Dr. CI and updates every 15 minutes.

@@ -272,6 +281,77 @@ def _shutdown(self):
t.join(timeout=QUEUE_TIMEOUT * 5)


class _ParallelMapperImpl(BaseNode[T]):
"""This class implements _ParallelMapperIter as a BaseNode, allowing it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This class implements _ParallelMapperIter and _InlineMapperIter as a BaseNode, ....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

Base automatically changed from andrewkh/unbatcher to main December 30, 2024 17:11
@andrewkho andrewkho merged commit 0d2b0a0 into main Jan 2, 2025
39 checks passed
@andrewkho andrewkho deleted the andrewkh/prebatcher branch January 14, 2025 01:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Nodes] Add pre-batch option to parallel mapper
4 participants