diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 2099f22f1cd8bf..cd7e074432432b 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -39,6 +39,7 @@ namespace doris { using namespace ErrorCode; static constexpr int ALTER_TABLE_BATCH_SIZE = 4096; +static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2; static std::unique_ptr get_sc_procedure(const BlockChanger& changer, bool sc_sorting) { @@ -234,6 +235,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam } // 3. Convert historical data + bool already_exist_any_version = false; for (const auto& rs_reader : sc_params.ref_rowset_readers) { VLOG_TRACE << "Begin to convert a history rowset. version=" << rs_reader->version(); @@ -264,6 +266,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam RETURN_IF_ERROR(RowsetFactory::create_rowset(nullptr, _new_tablet->tablet_path(), existed_rs_meta, &rowset)); _output_rowsets.push_back(std::move(rowset)); + already_exist_any_version = true; continue; } else { return st; @@ -327,7 +330,18 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam _output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1); sc_job->set_output_cumulative_point(_output_cumulative_point); - // TODO(Lchangliang): process delete bitmap if the table is MOW + // process delete bitmap if the table is MOW + if (_new_tablet->enable_unique_key_merge_on_write()) { + int64_t initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) & + std::numeric_limits::max(); + // If there are historical versions of rowsets, we need to recalculate their delete + // bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets + int64_t start_calc_delete_bitmap_version = + already_exist_any_version ? 0 : sc_job->alter_version() + 1; + RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(), + start_calc_delete_bitmap_version, initiator)); + sc_job->set_delete_bitmap_lock_initiator(initiator); + } cloud::FinishTabletJobResponse finish_resp; st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp); @@ -361,4 +375,68 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam } return Status::OK(); } + +Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, + int64_t start_calc_delete_bitmap_version, + int64_t initiator) { + LOG_INFO("process mow table") + .tag("new_tablet_id", _new_tablet->tablet_id()) + .tag("out_rowset_size", _output_rowsets.size()) + .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version) + .tag("alter_version", alter_version); + TabletMetaSharedPtr tmp_meta = std::make_shared(*(_new_tablet->tablet_meta())); + tmp_meta->delete_bitmap().delete_bitmap.clear(); + std::shared_ptr tmp_tablet = + std::make_shared(_cloud_storage_engine, tmp_meta); + { + std::unique_lock wlock(tmp_tablet->get_header_lock()); + tmp_tablet->add_rowsets(_output_rowsets, true, wlock); + } + + // step 1, process incremental rowset without delete bitmap update lock + std::vector incremental_rowsets; + RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); + int64_t max_version = tmp_tablet->max_version().second; + LOG(INFO) << "alter table for mow table, calculate delete bitmap of " + << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version + << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id(); + if (max_version >= start_calc_delete_bitmap_version) { + RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( + {start_calc_delete_bitmap_version, max_version}, &incremental_rowsets)); + for (auto rowset : incremental_rowsets) { + RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); + } + } + + // step 2, process incremental rowset with delete bitmap update lock + RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock( + *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator)); + RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); + int64_t new_max_version = tmp_tablet->max_version().second; + LOG(INFO) << "alter table for mow table, calculate delete bitmap of " + << "incremental rowsets with lock, version: " << max_version + 1 << "-" + << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id(); + std::vector new_incremental_rowsets; + if (new_max_version > max_version) { + RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( + {max_version + 1, new_max_version}, &new_incremental_rowsets)); + { + std::unique_lock wlock(tmp_tablet->get_header_lock()); + tmp_tablet->add_rowsets(_output_rowsets, true, wlock); + } + for (auto rowset : new_incremental_rowsets) { + RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); + } + } + + auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap(); + + // step4, store delete bitmap + RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap( + *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap)); + + _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap; + return Status::OK(); +} + } // namespace doris diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h index 7bb03fda12aafd..d587111df717a3 100644 --- a/be/src/cloud/cloud_schema_change_job.h +++ b/be/src/cloud/cloud_schema_change_job.h @@ -39,6 +39,9 @@ class CloudSchemaChangeJob { private: Status _convert_historical_rowsets(const SchemaChangeParams& sc_params); + Status _process_delete_bitmap(int64_t alter_version, int64_t start_calc_delete_bitmap_version, + int64_t initiator); + private: CloudStorageEngine& _cloud_storage_engine; std::shared_ptr _base_tablet; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index fe76d43c7f075c..e5a41abdbd90d3 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1375,4 +1375,59 @@ Status BaseTablet::check_rowid_conversion( return Status::OK(); } +// The caller should hold _rowset_update_lock and _meta_lock lock. +Status BaseTablet::update_delete_bitmap_without_lock(const BaseTabletSPtr& self, + const RowsetSharedPtr& rowset) { + DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", { + if (rand() % 100 < (100 * dp->param("percent", 0.1))) { + LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed"); + return Status::InternalError( + "debug tablet update delete bitmap without lock random failed"); + } + }); + int64_t cur_version = rowset->end_version(); + std::vector segments; + RETURN_IF_ERROR(std::dynamic_pointer_cast(rowset)->load_segments(&segments)); + + // If this rowset does not have a segment, there is no need for an update. + if (segments.empty()) { + LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: " + << self->tablet_id() << " cur max_version: " << cur_version; + return Status::OK(); + } + RowsetIdUnorderedSet cur_rowset_ids; + RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids)); + DeleteBitmapPtr delete_bitmap = std::make_shared(self->tablet_id()); + RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); + + std::vector specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids); + OlapStopWatch watch; + auto token = self->calc_delete_bitmap_executor()->create_token(); + RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, + cur_version - 1, token.get())); + RETURN_IF_ERROR(token->wait()); + size_t total_rows = std::accumulate( + segments.begin(), segments.end(), 0, + [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); + LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id() + << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version + << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us() + << "(us), total rows: " << total_rows; + if (config::enable_merge_on_write_correctness_check) { + // check if all the rowset has ROWSET_SENTINEL_MARK + auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1, + cur_rowset_ids, &specified_rowsets); + if (!st.ok()) { + LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); + } + self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap); + } + for (auto& iter : delete_bitmap->delete_bitmap) { + self->_tablet_meta->delete_bitmap().merge( + {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second); + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 867ff9c1e3f40f..b59a4303a0bd64 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -229,6 +229,9 @@ class BaseTablet { const std::map>>& location_map); + static Status update_delete_bitmap_without_lock(const BaseTabletSPtr& self, + const RowsetSharedPtr& rowset); + //////////////////////////////////////////////////////////////////////////// // end MoW functions //////////////////////////////////////////////////////////////////////////// diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 431577726ad4ba..d1bb734d9033ae 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2300,61 +2300,6 @@ void Tablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) { } } -// The caller should hold _rowset_update_lock and _meta_lock lock. -Status Tablet::update_delete_bitmap_without_lock(const TabletSharedPtr& self, - const RowsetSharedPtr& rowset) { - DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", { - if (rand() % 100 < (100 * dp->param("percent", 0.1))) { - LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed"); - return Status::InternalError( - "debug tablet update delete bitmap without lock random failed"); - } - }); - int64_t cur_version = rowset->end_version(); - std::vector segments; - RETURN_IF_ERROR(std::dynamic_pointer_cast(rowset)->load_segments(&segments)); - - // If this rowset does not have a segment, there is no need for an update. - if (segments.empty()) { - LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: " - << self->tablet_id() << " cur max_version: " << cur_version; - return Status::OK(); - } - RowsetIdUnorderedSet cur_rowset_ids; - RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids)); - DeleteBitmapPtr delete_bitmap = std::make_shared(self->tablet_id()); - RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); - - std::vector specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids); - OlapStopWatch watch; - auto token = self->_engine.calc_delete_bitmap_executor()->create_token(); - RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, - cur_version - 1, token.get())); - RETURN_IF_ERROR(token->wait()); - size_t total_rows = std::accumulate( - segments.begin(), segments.end(), 0, - [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); - LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id() - << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version - << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us() - << "(us), total rows: " << total_rows; - if (config::enable_merge_on_write_correctness_check) { - // check if all the rowset has ROWSET_SENTINEL_MARK - auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1, - cur_rowset_ids, &specified_rowsets); - if (!st.ok()) { - LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); - } - self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap); - } - for (auto& iter : delete_bitmap->delete_bitmap) { - self->_tablet_meta->delete_bitmap().merge( - {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second); - } - - return Status::OK(); -} - CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() { return _engine.calc_delete_bitmap_executor(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 0f6cfa9c04f146..50970c51c5edda 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -369,9 +369,6 @@ class Tablet final : public BaseTablet { // end cooldown functions //////////////////////////////////////////////////////////////////////////// - static Status update_delete_bitmap_without_lock(const TabletSharedPtr& self, - const RowsetSharedPtr& rowset); - CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override; Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,