Skip to content

Commit

Permalink
[improve](cloud-mow) merge and remove old version of delete bitmap wh…
Browse files Browse the repository at this point in the history
…en cumulative compaction is done (apache#40204)

When a tablet holds a large number of delete bitmaps, handling them can lead to
high CPU usage. To reduce the number of delete bitmaps, the old-version delete
bitmaps are merged once cumulative compaction is done, which in turn lowers
CPU usage.
Here is an example:
Before cumulative compaction, the rowset layout is

    "cumulative point": 10,
    "rowsets": [
"[0-1] 0 DATA NONOVERLAPPING
0200000000000000ffffffffffffffecffffffffffffffff 0",
"[2-9] 1 DATA NONOVERLAPPING
0200000000002cc1a8447a4e0ba5850f773803ae2d534284 1.21 KB",
"[10-10] 1 DATA NONOVERLAPPING
0200000000002d07a8447a4e0ba5850f773803ae2d534284 1.09 KB",
"[11-11] 1 DATA NONOVERLAPPING
0200000000002d14a8447a4e0ba5850f773803ae2d534284 1.10 KB",
"[12-12] 1 DATA NONOVERLAPPING
0200000000002d23a8447a4e0ba5850f773803ae2d534284 1.10 KB",
"[13-13] 1 DATA NONOVERLAPPING
0200000000002d33a8447a4e0ba5850f773803ae2d534284 1.10 KB",
"[14-14] 1 DATA NONOVERLAPPING
0200000000002d40a8447a4e0ba5850f773803ae2d534284 1.10 KB"
    ],
The base rowset is [2-9], and the cumulative compaction input rowset range is [10-14].
After compaction, the rowset layout is

    "cumulative point": 10,
    "rowsets": [
"[0-1] 0 DATA NONOVERLAPPING
0200000000000000ffffffffffffffecffffffffffffffff 0",
"[2-9] 1 DATA NONOVERLAPPING
0200000000002cc1a8447a4e0ba5850f773803ae2d534284 1.21 KB",
"[10-14] 1 DATA NONOVERLAPPING
0200000000002d63a8447a4e0ba5850f773803ae2d534284 1.21 KB"
    ],
1. The delete bitmaps of rowsets [10-10] -> [11-11] -> [12-12] -> [13-13]
-> [14-14] will be deleted when delete_expired_stale_rowsets runs.
2. The delete bitmaps of versions 10~14 are aggregated first, and the
aggregated result is then written to the version 14 delete bitmap.
3. If the update succeeds, the delete bitmaps of versions 10~13 are added to
the queue named to_remove_vec.
4. When the delete bitmaps of rowsets [10-10] -> [11-11] -> [12-12]
-> [13-13] -> [14-14] are deleted, the delete bitmaps mentioned in step 3 are
deleted too.
  • Loading branch information
hust-hhb authored Sep 20, 2024
1 parent 61dc258 commit 7681850
Show file tree
Hide file tree
Showing 25 changed files with 891 additions and 35 deletions.
85 changes: 83 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "cloud/cloud_cumulative_compaction.h"

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
Expand All @@ -27,6 +28,7 @@
#include "olap/compaction.h"
#include "olap/cumulative_compaction_policy.h"
#include "service/backend_options.h"
#include "util/debug_points.h"
#include "util/trace.h"
#include "util/uuid_generator.h"

Expand Down Expand Up @@ -254,10 +256,10 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
int64_t initiator =
HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max();
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
Expand Down Expand Up @@ -340,9 +342,88 @@ Status CloudCumulativeCompaction::modify_rowsets() {
stats.num_rows(), stats.data_size());
}
}
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
process_old_version_delete_bitmap();
}
return Status::OK();
}

void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
// agg previously rowset old version delete bitmap
std::vector<RowsetSharedPtr> pre_rowsets {};
std::vector<std::string> pre_rowset_ids {};
for (const auto& it : cloud_tablet()->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
auto pre_max_version = _output_rowset->version().second;
DeleteBitmapPtr new_delete_bitmap =
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
DeleteBitmap::BitmapKey before_end {rowset->rowset_id(), seg_id,
pre_max_version - 1};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(
std::make_tuple(_tablet->tablet_id(), start, before_end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
Status update_st;
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed",
{
update_st = Status::InternalError(
"test fail to update delete bitmap for tablet_id {}",
cloud_tablet()->tablet_id());
});
if (update_st.ok()) {
update_st = _engine.meta_mgr().update_delete_bitmap_without_lock(
*cloud_tablet(), new_delete_bitmap.get());
}
if (!update_st.ok()) {
std::stringstream ss;
ss << "failed to update delete bitmap for tablet=" << cloud_tablet()->tablet_id()
<< " st=" << update_st.to_string();
std::string msg = ss.str();
LOG(WARNING) << msg;
} else {
Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<CloudTablet*>(_tablet.get())
->delete_expired_stale_rowsets();
});
}
}
}
}

