Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement maintains_input_order for AggregateExec #13897

Merged
merged 4 commits into from
Dec 27, 2024

Conversation

alihan-synnada
Copy link
Contributor

Which issue does this PR close?

None

Rationale for this change

maintains_input_order helps with sort pushdown optimization. As explained in InputOrderMode documentation, given an ordering [a, b] and a grouping [b] (Linear mode), [a, b] will not be satisfied anymore. However, Sorted and PartiallySorted modes maintain the input order in aggregation.

What changes are included in this PR?

  • Implement maintains_input_order for AggregateExec

Are these changes tested?

No, but I'm open to suggestions. I checked BoundedWindowAggExec for inspiration but it seems this behavior is tested in SQL logic tests. Sort pushdown is implemented for window operations, but not for aggregation. Therefore, I cannot use a similar test as of now, but it can be applied if/when sort pushdown is implemented for AggregationExec

Are there any user-facing changes?

None

@github-actions github-actions bot added the physical-expr Physical Expressions label Dec 24, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend some tests if possible to avoid breakages during future refactorings

@ozankabak
Copy link
Contributor

If we add a unit test with an unnecessary SortExec on top of an order-maintaining AggregateExec, would the EnforceSorting rule remove it with this change? I guess it wouldn't remove it before?

@berkaysynnada
Copy link
Contributor

I recommend some tests if possible to avoid breakages during future refactorings

Unless some downstream rules explicitly check the maintains_input_order flag of the ExecutionPlan (which we do in our fork), this implementation does not benefit to the current code.

If we add a unit test with an unnecessary SortExec on top of an order-maintaining AggregateExec, would the EnforceSorting rule remove it with this change? I guess it wouldn't remove it before?

It wouldn't remove it, since the current upstream rules (like EnforceSorting) are using EquivalenceProperties::ordering_satisfy() API, and that one is consulting the AggregateExec output ordering (or related property cache of any operator), not the maintains_input_order(). The critical point here is that output ordering is built from directly group_by columns again, not using maintains_input_order() at all.

        let projection_mapping =
            ProjectionMapping::try_new(&group_by.expr, &input.schema())?;
        ...
            let cache = Self::compute_properties(
            &input,
            Arc::clone(&schema),
            &projection_mapping,
            &mode,
            &input_order_mode,
        );
  pub fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
        projection_mapping: &ProjectionMapping,
        mode: &AggregateMode,
        input_order_mode: &InputOrderMode,
    ) -> PlanProperties {
        // Construct equivalence properties:
        let eq_properties = input
            .equivalence_properties()
            .project(projection_mapping, schema);

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK then, since the change is trivial, let's get this in and then start thinking of a "real-world" plan and rule that would break if this flag was somehow removed.

@2010YOUY01
Copy link
Contributor

One problem is it's not obvious why it works, though it's only a one-line function 🤔
It should work in common case but i'm not sure it's correct for every corner case (e.g. topK aggregate), given now how ordering will be preserved is not documented in AggregateExec
If anyone is familiar with this part, it will be super helpful to add more comment to aggregate execution, and point to that from this PR's change, like

If input to AggregateExec is not ordered, no output order will be guaranteed
If input is ordered by group keys, the streaming aggregation path will be used and output will maintain the same ordering
...

I recommend some tests if possible to avoid breakages during future refactorings

Unless some downstream rules explicitly check the maintains_input_order flag of the ExecutionPlan (which we do in our fork), this implementation does not benefit to the current code.

Maybe we can mark or comment something like 'experimental' to this method, if it's not tested?

@ozankabak
Copy link
Contributor

AFAIK the flag is valid in all cases. What do you mean by topK aggregate?

Maybe we can mark or comment something like 'experimental' to this method, if it's not tested?

Let's add a TODO to remind us to add a test exercising this

@berkaysynnada
Copy link
Contributor

50acc71

@2010YOUY01 does this relieve your concerns?

@2010YOUY01
Copy link
Contributor

AFAIK the flag is valid in all cases. What do you mean by topK aggregate?

Maybe we can mark or comment something like 'experimental' to this method, if it's not tested?

Let's add a TODO to remind us to add a test exercising this

I'm just worried we will miss some corner cases, since whether to maintain input order for AggregateExec is implementation dependent, so I think more tests are necessary.
TopK aggregate is just an edge case example, I took a quick look: this path can be triggered if input is ordered, and its output will only guarantee to be ordered by first group key, which not seem correct

if let Some(limit) = self.limit {
if !self.is_unordered_unfiltered_group_by_distinct() {
return Ok(StreamType::GroupedPriorityQueue(
GroupedTopKAggregateStream::new(self, context, partition, limit)?,
));
}
}

However, since it is not used by the internal optimizer and there is a TODO for testing, it looks good to me

@berkaysynnada
Copy link
Contributor

I'm just worried we will miss some corner cases, since whether to maintain input order for AggregateExec is implementation dependent, so I think more tests are necessary. TopK aggregate is just an edge case example, I took a quick look: this path can be triggered if input is ordered, and its output will only guarantee to be ordered by first group key, which not seem correct

Could you detail how only the first group key's ordering is maintained while others are invalidated? If that's the case, the output ordering calculation for AggregateExec might also expose some bugs.

@ozankabak
Copy link
Contributor

ozankabak commented Dec 26, 2024

@2010YOUY01 I think a concrete example of when input ordering is not preserved even though input order mode is not linear would be very helpful

@ozankabak
Copy link
Contributor

I went through the code and I wasn't able to see how the ordering could be lost when InputOrderMode is not Linear. This is also the theoretical desideratum for this operator. Let's merge this PR to reflect the desideratum and if we discover that there is a code path where this is violated, we will open a quick follow-on PR to fix it.

@berkaysynnada berkaysynnada merged commit d25099e into apache:main Dec 27, 2024
26 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants