Skip to content

Commit

Permalink
add some debug prints and profilers
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandr-Solovev committed Oct 9, 2023
1 parent 46d4e42 commit 1e1b5d8
Showing 1 changed file with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ static result_t call_daal_kernel(const context_gpu& ctx,
auto& queue = ctx.get_queue();

data_keeper<Float> keeper(ctx);
keeper.init(local_data, local_weights);
{
ONEDAL_PROFILER_TASK(init_data, queue);
keeper.init(local_data, local_weights);
}
const std::int64_t block_size = keeper.get_block_size();
const std::int64_t block_start = 0;
const std::int64_t block_end = block_start + block_size;
Expand Down Expand Up @@ -82,7 +85,7 @@ static result_t call_daal_kernel(const context_gpu& ctx,
kernels_fp<Float>::start_next_cluster(queue, arr_cores, arr_responses);
cluster_index = cluster_index < block_size ? cluster_index + block_start : row_count;
{
ONEDAL_PROFILER_TASK(allreduce_cluster_index);
ONEDAL_PROFILER_TASK(allreduce_cluster_index, queue);
comm.allreduce(cluster_index, spmd::reduce_op::min).wait();
}
if (cluster_index < 0) {
Expand All @@ -101,15 +104,17 @@ static result_t call_daal_kernel(const context_gpu& ctx,
}
std::int32_t local_queue_size = queue_end - queue_begin;
std::int32_t total_queue_size = local_queue_size;
std::cout << "total_queue_size local init = " << total_queue_size << std::endl;
{
ONEDAL_PROFILER_TASK(allreduce_total_queue_size_outer);
ONEDAL_PROFILER_TASK(allreduce_total_queue_size_outer, queue);
comm.allreduce(total_queue_size, spmd::reduce_op::sum).wait();
}
std::cout << "total_queue_size global init = " << total_queue_size << std::endl;
while (total_queue_size > 0) {
auto recv_counts = array<std::int64_t>::zeros(rank_count);
recv_counts.get_mutable_data()[rank] = local_queue_size;
{
ONEDAL_PROFILER_TASK(allreduce_recv_counts);
ONEDAL_PROFILER_TASK(allreduce_recv_counts, queue);
comm.allreduce(recv_counts, spmd::reduce_op::sum).wait();
}
auto displs = array<std::int64_t>::zeros(rank_count);
Expand All @@ -119,6 +124,7 @@ static result_t call_daal_kernel(const context_gpu& ctx,
displs_ptr[i] = total_count;
total_count += recv_counts.get_data()[i];
}

ONEDAL_ASSERT(total_count > 0);
auto send_array = recv_counts[rank] > 0
? arr_queue.slice(queue_begin, recv_counts[rank]).flatten(queue)
Expand All @@ -134,7 +140,7 @@ static result_t call_daal_kernel(const context_gpu& ctx,
}
auto recv_array = arr_queue.slice(queue_begin, total_count).flatten(queue);
{
ONEDAL_PROFILER_TASK(allgather_cluster_data);
ONEDAL_PROFILER_TASK(allgather_cluster_data, queue);
comm.allgatherv(send_array, recv_array, recv_counts.get_data(), displs.get_data())
.wait();
}
Expand All @@ -157,16 +163,17 @@ static result_t call_daal_kernel(const context_gpu& ctx,
queue_end = kernels_fp<Float>::get_queue_front(queue, arr_queue_front);
local_queue_size = queue_end - queue_begin;
total_queue_size = local_queue_size;
std::cout << "total_queue_size global after update = " << total_queue_size << std::endl;
{
ONEDAL_PROFILER_TASK(allreduce_total_queue_size_inner);
ONEDAL_PROFILER_TASK(allreduce_total_queue_size_inner, queue);
comm.allreduce(total_queue_size, spmd::reduce_op::sum).wait();
}
}

std::cout << "total_queue_size global after while = " << total_queue_size << std::endl;
cluster_index = kernels_fp<Float>::start_next_cluster(queue, arr_cores, arr_responses);
cluster_index = cluster_index < block_size ? cluster_index + block_start : row_count;
{
ONEDAL_PROFILER_TASK(cluster_index);
ONEDAL_PROFILER_TASK(cluster_index, queue);
comm.allreduce(cluster_index, spmd::reduce_op::min).wait();
}
}
Expand Down

0 comments on commit 1e1b5d8

Please sign in to comment.