Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(wgt): enable DI using torch-rpc to support GPU-p2p and RDMA-rpc #562

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

SolenoidWGT
Copy link
Collaborator

Commit:

  1. Add torchrpc message queue.

  2. Implement buffer based on CUDA-shared-tensor to optimize the data path of torchrpc.

  3. Add 'bypass_eventloop' arg in Task() and Parallel().

  4. Add thread lock in distributer.py to prevent sender and receiver competition.

  5. Add message queue perf test for torchrpc, nccl, nng, shm

  6. Add comm_perf_helper.py to make program timing more convenient.

  7. Modified the subscribe() of class MQ, adding 'fn' parameter and 'is_once' parameter.

  8. Add new DummyLock and ConditionLock type in lock_helper.py

  9. Add message queues perf test.

  10. Introduced a new self-hosted runner to execute cuda, multiprocess, torchrpc related tests.

Description

DI-engine integrates torch.distributed.rpc module.

  1. CPU-P2P-RDMA: In IB network environment, support RDMA CPU-P2P transmission
  2. GPU-P2P-RDMA: supports GPU p2p communication

cli-ditask introduces new command line arguments

  1. --mq_type: Introduced torchrpc:cuda and torchrpc:cpu options

    1. torchrpc:cuda: Use torchrpc for communication, and allow setting device_map, can use GPU direct RDMA.
    2. torchrpc:cpu: Use torchrpc for communication, but device_map is not allowed to be set. All data on the GPU side will be copied to the CPU side for transmission.
  2. --init-method: Initialization entry for init_rpc (required if --mq_type is torchrpc)

  3. --local-cuda-devices: Set the rank range of local GPUs that can be used (optional, default is all visible devices)

  4. --cuda-device-map: Used to set device_map, the format is as follows:

    Format:
    <Peer node id>_<Local GPU rank>_<Peer GPU rank>,[...]
    example:
    --cuda-device-map=1_0_1,2_0_2,0_0_0
    
    pytorch api:
    options.set_device_map("worker1", {1: 2})
    

    (Optional, the default is to map all visible GPU to the GPU-0 of the peer)

Dynamic GPU communication groups

We create devices mappings between all possible devices in advance. This mapping is all-2-all, which can cover all communication situations. The purpose is to avoid errors caused by incomplete devicemap coverage. Setting redundant mappings will not have any side effects. The mappings are used to check the validity of the device during transport. Only after a new process joins the communication group will it try to create a channel based on these maps.

Node_0 device_maps:
("Node_1", {0: 0}), ("Node_2", {0: 0}), ...., ("Node_99", {0: 0})

Node_1 device_maps:
("Node_0", {0: 0}), ("Node_2", {0: 0}), ...., ("Node_99", {0: 0})

At the same time, we still expose the --cuda-device-map interface, which is used to allow users to configure the topology between devices, torchrpc will follow user input.

Related Issue

TODO

Load balancing capability: in a time-heterogeneous RL task environment, each worker can run at full capacity.

Check List

  • merge the latest version source branch/repo, and resolve all the conflicts
  • pass style check
  • pass all the tests

@PaParaZz1 PaParaZz1 added the efficiency optimization Efficiency optimization (time, memory and so on) label Dec 26, 2022
1. Add torchrpc message queue.

2. Implement buffer based on CUDA-shared-tensor to optimize the data path of torchrpc.

3. Add 'bypass_eventloop' arg in Task() and Parallel().

4. Add thread lock in distributer.py to prevent sender and receiver competition.

5. Add message queue perf test for torchrpc, nccl, nng, shm

6. Add comm_perf_helper.py to make program timing more convenient.

7. Modified the subscribe() of class MQ, adding 'fn' parameter and 'is_once' parameter.

8. Add new DummyLock and ConditionLock type in lock_helper.py

9. Add message queues perf test.

10. Introduced a new self-hosted runner to execute cuda, multiprocess, torchrpc related tests.
@codecov
Copy link

codecov bot commented Jan 12, 2023

Codecov Report

Merging #562 (3fa5319) into main (f798002) will decrease coverage by 1.20%.
The diff coverage is 37.75%.

❗ Current head 3fa5319 differs from pull request most recent head e32055b. Consider uploading reports for the commit e32055b to get more accurate results

