[VL] Fix NullPointerException when collect_list / collect_set are partially fallen back #5655
Conversation
CC @zhli1142015
I am not 100% sure, but it seems the PR's approach is more portable? Row extraction would work, but if it's done at the operator level, the function still outputs ARRAY intermediate data, which doesn't match Spark's CollectList / CollectSet definition.
```scala
  override def defaultResult: Option[Literal] = Option(Literal.create(Array(), dataType))
}

case class VeloxCollectSet(override val child: Expression) extends VeloxCollect {
```
Use ArrayUnion for updateExpressions and mergeExpressions?
It internally uses Array. A single de-dup operation should cost ~O(n). I would do a distinct at the end of aggregation.
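For illustration only, here is a minimal Scala sketch of the two declarative strategies being weighed (the helper names are hypothetical; the actual VeloxCollect expressions may be wired differently):

```scala
import org.apache.spark.sql.catalyst.expressions._

// Strategy A: de-duplicate on every update/merge via ArrayUnion;
// each union is roughly O(n) against the growing buffer.
def updateWithUnion(buffer: Expression, child: Expression): Expression =
  ArrayUnion(buffer, CreateArray(Seq(child)))

// Strategy B: append cheaply during update/merge...
def updateWithAppend(buffer: Expression, child: Expression): Expression =
  Concat(Seq(buffer, CreateArray(Seq(child))))

// ...and de-duplicate only once when the final value is evaluated.
def evaluateDistinct(buffer: Expression): Expression =
  ArrayDistinct(buffer)
```

Strategy B keeps per-row cost low but lets the buffer hold duplicates, which is the space/shuffle trade-off discussed below.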
It's mainly to reduce shuffle data if the partial-mode aggregate falls back.
Yes, it costs more space. Maybe we can use Spark's Map type for collect_set?
Is it possible to fully copy the Collect code and change serialize / deserialize to adapt the data type? It seems we can re-point the binary to an UnsafeArrayData and then call getArray.
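A hypothetical sketch of that suggestion (not code from this PR), assuming the BINARY buffer was actually written in UnsafeArrayData's serialized format:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.unsafe.Platform

// Re-interpret a serialized BINARY aggregation buffer as an UnsafeArrayData
// without copying, so it can then be read back as an ARRAY value.
// Only valid if `bytes` really holds UnsafeArrayData-formatted data.
def binaryToArray(bytes: Array[Byte]): UnsafeArrayData = {
  val result = new UnsafeArrayData
  result.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length)
  result
}
```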
Isn't the binary data only obtainable when it's an ImperativeAggregate? Would you elaborate on the suggestion?
```scala
case class VeloxCollectSet(override val child: Expression) extends VeloxCollect {
  override def prettyName: String = "velox_collect_set"

  override def nullable: Boolean = true
```
Can we preserve the comment?
Lines 101 to 103 in 5d8ac72:

```scala
// We should mark attribute as withNullability since the collect_list and collect_set
// are not nullable but velox may return null. This is to avoid potential issue when
// the post project fallback to vanilla Spark.
```
Added some comments to VeloxCollectSet. For now we don't need the withNullability part of the comment.
```scala
case w: Window =>
  w.transformExpressions {
    case func @ WindowExpression(ToVeloxCollectSet(newAggFunc), _) =>
      val out = ensureNonNull(func.copy(newAggFunc))
```
Shall we ensureNonNull for newAggFunc rather than the window func?
That doesn't work. Spark has some checks in checkAnalysis to enforce the "WindowExpression(WindowFunction, ...)" pattern.
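A rough sketch of the constraint (assumed shape, not the patch's exact code): the null-handling wrapper has to sit above the whole WindowExpression, because checkAnalysis only accepts a WindowFunction directly under it.

```scala
import org.apache.spark.sql.catalyst.expressions.{Coalesce, Expression, Literal}

// Wrap the *entire* window expression so a NULL produced by the backend
// is replaced with an empty array; wrapping the aggregate function inside
// WindowExpression would break the WindowExpression(WindowFunction, ...)
// pattern that checkAnalysis enforces.
def ensureNonNull(windowExpr: Expression): Expression =
  Coalesce(Seq(windowExpr, Literal.create(Seq.empty[Any], windowExpr.dataType)))
```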
I'm not sure the ensureNonNull added for the window expression is actually evaluated. It seems the window operator only collects the window expressions to eval. Is there a test for window + collect_set with null input?
It's evaluated with the projection created in WindowExecBase#createResultProjection. Velox's window implementation doesn't support collect_set yet. I'd add a case for vanilla Spark + velox_collect_set if you are concerned about this part of the code.
```scala
agg.transformExpressions {
  case ToVeloxCollectSet(newAggFunc) =>
    val out = ensureNonNull(newAggFunc)
    out
```
unnecessary variable
I didn't find a proper way to place a breakpoint on the return value when debugging without doing this. Do you know one? If there isn't a good way, I'd personally keep something like this, as it doesn't increase code complexity.
I usually dig into the internal method's returned value.
```scala
withSQLConf(
  GlutenConfig.EXPRESSION_BLACK_LIST.key -> "collect_set"
) {
  CollectRewriteRule.forceEnableAndRun {
```
Why do we need to force-enable the rule? Shouldn't it already be enabled if the query has collect_set?
I'll remove this API and the two tests. They are not useful for now.
```scala
if (out.fastEquals(plan)) {
  return plan
}
spark.sessionState.analyzer.checkAnalysis(out)
```
What does this check for? I did not see Spark call it at the optimizer phase...
I'll try removing this. It should not be required with the current approach either.
```scala
  }
}

private def has[T <: Expression: ClassTag]: Boolean = {
```
After dropping the tests, do we still need this check? It seems this rule is only added by the Velox backend, and the same goes for VeloxCollectSet. So when we get to this rule, there must be a VeloxCollectSet?
I agree with removing the check, although we could do that later in some version... For example, VeloxCollectSet is still more expensive than vanilla Spark's, so removing this check may cause a user who disabled collect_set to experience a performance regression.
BTW, I'll raise some optimizations on VeloxCollectSet after this patch is merged. Once I'm confident enough that vanilla Spark + velox_collect_list / velox_collect_set can be used in production, I'll remove this check.
LGTM. Thanks.
Thank you all for reviewing!
Edit: Fixes #5649. Added vanilla implementations of velox_collect_list and velox_collect_set.
Edit (2024/12/13): Why the expression replacement is done on the logical plan: the replacement changes the intermediate data type, so it would change the partial aggregation's output schema if applied against the physical plan. So we adjust the logical plan directly.
The Velox backend's collect_list / collect_set implementations require ARRAY intermediate data, whereas Spark uses BINARY. To address this we previously used some tricks that forcibly modified the physical plan, changing the output schema of the partial aggregate operator to align with Velox. But that way, the actual information about the two functions in the Velox backend was still hidden from the query plan, making advanced optimizations or compatibility checks difficult during the planning phase.
This patch adds new functions velox_collect_list / velox_collect_set that correctly map to the Velox backend's implementations of the two functions, and does essential code cleanup and refactoring.
- Remove RewriteCollect / RewriteTypedImperativeAggregate; add the logical rule CollectRewriteRule to incorporate the functionality of the former rules. CollectRewriteRule replaces collect_list / collect_set with velox_collect_list / velox_collect_set (see the sketch below).
- Since the new functions are DeclarativeAggregate, some UTs should be disabled, as they do plan checks for the existence of ObjectHashAggregateExec.
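As a rough illustration of the core idea only (simplified; the real CollectRewriteRule also handles window expressions, null-safety, and the expression blacklist, and the VeloxCollectList / VeloxCollectSet constructors are assumed here to take just the child expression):

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Rewrite on the logical plan so the planner already sees the
// Velox-compatible functions before the partial aggregate's output
// schema (ARRAY vs BINARY intermediate data) gets fixed.
object CollectRewriteSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case ae: AggregateExpression =>
      ae.aggregateFunction match {
        case c: CollectList => ae.copy(aggregateFunction = VeloxCollectList(c.child))
        case c: CollectSet => ae.copy(aggregateFunction = VeloxCollectSet(c.child))
        case _ => ae
      }
  }
}
```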