Skip to content

Commit

Permalink
feat: support left-related join spilling (databendlabs#14828)
Browse files Browse the repository at this point in the history
* feat: support left-related join spilling

* fix test
  • Loading branch information
xudong963 authored Mar 5, 2024
1 parent 0217923 commit f22ad96
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::query_spill_prefix;
use databend_common_sql::plans::JoinType;
use databend_common_storage::DataOperator;
use log::info;

Expand Down Expand Up @@ -53,10 +54,22 @@ impl BuildSpillState {
}

// Get all hashes for build input data.
pub fn get_hashes(&self, block: &DataBlock, hashes: &mut Vec<u64>) -> Result<()> {
pub fn get_hashes(
&self,
block: &DataBlock,
join_type: Option<&JoinType>,
hashes: &mut Vec<u64>,
) -> Result<()> {
let func_ctx = self.build_state.ctx.get_function_context()?;
let keys = &self.build_state.hash_join_state.hash_join_desc.build_keys;
get_hashes(&func_ctx, block, keys, &self.build_state.method, hashes)
get_hashes(
&func_ctx,
block,
keys,
&self.build_state.method,
join_type,
hashes,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_sql::plans::JoinType;
use log::info;

use crate::pipelines::processors::transforms::BuildSpillState;
Expand Down Expand Up @@ -91,12 +92,12 @@ impl BuildSpillHandler {
}

// Spill pending data block
pub(crate) async fn spill(&mut self) -> Result<()> {
pub(crate) async fn spill(&mut self, join_type: &JoinType) -> Result<()> {
let pending_spill_data = self.pending_spill_data.clone();
for block in pending_spill_data.iter() {
let mut hashes = Vec::with_capacity(block.num_rows());
let spill_state = self.spill_state_mut();
spill_state.get_hashes(block, &mut hashes)?;
spill_state.get_hashes(block, Some(join_type), &mut hashes)?;
spill_state
.spiller
.spill_input(block.clone(), &hashes, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,10 @@ impl HashJoinBuildState {
}
Ok(())
}

/// Returns an owned copy of the join type recorded in the hash join
/// descriptor that this build state shares through `hash_join_state`.
pub(crate) fn join_type(&self) -> JoinType {
    let desc = &self.hash_join_state.hash_join_desc;
    desc.join_type.clone()
}
}

pub fn supported_join_type_for_runtime_filter(join_type: &JoinType) -> bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use parking_lot::RwLock;
use super::merge_into_hash_join_optimization::MergeIntoState;
use crate::pipelines::processors::transforms::hash_join::build_state::BuildState;
use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
use crate::pipelines::processors::transforms::hash_join::spill_common::spilling_supported_join_type;
use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
use crate::pipelines::processors::HashJoinDesc;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -145,7 +146,7 @@ impl HashJoinState {
let (build_done_watcher, _build_done_dummy_receiver) = watch::channel(0);
let (continue_build_watcher, _continue_build_dummy_receiver) = watch::channel(false);
let mut enable_spill = false;
if hash_join_desc.join_type == JoinType::Inner
if spilling_supported_join_type(&hash_join_desc.join_type)
&& ctx.get_settings().get_join_spilling_memory_ratio()? != 0
{
enable_spill = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl ProbeSpillState {
block,
keys,
&self.probe_state.hash_method,
None,
hashes,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ impl TransformHashJoinProbe {
self.step = HashJoinProbeStep::Running;
self.step_logs.push(HashJoinProbeStep::Running);
self.probe_state.reset();
if (self.join_probe_state.hash_join_state.need_outer_scan()
|| self.join_probe_state.hash_join_state.need_mark_scan())
if self.join_probe_state.hash_join_state.need_final_scan()
&& self.join_probe_state.probe_workers.load(Ordering::Relaxed) == 0
{
self.join_probe_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

// Define some methods that are used by both the build and probe spilling of the hash join.

use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::Column;
Expand All @@ -23,17 +24,38 @@ use databend_common_expression::Expr;
use databend_common_expression::FunctionContext;
use databend_common_expression::HashMethodKind;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_sql::plans::JoinType;

use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity;
use crate::pipelines::processors::transforms::hash_join::util::hash_by_method;

pub fn get_hashes(
func_ctx: &FunctionContext,
block: &DataBlock,
keys: &[Expr],
method: &HashMethodKind,
join_type: Option<&JoinType>,
hashes: &mut Vec<u64>,
) -> Result<()> {
let evaluator = Evaluator::new(block, func_ctx, &BUILTIN_FUNCTIONS);
let mut block = block.clone();
let mut evaluator = Evaluator::new(&block, func_ctx, &BUILTIN_FUNCTIONS);
if let Some(join_type) = join_type {
if matches!(
join_type,
JoinType::Left | JoinType::LeftSingle | JoinType::Full
) {
let validity = Bitmap::new_constant(true, block.num_rows());
block = DataBlock::new(
block
.columns()
.iter()
.map(|c| wrap_true_validity(c, block.num_rows(), &validity))
.collect::<Vec<_>>(),
block.num_rows(),
);
evaluator = Evaluator::new(&block, func_ctx, &BUILTIN_FUNCTIONS);
}
}
let columns: Vec<(Column, DataType)> = keys
.iter()
.map(|expr| {
Expand All @@ -49,3 +71,14 @@ pub fn get_hashes(
hash_by_method(method, &columns, block.num_rows(), hashes)?;
Ok(())
}

/// Reports whether `join_type` is one of the join types accepted for hash join spilling.
///
/// Returns `true` for `Inner`, `Left`, `LeftSemi`, `LeftAnti` and `LeftSingle`;
/// every other join type yields `false`.
pub fn spilling_supported_join_type(join_type: &JoinType) -> bool {
    // Match through the reference directly; no explicit dereference is needed.
    matches!(
        join_type,
        JoinType::Inner
            | JoinType::Left
            | JoinType::LeftSemi
            | JoinType::LeftAnti
            | JoinType::LeftSingle
    )
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ impl Processor for TransformHashJoinBuild {
self.step = HashJoinBuildStep::Finalize;
}
HashJoinBuildStep::Spill => {
self.spill_handler.spill().await?;
self.spill_handler
.spill(&self.build_state.join_type())
.await?;
// After spill, the processor should continue to run, and process incoming data.
self.step = HashJoinBuildStep::Running;
}
Expand Down
File renamed without changes.
191 changes: 191 additions & 0 deletions tests/sqllogictests/suites/spill/left_join_spill.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
statement ok
set sandbox_tenant = 'test_tenant';

statement ok
use spill_test;

statement ok
set join_spilling_memory_ratio = 10;

statement ok
set join_spilling_bytes_threshold_per_proc = 1024;

statement ok
set disable_join_reorder = 1;

query II
select
c_custkey, count(o_orderkey) as c_count
from
customer
left join
orders
on c_custkey = o_custkey
and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
group by
c_custkey
order by c_custkey
limit 20;
----
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
10 0
11 0
12 0
13 0
14 0
15 0
16 0
17 0
18 0
19 0
20 0

query I
select
c_custkey
from
customer
left semi join
orders
on c_custkey = o_custkey
and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
order by c_custkey
limit 20;
----
101
101
103
103
104
104
106
106
107
107
109
109
110
110
112
112
113
113
115
115

query I
select
c_custkey
from
customer
left anti join
orders
on c_custkey = o_custkey
and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
order by c_custkey
limit 20;
----
1
1
2
2
3
3
4
4
5
5
6
6
7
7
8
8
9
9
10
10


# tpch queries contain left join
#Q13
query II
select
c_count,
count(*) as custdist
from
(
select
c_custkey,
count(o_orderkey) as c_count
from
customer
left outer join
orders
on c_custkey = o_custkey
and o_comment not like '%pending%deposits%'
group by
c_custkey
)
c_orders
group by
c_count
order by
custdist desc,
c_count desc;
----
0 5000
40 676
36 651
44 618
48 554
32 548
52 514
28 487
76 485
72 461
56 454
80 444
64 442
68 438
60 430
84 396
88 378
24 355
92 322
96 262
100 188
20 184
104 162
108 138
112 103
16 92
116 59
12 49
120 29
124 26
128 19
8 12
132 8
136 7
140 5
4 3
144 1


statement ok
set disable_join_reorder = 0;

statement ok
set join_spilling_memory_ratio = 0;

statement ok
set join_spilling_bytes_threshold_per_proc = 0;

0 comments on commit f22ad96

Please sign in to comment.