@@            Coverage Diff             @@
##             main     #562      +/-   ##
==========================================
- Coverage   83.60%   82.41%   -1.20%     
==========================================
  Files         565      571       +6     
  Lines       46375    47198     +823     
==========================================
+ Hits        38774    38900     +126     
- Misses       7601     8298     +697     
Flag Coverage Δ
unittests 82.41% <37.75%> (-1.20%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
ding/torch_utils/data_helper.py 76.82% <0.00%> (-1.47%) ⬇️
ding/data/tests/test_shm_buffer.py 20.58% <16.07%> (-29.42%) ⬇️
ding/framework/message_queue/perfs/perf_nng.py 16.83% <16.83%> (ø)
...ramework/message_queue/perfs/perf_torchrpc_nccl.py 19.49% <19.49%> (ø)
ding/framework/message_queue/perfs/perf_shm.py 25.84% <25.84%> (ø)
ding/framework/parallel.py 66.21% <33.91%> (-19.22%) ⬇️
ding/data/shm_buffer.py 60.19% <34.42%> (-37.59%) ⬇️
ding/utils/comm_perf_helper.py 35.82% <35.82%> (ø)
ding/envs/env_manager/subprocess_env_manager.py 74.63% <38.46%> (-0.26%) ⬇️
ding/utils/lock_helper.py 81.08% <41.17%> (-11.91%) ⬇️
... and 118 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@SolenoidWGT SolenoidWGT force-pushed the p2p-rpc branch 2 times, most recently from 1da53e2 to 30b3a73 Compare January 17, 2023 07:18
@@ -1,4 +1,5 @@
[run]
concurrency = multiprocessing,thread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add concurrency = multiprocessing, so that codecov can count the coverage of subprocesses, and the default concurrency is set to threading. However,there are some things need to pay attention in using, refer to: https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html

Makefile Outdated Show resolved Hide resolved
codecov.yml Outdated
# fix me
# The unittests of the torchrpc module are tested by different runners and cannot be included
# in the test_unittest's coverage report. To keep CI happy, we don't count torchrpc related coverage.
ignore:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whether to add these ignore items to .coveragerc

self.shape = shape
self.device = device
# We don't want the buffer to be involved in the computational graph
with torch.no_grad():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creation tensor operation doesn't involve in computation graph, so we don't need torch.no_grad here


event_run = ctx.Event()
shm_buf_np = ShmBufferCuda(np.dtype(np.float32), shape=(1024, 1024), copy_on_get=True)
shm_buf_torch = ShmBufferCuda(torch.float32, shape=(1024, 1024), copy_on_get=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add another unittest for the case copy_on_get=False to validate it.

task.use(eps_greedy_handler(cfg))
task.use(StepCollector(cfg, policy.collect_mode, collector_env))
task.use(termination_checker(max_env_step=int(1e7)))
else:
raise KeyError("invalid router labels: {}".format(task.router.labels))

task.run()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this

Overview:
thread lock decorator.
Arguments:
- func ([type]): A function that needs to be protected by a lock.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callable

OUTPUT_DICT[func_name] = OUTPUT_DICT[func_name] + str(round(avg_tt, 4)) + ","


def print_timer_result_csv():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you can use pretty_print function in ding.utils

- args (:obj:`any`): Rest arguments for listeners.
"""
# Check if need to broadcast event to connected nodes, default is True
assert self._running, "Please make sure the task is running before calling the this method, see the task.start"
if only_local:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this

@@ -71,8 +71,8 @@ def _train(ctx: Union["OnlineRLContext", "OfflineRLContext"]):

if ctx.train_data is None: # no enough data from data fetcher
return
data = ctx.train_data.to(policy._device)
train_output = policy.forward(data)
# data = ctx.train_data.to(policy._device)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why comment this

# so all data on the cpu side is copied to "cuda:0" here. In fact this
# copy is unnecessary, because torchrpc can support both cpu side and gpu
# side data to communicate using RDMA, but mixing the two transfer types
# will cause a bug, see issue:
Copy link
Member

@PaParaZz1 PaParaZz1 Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
efficiency optimization Efficiency optimization (time, memory and so on)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants