-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support normalized expr in CSE #13315
Conversation
cc @peter-toth |
Thanks @alamb for pinging me and @zhuliquan for the PR. Node normalization was indeed missing from CSE. I am happy to review the change next week. |
datafusion/expr/src/expr.rs
Outdated
| Operator::NotEq | ||
) { | ||
let (l_expr, r_expr) = | ||
if format!("{normalized_left}") < format!("{normalized_right}") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this slowdown the query? How about checking the equality for (left, right) and (right, left) to determine the order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Emm, This code runs in CSE phase, instead of the statement execution phase. It stands to reason that there should be no impact on execution, but are you referring specifically to the scenario where you use datafusion-cli
to run statements? This function will only be invoked on Eq
when the hash value of the node is the same, and the frequency should not be high, and the normalized comparison should be the same time complexity as the original node direct comparison.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I share the concerns of @jayzhan211. Also, the other case in this method contains .clone()
of Expr
, which can be also costly.
I wonder if we could implement and use something like fn eq_normalized(&self, other: &Self) -> bool
instead of the current fn normalize(&self) -> Expr
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I got it. Thanks for @peter-toth @jayzhan211 suggestion. I improve this code as below
datafusion/datafusion/expr/src/expr.rs
Lines 1697 to 1738 in dda0848
impl NormalizeEq for Expr { | |
fn normalize_eq(&self, other: &Self) -> bool { | |
match (self, other) { | |
( | |
Expr::BinaryExpr(BinaryExpr { | |
left: self_left, | |
op: self_op, | |
right: self_right, | |
}), | |
Expr::BinaryExpr(BinaryExpr { | |
left: other_left, | |
op: other_op, | |
right: other_right, | |
}), | |
) => { | |
if self_op != other_op { | |
return false; | |
} | |
if matches!( | |
self_op, | |
Operator::Plus | |
| Operator::Multiply | |
| Operator::BitwiseAnd | |
| Operator::BitwiseOr | |
| Operator::BitwiseXor | |
| Operator::Eq | |
| Operator::NotEq | |
) { | |
(self_left.normalize_eq(other_left) | |
&& self_right.normalize_eq(other_right)) | |
|| (self_left.normalize_eq(other_right) | |
&& self_right.normalize_eq(other_left)) | |
} else { | |
self_left.normalize_eq(other_left) | |
&& self_right.normalize_eq(other_right) | |
} | |
} | |
(_, _) => self == other, | |
} | |
} | |
} |
4e60434
to
dda0848
Compare
datafusion/common/src/cse.rs
Outdated
impl<'n, N: HashNode> Identifier<'n, N> { | ||
impl<N: NormalizeEq> PartialEq for Identifier<'_, N> { | ||
fn eq(&self, other: &Self) -> bool { | ||
self.node.normalize_eq(other.node) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's compare hash
es as well. I know that we use Identifier
in hashmap keys and there hashes need to match first to evaluate eq
, but still this would look more eq
like...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth you mean implement PartialEq
as below code?
self.hash == other.hash && self.node.normalize_eq(other.node)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, just that small change.
&& self_right.normalize_eq(other_right) | ||
} | ||
} | ||
(_, _) => self == other, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this actually means that if there is an other expr above the cumulative BinaryExpr
operation, then we don't normalize? E.g. Not(a == b)
shouldn't be semantically equal to Not(b == a)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I got it, I will add more arm for this scope later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @peter-toth, Apologies for the delayed commit. I've added more arm in the normalize_eq
function to handle cumulative BinaryExpr
comparisons for other expressions. While working on this, I also noticed that other expressions could benefit from normalization. For example, with the InList
and CaseWhen
expression, we can ignore the order of elements.
You can see the relevant code here:
datafusion/datafusion/expr/src/expr.rs
Line 2013 in cc11692
// TODO: normalize_eq for lists, for example `a IN (c1 + c3, c3)` is equal to `a IN (c3, c1 + c3)` |
datafusion/datafusion/expr/src/expr.rs
Lines 2034 to 2036 in cc11692
// TODO: normalize_eq for when_then_expr | |
// for example `CASE a WHEN 1 THEN 2 WHEN 3 THEN 4 ELSE 5 END` is equal to `CASE a WHEN 3 THEN 4 WHEN 1 THEN 2 ELSE 5 END` | |
self_when_then_expr.len() == other_when_then_expr.len() |
In this case, I think the normalize_eq(&self, other: &Self) -> bool trait is not the best way to handle this scenario for almost exponential time complexity. At this moment, it's a good idea to normalize it first and then compare it. Do you have any suggestions on how to approach this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that is not easy to handle now, but IMO this PR looks great without that as well.
Actually, I'm working on something that stores and updates certain statistics and properties (like the hash) of tree nodes automatically during transformations. I think once the hashes of a node's children will be cheap we can use it to sort the children and call normalize_eq()
on the pairs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that is not easy to handle now, but IMO this PR looks great without that as well.
Actually, I'm working on something that stores and updates certain statistics and properties (like the hash) of nodes automatically during transformations. I think once the hashes of a node's children will be cheap we can use it to sort the children and call
normalize_eq()
on the pairs.
Yeah, I agree store and update hash for nodes greatly. Now, computing node's hash (this is recursive process and incrementally accumulate the hash of the child nodes) and invoke normalize_eq
are separate.
-
computing node's hash here:
datafusion/datafusion/common/src/cse.rs
Lines 325 to 331 in cc11692
if can_normalize { node_ids.sort_by_key(|i| i.hash); } let node_id = node_ids .into_iter() .fold(None, |accum, item| Some(item.combine(accum))); return (down_index, node_id, is_valid); -
invoke
normalize_eq
here:
datafusion/datafusion/common/src/cse.rs
Lines 396 to 399 in cc11692
.entry(node_id) .and_modify(|evaluation| { if *evaluation == NodeEvaluation::SurelyOnce || *evaluation == NodeEvaluation::ConditionallyAtLeastOnce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's the best we can do now. But I will try to open a PR in a few days.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, thanks for the improvement @zhuliquan!
Let's handle some edge case improvements in a follow-up PR.
cc @alamb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
What is the plan for this PR? Are we going to merge it? |
Yeah, this PR looks good to me. |
I merged this PR up from main to get a clean CI run and I am also running planning benchmarks just to be sure no regressions are introduced |
Benchmark results. If anything this branch seems to be slightly faster than main. I am rerunning to check again but I see no reason not to merge. Thanks again @zhuliquan @jayzhan211 and @peter-toth -- I am sorry for the delay in merging. I am not sure why we didn't merge this more quickly after approval. I used to have more time to actively look for PRs that were lagging and try and push them through but I am spread increasingly thin these days it seems ++ critcmp main feature-normalize_node
group feature-normalize_node main
----- ---------------------- ----
logical_aggregate_with_join 1.00 1480.2±14.66µs ? ?/sec 1.03 1523.2±78.12µs ? ?/sec
logical_select_all_from_1000 1.00 5.3±0.02ms ? ?/sec 1.03 5.4±0.03ms ? ?/sec
logical_select_one_from_700 1.00 1180.2±20.46µs ? ?/sec 1.01 1190.0±13.62µs ? ?/sec
logical_trivial_join_high_numbered_columns 1.00 1145.3±13.83µs ? ?/sec 1.02 1173.8±15.17µs ? ?/sec
logical_trivial_join_low_numbered_columns 1.00 1131.1±18.23µs ? ?/sec 1.02 1158.2±16.78µs ? ?/sec
physical_intersection 1.01 2.5±0.10ms ? ?/sec 1.00 2.5±0.02ms ? ?/sec
physical_join_consider_sort 1.00 3.3±0.02ms ? ?/sec 1.02 3.3±0.05ms ? ?/sec
physical_join_distinct 1.00 1121.8±21.54µs ? ?/sec 1.01 1136.7±14.28µs ? ?/sec
physical_many_self_joins 1.00 17.4±0.08ms ? ?/sec 1.01 17.6±0.14ms ? ?/sec
physical_plan_clickbench_all 1.00 229.1±1.40ms ? ?/sec 1.00 228.8±1.85ms ? ?/sec
physical_plan_clickbench_q1 1.02 3.4±0.05ms ? ?/sec 1.00 3.3±0.03ms ? ?/sec
physical_plan_clickbench_q10 1.06 4.7±0.10ms ? ?/sec 1.00 4.4±0.04ms ? ?/sec
physical_plan_clickbench_q11 1.06 4.8±0.08ms ? ?/sec 1.00 4.5±0.05ms ? ?/sec
physical_plan_clickbench_q12 1.05 4.9±0.11ms ? ?/sec 1.00 4.7±0.04ms ? ?/sec
physical_plan_clickbench_q13 1.03 4.4±0.08ms ? ?/sec 1.00 4.3±0.04ms ? ?/sec
physical_plan_clickbench_q14 1.01 4.6±0.05ms ? ?/sec 1.00 4.5±0.05ms ? ?/sec
physical_plan_clickbench_q15 1.02 4.5±0.06ms ? ?/sec 1.00 4.4±0.04ms ? ?/sec
physical_plan_clickbench_q16 1.01 3.8±0.04ms ? ?/sec 1.00 3.8±0.05ms ? ?/sec
physical_plan_clickbench_q17 1.02 4.0±0.04ms ? ?/sec 1.00 3.9±0.04ms ? ?/sec
physical_plan_clickbench_q18 1.02 3.7±0.05ms ? ?/sec 1.00 3.6±0.04ms ? ?/sec
physical_plan_clickbench_q19 1.00 4.6±0.04ms ? ?/sec 1.00 4.6±0.06ms ? ?/sec
physical_plan_clickbench_q2 1.00 3.7±0.05ms ? ?/sec 1.00 3.7±0.07ms ? ?/sec
physical_plan_clickbench_q20 1.01 3.4±0.03ms ? ?/sec 1.00 3.3±0.03ms ? ?/sec
physical_plan_clickbench_q21 1.00 3.6±0.04ms ? ?/sec 1.00 3.6±0.06ms ? ?/sec
physical_plan_clickbench_q22 1.01 4.7±0.08ms ? ?/sec 1.00 4.6±0.05ms ? ?/sec
physical_plan_clickbench_q23 1.01 5.1±0.05ms ? ?/sec 1.00 5.0±0.06ms ? ?/sec
physical_plan_clickbench_q24 1.00 5.8±0.05ms ? ?/sec 1.00 5.8±0.05ms ? ?/sec
physical_plan_clickbench_q25 1.00 4.0±0.05ms ? ?/sec 1.00 4.0±0.04ms ? ?/sec
physical_plan_clickbench_q26 1.00 3.7±0.03ms ? ?/sec 1.01 3.7±0.07ms ? ?/sec
physical_plan_clickbench_q27 1.00 4.0±0.04ms ? ?/sec 1.00 4.0±0.07ms ? ?/sec
physical_plan_clickbench_q28 1.00 4.8±0.04ms ? ?/sec 1.00 4.8±0.06ms ? ?/sec
physical_plan_clickbench_q29 1.00 5.9±0.06ms ? ?/sec 1.00 5.9±0.09ms ? ?/sec
physical_plan_clickbench_q3 1.02 3.6±0.04ms ? ?/sec 1.00 3.5±0.04ms ? ?/sec
physical_plan_clickbench_q30 1.01 16.9±0.17ms ? ?/sec 1.00 16.7±0.17ms ? ?/sec
physical_plan_clickbench_q31 1.00 4.9±0.06ms ? ?/sec 1.00 4.9±0.06ms ? ?/sec
physical_plan_clickbench_q32 1.00 4.9±0.04ms ? ?/sec 1.00 4.9±0.07ms ? ?/sec
physical_plan_clickbench_q33 1.00 4.4±0.04ms ? ?/sec 1.00 4.4±0.05ms ? ?/sec
physical_plan_clickbench_q34 1.00 3.9±0.03ms ? ?/sec 1.01 3.9±0.04ms ? ?/sec
physical_plan_clickbench_q35 1.00 4.1±0.04ms ? ?/sec 1.00 4.1±0.05ms ? ?/sec
physical_plan_clickbench_q36 1.00 5.3±0.05ms ? ?/sec 1.00 5.2±0.05ms ? ?/sec
physical_plan_clickbench_q37 1.00 5.3±0.07ms ? ?/sec 1.01 5.4±0.09ms ? ?/sec
physical_plan_clickbench_q38 1.00 5.3±0.03ms ? ?/sec 1.03 5.5±0.27ms ? ?/sec
physical_plan_clickbench_q39 1.00 4.8±0.03ms ? ?/sec 1.00 4.8±0.08ms ? ?/sec
physical_plan_clickbench_q4 1.02 3.4±0.04ms ? ?/sec 1.00 3.3±0.03ms ? ?/sec
physical_plan_clickbench_q40 1.00 5.4±0.08ms ? ?/sec 1.00 5.4±0.04ms ? ?/sec
physical_plan_clickbench_q41 1.00 5.1±0.05ms ? ?/sec 1.00 5.1±0.06ms ? ?/sec
physical_plan_clickbench_q42 1.00 5.0±0.07ms ? ?/sec 1.00 5.0±0.07ms ? ?/sec
physical_plan_clickbench_q43 1.00 5.1±0.05ms ? ?/sec 1.00 5.1±0.05ms ? ?/sec
physical_plan_clickbench_q44 1.00 3.5±0.03ms ? ?/sec 1.01 3.5±0.04ms ? ?/sec
physical_plan_clickbench_q45 1.00 3.5±0.03ms ? ?/sec 1.01 3.5±0.06ms ? ?/sec
physical_plan_clickbench_q46 1.00 4.1±0.04ms ? ?/sec 1.00 4.1±0.05ms ? ?/sec
physical_plan_clickbench_q47 1.00 4.8±0.06ms ? ?/sec 1.00 4.8±0.05ms ? ?/sec
physical_plan_clickbench_q48 1.00 5.4±0.08ms ? ?/sec 1.00 5.4±0.07ms ? ?/sec
physical_plan_clickbench_q49 1.00 5.7±0.05ms ? ?/sec 1.00 5.7±0.06ms ? ?/sec
physical_plan_clickbench_q5 1.08 3.8±0.10ms ? ?/sec 1.00 3.6±0.03ms ? ?/sec
physical_plan_clickbench_q6 1.04 3.7±0.09ms ? ?/sec 1.00 3.6±0.04ms ? ?/sec
physical_plan_clickbench_q7 1.09 4.5±0.13ms ? ?/sec 1.00 4.1±0.04ms ? ?/sec
physical_plan_clickbench_q8 1.06 4.1±0.12ms ? ?/sec 1.00 3.8±0.04ms ? ?/sec
physical_plan_clickbench_q9 1.06 4.4±0.16ms ? ?/sec 1.00 4.2±0.04ms ? ?/sec
physical_plan_tpcds_all 1.00 1371.6±20.14ms ? ?/sec 1.00 1370.3±7.17ms ? ?/sec
physical_plan_tpch_all 1.00 88.9±0.57ms ? ?/sec 1.00 89.2±0.48ms ? ?/sec
physical_plan_tpch_q1 1.00 3.2±0.02ms ? ?/sec 1.01 3.2±0.02ms ? ?/sec
physical_plan_tpch_q10 1.00 4.3±0.03ms ? ?/sec 1.01 4.3±0.03ms ? ?/sec
physical_plan_tpch_q11 1.00 3.9±0.05ms ? ?/sec 1.00 3.9±0.02ms ? ?/sec
physical_plan_tpch_q12 1.00 3.1±0.04ms ? ?/sec 1.00 3.1±0.02ms ? ?/sec
physical_plan_tpch_q13 1.00 2.4±0.02ms ? ?/sec 1.00 2.4±0.02ms ? ?/sec
physical_plan_tpch_q14 1.00 2.8±0.02ms ? ?/sec 1.02 2.8±0.07ms ? ?/sec
physical_plan_tpch_q16 1.00 3.9±0.04ms ? ?/sec 1.00 3.9±0.03ms ? ?/sec
physical_plan_tpch_q17 1.00 3.7±0.03ms ? ?/sec 1.01 3.7±0.03ms ? ?/sec
physical_plan_tpch_q18 1.00 4.1±0.04ms ? ?/sec 1.00 4.1±0.03ms ? ?/sec
physical_plan_tpch_q19 1.00 5.9±0.06ms ? ?/sec 1.00 5.9±0.04ms ? ?/sec
physical_plan_tpch_q2 1.00 7.3±0.05ms ? ?/sec 1.00 7.4±0.04ms ? ?/sec
physical_plan_tpch_q20 1.00 4.7±0.03ms ? ?/sec 1.01 4.7±0.04ms ? ?/sec
physical_plan_tpch_q21 1.00 6.0±0.04ms ? ?/sec 1.01 6.1±0.08ms ? ?/sec
physical_plan_tpch_q22 1.00 3.6±0.03ms ? ?/sec 1.00 3.6±0.03ms ? ?/sec
physical_plan_tpch_q3 1.00 3.2±0.02ms ? ?/sec 1.01 3.2±0.03ms ? ?/sec
physical_plan_tpch_q4 1.00 2.6±0.02ms ? ?/sec 1.01 2.6±0.02ms ? ?/sec
physical_plan_tpch_q5 1.00 4.4±0.03ms ? ?/sec 1.00 4.4±0.03ms ? ?/sec
physical_plan_tpch_q6 1.00 1863.8±74.09µs ? ?/sec 1.01 1878.4±80.90µs ? ?/sec
physical_plan_tpch_q7 1.00 5.7±0.03ms ? ?/sec 1.01 5.7±0.07ms ? ?/sec
physical_plan_tpch_q8 1.00 6.7±0.04ms ? ?/sec 1.01 6.7±0.04ms ? ?/sec
physical_plan_tpch_q9 1.00 5.3±0.04ms ? ?/sec 1.02 5.4±0.06ms ? ?/sec
physical_select_aggregates_from_200 1.00 26.6±0.12ms ? ?/sec 1.01 26.9±0.12ms ? ?/sec
physical_select_all_from_1000 1.00 40.6±0.15ms ? ?/sec 1.01 41.1±0.27ms ? ?/sec
physical_select_one_from_700 1.00 3.4±0.03ms ? ?/sec 1.01 3.4±0.02ms ? ?/sec
physical_theta_join_consider_sort 1.00 3.7±0.02ms ? ?/sec 1.01 3.7±0.03ms ? ?/sec
physical_unnest_to_join 1.00 3.3±0.02ms ? ?/sec 1.01 3.4±0.02ms ? ?/sec
with_param_values_many_columns 1.00 160.0±0.95µs ? ?/sec 1.02 163.6±1.01µs ? ?/sec |
Thanks again @zhuliquan @peter-toth @jayzhan211 -- this release is shaping up to be the best yet! |
Which issue does this PR close?
Rationale for this change
I notice that some expressions are semantically equivalent. (i.g.
a + b
andb + a
is equivalent). I think their expressions can be optimized in CSE (i.e. common subexper eliminate) process. For example: we can optimize below logical plan.before:
after:
In example, we extract common expr
test.a + test.b
, astest.a + test.b
is equal totest.b + test.a
semantically.What changes are included in this PR?
Normalizable
and implement this trait for enumExpr
.NormalizeEq
and implement this trait for enumExpr
.BinaryExpr
(includingPlus
/Multi
/BitsetAnd
/BitsetOr
/BitsetXor
/Eq
/NotEq
expressions)Normalizeable
andNormalizeEq
trait on CSE.Are these changes tested?
yes, I add below test cases.
and, I modify below test cases.
Are there any user-facing changes?
No