Skip to content

Commit

Permalink
Merge branch 'master' into fix-funcs-with-params
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan authored Dec 11, 2024
2 parents 829f542 + 55db277 commit 8a56be8
Show file tree
Hide file tree
Showing 626 changed files with 4,629 additions and 2,518 deletions.
28 changes: 5 additions & 23 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,28 +383,10 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
auto pre_max_version = _output_rowset->version().second;
DeleteBitmapPtr new_delete_bitmap =
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
DeleteBitmapPtr new_delete_bitmap = nullptr;
agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap);
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed",
Expand All @@ -424,9 +406,9 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets",
{ static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets();
});
}
}
return Status::OK();
Expand Down
4 changes: 3 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");
DEFINE_mInt32(multi_get_max_threads, "10");

// The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
DEFINE_mInt64(total_permits_for_compaction_score, "10000");
DEFINE_mInt64(total_permits_for_compaction_score, "1000000");

// sleep interval in ms after generated compaction tasks
DEFINE_mInt32(generate_compaction_tasks_interval_ms, "100");
Expand Down Expand Up @@ -1216,6 +1216,8 @@ DEFINE_mString(doris_cgroup_cpu_path, "");
DEFINE_mBool(enable_be_proc_monitor, "false");
DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");

DEFINE_Int32(workload_group_metrics_interval_ms, "5000");

DEFINE_mBool(enable_workload_group_memory_gc, "true");

DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ DECLARE_mBool(exit_on_exception);
DECLARE_mString(doris_cgroup_cpu_path);
DECLARE_mBool(enable_be_proc_monitor);
DECLARE_mInt32(be_proc_monitor_interval_ms);
DECLARE_Int32(workload_group_metrics_interval_ms);

DECLARE_mBool(enable_workload_group_memory_gc);

Expand Down
13 changes: 13 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,13 @@ void Daemon::be_proc_monitor_thread() {
}
}

void Daemon::calculate_workload_group_metrics_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) {
ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics();
}
}

void Daemon::start() {
Status st;
st = Thread::create(
Expand Down Expand Up @@ -570,6 +577,12 @@ void Daemon::start() {
&_threads.emplace_back());
}
CHECK(st.ok()) << st;

st = Thread::create(
"Daemon", "workload_group_metrics",
[this]() { this->calculate_workload_group_metrics_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
}

void Daemon::stop() {
Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Daemon {
void cache_prune_stale_thread();
void report_runtime_query_statistics_thread();
void be_proc_monitor_thread();
void calculate_workload_group_metrics_thread();

CountDownLatch _stop_background_threads_latch;
std::vector<scoped_refptr<Thread>> _threads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "cloud_delete_bitmap_action.h"
#include "delete_bitmap_action.h"

#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
Expand All @@ -34,8 +34,10 @@
#include <utility>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
Expand All @@ -44,7 +46,6 @@
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
Expand All @@ -59,10 +60,9 @@ constexpr std::string_view HEADER_JSON = "application/json";

} // namespace

CloudDeleteBitmapAction::CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
CloudStorageEngine& engine,
TPrivilegeHier::type hier,
TPrivilegeType::type ptype)
DeleteBitmapAction::DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
BaseStorageEngine& engine, TPrivilegeHier::type hier,
TPrivilegeType::type ptype)
: HttpHandlerWithAuth(exec_env, hier, ptype),
_engine(engine),
_delete_bitmap_action_type(ctype) {}
Expand All @@ -80,20 +80,24 @@ static Status _check_param(HttpRequest* req, uint64_t* tablet_id) {
return Status::OK();
}

Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
Status DeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
if (tablet_id == 0) {
return Status::InternalError("check param failed: missing tablet_id");
}

CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
BaseTabletSPtr tablet = nullptr;
if (config::is_cloud_mode()) {
tablet = DORIS_TRY(_engine.to_cloud().tablet_mgr().get_tablet(tablet_id));
} else {
tablet = _engine.to_local().tablet_manager()->get_tablet(tablet_id);
}
if (tablet == nullptr) {
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}

auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
auto size = tablet->tablet_meta()->delete_bitmap().get_size();
Expand All @@ -115,23 +119,23 @@ Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpReque
return Status::OK();
}

Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
Status DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
if (tablet_id == 0) {
return Status::InternalError("check param failed: missing tablet_id");
}
TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
auto st = _engine.to_cloud().meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
if (!st.ok()) {
LOG(WARNING) << "failed to get_tablet_meta tablet=" << tablet_id
<< ", st=" << st.to_string();
return st;
}
auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true);
auto tablet = std::make_shared<CloudTablet>(_engine.to_cloud(), std::move(tablet_meta));
st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true);
if (!st.ok()) {
LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
return st;
Expand All @@ -157,7 +161,7 @@ Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest*
return Status::OK();
}

