TL/MLX5: generate schedule for zcopy allgather #1059
base: master
Thanks for the PR! I only left some minor comments.
Do I understand correctly that this PR doesn't implement any new behavior in the collective itself, but only the setup phase of a collective that will be implemented later?
@@ -104,6 +104,15 @@ static ucc_config_field_t ucc_tl_mlx5_lib_config_table[] = {
ucc_offsetof(ucc_tl_mlx5_lib_config_t, mcast_conf.one_sided_reliability_enable),
UCC_CONFIG_TYPE_BOOL},

{"MCAST_ZERO_COPY_ALLGATHER_ENABLE", "1", "Enable truly zero copy allgather design for mcast",
what does "truly" mean in this context?
if (comm->allgather_comm.truly_zero_copy_allgather_enabled) {
status = ucc_tl_mlx5_mcast_prepare_zero_copy_allgather(comm, req);
if (UCC_OK != status) {
return status;
This early return leaks req.
also the memory needs to be deregistered
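The two comments above describe a common error-path problem. A minimal sketch of the usual goto-cleanup pattern in C follows. Every name in it (`demo_req_t`, `demo_reg_mr`, `demo_dereg_mr`, `demo_prepare`, `demo_init`, `demo_fini`) is a hypothetical stand-in, not part of the TL/MLX5 code, and the ordering of registration and preparation is assumed for illustration only.

```c
#include <stdlib.h>

/* Hypothetical stand-in for a collective request with registered memory. */
typedef struct demo_req {
    int registered; /* 1 while the buffer counts as registered */
} demo_req_t;

static int demo_live_reqs = 0; /* number of requests allocated and not yet freed */

/* Hypothetical registration helpers; a real transport would call into verbs. */
static int  demo_reg_mr(demo_req_t *req)   { req->registered = 1; return 0; }
static void demo_dereg_mr(demo_req_t *req) { req->registered = 0; }

/* Hypothetical setup step that fails on demand. */
static int demo_prepare(demo_req_t *req, int fail)
{
    (void)req;
    return fail ? -1 : 0;
}

/* On success returns 0 and hands the request back through *out.
 * On failure, undoes every step taken so far, in reverse order. */
static int demo_init(int fail_prepare, demo_req_t **out)
{
    demo_req_t *req;
    int         status;

    *out = NULL;
    req  = calloc(1, sizeof(*req));
    if (req == NULL) {
        return -1;
    }
    demo_live_reqs++;

    status = demo_reg_mr(req);
    if (status != 0) {
        goto err_free;
    }

    status = demo_prepare(req, fail_prepare);
    if (status != 0) {
        goto err_dereg; /* do not return directly: req and its MR are live */
    }

    *out = req;
    return 0;

err_dereg:
    demo_dereg_mr(req);
err_free:
    free(req);
    demo_live_reqs--;
    return status;
}

/* Normal teardown for a successfully initialized request. */
static void demo_fini(demo_req_t *req)
{
    demo_dereg_mr(req);
    free(req);
    demo_live_reqs--;
}
```

The labels are ordered so that a failure at any step falls through exactly the cleanup for the steps that already succeeded.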
if (comm->allgather_comm.mcast_prepost_bucket_size > req->num_packets) {
req->mcast_prepost_bucket_size = req->num_packets;
} else {
req->mcast_prepost_bucket_size = comm->allgather_comm.mcast_prepost_bucket_size;
}
Suggested change:
- if (comm->allgather_comm.mcast_prepost_bucket_size > req->num_packets) {
- req->mcast_prepost_bucket_size = req->num_packets;
- } else {
- req->mcast_prepost_bucket_size = comm->allgather_comm.mcast_prepost_bucket_size;
- }
+ req->mcast_prepost_bucket_size = ucc_min(req->num_packets, comm->allgather_comm.mcast_prepost_bucket_size);
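As a quick check that the suggested one-liner preserves behavior, the sketch below compares the branch form with a min-based form. `demo_min` is a local stand-in written for this example, not the UCC definition of `ucc_min`, and the function names are hypothetical.

```c
/* Local stand-in for a min helper such as ucc_min (assumption: plain
 * two-argument minimum on ints). */
static int demo_min(int a, int b)
{
    return a < b ? a : b;
}

/* Branch form, as in the original diff: cap the configured bucket size
 * at the number of packets. */
static int bucket_size_branch(int configured, int num_packets)
{
    if (configured > num_packets) {
        return num_packets;
    } else {
        return configured;
    }
}

/* Suggested form: the same cap expressed as a single minimum. */
static int bucket_size_min(int configured, int num_packets)
{
    return demo_min(num_packets, configured);
}
```

Both functions return the smaller of the two values, including when they are equal, so the replacement is a pure readability change.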
return status;
}
indent issue
ucc_tl_mlx5_mcast_validate_zero_copy_allgather_params(ucc_tl_mlx5_mcast_coll_comm_t *comm,
ucc_tl_mlx5_mcast_coll_req_t *req)
{
if ((req->concurreny_level % 2 == 0 && req->num_packets % req->mcast_prepost_bucket_size != 0) ||
Not required, only a suggestion: since the conditions are rather independent, I would split them into separate if blocks. That would be more readable and would let us print a more precise warning that clearly states the reason for the failure (as you did for the second if block).
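A minimal sketch of what split checks with distinct messages could look like is given below. The struct, the error codes, and `demo_validate` are hypothetical. The first check mirrors the clause visible in the diff; the second check, including the direction of its comparison, is an assumption inferred from the original warning text (which advises reducing the bucket size or enlarging the receive queue), since the full condition is not visible in this excerpt. The sketch also assumes `prepost_bucket_size` is positive.

```c
#include <stdio.h>

/* Hypothetical parameter bundle; field names follow the review discussion. */
typedef struct demo_params {
    int num_packets;
    int prepost_bucket_size; /* assumed > 0 */
    int concurrency_level;
    int rx_depth;
} demo_params_t;

enum {
    DEMO_OK               = 0,
    DEMO_ERR_BUCKET_ALIGN = 1,
    DEMO_ERR_RX_DEPTH     = 2
};

/* Validate each condition in its own block so that each failure gets
 * its own return code and its own message. */
static int demo_validate(const demo_params_t *p, char *msg, size_t msg_len)
{
    /* Clause visible in the diff under review. */
    if (p->concurrency_level % 2 == 0 &&
        p->num_packets % p->prepost_bucket_size != 0) {
        snprintf(msg, msg_len,
                 "num_packets=%d is not a multiple of prepost_bucket_size=%d",
                 p->num_packets, p->prepost_bucket_size);
        return DEMO_ERR_BUCKET_ALIGN;
    }

    /* Assumed second check: preposted buffers must fit in the recv queue. */
    if (p->prepost_bucket_size * p->concurrency_level * 2 > p->rx_depth) {
        snprintf(msg, msg_len,
                 "prepost_bucket_size=%d * concurrency_level=%d * 2 exceeds rx_depth=%d",
                 p->prepost_bucket_size, p->concurrency_level, p->rx_depth);
        return DEMO_ERR_RX_DEPTH;
    }

    if (msg_len > 0) {
        msg[0] = '\0'; /* no warning on success */
    }
    return DEMO_OK;
}
```

With one block per condition, the caller can tell from the return code alone which constraint was violated, and each message names only the values relevant to that constraint.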
"either reduce prepost_bucket_size or mcast group "
"count or increase recv queue size "
"mcast_prepost_bucket_size %d concurreny_level %d "
"rx_depth %d",
Suggested change:
- "either reduce prepost_bucket_size or mcast group "
- "count or increase recv queue size "
- "mcast_prepost_bucket_size %d concurreny_level %d "
- "rx_depth %d",
+ "we only support the case prepost_bucket_size * concurreny_level * 2 > rx_depth, "
+ "but got: prepost_bucket_size=%d, concurreny_level=%d, "
+ "rx_depth=%d",

/* generate schedule */
for (i = 0; i < total_steps; i++) {
ucc_assert(root < comm->commsize);
is it useful? (just checking)
total_steps = req->num_packets * (comm->commsize / req->concurreny_level)
/ req->mcast_prepost_bucket_size + 1;

new_sched = ucc_calloc(1, sizeof(ucc_tl_mlx5_mcast_pipelined_ag_schedule_t) * total_steps, "sched");
nit: I would rename the variable new_sched to schedule

if (i > 0) {
for (j = 0; j < req->concurreny_level; j++) {
new_sched[i].multicast_op[j].group_id = new_sched[i - 1].prepost_buf_op[j].group_id;
This line goes well past column 80. Have you run git-clang-format?
@Sergei-Lebedev @janjust do we want this formatting style to be compulsory? If so, we should have the CI check for it.
new_sched->total_steps = total_steps;
req->total_steps = total_steps;
req->ag_schedule = new_sched;
Is it redundant?
@@ -434,6 +438,8 @@ typedef struct ucc_tl_mlx5_mcast_coll_req {
ucc_memory_type_t buf_mem_type;
enum ucc_tl_mlx5_mcast_one_sided_reliability_scheme one_sided_reliability_scheme;
uint32_t ag_counter;
int concurreny_level;
Typo: concurrency_level
"length %ld max_per_packet %d "
"team size %d concurreny_level %d",
req->num_packets, req->mcast_prepost_bucket_size, req->length,
comm->max_per_packet, comm->commsize, req->concurreny_level);
nit: should commsize be renamed to comm_size like the other variables?
The PR focuses on optimizing allgather communication by leveraging a pipelined multicast mechanism with configurable concurrency and prepost bucket sizes for recv buffers. It implements a new function, ucc_tl_mlx5_mcast_prepare_zero_copy_allgather, which calculates and sets up a pipelined schedule for the zero-copy allgather. The function validates parameters, allocates schedules, and registers the receive buffers as well.
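To make the schedule sizing concrete, the sketch below reproduces the step-count expression from the diff under review as a standalone function. `demo_total_steps` and its parameter names are hypothetical; the expression is copied from the diff as-is, and whether it is the final formula is not stated in this thread. Note that both divisions are integer divisions, so remainders are truncated.

```c
#include <assert.h>

/* Step count as computed in the diff:
 *   num_packets * (comm_size / concurrency_level) / bucket_size + 1
 * All operands are ints, so each division truncates toward zero. */
static int demo_total_steps(int num_packets, int comm_size,
                            int concurrency_level, int bucket_size)
{
    assert(concurrency_level > 0 && bucket_size > 0);
    return num_packets * (comm_size / concurrency_level) / bucket_size + 1;
}
```

For example, with 8 packets, a team of 4, concurrency level 2, and bucket size 4, the expression evaluates to 8 * 2 / 4 + 1 = 5 steps. A team of 5 with the same remaining values also gives 5, because 5 / 2 truncates to 2.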