Skip to content

Commit

Permalink
Update partial_sort.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Dec 24, 2024
1 parent b4b267a commit cc73190
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions datafusion/physical-plan/src/sorts/partial_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl PartialSortStream {
return Poll::Ready(None);
}
loop {
return Poll::Ready(Some(match ready!(self.input.poll_next_unpin(cx)) {
return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
if let Some(slice_point) =
self.get_slice_point(self.common_prefix_length, &batch)?
Expand All @@ -375,20 +375,33 @@ impl PartialSortStream {
let remaining_batch =
batch.slice(slice_point, batch.num_rows() - slice_point);
let sorted_batch = self.sort_in_mem_batches();
self.in_mem_batches.push(remaining_batch);
sorted_batch
if sorted_batch
.as_ref()
.map(|batch| batch.num_rows() > 0)
.unwrap_or(true)
{
self.in_mem_batches.push(remaining_batch);
Some(sorted_batch)
} else {
None
}
} else {
self.in_mem_batches.push(batch);
continue;
}
}
Some(Err(e)) => Err(e),
Some(Err(e)) => Some(Err(e)),
None => {
self.is_closed = true;
// once input is consumed, sort the rest of the inserted batches
self.sort_in_mem_batches()
let remaining_batch = self.sort_in_mem_batches()?;
if remaining_batch.num_rows() > 0 {
Some(Ok(remaining_batch))
} else {
None
}
}
}));
});
}
}

Expand All @@ -409,9 +422,6 @@ impl PartialSortStream {
self.is_closed = true;
}
}
// Empty record batches should not be emitted.
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
debug_assert!(result.num_rows() > 0);
Ok(result)
}

Expand Down

0 comments on commit cc73190

Please sign in to comment.