diff --git a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp index f206ac6e1c71..2be680e93ddd 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp @@ -83,9 +83,9 @@ class BroadcastTest : public exec::test::OperatorTestBase { std::dynamic_pointer_cast(writerPlan) ->serdeRowType(); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = writerPlan; - auto [taskCursor, results] = readCursor(params, [](auto /*task*/) {}); + auto [taskCursor, results] = exec::test::readCursor(params, [](auto /*task*/) {}); std::vector broadcastFilePaths; for (auto result : results) { @@ -97,7 +97,7 @@ class BroadcastTest : public exec::test::OperatorTestBase { } std::pair< - std::unique_ptr, + std::unique_ptr, std::vector> executeBroadcastRead( RowTypePtr dataType, @@ -107,7 +107,7 @@ class BroadcastTest : public exec::test::OperatorTestBase { auto readerPlan = exec::test::PlanBuilder() .exchange(dataType, velox::VectorSerde::Kind::kPresto) .planNode(); - exec::test::CursorParameters broadcastReadParams; + exec::CursorParameters broadcastReadParams; broadcastReadParams.planNode = readerPlan; std::vector fileInfos; @@ -118,7 +118,7 @@ class BroadcastTest : public exec::test::OperatorTestBase { uint8_t splitIndex = 0; // Read back result using BroadcastExchangeSource. - return readCursor(broadcastReadParams, [&](auto* task) { + return exec::test::readCursor(broadcastReadParams, [&](auto* task) { if (splitIndex >= broadcastFilePaths.size()) { task->noMoreSplits("0"); return; @@ -361,11 +361,11 @@ TEST_F(BroadcastTest, malformedBroadcastInfoJson) { auto readerPlan = exec::test::PlanBuilder() .exchange(dataType, velox::VectorSerde::Kind::kPresto) .planNode(); - exec::test::CursorParameters broadcastReadParams; + exec::CursorParameters broadcastReadParams; broadcastReadParams.planNode = readerPlan; VELOX_ASSERT_THROW( - readCursor( + exec::test::readCursor( broadcastReadParams, [&](auto* task) { auto fileInfos = diff --git a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp index 6771e9015c6a..107351c46cb4 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp @@ -33,6 +33,7 @@ using namespace facebook::velox; using namespace facebook::velox::common::testutil; +using namespace facebook::velox::exec; using namespace facebook::presto; using namespace facebook::presto::operators; using namespace ::testing; @@ -407,10 +408,10 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { void testPartitionAndSerialize( const core::PlanNodePtr& plan, const RowVectorPtr& expected, - const exec::test::CursorParameters params, + const exec::CursorParameters params, const std::optional expectedOutputCount = std::nullopt) { auto [taskCursor, serializedResults] = - readCursor(params, [](auto /*task*/) {}); + exec::test::readCursor(params, [](auto /*task*/) {}); RowVectorPtr result = BaseVector::create(expected->type(), 0, pool()); @@ -432,18 +433,18 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { void testPartitionAndSerialize( const core::PlanNodePtr& plan, const RowVectorPtr& expected) { - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = plan; params.maxDrivers = 2; testPartitionAndSerialize(plan, expected, params); } - std::pair, std::vector> + std::pair, std::vector> runShuffleReadTask( - const exec::test::CursorParameters& params, + const exec::CursorParameters& params, const std::string& shuffleInfo) { bool noMoreSplits = false; - return readCursor(params, [&](auto* task) { + return exec::test::readCursor(params, [&](auto* task) { if (noMoreSplits) { return; } @@ -527,7 +528,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { .project(dataType->names()) .planNode(); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = plan; params.destination = partition; @@ -713,7 +714,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { auto queryCtx = core::QueryCtx::create(executor_.get(), core::QueryConfig(properties)); - auto params = exec::test::CursorParameters(); + auto params = exec::CursorParameters(); params.planNode = plan; params.queryCtx = queryCtx; @@ -743,12 +744,12 @@ TEST_F(UnsafeRowShuffleTest, operators) { 4, std::string(TestShuffleFactory::kShuffleName), info)) .planNode(); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = plan; params.maxDrivers = 2; auto [taskCursor, serializedResults] = - readCursor(params, [](auto /*task*/) {}); + exec::test::readCursor(params, [](auto /*task*/) {}); ASSERT_EQ(serializedResults.size(), 0); TestShuffleWriter::reset(); } @@ -769,7 +770,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) { VELOX_CHECK(nullFunction()); })); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = exec::test::PlanBuilder() .values({data}) @@ -779,7 +780,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) { .planNode(); VELOX_ASSERT_THROW( - readCursor(params, [](auto /*task*/) {}), + exec::test::readCursor(params, [](auto /*task*/) {}), "ShuffleWriter::collect failed"); TestShuffleWriter::reset(); @@ -795,7 +796,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) { auto info = testShuffleInfo(4, 1 << 20 /* 1MB */); TestShuffleWriter::createWriter(info, pool()); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = exec::test::PlanBuilder() .values({data}) @@ -804,7 +805,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) { 2, std::string(TestShuffleFactory::kShuffleName), info)) .planNode(); - ASSERT_NO_THROW(readCursor(params, [](auto /*task*/) {})); + ASSERT_NO_THROW(exec::test::readCursor(params, [](auto /*task*/) {})); std::function injectFailure = [&](TestShuffleReader* /*reader*/) {