Improve SortPreservingMerge::enable_round_robin_repartition docs (#13826)

* Clarify SortPreservingMerge::enable_round_robin_repartition docs

* tweaks

* Improve comments more

* clippy

* fix doc link
alamb authored Dec 20, 2024
1 parent 667c77a commit 4118c43
Showing 3 changed files with 55 additions and 22 deletions.
28 changes: 17 additions & 11 deletions datafusion/physical-plan/src/sorts/merge.rs
@@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {

/// Configuration parameter to enable round-robin selection of tied winners of loser tree.
///
/// To address the issue of unbalanced polling between partitions due to tie-breakers being based
/// on partition index, especially in cases of low cardinality, we are making changes to the winner
/// selection mechanism. Previously, partitions with smaller indices were consistently chosen as the winners,
/// leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions
/// to grow excessively, as they continued receiving data without consuming it.
/// This option controls the tie-breaker strategy and attempts to avoid the
/// issue of unbalanced polling between partitions.
///
/// For example, an upstream operator like a repartition execution would keep sending data to certain partitions,
/// but those partitions wouldn't consume the data if they weren't selected as winners. This resulted in inefficient buffer usage.
/// If `true`, when multiple partitions have the same value, the partition
/// that has the fewest poll counts is selected. This strategy ensures that
/// multiple partitions with the same value are chosen equally, distributing
/// the polling load in a round-robin fashion. This approach balances the
/// workload more effectively across partitions and avoids excessive buffer
/// growth.
///
/// To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index,
/// we now select the partition that has the fewest poll counts for the same value.
/// This ensures that multiple partitions with the same value are chosen equally, distributing the polling load in a round-robin fashion.
/// This approach balances the workload more effectively across partitions and avoids excessive buffer growth.
/// If `false`, partitions with smaller indices are consistently chosen as
/// the winners, which can lead to an uneven distribution of polling and can
/// cause upstream operator buffers for the other partitions to grow
/// excessively, as they continue receiving data without consuming it.
///
/// For example, an upstream operator like `RepartitionExec` would
/// keep sending data to certain partitions, but those partitions wouldn't
/// consume the data if they weren't selected as winners. This results in
/// inefficient buffer usage.
enable_round_robin_tie_breaker: bool,

/// Flag indicating whether we are in the mode of round-robin
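
The two tie-breaker strategies described in the doc comment above can be illustrated with a small, self-contained sketch. This is not DataFusion's loser-tree code: `pick_winner` and `simulate` are hypothetical helpers, and the simulation assumes the extreme low-cardinality case where every partition ties on every selection.

```rust
/// Hypothetical helper (not part of DataFusion): choose a winner among
/// partitions whose current head values are equal.
fn pick_winner(tied: &[usize], poll_counts: &[usize], round_robin: bool) -> usize {
    if round_robin {
        // Fewest polls wins; equal poll counts fall back to the smaller index.
        *tied
            .iter()
            .min_by_key(|&&p| (poll_counts[p], p))
            .expect("at least one tied partition")
    } else {
        // The smallest partition index always wins.
        *tied.iter().min().expect("at least one tied partition")
    }
}

/// Run `rounds` selections in which all `n` partitions tie every time,
/// and return how often each partition was polled.
fn simulate(n: usize, rounds: usize, round_robin: bool) -> Vec<usize> {
    let tied: Vec<usize> = (0..n).collect();
    let mut poll_counts = vec![0usize; n];
    for _ in 0..rounds {
        let winner = pick_winner(&tied, &poll_counts, round_robin);
        poll_counts[winner] += 1;
    }
    poll_counts
}
```

Under these assumptions, `simulate(3, 9, true)` spreads the polls evenly across the three partitions, while `simulate(3, 9, false)` polls only partition 0, which is the situation in which the other partitions' upstream buffers would keep growing.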
45 changes: 34 additions & 11 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Defines the sort preserving merge plan
//! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream.
use std::any::Any;
use std::sync::Arc;
@@ -38,10 +38,22 @@ use log::{debug, trace};

/// Sort preserving merge execution plan
///
/// This takes an input execution plan and a list of sort expressions, and
/// provided each partition of the input plan is sorted with respect to
/// these sort expressions, this operator will yield a single partition
/// that is also sorted with respect to them
/// # Overview
///
/// This operator implements a K-way merge. It is used to merge multiple sorted
/// streams into a single sorted stream and is highly optimized.
///
/// ## Inputs:
///
/// 1. A list of sort expressions
/// 2. An input plan, where each partition is sorted with respect to
/// these sort expressions.
///
/// ## Output:
///
/// 1. A single partition that is also sorted with respect to the expressions
///
/// ## Diagram
///
/// ```text
/// ┌─────────────────────────┐
@@ -55,12 +67,12 @@ use log::{debug, trace};
/// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘
/// │ ╔═══╦═══╗ │ │
/// │ ║ B ║ E ║ ... │──┘ │
/// │ ╚═══╩═══╝ │ Note Stable Sort: the merged stream
/// └─────────────────────────┘ places equal rows from stream 1
/// │ ╚═══╩═══╝ │ Stable sort if `enable_round_robin_repartition=false`:
/// └─────────────────────────┘ the merged stream places equal rows from stream 1
/// Stream 2
///
///
/// Input Streams Output stream
/// Input Partitions Output Partition
/// (sorted) (sorted)
/// ```
///
@@ -70,7 +82,7 @@ use log::{debug, trace};
/// the output and inputs are not polled again.
#[derive(Debug, Clone)]
pub struct SortPreservingMergeExec {
/// Input plan
/// Input plan with sorted partitions
input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: LexOrdering,
@@ -80,7 +92,9 @@ pub struct SortPreservingMergeExec {
fetch: Option<usize>,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
/// Configuration parameter to enable round-robin selection of tied winners of loser tree.
/// Use round-robin selection of tied winners of loser tree
///
/// See [`Self::with_round_robin_repartition`] for more information.
enable_round_robin_repartition: bool,
}

@@ -105,6 +119,14 @@ impl SortPreservingMergeExec {
}

/// Sets the selection strategy of tied winners of the loser tree algorithm
///
/// If true (the default) equal output rows are placed in the merged stream
/// in round robin fashion. This approach consumes input streams at more
/// even rates when there are many rows with the same sort key.
///
/// If false, equal output rows are always placed in the merged stream in
/// the order of the inputs, resulting in potentially slower execution but a
/// stable output order.
pub fn with_round_robin_repartition(
mut self,
enable_round_robin_repartition: bool,
@@ -128,7 +150,8 @@ impl SortPreservingMergeExec {
self.fetch
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
/// Creates the cache object that stores the plan properties
/// such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
ordering: LexOrdering,
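
For readers unfamiliar with the K-way merge that the `SortPreservingMergeExec` docs above describe, here is a minimal sketch of the idea. It swaps in the standard library's `BinaryHeap` rather than the loser tree DataFusion uses, works on plain `Vec<i32>` inputs instead of record batch streams, and the name `k_way_merge` is made up for illustration. Ties are broken by input index, which corresponds to the stable, non-round-robin behaviour.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted inputs into a single sorted output.
/// Equal values are emitted in input order because the input index is
/// part of the heap key.
fn k_way_merge(inputs: &[Vec<i32>]) -> Vec<i32> {
    // Heap entries are (value, input index, position within that input).
    // `Reverse` turns the standard max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (idx, input) in inputs.iter().enumerate() {
        if let Some(&first) = input.first() {
            heap.push(Reverse((first, idx, 0usize)));
        }
    }

    let mut out = Vec::with_capacity(inputs.iter().map(Vec::len).sum());
    while let Some(Reverse((value, idx, pos))) = heap.pop() {
        out.push(value);
        // Advance only the input that just produced a value.
        if let Some(&next) = inputs[idx].get(pos + 1) {
            heap.push(Reverse((next, idx, pos + 1)));
        }
    }
    out
}
```

Only the input that produced the last output value is advanced, so an input that rarely wins is rarely consumed. That is the property that makes the tie-breaking strategy matter when many rows share the same sort key.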
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -120,6 +120,10 @@ impl<'a> StreamingMergeBuilder<'a> {
self
}

/// See [SortPreservingMergeExec::with_round_robin_repartition] for more
/// information.
///
/// [SortPreservingMergeExec::with_round_robin_repartition]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition
pub fn with_round_robin_tie_breaker(
mut self,
enable_round_robin_tie_breaker: bool,
