Commit deb27db: fix tests

chenzl25 committed Nov 2, 2024 · 1 parent c9a23a5
Showing 8 changed files with 31 additions and 32 deletions.
3 changes: 3 additions & 0 deletions e2e_test/batch/basic/rw_timestamp.slt.part

@@ -37,5 +37,8 @@ select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t where
 ----
 t 11 1
 
+statement ok
+delete from t;
+
 statement ok
 drop table t;
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs

@@ -355,7 +355,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
 
         let pk_prefix = OwnedRow::new(scan_range.eq_conds);
 
-        if self.lookup_prefix_len == self.table.pk_indices().len() {
+        if self.lookup_prefix_len == self.table.pk_indices().len() && !self.table.has_epoch_idx() {
            let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;
 
            if let Some(row) = row {
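The guard added above is the crux of the commit: a `get_row`-style point get may serve a lookup only when the prefix covers the full primary key and the table has no epoch index, because a table with an epoch index must materialize the derived `_rw_timestamp` column, which `get_row` cannot produce. A minimal, self-contained sketch of that dispatch rule (the types below are illustrative stand-ins, not RisingWave's):

struct TableMeta {
    pk_len: usize,
    epoch_idx: Option<usize>, // Some(_) iff the table exposes `_rw_timestamp`
}

impl TableMeta {
    fn has_epoch_idx(&self) -> bool {
        self.epoch_idx.is_some()
    }
}

// Point get is legal only for a full-primary-key lookup on a table without
// an epoch index; otherwise the executor must fall back to an iterator scan.
fn use_point_get(table: &TableMeta, lookup_prefix_len: usize) -> bool {
    lookup_prefix_len == table.pk_len && !table.has_epoch_idx()
}

fn main() {
    let plain = TableMeta { pk_len: 1, epoch_idx: None };
    let with_ts = TableMeta { pk_len: 1, epoch_idx: Some(3) };
    assert!(use_point_get(&plain, 1));
    assert!(!use_point_get(&with_ts, 1)); // must take the iterator path
}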
21 changes: 5 additions & 16 deletions src/batch/src/executor/row_seq_scan.rs

@@ -20,7 +20,7 @@ use itertools::Itertools;
 use prometheus::Histogram;
 use risingwave_common::array::DataChunk;
 use risingwave_common::bitmap::Bitmap;
-use risingwave_common::catalog::{ColumnId, Schema, RW_TIMESTAMP_COLUMN_NAME};
+use risingwave_common::catalog::{ColumnId, Schema};
 use risingwave_common::hash::VnodeCountCompat;
 use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::{DataType, Datum};
@@ -322,11 +322,6 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
            assert_eq!(scan_ranges.len(), 1);
        }
 
-        let select_rw_timestamp =
-            table.schema().fields().iter().any(|x| {
-                x.name == RW_TIMESTAMP_COLUMN_NAME && x.data_type == DataType::Timestamptz
-            });
-
        let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
            .into_iter()
            .partition(|x| x.pk_prefix.len() == table.pk_indices().len());
@@ -342,14 +337,8 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
        // Point Get
        for point_get in point_gets {
            let table = table.clone();
-            if let Some(row) = Self::execute_point_get(
-                table,
-                point_get,
-                query_epoch,
-                select_rw_timestamp,
-                histogram,
-            )
-            .await?
+            if let Some(row) =
+                Self::execute_point_get(table, point_get, query_epoch, histogram).await?
            {
                if let Some(chunk) = data_chunk_builder.append_one_row(row) {
                    returned += chunk.cardinality() as u64;
@@ -403,14 +392,14 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
        table: Arc<StorageTable<S>>,
        scan_range: ScanRange,
        epoch: BatchQueryEpoch,
-        select_rw_timestamp: bool,
        histogram: Option<impl Deref<Target = Histogram>>,
    ) -> Result<Option<OwnedRow>> {
        let pk_prefix = scan_range.pk_prefix;
        assert!(pk_prefix.len() == table.pk_indices().len());
        let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
 
-        if select_rw_timestamp {
+        if table.has_epoch_idx() {
+            // Having an epoch_idx means we need to select the `_rw_timestamp` column, which the `get_row` interface does not support, so use the iterator interface instead.
            let range_bounds = (Bound::<OwnedRow>::Unbounded, Bound::Unbounded);
            let iter = table
                .batch_chunk_iter_with_pk_bounds(
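The in-diff comment explains the fallback: when the table has an epoch index, a point get is served through the iterator interface so the `_rw_timestamp` column can be filled in. A self-contained toy model of that control flow, with a `BTreeMap` standing in for the storage table (all names here are hypothetical; only the shape mirrors the diff):

use std::collections::BTreeMap;
use std::ops::Bound;

struct Table {
    rows: BTreeMap<u64, (String, u64)>, // pk -> (payload, epoch)
    has_epoch_idx: bool,
}

impl Table {
    // `get_row`-style access: cheap, but cannot surface the epoch.
    fn get_row(&self, pk: u64) -> Option<String> {
        self.rows.get(&pk).map(|(v, _)| v.clone())
    }

    // Iterator-style access over pk bounds; the epoch travels with the row,
    // so a derived `_rw_timestamp` can be computed downstream.
    fn iter_with_pk_bounds(
        &self,
        bounds: (Bound<u64>, Bound<u64>),
    ) -> impl Iterator<Item = (String, u64)> + '_ {
        self.rows.range(bounds).map(|(_, v)| v.clone())
    }

    fn point_get(&self, pk: u64) -> Option<(String, Option<u64>)> {
        if self.has_epoch_idx {
            // Pin both bounds to the key: the iterator yields at most one row.
            let bounds = (Bound::Included(pk), Bound::Included(pk));
            self.iter_with_pk_bounds(bounds)
                .next()
                .map(|(v, epoch)| (v, Some(epoch)))
        } else {
            self.get_row(pk).map(|v| (v, None))
        }
    }
}

fn main() {
    let mut rows = BTreeMap::new();
    rows.insert(1, ("a".to_string(), 42));
    let t = Table { rows, has_epoch_idx: true };
    assert_eq!(t.point_get(1), Some(("a".to_string(), Some(42))));
}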
@@ -113,7 +113,7 @@
   BatchExchange { order: [], dist: Single }
   └─BatchDelete { table: t }
     └─BatchExchange { order: [], dist: Single }
-      └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+      └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int, v2 int);
     delete from t where v1 = 1;
@@ -122,7 +122,7 @@
     └─BatchDelete { table: t }
       └─BatchExchange { order: [], dist: Single }
         └─BatchFilter { predicate: (t.v1 = 1:Int32) }
-          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int, v2 int);
     delete from t where v1;
14 changes: 8 additions & 6 deletions src/frontend/planner_test/tests/testdata/output/delete.yaml

@@ -6,27 +6,29 @@
   logical_plan: |-
     LogicalProject { exprs: [t.a, t.b, t.a, (t.a + t.b) as $expr1] }
     └─LogicalDelete { table: t, returning: true }
-      └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
+      └─LogicalProject { exprs: [t.a, t.b, t._row_id] }
+        └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
     └─BatchProject { exprs: [t.a, t.b, t.a, (t.a + t.b) as $expr1] }
       └─BatchDelete { table: t, returning: true }
         └─BatchExchange { order: [], dist: Single }
-          └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+          └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
 - name: delete with returning constant, should keep `Delete`
   sql: |
     create table t (v int);
     delete from t returning 114514;
   logical_plan: |-
     LogicalProject { exprs: [114514:Int32] }
     └─LogicalDelete { table: t, returning: true }
-      └─LogicalScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp] }
+      └─LogicalProject { exprs: [t.v, t._row_id] }
+        └─LogicalScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp] }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
     └─BatchProject { exprs: [114514:Int32] }
       └─BatchDelete { table: t, returning: true }
         └─BatchExchange { order: [], dist: Single }
-          └─BatchScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+          └─BatchScan { table: t, columns: [t.v, t._row_id], distribution: UpstreamHashShard(t._row_id) }
 - name: insert with returning agg functions, should not run
   sql: |
     create table t (a int, b int);
@@ -41,5 +43,5 @@
     BatchSimpleAgg { aggs: [sum()] }
     └─BatchExchange { order: [], dist: Single }
       └─BatchDelete { table: t }
-        └─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id, t._rw_timestamp) }
-          └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) }
+          └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
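The pattern across these planner-test diffs: the frontend now plans an explicit projection beneath `Delete` so the hidden `_rw_timestamp` column never reaches the DML operator, and the downstream `BatchScan` and `BatchExchange` column lists shrink accordingly. A minimal model of that pruning step (illustrative only; `prune_hidden` is not a RisingWave function):

// Drop the hidden system column from a scan's output before it feeds DML.
fn prune_hidden(columns: &[&str]) -> Vec<String> {
    columns
        .iter()
        .filter(|c| !c.ends_with("_rw_timestamp"))
        .map(|c| c.to_string())
        .collect()
}

fn main() {
    let scan_cols = ["t.a", "t.b", "t._row_id", "t._rw_timestamp"];
    assert_eq!(prune_hidden(&scan_cols), vec!["t.a", "t.b", "t._row_id"]);
}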
@@ -197,12 +197,12 @@
   BatchExchange { order: [], dist: Single }
   └─BatchDelete { table: t1 }
     └─BatchExchange { order: [], dist: Single }
-      └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp], lookup table: t1 }
+      └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id], lookup table: t1 }
         └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) }
           └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard }
 batch_local_plan: |-
   BatchDelete { table: t1 }
-  └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp], lookup table: t1 }
+  └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id], lookup table: t1 }
     └─BatchExchange { order: [], dist: Single }
       └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard }
 - sql: |
11 changes: 6 additions & 5 deletions src/frontend/planner_test/tests/testdata/output/update.yaml

@@ -141,17 +141,18 @@
     delete from t where a not in (select b from t);
   logical_plan: |-
     LogicalDelete { table: t }
-    └─LogicalApply { type: LeftAnti, on: (t.a = t.b), correlated_id: 1 }
-      ├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
-      └─LogicalProject { exprs: [t.b] }
-        └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
+    └─LogicalProject { exprs: [t.a, t.b, t._row_id] }
+      └─LogicalApply { type: LeftAnti, on: (t.a = t.b), correlated_id: 1 }
+        ├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
+        └─LogicalProject { exprs: [t.b] }
+          └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
     └─BatchDelete { table: t }
       └─BatchExchange { order: [], dist: Single }
         └─BatchHashJoin { type: LeftAnti, predicate: t.a = t.b, output: all }
           ├─BatchExchange { order: [], dist: HashShard(t.a) }
-          │ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+          │ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
           └─BatchExchange { order: [], dist: HashShard(t.b) }
             └─BatchScan { table: t, columns: [t.b], distribution: SomeShard }
 - name: distributed update
4 changes: 4 additions & 0 deletions src/storage/src/table/batch_table/storage_table.rs

@@ -452,6 +452,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        self.distribution.update_vnode_bitmap(new_vnodes)
    }
+
+    pub fn has_epoch_idx(&self) -> bool {
+        self.epoch_idx.is_some()
+    }
 }
 
 pub trait PkAndRowStream = Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send;
