Skip to content

Commit

Permalink
Allow single thread execution mode for local partition to support spa…
Browse files Browse the repository at this point in the history
…rk union for gluten (#11531)

Summary:
Pull Request resolved: #11531

Local partition could be used to support spark union for gluten. For union, it will be translated to local partition
with a followup distinct aggregation. Gluten is single thread execution mode using API like task::next to pull data.
Currently, Velox only allow single thread execution mode if the front of a pipeline is not local exchange which is not necessary.
Task::next actually pull results from all the pipelines so it should be good. This PR removes that check and verified with the
unit test.

Reviewed By: bikramSingh91, oerling

Differential Revision: D65917749

fbshipit-source-id: 1ebe8c5782d7d36d4a77cdc792169856c03d0dbd
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Nov 14, 2024
1 parent 8374802 commit f37dc00
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
3 changes: 1 addition & 2 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -656,8 +656,7 @@ struct DriverFactory {
static void registerAdapter(DriverAdapter adapter);

bool supportsSerialExecution() const {
return !needsPartitionedOutput() && !needsExchangeClient() &&
!needsLocalExchange();
return !needsPartitionedOutput() && !needsExchangeClient();
}

const core::PlanNodeId& leafNodeId() const {
Expand Down
23 changes: 13 additions & 10 deletions velox/exec/tests/LocalPartitionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,12 @@ TEST_F(LocalPartitionTest, unionAllLocalExchange) {
auto data1 = makeRowVector({"d0"}, {makeFlatVector<StringView>({"x"})});
auto data2 = makeRowVector({"e0"}, {makeFlatVector<StringView>({"y"})});

auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = PlanBuilder(planNodeIdGenerator)
for (bool serialExecutionMode : {false, true}) {
SCOPED_TRACE(fmt::format("serialExecutionMode {}", serialExecutionMode));
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
AssertQueryBuilder(duckDbQueryRunner_)
.serialExecution(serialExecutionMode)
.plan(PlanBuilder(planNodeIdGenerator)
.localPartitionRoundRobin(
{PlanBuilder(planNodeIdGenerator)
.values({data1})
Expand All @@ -625,12 +629,11 @@ TEST_F(LocalPartitionTest, unionAllLocalExchange) {
.project({"e0 as c0"})
.planNode()})
.project({"length(c0)"})
.planNode();

assertQuery(
plan,
"SELECT length(c0) FROM ("
" SELECT * FROM (VALUES ('x')) as t1(c0) UNION ALL "
" SELECT * FROM (VALUES ('y')) as t2(c0)"
")");
.planNode())
.assertResults(
"SELECT length(c0) FROM ("
" SELECT * FROM (VALUES ('x')) as t1(c0) UNION ALL "
" SELECT * FROM (VALUES ('y')) as t2(c0)"
")");
}
}

0 comments on commit f37dc00

Please sign in to comment.