diff --git a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp index f206ac6e1c71b..8d5542d415ed0 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp @@ -83,9 +83,10 @@ class BroadcastTest : public exec::test::OperatorTestBase { std::dynamic_pointer_cast(writerPlan) ->serdeRowType(); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = writerPlan; - auto [taskCursor, results] = readCursor(params, [](auto /*task*/) {}); + auto [taskCursor, results] = + exec::test::readCursor(params, [](auto /*task*/) {}); std::vector broadcastFilePaths; for (auto result : results) { @@ -96,9 +97,7 @@ class BroadcastTest : public exec::test::OperatorTestBase { return {serdeRowType, broadcastFilePaths}; } - std::pair< - std::unique_ptr, - std::vector> + std::pair, std::vector> executeBroadcastRead( RowTypePtr dataType, const std::string& basePath, @@ -107,7 +106,7 @@ class BroadcastTest : public exec::test::OperatorTestBase { auto readerPlan = exec::test::PlanBuilder() .exchange(dataType, velox::VectorSerde::Kind::kPresto) .planNode(); - exec::test::CursorParameters broadcastReadParams; + exec::CursorParameters broadcastReadParams; broadcastReadParams.planNode = readerPlan; std::vector fileInfos; @@ -118,7 +117,7 @@ class BroadcastTest : public exec::test::OperatorTestBase { uint8_t splitIndex = 0; // Read back result using BroadcastExchangeSource. - return readCursor(broadcastReadParams, [&](auto* task) { + return exec::test::readCursor(broadcastReadParams, [&](auto* task) { if (splitIndex >= broadcastFilePaths.size()) { task->noMoreSplits("0"); return; @@ -361,11 +360,11 @@ TEST_F(BroadcastTest, malformedBroadcastInfoJson) { auto readerPlan = exec::test::PlanBuilder() .exchange(dataType, velox::VectorSerde::Kind::kPresto) .planNode(); - exec::test::CursorParameters broadcastReadParams; + exec::CursorParameters broadcastReadParams; broadcastReadParams.planNode = readerPlan; VELOX_ASSERT_THROW( - readCursor( + exec::test::readCursor( broadcastReadParams, [&](auto* task) { auto fileInfos = diff --git a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp index 6771e9015c6ad..1752b5a83f7bb 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp @@ -407,10 +407,10 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { void testPartitionAndSerialize( const core::PlanNodePtr& plan, const RowVectorPtr& expected, - const exec::test::CursorParameters params, + const exec::CursorParameters params, const std::optional expectedOutputCount = std::nullopt) { auto [taskCursor, serializedResults] = - readCursor(params, [](auto /*task*/) {}); + exec::test::readCursor(params, [](auto /*task*/) {}); RowVectorPtr result = BaseVector::create(expected->type(), 0, pool()); @@ -432,18 +432,18 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { void testPartitionAndSerialize( const core::PlanNodePtr& plan, const RowVectorPtr& expected) { - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = plan; params.maxDrivers = 2; testPartitionAndSerialize(plan, expected, params); } - std::pair, std::vector> + std::pair, std::vector> runShuffleReadTask( - const exec::test::CursorParameters& params, + const exec::CursorParameters& params, const std::string& shuffleInfo) { bool noMoreSplits = false; - return readCursor(params, [&](auto* task) { + return exec::test::readCursor(params, [&](auto* task) { if (noMoreSplits) { return; } @@ -527,7 +527,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { .project(dataType->names()) .planNode(); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = plan; params.destination = partition; @@ -713,7 +713,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { auto queryCtx = core::QueryCtx::create(executor_.get(), core::QueryConfig(properties)); - auto params = exec::test::CursorParameters(); + auto params = exec::CursorParameters(); params.planNode = plan; params.queryCtx = queryCtx; @@ -743,12 +743,12 @@ TEST_F(UnsafeRowShuffleTest, operators) { 4, std::string(TestShuffleFactory::kShuffleName), info)) .planNode(); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = plan; params.maxDrivers = 2; auto [taskCursor, serializedResults] = - readCursor(params, [](auto /*task*/) {}); + exec::test::readCursor(params, [](auto /*task*/) {}); ASSERT_EQ(serializedResults.size(), 0); TestShuffleWriter::reset(); } @@ -769,7 +769,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) { VELOX_CHECK(nullFunction()); })); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = exec::test::PlanBuilder() .values({data}) @@ -779,7 +779,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) { .planNode(); VELOX_ASSERT_THROW( - readCursor(params, [](auto /*task*/) {}), + exec::test::readCursor(params, [](auto /*task*/) {}), "ShuffleWriter::collect failed"); TestShuffleWriter::reset(); @@ -795,7 +795,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) { auto info = testShuffleInfo(4, 1 << 20 /* 1MB */); TestShuffleWriter::createWriter(info, pool()); - exec::test::CursorParameters params; + exec::CursorParameters params; params.planNode = exec::test::PlanBuilder() .values({data}) @@ -804,7 +804,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) { 2, std::string(TestShuffleFactory::kShuffleName), info)) .planNode(); - ASSERT_NO_THROW(readCursor(params, [](auto /*task*/) {})); + ASSERT_NO_THROW(exec::test::readCursor(params, [](auto /*task*/) {})); std::function injectFailure = [&](TestShuffleReader* /*reader*/) { diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 6171b52be4f4a..3e79fdbb48c3d 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 6171b52be4f4a846d32ea0ab31a8c2c028b93365 +Subproject commit 3e79fdbb48c3da1c432c4eccc7f1d8072bec9c46