Skip to content

Commit

Permalink
feat(subscription): support cursor order (#18801)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Oct 25, 2024
1 parent a89abcf commit fcb6c35
Show file tree
Hide file tree
Showing 14 changed files with 525 additions and 113 deletions.
17 changes: 16 additions & 1 deletion e2e_test/subscription/create_table_and_subscription.slt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,19 @@ statement ok
create subscription sub2 from t3 with(retention = '1D');

statement ok
create sink s1 into t3 from t2;
create sink s1 into t3 from t2;

statement ok
create table t4 (v1 int, v2 int, v3 int, v4 int);

statement ok
create materialized view mv4 as select v4,v2 from t4;

statement ok
create subscription sub4 from mv4 with(retention = '1D');

statement ok
create table t5 (v1 int, v2 int, v3 int, v4 int, primary key (v1, v2));

statement ok
create subscription sub5 from t5 with(retention = '1D');
18 changes: 17 additions & 1 deletion e2e_test/subscription/drop_table_and_subscription.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,20 @@ statement ok
drop table t3;

statement ok
drop table t2;
drop table t2;


statement ok
drop subscription sub4;

statement ok
drop materialized view mv4;

statement ok
drop table t4;

statement ok
drop subscription sub5;

statement ok
drop table t5;
147 changes: 146 additions & 1 deletion e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ def test_block_cursor():
user="root",
database="dev"
)

execute_insert("declare cur subscription cursor for sub2 full",conn)
execute_insert("insert into t2 values(1,1)",conn)
execute_insert("flush",conn)
Expand Down Expand Up @@ -402,6 +401,148 @@ def insert_into_table():
)
execute_insert("insert into t2 values(10,10)",conn)

