Skip to content

Commit

Permalink
[feature](merge-cloud) schema change for mow table (apache#31819)
Browse files Browse the repository at this point in the history
  • Loading branch information
liaoxin01 committed Mar 5, 2024
1 parent 214eea0 commit d2688ae
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 59 deletions.
80 changes: 79 additions & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace doris {
using namespace ErrorCode;

static constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;

static std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer,
bool sc_sorting) {
Expand Down Expand Up @@ -234,6 +235,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}

// 3. Convert historical data
bool already_exist_any_version = false;
for (const auto& rs_reader : sc_params.ref_rowset_readers) {
VLOG_TRACE << "Begin to convert a history rowset. version=" << rs_reader->version();

Expand Down Expand Up @@ -264,6 +266,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
RETURN_IF_ERROR(RowsetFactory::create_rowset(nullptr, _new_tablet->tablet_path(),
existed_rs_meta, &rowset));
_output_rowsets.push_back(std::move(rowset));
already_exist_any_version = true;
continue;
} else {
return st;
Expand Down Expand Up @@ -327,7 +330,18 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
_output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
sc_job->set_output_cumulative_point(_output_cumulative_point);

// TODO(Lchangliang): process delete bitmap if the table is MOW
// process delete bitmap if the table is MOW
if (_new_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
std::numeric_limits<int64_t>::max();
// If there are historical versions of rowsets, we need to recalculate their delete
// bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets
int64_t start_calc_delete_bitmap_version =
already_exist_any_version ? 0 : sc_job->alter_version() + 1;
RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
start_calc_delete_bitmap_version, initiator));
sc_job->set_delete_bitmap_lock_initiator(initiator);
}

cloud::FinishTabletJobResponse finish_resp;
st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
Expand Down Expand Up @@ -361,4 +375,68 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}
return Status::OK();
}

Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
int64_t start_calc_delete_bitmap_version,
int64_t initiator) {
LOG_INFO("process mow table")
.tag("new_tablet_id", _new_tablet->tablet_id())
.tag("out_rowset_size", _output_rowsets.size())
.tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version)
.tag("alter_version", alter_version);
TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
tmp_meta->delete_bitmap().delete_bitmap.clear();
std::shared_ptr<CloudTablet> tmp_tablet =
std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}

// step 1, process incremental rowset without delete bitmap update lock
std::vector<RowsetSharedPtr> incremental_rowsets;
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
int64_t max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
<< "-" << max_version << " new_table_id: " << _new_tablet->tablet_id();
if (max_version >= start_calc_delete_bitmap_version) {
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
{start_calc_delete_bitmap_version, max_version}, &incremental_rowsets));
for (auto rowset : incremental_rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
}
}

// step 2, process incremental rowset with delete bitmap update lock
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
*_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
int64_t new_max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets with lock, version: " << max_version + 1 << "-"
<< new_max_version << " new_tablet_id: " << _new_tablet->tablet_id();
std::vector<RowsetSharedPtr> new_incremental_rowsets;
if (new_max_version > max_version) {
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
{max_version + 1, new_max_version}, &new_incremental_rowsets));
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
for (auto rowset : new_incremental_rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
}
}

auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();

// step4, store delete bitmap
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap(
*_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap));

_new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap;
return Status::OK();
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_schema_change_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class CloudSchemaChangeJob {
private:
Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);

Status _process_delete_bitmap(int64_t alter_version, int64_t start_calc_delete_bitmap_version,
int64_t initiator);

private:
CloudStorageEngine& _cloud_storage_engine;
std::shared_ptr<CloudTablet> _base_tablet;
Expand Down
55 changes: 55 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1375,4 +1375,59 @@ Status BaseTablet::check_rowid_conversion(
return Status::OK();
}

// The caller should hold _rowset_update_lock and _meta_lock lock.
Status BaseTablet::update_delete_bitmap_without_lock(const BaseTabletSPtr& self,
const RowsetSharedPtr& rowset) {
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
return Status::InternalError(
"debug tablet update delete bitmap without lock random failed");
}
});
int64_t cur_version = rowset->end_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));

// If this rowset does not have a segment, there is no need for an update.
if (segments.empty()) {
LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: "
<< self->tablet_id() << " cur max_version: " << cur_version;
return Status::OK();
}
RowsetIdUnorderedSet cur_rowset_ids;
RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids));
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id());
RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));

std::vector<RowsetSharedPtr> specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids);
OlapStopWatch watch;
auto token = self->calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap,
cur_version - 1, token.get()));
RETURN_IF_ERROR(token->wait());
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id()
<< ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version
<< ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us()
<< "(us), total rows: " << total_rows;
if (config::enable_merge_on_write_correctness_check) {
// check if all the rowset has ROWSET_SENTINEL_MARK
auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1,
cur_rowset_ids, &specified_rowsets);
if (!st.ok()) {
LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!");
}
self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
}
for (auto& iter : delete_bitmap->delete_bitmap) {
self->_tablet_meta->delete_bitmap().merge(
{std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second);
}

return Status::OK();
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ class BaseTablet {
const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>&
location_map);

static Status update_delete_bitmap_without_lock(const BaseTabletSPtr& self,
const RowsetSharedPtr& rowset);

////////////////////////////////////////////////////////////////////////////
// end MoW functions
////////////////////////////////////////////////////////////////////////////
Expand Down
55 changes: 0 additions & 55 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2300,61 +2300,6 @@ void Tablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) {
}
}

// The caller should hold _rowset_update_lock and _meta_lock lock.
Status Tablet::update_delete_bitmap_without_lock(const TabletSharedPtr& self,
const RowsetSharedPtr& rowset) {
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
return Status::InternalError(
"debug tablet update delete bitmap without lock random failed");
}
});
int64_t cur_version = rowset->end_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));

// If this rowset does not have a segment, there is no need for an update.
if (segments.empty()) {
LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: "
<< self->tablet_id() << " cur max_version: " << cur_version;
return Status::OK();
}
RowsetIdUnorderedSet cur_rowset_ids;
RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids));
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id());
RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));

std::vector<RowsetSharedPtr> specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids);
OlapStopWatch watch;
auto token = self->_engine.calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap,
cur_version - 1, token.get()));
RETURN_IF_ERROR(token->wait());
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id()
<< ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version
<< ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us()
<< "(us), total rows: " << total_rows;
if (config::enable_merge_on_write_correctness_check) {
// check if all the rowset has ROWSET_SENTINEL_MARK
auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1,
cur_rowset_ids, &specified_rowsets);
if (!st.ok()) {
LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!");
}
self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
}
for (auto& iter : delete_bitmap->delete_bitmap) {
self->_tablet_meta->delete_bitmap().merge(
{std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second);
}

return Status::OK();
}

CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() {
return _engine.calc_delete_bitmap_executor();
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,6 @@ class Tablet final : public BaseTablet {
// end cooldown functions
////////////////////////////////////////////////////////////////////////////

static Status update_delete_bitmap_without_lock(const TabletSharedPtr& self,
const RowsetSharedPtr& rowset);

CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
Expand Down

0 comments on commit d2688ae

Please sign in to comment.