Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[native] Advance Velox and corresponding unit test updates #24324

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ class BroadcastTest : public exec::test::OperatorTestBase {
std::dynamic_pointer_cast<const BroadcastWriteNode>(writerPlan)
->serdeRowType();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = writerPlan;
auto [taskCursor, results] = readCursor(params, [](auto /*task*/) {});
auto [taskCursor, results] =
exec::test::readCursor(params, [](auto /*task*/) {});

std::vector<std::string> broadcastFilePaths;
for (auto result : results) {
Expand All @@ -96,9 +97,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {
return {serdeRowType, broadcastFilePaths};
}

std::pair<
std::unique_ptr<velox::exec::test::TaskCursor>,
std::vector<RowVectorPtr>>
std::pair<std::unique_ptr<velox::exec::TaskCursor>, std::vector<RowVectorPtr>>
executeBroadcastRead(
RowTypePtr dataType,
const std::string& basePath,
Expand All @@ -107,7 +106,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {
auto readerPlan = exec::test::PlanBuilder()
.exchange(dataType, velox::VectorSerde::Kind::kPresto)
.planNode();
exec::test::CursorParameters broadcastReadParams;
exec::CursorParameters broadcastReadParams;
broadcastReadParams.planNode = readerPlan;

std::vector<std::string> fileInfos;
Expand All @@ -118,7 +117,7 @@ class BroadcastTest : public exec::test::OperatorTestBase {

uint8_t splitIndex = 0;
// Read back result using BroadcastExchangeSource.
return readCursor(broadcastReadParams, [&](auto* task) {
return exec::test::readCursor(broadcastReadParams, [&](auto* task) {
if (splitIndex >= broadcastFilePaths.size()) {
task->noMoreSplits("0");
return;
Expand Down Expand Up @@ -361,11 +360,11 @@ TEST_F(BroadcastTest, malformedBroadcastInfoJson) {
auto readerPlan = exec::test::PlanBuilder()
.exchange(dataType, velox::VectorSerde::Kind::kPresto)
.planNode();
exec::test::CursorParameters broadcastReadParams;
exec::CursorParameters broadcastReadParams;
broadcastReadParams.planNode = readerPlan;

VELOX_ASSERT_THROW(
readCursor(
exec::test::readCursor(
broadcastReadParams,
[&](auto* task) {
auto fileInfos =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
void testPartitionAndSerialize(
const core::PlanNodePtr& plan,
const RowVectorPtr& expected,
const exec::test::CursorParameters params,
const exec::CursorParameters params,
const std::optional<uint32_t> expectedOutputCount = std::nullopt) {
auto [taskCursor, serializedResults] =
readCursor(params, [](auto /*task*/) {});
exec::test::readCursor(params, [](auto /*task*/) {});

RowVectorPtr result =
BaseVector::create<RowVector>(expected->type(), 0, pool());
Expand All @@ -432,18 +432,18 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
void testPartitionAndSerialize(
const core::PlanNodePtr& plan,
const RowVectorPtr& expected) {
exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.maxDrivers = 2;
testPartitionAndSerialize(plan, expected, params);
}

std::pair<std::unique_ptr<exec::test::TaskCursor>, std::vector<RowVectorPtr>>
std::pair<std::unique_ptr<exec::TaskCursor>, std::vector<RowVectorPtr>>
runShuffleReadTask(
const exec::test::CursorParameters& params,
const exec::CursorParameters& params,
const std::string& shuffleInfo) {
bool noMoreSplits = false;
return readCursor(params, [&](auto* task) {
return exec::test::readCursor(params, [&](auto* task) {
if (noMoreSplits) {
return;
}
Expand Down Expand Up @@ -527,7 +527,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
.project(dataType->names())
.planNode();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.destination = partition;

Expand Down Expand Up @@ -713,7 +713,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {

auto queryCtx =
core::QueryCtx::create(executor_.get(), core::QueryConfig(properties));
auto params = exec::test::CursorParameters();
auto params = exec::CursorParameters();
params.planNode = plan;
params.queryCtx = queryCtx;

Expand Down Expand Up @@ -743,12 +743,12 @@ TEST_F(UnsafeRowShuffleTest, operators) {
4, std::string(TestShuffleFactory::kShuffleName), info))
.planNode();

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode = plan;
params.maxDrivers = 2;

auto [taskCursor, serializedResults] =
readCursor(params, [](auto /*task*/) {});
exec::test::readCursor(params, [](auto /*task*/) {});
ASSERT_EQ(serializedResults.size(), 0);
TestShuffleWriter::reset();
}
Expand All @@ -769,7 +769,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) {
VELOX_CHECK(nullFunction());
}));

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode =
exec::test::PlanBuilder()
.values({data})
Expand All @@ -779,7 +779,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) {
.planNode();

VELOX_ASSERT_THROW(
readCursor(params, [](auto /*task*/) {}),
exec::test::readCursor(params, [](auto /*task*/) {}),
"ShuffleWriter::collect failed");

TestShuffleWriter::reset();
Expand All @@ -795,7 +795,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) {
auto info = testShuffleInfo(4, 1 << 20 /* 1MB */);
TestShuffleWriter::createWriter(info, pool());

exec::test::CursorParameters params;
exec::CursorParameters params;
params.planNode =
exec::test::PlanBuilder()
.values({data})
Expand All @@ -804,7 +804,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) {
2, std::string(TestShuffleFactory::kShuffleName), info))
.planNode();

ASSERT_NO_THROW(readCursor(params, [](auto /*task*/) {}));
ASSERT_NO_THROW(exec::test::readCursor(params, [](auto /*task*/) {}));

std::function<void(TestShuffleReader*)> injectFailure =
[&](TestShuffleReader* /*reader*/) {
Expand Down
Loading