def test_order_table_with_pk():
    """Verify that a subscription cursor over a table with a single-column
    primary key (t2, pk = v1) yields rows ordered by the pk, both for the
    initial snapshot and for later incremental changes, including the
    UpdateDelete/UpdateInsert pairs produced by an UPDATE."""
    print("test_order_table_with_pk")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    # Insert out of pk order; the cursor must still return rows sorted by v1.
    execute_insert("insert into t2 values(6,6),(3,3),(5,5),(4,4),(7,7)",conn)
    execute_insert("flush",conn)
    execute_insert("declare cur subscription cursor for sub2 full",conn)
    row = execute_query("fetch 5 from cur",conn)
    assert len(row) == 5
    check_rows_data([3,3],row[0],"Insert")
    check_rows_data([4,4],row[1],"Insert")
    check_rows_data([5,5],row[2],"Insert")
    check_rows_data([6,6],row[3],"Insert")
    check_rows_data([7,7],row[4],"Insert")
    # A second out-of-order batch must also arrive pk-sorted.
    execute_insert("insert into t2 values(16,16),(13,13),(15,15),(14,14),(17,17)",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 5 from cur",conn)
    assert len(row) == 5
    check_rows_data([13,13],row[0],"Insert")
    check_rows_data([14,14],row[1],"Insert")
    check_rows_data([15,15],row[2],"Insert")
    check_rows_data([16,16],row[3],"Insert")
    check_rows_data([17,17],row[4],"Insert")
    # Each updated row produces an UpdateDelete (old value) immediately
    # followed by an UpdateInsert (new value), still ordered by pk.
    execute_insert("update t2 set v2 = 100 where v1 > 10",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 10 from cur",conn)
    assert len(row) == 10
    check_rows_data([13,13],row[0],"UpdateDelete")
    check_rows_data([13,100],row[1],"UpdateInsert")
    check_rows_data([14,14],row[2],"UpdateDelete")
    check_rows_data([14,100],row[3],"UpdateInsert")
    check_rows_data([15,15],row[4],"UpdateDelete")
    check_rows_data([15,100],row[5],"UpdateInsert")
    check_rows_data([16,16],row[6],"UpdateDelete")
    check_rows_data([16,100],row[7],"UpdateInsert")
    check_rows_data([17,17],row[8],"UpdateDelete")
    check_rows_data([17,100],row[9],"UpdateInsert")
    drop_table_subscription()

def test_order_table_with_row_id():
    """Verify that a subscription cursor over a table with no user pk (t1,
    implicit _row_id key) yields rows in the same order as a query ordered
    by _row_id, for both the snapshot and incremental inserts."""
    # Fixed: previously printed the name of a different test
    # ("test_order_table_with_pk"), which made log output misleading.
    print("test_order_table_with_row_id")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    execute_insert("insert into t1 values(6,6),(3,3),(5,5),(4,4),(7,7)",conn)
    execute_insert("flush",conn)
    execute_insert("declare cur subscription cursor for sub full",conn)
    row = execute_query("fetch 10 from cur",conn)
    # Expected order comes from the table itself, sorted by the hidden _row_id.
    ex_row = execute_query("select v1, v2 from t1 order by _row_id",conn)
    assert len(row) == 6
    assert len(ex_row) == 6
    check_rows_data(ex_row[0],row[0],"Insert")
    check_rows_data(ex_row[1],row[1],"Insert")
    check_rows_data(ex_row[2],row[2],"Insert")
    check_rows_data(ex_row[3],row[3],"Insert")
    check_rows_data(ex_row[4],row[4],"Insert")
    # Fixed: the 6th row was asserted by length but never compared.
    check_rows_data(ex_row[5],row[5],"Insert")
    execute_insert("insert into t1 values(16,16),(13,13),(15,15),(14,14),(17,17)",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 5 from cur",conn)
    ex_row = execute_query("select v1, v2 from t1 where v1 > 10 order by _row_id",conn)
    assert len(row) == 5
    assert len(ex_row) == 5
    check_rows_data(ex_row[0],row[0],"Insert")
    check_rows_data(ex_row[1],row[1],"Insert")
    check_rows_data(ex_row[2],row[2],"Insert")
    check_rows_data(ex_row[3],row[3],"Insert")
    check_rows_data(ex_row[4],row[4],"Insert")
    drop_table_subscription()

def test_order_mv():
    """Verify that a subscription cursor over a materialized view (mv4 on
    t4, which has no user pk) yields rows in the same order as querying the
    base table ordered by its hidden _row_id."""
    print("test_order_mv")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    execute_insert("insert into t4 values(6,6,6,6),(3,3,3,3),(5,5,5,5),(4,4,4,4),(7,7,7,7)",conn)
    execute_insert("flush",conn)
    execute_insert("declare cur subscription cursor for sub4 full",conn)
    row = execute_query("fetch 5 from cur",conn)
    # mv4 projects (v4, v2); expected order follows t4's hidden _row_id.
    ex_row = execute_query("select v4, v2 from t4 order by _row_id",conn)
    assert len(row) == 5
    assert len(ex_row) == 5
    check_rows_data(ex_row[0],row[0],"Insert")
    check_rows_data(ex_row[1],row[1],"Insert")
    check_rows_data(ex_row[2],row[2],"Insert")
    check_rows_data(ex_row[3],row[3],"Insert")
    check_rows_data(ex_row[4],row[4],"Insert")
    execute_insert("insert into t4 values(16,16,16,16),(13,13,13,13),(15,15,15,15),(14,14,14,14),(17,17,17,17)",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 5 from cur",conn)
    ex_row = execute_query("select v4, v2 from t4 where v2 > 10 order by _row_id",conn)
    assert len(row) == 5
    assert len(ex_row) == 5
    check_rows_data(ex_row[0],row[0],"Insert")
    check_rows_data(ex_row[1],row[1],"Insert")
    check_rows_data(ex_row[2],row[2],"Insert")
    check_rows_data(ex_row[3],row[3],"Insert")
    check_rows_data(ex_row[4],row[4],"Insert")
    drop_table_subscription()

def test_order_multi_pk():
    """Verify that a subscription cursor over a table with a composite
    primary key (t5, pk = (v1, v2)) yields rows ordered by the full pk,
    i.e. sorted by v1 first and then v2 within equal v1."""
    # Fixed: message had a typo ("mutil") and now matches the function name.
    print("test_order_multi_pk")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    # Rows share v1 values (5,5) and (6,6) so ordering must fall back to v2.
    execute_insert("insert into t5 values(6,6,6,6),(6,3,3,3),(5,5,5,5),(5,4,4,4),(7,7,7,7)",conn)
    execute_insert("flush",conn)
    execute_insert("declare cur subscription cursor for sub5 full",conn)
    row = execute_query("fetch 5 from cur",conn)
    assert len(row) == 5
    check_rows_data([5,4,4,4],row[0],"Insert")
    check_rows_data([5,5,5,5],row[1],"Insert")
    check_rows_data([6,3,3,3],row[2],"Insert")
    check_rows_data([6,6,6,6],row[3],"Insert")
    check_rows_data([7,7,7,7],row[4],"Insert")
    execute_insert("insert into t5 values(16,16,16,16),(16,13,13,13),(15,15,15,15),(15,14,14,14),(17,17,17,17)",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 5 from cur",conn)
    assert len(row) == 5
    check_rows_data([15,14,14,14],row[0],"Insert")
    check_rows_data([15,15,15,15],row[1],"Insert")
    check_rows_data([16,13,13,13],row[2],"Insert")
    check_rows_data([16,16,16,16],row[3],"Insert")
    check_rows_data([17,17,17,17],row[4],"Insert")
    drop_table_subscription()

if __name__ == "__main__":
test_cursor_snapshot()
test_cursor_op()
Expand All @@ -413,4 +554,8 @@ def insert_into_table():
test_cursor_with_table_alter()
test_cursor_fetch_n()
test_rebuild_table()
test_order_table_with_pk()
test_order_table_with_row_id()
test_order_mv()
test_order_multi_pk()
test_block_cursor()
1 change: 1 addition & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ message LogRowSeqScanNode {
common.Buffer vnode_bitmap = 3;
common.BatchQueryEpoch old_epoch = 4;
common.BatchQueryEpoch new_epoch = 5;
bool ordered = 6;
}

message InsertNode {
Expand Down
8 changes: 8 additions & 0 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
ordered: bool,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
Expand All @@ -63,6 +64,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
chunk_size: usize,
identity: String,
metrics: Option<BatchMetrics>,
ordered: bool,
) -> Self {
let mut schema = table.schema().clone();
schema.fields.push(Field::with_name(
Expand All @@ -78,6 +80,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
old_epoch,
new_epoch,
version_id,
ordered,
}
}
}
Expand Down Expand Up @@ -146,6 +149,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
chunk_size as usize,
source.plan_node().get_identity().clone(),
metrics,
log_store_seq_scan_node.ordered,
)))
})
}
Expand Down Expand Up @@ -175,6 +179,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
new_epoch,
version_id,
schema,
ordered,
..
} = *self;
let table = std::sync::Arc::new(table);
Expand All @@ -194,6 +199,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
chunk_size,
histogram,
Arc::new(schema.clone()),
ordered,
);
#[for_await]
for chunk in stream {
Expand All @@ -211,12 +217,14 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
ordered: bool,
) {
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(
old_epoch,
HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
ordered,
)
.await?
.flat_map(|r| {
Expand Down
16 changes: 15 additions & 1 deletion src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType, Statement};
use risingwave_sqlparser::ast::{
ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
};
use thiserror_ext::AsReport;

use super::create_index::{gen_create_index_plan, resolve_index_schema};
Expand Down Expand Up @@ -94,6 +96,18 @@ async fn do_handle_explain(
(Ok(plan), context)
}

Statement::FetchCursor {
stmt: FetchCursorStatement { cursor_name, .. },
} => {
let cursor_manager = session.clone().get_cursor_manager();
let plan = cursor_manager
.gen_batch_plan_with_subscription_cursor(cursor_name, handler_args)
.await
.map(|x| x.plan)?;
let context = plan.ctx();
(Ok(plan), context)
}

// For other queries without `await` point, we can keep a copy of reference to the
// `OptimizerContext` even if the planning fails. This enables us to log the partial
// traces for better debugging experience.
Expand Down
20 changes: 18 additions & 2 deletions src/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use risingwave_common::types::{
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_sqlparser::ast::{
CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select, SelectItem, SetExpr,
TableFactor, TableWithJoins,
CompatibleSourceSchema, ConnectorSchema, Expr, Ident, ObjectName, OrderByExpr, Query, Select,
SelectItem, SetExpr, TableFactor, TableWithJoins,
};
use thiserror_ext::AsReport;

Expand Down Expand Up @@ -234,6 +234,22 @@ pub fn gen_query_from_table_name(from_name: ObjectName) -> Query {
}
}

pub fn gen_query_from_table_name_order_by(from_name: ObjectName, pk_names: Vec<String>) -> Query {
let mut query = gen_query_from_table_name(from_name);
query.order_by = pk_names
.into_iter()
.map(|pk| {
let expr = Expr::Identifier(Ident::with_quote_unchecked('"', pk));
OrderByExpr {
expr,
asc: None,
nulls_first: None,
}
})
.collect();
query
}

pub fn convert_unix_millis_to_logstore_u64(unix_millis: u64) -> u64 {
Epoch::from_unix_millis(unix_millis).0
}
Expand Down
9 changes: 7 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct BatchLogSeqScan {

impl BatchLogSeqScan {
fn new_inner(core: generic::LogScan, dist: Distribution) -> Self {
let order = Order::any();
let order = Order::new(core.table_desc.pk.clone());
let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);

Self { base, core }
Expand Down Expand Up @@ -88,8 +88,11 @@ impl Distill for BatchLogSeqScan {
});
vec.push(("distribution", dist));
}
vec.push(("old_epoch", Pretty::from(self.core.old_epoch.to_string())));
vec.push(("new_epoch", Pretty::from(self.core.new_epoch.to_string())));
vec.push(("version_id", Pretty::from(self.core.version_id.to_string())));

childless_record("BatchScan", vec)
childless_record("BatchLogSeqScan", vec)
}
}

Expand Down Expand Up @@ -126,6 +129,8 @@ impl TryToBatchPb for BatchLogSeqScan {
},
)),
}),
// It's currently true.
ordered: !self.order().is_any(),
}))
}
}
Expand Down
Loading

0 comments on commit fcb6c35

Please sign in to comment.