void CloudCumulativeCompaction::garbage_collection() {
CloudCompactionMixin::garbage_collection();
cloud::TabletJobInfoPB job;
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {

void update_cumulative_point();

void process_old_version_delete_bitmap();

ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; }

std::string _uuid;
Expand Down
7 changes: 3 additions & 4 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ namespace doris {

CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
int64_t compaction_min_size, int64_t promotion_version_count)
int64_t compaction_min_size)
: _promotion_size(promotion_size),
_promotion_ratio(promotion_ratio),
_promotion_min_size(promotion_min_size),
_compaction_min_size(compaction_min_size),
_promotion_version_count(promotion_version_count) {}
_compaction_min_size(compaction_min_size) {}

int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
if (size < 1024) return 0;
Expand Down Expand Up @@ -205,7 +204,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
// consider it's version count here.
bool satisfy_promotion_version = tablet->enable_unique_key_merge_on_write() &&
output_rowset->end_version() - output_rowset->start_version() >
_promotion_version_count;
config::compaction_promotion_version_count;
// if rowsets have delete version, move to the last directly.
// if rowsets have no delete version, check output_rowset total disk size satisfies promotion size.
return output_rowset->start_version() == last_cumulative_point &&
Expand Down
5 changes: 1 addition & 4 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024,
double promotion_ratio = config::compaction_promotion_ratio,
int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes * 1024 * 1024,
int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024,
int64_t promotion_version_count = config::compaction_promotion_version_count);
int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024);

~CloudSizeBasedCumulativeCompactionPolicy() override = default;

Expand Down Expand Up @@ -94,8 +93,6 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
int64_t _promotion_min_size;
/// lower bound size to do compaction compaction.
int64_t _compaction_min_size;
// cumulative compaction promotion version count, only works for unique key MoW table
int64_t _promotion_version_count;
};

class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy {
Expand Down
127 changes: 127 additions & 0 deletions be/src/cloud/cloud_delete_bitmap_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "cloud_delete_bitmap_action.h"

#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>

#include <chrono> // IWYU pragma: keep
#include <exception>
#include <future>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"

namespace doris {
using namespace ErrorCode;

namespace {

// Content-Type header value that CloudDeleteBitmapAction::handle attaches to each response.
constexpr std::string_view HEADER_JSON = "application/json";

} // namespace

// Constructs the action.
// `exec_env`, `hier` and `ptype` are forwarded unchanged to HttpHandlerWithAuth;
// `engine` is kept by reference and `ctype` selects which operation handle() performs.
CloudDeleteBitmapAction::CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
CloudStorageEngine& engine,
TPrivilegeHier::type hier,
TPrivilegeType::type ptype)
: HttpHandlerWithAuth(exec_env, hier, ptype),
_engine(engine),
_delete_bitmap_action_type(ctype) {}

/// Reads the TABLET_ID_KEY request parameter and converts it to an unsigned 64-bit id.
///
/// @param req        incoming HTTP request whose parameter is read.
/// @param tablet_id  output; written only when the conversion succeeds.
/// @return OK on success; InternalError when the parameter is empty, contains anything other
///         than decimal digits, or does not fit the conversion performed by std::stoull.
static Status _check_param(HttpRequest* req, uint64_t* tablet_id) {
    const auto& req_tablet_id = req->param(TABLET_ID_KEY);
    if (req_tablet_id.empty()) {
        return Status::InternalError("tablet id is empty!");
    }
    // std::stoull alone skips leading whitespace, accepts a sign ("-1" wraps around to a huge
    // value) and silently stops at the first non-digit ("12abc" -> 12). Reject all of that.
    if (req_tablet_id.find_first_not_of("0123456789") != std::string::npos) {
        return Status::InternalError("convert tablet_id failed, invalid tablet_id: {}",
                                     req_tablet_id);
    }
    try {
        *tablet_id = std::stoull(req_tablet_id);
    } catch (const std::exception& e) {
        // Only out-of-range values can still get here after the digit check.
        return Status::InternalError("convert tablet_id failed, {}", e.what());
    }
    return Status::OK();
}

/// Produces a pretty-printed JSON object describing the delete bitmap of one tablet.
///
/// The tablet id comes from the request; the JSON carries the members "delete_bitmap_count",
/// "cardinality" and "size". Returns a non-OK status when the id is missing or invalid, or when
/// the tablet cannot be obtained; `json_result` is only written on success.
Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* req,
                                                                 std::string* json_result) {
    // A value of zero after parsing is treated as "no tablet id supplied".
    uint64_t tablet_id = 0;
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
    if (tablet_id == 0) {
        return Status::InternalError("check param failed: missing tablet_id");
    }

    CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
    if (tablet == nullptr) {
        return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
    }

    // Gather the three statistics from the tablet's delete bitmap.
    auto&& bitmap = tablet->tablet_meta()->delete_bitmap();
    auto bitmap_count = bitmap.get_delete_bitmap_count();
    auto bitmap_cardinality = bitmap.cardinality();
    auto bitmap_size = bitmap.get_size();

    rapidjson::Document doc;
    doc.SetObject();
    auto& allocator = doc.GetAllocator();
    doc.AddMember("delete_bitmap_count", bitmap_count, allocator);
    doc.AddMember("cardinality", bitmap_cardinality, allocator);
    doc.AddMember("size", bitmap_size, allocator);

    // Serialize the document into the caller's string.
    rapidjson::StringBuffer buffer;
    rapidjson::PrettyWriter<rapidjson::StringBuffer> json_writer(buffer);
    doc.Accept(json_writer);
    json_result->assign(buffer.GetString());

    return Status::OK();
}

