Skip to content

Commit

Permalink
fix all of the comments, still same bug
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffnvidia committed Aug 27, 2024
1 parent 4cbdc7e commit 00a9097
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 48 deletions.
71 changes: 48 additions & 23 deletions src/components/tl/ucp/allgather/allgather_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
args->root : 0;
ucc_rank_t rank = VRANK(task->subset.myrank, broot, size);
size_t local = GET_LOCAL_COUNT(args, size, rank);
ucp_mem_h *mh_list = task->mh_list;
int max_count = task->count_mh;
int count_mh = 0;
ucp_mem_h *mh_list = task->allgather_kn.mh_list;
int max_mh = task->allgather_kn.max_mh;
void *sbuf;
ptrdiff_t peer_seg_offset, local_seg_offset;
ucc_rank_t peer, peer_dist;
Expand All @@ -64,7 +63,6 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)

EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test",
task->allgather_kn.etask);

task->allgather_kn.etask = NULL;
UCC_KN_GOTO_PHASE(task->allgather_kn.phase);
if (KN_NODE_EXTRA == node_type) {
Expand All @@ -74,27 +72,27 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
local * dt_size, mem_type,
ucc_ep_map_eval(task->subset.map,
INV_VRANK(peer,broot,size)),
team, task, mh_list[count_mh++]),
team, task, mh_list[task->allgather_kn.count_mh++]),
task, out);
ucc_assert(count_mh >= max_count);
ucc_assert(task->allgather_kn.count_mh >= max_mh);

}
UCPCHECK_GOTO(ucc_tl_ucp_send_nb_with_mem(rbuf, data_size, mem_type,
ucc_ep_map_eval(task->subset.map,
INV_VRANK(peer,broot,size)),
team, task, mh_list[count_mh++]),
team, task, mh_list[task->allgather_kn.count_mh++]),
task, out);
ucc_assert(count_mh >= max_count);
ucc_assert(task->allgather_kn.count_mh >= max_mh);
}
if ((p->type != KN_PATTERN_ALLGATHERX) && (node_type == KN_NODE_PROXY)) {
peer = ucc_knomial_pattern_get_extra(p, rank);
extra_count = GET_LOCAL_COUNT(args, size, peer);
peer = ucc_ep_map_eval(task->subset.map, peer);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb_with_mem(PTR_OFFSET(task->allgather_kn.sbuf,
local * dt_size), extra_count * dt_size,
mem_type, peer, team, task, mh_list[count_mh++]),
mem_type, peer, team, task, mh_list[task->allgather_kn.count_mh++]),
task, out);
ucc_assert(count_mh >= max_count);
ucc_assert(task->allgather_kn.count_mh >= max_mh);
}

UCC_KN_PHASE_EXTRA:
Expand Down Expand Up @@ -123,13 +121,14 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
continue;
}
}
printf("progress : count_mh: %d, mh: %lx\n", task->allgather_kn.count_mh, (unsigned long)mh_list[task->allgather_kn.count_mh]);
UCPCHECK_GOTO(ucc_tl_ucp_send_nb_with_mem(sbuf, local_seg_count * dt_size,
mem_type,
ucc_ep_map_eval(task->subset.map,
INV_VRANK(peer, broot, size)),
team, task, mh_list[count_mh++]),
team, task, mh_list[task->allgather_kn.count_mh++]),
task, out);
ucc_assert(count_mh >= max_count);
ucc_assert(task->allgather_kn.count_mh >= max_mh);
}

for (loop_step = 1; loop_step < radix; loop_step++) {
Expand All @@ -151,9 +150,9 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
peer_seg_count * dt_size, mem_type,
ucc_ep_map_eval(task->subset.map,
INV_VRANK(peer, broot, size)),
team, task, mh_list[count_mh++]),
team, task, mh_list[task->allgather_kn.count_mh++]),
task, out);
ucc_assert(count_mh >= max_count);
ucc_assert(task->allgather_kn.count_mh >= max_mh);
}
UCC_KN_PHASE_LOOP:
if (UCC_INPROGRESS == ucc_tl_ucp_test_recv_with_etasks(task)) {
Expand All @@ -169,9 +168,9 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
mem_type,
ucc_ep_map_eval(task->subset.map,
INV_VRANK(peer, broot, size)),
team, task, mh_list[count_mh++]),
team, task, mh_list[task->allgather_kn.count_mh++]),
task, out);
ucc_assert(count_mh >= max_count);
ucc_assert(task->allgather_kn.count_mh >= max_mh);
}
UCC_KN_PHASE_PROXY:
if (UCC_INPROGRESS == ucc_tl_ucp_test_with_etasks(task)) {
Expand All @@ -180,6 +179,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
}

