Skip to content

Commit

Permalink
[Feature] Support schema change for Generated Column in share data mo…
Browse files Browse the repository at this point in the history
…de (StarRocks#53526)

In PR StarRocks#52407, the restriction on Generated Columns in shared-data mode was removed. This PR adds support for schema change operations on Generated Columns in shared-data mode.

Fixes StarRocks#53527

Signed-off-by: srlch <linzichao@starrocks.com>
  • Loading branch information
srlch authored Dec 31, 2024
1 parent 16e8e18 commit 102b6d1
Show file tree
Hide file tree
Showing 8 changed files with 391 additions and 30 deletions.
25 changes: 25 additions & 0 deletions be/src/storage/lake/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ Status DirectSchemaChange::process(RowsetPtr rowset, RowsetMetadata* new_rowset_
return Status::InternalError("failed to convert chunk data");
}

if (auto st = _chunk_changer->fill_generated_columns(_new_chunk); !st.ok()) {
std::stringstream ss;
ss << "fill generated columns failed: " << st.message();
return Status::InternalError(ss.str());
}

ChunkHelper::padding_char_columns(_char_field_indexes, _new_schema, _new_tablet_schema, _new_chunk.get());
RETURN_IF_ERROR(writer->write(*_new_chunk));
}
Expand Down Expand Up @@ -382,6 +388,25 @@ Status SchemaChangeHandler::do_process_alter_tablet(const TAlterTabletReqV2& req
has_delete_predicates, &sc_params.sc_sorting,
&sc_params.sc_directly, &generated_column_idxs));

if (request.__isset.materialized_column_req && request.materialized_column_req.mc_exprs.size() != 0) {
DCHECK_EQ(sc_params.sc_sorting, false);
// For cloud-native tables, a schema change involving generated columns must run in direct mode.
sc_params.sc_directly = true;

chunk_changer->init_runtime_state(request.materialized_column_req.query_options,
request.materialized_column_req.query_globals);

for (const auto& it : request.materialized_column_req.mc_exprs) {
ExprContext* ctx = nullptr;
RETURN_IF_ERROR(Expr::create_expr_tree(chunk_changer->get_object_pool(), it.second, &ctx,
chunk_changer->get_runtime_state()));
RETURN_IF_ERROR(ctx->prepare(chunk_changer->get_runtime_state()));
RETURN_IF_ERROR(ctx->open(chunk_changer->get_runtime_state()));

chunk_changer->get_gc_exprs()->insert({it.first, ctx});
}
}

// create txn log
auto txn_log = std::make_shared<TxnLog>();
txn_log->set_tablet_id(new_tablet.id());
Expand Down
216 changes: 216 additions & 0 deletions be/test/storage/lake/schema_change_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ class SchemaChangeTest : public testing::Test, public testing::WithParamInterfac
std::shared_ptr<TabletSchema> _new_tablet_schema;
std::shared_ptr<VSchema> _new_schema;

std::shared_ptr<TabletMetadata> _new_tablet_metadata_with_generated_column;
std::shared_ptr<TabletSchema> _new_tablet_schema_with_generated_column;
std::shared_ptr<VSchema> _new_schema_with_generated_column;

int64_t _partition_id = 100;
};

Expand Down Expand Up @@ -218,6 +222,61 @@ class SchemaChangeAddColumnTest : public SchemaChangeTest {

_new_tablet_schema = TabletSchema::create(*new_schema);
_new_schema = std::make_shared<VSchema>(ChunkHelper::convert_schema(_new_tablet_schema));

// new tablet with generated column
_new_tablet_metadata_with_generated_column = std::make_shared<TabletMetadata>();
_new_tablet_metadata_with_generated_column->set_id(next_id());
_new_tablet_metadata_with_generated_column->set_version(1);
//
// | column | type | KEY | NULL | DEFAULT | EXPR |
// +--------+------+-----+------+---------+---------+
// | c0 | INT | YES | NO | | |
// | c1 | INT | NO | NO | | |
// | c2 | INT | NO | NO | | c0 |
auto new_schema_with_generated_column = _new_tablet_metadata_with_generated_column->mutable_schema();
new_schema_with_generated_column->set_id(next_id());
new_schema_with_generated_column->set_num_short_key_columns(1);
new_schema_with_generated_column->set_keys_type(GetParam().keys_type);
new_schema_with_generated_column->set_num_rows_per_row_block(65535);
// c0 c1 id should be same as base tablet schema
{
auto c0 = new_schema_with_generated_column->add_column();
c0->set_unique_id(c0_id);
c0->set_name("c0");
c0->set_type("INT");
c0->set_is_key(true);
c0->set_is_nullable(false);
}
{
auto c1 = new_schema_with_generated_column->add_column();
c1->set_unique_id(c1_id);
c1->set_name("c1");
c1->set_type("INT");
c1->set_is_key(false);
c1->set_is_nullable(false);
if (GetParam().keys_type == DUP_KEYS) {
c1->set_aggregation("NONE");
} else {
c1->set_aggregation("REPLACE");
}
}
{
auto c2 = new_schema_with_generated_column->add_column();
c2->set_unique_id(next_id());
c2->set_name("c2");
c2->set_type("INT");
c2->set_is_key(false);
c2->set_is_nullable(true);
if (GetParam().keys_type == DUP_KEYS) {
c2->set_aggregation("NONE");
} else {
c2->set_aggregation("REPLACE");
}
}

_new_tablet_schema_with_generated_column = TabletSchema::create(*new_schema_with_generated_column);
_new_schema_with_generated_column =
std::make_shared<VSchema>(ChunkHelper::convert_schema(_new_tablet_schema_with_generated_column));
}