/// HTTP entry point: tags the response as JSON and, for the COUNT_INFO action type, replies with
/// the delete bitmap statistics. Other action types produce no reply here.
void CloudDeleteBitmapAction::handle(HttpRequest* req) {
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
    if (_delete_bitmap_action_type != DeleteBitmapActionType::COUNT_INFO) {
        return;
    }

    std::string json_result;
    Status st = _handle_show_delete_bitmap_count(req, &json_result);
    // The HTTP status is OK either way; a failure is conveyed through the JSON-encoded status
    // in the body.
    HttpChannel::send_reply(req, HttpStatus::OK, st.ok() ? json_result : st.to_json());
}

} // namespace doris
54 changes: 54 additions & 0 deletions be/src/cloud/cloud_delete_bitmap_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdint.h>

#include <string>

#include "cloud/cloud_storage_engine.h"
#include "common/status.h"
#include "http/http_handler_with_auth.h"
#include "olap/tablet.h"

namespace doris {
class HttpRequest;

class ExecEnv;

/// Operations that CloudDeleteBitmapAction can serve.
/// COUNT_INFO: reply with the delete bitmap count, cardinality and size of a tablet.
enum class DeleteBitmapActionType { COUNT_INFO = 1 };

/// This action is used for viewing the delete bitmap status
/// It is an HTTP handler derived from HttpHandlerWithAuth; the operation it performs is fixed
/// at construction time through DeleteBitmapActionType.
class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
public:
/// `ctype` selects the operation; `engine` is stored by reference; `exec_env`, `hier` and
/// `ptype` are passed on to HttpHandlerWithAuth.
CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
CloudStorageEngine& engine, TPrivilegeHier::type hier,
TPrivilegeType::type ptype);

~CloudDeleteBitmapAction() override = default;

/// Handles one HTTP request according to the configured action type.
void handle(HttpRequest* req) override;

private:
/// Fills `json_result` with the delete bitmap statistics of the tablet named in `req`.
Status _handle_show_delete_bitmap_count(HttpRequest* req, std::string* json_result);

private:
// Engine used to look up tablets; held by reference.
CloudStorageEngine& _engine;
// Operation served by handle().
DeleteBitmapActionType _delete_bitmap_action_type;
};
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
LOG(WARNING) << "handle calc delete bitmap fail, st=" << st.to_string();
}
});
VLOG_DEBUG << "submit TabletCalcDeleteBitmapTask for tablet=" << tablet_id;
if (!submit_st.ok()) {
_res = submit_st;
break;
Expand Down Expand Up @@ -126,6 +127,7 @@ void CloudTabletCalcDeleteBitmapTask::set_compaction_stats(int64_t ms_base_compa
}

Status CloudTabletCalcDeleteBitmapTask::handle() const {
VLOG_DEBUG << "start calculate delete bitmap on tablet " << _tablet_id;
SCOPED_ATTACH_TASK(_mem_tracker);
int64_t t1 = MonotonicMicros();
auto base_tablet = DORIS_TRY(_engine.get_tablet(_tablet_id));
Expand Down
Loading

0 comments on commit 7681850

Please sign in to comment.