From e2754483c264d13357057c4f9010d65b9d837879 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 8 Dec 2023 17:33:48 +0100 Subject: [PATCH 01/15] [WIP] Remove stage APIs. Signed-off-by: Michal Zientkiewicz --- dali/benchmark/caffe2_alexnet_bench.cc | 20 +++--- dali/benchmark/caffe_alexnet_bench.cc | 18 ++---- dali/benchmark/checkpointing_bench.cc | 6 +- dali/benchmark/decoder_bench.cc | 11 ++-- dali/benchmark/file_reader_alexnet_bench.cc | 11 ++-- .../file_reader_fast_forward_bench.cc | 6 +- dali/benchmark/resnet50_bench.cc | 31 +++------- dali/benchmark/resnet50_nvjpeg_bench.cc | 11 ++-- dali/c_api/c_api.cc | 33 ++++++---- dali/c_api/c_api_test.cc | 62 ++++++------------- dali/c_api/operator_trace_test.cc | 2 +- dali/fuzzing/dali_harness.h | 3 +- .../nvjpeg_decoder_decoupled_api_test.cc | 24 +++---- dali/operators/input/input_operator_test.cu | 2 +- .../math/expressions/arithmetic_test.cc | 21 +++---- dali/operators/random/rng_base_cpu_test.cc | 3 +- dali/operators/reader/coco_reader_op_test.cc | 20 +++--- dali/operators/reader/reader_op_test.cc | 19 ++---- .../reader/video_reader_decoder_op_test.cc | 11 ++-- dali/operators/reader/video_reader_op_test.cc | 53 +++++----------- dali/pipeline/executor/executor.cc | 21 +++++++ dali/pipeline/executor/executor.h | 13 ++-- dali/pipeline/executor/executor_test.cc | 20 ++---- .../builtin/conditional/split_merge_test.cc | 17 ++--- .../operator/builtin/external_source_test.cc | 11 ++-- dali/pipeline/operator/op_spec_test.cc | 5 +- dali/pipeline/pipeline.cc | 26 ++++---- dali/pipeline/pipeline.h | 58 +++++++++-------- dali/pipeline/pipeline_test.cc | 50 ++++++--------- dali/python/backend_impl.cc | 5 +- dali/python/nvidia/dali/pipeline.py | 3 +- dali/test/dali_operator_test.h | 5 +- dali/test/dali_plugin_manager_test.cc | 5 +- dali/test/dali_test_bboxes.h | 11 ++-- dali/test/dali_test_checkpointing.h | 3 +- dali/test/dali_test_single_op.h | 5 +- include/dali/c_api.h | 10 +++ 37 files changed, 267 insertions(+), 368 
deletions(-) diff --git a/dali/benchmark/caffe2_alexnet_bench.cc b/dali/benchmark/caffe2_alexnet_bench.cc index 80ad8af844..19630500a9 100755 --- a/dali/benchmark/caffe2_alexnet_bench.cc +++ b/dali/benchmark/caffe2_alexnet_bench.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -105,8 +105,7 @@ BENCHMARK_DEFINE_F(C2Alexnet, Caffe2Pipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); while (st.KeepRunning()) { @@ -114,11 +113,9 @@ BENCHMARK_DEFINE_F(C2Alexnet, Caffe2Pipe)(benchmark::State& st) { // NOLINT // We will start he processing for the next batch // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { @@ -236,8 +233,7 @@ BENCHMARK_DEFINE_F(C2Alexnet, HybridPipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); while (st.KeepRunning()) { @@ -245,11 +241,9 @@ BENCHMARK_DEFINE_F(C2Alexnet, HybridPipe)(benchmark::State& st) { // NOLINT // We will start he processing for the next batch // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { diff --git a/dali/benchmark/caffe_alexnet_bench.cc b/dali/benchmark/caffe_alexnet_bench.cc index c9bef62804..6c6619f4b1 100755 --- 
a/dali/benchmark/caffe_alexnet_bench.cc +++ b/dali/benchmark/caffe_alexnet_bench.cc @@ -107,8 +107,7 @@ BENCHMARK_DEFINE_F(Alexnet, CaffePipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); while (st.KeepRunning()) { @@ -116,11 +115,9 @@ BENCHMARK_DEFINE_F(Alexnet, CaffePipe)(benchmark::State& st) { // NOLINT // We will start he processing for the next batch // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { @@ -238,8 +235,7 @@ BENCHMARK_DEFINE_F(Alexnet, HybridPipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); while (st.KeepRunning()) { @@ -247,11 +243,9 @@ BENCHMARK_DEFINE_F(Alexnet, HybridPipe)(benchmark::State& st) { // NOLINT // We will start he processing for the next batch // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { diff --git a/dali/benchmark/checkpointing_bench.cc b/dali/benchmark/checkpointing_bench.cc index f91906925d..112db4807a 100755 --- a/dali/benchmark/checkpointing_bench.cc +++ b/dali/benchmark/checkpointing_bench.cc @@ -40,13 +40,11 @@ class CheckpointingOverhead : public DALIBenchmark { Workspace ws; // Warmup - pipe->RunCPU(); - pipe->RunGPU(); + pipe->Run(); pipe->Outputs(&ws); while (st.KeepRunning()) { - pipe->RunCPU(); - pipe->RunGPU(); + pipe->Run(); pipe->Outputs(&ws); if (policy == CheckpointingPolicy::SaveEveryIter) { volatile auto cpt = pipe->GetCheckpoint(); diff --git a/dali/benchmark/decoder_bench.cc 
b/dali/benchmark/decoder_bench.cc index 6394c1673f..f93f319d97 100644 --- a/dali/benchmark/decoder_bench.cc +++ b/dali/benchmark/decoder_bench.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -56,8 +56,7 @@ class DecoderBench : public DALIBenchmark { // Run once to allocate the memory Workspace ws; pipe.SetExternalInput("raw_jpegs", data); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); while (st.KeepRunning()) { @@ -66,13 +65,11 @@ class DecoderBench : public DALIBenchmark { // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work pipe.SetExternalInput("raw_jpegs", data); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } pipe.SetExternalInput("raw_jpegs", data); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations) { diff --git a/dali/benchmark/file_reader_alexnet_bench.cc b/dali/benchmark/file_reader_alexnet_bench.cc index f20d74099f..72f7670b62 100755 --- a/dali/benchmark/file_reader_alexnet_bench.cc +++ b/dali/benchmark/file_reader_alexnet_bench.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -106,8 +106,7 @@ BENCHMARK_DEFINE_F(FileReaderAlexnet, CaffePipe)(benchmark::State& st) { // NOLI // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); while (st.KeepRunning()) { @@ -115,11 +114,9 @@ BENCHMARK_DEFINE_F(FileReaderAlexnet, CaffePipe)(benchmark::State& st) { // NOLI // We will start he processing for the next batch // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { diff --git a/dali/benchmark/file_reader_fast_forward_bench.cc b/dali/benchmark/file_reader_fast_forward_bench.cc index 942b513ffa..668bb96325 100755 --- a/dali/benchmark/file_reader_fast_forward_bench.cc +++ b/dali/benchmark/file_reader_fast_forward_bench.cc @@ -70,8 +70,7 @@ BENCHMARK_DEFINE_F(FileReaderFastForward, FastForward)(benchmark::State& st) { / Workspace ws; for (int i = 0; i < snapshot_at; i++) { - pipe->RunCPU(); - pipe->RunGPU(); + pipe->Run(); pipe->Outputs(&ws); } @@ -85,8 +84,7 @@ BENCHMARK_DEFINE_F(FileReaderFastForward, FastForward)(benchmark::State& st) { / pipe2->RestoreFromCheckpoint(cpt); st.PauseTiming(); - pipe2->RunCPU(); - pipe2->RunGPU(); + pipe2->Run(); pipe2->Outputs(&ws); st.ResumeTiming(); } diff --git a/dali/benchmark/resnet50_bench.cc b/dali/benchmark/resnet50_bench.cc index 5ece9ffcdd..ce94ba0968 100755 --- a/dali/benchmark/resnet50_bench.cc +++ b/dali/benchmark/resnet50_bench.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -107,8 +107,7 @@ BENCHMARK_DEFINE_F(RN50, C2Pipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); while (st.KeepRunning()) { @@ -118,11 +117,9 @@ BENCHMARK_DEFINE_F(RN50, C2Pipe)(benchmark::State& st) { // NOLINT // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work pipe.SetExternalInput("raw_jpegs", data); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { @@ -242,9 +239,7 @@ BENCHMARK_DEFINE_F(RN50, HybridPipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); - pipe.Outputs(&ws); + pipe.Run(); while (st.KeepRunning()) { pipe.SetExternalInput("raw_jpegs", data); @@ -253,11 +248,9 @@ BENCHMARK_DEFINE_F(RN50, HybridPipe)(benchmark::State& st) { // NOLINT // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work pipe.SetExternalInput("raw_jpegs", data); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { @@ -355,9 +348,7 @@ BENCHMARK_DEFINE_F(RN50, nvJPEGPipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); - pipe.Outputs(&ws); + pipe.Run(); while (st.KeepRunning()) { pipe.SetExternalInput("raw_jpegs", data); @@ -366,11 +357,9 @@ BENCHMARK_DEFINE_F(RN50, nvJPEGPipe)(benchmark::State& st) { // NOLINT // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work pipe.SetExternalInput("raw_jpegs", data); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { diff --git 
a/dali/benchmark/resnet50_nvjpeg_bench.cc b/dali/benchmark/resnet50_nvjpeg_bench.cc index e71ff16457..2445c605b2 100755 --- a/dali/benchmark/resnet50_nvjpeg_bench.cc +++ b/dali/benchmark/resnet50_nvjpeg_bench.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -89,8 +89,7 @@ BENCHMARK_DEFINE_F(RealRN50, nvjpegPipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); while (st.KeepRunning()) { @@ -98,11 +97,9 @@ BENCHMARK_DEFINE_F(RealRN50, nvjpegPipe)(benchmark::State& st) { // NOLINT // We will start he processing for the next batch // immediately after issueing work to the gpu to // pipeline the cpu/copy/gpu work - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); if (st.iterations() == st.max_iterations && pipelined) { diff --git a/dali/c_api/c_api.cc b/dali/c_api/c_api.cc index dee97e9c1c..cf7d6d9e6f 100644 --- a/dali/c_api/c_api.cc +++ b/dali/c_api/c_api.cc @@ -251,10 +251,15 @@ daliCreatePipeline2(daliPipelineHandle *pipe_handle, const char *serialized_pipe bool se = separated_execution != 0; bool pe = pipelined_execution != 0; bool ae = async_execution != 0; + if (cpu_prefetch_queue_depth == gpu_prefetch_queue_depth) { + if (se) + DALI_WARN("Setting separated_execution to True has no effect if the queue sizes are equal"); + se = false; + } auto pipeline = std::make_unique(std::string(serialized_pipeline, length), max_batch_size, num_threads, device_id, pe, prefetch_queue_depth, ae); - pipeline->SetExecutionTypes(pe, se, ae); + pipeline->SetExecutionTypes(pe, ae); if (se) { pipeline->SetQueueSizes(cpu_prefetch_queue_depth, 
gpu_prefetch_queue_depth); } @@ -283,26 +288,31 @@ int daliGetMaxBatchSize(daliPipelineHandle_t pipe_handle) { return (*pipe_handle)->pipeline->max_batch_size(); } +void daliPrefetch(daliPipelineHandle_t pipe_handle) { + auto &pipeline = (*pipe_handle)->pipeline; + pipeline->Prefetch(); +} void daliPrefetchUniform(daliPipelineHandle_t pipe_handle, int queue_depth) { auto &pipeline = (*pipe_handle)->pipeline; - for (int i = 0; i < queue_depth; ++i) { - pipeline->RunCPU(); - pipeline->RunGPU(); + auto sz = pipeline->GetQueueSizes(); + if (queue_depth != sz.cpu_size || queue_depth != sz.gpu_size) { + DALI_WARN("daliPrefetchUniform is deprecated and setting queue_length different than" + " the one set in the pipeline has no effect"); } + pipeline->Prefetch(); } void daliPrefetchSeparate(daliPipelineHandle_t pipe_handle, int cpu_queue_depth, int gpu_queue_depth) { auto &pipeline = (*pipe_handle)->pipeline; - for (int i = 0; i < gpu_queue_depth; ++i) { - pipeline->RunCPU(); - pipeline->RunGPU(); - } - for (int i = 0; i < cpu_queue_depth; ++i) { - pipeline->RunCPU(); + auto sz = pipeline->GetQueueSizes(); + if (cpu_queue_depth != sz.cpu_size || gpu_queue_depth != sz.gpu_size) { + DALI_WARN("daliPrefetchSeparate is deprecated and setting queue_length different than" + " the one set in the pipeline has no effect"); } + pipeline->Prefetch(); } @@ -402,8 +412,7 @@ dali_data_type_t daliGetExternalInputType(daliPipelineHandle_t pipe_handle, cons void daliRun(daliPipelineHandle_t pipe_handle) { dali::Pipeline *pipeline = (*pipe_handle)->pipeline.get(); - pipeline->RunCPU(); - pipeline->RunGPU(); + pipeline->Run(); } diff --git a/dali/c_api/c_api_test.cc b/dali/c_api/c_api_test.cc index 4ba541a2b0..20c4ad1d13 100644 --- a/dali/c_api/c_api_test.cc +++ b/dali/c_api/c_api_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -264,8 +264,7 @@ TYPED_TEST(CApiTest, FileReaderPipe) { pipe_ptr->Build(); for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } daliPipelineHandle handle; @@ -280,8 +279,7 @@ TYPED_TEST(CApiTest, FileReaderPipe) { } daliRun(&handle); - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); ComparePipelinesOutputs(handle, *pipe_ptr); daliDeletePipeline(&handle); @@ -293,8 +291,7 @@ TYPED_TEST(CApiTest, FileReaderDefaultPipe) { pipe_ptr->Build(); for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } daliPipelineHandle handle; @@ -306,8 +303,7 @@ TYPED_TEST(CApiTest, FileReaderDefaultPipe) { } daliRun(&handle); - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); ComparePipelinesOutputs(handle, *pipe_ptr); daliDeletePipeline(&handle); @@ -346,10 +342,9 @@ TYPED_TEST(CApiTest, ExternalSourceSingleAllocPipe) { } for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } - daliPrefetchUniform(&handle, prefetch_queue_depth); + daliPrefetch(&handle); dali::Workspace ws; for (int i = 0; i < prefetch_queue_depth; i++) { @@ -367,8 +362,7 @@ TYPED_TEST(CApiTest, ExternalSourceSingleAllocPipe) { input.get(), dali_data_type_t::DALI_UINT8, input_shape.data(), input_shape.sample_dim(), "HWC", cuda_stream, DALI_ext_default); daliRun(&handle); - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); ComparePipelinesOutputs(handle, *pipe_ptr); daliDeletePipeline(&handle); @@ -418,11 +412,8 @@ TYPED_TEST(CApiTest, ExternalSourceSingleAllocVariableBatchSizePipe) { } for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } - daliPrefetchUniform(&handle, prefetch_queue_depth); - dali::Workspace ws; 
for (int i = 0; i < prefetch_queue_depth; i++) { ComparePipelinesOutputs(handle, *pipe_ptr, DALI_ext_default, @@ -469,10 +460,9 @@ TYPED_TEST(CApiTest, ExternalSourceMultipleAllocPipe) { } for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } - daliPrefetchUniform(&handle, prefetch_queue_depth); + daliPrefetch(&handle); dali::Workspace ws; for (int i = 0; i < prefetch_queue_depth; i++) { @@ -488,10 +478,7 @@ TYPED_TEST(CApiTest, ExternalSourceMultipleAllocPipe) { dali_data_type_t::DALI_UINT8, input_shape.data(), input_shape.sample_dim(), "HWC", cuda_stream, DALI_ext_default); daliRun(&handle); - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); - - ComparePipelinesOutputs(handle, *pipe_ptr); + pipe_ptr->Run(); daliDeletePipeline(&handle); } @@ -533,10 +520,9 @@ TYPED_TEST(CApiTest, ExternalSourceSingleAllocDifferentBackendsTest) { } for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } - daliPrefetchUniform(&handle, prefetch_queue_depth); + daliPrefetch(&handle); dali::Workspace ws; for (int i = 0; i < prefetch_queue_depth; i++) { @@ -555,10 +541,7 @@ TYPED_TEST(CApiTest, ExternalSourceSingleAllocDifferentBackendsTest) { input.get(), dali_data_type_t::DALI_UINT8, input_shape.data(), input_shape.sample_dim(), "HWC", DALI_ext_default); daliRun(&handle); - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); - - ComparePipelinesOutputs(handle, *pipe_ptr); + pipe_ptr->Run(); daliDeletePipeline(&handle); } @@ -604,10 +587,9 @@ TYPED_TEST(CApiTest, ExternalSourceMultipleAllocDifferentBackendsTest) { } for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } - daliPrefetchUniform(&handle, prefetch_queue_depth); + daliPrefetch(&handle); dali::Workspace ws; for (int i = 0; i < prefetch_queue_depth; i++) { @@ -626,9 +608,7 @@ TYPED_TEST(CApiTest, ExternalSourceMultipleAllocDifferentBackendsTest) { 
dali_data_type_t::DALI_UINT8, input_shape.data(), input_shape.sample_dim(), "HWC", DALI_ext_default); daliRun(&handle); - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); - + pipe_ptr->Run(); ComparePipelinesOutputs(handle, *pipe_ptr); daliDeletePipeline(&handle); } @@ -707,8 +687,7 @@ TYPED_TEST(CApiTest, UseCopyKernel) { } for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } daliPrefetchUniform(&handle, prefetch_queue_depth); @@ -821,8 +800,7 @@ void TestForceFlagRun(bool ext_src_no_copy, unsigned int flag_to_test, int devic } for (int i = 0; i < prefetch_queue_depth; i++) { - pipe_ptr->RunCPU(); - pipe_ptr->RunGPU(); + pipe_ptr->Run(); } daliPrefetchUniform(&handle, prefetch_queue_depth); diff --git a/dali/c_api/operator_trace_test.cc b/dali/c_api/operator_trace_test.cc index efec1bb59e..0f8ced9810 100644 --- a/dali/c_api/operator_trace_test.cc +++ b/dali/c_api/operator_trace_test.cc @@ -77,7 +77,7 @@ class OperatorTraceTest : public ::testing::TestWithParam(batch_size_, num_threads_, device_id_, -1, exec_pipelined_, cpu_queue_depth_, exec_async_); - pipeline_->SetExecutionTypes(exec_pipelined_, exec_separated_, exec_async_); + pipeline_->SetExecutionTypes(exec_pipelined_, exec_async_); PutTogetherDaliGraph(); diff --git a/dali/fuzzing/dali_harness.h b/dali/fuzzing/dali_harness.h index 167a11797d..77b05faaeb 100644 --- a/dali/fuzzing/dali_harness.h +++ b/dali/fuzzing/dali_harness.h @@ -66,8 +66,7 @@ class FileListHarness { SetupPipeline(pipeline); Workspace ws; - pipeline.RunCPU(); - pipeline.RunGPU(); + pipeline.Run(); pipeline.Outputs(&ws); } diff --git a/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api_test.cc b/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api_test.cc index 92362343ae..70ab8c6a8c 100644 --- a/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api_test.cc +++ b/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 
2019-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -241,8 +241,7 @@ class CudaDecoderUtilizationTest : public ::testing::Test { }; TEST_F(CudaDecoderUtilizationTest, UtilizationTest) { - this->pipeline_.RunCPU(); - this->pipeline_.RunGPU(); + this->pipeline_.Run(); auto node = this->pipeline_.GetOperatorNode(this->decoder_name_); auto nsamples_hw = node->op->GetDiagnostic("nsamples_hw"); @@ -296,8 +295,7 @@ class HwDecoderUtilizationTest : public ::testing::Test { TEST_F(HwDecoderUtilizationTest, UtilizationTest) { - this->pipeline_.RunCPU(); - this->pipeline_.RunGPU(); + this->pipeline_.Run(); auto node = this->pipeline_.GetOperatorNode(this->decoder_name_); auto nsamples_hw = node->op->GetDiagnostic("nsamples_hw"); @@ -342,8 +340,7 @@ class HwDecoderMemoryPoolTest : public ::testing::Test { }; TEST_F(HwDecoderMemoryPoolTest, MemoryPoolTest) { - this->pipeline_.RunCPU(); - this->pipeline_.RunGPU(); + this->pipeline_.Run(); } class HwDecoderSliceUtilizationTest : public ::testing::Test { @@ -408,8 +405,7 @@ class HwDecoderSliceUtilizationTest : public ::testing::Test { }; TEST_F(HwDecoderSliceUtilizationTest, UtilizationTest) { - this->pipeline_.RunCPU(); - this->pipeline_.RunGPU(); + this->pipeline_.Run(); auto node = this->pipeline_.GetOperatorNode(this->decoder_name_); auto nsamples_hw = node->op->GetDiagnostic("nsamples_hw"); @@ -461,9 +457,7 @@ class HwDecoderCropUtilizationTest : public ::testing::Test { }; TEST_F(HwDecoderCropUtilizationTest, UtilizationTest) { - this->pipeline_.RunCPU(); - this->pipeline_.RunGPU(); - + this->pipeline_.Run(); auto node = this->pipeline_.GetOperatorNode(this->decoder_name_); auto nsamples_hw = node->op->GetDiagnostic("nsamples_hw"); auto nsamples_cuda = 
node->op->GetDiagnostic("nsamples_cuda"); @@ -514,8 +508,7 @@ class HwDecoderRandomCropUtilizationTest : public ::testing::Test { }; TEST_F(HwDecoderRandomCropUtilizationTest, UtilizationTest) { - this->pipeline_.RunCPU(); - this->pipeline_.RunGPU(); + this->pipeline_.Run(); } #endif @@ -550,8 +543,7 @@ class Nvjpeg2kTest : public ::testing::Test { TEST_F(Nvjpeg2kTest, UtilizationTest) { - this->pipeline_.RunCPU(); - this->pipeline_.RunGPU(); + this->pipeline_.Run(); auto node = this->pipeline_.GetOperatorNode(this->decoder_name_); auto nsamples_nvjpeg2k = node->op->GetDiagnostic("nsamples_nvjpeg2k"); diff --git a/dali/operators/input/input_operator_test.cu b/dali/operators/input/input_operator_test.cu index 2e0f79a668..0bac1ff9e4 100644 --- a/dali/operators/input/input_operator_test.cu +++ b/dali/operators/input/input_operator_test.cu @@ -88,7 +88,7 @@ class InputOperatorMixedTest : public ::testing::TestWithParam(batch_size_, num_threads_, device_id_, -1, exec_pipelined_, cpu_queue_depth_, exec_async_); - pipeline_->SetExecutionTypes(exec_pipelined_, exec_separated_, exec_async_); + pipeline_->SetExecutionTypes(exec_pipelined_, exec_async_); PutTogetherDaliGraph(); diff --git a/dali/operators/math/expressions/arithmetic_test.cc b/dali/operators/math/expressions/arithmetic_test.cc index a0616e3ce5..4afde6f3c4 100644 --- a/dali/operators/math/expressions/arithmetic_test.cc +++ b/dali/operators/math/expressions/arithmetic_test.cc @@ -266,8 +266,7 @@ class BinaryArithmeticOpsTest pipe.SetExternalInput("data0", batch[0]); pipe.SetExternalInput("data1", batch[1]); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); vector result_cpu(shape.num_elements()); @@ -324,8 +323,7 @@ class BinaryArithmeticOpsTest pipe.SetExternalInput("data0", batch[0]); pipe.SetExternalInput("data1", batch[1]); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); ASSERT_EQ(DALI_INT32, ws.Output(0).type()); @@ -446,8 +444,7 @@ 
TEST(ArithmeticOpsTest, GenericPipeline) { pipe.SetExternalInput("data0", batch); pipe.SetExternalInput("data1", batch); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); @@ -503,8 +500,7 @@ TEST(ArithmeticOpsTest, FdivPipeline) { pipe.SetExternalInput("data0", batch[0]); pipe.SetExternalInput("data1", batch[1]); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); ASSERT_EQ(ws.Output(0).type(), DALI_FLOAT); @@ -571,8 +567,7 @@ TEST(ArithmeticOpsTest, ConstantsPipeline) { FillBatch(batch, uniform_list_shape(batch_size, {tensor_elements})); pipe.SetExternalInput("data0", batch); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); @@ -635,8 +630,7 @@ class ArithmeticOpsScalarTest : public ::testing::TestWithParam pipe.SetExternalInput("data0", batch[0]); pipe.SetExternalInput("data1", batch[1]); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); @@ -779,8 +773,7 @@ TEST(ArithmeticOpsTest, UnaryPipeline) { } pipe.SetExternalInput("data0", batch); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); vector result1_cpu(tensor_elements); diff --git a/dali/operators/random/rng_base_cpu_test.cc b/dali/operators/random/rng_base_cpu_test.cc index 3c1c49d07b..ce0243e467 100644 --- a/dali/operators/random/rng_base_cpu_test.cc +++ b/dali/operators/random/rng_base_cpu_test.cc @@ -41,8 +41,7 @@ class RNGCheckpointingTest : public ::testing::Test { Workspace ws; auto run_iteration = [&](Pipeline &pipe) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); std::vector result; diff --git a/dali/operators/reader/coco_reader_op_test.cc b/dali/operators/reader/coco_reader_op_test.cc index 261bf2de7e..89ff78f013 100644 --- a/dali/operators/reader/coco_reader_op_test.cc +++ b/dali/operators/reader/coco_reader_op_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -134,8 +134,7 @@ class CocoReaderTest : public ::testing::Test { } Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); auto ids = CopyIds(ws, outs.size()-1); @@ -588,8 +587,7 @@ TEST_F(CocoReaderTest, PixelwiseMasks) { pipe1.Build(Outputs(false, true)); Workspace ws1; - pipe1.RunCPU(); - pipe1.RunGPU(); + pipe1.Run(); pipe1.Outputs(&ws1); Pipeline pipe2(expected_size, 1, 0, kSeed); @@ -600,8 +598,7 @@ TEST_F(CocoReaderTest, PixelwiseMasks) { pipe2.Build(Outputs(false, true)); Workspace ws2; - pipe2.RunCPU(); - pipe2.RunGPU(); + pipe2.Run(); pipe2.Outputs(&ws2); for (auto *ws : {&ws1, &ws2}) { @@ -656,8 +653,7 @@ TEST_F(CocoReaderTest, BigSizeThreshold) { ASSERT_EQ(pipe.GetReaderMeta("coco_reader").epoch_size, this->ImagesWithBigObjects()); Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); auto ids = this->CopyIds(ws); @@ -677,14 +673,12 @@ TEST_F(CocoReaderTest, ShuffleAfterEpoch) { pipe.Build(this->Outputs()); Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); auto ids_epoch_1 = this->CopyIds(ws); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); auto ids_epoch_2 = this->CopyIds(ws); diff --git a/dali/operators/reader/reader_op_test.cc b/dali/operators/reader/reader_op_test.cc index 87b4e901fa..896f90bc77 100644 --- a/dali/operators/reader/reader_op_test.cc +++ b/dali/operators/reader/reader_op_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -129,9 +129,7 @@ TYPED_TEST(ReaderTest, SimpleTest) { Workspace ws; for (int i=0; i < 5; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); - pipe.Outputs(&ws); + pipe.Run(); } return; @@ -150,8 +148,7 @@ TYPED_TEST(ReaderTest, PrefetchQueueTest) { Workspace ws; for (int i=0; i < 5; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); } return; @@ -196,8 +193,7 @@ TYPED_TEST(ReaderTest, LazyInitTest) { Workspace ws; for (int i=0; i < 5; ++i) { - lazy_pipe.RunCPU(); - lazy_pipe.RunGPU(); + lazy_pipe.Run(); lazy_pipe.Outputs(&ws); } std::remove(filename.c_str()); @@ -219,9 +215,7 @@ TYPED_TEST(ReaderTest, SequenceTest) { Workspace ws; for (int i = 0; i < 4; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); - pipe.Outputs(&ws); + pipe.Run(); auto shape = ws.Output(0).AsTensor().shape(); // We have NFHWC format const auto batch_size = shape[0]; @@ -516,8 +510,7 @@ class FileReaderTest : public DALITest { std::vector RunIter(Pipeline &pipe, int batch_size) { std::vector result; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws_); auto shape = ws_.Output(0).AsTensor().shape(); diff --git a/dali/operators/reader/video_reader_decoder_op_test.cc b/dali/operators/reader/video_reader_decoder_op_test.cc index eb7d9ac000..bef91ce554 100644 --- a/dali/operators/reader/video_reader_decoder_op_test.cc +++ b/dali/operators/reader/video_reader_decoder_op_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -77,8 +77,7 @@ class VideoReaderDecoderBaseTest : public VideoTestBase { Workspace ws; while (sequence_id < num_sequences) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); auto &frame_video_output = ws.Output(0); @@ -144,8 +143,7 @@ class VideoReaderDecoderBaseTest : public VideoTestBase { Workspace ws; for (int sequence_id = 0; sequence_id < num_sequences; ++sequence_id) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); auto &frame_video_output = ws.Output(0); @@ -327,8 +325,7 @@ TEST_F(VideoReaderDecoderCompareTest, CompareReaders) { for (int batch_id = 0; batch_id < 20; ++batch_id) { Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); auto &cpu_frame_output = ws.Output(0); diff --git a/dali/operators/reader/video_reader_op_test.cc b/dali/operators/reader/video_reader_op_test.cc index 5857af0302..c4edaf46bd 100644 --- a/dali/operators/reader/video_reader_op_test.cc +++ b/dali/operators/reader/video_reader_op_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -69,9 +69,7 @@ TEST_F(VideoReaderTest, VariableFrameRate2) { pipe.Build(this->Outputs()); EXPECT_THROW([&]() { for (int i = 0; i < 10; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); - pipe.Outputs(&ws); + pipe.Run(); } }(), std::runtime_error); } @@ -106,8 +104,7 @@ TEST_F(VideoReaderTest, ConstantFrameRate) { pipe.Build(this->Outputs()); Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_output = ws.Output(0); @@ -158,8 +155,7 @@ TEST_F(VideoReaderTest, MultipleVideoResolution) { Workspace ws; pipe.Build(this->LabelledOutputs()); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_output = ws.Output(0); @@ -216,8 +212,7 @@ TEST_F(VideoReaderTest, PackedBFrames) { Workspace ws; for (int i = 0; i < iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_output = ws.Output(0); const auto &frames_shape = frames_output.shape(); @@ -255,8 +250,7 @@ TEST_F(VideoReaderTest, Vp9Profile0) { try { pipe.Build(this->Outputs()); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); } catch (const std::exception &e) { if (IsUnsupportedCodec(e.what())) { @@ -294,8 +288,7 @@ TEST_F(VideoReaderTest, Vp9Profile2) { try { pipe.Build(this->Outputs()); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); } catch (const std::exception &e) { if (IsUnsupportedCodec(e.what())) { @@ -331,8 +324,7 @@ TEST_F(VideoReaderTest, Vp8Profile0) { try { pipe.Build(this->Outputs()); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); } catch (const std::exception &e) { if (IsUnsupportedCodec(e.what())) { @@ -370,8 +362,7 @@ TEST_F(VideoReaderTest, MJpeg) { try { pipe.Build(this->Outputs()); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); } catch (const std::exception &e) { if (IsUnsupportedCodec(e.what())) { @@ -410,9 +401,7 @@ TEST_F(VideoReaderTest, HEVC) { try { pipe.Build(this->Outputs()); for (int i = 0; i < 
iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); - pipe.Outputs(&ws); + pipe.Run(); } } catch (const std::exception &e) { if (IsUnsupportedCodec(e.what())) { @@ -450,8 +439,7 @@ TEST_F(VideoReaderTest, FrameLabels) { Workspace ws; for (int i = 0; i < iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_gpu = ws.Output(0); const auto &label_gpu = ws.Output(1); @@ -496,9 +484,7 @@ TEST_F(VideoReaderTest, FrameLabelsFilenames) { Workspace ws; for (int i = 0; i < iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); - pipe.Outputs(&ws); + pipe.Run(); const auto &frames_gpu = ws.Output(0); const auto &label_gpu = ws.Output(1); const auto &frame_num_gpu = ws.Output(2); @@ -544,8 +530,7 @@ TEST_F(VideoReaderTest, LabelsFilenames) { Workspace ws; for (int i = 0; i < iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_gpu = ws.Output(0); const auto &label_gpu = ws.Output(1); @@ -592,8 +577,7 @@ TEST_F(VideoReaderTest, FrameLabelsWithFileListFrameNum) { Workspace ws; for (int i = 0; i < iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_gpu = ws.Output(0); const auto &label_gpu = ws.Output(1); @@ -654,8 +638,7 @@ TEST_F(VideoReaderTest, TimestampLabels) { Workspace ws; for (int i = 0; i < iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_gpu = ws.Output(0); const auto &label_gpu = ws.Output(1); @@ -700,8 +683,7 @@ TEST_F(VideoReaderTest, StartEndLabels) { Workspace ws; for (int i = 0; i < iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_gpu = ws.Output(0); const auto &label_gpu = ws.Output(1); @@ -740,8 +722,7 @@ TEST_F(VideoReaderTest, MultipleFrameRates) { Workspace ws; for (int i = 0; i < iterations; ++i) { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); const auto &frames_output = 
ws.Output(0); const auto &frames_shape = frames_output.shape(); diff --git a/dali/pipeline/executor/executor.cc b/dali/pipeline/executor/executor.cc index 725a2d4d9a..54d1da8578 100644 --- a/dali/pipeline/executor/executor.cc +++ b/dali/pipeline/executor/executor.cc @@ -341,6 +341,27 @@ void Executor::RunGPUImpl(size_t iteration_id) { QueuePolicy::QueueOutputIdxs(gpu_idxs, gpu_op_stream_); } +template +void Executor::Run() { + RunCPU(); + RunMixed(); + RunGPU(); +} + +template +void Executor::Prefetch() { + int i; + for (i = 0; i < std::min(queue_sizes_.gpu_size, queue_sizes_.cpu_size); i++) { + RunCPU(); + RunMixed(); + RunGPU(); + } + + for (; i < queue_sizes_.cpu_size; i++) { + RunCPU(); + } +} + template void Executor::RunCPU() { try { diff --git a/dali/pipeline/executor/executor.h b/dali/pipeline/executor/executor.h index ca394218ed..cadc54f921 100644 --- a/dali/pipeline/executor/executor.h +++ b/dali/pipeline/executor/executor.h @@ -71,9 +71,8 @@ class DLL_PUBLIC ExecutorBase { DLL_PUBLIC virtual ~ExecutorBase() {} DLL_PUBLIC virtual void Build(OpGraph *graph, vector output_names) = 0; DLL_PUBLIC virtual void Init() = 0; - DLL_PUBLIC virtual void RunCPU() = 0; - DLL_PUBLIC virtual void RunMixed() = 0; - DLL_PUBLIC virtual void RunGPU() = 0; + DLL_PUBLIC virtual void Run() = 0; + DLL_PUBLIC virtual void Prefetch() = 0; DLL_PUBLIC virtual void Outputs(Workspace *ws) = 0; DLL_PUBLIC virtual void ShareOutputs(Workspace *ws) = 0; DLL_PUBLIC virtual void ReleaseOutputs() = 0; @@ -133,10 +132,9 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy { checkpointing_ = checkpointing; } DLL_PUBLIC void Build(OpGraph *graph, vector output_names) override; + DLL_PUBLIC void Run() override; + DLL_PUBLIC void Prefetch() override; DLL_PUBLIC void Init() override {} - DLL_PUBLIC void RunCPU() override; - DLL_PUBLIC void RunMixed() override; - DLL_PUBLIC void RunGPU() override; DLL_PUBLIC void Outputs(Workspace *ws) override; DLL_PUBLIC void 
ShareOutputs(Workspace *ws) override; DLL_PUBLIC void ReleaseOutputs() override; @@ -165,6 +163,9 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy { DLL_PUBLIC void RestoreStateFromCheckpoint(const Checkpoint &cpt) override; protected: + DLL_PUBLIC virtual void RunCPU(); + DLL_PUBLIC virtual void RunMixed(); + DLL_PUBLIC virtual void RunGPU(); DLL_PUBLIC void RunCPUImpl(size_t iteration_id); DLL_PUBLIC void RunMixedImpl(size_t iteration_id); DLL_PUBLIC void RunGPUImpl(size_t iteration_id); diff --git a/dali/pipeline/executor/executor_test.cc b/dali/pipeline/executor/executor_test.cc index 9b0e88b546..a74086c780 100644 --- a/dali/pipeline/executor/executor_test.cc +++ b/dali/pipeline/executor/executor_test.cc @@ -95,9 +95,7 @@ class ExecutorTest : public GenericDecoderTest { auto run_epoch = [&](std::unique_ptr &exec) { std::vector> results; for (int i = 0; i < epoch_size; i++) { - exec->RunCPU(); - exec->RunMixed(); - exec->RunGPU(); + exec->Run(); exec->Outputs(&ws); if (ws.OutputIsType(0)) { @@ -474,9 +472,7 @@ TYPED_TEST(ExecutorTest, TestRunBasicGraph) { test::MakeRandomBatch(tl, this->batch_size_); src_op->SetDataSource(tl); - exe->RunCPU(); - exe->RunMixed(); - exe->RunGPU(); + exe->Run(); Workspace ws; exe->Outputs(&ws); @@ -521,9 +517,7 @@ TYPED_TEST(ExecutorTest, TestRunBasicGraphWithCB) { test::MakeRandomBatch(tl, this->batch_size_); src_op->SetDataSource(tl); - exe->RunCPU(); - exe->RunMixed(); - exe->RunGPU(); + exe->Run(); Workspace ws; exe->Outputs(&ws); @@ -606,9 +600,7 @@ TYPED_TEST(ExecutorSyncTest, TestPrefetchedExecution) { auto run = [&src_op, &exe] (TensorList &input) { src_op->SetDataSource(input); - exe->RunCPU(); - exe->RunMixed(); - exe->RunGPU(); + exe->Run(); }; auto check = [&exe, &ws, &tl, batch_size] (int batch_idx) { @@ -713,9 +705,7 @@ TYPED_TEST(ExecutorTest, TestPinning) { tl.Resize(uniform_list_shape(this->batch_size_, TensorShape<>{}), DALI_FLOAT); src_op->SetDataSource(tl); - exe->RunCPU(); - 
exe->RunMixed(); - exe->RunGPU(); + exe->Run(); Workspace ws; exe->Outputs(&ws); diff --git a/dali/pipeline/operator/builtin/conditional/split_merge_test.cc b/dali/pipeline/operator/builtin/conditional/split_merge_test.cc index 50d7700727..a7a6bd6323 100644 --- a/dali/pipeline/operator/builtin/conditional/split_merge_test.cc +++ b/dali/pipeline/operator/builtin/conditional/split_merge_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -183,8 +183,7 @@ class SplitMergeTest : public ::testing::Test { pipe.SetExternalInput("input", input); pipe.SetExternalInput("pred", predicate); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); return {std::move(input), std::move(predicate)}; } @@ -435,8 +434,7 @@ TEST_F(SplitMergeNegativeTest, MismatchedMerge) { pipe.SetExternalInput("pred", predicate); try { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); FAIL() << "Exception was expected but was not thrown."; @@ -472,8 +470,7 @@ TEST_F(SplitMergeNegativeTest, MismatchedSplit) { pipe.SetExternalInput("pred", predicate); try { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); FAIL() << "Exception was expected but was not thrown."; @@ -511,8 +508,7 @@ TEST_F(SplitMergeNegativeTest, MismatchedTypes) { pipe.SetExternalInput("pred", predicate); try { - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); FAIL() << "Exception was expected but was not thrown."; @@ -562,8 +558,7 @@ TEST_F(SplitMergePinnedInputsTest, Mixes) { pipe.SetExternalInput("pinned_input", pinned_input); pipe.SetExternalInput("pred", predicate); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); diff --git 
a/dali/pipeline/operator/builtin/external_source_test.cc b/dali/pipeline/operator/builtin/external_source_test.cc index 59bcf751ed..3b4dc4bc3d 100644 --- a/dali/pipeline/operator/builtin/external_source_test.cc +++ b/dali/pipeline/operator/builtin/external_source_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -352,9 +352,7 @@ class ExternalSourceTest : public::testing::WithParamInterface, } void RunExe() { - exe_->RunCPU(); - exe_->RunMixed(); - exe_->RunGPU(); + exe_->Run(); } bool RunOutputs() { @@ -651,7 +649,7 @@ TEST(ExternalSourceTestNoInput, ThrowCpu) { vector outputs = {"data_out_cpu"}; exe->Build(&graph, outputs); - exe->RunCPU(); + exe->Run(); Workspace ws; EXPECT_THROW(exe->ShareOutputs(&ws), std::exception); } @@ -692,8 +690,7 @@ void TestRunExternalSource(Pipeline &pipe, const std::string &name, cudaStreamSynchronize(0); pipe.SetExternalInput("es", input_gpu); } - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); TensorList output_cpu; pipe.Outputs(&ws); diff --git a/dali/pipeline/operator/op_spec_test.cc b/dali/pipeline/operator/op_spec_test.cc index 8733791311..a7cb66db6f 100644 --- a/dali/pipeline/operator/op_spec_test.cc +++ b/dali/pipeline/operator/op_spec_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -438,8 +438,7 @@ TEST(ArgumentInputTest, OpSpecAccess) { vector> outputs = {{"I need to specify something", "cpu"}}; pipe.Build(outputs); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); diff --git a/dali/pipeline/pipeline.cc b/dali/pipeline/pipeline.cc index 5cb3235b10..142be6ad39 100644 --- a/dali/pipeline/pipeline.cc +++ b/dali/pipeline/pipeline.cc @@ -100,7 +100,7 @@ Pipeline::Pipeline(int max_batch_size, int num_threads, int device_id, int64_t s bool set_affinity, int max_num_stream, int default_cuda_stream_priority) : built_(false), separated_execution_{false} { InitializeMemoryResources(); - Init(max_batch_size, num_threads, device_id, seed, pipelined_execution, separated_execution_, + Init(max_batch_size, num_threads, device_id, seed, pipelined_execution, async_execution, bytes_per_sample_hint, set_affinity, max_num_stream, default_cuda_stream_priority, QueueSizes{prefetch_queue_depth}); } @@ -136,7 +136,6 @@ Pipeline::Pipeline(const string &serialized_pipe, int batch_size, int num_thread Init(this->max_batch_size_, this->num_threads_, this->device_id_, seed, pipelined_execution, - separated_execution_, async_execution, bytes_per_sample_hint, set_affinity, @@ -172,7 +171,7 @@ Pipeline::~Pipeline() { } void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t seed, - bool pipelined_execution, bool separated_execution, bool async_execution, + bool pipelined_execution, bool async_execution, size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream, int default_cuda_stream_priority, QueueSizes prefetch_queue_depth) { // guard cudaDeviceGetStreamPriorityRange call @@ -183,13 +182,13 @@ void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t using Clock = std::chrono::high_resolution_clock; this->original_seed_ = seed < 0 ? 
Clock::now().time_since_epoch().count() : seed; this->pipelined_execution_ = pipelined_execution; - this->separated_execution_ = separated_execution; this->async_execution_ = async_execution; this->bytes_per_sample_hint_ = bytes_per_sample_hint; this->set_affinity_ = set_affinity; this->max_num_stream_ = max_num_stream; this->default_cuda_stream_priority_ = default_cuda_stream_priority; this->prefetch_queue_depth_ = prefetch_queue_depth; + this->separated_execution_ = (prefetch_queue_depth.cpu_size != prefetch_queue_depth.gpu_size); DALI_ENFORCE(max_batch_size_ > 0, "Max batch size must be greater than 0"); int lowest_cuda_stream_priority = 0, highest_cuda_stream_priority = 0; @@ -582,19 +581,24 @@ void Pipeline::SetOutputDescs(std::vector output_descs) { output_descs_ = std::move(output_descs); } -void Pipeline::RunCPU() { +void Pipeline::Run() { DALI_ENFORCE(built_, "\"Build()\" must be called prior to executing the pipeline."); repeat_last_.Refeed(*this); + repeat_last_.Refeed(*this); repeat_last_.Refeed(*this); - executor_->RunCPU(); + executor_->Run(); } -void Pipeline::RunGPU() { - DALI_ENFORCE(built_, - "\"Build()\" must be called prior to executing the pipeline."); - executor_->RunMixed(); - executor_->RunGPU(); +void Pipeline::Prefetch() { + auto sz = GetQueueSizes(); + for (int i = 0; i < sz.cpu_size; i++) + repeat_last_.Refeed(*this); + for (int i = 0; i < sz.gpu_size; i++) { + repeat_last_.Refeed(*this); + repeat_last_.Refeed(*this); + } + executor_->Prefetch(); } bool Pipeline::ValidateOutputs(const Workspace &ws) const { diff --git a/dali/pipeline/pipeline.h b/dali/pipeline/pipeline.h index 402f84bb94..626c22f195 100644 --- a/dali/pipeline/pipeline.h +++ b/dali/pipeline/pipeline.h @@ -273,11 +273,10 @@ class DLL_PUBLIC Pipeline { * @param async_execution Use worker threads for RunX() functions */ DLL_PUBLIC void SetExecutionTypes(bool pipelined_execution = true, - bool separated_execution = false, bool async_execution = true) { + bool async_execution 
= true) { DALI_ENFORCE(!built_, "Alterations to the pipeline after " "\"Build()\" has been called are not allowed - cannot change execution type."); pipelined_execution_ = pipelined_execution; - separated_execution_ = separated_execution; async_execution_ = async_execution; } @@ -381,12 +380,15 @@ class DLL_PUBLIC Pipeline { DALI_ENFORCE(!built_, "Alterations to the pipeline after " "\"Build()\" has been called are not allowed - cannot set queue sizes."); - DALI_ENFORCE(separated_execution_ || (cpu_size == gpu_size), - "Setting different queue sizes for non-separated execution is not allowed"); + separated_execution_ = (cpu_size != gpu_size); DALI_ENFORCE(cpu_size > 0 && gpu_size > 0, "Only positive queue sizes allowed"); prefetch_queue_depth_ = QueueSizes(cpu_size, gpu_size); } + DLL_PUBLIC QueueSizes GetQueueSizes() const { + return prefetch_queue_depth_; + } + /** @{ */ /** * @brief Set descriptors of the outputs of the pipeline. Used to update the graph without @@ -402,30 +404,28 @@ class DLL_PUBLIC Pipeline { /** @} */ /** - * @brief Run the cpu portion of the pipeline. + * @brief Run the pipeline */ - DLL_PUBLIC void RunCPU(); + DLL_PUBLIC void Run(); /** - * @brief Run the gpu portion of the pipeline. + * @brief Fills the prefetch queues */ - DLL_PUBLIC void RunGPU(); + DLL_PUBLIC void Prefetch(); /** * @brief Fills the input device workspace with the output of the pipeline. * Previously returned buffers are released. - * This method blocks until the next batch is complete. RunCPU, RunMixed and RunGPU - * must be called prior to calling this or this method will result in - * deadlock. + * This method blocks until the next batch is complete. Run must be called prior to calling this + * method or it will result in a deadlock. */ DLL_PUBLIC void Outputs(Workspace *ws); /** * @brief Fills the input device workspace with the output of the pipeline. * To release previously returned buffers ReleaseOutputs need to be called. 
- * This method blocks until the next batch is complete. RunCPU, RunMixed and RunGPU - * must be called prior to calling this or this method will result in - * deadlock. + * This method blocks until the next batch is complete. Run must be called prior to calling this + * method or it will result in a deadlock. */ DLL_PUBLIC void ShareOutputs(Workspace *ws); @@ -559,7 +559,7 @@ class DLL_PUBLIC Pipeline { * @brief Initializes the Pipeline internal state */ void Init(int batch_size, int num_threads, int device_id, int64_t seed, bool pipelined_execution, - bool separated_execution, bool async_execution, size_t bytes_per_sample_hint, + bool async_execution, size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream, int default_cuda_stream_priority, QueueSizes prefetch_queue_depth = QueueSizes{2}); @@ -668,24 +668,24 @@ class DLL_PUBLIC Pipeline { const int MAX_SEEDS = 1024; - bool built_; - int max_batch_size_, num_threads_, device_id_; - bool pipelined_execution_; - bool separated_execution_; - bool async_execution_; - size_t bytes_per_sample_hint_; - int set_affinity_; - int max_num_stream_; - int default_cuda_stream_priority_; + bool built_ = false; + int max_batch_size_ = 1, num_threads_ = 0, device_id_ = CPU_ONLY_DEVICE_ID; + bool pipelined_execution_ = false; + bool separated_execution_ = false; + bool async_execution_ = false; + size_t bytes_per_sample_hint_ = 0; + int set_affinity_ = 0; + int max_num_stream_ = 0; + int default_cuda_stream_priority_ = 0; int next_logical_id_ = 0; int next_internal_logical_id_ = -1; - QueueSizes prefetch_queue_depth_; + QueueSizes prefetch_queue_depth_{}; bool enable_memory_stats_ = false; bool checkpointing_ = false; std::vector seed_; - int original_seed_; - size_t current_seed_; + int original_seed_ = 0; + size_t current_seed_ = 0; std::unique_ptr executor_; OpGraph graph_; @@ -768,6 +768,10 @@ class DLL_PUBLIC Pipeline { std::optional data_id; }; + bool empty() const { + return cpu_nodes_.empty() && 
gpu_nodes_.empty() && mixed_nodes_.empty(); + } + std::map> cpu_nodes_; std::map> gpu_nodes_; std::map> mixed_nodes_; diff --git a/dali/pipeline/pipeline_test.cc b/dali/pipeline/pipeline_test.cc index b8e3df0703..5ce3747176 100644 --- a/dali/pipeline/pipeline_test.cc +++ b/dali/pipeline/pipeline_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -456,8 +456,7 @@ TEST_F(PipelineTestOnce, TestPresize) { pipe.Build(outputs); pipe.SetExternalInput("raw_jpegs", data); Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); // we should not presize CPU buffers if they are not pinned @@ -575,7 +574,7 @@ TEST_F(PrefetchedPipelineTest, TestFillQueues) { Pipeline pipe(batch_size, 4, 0); // Cannot test async while setting external input - need to make sure that - pipe.SetExecutionTypes(true, true, true); + pipe.SetExecutionTypes(true, true); // Test coprime queue sizes pipe.SetQueueSizes(CPU, GPU); pipe.AddExternalInput("data"); @@ -595,62 +594,52 @@ TEST_F(PrefetchedPipelineTest, TestFillQueues) { test::MakeRandomBatch(tl, batch_size * N); // Split the batch into 5 - std::array, N> splited_tl; + std::array, N> split_tl; std::array>, N> shapes; for (int i = 0; i < N; i++) { shapes[i].resize(batch_size); for (int j = 0; j < batch_size; j++) { shapes[i][j] = tl.tensor_shape(i * batch_size + j); } - splited_tl[i].Resize({shapes[i]}, DALI_UINT8); + split_tl[i].Resize({shapes[i]}, DALI_UINT8); } for (int i = 0; i < N; i++) { for (int j = 0; j < batch_size; j++) { std::memcpy( - splited_tl[i].template mutable_tensor(j), + split_tl[i].template mutable_tensor(j), tl.template tensor(i * batch_size + j), volume(tl.tensor_shape(i * batch_size + j))); } } - // Fill queues in the same way 
as Python - this would be the first pipe.run() - for (int i = 0; i < GPU; i++) { - pipe.SetExternalInput("data", splited_tl[i]); - pipe.RunCPU(); - pipe.RunGPU(); - } - // We run CPU stage additional `CPU`-times, to fill the output queue - for (int i = GPU; i < GPU + CPU; i++) { - pipe.SetExternalInput("data", splited_tl[i]); - pipe.RunCPU(); + // Fill queues + int i = 0; + for (; i < std::max(CPU, GPU); i++) { + pipe.SetExternalInput("data", split_tl[i]); + if (i < std::min(CPU, GPU)) + pipe.Run(); } + pipe.Prefetch(); // Now we interleave the calls to Outputs() and Run() for the rest of the batch int obtained_outputs = 0; - for (int i = GPU + CPU; i < N; i++) { + for (; i < N; i++) { Workspace ws; pipe.Outputs(&ws); test::CheckResults(ws, batch_size, obtained_outputs++, tl); - pipe.SetExternalInput("data", splited_tl[i]); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.SetExternalInput("data", split_tl[i]); + pipe.Run(); } // We consumed all the data and have it in the Pipeline, now we need to run // Mixed and GPU stage to consume what was produced by the CPU - for (int i = 0; i < CPU; i++) { + /*for (int i = 0; i < CPU; i++) { Workspace ws; pipe.Outputs(&ws); test::CheckResults(ws, batch_size, obtained_outputs++, tl); pipe.RunGPU(); - } - // Now we consule what we buffered in the beggining - for (int i = 0; i < GPU; i++) { - Workspace ws; - pipe.Outputs(&ws); - test::CheckResults(ws, batch_size, obtained_outputs++, tl); - } + }*/ } class DummyOpToAdd : public Operator { @@ -847,8 +836,7 @@ TEST(PipelineTest, MultiOutputInputOp) { inp.mutable_tensor(0)[0] = input; pipe.SetExternalInput("DummyInput", inp); - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); Workspace ws; pipe.Outputs(&ws); diff --git a/dali/python/backend_impl.cc b/dali/python/backend_impl.cc index 3f2ec90306..03c644c4e8 100644 --- a/dali/python/backend_impl.cc +++ b/dali/python/backend_impl.cc @@ -1887,7 +1887,7 @@ PYBIND11_MODULE(backend_impl, m) { .def("Shutdown", &Pipeline::Shutdown, py::call_guard()) 
.def("SetExecutionTypes", [](Pipeline *p, bool exec_pipelined, bool exec_separated, bool exec_async) { - p->SetExecutionTypes(exec_pipelined, exec_separated, exec_async); + p->SetExecutionTypes(exec_pipelined, exec_async); }, "exec_pipelined"_a = true, "exec_separated"_a = false, @@ -1927,8 +1927,7 @@ PYBIND11_MODULE(backend_impl, m) { } p->SetOutputDescs(out_desc); }) - .def("RunCPU", &Pipeline::RunCPU, py::call_guard()) - .def("RunGPU", &Pipeline::RunGPU, py::call_guard()) + .def("Run", &Pipeline::Run, py::call_guard()) .def("Outputs", [](Pipeline *p) { Workspace ws; diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 35078525bb..9182dd7b1b 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -1271,8 +1271,7 @@ def _run_once(self): # Special case to prevent a deadlock if user didn't release the only buffer if not self._exec_async and self._prefetch_queue_depth == 1: self.release_outputs() - self._run_cpu() - self._run_gpu() + self._run() except StopIteration: self._last_iter = True diff --git a/dali/test/dali_operator_test.h b/dali/test/dali_operator_test.h index f6f8084c80..01f96c7bff 100644 --- a/dali/test/dali_operator_test.h +++ b/dali/test/dali_operator_test.h @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -131,8 +131,7 @@ inline Workspace CreateWorkspace() { inline void RunPipeline(Pipeline &pipeline) { - pipeline.RunCPU(); - pipeline.RunGPU(); + pipeline.Run(); } inline std::vector diff --git a/dali/test/dali_plugin_manager_test.cc b/dali/test/dali_plugin_manager_test.cc index 77842ae097..a1f1d0152b 100644 --- a/dali/test/dali_plugin_manager_test.cc +++ b/dali/test/dali_plugin_manager_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -59,8 +59,7 @@ class DummyTest : public ::dali::DALITest { pipe.Build(outputs); pipe.SetExternalInput("data", data); dali::Workspace ws; - pipe.RunCPU(); - pipe.RunGPU(); + pipe.Run(); pipe.Outputs(&ws); dali::test::CheckResults(ws, 3, 0, data); diff --git a/dali/test/dali_test_bboxes.h b/dali/test/dali_test_bboxes.h index e938ba581a..0c35462576 100644 --- a/dali/test/dali_test_bboxes.h +++ b/dali/test/dali_test_bboxes.h @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
#ifndef DALI_TEST_DALI_TEST_BBOXES_H_ #define DALI_TEST_DALI_TEST_BBOXES_H_ @@ -49,8 +49,7 @@ class GenericBBoxesTest : public DALISingleOpTest { this->SetTestCheckType(this->GetTestCheckType()); pipe->Build(DALISingleOpTest::outputs_); this->FillExternalInputs(); - pipe->RunCPU(); - pipe->RunGPU(); + pipe->Run(); Workspace ws; pipe->Outputs(&ws); @@ -89,8 +88,7 @@ class GenericBBoxesTest : public DALISingleOpTest { this->SetTestCheckType(this->GetTestCheckType()); pipe->Build({{"cropped_images", "gpu"}, {"resized_boxes", "gpu"}}); this->FillExternalInputs(); - pipe->RunCPU(); - pipe->RunGPU(); + pipe->Run(); Workspace ws; pipe->Outputs(&ws); @@ -136,8 +134,7 @@ class GenericBBoxesTest : public DALISingleOpTest { this->SetTestCheckType(this->GetTestCheckType()); pipe->Build({{"cropped_images", "cpu"}, {"resized_boxes", "cpu"}}); this->FillExternalInputs(); - pipe->RunCPU(); - pipe->RunGPU(); + pipe->Run(); Workspace ws; pipe->Outputs(&ws); diff --git a/dali/test/dali_test_checkpointing.h b/dali/test/dali_test_checkpointing.h index 439db63b9b..912fc8b580 100644 --- a/dali/test/dali_test_checkpointing.h +++ b/dali/test/dali_test_checkpointing.h @@ -80,8 +80,7 @@ class PipelineWrapper { template std::vector RunIteration() { - pipe_->RunCPU(); - pipe_->RunGPU(); + pipe_->Run(); pipe_->Outputs(&ws_); auto collect_value_from_each_sample = [](const TensorList &data) { diff --git a/dali/test/dali_test_single_op.h b/dali/test/dali_test_single_op.h index da47c4619f..acce95ffa5 100644 --- a/dali/test/dali_test_single_op.h +++ b/dali/test/dali_test_single_op.h @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -281,8 +281,7 @@ class DALISingleOpTest : public DALITest { void RunOperator(Workspace *ws) { SetTestCheckType(GetTestCheckType()); FillExternalInputs(); - pipeline_->RunCPU(); - pipeline_->RunGPU(); + pipeline_->Run(); pipeline_->Outputs(ws); } diff --git a/include/dali/c_api.h b/include/dali/c_api.h index 9f8de2276b..7e011d1efb 100644 --- a/include/dali/c_api.h +++ b/include/dali/c_api.h @@ -398,13 +398,23 @@ DLL_PUBLIC int daliGetExternalInputNdim(daliPipelineHandle *pipe_handle, const c */ DLL_PUBLIC void daliRun(daliPipelineHandle *pipe_handle); +/** + * @brief Schedule first runs to fill buffers for Executor. + */ +DLL_PUBLIC void daliPrefetch(daliPipelineHandle *pipe_handle); + /** * @brief Schedule first runs to fill buffers for Executor with UniformQueue policy. + * @param queue_depth Ignored; must be equal to the pipeline's queue depth + * @deprecated Use `daliPrefetch` instead */ DLL_PUBLIC void daliPrefetchUniform(daliPipelineHandle *pipe_handle, int queue_depth); /** * @brief Schedule first runs to fill buffers for Executor with SeparateQueue policy. + * @param cpu_queue_depth Ignored; must be equal to the pipeline's CPU queue depth + * @param gpu_queue_depth Ignored; must be equal to the pipeline's GPU queue depth + * @deprecated Use `daliPrefetch` instead */ DLL_PUBLIC void daliPrefetchSeparate(daliPipelineHandle *pipe_handle, int cpu_queue_depth, int gpu_queue_depth); From 9197891a5e6cbe069d6e356122e923987c423bac Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Wed, 13 Dec 2023 15:37:55 +0100 Subject: [PATCH 02/15] Add Pipeline::InputFeedCount. Add Pipeline::Prefetch. Rework prefetching mechanism in Python. Fix tests. 
Signed-off-by: Michal Zientkiewicz --- dali/c_api/c_api.cc | 12 ++---- dali/c_api/c_api_test.cc | 3 ++ dali/c_api/operator_trace_test.cc | 2 +- dali/operators/input/input_operator_test.cu | 2 +- dali/operators/reader/reader_op_test.cc | 1 + dali/operators/reader/video_reader_op_test.cc | 1 + .../executor/async_pipelined_executor.h | 14 +++--- .../async_separated_pipelined_executor.cc | 28 +++++++++++- .../async_separated_pipelined_executor.h | 18 +++++--- dali/pipeline/executor/executor.cc | 15 +++++++ dali/pipeline/executor/executor.h | 3 ++ dali/pipeline/pipeline.cc | 37 +++++++++++----- dali/pipeline/pipeline.h | 43 +++++++++++++------ dali/pipeline/pipeline_test.cc | 19 ++------ dali/python/backend_impl.cc | 2 +- dali/python/nvidia/dali/pipeline.py | 39 ++--------------- include/dali/c_api.h | 2 +- 17 files changed, 139 insertions(+), 102 deletions(-) diff --git a/dali/c_api/c_api.cc b/dali/c_api/c_api.cc index cf7d6d9e6f..8ed1fe197b 100644 --- a/dali/c_api/c_api.cc +++ b/dali/c_api/c_api.cc @@ -251,15 +251,11 @@ daliCreatePipeline2(daliPipelineHandle *pipe_handle, const char *serialized_pipe bool se = separated_execution != 0; bool pe = pipelined_execution != 0; bool ae = async_execution != 0; - if (cpu_prefetch_queue_depth == gpu_prefetch_queue_depth) { - if (se) - DALI_WARN("Setting separated_execution to True has no effect if the queue sizes are equal"); - se = false; - } + auto pipeline = std::make_unique(std::string(serialized_pipeline, length), max_batch_size, num_threads, device_id, pe, prefetch_queue_depth, ae); - pipeline->SetExecutionTypes(pe, ae); + pipeline->SetExecutionTypes(pe, se, ae); if (se) { pipeline->SetQueueSizes(cpu_prefetch_queue_depth, gpu_prefetch_queue_depth); } @@ -298,7 +294,7 @@ void daliPrefetchUniform(daliPipelineHandle_t pipe_handle, int queue_depth) { auto sz = pipeline->GetQueueSizes(); if (queue_depth != sz.cpu_size || queue_depth != sz.gpu_size) { DALI_WARN("daliPrefetchUniform is deprecated and setting queue_length 
different than" - " the one set ing the pipeline has no effect"); + " the one set in the pipeline has no effect. Use daliPrefetch instead."); } pipeline->Prefetch(); } @@ -310,7 +306,7 @@ void daliPrefetchSeparate(daliPipelineHandle_t pipe_handle, auto sz = pipeline->GetQueueSizes(); if (cpu_queue_depth != sz.cpu_size || gpu_queue_depth != sz.gpu_size) { DALI_WARN("daliPrefetchUniform is deprecated and setting queue_length different than" - " the one set ing the pipeline has no effect"); + " the one set in the pipeline has no effect. Use daliPrefetch instead."); } pipeline->Prefetch(); } diff --git a/dali/c_api/c_api_test.cc b/dali/c_api/c_api_test.cc index 20c4ad1d13..9dcdef3213 100644 --- a/dali/c_api/c_api_test.cc +++ b/dali/c_api/c_api_test.cc @@ -414,6 +414,8 @@ TYPED_TEST(CApiTest, ExternalSourceSingleAllocVariableBatchSizePipe) { for (int i = 0; i < prefetch_queue_depth; i++) { pipe_ptr->Run(); } + daliPrefetch(&handle); + dali::Workspace ws; for (int i = 0; i < prefetch_queue_depth; i++) { ComparePipelinesOutputs(handle, *pipe_ptr, DALI_ext_default, @@ -479,6 +481,7 @@ TYPED_TEST(CApiTest, ExternalSourceMultipleAllocPipe) { input_shape.sample_dim(), "HWC", cuda_stream, DALI_ext_default); daliRun(&handle); pipe_ptr->Run(); + ComparePipelinesOutputs(handle, *pipe_ptr); daliDeletePipeline(&handle); } diff --git a/dali/c_api/operator_trace_test.cc b/dali/c_api/operator_trace_test.cc index 0f8ced9810..efec1bb59e 100644 --- a/dali/c_api/operator_trace_test.cc +++ b/dali/c_api/operator_trace_test.cc @@ -77,7 +77,7 @@ class OperatorTraceTest : public ::testing::TestWithParam(batch_size_, num_threads_, device_id_, -1, exec_pipelined_, cpu_queue_depth_, exec_async_); - pipeline_->SetExecutionTypes(exec_pipelined_, exec_async_); + pipeline_->SetExecutionTypes(exec_pipelined_, exec_separated_, exec_async_); PutTogetherDaliGraph(); diff --git a/dali/operators/input/input_operator_test.cu b/dali/operators/input/input_operator_test.cu index 0bac1ff9e4..2e0f79a668 100644 
--- a/dali/operators/input/input_operator_test.cu +++ b/dali/operators/input/input_operator_test.cu @@ -88,7 +88,7 @@ class InputOperatorMixedTest : public ::testing::TestWithParam(batch_size_, num_threads_, device_id_, -1, exec_pipelined_, cpu_queue_depth_, exec_async_); - pipeline_->SetExecutionTypes(exec_pipelined_, exec_async_); + pipeline_->SetExecutionTypes(exec_pipelined_, exec_separated_, exec_async_); PutTogetherDaliGraph(); diff --git a/dali/operators/reader/reader_op_test.cc b/dali/operators/reader/reader_op_test.cc index 896f90bc77..bea666d538 100644 --- a/dali/operators/reader/reader_op_test.cc +++ b/dali/operators/reader/reader_op_test.cc @@ -216,6 +216,7 @@ TYPED_TEST(ReaderTest, SequenceTest) { Workspace ws; for (int i = 0; i < 4; ++i) { pipe.Run(); + pipe.Outputs(&ws); auto shape = ws.Output(0).AsTensor().shape(); // We have NFHWC format const auto batch_size = shape[0]; diff --git a/dali/operators/reader/video_reader_op_test.cc b/dali/operators/reader/video_reader_op_test.cc index c4edaf46bd..03444fc6c2 100644 --- a/dali/operators/reader/video_reader_op_test.cc +++ b/dali/operators/reader/video_reader_op_test.cc @@ -70,6 +70,7 @@ TEST_F(VideoReaderTest, VariableFrameRate2) { EXPECT_THROW([&]() { for (int i = 0; i < 10; ++i) { pipe.Run(); + pipe.Outputs(&ws); } }(), std::runtime_error); } diff --git a/dali/pipeline/executor/async_pipelined_executor.h b/dali/pipeline/executor/async_pipelined_executor.h index 1af2237d84..7e09a65175 100644 --- a/dali/pipeline/executor/async_pipelined_executor.h +++ b/dali/pipeline/executor/async_pipelined_executor.h @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -83,12 +83,6 @@ class DLL_PUBLIC AsyncPipelinedExecutor : public PipelinedExecutor { } } - DLL_PUBLIC void RunCPU() override; - - DLL_PUBLIC void RunMixed() override; - - DLL_PUBLIC void RunGPU() override; - DLL_PUBLIC void Outputs(Workspace *ws) override { CheckForErrors(); try { @@ -109,6 +103,12 @@ class DLL_PUBLIC AsyncPipelinedExecutor : public PipelinedExecutor { } protected: + DLL_PUBLIC void RunCPU() override; + + DLL_PUBLIC void RunMixed() override; + + DLL_PUBLIC void RunGPU() override; + void CheckForErrors() { cpu_thread_.CheckForErrors(); mixed_thread_.CheckForErrors(); diff --git a/dali/pipeline/executor/async_separated_pipelined_executor.cc b/dali/pipeline/executor/async_separated_pipelined_executor.cc index 3a69335312..5b4af6e5c6 100644 --- a/dali/pipeline/executor/async_separated_pipelined_executor.cc +++ b/dali/pipeline/executor/async_separated_pipelined_executor.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. +// Copyright (c) 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -32,4 +32,30 @@ void AsyncSeparatedPipelinedExecutor::RunGPU() { gpu_thread_.DoWork([this]() { SeparatedPipelinedExecutor::RunGPU(); }); } +void AsyncSeparatedPipelinedExecutor::Prefetch() { + for (int i = 0; i < queue_sizes_.gpu_size; i++) { + RunCPU(); + RunMixed(); + RunGPU(); + } + + for (int i = 0; i < queue_sizes_.cpu_size; i++) { + RunCPU(); + } +} + +int AsyncSeparatedPipelinedExecutor::InputFeedCount(const std::string &op_name) { + OpNode &node = graph_->Node(op_name); + switch (node.op_type) { + case OpType::CPU: + return queue_sizes_.cpu_size + queue_sizes_.gpu_size; + case OpType::MIXED: + case OpType::GPU: + return queue_sizes_.gpu_size; + default: + assert(!"Unreachable code detected"); + return 0; + } +} + } // namespace dali diff --git a/dali/pipeline/executor/async_separated_pipelined_executor.h b/dali/pipeline/executor/async_separated_pipelined_executor.h index 38d73956d6..935a835b5c 100644 --- a/dali/pipeline/executor/async_separated_pipelined_executor.h +++ b/dali/pipeline/executor/async_separated_pipelined_executor.h @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -76,12 +76,6 @@ class DLL_PUBLIC AsyncSeparatedPipelinedExecutor : public SeparatedPipelinedExec } } - DLL_PUBLIC void RunCPU() override; - - DLL_PUBLIC void RunMixed() override; - - DLL_PUBLIC void RunGPU() override; - DLL_PUBLIC void Outputs(Workspace *ws) override { CheckForErrors(); try { @@ -97,7 +91,17 @@ class DLL_PUBLIC AsyncSeparatedPipelinedExecutor : public SeparatedPipelinedExec } } + DLL_PUBLIC int InputFeedCount(const std::string &op_name) override; + protected: + DLL_PUBLIC void Prefetch() override; + + DLL_PUBLIC void RunCPU() override; + + DLL_PUBLIC void RunMixed() override; + + DLL_PUBLIC void RunGPU() override; + void CheckForErrors() { cpu_thread_.CheckForErrors(); mixed_thread_.CheckForErrors(); diff --git a/dali/pipeline/executor/executor.cc b/dali/pipeline/executor/executor.cc index 54d1da8578..55f3305d4a 100644 --- a/dali/pipeline/executor/executor.cc +++ b/dali/pipeline/executor/executor.cc @@ -348,6 +348,21 @@ void Executor::Run() { RunGPU(); } +template +int Executor::InputFeedCount(const std::string &op_name) { + OpNode &node = graph_->Node(op_name); + switch (node.op_type) { + case OpType::CPU: + return queue_sizes_.cpu_size; + case OpType::MIXED: + case OpType::GPU: + return queue_sizes_.gpu_size; + default: + assert(!"Unreachable code detected"); + return 0; + } +} + template void Executor::Prefetch() { int i; diff --git a/dali/pipeline/executor/executor.h b/dali/pipeline/executor/executor.h index cadc54f921..079a6cdb97 100644 --- a/dali/pipeline/executor/executor.h +++ b/dali/pipeline/executor/executor.h @@ -82,6 +82,7 @@ class DLL_PUBLIC ExecutorBase { DLL_PUBLIC virtual void Shutdown() = 0; DLL_PUBLIC virtual Checkpoint& GetCurrentCheckpoint() = 0; DLL_PUBLIC virtual void RestoreStateFromCheckpoint(const Checkpoint &cpt) = 0; + DLL_PUBLIC virtual int InputFeedCount(const std::string &input_name) = 0; protected: // virtual to allow the TestPruneWholeGraph test in gcc @@ -162,6 +163,8 @@ class DLL_PUBLIC Executor : public 
ExecutorBase, public QueuePolicy { */ DLL_PUBLIC void RestoreStateFromCheckpoint(const Checkpoint &cpt) override; + DLL_PUBLIC int InputFeedCount(const std::string &op_name) override; + protected: DLL_PUBLIC virtual void RunCPU(); DLL_PUBLIC virtual void RunMixed(); diff --git a/dali/pipeline/pipeline.cc b/dali/pipeline/pipeline.cc index 142be6ad39..3e4a92601d 100644 --- a/dali/pipeline/pipeline.cc +++ b/dali/pipeline/pipeline.cc @@ -584,20 +584,14 @@ void Pipeline::SetOutputDescs(std::vector output_descs) { void Pipeline::Run() { DALI_ENFORCE(built_, "\"Build()\" must be called prior to executing the pipeline."); - repeat_last_.Refeed(*this); - repeat_last_.Refeed(*this); - repeat_last_.Refeed(*this); + repeat_last_.Refeed(*this); executor_->Run(); } void Pipeline::Prefetch() { - auto sz = GetQueueSizes(); - for (int i = 0; i < sz.cpu_size; i++) - repeat_last_.Refeed(*this); - for (int i = 0; i < sz.gpu_size; i++) { - repeat_last_.Refeed(*this); - repeat_last_.Refeed(*this); - } + DALI_ENFORCE(built_, + "\"Build()\" must be called prior to executing the pipeline."); + repeat_last_.Refeed(*this, true); executor_->Prefetch(); } @@ -811,7 +805,7 @@ std::map Pipeline::GetReaderMeta() { return ret; } -ReaderMeta Pipeline::GetReaderMeta(std::string name) { +ReaderMeta Pipeline::GetReaderMeta(const std::string &name) { ReaderMeta meta; for (Index i = 0; i < graph_.NumOp(); ++i) { const OpNode ¤t = graph_.Node(i); @@ -823,6 +817,9 @@ ReaderMeta Pipeline::GetReaderMeta(std::string name) { return meta; } +int Pipeline::InputFeedCount(const std::string &name) { + return executor_->InputFeedCount(name); +} const TensorLayout &Pipeline::GetInputLayout(const std::string &name) { DALI_ENFORCE(built_, "\"Build()\" must be called prior to calling \"GetInputLayout()\"."); @@ -1107,4 +1104,22 @@ void Pipeline::RepeatLastInputs::FindNodes(const OpGraph &graph) { } } +template +void Pipeline::RepeatLastInputs::Refeed(Pipeline &owner, bool fill_queue) { + auto &nodes = GetNodes(); + 
for (auto &[name, node] : nodes) { + int count = fill_queue ? owner.InputFeedCount(name.c_str()) : 1; + for (int i = 0; i < count; i++) + owner.SetExternalInputHelper(name, node.last_input, node.data_id, node.last_input.order(), + InputOperatorSettingMode{false, false, InputOperatorNoCopyMode::FORCE_NO_COPY}, + true); + } +} + +void Pipeline::RepeatLastInputs::Refeed(Pipeline &owner, bool fill_queue) { + Refeed(owner, fill_queue); + Refeed(owner, fill_queue); + Refeed(owner, fill_queue); +} + } // namespace dali diff --git a/dali/pipeline/pipeline.h b/dali/pipeline/pipeline.h index 626c22f195..07b0cfb591 100644 --- a/dali/pipeline/pipeline.h +++ b/dali/pipeline/pipeline.h @@ -273,10 +273,12 @@ class DLL_PUBLIC Pipeline { * @param async_execution Use worker threads for RunX() functions */ DLL_PUBLIC void SetExecutionTypes(bool pipelined_execution = true, + bool separated_execution = false, bool async_execution = true) { DALI_ENFORCE(!built_, "Alterations to the pipeline after " "\"Build()\" has been called are not allowed - cannot change execution type."); pipelined_execution_ = pipelined_execution; + separated_execution_ = separated_execution; async_execution_ = async_execution; } @@ -380,7 +382,8 @@ class DLL_PUBLIC Pipeline { DALI_ENFORCE(!built_, "Alterations to the pipeline after " "\"Build()\" has been called are not allowed - cannot set queue sizes."); - separated_execution_ = (cpu_size != gpu_size); + DALI_ENFORCE(separated_execution_ || (cpu_size == gpu_size), + "Setting different queue sizes for non-separated execution is not allowed"); DALI_ENFORCE(cpu_size > 0 && gpu_size > 0, "Only positive queue sizes allowed"); prefetch_queue_depth_ = QueueSizes(cpu_size, gpu_size); } @@ -410,9 +413,20 @@ class DLL_PUBLIC Pipeline { /** * @brief Fills the prefetch queues + * + * Runs a prefetching function in the executor so that internal and output queues are full. + * Note that it requires populating the external sources InputFeedCount(name) times. 
*/ DLL_PUBLIC void Prefetch(); + /** + * @brief Calculates how many times a given input must be populated before the pipeline can be run + * + * @param input_name The name of the input, as specified in the input operator. + * @return The number of times that feed_input needs to be called. + */ + DLL_PUBLIC int InputFeedCount(const std::string &input_name); + /** * @brief Fills the input device workspace with the output of the pipeline. * Previously returned buffers are released. @@ -465,7 +479,7 @@ class DLL_PUBLIC Pipeline { /** * @brief Returns the reader meta for a node with given name */ - DLL_PUBLIC ReaderMeta GetReaderMeta(std::string name); + DLL_PUBLIC ReaderMeta GetReaderMeta(const std::string &name); /** * @brief Get the data layout required by the external input with a given name. @@ -718,7 +732,8 @@ class DLL_PUBLIC Pipeline { * This class maintains a list of such nodes, stores the most recently fed input and re-submits * it if no new data was fed. */ - struct RepeatLastInputs { + class RepeatLastInputs { + public: void FindNodes(const OpGraph &graph); template @@ -756,8 +771,18 @@ class DLL_PUBLIC Pipeline { return true; } + /** + * @brief Feeds the recently set inputs to the inputs that have `repeat_last` property + * + * @param owner The pipeline + * @param fill_queue If true, the inputs are fed `InputFeedCount(name)` times; + * otherwise it's fed once. 
+ */ + void Refeed(Pipeline &owner, bool fill_queue = false); + + private: template - void Refeed(Pipeline &owner); + void Refeed(Pipeline &owner, bool fill_queue); template struct RepeatLastInput { @@ -790,16 +815,6 @@ class DLL_PUBLIC Pipeline { RepeatLastInputs repeat_last_; }; -template -void Pipeline::RepeatLastInputs::Refeed(Pipeline &owner) { - auto &nodes = GetNodes(); - for (auto &[name, node] : nodes) { - owner.SetExternalInputHelper(name, node.last_input, node.data_id, node.last_input.order(), - InputOperatorSettingMode{false, false, InputOperatorNoCopyMode::FORCE_NO_COPY}, - true); - } -} - } // namespace dali #endif // DALI_PIPELINE_PIPELINE_H_ diff --git a/dali/pipeline/pipeline_test.cc b/dali/pipeline/pipeline_test.cc index 5ce3747176..a7b9c0fb3f 100644 --- a/dali/pipeline/pipeline_test.cc +++ b/dali/pipeline/pipeline_test.cc @@ -574,7 +574,7 @@ TEST_F(PrefetchedPipelineTest, TestFillQueues) { Pipeline pipe(batch_size, 4, 0); // Cannot test async while setting external input - need to make sure that - pipe.SetExecutionTypes(true, true); + pipe.SetExecutionTypes(true, true, true); // Test coprime queue sizes pipe.SetQueueSizes(CPU, GPU); pipe.AddExternalInput("data"); @@ -615,11 +615,9 @@ TEST_F(PrefetchedPipelineTest, TestFillQueues) { // Fill queues int i = 0; - for (; i < std::max(CPU, GPU); i++) { + int feed_count = pipe.InputFeedCount("data"); + for (; i < feed_count; i++) pipe.SetExternalInput("data", split_tl[i]); - if (i < std::min(CPU, GPU)) - pipe.Run(); - } pipe.Prefetch(); // Now we interleave the calls to Outputs() and Run() for the rest of the batch @@ -631,15 +629,6 @@ TEST_F(PrefetchedPipelineTest, TestFillQueues) { pipe.SetExternalInput("data", split_tl[i]); pipe.Run(); } - - // We consumed all the data and have it in the Pipeline, now we need to run - // Mixed and GPU stage to consume what was produced by the CPU - /*for (int i = 0; i < CPU; i++) { - Workspace ws; - pipe.Outputs(&ws); - test::CheckResults(ws, batch_size, 
obtained_outputs++, tl); - pipe.RunGPU(); - }*/ } class DummyOpToAdd : public Operator { @@ -786,7 +775,7 @@ class DummyInputOperator: public InputOperator { int data = input.tensor(0)[0]; auto &out0 = ws.Output(0); auto &out1 = ws.Output(1); - auto out_shape = TensorListShape<-1>(1); + auto out_shape = TensorListShape<-1>(1, 1); out_shape.set_tensor_shape(0, {1}); out0.Resize(out_shape, DALIDataType::DALI_FLOAT); diff --git a/dali/python/backend_impl.cc b/dali/python/backend_impl.cc index 03c644c4e8..e21da68b05 100644 --- a/dali/python/backend_impl.cc +++ b/dali/python/backend_impl.cc @@ -1887,7 +1887,7 @@ PYBIND11_MODULE(backend_impl, m) { .def("Shutdown", &Pipeline::Shutdown, py::call_guard()) .def("SetExecutionTypes", [](Pipeline *p, bool exec_pipelined, bool exec_separated, bool exec_async) { - p->SetExecutionTypes(exec_pipelined, exec_async); + p->SetExecutionTypes(exec_pipelined, exec_separated, exec_async); }, "exec_pipelined"_a = true, "exec_separated"_a = false, diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 9182dd7b1b..51c25a2104 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -251,8 +251,6 @@ def __init__( self._consumer_iter = 0 self._consumer_epoch_idx = 0 self._batches_to_consume = 0 - self._cpu_batches_to_consume = 0 - self._gpu_batches_to_consume = 0 self._names_and_devices = None self._exec_async = exec_async self._bytes_per_sample = bytes_per_sample @@ -1073,12 +1071,11 @@ def outputs(self): """ with self._check_api_type_scope(types.PipelineAPIType.SCHEDULED): self._consumer_iter += 1 - if self._batches_to_consume == 0 or self._gpu_batches_to_consume == 0: + if self._batches_to_consume == 0: self._consumer_iter = 0 self._consumer_epoch_idx += 1 raise StopIteration self._batches_to_consume -= 1 - self._gpu_batches_to_consume -= 1 return self._outputs() def schedule_run(self): @@ -1123,12 +1120,11 @@ def share_outputs(self): """ with 
self._check_api_type_scope(types.PipelineAPIType.SCHEDULED): self._consumer_iter += 1 - if self._batches_to_consume == 0 or self._gpu_batches_to_consume == 0: + if self._batches_to_consume == 0: self._consumer_iter = 0 self._consumer_epoch_idx += 1 raise StopIteration self._batches_to_consume -= 1 - self._gpu_batches_to_consume -= 1 return self._pipe.ShareOutputs() # for the backward compatibility @@ -1253,7 +1249,8 @@ def _prefetch(self): raise RuntimeError("The pipeline was destroyed.") self._schedule_py_workers() if self._exec_separated: - self._fill_separated_queues() + for _ in range(self._prefetch_queue_depth.cpu_size + self._prefetch_queue_depth.gpu_size): + self._run_once() else: for _ in range(self._prefetch_queue_depth): self._run_once() @@ -1275,40 +1272,12 @@ def _run_once(self): except StopIteration: self._last_iter = True - def _run_up_to(self, stage_name): - """Call the `_run_X` up to `stage_name` (inclusive).""" - try: - if not self._last_iter: - self._iter_setup() - self._batches_to_consume += 1 - self._run_cpu() - if stage_name == "cpu": - return - self._run_gpu() - if stage_name == "gpu": - return - except StopIteration: - self._last_iter = True - def _schedule_py_workers(self): if self._py_pool is None: return for i, group in enumerate(self._parallel_input_callbacks): group.prefetch(self._py_pool, i, self._max_batch_size, self._epoch_idx) - def _fill_separated_queues(self): - """When using separated execution fill each of the prefetch queues""" - if not self._built: - raise RuntimeError("Pipeline must be built first.") - if not self._first_iter: - raise RuntimeError("Queues can be filled only on first iteration.") - if not self._exec_separated: - raise RuntimeError("This function should be only used with separated execution.") - for i in range(self._gpu_queue_size): - self._run_up_to("gpu") - for i in range(self._cpu_queue_size): - self._run_up_to("cpu") - def reset(self): """Resets pipeline iterator diff --git a/include/dali/c_api.h 
b/include/dali/c_api.h index 7e011d1efb..3cd42d2318 100644 --- a/include/dali/c_api.h +++ b/include/dali/c_api.h @@ -399,7 +399,7 @@ DLL_PUBLIC int daliGetExternalInputNdim(daliPipelineHandle *pipe_handle, const c DLL_PUBLIC void daliRun(daliPipelineHandle *pipe_handle); /** - * @brief Schedule first runs to fill buffers for Executor. + * @brief Schedule initial runs to fill the buffers. */ DLL_PUBLIC void daliPrefetch(daliPipelineHandle *pipe_handle); From 2cc317fdf4ed6486b9636e8332a9f9c338806c54 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 15 Dec 2023 18:08:17 +0100 Subject: [PATCH 03/15] Bug fixes, python stuff. Signed-off-by: Michal Zientkiewicz --- dali/c_api/c_api.cc | 5 ++ dali/c_api/operator_trace_test.cc | 10 ++-- dali/pipeline/pipeline.cc | 5 +- dali/pipeline/pipeline.h | 2 +- dali/python/backend_impl.cc | 1 + dali/python/nvidia/dali/external_source.py | 7 ++- dali/python/nvidia/dali/pipeline.py | 58 +++++++++++++++------- include/dali/c_api.h | 9 ++++ 8 files changed, 69 insertions(+), 28 deletions(-) diff --git a/dali/c_api/c_api.cc b/dali/c_api/c_api.cc index 8ed1fe197b..96353e7329 100644 --- a/dali/c_api/c_api.cc +++ b/dali/c_api/c_api.cc @@ -284,6 +284,11 @@ int daliGetMaxBatchSize(daliPipelineHandle_t pipe_handle) { return (*pipe_handle)->pipeline->max_batch_size(); } +int daliInputFeedCount(daliPipelineHandle_t pipe_handle, const char *input_name) { + auto &pipeline = (*pipe_handle)->pipeline; + return pipeline->InputFeedCount(input_name); +} + void daliPrefetch(daliPipelineHandle_t pipe_handle) { auto &pipeline = (*pipe_handle)->pipeline; pipeline->Prefetch(); diff --git a/dali/c_api/operator_trace_test.cc b/dali/c_api/operator_trace_test.cc index efec1bb59e..caea65e037 100644 --- a/dali/c_api/operator_trace_test.cc +++ b/dali/c_api/operator_trace_test.cc @@ -133,8 +133,8 @@ TEST_P(OperatorTraceTest, OperatorTraceTest) { num_threads_, device_id_, exec_pipelined_, exec_async_, exec_separated_, cpu_queue_depth_, cpu_queue_depth_, 
gpu_queue_depth_, 0); for (int iteration = 0; iteration < n_iterations_; iteration++) { + daliPrefetch(&h); auto prefetch_depth = std::min(cpu_queue_depth_, gpu_queue_depth_); - daliPrefetchUniform(&h, prefetch_depth); for (int i = 0; i < prefetch_depth; i++) { daliShareOutput(&h); @@ -234,7 +234,8 @@ TEST_P(OperatorTraceTestExternalInput, OperatorTraceTestExternalInput) { auto prefetch_depth = std::min(cpu_queue_depth_, gpu_queue_depth_); // Feed CPU input data. - for (int i = 0; i < prefetch_depth; i++) { + int feed_count_cpu = daliInputFeedCount(&h, "OP_TACE_IN_CPU"); + for (int i = 0; i < feed_count_cpu; i++) { size_t sample_size = 42; auto in_data = random_vector_cpu(rng, sample_size * batch_size_); std::vector shapes(batch_size_, sample_size); @@ -244,7 +245,8 @@ TEST_P(OperatorTraceTestExternalInput, OperatorTraceTestExternalInput) { } // Feed GPU input data. - for (int i = 0; i < prefetch_depth; i++) { + int feed_count_gpu = daliInputFeedCount(&h, "OP_TACE_IN_GPU"); + for (int i = 0; i < feed_count_gpu; i++) { int sample_size = 42; auto in_data = random_vector_gpu(rng, sample_size * batch_size_); std::vector shapes(batch_size_, sample_size); @@ -253,7 +255,7 @@ TEST_P(OperatorTraceTestExternalInput, OperatorTraceTestExternalInput) { shapes.data(), 1, nullptr, DALI_ext_default); } - daliPrefetchUniform(&h, prefetch_depth); + daliPrefetch(&h); for (int i = 0; i < prefetch_depth; i++) { daliShareOutput(&h); diff --git a/dali/pipeline/pipeline.cc b/dali/pipeline/pipeline.cc index 3e4a92601d..680ecb1111 100644 --- a/dali/pipeline/pipeline.cc +++ b/dali/pipeline/pipeline.cc @@ -100,7 +100,7 @@ Pipeline::Pipeline(int max_batch_size, int num_threads, int device_id, int64_t s bool set_affinity, int max_num_stream, int default_cuda_stream_priority) : built_(false), separated_execution_{false} { InitializeMemoryResources(); - Init(max_batch_size, num_threads, device_id, seed, pipelined_execution, + Init(max_batch_size, num_threads, device_id, seed, 
pipelined_execution, separated_execution_, async_execution, bytes_per_sample_hint, set_affinity, max_num_stream, default_cuda_stream_priority, QueueSizes{prefetch_queue_depth}); } @@ -136,6 +136,7 @@ Pipeline::Pipeline(const string &serialized_pipe, int batch_size, int num_thread Init(this->max_batch_size_, this->num_threads_, this->device_id_, seed, pipelined_execution, + separated_execution_, async_execution, bytes_per_sample_hint, set_affinity, @@ -171,7 +172,7 @@ Pipeline::~Pipeline() { } void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t seed, - bool pipelined_execution, bool async_execution, + bool pipelined_execution, bool separated_execution, bool async_execution, size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream, int default_cuda_stream_priority, QueueSizes prefetch_queue_depth) { // guard cudaDeviceGetStreamPriorityRange call diff --git a/dali/pipeline/pipeline.h b/dali/pipeline/pipeline.h index 07b0cfb591..6a12a9e875 100644 --- a/dali/pipeline/pipeline.h +++ b/dali/pipeline/pipeline.h @@ -573,7 +573,7 @@ class DLL_PUBLIC Pipeline { * @brief Initializes the Pipeline internal state */ void Init(int batch_size, int num_threads, int device_id, int64_t seed, bool pipelined_execution, - bool async_execution, size_t bytes_per_sample_hint, + bool separated_execution, bool async_execution, size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream, int default_cuda_stream_priority, QueueSizes prefetch_queue_depth = QueueSizes{2}); diff --git a/dali/python/backend_impl.cc b/dali/python/backend_impl.cc index e21da68b05..cf7108c64b 100644 --- a/dali/python/backend_impl.cc +++ b/dali/python/backend_impl.cc @@ -1985,6 +1985,7 @@ PYBIND11_MODULE(backend_impl, m) { } return ret; }) + .def("InputFeedCount", &Pipeline::InputFeedCount, "input_name"_a) .def("SetExternalTLInput", [](Pipeline *p, const string &name, const TensorList &tl, py::object /*cuda_stream*/, bool /*use_copy_kernel*/) { diff --git 
a/dali/python/nvidia/dali/external_source.py b/dali/python/nvidia/dali/external_source.py index ba443a392d..49486cc150 100644 --- a/dali/python/nvidia/dali/external_source.py +++ b/dali/python/nvidia/dali/external_source.py @@ -178,6 +178,9 @@ def append(self, instance): self.instances.append(instance) self.utilized_instances = self.instances + def feed_count(self, pipeline): + return pipeline.input_feed_count(self.utilized_instances[0]._name) + def disable_pruned_instances(self, pruned_mask): if len(pruned_mask) != len(self.instances): raise RuntimeError( @@ -392,7 +395,7 @@ class ExternalSource: `name` : str, optional The name of the data node. - Used when feeding the data in ``iter_setup`` and can be omitted if + Used when feeding the data with a call to ``feed_input`` and can be omitted if the data is provided by ``source``. `layout` : :ref:`layout str` or list/tuple thereof, optional @@ -974,7 +977,7 @@ def external_source( Creates a data node which is populated with data from a Python source. The data can be provided by the ``source`` function or iterable, or it can be provided by - ``pipeline.feed_input(name, data, layout, cuda_stream)`` inside ``pipeline.iter_setup``. + ``pipeline.feed_input(name, data, layout, cuda_stream)``. 
In the case of the GPU input, it is the user responsibility to modify the provided GPU memory content only using provided stream (DALI schedules a copy on it diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 51c25a2104..93af2552ca 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -246,7 +246,6 @@ def __init__( self._deserialized = False # Marked True when deserializing self._first_iter = True self._last_iter = False - self._iter = 0 self._epoch_idx = 0 self._consumer_iter = 0 self._consumer_epoch_idx = 0 @@ -282,6 +281,10 @@ def __init__( self._is_restored_from_checkpoint = False if type(prefetch_queue_depth) is dict: self._exec_separated = True + if not exec_async: + raise ValueError( + "`exec_async` must not evaluate to `False` when using separated queues." + ) self._cpu_queue_size = prefetch_queue_depth["cpu_size"] self._gpu_queue_size = prefetch_queue_depth["gpu_size"] elif type(prefetch_queue_depth) is int: @@ -903,7 +906,7 @@ def _restore_state_from_checkpoint(self): if self._checkpoint is not None: external_ctx_cpt = self._pipe.RestoreFromSerializedCheckpoint(self._checkpoint) self._consumer_epoch_idx = self._epoch_idx = external_ctx_cpt.epoch_idx - self._consumer_iter = self._iter = external_ctx_cpt.iter + self._consumer_iter = external_ctx_cpt.iter if self._input_callbacks: for group in self._input_callbacks: group.current_iter = external_ctx_cpt.iter @@ -933,7 +936,10 @@ def build(self): self._restore_state_from_checkpoint() self._built = True - def _feed_input(self, name, data, layout=None, cuda_stream=None, use_copy_kernel=False): + def input_feed_count(self, input_name): + return self._pipe.InputFeedCount(input_name) + + def _feed_input(self, name, data, layout=None, cuda_stream=None, use_copy_kernel=False, is_prefetch=False): from nvidia.dali.external_source import _prep_data_for_feed_input if cuda_stream is None: @@ -1248,13 +1254,18 @@ def _prefetch(self): if not 
self._pipe: raise RuntimeError("The pipeline was destroyed.") self._schedule_py_workers() + self._run_input_callbacks(True) + + prefetch_count = self._cpu_queue_size if self._exec_separated: - for _ in range(self._prefetch_queue_depth.cpu_size + self._prefetch_queue_depth.gpu_size): - self._run_once() - else: - for _ in range(self._prefetch_queue_depth): - self._run_once() + prefetch_count = self._cpu_queue_size + self._gpu_queue_size + + for i in range(prefetch_count): + self.iter_setup() + + self._batches_to_consume += self._gpu_queue_size self._first_iter = False + self._pipe.Prefetch() def _run_once(self): """Start running the whole pipeline once without waiting for its results. @@ -1268,7 +1279,7 @@ def _run_once(self): # Special case to prevent a deadlock if user didn't release the only buffer if not self._exec_async and self._prefetch_queue_depth == 1: self.release_outputs() - self._run() + self._pipe._run() except StopIteration: self._last_iter = True @@ -1286,7 +1297,6 @@ def reset(self): if self._last_iter: self._first_iter = True self._last_iter = False - self._iter = 0 self._epoch_idx += 1 if self._input_callbacks: for group in self._input_callbacks: @@ -1510,7 +1520,7 @@ def define_graph(self): It returns a list of outputs created by calling DALI Operators.""" raise NotImplementedError - def _run_input_callbacks(self): + def _run_input_callbacks(self, is_prefetch=False): if self._input_callbacks is None: return @@ -1518,16 +1528,20 @@ def _run_input_callbacks(self): stop_iter = False for i, group in enumerate(self._parallel_input_callbacks): try: - batches.append( - group.schedule_and_receive( - self, self._py_pool, i, self._max_batch_size, self._epoch_idx + count = group.feed_count(self) if is_prefetch else 1 + for i in range(count): + batches.append( + group.schedule_and_receive( + self, self._py_pool, i, self._max_batch_size, self._epoch_idx + ) ) - ) except StopIteration: stop_iter = True for group in self._seq_input_callbacks: try: - 
batches.append(group.get_batch(self, self._max_batch_size, self._epoch_idx)) + count = group.feed_count(self) if is_prefetch else 1 + for i in range(count): + batches.append(group.get_batch(self, self._max_batch_size, self._epoch_idx)) except StopIteration: stop_iter = True if stop_iter: @@ -1540,13 +1554,19 @@ def _run_input_callbacks(self): def _iter_setup(self): self._run_input_callbacks() self.iter_setup() - self._iter += 1 def iter_setup(self): - """This function can be overriden by user-defined + """A deprecated method of providing the pipeline with external inputs. + + This function can be overriden by user-defined pipeline to perform any needed setup for each iteration. For example, one can use this function to feed the input - data from NumPy arrays.""" + data from NumPy arrays. + + This method is deprecated and its use is discourage. Newer execution models may be + incompatible with this method of providing data to the pipeline. Use `source` argument + in ``external_source`` instead, where possible. + """ pass def _generate_build_args(self): diff --git a/include/dali/c_api.h b/include/dali/c_api.h index 3cd42d2318..30e0a41f15 100644 --- a/include/dali/c_api.h +++ b/include/dali/c_api.h @@ -229,6 +229,15 @@ DLL_PUBLIC void daliSetExternalInputDataId(daliPipelineHandle *pipe_handle, const char *operator_name, const char *data_id); +/** + * @brief Returns how many times daliSetExternalInput on a given input before calling daliPrefetch + * + * @param pipe_handle The handle to the pipeline + * @param input_name The name of the input in question + * @return The number of calls to be made + */ +DLL_PUBLIC int +daliInputFeedCount(daliPipelineHandle *pipe_handle, const char *input_name); /** @} */ From c88dc9a231c289ebab4ed8c946d31e98246669b4 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Wed, 3 Jan 2024 19:54:17 +0100 Subject: [PATCH 04/15] Fingers crossed. 
Signed-off-by: Michal Zientkiewicz --- dali/pipeline/executor/executor_test.cc | 3 +- dali/pipeline/pipeline.h | 4 +- dali/python/backend_impl.cc | 1 + dali/python/nvidia/dali/pipeline.py | 137 ++++++++++++++++-------- dali_tf_plugin/dali_dataset_op.cc | 10 +- dali_tf_plugin/daliop.cc | 10 +- 6 files changed, 102 insertions(+), 63 deletions(-) diff --git a/dali/pipeline/executor/executor_test.cc b/dali/pipeline/executor/executor_test.cc index a74086c780..7621de8371 100644 --- a/dali/pipeline/executor/executor_test.cc +++ b/dali/pipeline/executor/executor_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ #include "dali/pipeline/executor/pipelined_executor.h" #include "dali/pipeline/executor/async_pipelined_executor.h" #include "dali/pipeline/executor/async_separated_pipelined_executor.h" +#include "dali/pipeline/operator/builtin/external_source.h" #include "dali/test/dali_test_utils.h" #include "dali/test/tensor_test_utils.h" diff --git a/dali/pipeline/pipeline.h b/dali/pipeline/pipeline.h index 6a12a9e875..f515f67e87 100644 --- a/dali/pipeline/pipeline.h +++ b/dali/pipeline/pipeline.h @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -34,7 +34,7 @@ #include "dali/pipeline/executor/executor.h" #include "dali/pipeline/graph/op_graph.h" #include "dali/pipeline/pipeline_output_desc.h" -#include "dali/pipeline/operator/builtin/external_source.h" +#include "dali/pipeline/operator/builtin/input_operator.h" #include "dali/pipeline/operator/checkpointing/checkpoint.h" diff --git a/dali/python/backend_impl.cc b/dali/python/backend_impl.cc index cf7108c64b..fc15787c88 100644 --- a/dali/python/backend_impl.cc +++ b/dali/python/backend_impl.cc @@ -1928,6 +1928,7 @@ PYBIND11_MODULE(backend_impl, m) { p->SetOutputDescs(out_desc); }) .def("Run", &Pipeline::Run, py::call_guard()) + .def("Prefetch", &Pipeline::Prefetch, py::call_guard()) .def("Outputs", [](Pipeline *p) { Workspace ws; diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 93af2552ca..557e90a666 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -939,9 +939,13 @@ def build(self): def input_feed_count(self, input_name): return self._pipe.InputFeedCount(input_name) - def _feed_input(self, name, data, layout=None, cuda_stream=None, use_copy_kernel=False, is_prefetch=False): + def _feed_input( + self, name, data, layout=None, cuda_stream=None, use_copy_kernel=False, is_prefetch=False + ): from nvidia.dali.external_source import _prep_data_for_feed_input + trace(name, data) + if cuda_stream is None: cuda_stream = types._get_default_stream_for_array(data) if cuda_stream == -1: @@ -1048,23 +1052,6 @@ def feed_input(self, data_node, data, layout=None, cuda_stream=None, use_copy_ke self._feed_input(name, data, layout, cuda_stream, use_copy_kernel) - def _run_cpu(self): - """Run CPU portion of the pipeline.""" - if not self._built: - raise RuntimeError("Pipeline must be built first.") - if not self._last_iter: - self._pipe.RunCPU() - self._cpu_batches_to_consume += 1 - - def _run_gpu(self): - """Run GPU portion of the pipeline.""" - if not self._built: - raise 
RuntimeError("Pipeline must be built first.") - if self._cpu_batches_to_consume > 0: - self._pipe.RunGPU() - self._cpu_batches_to_consume -= 1 - self._gpu_batches_to_consume += 1 - def outputs(self): """Returns the outputs of the pipeline and releases previous buffer. @@ -1153,7 +1140,8 @@ def release_outputs(self): with self._check_api_type_scope(types.PipelineAPIType.SCHEDULED): if not self._built: raise RuntimeError("Pipeline must be built first.") - return self._pipe.ReleaseOutputs() + ret = self._pipe.ReleaseOutputs() + return ret # for the backward compatibility def _release_outputs(self): @@ -1253,20 +1241,29 @@ def _prefetch(self): raise RuntimeError("Pipeline must be built first.") if not self._pipe: raise RuntimeError("The pipeline was destroyed.") + trace(self._first_iter, self._last_iter) self._schedule_py_workers() + + try: + self._prefetch_inputs() + self._first_iter = False + self._pipe.Prefetch() + except StopIteration: + self._last_iter = True + + def _prefetch_inputs(self): self._run_input_callbacks(True) - prefetch_count = self._cpu_queue_size if self._exec_separated: prefetch_count = self._cpu_queue_size + self._gpu_queue_size + self._batches_to_consume += self._gpu_queue_size + else: + prefetch_count = self._cpu_queue_size + self._batches_to_consume += prefetch_count for i in range(prefetch_count): self.iter_setup() - self._batches_to_consume += self._gpu_queue_size - self._first_iter = False - self._pipe.Prefetch() - def _run_once(self): """Start running the whole pipeline once without waiting for its results. @@ -1279,7 +1276,8 @@ def _run_once(self): # Special case to prevent a deadlock if user didn't release the only buffer if not self._exec_async and self._prefetch_queue_depth == 1: self.release_outputs() - self._pipe._run() + if not self._last_iter: + self._pipe.Run() except StopIteration: self._last_iter = True @@ -1294,6 +1292,7 @@ def reset(self): If pipeline iterator reached the end then reset its state to the beginning. 
""" + trace(self._last_iter) if self._last_iter: self._first_iter = True self._last_iter = False @@ -1309,6 +1308,7 @@ def reset(self): def empty(self): """If there is any work scheduled in the pipeline but not yet consumed""" + trace(self._batches_to_consume == 0) return self._batches_to_consume == 0 def serialize(self, define_graph=None, filename=None): @@ -1524,32 +1524,42 @@ def _run_input_callbacks(self, is_prefetch=False): if self._input_callbacks is None: return - batches = [] # data from external source callbacks is gathered here + max_count = 1 + done = False stop_iter = False - for i, group in enumerate(self._parallel_input_callbacks): - try: - count = group.feed_count(self) if is_prefetch else 1 - for i in range(count): - batches.append( + iter = 0 + while not done and not stop_iter: + done = True + batches = [] # data from external source callbacks is gathered here + for i, group in enumerate(self._parallel_input_callbacks): + try: + count = group.feed_count(self) if is_prefetch else 1 + if iter < count: group.schedule_and_receive( self, self._py_pool, i, self._max_batch_size, self._epoch_idx ) - ) - except StopIteration: - stop_iter = True - for group in self._seq_input_callbacks: - try: - count = group.feed_count(self) if is_prefetch else 1 - for i in range(count): - batches.append(group.get_batch(self, self._max_batch_size, self._epoch_idx)) - except StopIteration: - stop_iter = True - if stop_iter: - raise StopIteration() - - # we only fill external source queues when we know that all callbacks succeeded - for batch in batches: - batch.feed() + if iter + 1 < count: + done = False + except StopIteration: + stop_iter = True + for group in self._seq_input_callbacks: + try: + count = group.feed_count(self) if is_prefetch else 1 + if iter < count: + batches.append(group.get_batch(self, self._max_batch_size, self._epoch_idx)) + if iter + 1 < count: + done = False + except StopIteration: + stop_iter = True + + if stop_iter: + raise StopIteration() + + # we 
only fill external source queues when we know that all callbacks succeeded + for batch in batches: + batch.feed() + + iter += 1 def _iter_setup(self): self._run_input_callbacks() @@ -2088,3 +2098,36 @@ def _insert_experimental_pipeline_def(): _insert_experimental_pipeline_def() + + +_indent = 0 + + +def trace(*args, **kwargs): + pass + + +# def trace(*args, **kwargs): +# print(' ' * _indent, *args, **kwargs) + + +# def trace_pipeline_funcs(): +# for name, f in inspect.getmembers(Pipeline, predicate=inspect.isfunction): +# if name[0:2] == '__': +# continue +# #@functools.wraps(f) +# def decorate(name, f): +# def tmp(*args, **kwargs): +# global _indent +# try: +# trace(name, "--->") +# _indent += 1 +# return f(*args, **kwargs) +# finally: +# _indent -= 1 +# trace("<---", name) +# return tmp +# setattr(Pipeline, name, decorate(name, f)) + + +# trace_pipeline_funcs() diff --git a/dali_tf_plugin/dali_dataset_op.cc b/dali_tf_plugin/dali_dataset_op.cc index 10c44f56fb..e6b54cf12d 100644 --- a/dali_tf_plugin/dali_dataset_op.cc +++ b/dali_tf_plugin/dali_dataset_op.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -419,6 +419,7 @@ class DALIDatasetOp::Dataset::Iterator : public DatasetIterator { * When there are input datasets, feed the pipeline required number of input batches. 
* * TODO(klecki): Inputs handled only for an uniform executor + * TODO(michalz): Clean up the control flow (reverse if nesting) */ Status PrefetchPipeline(IteratorContext *context, daliPipelineHandle *pipeline_handle) { if (!dataset()->pipeline_def_.exec_separated) { @@ -441,14 +442,13 @@ class DALIDatasetOp::Dataset::Iterator : public DatasetIterator { } else { actual_prefetch_depth = prefetch_depth; } - TF_DALI_CALL(daliPrefetchUniform(pipeline_handle, actual_prefetch_depth)); + for (int i = 0; i < actual_prefetch_depth; i++) + TF_DALI_CALL(daliRun(pipeline_handle)); } else { if (dataset()->HasInputs()) { return errors::InvalidArgument("Input datasets are not compatible with split executor."); } - TF_DALI_CALL(daliPrefetchSeparate(pipeline_handle, - dataset()->pipeline_def_.cpu_prefetch_queue_depth, - dataset()->pipeline_def_.gpu_prefetch_queue_depth)); + TF_DALI_CALL(daliPrefetch(pipeline_handle)); } return Status(); } diff --git a/dali_tf_plugin/daliop.cc b/dali_tf_plugin/daliop.cc index ee59867b65..db0abd9bc5 100644 --- a/dali_tf_plugin/daliop.cc +++ b/dali_tf_plugin/daliop.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -155,13 +155,7 @@ class DaliOp : public tf::OpKernel { #endif LOG_LINE << "Pipeline created\n"; LOG_LINE << "Prefetching...\n"; - if (!exec_separated) { - TF_DALI_CALL(daliPrefetchUniform(&pipe_handle_, prefetch_queue_depth_)); - } else { - TF_DALI_CALL(daliPrefetchSeparate(&pipe_handle_, - cpu_prefetch_queue_depth, - prefetch_queue_depth_)); - } + TF_DALI_CALL(daliPrefetch(&pipe_handle_)); LOG_LINE << "After first run\n"; } From 771f55b0259fbc29c88dff08d2d30482f769459a Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Wed, 3 Jan 2024 19:57:36 +0100 Subject: [PATCH 05/15] Remove debugging code. Signed-off-by: Michal Zientkiewicz --- dali/python/nvidia/dali/pipeline.py | 38 ----------------------------- 1 file changed, 38 deletions(-) diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 557e90a666..2017c40937 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -944,8 +944,6 @@ def _feed_input( ): from nvidia.dali.external_source import _prep_data_for_feed_input - trace(name, data) - if cuda_stream is None: cuda_stream = types._get_default_stream_for_array(data) if cuda_stream == -1: @@ -1241,7 +1239,6 @@ def _prefetch(self): raise RuntimeError("Pipeline must be built first.") if not self._pipe: raise RuntimeError("The pipeline was destroyed.") - trace(self._first_iter, self._last_iter) self._schedule_py_workers() try: @@ -1292,7 +1289,6 @@ def reset(self): If pipeline iterator reached the end then reset its state to the beginning. 
""" - trace(self._last_iter) if self._last_iter: self._first_iter = True self._last_iter = False @@ -1308,7 +1304,6 @@ def reset(self): def empty(self): """If there is any work scheduled in the pipeline but not yet consumed""" - trace(self._batches_to_consume == 0) return self._batches_to_consume == 0 def serialize(self, define_graph=None, filename=None): @@ -2098,36 +2093,3 @@ def _insert_experimental_pipeline_def(): _insert_experimental_pipeline_def() - - -_indent = 0 - - -def trace(*args, **kwargs): - pass - - -# def trace(*args, **kwargs): -# print(' ' * _indent, *args, **kwargs) - - -# def trace_pipeline_funcs(): -# for name, f in inspect.getmembers(Pipeline, predicate=inspect.isfunction): -# if name[0:2] == '__': -# continue -# #@functools.wraps(f) -# def decorate(name, f): -# def tmp(*args, **kwargs): -# global _indent -# try: -# trace(name, "--->") -# _indent += 1 -# return f(*args, **kwargs) -# finally: -# _indent -= 1 -# trace("<---", name) -# return tmp -# setattr(Pipeline, name, decorate(name, f)) - - -# trace_pipeline_funcs() From fca78377ba07e340e632a4ce851f439c845fb638 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 4 Jan 2024 10:41:18 +0100 Subject: [PATCH 06/15] Remove unused variable; improve documentation. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/pipeline.h | 2 +- dali/python/nvidia/dali/pipeline.py | 1 - include/dali/c_api.h | 6 +++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dali/pipeline/pipeline.h b/dali/pipeline/pipeline.h index f515f67e87..5b844d5914 100644 --- a/dali/pipeline/pipeline.h +++ b/dali/pipeline/pipeline.h @@ -776,7 +776,7 @@ class DLL_PUBLIC Pipeline { * * @param owner The pipeline * @param fill_queue If true, the inputs are fed `InputFeedCount(name)` times; - * otherwise it's fed once. + * otherwise they're fed once. 
*/ void Refeed(Pipeline &owner, bool fill_queue = false); diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 2017c40937..ed5c063f3b 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -1519,7 +1519,6 @@ def _run_input_callbacks(self, is_prefetch=False): if self._input_callbacks is None: return - max_count = 1 done = False stop_iter = False iter = 0 diff --git a/include/dali/c_api.h b/include/dali/c_api.h index 30e0a41f15..202b0eb9f0 100644 --- a/include/dali/c_api.h +++ b/include/dali/c_api.h @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -409,6 +409,10 @@ DLL_PUBLIC void daliRun(daliPipelineHandle *pipe_handle); /** * @brief Schedule initial runs to fill the buffers. + * + * This function should be called once, after a pipeline is created and external inputs + * (if any) are populated the required number of times. + * For subsequent runs, daliRun should be used. */ DLL_PUBLIC void daliPrefetch(daliPipelineHandle *pipe_handle); From 131b9780d5d1e6cc89cbf1c0daecb5f7dd57eab7 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 4 Jan 2024 16:58:41 +0100 Subject: [PATCH 07/15] Numerous fixes around prefetching in Python. 
Signed-off-by: Michal Zientkiewicz --- dali/python/nvidia/dali/pipeline.py | 82 ++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 19 deletions(-) diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index ed5c063f3b..a7c4fad411 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -1241,25 +1241,60 @@ def _prefetch(self): raise RuntimeError("The pipeline was destroyed.") self._schedule_py_workers() - try: - self._prefetch_inputs() - self._first_iter = False + # We probably need some benchmarking before we remove this code path + if not self._exec_separated: + self._legacy_interleaved_prefetch() + return + + # The new way: try to run the inputs and then feed them, finally call _pipe.Prefetch() + # If this fails, we just run `_pipe.Run()` a bunch of times. This will likely blow up for + # separated queues, which are not properly supported anyway. + iters_fed = 0 + self._first_iter = False + iters_fed, success = self._prefetch_inputs() + if success: self._pipe.Prefetch() - except StopIteration: + else: self._last_iter = True + for _ in range(iters_fed): + self._pipe.Run() + + # This is the old way of prefetching - the feeding and running steps are interleaved. + # Running all callbacks at once, then feeding, then running - may affect the performance + # of the 1st iteration. 
+ def _legacy_interleaved_prefetch(self): + for _ in range(self._cpu_queue_size): + try: + self._first_iter = False + self._iter_setup() + self._batches_to_consume += 1 + if not self._exec_async and self._prefetch_queue_depth == 1: + self.release_outputs() + self._pipe.Run() + except StopIteration: + self._last_iter = True + break def _prefetch_inputs(self): - self._run_input_callbacks(True) + prefetched, success = self._run_input_callbacks(True) + self._batches_to_consume += prefetched - if self._exec_separated: - prefetch_count = self._cpu_queue_size + self._gpu_queue_size - self._batches_to_consume += self._gpu_queue_size - else: - prefetch_count = self._cpu_queue_size - self._batches_to_consume += prefetch_count + if success: + if self._exec_separated: + prefetch_count = self._cpu_queue_size + self._gpu_queue_size + else: + prefetch_count = self._cpu_queue_size - for i in range(prefetch_count): - self.iter_setup() + for i in range(prefetched, prefetch_count): + try: + self.iter_setup() + prefetched = i + 1 + self._batches_to_consume += 1 + except StopIteration: + success = False + break + + return prefetched, success def _run_once(self): """Start running the whole pipeline once without waiting for its results. 
@@ -1515,9 +1550,16 @@ def define_graph(self): It returns a list of outputs created by calling DALI Operators.""" raise NotImplementedError + def _iter_setup(self): + iters, success = self._run_input_callbacks() + if not success: + raise StopIteration + if iters == 0: + self.iter_setup() + def _run_input_callbacks(self, is_prefetch=False): if self._input_callbacks is None: - return + return 0, True done = False stop_iter = False @@ -1547,17 +1589,19 @@ def _run_input_callbacks(self, is_prefetch=False): stop_iter = True if stop_iter: - raise StopIteration() + return iter, False + + try: + self.iter_setup() + except StopIteration: + return iter, False # we only fill external source queues when we know that all callbacks succeeded for batch in batches: batch.feed() iter += 1 - - def _iter_setup(self): - self._run_input_callbacks() - self.iter_setup() + return iter, True def iter_setup(self): """A deprecated method of providing the pipeline with external inputs. From 28dad9347e4f4a22253b3bf9b521ae3482eb5c8b Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 4 Jan 2024 17:36:39 +0100 Subject: [PATCH 08/15] Typos. Signed-off-by: Michal Zientkiewicz --- dali/python/nvidia/dali/pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index a7c4fad411..2ec79eec87 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -1606,12 +1606,12 @@ def _run_input_callbacks(self, is_prefetch=False): def iter_setup(self): """A deprecated method of providing the pipeline with external inputs. - This function can be overriden by user-defined + This function can be overridden by a user-defined pipeline to perform any needed setup for each iteration. For example, one can use this function to feed the input data from NumPy arrays. - This method is deprecated and its use is discourage. 
Newer execution models may be + This method is deprecated and its use is discouraged. Newer execution models may be incompatible with this method of providing data to the pipeline. Use `source` argument in ``external_source`` instead, where possible. """ From e57e8baad87cc4ba2601c90368fbd7a1bb79a52e Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 4 Jan 2024 18:15:38 +0100 Subject: [PATCH 09/15] Fix parallel external source. Signed-off-by: Michal Zientkiewicz --- dali/python/nvidia/dali/pipeline.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 2ec79eec87..94471f95b9 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -1571,8 +1571,10 @@ def _run_input_callbacks(self, is_prefetch=False): try: count = group.feed_count(self) if is_prefetch else 1 if iter < count: - group.schedule_and_receive( - self, self._py_pool, i, self._max_batch_size, self._epoch_idx + batches.append( + group.schedule_and_receive( + self, self._py_pool, i, self._max_batch_size, self._epoch_idx + ) ) if iter + 1 < count: done = False From f841643ff99f1404c05648a02e97d9efbcbbadba Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 5 Jan 2024 15:50:05 +0100 Subject: [PATCH 10/15] Use one fill depth due to batch size lookahead. 
Signed-off-by: Michal Zientkiewicz --- dali/c_api/operator_trace_test.cc | 12 +++++++----- .../async_separated_pipelined_executor.cc | 15 +++------------ dali/pipeline/executor/executor.cc | 15 +++------------ dali/pipeline/executor/pipelined_executor.cc | 7 ++++++- dali/pipeline/executor/pipelined_executor.h | 12 +++++++++--- 5 files changed, 28 insertions(+), 33 deletions(-) diff --git a/dali/c_api/operator_trace_test.cc b/dali/c_api/operator_trace_test.cc index caea65e037..5dcb2f6ce7 100644 --- a/dali/c_api/operator_trace_test.cc +++ b/dali/c_api/operator_trace_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -45,10 +45,10 @@ OperatorTraceTestParam operator_trace_test_params_pipelined_executor_uniform_que }; OperatorTraceTestParam operator_trace_test_params_pipelined_executor_separate_queue[] = { - {2, 3, true, false}, - {3, 2, true, false}, {2, 3, true, true}, {3, 2, true, true}, + {2, 3, true, false}, + {2, 2, true, false}, }; std::array operator_under_test_names = { @@ -234,7 +234,8 @@ TEST_P(OperatorTraceTestExternalInput, OperatorTraceTestExternalInput) { auto prefetch_depth = std::min(cpu_queue_depth_, gpu_queue_depth_); // Feed CPU input data. - int feed_count_cpu = daliInputFeedCount(&h, "OP_TACE_IN_CPU"); + int feed_count_cpu = daliInputFeedCount(&h, "OP_TRACE_IN_CPU"); + ASSERT_GE(feed_count_cpu, 1); for (int i = 0; i < feed_count_cpu; i++) { size_t sample_size = 42; auto in_data = random_vector_cpu(rng, sample_size * batch_size_); @@ -245,7 +246,8 @@ TEST_P(OperatorTraceTestExternalInput, OperatorTraceTestExternalInput) { } // Feed GPU input data. 
- int feed_count_gpu = daliInputFeedCount(&h, "OP_TACE_IN_GPU"); + int feed_count_gpu = daliInputFeedCount(&h, "OP_TRACE_IN_GPU"); + ASSERT_GE(feed_count_gpu, 1); for (int i = 0; i < feed_count_gpu; i++) { int sample_size = 42; auto in_data = random_vector_gpu(rng, sample_size * batch_size_); diff --git a/dali/pipeline/executor/async_separated_pipelined_executor.cc b/dali/pipeline/executor/async_separated_pipelined_executor.cc index 5b4af6e5c6..27e48d1ef1 100644 --- a/dali/pipeline/executor/async_separated_pipelined_executor.cc +++ b/dali/pipeline/executor/async_separated_pipelined_executor.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2019-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -45,17 +45,8 @@ void AsyncSeparatedPipelinedExecutor::Prefetch() { } int AsyncSeparatedPipelinedExecutor::InputFeedCount(const std::string &op_name) { - OpNode &node = graph_->Node(op_name); - switch (node.op_type) { - case OpType::CPU: - return queue_sizes_.cpu_size + queue_sizes_.gpu_size; - case OpType::MIXED: - case OpType::GPU: - return queue_sizes_.gpu_size; - default: - assert(!"Unreachable code detected"); - return 0; - } + (void)graph_->Node(op_name); + return queue_sizes_.cpu_size + queue_sizes_.gpu_size; } } // namespace dali diff --git a/dali/pipeline/executor/executor.cc b/dali/pipeline/executor/executor.cc index 55f3305d4a..e2881dcf9d 100644 --- a/dali/pipeline/executor/executor.cc +++ b/dali/pipeline/executor/executor.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -350,17 +350,8 @@ void Executor::Run() { template int Executor::InputFeedCount(const std::string &op_name) { - OpNode &node = graph_->Node(op_name); - switch (node.op_type) { - case OpType::CPU: - return queue_sizes_.cpu_size; - case OpType::MIXED: - case OpType::GPU: - return queue_sizes_.gpu_size; - default: - assert(!"Unreachable code detected"); - return 0; - } + (void)graph_->Node(op_name); + return queue_sizes_.cpu_size; } template diff --git a/dali/pipeline/executor/pipelined_executor.cc b/dali/pipeline/executor/pipelined_executor.cc index 3b3c7530a7..9735aa2c98 100644 --- a/dali/pipeline/executor/pipelined_executor.cc +++ b/dali/pipeline/executor/pipelined_executor.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2018, NVIDIA CORPORATION. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -31,6 +31,11 @@ size_t PipelinedExecutorImpl::CalcIterationDataSiz this->queue_sizes_.gpu_size /* mixed_queue_size */ + 1; } +int SeparatedPipelinedExecutor::InputFeedCount(const std::string &op_name) { + (void)graph_->Node(op_name); + return queue_sizes_.cpu_size + queue_sizes_.gpu_size; +} + template class DLL_PUBLIC PipelinedExecutorImpl, UniformQueuePolicy>; diff --git a/dali/pipeline/executor/pipelined_executor.h b/dali/pipeline/executor/pipelined_executor.h index ee1ac845b7..5b8603b717 100644 --- a/dali/pipeline/executor/pipelined_executor.h +++ b/dali/pipeline/executor/pipelined_executor.h @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -114,8 +114,14 @@ std::vector PipelinedExecutorImpl::GetTensorQ using PipelinedExecutor = PipelinedExecutorImpl, UniformQueuePolicy>; -using SeparatedPipelinedExecutor = - PipelinedExecutorImpl, SeparateQueuePolicy>; + +class DLL_PUBLIC SeparatedPipelinedExecutor +: public PipelinedExecutorImpl, SeparateQueuePolicy> { + using ImplBase = PipelinedExecutorImpl, SeparateQueuePolicy>; + using ImplBase::ImplBase; + public: + int InputFeedCount(const std::string &name) override; +}; } // namespace dali From c935f9d4e9a3df742cc14907433a807d4608e037 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 5 Jan 2024 16:58:46 +0100 Subject: [PATCH 11/15] Fix GCC build. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/executor/pipelined_executor.cc | 3 --- dali/pipeline/executor/pipelined_executor.h | 3 +++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dali/pipeline/executor/pipelined_executor.cc b/dali/pipeline/executor/pipelined_executor.cc index 9735aa2c98..dd1b0233ba 100644 --- a/dali/pipeline/executor/pipelined_executor.cc +++ b/dali/pipeline/executor/pipelined_executor.cc @@ -36,10 +36,7 @@ int SeparatedPipelinedExecutor::InputFeedCount(const std::string &op_name) { return queue_sizes_.cpu_size + queue_sizes_.gpu_size; } - template class DLL_PUBLIC PipelinedExecutorImpl, UniformQueuePolicy>; -template -class DLL_PUBLIC PipelinedExecutorImpl, SeparateQueuePolicy>; } // namespace dali diff --git a/dali/pipeline/executor/pipelined_executor.h b/dali/pipeline/executor/pipelined_executor.h index 5b8603b717..ce799db7c2 100644 --- a/dali/pipeline/executor/pipelined_executor.h +++ b/dali/pipeline/executor/pipelined_executor.h @@ -115,6 +115,9 @@ std::vector PipelinedExecutorImpl::GetTensorQ using PipelinedExecutor = PipelinedExecutorImpl, UniformQueuePolicy>; +template +class DLL_PUBLIC PipelinedExecutorImpl, SeparateQueuePolicy>; + class DLL_PUBLIC SeparatedPipelinedExecutor : public PipelinedExecutorImpl, SeparateQueuePolicy> { using ImplBase = 
PipelinedExecutorImpl, SeparateQueuePolicy>; From bf6be685765ce5d1f86262908ac119023f15be38 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 5 Jan 2024 22:21:41 +0100 Subject: [PATCH 12/15] Fix operator tests. Signed-off-by: Michal Zientkiewicz --- dali/operators/input/input_operator_test.cu | 11 ++++++----- dali/operators/reader/video_reader_op_test.cc | 5 ++++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dali/operators/input/input_operator_test.cu b/dali/operators/input/input_operator_test.cu index 2e0f79a668..5854519024 100644 --- a/dali/operators/input/input_operator_test.cu +++ b/dali/operators/input/input_operator_test.cu @@ -1,4 +1,4 @@ -// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ InputOperatorMixedTestParam input_operator_test_params_pipelined_executor_unifor InputOperatorMixedTestParam input_operator_test_params_pipelined_executor_separate_queue[] = { {2, 3, true, false, true}, - {3, 2, true, false, false}, + {2, 2, true, false, false}, {2, 3, true, true, true}, {3, 2, true, true, false}, }; @@ -130,7 +130,7 @@ TEST_P(InputOperatorMixedTest, InputOperatorMixedTest) { num_threads_, device_id_, exec_pipelined_, exec_async_, exec_separated_, cpu_queue_depth_, cpu_queue_depth_, gpu_queue_depth_, 0); for (int iteration = 0; iteration < n_iterations_; iteration++) { - auto prefetch_depth = std::min(cpu_queue_depth_, gpu_queue_depth_); + int prefetch_depth = daliInputFeedCount(&h, operator_name_.c_str()); size_t sample_size = 42; thrust::host_vector in_data(sample_size * batch_size_, 2137); thrust::device_vector ref_data = in_data; @@ -143,8 +143,9 @@ TEST_P(InputOperatorMixedTest, InputOperatorMixedTest) { DALI_ext_force_copy); } - daliPrefetchUniform(&h, 
prefetch_depth); - for (int i = 0; i < prefetch_depth; i++) { + daliPrefetch(&h); + int num_output_batches = std::min(cpu_queue_depth_, gpu_queue_depth_); + for (int i = 0; i < num_output_batches; i++) { daliShareOutput(&h); auto sz = daliNumElements(&h, 0); thrust::device_vector out_data(sz); diff --git a/dali/operators/reader/video_reader_op_test.cc b/dali/operators/reader/video_reader_op_test.cc index 03444fc6c2..8ad5728ad3 100644 --- a/dali/operators/reader/video_reader_op_test.cc +++ b/dali/operators/reader/video_reader_op_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -403,6 +403,8 @@ TEST_F(VideoReaderTest, HEVC) { pipe.Build(this->Outputs()); for (int i = 0; i < iterations; ++i) { pipe.Run(); + + pipe.Outputs(&ws); } } catch (const std::exception &e) { if (IsUnsupportedCodec(e.what())) { @@ -486,6 +488,7 @@ TEST_F(VideoReaderTest, FrameLabelsFilenames) { Workspace ws; for (int i = 0; i < iterations; ++i) { pipe.Run(); + pipe.Outputs(&ws); const auto &frames_gpu = ws.Output(0); const auto &label_gpu = ws.Output(1); const auto &frame_num_gpu = ws.Output(2); From ae636d97c5be866efe40819f731c181a63d41a1a Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 8 Jan 2024 17:31:03 +0100 Subject: [PATCH 13/15] Fix accidentally removed lines. Signed-off-by: Michal Zientkiewicz --- dali/benchmark/resnet50_bench.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dali/benchmark/resnet50_bench.cc b/dali/benchmark/resnet50_bench.cc index ce94ba0968..3ff8222524 100755 --- a/dali/benchmark/resnet50_bench.cc +++ b/dali/benchmark/resnet50_bench.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -240,6 +240,7 @@ BENCHMARK_DEFINE_F(RN50, HybridPipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; pipe.Run(); + pipe.Outputs(&ws); while (st.KeepRunning()) { pipe.SetExternalInput("raw_jpegs", data); @@ -349,6 +350,7 @@ BENCHMARK_DEFINE_F(RN50, nvJPEGPipe)(benchmark::State& st) { // NOLINT // Run once to allocate the memory Workspace ws; pipe.Run(); + pipe.Outputs(&ws); while (st.KeepRunning()) { pipe.SetExternalInput("raw_jpegs", data); From 968a043b6447c92c804527ecc20a4f35246457bd Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 8 Jan 2024 17:37:07 +0100 Subject: [PATCH 14/15] Fix accidentally removed lines. Signed-off-by: Michal Zientkiewicz --- dali/operators/reader/reader_op_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dali/operators/reader/reader_op_test.cc b/dali/operators/reader/reader_op_test.cc index bea666d538..0d01548149 100644 --- a/dali/operators/reader/reader_op_test.cc +++ b/dali/operators/reader/reader_op_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -130,6 +130,7 @@ TYPED_TEST(ReaderTest, SimpleTest) { Workspace ws; for (int i=0; i < 5; ++i) { pipe.Run(); + pipe.Outputs(&ws); } return; From dbf894f3187ecee2f888a204156491554a158671 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Wed, 10 Jan 2024 15:18:14 +0100 Subject: [PATCH 15/15] Review fixes. 
Signed-off-by: Michal Zientkiewicz --- dali/c_api/c_api.cc | 4 ++-- dali/python/nvidia/dali/pipeline.py | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/dali/c_api/c_api.cc b/dali/c_api/c_api.cc index 96353e7329..7d9d82047d 100644 --- a/dali/c_api/c_api.cc +++ b/dali/c_api/c_api.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -310,7 +310,7 @@ void daliPrefetchSeparate(daliPipelineHandle_t pipe_handle, auto &pipeline = (*pipe_handle)->pipeline; auto sz = pipeline->GetQueueSizes(); if (cpu_queue_depth != sz.cpu_size || gpu_queue_depth != sz.gpu_size) { - DALI_WARN("daliPrefetchUniform is deprecated and setting queue_length different than" + DALI_WARN("daliPrefetchSeparate is deprecated and setting queue_length different than" " the one set in the pipeline has no effect. Use daliPrefetch instead."); } pipeline->Prefetch(); diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 94471f95b9..fd2333ba0d 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -939,9 +939,7 @@ def build(self): def input_feed_count(self, input_name): return self._pipe.InputFeedCount(input_name) - def _feed_input( - self, name, data, layout=None, cuda_stream=None, use_copy_kernel=False, is_prefetch=False - ): + def _feed_input(self, name, data, layout=None, cuda_stream=None, use_copy_kernel=False): from nvidia.dali.external_source import _prep_data_for_feed_input if cuda_stream is None: