Skip to content

Commit

Permalink
Advance Velox and adjust Presto tests accordingly (#24324)
Browse files Browse the repository at this point in the history
Advance Velox and adjust Presto tests accordingly (#24324)
  • Loading branch information
zation99 authored Jan 8, 2025
1 parent b5bb716 commit 8883d76
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ class BroadcastTest : public exec::test::OperatorTestBase {
std::dynamic_pointer_cast<const BroadcastWriteNode>(writerPlan)
->serdeRowType();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = writerPlan;
auto [taskCursor, results] = readCursor(params, [](auto /*task*/) {});
auto [taskCursor, results] =
exec::test::readCursor(params, [](auto /*task*/) {});

std::vector<std::string> broadcastFilePaths;
for (auto result : results) {
Expand All @@ -96,9 +97,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {
return {serdeRowType, broadcastFilePaths};
}

std::pair<
std::unique_ptr<velox::exec::test::TaskCursor>,
std::vector<RowVectorPtr>>
std::pair<std::unique_ptr<velox::exec::TaskCursor>, std::vector<RowVectorPtr>>
executeBroadcastRead(
RowTypePtr dataType,
const std::string& basePath,
Expand All @@ -107,7 +106,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {
auto readerPlan = exec::test::PlanBuilder()
.exchange(dataType, velox::VectorSerde::Kind::kPresto)
.planNode();
exec::test::CursorParameters broadcastReadParams;
exec::CursorParameters broadcastReadParams;
broadcastReadParams.planNode = readerPlan;

std::vector<std::string> fileInfos;
Expand All @@ -118,7 +117,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {

uint8_t splitIndex = 0;
// Read back result using BroadcastExchangeSource.
return readCursor(broadcastReadParams, [&](auto* task) {
return exec::test::readCursor(broadcastReadParams, [&](auto* task) {
if (splitIndex >= broadcastFilePaths.size()) {
task->noMoreSplits("0");
return;
Expand Down Expand Up @@ -361,11 +360,11 @@ TEST_F(BroadcastTest, malformedBroadcastInfoJson) {
auto readerPlan = exec::test::PlanBuilder()
.exchange(dataType, velox::VectorSerde::Kind::kPresto)
.planNode();
exec::test::CursorParameters broadcastReadParams;
exec::CursorParameters broadcastReadParams;
broadcastReadParams.planNode = readerPlan;

VELOX_ASSERT_THROW(
readCursor(
exec::test::readCursor(
broadcastReadParams,
[&](auto* task) {
auto fileInfos =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
void testPartitionAndSerialize(
const core::PlanNodePtr& plan,
const RowVectorPtr& expected,
const exec::test::CursorParameters params,
const exec::CursorParameters params,
const std::optional<uint32_t> expectedOutputCount = std::nullopt) {
auto [taskCursor, serializedResults] =
readCursor(params, [](auto /*task*/) {});
exec::test::readCursor(params, [](auto /*task*/) {});

RowVectorPtr result =
BaseVector::create<RowVector>(expected->type(), 0, pool());
Expand All @@ -432,18 +432,18 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
void testPartitionAndSerialize(
const core::PlanNodePtr& plan,
const RowVectorPtr& expected) {
exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.maxDrivers = 2;
testPartitionAndSerialize(plan, expected, params);
}

std::pair<std::unique_ptr<exec::test::TaskCursor>, std::vector<RowVectorPtr>>
std::pair<std::unique_ptr<exec::TaskCursor>, std::vector<RowVectorPtr>>
runShuffleReadTask(
const exec::test::CursorParameters& params,
const exec::CursorParameters& params,
const std::string& shuffleInfo) {
bool noMoreSplits = false;
return readCursor(params, [&](auto* task) {
return exec::test::readCursor(params, [&](auto* task) {
if (noMoreSplits) {
return;
}
Expand Down Expand Up @@ -527,7 +527,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
.project(dataType->names())
.planNode();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.destination = partition;

Expand Down Expand Up @@ -713,7 +713,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {

auto queryCtx =
core::QueryCtx::create(executor_.get(), core::QueryConfig(properties));
auto params = exec::test::CursorParameters();
auto params = exec::CursorParameters();
params.planNode = plan;
params.queryCtx = queryCtx;

Expand Down Expand Up @@ -743,12 +743,12 @@ TEST_F(UnsafeRowShuffleTest, operators) {
4, std::string(TestShuffleFactory::kShuffleName), info))
.planNode();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.maxDrivers = 2;

auto [taskCursor, serializedResults] =
readCursor(params, [](auto /*task*/) {});
exec::test::readCursor(params, [](auto /*task*/) {});
ASSERT_EQ(serializedResults.size(), 0);
TestShuffleWriter::reset();
}
Expand All @@ -769,7 +769,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) {
VELOX_CHECK(nullFunction());
}));

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode =
exec::test::PlanBuilder()
.values({data})
Expand All @@ -779,7 +779,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) {
.planNode();

VELOX_ASSERT_THROW(
readCursor(params, [](auto /*task*/) {}),
exec::test::readCursor(params, [](auto /*task*/) {}),
"ShuffleWriter::collect failed");

TestShuffleWriter::reset();
Expand All @@ -795,7 +795,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) {
auto info = testShuffleInfo(4, 1 << 20 /* 1MB */);
TestShuffleWriter::createWriter(info, pool());

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode =
exec::test::PlanBuilder()
.values({data})
Expand All @@ -804,7 +804,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) {
2, std::string(TestShuffleFactory::kShuffleName), info))
.planNode();

ASSERT_NO_THROW(readCursor(params, [](auto /*task*/) {}));
ASSERT_NO_THROW(exec::test::readCursor(params, [](auto /*task*/) {}));

std::function<void(TestShuffleReader*)> injectFailure =
[&](TestShuffleReader* /*reader*/) {
Expand Down

0 comments on commit 8883d76

Please sign in to comment.