From db7a1c96d977b3d36b00676c88ce41e37288feb6 Mon Sep 17 00:00:00 2001 From: Maksim Timonin Date: Fri, 27 Dec 2024 16:13:40 +0300 Subject: [PATCH 1/5] IGNITE-24131 Reuse GroupKey.Builder for hash aggregate --- .../query/calcite/exec/exp/agg/GroupKey.java | 7 +++++++ .../calcite/exec/rel/HashAggregateNode.java | 20 ++++++++++++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java index 35b79ec9112d1..fbf9265ba4cb0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java @@ -91,5 +91,12 @@ public GroupKey build() { return new GroupKey(fields); } + + /** */ + public void clear() { + Arrays.fill(fields, null); + + idx = 0; + } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java index 1063cc00e4935..5e4580dbe8a2f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java @@ -244,11 +244,15 @@ private class Grouping { /** */ private final RowHandler handler; + /** */ + private GroupKey.Builder grpKeyBld; + /** */ private Grouping(byte grpId, ImmutableBitSet grpFields) { this.grpId = grpId; this.grpFields = grpFields; + grpKeyBld = GroupKey.builder(grpFields.cardinality()); handler = context().rowHandler(); init(); @@ -293,14 +297,20 @@ else if (type == AggregateType.MAP) /** */ private void addOnMapper(Row row) { - GroupKey.Builder b = GroupKey.builder(grpFields.cardinality()); - for (Integer field : grpFields) - b.add(handler.get(field, row)); + grpKeyBld.add(handler.get(field, row)); - GroupKey grpKey = b.build(); + GroupKey grpKey = grpKeyBld.build(); - List> wrappers = groups.computeIfAbsent(grpKey, this::create); + List> wrappers = groups.get(grpKey); + + if (wrappers == null) { + groups.put(grpKey, (wrappers = create(grpKey))); + + grpKeyBld = GroupKey.builder(grpFields.cardinality()); + } + else + grpKeyBld.clear(); for (AccumulatorWrapper wrapper : wrappers) wrapper.add(row); From 329f8f7ef7a63a36a0f8a0db2f84efa439d95bbf Mon Sep 17 00:00:00 2001 From: Maksim Timonin Date: Sat, 28 Dec 2024 13:38:38 +0300 Subject: [PATCH 2/5] Review fix --- .../calcite/exec/rel/HashAggregateNode.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java index 5e4580dbe8a2f..848a71bdc0b66 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java @@ -263,7 +263,7 @@ private void init() { // Initializes aggregates for case when no any rows will be added into the aggregate to have 0 as result. // Doesn't do it for MAP type due to we don't want send from MAP node zero results because it looks redundant. if (grpFields.isEmpty() && (type == AggregateType.REDUCE || type == AggregateType.SINGLE)) - groups.put(GroupKey.EMPTY_GRP_KEY, create(GroupKey.EMPTY_GRP_KEY)); + groups.put(GroupKey.EMPTY_GRP_KEY, create()); } /** */ @@ -302,15 +302,18 @@ private void addOnMapper(Row row) { GroupKey grpKey = grpKeyBld.build(); - List> wrappers = groups.get(grpKey); + List> wrappers = groups.compute(grpKey, (k, v) -> { + if (v == null) { + grpKeyBld = GroupKey.builder(grpFields.cardinality()); - if (wrappers == null) { - groups.put(grpKey, (wrappers = create(grpKey))); + return create(); + } + else { + grpKeyBld.clear(); - grpKeyBld = GroupKey.builder(grpFields.cardinality()); - } - else - grpKeyBld.clear(); + return v; + } + }); for (AccumulatorWrapper wrapper : wrappers) wrapper.add(row); @@ -325,7 +328,7 @@ private void addOnReducer(Row row) { GroupKey grpKey = (GroupKey)handler.get(1, row); - List> wrappers = groups.computeIfAbsent(grpKey, this::create); + List> wrappers = groups.computeIfAbsent(grpKey, (k) -> create()); Accumulator[] accums = hasAccumulators() ? (Accumulator[])handler.get(2, row) : null; for (int i = 0; i < wrappers.size(); i++) { @@ -398,7 +401,7 @@ private List getOnReducer(int cnt) { } /** */ - private List> create(GroupKey key) { + private List> create() { if (accFactory == null) return Collections.emptyList(); From 1aefdd9820562e29211ff5ef244f2e21f741ec0b Mon Sep 17 00:00:00 2001 From: Maksim Timonin Date: Sat, 28 Dec 2024 15:20:02 +0300 Subject: [PATCH 3/5] Review fix --- .../calcite/exec/rel/HashAggregateNode.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java index 848a71bdc0b66..9751e4273f296 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.BiFunction; import java.util.function.Supplier; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.util.ImmutableBitSet; @@ -247,6 +248,9 @@ private class Grouping { /** */ private GroupKey.Builder grpKeyBld; + /** */ + private final BiFunction>, List>> getOrCreateGroup; + /** */ private Grouping(byte grpId, ImmutableBitSet grpFields) { this.grpId = grpId; @@ -255,6 +259,19 @@ private Grouping(byte grpId, ImmutableBitSet grpFields) { grpKeyBld = GroupKey.builder(grpFields.cardinality()); handler = context().rowHandler(); + getOrCreateGroup = (k, v) -> { + if (v == null) { + grpKeyBld = GroupKey.builder(grpFields.cardinality()); + + return create(); + } + else { + grpKeyBld.clear(); + + return v; + } + }; + init(); } @@ -300,20 +317,7 @@ private void addOnMapper(Row row) { for (Integer field : grpFields) grpKeyBld.add(handler.get(field, row)); - GroupKey grpKey = grpKeyBld.build(); - - List> wrappers = groups.compute(grpKey, (k, v) -> { - if (v == null) { - grpKeyBld = GroupKey.builder(grpFields.cardinality()); - - return create(); - } - else { - grpKeyBld.clear(); - - return v; - } - }); + List> wrappers = groups.compute(grpKeyBld.build(), getOrCreateGroup); for (AccumulatorWrapper wrapper : wrappers) wrapper.add(row); From 113d91485895f7839f978b34d5380b42cdec10e8 Mon Sep 17 00:00:00 2001 From: Maksim Timonin Date: Sat, 28 Dec 2024 15:56:11 +0300 Subject: [PATCH 4/5] Remove lambda from addOnReducer --- .../processors/query/calcite/exec/rel/HashAggregateNode.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java index 9751e4273f296..42eefa7462e92 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java @@ -332,10 +332,10 @@ private void addOnReducer(Row row) { GroupKey grpKey = (GroupKey)handler.get(1, row); - List> wrappers = groups.computeIfAbsent(grpKey, (k) -> create()); + List> wrappers = groups.get(grpKey); Accumulator[] accums = hasAccumulators() ? (Accumulator[])handler.get(2, row) : null; - for (int i = 0; i < wrappers.size(); i++) { + for (int i = 0; i < F.size(wrappers); i++) { AccumulatorWrapper wrapper = wrappers.get(i); Accumulator accum = accums[i]; From ef845aa0b758c1f4952e852b7bad00de17cca75e Mon Sep 17 00:00:00 2001 From: Maksim Timonin Date: Sat, 28 Dec 2024 16:30:38 +0300 Subject: [PATCH 5/5] Fix test --- .../query/calcite/exec/rel/HashAggregateNode.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java index 42eefa7462e92..39c8a1f74db51 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.util.ImmutableBitSet; @@ -251,6 +252,9 @@ private class Grouping { /** */ private final BiFunction>, List>> getOrCreateGroup; + /** */ + private final Function>> createGroup; + /** */ private Grouping(byte grpId, ImmutableBitSet grpFields) { this.grpId = grpId; @@ -259,6 +263,8 @@ private Grouping(byte grpId, ImmutableBitSet grpFields) { grpKeyBld = GroupKey.builder(grpFields.cardinality()); handler = context().rowHandler(); + createGroup = (k) -> create(); + getOrCreateGroup = (k, v) -> { if (v == null) { grpKeyBld = GroupKey.builder(grpFields.cardinality()); @@ -332,10 +338,10 @@ private void addOnReducer(Row row) { GroupKey grpKey = (GroupKey)handler.get(1, row); - List> wrappers = groups.get(grpKey); + List> wrappers = groups.computeIfAbsent(grpKey, createGroup); Accumulator[] accums = hasAccumulators() ? (Accumulator[])handler.get(2, row) : null; - for (int i = 0; i < F.size(wrappers); i++) { + for (int i = 0; i < wrappers.size(); i++) { AccumulatorWrapper wrapper = wrappers.get(i); Accumulator accum = accums[i];