protected:
Expand All @@ -230,6 +289,7 @@ class SchemaChangeAddColumnTest : public SchemaChangeTest {
CHECK_OK(fs::create_directories(lake::join_path(kTestGroupPath, lake::kTxnLogDirectoryName)));
CHECK_OK(_tablet_manager->put_tablet_metadata(*_base_tablet_metadata));
CHECK_OK(_tablet_manager->put_tablet_metadata(*_new_tablet_metadata));
CHECK_OK(_tablet_manager->put_tablet_metadata(*_new_tablet_metadata_with_generated_column));
}

void TearDown() override { ASSERT_OK(fs::remove_all(kTestGroupPath)); }
Expand Down Expand Up @@ -365,6 +425,162 @@ TEST_P(SchemaChangeAddColumnTest, test_add_column) {
}
}

// Verifies schema change from the base tablet to the new tablet that carries
// an extra column `c2` populated from a generated-column expression.
//
// Flow:
//   1. Write `writes_before` single-row chunks (c0 = i, c1 = 2 * i) to the base
//      tablet, publishing each one as its own version.
//   2. Run the alter-tablet request with a materialized-column request that
//      maps key 2 to a single SLOT_REF expression (slot_id 0).
//   3. Write `writes_after` single-row chunks (c0 = i, c1 = 4 * i, c2 = 5 * i)
//      directly to the new tablet.
//   4. Publish the schema-change txn, from one or from several threads.
//   5. Read the new tablet back and check every row.
TEST_P(SchemaChangeAddColumnTest, test_add_generated_column) {
// `version` is the last published version; `txn_id` is the next unused txn id.
int64_t version = 1;
int64_t txn_id = 1000;
auto base_tablet_id = _base_tablet_metadata->id();
// Step 1: one row per iteration, each in its own txn, published at version + 1.
for (int i = 0; i < GetParam().writes_before; i++) {
auto c0 = Int32Column::create();
auto c1 = Int32Column::create();
c0->append_datum(Datum(i * 1));
c1->append_datum(Datum(i * 2));

VChunk chunk0({c0, c1}, _base_schema);
// Row selection passed to the writer: only row 0 of the one-row chunk.
uint32_t indexes[1] = {0};

ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_manager.get())
.set_tablet_id(base_tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_base_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes, sizeof(indexes) / sizeof(indexes[0])));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
ASSERT_OK(TEST_publish_single_version(_tablet_manager.get(), base_tablet_id, version + 1, txn_id).status());
version++;
txn_id++;
}

// Step 2: do schema change.
// Remember the version/txn id used for the alter job; the txn id is needed
// again in step 4 when the schema change is published.
auto new_tablet_id = _new_tablet_metadata_with_generated_column->id();
auto alter_version = version;
auto alter_txn_id = txn_id;
{
TAlterTabletReqV2 request;
request.base_tablet_id = base_tablet_id;
request.new_tablet_id = new_tablet_id;
request.alter_version = alter_version;
request.txn_id = alter_txn_id;

TAlterTabletMaterializedColumnReq mc_request;

// Build the generated-column expression: a single nullable INT SLOT_REF
// node with slot_id 0 and no children.
// NOTE(review): presumably slot 0 resolves to column c0 -- the assertions
// below expect c2 == c0 for converted rows; confirm against the chunk changer.
std::vector<TExprNode> nodes;
TExprNode node;
node.node_type = TExprNodeType::SLOT_REF;
node.type = gen_type_desc(TPrimitiveType::INT);
node.num_children = 0;
TSlotRef t_slot_ref = TSlotRef();
t_slot_ref.slot_id = 0;
t_slot_ref.tuple_id = 0;
node.__set_slot_ref(t_slot_ref);
node.is_nullable = true;
nodes.emplace_back(node);

TExpr t_expr;
t_expr.nodes = nodes;

// Key 2 selects the target of the expression.
// NOTE(review): presumably the index of c2 in the new schema -- confirm.
std::map<int32_t, starrocks::TExpr> m_expr;
m_expr.insert({2, t_expr});

// Default-constructed query globals/options are sufficient for this test.
mc_request.__set_query_globals(TQueryGlobals());
mc_request.__set_query_options(TQueryOptions());
mc_request.__set_mc_exprs(m_expr);

request.__set_materialized_column_req(mc_request);

SchemaChangeHandler handler(_tablet_manager.get());
ASSERT_OK(handler.process_alter_tablet(request));
// The alter job consumed `alter_txn_id`; move on to a fresh txn id.
txn_id++;
}