out:
ucc_assert(task->allgather_kn.count_mh-1 == max_mh);
ucc_assert(UCC_TL_UCP_TASK_P2P_COMPLETE(task));
task->super.status = UCC_OK;
UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_done", 0);
Expand All @@ -205,6 +205,7 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
task->allgather_kn.etask = NULL;
task->allgather_kn.phase = UCC_KN_PHASE_INIT;
if (ct == UCC_COLL_TYPE_ALLGATHER) {
ucc_kn_ag_pattern_init(size, rank, radix, args->dst.info.count,
Expand Down Expand Up @@ -245,7 +246,7 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

void register_memory(ucc_coll_task_t *coll_task){
ucc_status_t register_memory(ucc_coll_task_t *coll_task){

ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task,
ucc_tl_ucp_task_t);
Expand Down Expand Up @@ -283,10 +284,9 @@ void register_memory(ucc_coll_task_t *coll_task){
UCP_MEM_MAP_PARAM_FIELD_LENGTH |
UCP_MEM_MAP_PARAM_FIELD_MEMORY_TYPE;
mmap_params.memory_type = ucc_memtype_to_ucs[mem_type];

printf("I'm in register memory");
if (KN_NODE_EXTRA == node_type) {
if (p->type != KN_PATTERN_ALLGATHERX) {

mmap_params.address = task->allgather_kn.sbuf;
mmap_params.length = local * dt_size;
MEM_MAP();
Expand All @@ -310,11 +310,13 @@ void register_memory(ucc_coll_task_t *coll_task){
goto out;
}
while (!ucc_knomial_pattern_loop_done(p)) {
printf("in the while loop");
ucc_kn_ag_pattern_peer_seg(rank, p, &local_seg_count,
&local_seg_offset);
sbuf = PTR_OFFSET(rbuf, local_seg_offset * dt_size);

for (loop_step = radix - 1; loop_step > 0; loop_step--) {
printf("in the for loop");
peer = ucc_knomial_pattern_get_loop_peer(p, rank, loop_step);
if (peer == UCC_KN_PEER_NULL)
continue;
Expand All @@ -327,6 +329,7 @@ void register_memory(ucc_coll_task_t *coll_task){
}
mmap_params.address = sbuf;
mmap_params.length = local_seg_count * dt_size;
printf("register memory : count_mh: %d, mh: %lx\n", count_mh, (unsigned long)mh_list[count_mh]);
MEM_MAP();
}

Expand Down Expand Up @@ -358,35 +361,57 @@ void register_memory(ucc_coll_task_t *coll_task){
}

out:
task->mh_list = mh_list;
task->count_mh = count_mh-1;
task->allgather_kn.mh_list = mh_list;
task->allgather_kn.max_mh = count_mh-1;
task->allgather_kn.count_mh = 0;
return UCC_OK;
}

/**
 * Finalize a knomial allgather task: release the memory pool used for
 * executor-task linked-list nodes and free the array of UCP memory handles
 * allocated by register_memory().
 *
 * NOTE(review): the mh_list entries were obtained via ucp_mem_map() in
 * register_memory(), but no ucp_mem_unmap() call is visible before the list
 * is freed — confirm the handles are unmapped elsewhere, otherwise the UCP
 * registrations leak.
 *
 * @param coll_task  collective task being finalized
 * @return UCC_OK
 */
ucc_status_t ucc_tl_ucp_allgather_knomial_finalize(ucc_coll_task_t *coll_task)
{
    ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);

    /* second arg is 1 as elsewhere in this file — presumably a leak-check
       flag on cleanup; confirm against ucc_mpool_cleanup() docs */
    ucc_mpool_cleanup(&task->allgather_kn.etask_node_mpool, 1);
    free(task->allgather_kn.mh_list);

    return UCC_OK;
}   /* fix: dropped stray ';' after the function body */

/**
 * Initialize a UCP knomial allgather task with the given radix.
 *
 * Creates the task, initializes the mpool that backs the executor-task
 * linked-list nodes, optionally remaps the subset for host-ordered rank
 * reordering, pre-registers the communication buffers with UCP via
 * register_memory(), and wires up the task callbacks.
 *
 * @param coll_args  base collective arguments
 * @param team       base team handle (derived to ucc_tl_ucp_team_t)
 * @param task_h     [out] receives the initialized task on success
 * @param radix      knomial radix for the allgather pattern
 * @return UCC_OK on success, otherwise the failing step's error status
 */
ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
    ucc_base_coll_args_t *coll_args, ucc_base_team_t *team,
    ucc_coll_task_t **task_h, ucc_kn_radix_t radix)
{
    ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
    ucc_tl_ucp_task_t *task;
    ucc_sbgp_t        *sbgp;
    ucc_status_t       status;

    task = ucc_tl_ucp_init_task(coll_args, team);
    status = ucc_mpool_init(&task->allgather_kn.etask_node_mpool, 0,
                            sizeof(node_ucc_ee_executor_task_t), 0,
                            UCC_CACHE_LINE_SIZE, 16, UINT_MAX, NULL,
                            tl_team->super.super.context->ucc_context->thread_mode,
                            "etasks_linked_list_nodes");
    if (status < 0) {
        tl_error(UCC_TASK_LIB(task),
                 "failed to initialize ucc_mpool");
        /* bug fix: previously only logged and fell through, leaving the
           caller with an unusable task; propagate the failure instead */
        return status;
    }

    if (tl_team->cfg.use_reordering &&
        coll_args->args.coll_type == UCC_COLL_TYPE_ALLREDUCE) {
        sbgp = ucc_topo_get_sbgp(tl_team->topo, UCC_SBGP_FULL_HOST_ORDERED);
        task->subset.myrank = sbgp->group_rank;
        task->subset.map    = sbgp->map;
    }

    status = register_memory(&task->super);
    if (status < 0) {
        tl_error(UCC_TASK_LIB(task),
                 "failed to register memory");
        /* bug fix: propagate instead of continuing; undo the mpool init
           done above. mh_list is not freed here because register_memory()
           only publishes it on its success path. */
        ucc_mpool_cleanup(&task->allgather_kn.etask_node_mpool, 1);
        return status;
    }

    task->allgather_kn.etask_linked_list_head = NULL;
    task->allgather_kn.p.radix                = radix;
    task->super.flags   |= UCC_COLL_TASK_FLAG_EXECUTOR;
    task->super.post     = ucc_tl_ucp_allgather_knomial_start;
    task->super.progress = ucc_tl_ucp_allgather_knomial_progress;
    task->super.finalize = ucc_tl_ucp_allgather_knomial_finalize;
    *task_h = &task->super;
    return UCC_OK;
}
Expand Down
50 changes: 33 additions & 17 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ void ucc_tl_ucp_team_default_score_str_free(
#define MEM_MAP() do { \
status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, &mh); \
if (UCC_OK != status) { \
task->super.status = status; \
return; \
return status; \
} \
if (count_mh == size_of_list){ \
size_of_list *= 2; \
Expand Down Expand Up @@ -109,8 +108,6 @@ typedef struct ucc_tl_ucp_allreduce_sw_host_allgather
typedef struct ucc_tl_ucp_task {
ucc_coll_task_t super;
uint32_t flags;
ucp_mem_h *mh_list;
int count_mh;
union {
struct {
uint32_t send_posted;
Expand Down Expand Up @@ -197,10 +194,13 @@ typedef struct ucc_tl_ucp_task {
int phase;
ucc_knomial_pattern_t p;
void *sbuf;
ucc_ee_executor_task_t *etask;
node_ucc_ee_executor_task_t *etask_linked_list_head;
ucc_rank_t recv_dist;
ucc_mpool_t etask_node_mpool;
ucc_ee_executor_task_t *etask;
ucp_mem_h *mh_list;
int count_mh;
int max_mh;
} allgather_kn;
struct {
/*
Expand Down Expand Up @@ -427,27 +427,34 @@ static inline ucc_status_t ucc_tl_ucp_test_with_etasks(ucc_tl_ucp_task_t *task)
{
int polls = 0;
ucc_status_t status;
ucc_status_t status_2;
node_ucc_ee_executor_task_t *current_node;
node_ucc_ee_executor_task_t *prev_node;

if (UCC_TL_UCP_TASK_P2P_COMPLETE(task) && task->allgather_kn.etask_linked_list_head==NULL) {
return UCC_OK;
}
while (polls++ < task->n_polls) {
node_ucc_ee_executor_task_t *current_node;
node_ucc_ee_executor_task_t *prev_node;
current_node = task->allgather_kn.etask_linked_list_head;
prev_node = NULL;
while(current_node != NULL) {
status = ucc_ee_executor_task_test(current_node->etask);
if (status > 0) {
ucp_memcpy_device_complete(current_node->etask->completion, status);
ucc_ee_executor_task_finalize(current_node->etask);
ucp_memcpy_device_complete(current_node->etask->completion, ucc_status_to_ucs_status(status));
status_2 = ucc_ee_executor_task_finalize(current_node->etask);
ucc_mpool_put(current_node);
if (ucc_unlikely(status_2 < 0)){
tl_error(UCC_TASK_LIB(task), "task finalize didnt work");
return status_2;
}
if (prev_node != NULL){
prev_node->next = current_node->next; //to remove from list
}
else{ //i'm on first node
task->allgather_kn.etask_linked_list_head = current_node->next;
}
}
else {
prev_node = current_node;
}
prev_node = current_node;
current_node = current_node->next; //to iterate to next node
}
if (UCC_TL_UCP_TASK_P2P_COMPLETE(task) && task->allgather_kn.etask_linked_list_head == NULL) {
Expand Down Expand Up @@ -483,17 +490,26 @@ static inline ucc_status_t ucc_tl_ucp_test_recv(ucc_tl_ucp_task_t *task)
static inline ucc_status_t ucc_tl_ucp_test_recv_with_etasks(ucc_tl_ucp_task_t *task) {
int polls = 0;
ucc_status_t status;
ucc_status_t status_2;
node_ucc_ee_executor_task_t *current_node;
node_ucc_ee_executor_task_t *prev_node;

if (UCC_TL_UCP_TASK_RECV_COMPLETE(task) && task->allgather_kn.etask_linked_list_head==NULL) {
return UCC_OK;
}
while (polls++ < task->n_polls) {
node_ucc_ee_executor_task_t *current_node;
node_ucc_ee_executor_task_t *prev_node;
current_node = task->allgather_kn.etask_linked_list_head;
prev_node = NULL;
while(current_node != NULL) {
status = ucc_ee_executor_task_test(current_node->etask); \
status = ucc_ee_executor_task_test(current_node->etask);
if (status > 0) {
ucp_memcpy_device_complete(current_node->etask->completion, status); \
ucc_ee_executor_task_finalize(current_node->etask); \
ucp_memcpy_device_complete(current_node->etask->completion, status);
status_2 = ucc_ee_executor_task_finalize(current_node->etask);
ucc_mpool_put(current_node);
if (ucc_unlikely(status_2 < 0)){
tl_error(UCC_TASK_LIB(task), "task finalize didnt work");
return status_2;
}
if (prev_node != NULL){
prev_node->next = current_node->next; //to remove from list
}
Expand Down
15 changes: 8 additions & 7 deletions src/components/tl/ucp/tl_ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "utils/arch/cpu.h"
#include "schedule/ucc_schedule_pipelined.h"
#include <limits.h>
#include <ucp/api/ucp.h>

#define UCP_CHECK(function, msg, go, ctx) \
status = function; \
Expand Down Expand Up @@ -144,7 +143,6 @@ static int memcpy_device_start(void *dest, void *src, size_t size,

status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}

Expand All @@ -154,8 +152,12 @@ static int memcpy_device_start(void *dest, void *src, size_t size,
eargs.copy.len = size;
node_ucc_ee_executor_task_t *new_node;
new_node = ucc_mpool_get(&task->allgather_kn.etask_node_mpool);
if (ucc_unlikely(!new_node)) {
return UCC_ERR_NO_MEMORY;
}
status = ucc_ee_executor_task_post(exec, &eargs,
&new_node->etask);
task->allgather_kn.etask_linked_list_head->etask->completion = completion;

if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
Expand All @@ -164,7 +166,6 @@ static int memcpy_device_start(void *dest, void *src, size_t size,
new_node->next = task->allgather_kn.etask_linked_list_head;
task->allgather_kn.etask_linked_list_head = new_node;

task->allgather_kn.etask_linked_list_head->etask->completion = completion;
return 1;

}
Expand All @@ -179,7 +180,6 @@ static int memcpy_device(void *dest, void *src, size_t size, void *user_data){

status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}

Expand All @@ -190,18 +190,19 @@ static int memcpy_device(void *dest, void *src, size_t size, void *user_data){

status = ucc_ee_executor_task_post(exec, &eargs, &etask);
if (ucc_unlikely(status < 0)) {
task->super.status = status;
return status;
}
status = ucc_ee_executor_task_test(etask);
while (status>0) {
status = ucc_ee_executor_task_test(etask);
if (ucc_unlikely(status < 0)) {
task->super.status = status;
return status;
}
}
ucc_ee_executor_task_finalize(etask);
status = ucc_ee_executor_task_finalize(etask);
if (ucc_unlikely(status < 0)) {
return status;
}
return 1;
}

Expand Down
1 change: 0 additions & 1 deletion src/schedule/ucc_schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#ifndef UCC_SCHEDULE_H_
#define UCC_SCHEDULE_H_

#include <ucp/api/ucp.h>
#include "ucc/api/ucc.h"
#include "utils/ucc_list.h"
#include "utils/ucc_log.h"
Expand Down

0 comments on commit 00a9097

Please sign in to comment.