diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 458c1c29c0cf..258e234b35c7 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream { /// Configuration parameter to enable round-robin selection of tied winners of loser tree. /// - /// To address the issue of unbalanced polling between partitions due to tie-breakers being based - /// on partition index, especially in cases of low cardinality, we are making changes to the winner - /// selection mechanism. Previously, partitions with smaller indices were consistently chosen as the winners, - /// leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions - /// to grow excessively, as they continued receiving data without consuming it. + /// This option controls the tie-breaker strategy and attempts to avoid the + /// issue of unbalanced polling between partitions /// - /// For example, an upstream operator like a repartition execution would keep sending data to certain partitions, - /// but those partitions wouldn't consume the data if they weren't selected as winners. This resulted in inefficient buffer usage. + /// If `true`, when multiple partitions have the same value, the partition + /// that has the fewest poll counts is selected. This strategy ensures that + /// multiple partitions with the same value are chosen equally, distributing + /// the polling load in a round-robin fashion. This approach balances the + /// workload more effectively across partitions and avoids excessive buffer + /// growth. /// - /// To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index, - /// we now select the partition that has the fewest poll counts for the same value. - /// This ensures that multiple partitions with the same value are chosen equally, distributing the polling load in a round-robin fashion. - /// This approach balances the workload more effectively across partitions and avoids excessive buffer growth. + /// if `false`, partitions with smaller indices are consistently chosen as + /// the winners, which can lead to an uneven distribution of polling and potentially + /// causing upstream operator buffers for the other partitions to grow + /// excessively, as they continued receiving data without consuming it. + /// + /// For example, an upstream operator like `RepartitonExec` execution would + /// keep sending data to certain partitions, but those partitions wouldn't + /// consume the data if they weren't selected as winners. This resulted in + /// inefficient buffer usage. enable_round_robin_tie_breaker: bool, /// Flag indicating whether we are in the mode of round-robin diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 21597fb85662..adcb28e538fd 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Defines the sort preserving merge plan +//! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream. use std::any::Any; use std::sync::Arc; @@ -38,10 +38,22 @@ use log::{debug, trace}; /// Sort preserving merge execution plan /// -/// This takes an input execution plan and a list of sort expressions, and -/// provided each partition of the input plan is sorted with respect to -/// these sort expressions, this operator will yield a single partition -/// that is also sorted with respect to them +/// # Overview +/// +/// This operator implements a K-way merge. It is used to merge multiple sorted +/// streams into a single sorted stream and is highly optimized. +/// +/// ## Inputs: +/// +/// 1. A list of sort expressions +/// 2. An input plan, where each partition is sorted with respect to +/// these sort expressions. +/// +/// ## Output: +/// +/// 1. A single partition that is also sorted with respect to the expressions +/// +/// ## Diagram /// /// ```text /// ┌─────────────────────────┐ @@ -55,12 +67,12 @@ use log::{debug, trace}; /// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘ /// │ ╔═══╦═══╗ │ │ /// │ ║ B ║ E ║ ... │──┘ │ -/// │ ╚═══╩═══╝ │ Note Stable Sort: the merged stream -/// └─────────────────────────┘ places equal rows from stream 1 +/// │ ╚═══╩═══╝ │ Stable sort if `enable_round_robin_repartition=false`: +/// └─────────────────────────┘ the merged stream places equal rows from stream 1 /// Stream 2 /// /// -/// Input Streams Output stream +/// Input Partitions Output Partition /// (sorted) (sorted) /// ``` /// @@ -70,7 +82,7 @@ use log::{debug, trace}; /// the output and inputs are not polled again. #[derive(Debug, Clone)] pub struct SortPreservingMergeExec { - /// Input plan + /// Input plan with sorted partitions input: Arc, /// Sort expressions expr: LexOrdering, @@ -80,7 +92,9 @@ pub struct SortPreservingMergeExec { fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, - /// Configuration parameter to enable round-robin selection of tied winners of loser tree. + /// Use round-robin selection of tied winners of loser tree + /// + /// See [`Self::with_round_robin_repartition`] for more information. enable_round_robin_repartition: bool, } @@ -105,6 +119,14 @@ impl SortPreservingMergeExec { } /// Sets the selection strategy of tied winners of the loser tree algorithm + /// + /// If true (the default) equal output rows are placed in the merged stream + /// in round robin fashion. This approach consumes input streams at more + /// even rates when there are many rows with the same sort key. + /// + /// If false, equal output rows are always placed in the merged stream in + /// the order of the inputs, resulting in potentially slower execution but a + /// stable output order. pub fn with_round_robin_repartition( mut self, enable_round_robin_repartition: bool, @@ -128,7 +150,8 @@ impl SortPreservingMergeExec { self.fetch } - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + /// Creates the cache object that stores the plan properties + /// such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( input: &Arc, ordering: LexOrdering, diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 2178cc012a10..448d70760de1 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -120,6 +120,10 @@ impl<'a> StreamingMergeBuilder<'a> { self } + /// See [SortPreservingMergeExec::with_round_robin_repartition] for more + /// information. + /// + /// [SortPreservingMergeExec::with_round_robin_repartition]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition pub fn with_round_robin_tie_breaker( mut self, enable_round_robin_tie_breaker: bool,