void CloudDeleteBitmapAction::handle(HttpRequest* req) {
void DeleteBitmapAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) {
std::string json_result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

#include <string>

#include "cloud/cloud_storage_engine.h"
#include "common/status.h"
#include "http/http_handler_with_auth.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"

namespace doris {
Expand All @@ -35,13 +35,12 @@ class ExecEnv;
enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 };

/// This action is used for viewing the delete bitmap status
class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
class DeleteBitmapAction : public HttpHandlerWithAuth {
public:
CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
CloudStorageEngine& engine, TPrivilegeHier::type hier,
TPrivilegeType::type ptype);
DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, BaseStorageEngine& engine,
TPrivilegeHier::type hier, TPrivilegeType::type ptype);

~CloudDeleteBitmapAction() override = default;
~DeleteBitmapAction() override = default;

void handle(HttpRequest* req) override;

Expand All @@ -50,7 +49,7 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
Status _handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string* json_result);

private:
CloudStorageEngine& _engine;
BaseStorageEngine& _engine;
DeleteBitmapActionType _delete_bitmap_action_type;
};
#include "common/compile_check_end.h"
Expand Down
64 changes: 64 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,33 @@ Status CloudCompactionMixin::update_delete_bitmap() {
return Status::OK();
}

void Compaction::agg_and_remove_old_version_delete_bitmap(
std::vector<RowsetSharedPtr>& pre_rowsets,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
to_remove_vec,
DeleteBitmapPtr& new_delete_bitmap) {
// agg previously rowset old version delete bitmap
auto pre_max_version = _output_rowset->version().second;
new_delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
}

Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
// only do index compaction for dup_keys and unique_keys with mow enabled
if (config::inverted_index_compaction_enable &&
Expand Down Expand Up @@ -1103,6 +1130,13 @@ Status CompactionMixin::modify_rowsets() {
tablet()->delete_expired_stale_rowset();
}

if (config::enable_delete_bitmap_merge_on_compaction &&
compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
process_old_version_delete_bitmap();
}

int64_t cur_max_version = 0;
{
std::shared_lock rlock(_tablet->get_header_lock());
Expand All @@ -1121,6 +1155,36 @@ Status CompactionMixin::modify_rowsets() {
return Status::OK();
}

void CompactionMixin::process_old_version_delete_bitmap() {
std::vector<RowsetSharedPtr> pre_rowsets {};
for (const auto& it : tablet()->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
DeleteBitmapPtr new_delete_bitmap = nullptr;
agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap);
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<Tablet*>(_tablet.get())->delete_expired_stale_rowset();
});
}
}
}

bool CompactionMixin::_check_if_includes_input_rowsets(
const RowsetIdUnorderedSet& commit_rowset_ids_set) const {
std::vector<RowsetId> commit_rowset_ids {};
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ class Compaction {

virtual Status update_delete_bitmap() = 0;

void agg_and_remove_old_version_delete_bitmap(
std::vector<RowsetSharedPtr>& pre_rowsets,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
to_remove_vec,
DeleteBitmapPtr& new_delete_bitmap);

// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;

Expand Down Expand Up @@ -162,6 +168,8 @@ class CompactionMixin : public Compaction {

Status do_compact_ordered_rowsets();

void process_old_version_delete_bitmap();

bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;

PendingRowsetGuard _pending_rs_guard;
Expand Down
Loading

0 comments on commit 8a56be8

Please sign in to comment.