// Step 3: simulate concurrent writes with schema change.
// These rows go straight to the new tablet using the new schema, so c2 is
// supplied explicitly in the chunk (5 * i) and the checks below expect to
// read that exact value back.
for (int i = 0; i < GetParam().writes_after; i++) {
auto c0 = Int32Column::create();
auto c1 = Int32Column::create();
auto c2 = Int32Column::create();
c0->append_datum(Datum(i * 1));
c1->append_datum(Datum(i * 4));
c2->append_datum(Datum(i * 5 /* c0 + c1 */));

VChunk chunk1({c0, c1, c2}, _new_schema_with_generated_column);
uint32_t indexes[1] = {0};

ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_manager.get())
.set_tablet_id(new_tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_new_tablet_schema_with_generated_column->id())
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk1, indexes, sizeof(indexes) / sizeof(indexes[0])));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
// Unlike step 1, this uses the "single log version" publish helper.
// NOTE(review): exact semantics of the helper are not visible here.
ASSERT_OK(TEST_publish_single_log_version(_tablet_manager.get(), new_tablet_id, txn_id, version + 1));
version++;
txn_id++;
}

// Step 4: publish schema change.
// With concurrency == 1 the publish must succeed. Otherwise the same publish
// is issued from `concurrency` threads and the individual results are
// deliberately discarded (cast to void); only the final tablet state is checked.
int concurrency = GetParam().concurrency;
if (concurrency == 1) {
ASSERT_OK(publish_version_for_schema_change(new_tablet_id, version + 1, alter_txn_id));
} else {
std::vector<std::thread> threads;
for (int i = 0; i < concurrency; i++) {
// Capturing by reference is safe: every thread is joined before the
// captured locals are modified or go out of scope.
threads.emplace_back(
[&]() { (void)publish_version_for_schema_change(new_tablet_id, version + 1, alter_txn_id); });
}
for (auto& t : threads) {
t.join();
}
}
version++;
txn_id++;

// Step 5: check new tablet data.
ASSIGN_OR_ABORT(auto new_tablet, _tablet_manager->get_tablet(new_tablet_id, version));

// Non-DUP_KEYS tables are read with sorted = true; DUP_KEYS with sorted = false.
auto chunk = read(new_tablet, /*sorted=*/GetParam().keys_type != DUP_KEYS);

if (GetParam().keys_type == DUP_KEYS) {
// Duplicate keys: every written row is expected, pre-alter rows first.
int expect_num_rows = GetParam().writes_before + GetParam().writes_after;
ASSERT_EQ(expect_num_rows, chunk->num_rows());
for (int i = 0, sz = chunk->num_rows(); i < sz; i++) {
if (i < GetParam().writes_before) {
// Converted row from the base tablet: c2 is expected to equal c0.
EXPECT_EQ(i * 1, chunk->get(i)[0].get_int32());
EXPECT_EQ(i * 2, chunk->get(i)[1].get_int32());
EXPECT_EQ(i * 1, chunk->get(i)[2].get_int32());
} else {
// Row written after the alter: values are exactly those from step 3.
int k = i - GetParam().writes_before;
EXPECT_EQ(k * 1, chunk->get(i)[0].get_int32());
EXPECT_EQ(k * 4, chunk->get(i)[1].get_int32());
EXPECT_EQ(k * 5, chunk->get(i)[2].get_int32());
}
}
} else {
// Other key types: one row per distinct c0 is expected, hence max(before, after).
// NOTE(review): presumably post-alter writes replace pre-alter rows with the same key.
int expect_num_rows = std::max(GetParam().writes_before, GetParam().writes_after);
ASSERT_EQ(expect_num_rows, chunk->num_rows());
for (int i = 0, sz = chunk->num_rows(); i < sz; i++) {
if (i >= GetParam().writes_after) {
// Key only written before the alter: converted values, c2 == c0.
EXPECT_EQ(i * 1, chunk->get(i)[0].get_int32());
EXPECT_EQ(i * 2, chunk->get(i)[1].get_int32());
EXPECT_EQ(i * 1, chunk->get(i)[2].get_int32());
} else {
// Key also written after the alter: the step 3 values are expected.
EXPECT_EQ(i * 1, chunk->get(i)[0].get_int32());
EXPECT_EQ(i * 4, chunk->get(i)[1].get_int32());
EXPECT_EQ(i * 5, chunk->get(i)[2].get_int32());
}
}
chunk->reset();
}
}

// clang-format off
INSTANTIATE_TEST_SUITE_P(SchemaChangeAddColumnTest, SchemaChangeAddColumnTest,
::testing::Values(SchemaChangeParam{DUP_KEYS, 0, 0},
Expand Down
Loading

0 comments on commit 102b6d1

Please sign in to comment.