Skip to content

Commit

Permalink
Merge branch 'master' into test-json-funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan authored Dec 24, 2024
2 parents 858d352 + 8e09e1b commit ba6cc5d
Show file tree
Hide file tree
Showing 4,245 changed files with 53,640 additions and 109,006 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,9 @@ Apache Doris is an easy-to-use, high-performance and real-time analytical databa

All this makes Apache Doris an ideal tool for scenarios including report analysis, ad-hoc query, unified data warehouse, and data lake query acceleration. On Apache Doris, users can build various applications, such as user behavior analysis, AB test platform, log retrieval analysis, user portrait analysis, and order analysis.

🎉 Version 2.1.4 released now. Check out the 🔗[Release Notes](https://doris.apache.org/docs/releasenotes/release-2.1.4) here. The 2.1 version delivers exceptional performance with 100% higher out-of-the-box queries proven by TPC-DS 1TB tests, enhanced data lake analytics that are 4-6 times speedier than Trino and Spark, solid support for semi-structured data analysis with new Variant types and suite of analytical functions, asynchronous materialized views for query acceleration, optimized real-time writing at scale, and better workload management with stability and runtime SQL resource tracking.
🎉 Check out the 🔗[All releases](https://doris.apache.org/docs/releasenotes/all-release), where you'll find a chronological summary of Apache Doris versions released over the past year.


🎉 Version 2.0.12 is now released ! This fully evolved and stable release is ready for all users to upgrade. Check out the 🔗[Release Notes](https://doris.apache.org/docs/2.0/releasenotes/release-2.0.12) here.

👀 Have a look at the 🔗[Official Website](https://doris.apache.org/) for a comprehensive list of Apache Doris's core features, blogs and user cases.
👀 Explore the 🔗[Official Website](https://doris.apache.org/) to discover Apache Doris's core features, blogs, and user cases in detail.

## 📈 Usage Scenarios

Expand Down
Binary file added aazcp.tar.gz
Binary file not shown.
1 change: 1 addition & 0 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "agent/workload_group_listener.h"

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
Expand Down
3 changes: 2 additions & 1 deletion be/src/agent/workload_group_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
#include <glog/logging.h>

#include "agent/topic_listener.h"
#include "runtime/exec_env.h"

namespace doris {

class ExecEnv;

class WorkloadGroupListener : public TopicListener {
public:
~WorkloadGroupListener() {}
Expand Down
7 changes: 4 additions & 3 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ Status CloudBaseCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
Expand Down Expand Up @@ -343,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("num_output_delete_bitmap", output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

Expand Down
12 changes: 9 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ Status CloudCumulativeCompaction::prepare_compact() {
// plus 1 to skip the delete version.
// NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
update_cumulative_point();
if (!config::enable_sleep_between_delete_cumu_compaction) {
st = Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(
"_last_delete_version.first not equal to -1");
}
}
return st;
}
Expand Down Expand Up @@ -200,8 +204,9 @@ Status CloudCumulativeCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms, range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
Expand Down Expand Up @@ -295,7 +300,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("number_output_delete_bitmap",
output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

Expand Down
9 changes: 9 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,15 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, start_resp.alter_version());
std::vector<uint32_t> cluster_key_idxes;
if (!_base_tablet_schema->cluster_key_uids().empty()) {
for (const auto& uid : _base_tablet_schema->cluster_key_uids()) {
cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid));
}
reader_context.read_orderby_key_columns = &cluster_key_idxes;
reader_context.is_unique = false;
reader_context.sequence_id_idx = -1;
}

for (auto& split : rs_splits) {
RETURN_IF_ERROR(split.rs_reader->init(&reader_context));
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
auto st = compaction->prepare_compact();
if (!st.ok()) {
long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>() &&
st.msg() != "_last_delete_version.first not equal to -1") {
// Backoff strategy if no suitable version
tablet->last_cumu_no_suitable_version_ms = now;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
void _check_file_cache_ttl_block_valid();

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;

bool synced = false;
do {
Expand Down
38 changes: 26 additions & 12 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/cumulative_compaction_time_series_policy.h"
Expand All @@ -54,6 +55,7 @@ namespace doris {
using namespace ErrorCode;

static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
static constexpr int LOAD_INITIATOR_ID = -1;

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}
Expand Down Expand Up @@ -407,6 +409,9 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
_stale_rs_version_map.erase(rs_it);
} else {
LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
Expand Down Expand Up @@ -504,13 +509,19 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
int64_t txn_expiration) {
if (rowset.rowset_meta()->rowset_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE) [[unlikely]] {
// May cause the segment files generated by the transient rowset writer unable to be
// recycled, see `CloudRowsetWriter::build` for detail.
LOG(WARNING) << "Wrong rowset state: " << rowset.rowset_meta()->rowset_state();
DCHECK(false) << rowset.rowset_meta()->rowset_state();
if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
auto msg = fmt::format(
"wrong rowset state when create_transient_rowset_writer, rowset state should be "
"BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
tablet_id());
// see `CloudRowsetWriter::build` for detail.
// if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
// in `RowsetMeta::merge_rowset_meta()` in previous trials.
LOG(WARNING) << msg;
DCHECK(false) << msg;
}

RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
Expand Down Expand Up @@ -650,11 +661,14 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
}

void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
_cumulative_point = new_point;
return;
}
// cumulative point should only be reset to -1, or be increased
CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point)
<< "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
_cumulative_point = new_point;
// FIXME: could happen in currently unresolved race conditions
LOG(WARNING) << "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
}

std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
Expand Down Expand Up @@ -719,8 +733,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
Expand Down
94 changes: 56 additions & 38 deletions be/src/cloud/cloud_tablet_hotspot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,55 @@ TabletHotspot::~TabletHotspot() {
}
}

struct MapKeyHash {
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
void get_return_partitions(
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
hot_partition,
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
last_hot_partition,
std::vector<THotTableMessage>* hot_tables, int& return_partitions, int N) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
auto last_value_iter = last_hot_partition.find(key);
if (last_value_iter != last_hot_partition.end()) {
auto last_partition_iter = last_value_iter->second.find(partition_id);
if (last_partition_iter != last_value_iter->second.end()) {
const auto& last_value = last_partition_iter->second;
if (std::abs(static_cast<int64_t>(value.qpd) -
static_cast<int64_t>(last_value.qpd)) < 5 &&
std::abs(static_cast<int64_t>(value.qpw) -
static_cast<int64_t>(last_value.qpw)) < 10 &&
std::abs(static_cast<int64_t>(value.last_access_time) -
static_cast<int64_t>(last_value.last_access_time)) < 60) {
LOG(INFO) << "skip partition_id=" << partition_id << " qpd=" << value.qpd
<< " qpw=" << value.qpw
<< " last_access_time=" << value.last_access_time
<< " last_qpd=" << last_value.qpd
<< " last_qpw=" << last_value.qpw
<< " last_access_time=" << last_value.last_access_time;
continue;
}
}
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
struct TabletHotspotMapValue {
uint64_t qpd = 0; // query per day
uint64_t qpw = 0; // query per week
int64_t last_access_time;
};

using TabletHotspotMapKey = std::pair<int64_t, int64_t>;
}

void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
// map<pair<table_id, index_id>, map<partition_id, value>> for day
Expand Down Expand Up @@ -108,33 +145,14 @@ void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_t
});
constexpr int N = 50;
int return_partitions = 0;
auto get_return_partitions =
[=, &return_partitions](
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>& hot_partition) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
THotPartition hot_partition;
hot_partition.__set_partition_id(partition_id);
hot_partition.__set_query_per_day(value.qpd);
hot_partition.__set_query_per_week(value.qpw);
hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
};
get_return_partitions(day_hot_partitions);
get_return_partitions(week_hot_partitions);

get_return_partitions(day_hot_partitions, _last_day_hot_partitions, hot_tables,
return_partitions, N);
get_return_partitions(week_hot_partitions, _last_week_hot_partitions, hot_tables,
return_partitions, N);

_last_day_hot_partitions = std::move(day_hot_partitions);
_last_week_hot_partitions = std::move(week_hot_partitions);
}

void HotspotCounter::make_dot_point() {
Expand Down
19 changes: 19 additions & 0 deletions be/src/cloud/cloud_tablet_hotspot.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ struct HotspotCounter {
};

using HotspotCounterPtr = std::shared_ptr<HotspotCounter>;
using TabletHotspotMapKey = std::pair<int64_t, int64_t>;

// Per-partition access statistics tracked by the tablet hotspot reporter.
struct TabletHotspotMapValue {
    uint64_t qpd = 0; // query per day
    uint64_t qpw = 0; // query per week
    // Most recent access time; default-initialized so a default-constructed
    // value does not carry an indeterminate timestamp (0 = never accessed).
    // NOTE(review): units look like a unix timestamp in seconds — confirm with callers.
    int64_t last_access_time = 0;
};

// Hash functor for the (table_id, index_id) pair keys of the hotspot maps.
struct MapKeyHash {
    int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
        // Combine the two element hashes asymmetrically (boost::hash_combine
        // style). The previous `hash(first) + hash(second)` combine was
        // symmetric, so swapped pairs (a, b) and (b, a) always collided and
        // bucket distribution suffered for correlated ids.
        size_t seed = std::hash<int64_t> {}(key.first);
        seed ^= std::hash<int64_t> {}(key.second) + 0x9e3779b97f4a7c15ULL + (seed << 6) +
                (seed >> 2);
        return static_cast<int64_t>(seed);
    }
};

class TabletHotspot {
public:
Expand All @@ -71,6 +84,12 @@ class TabletHotspot {
bool _closed {false};
std::mutex _mtx;
std::condition_variable _cond;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_day_hot_partitions;
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
_last_week_hot_partitions;
};

} // namespace doris
7 changes: 5 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50");
// download low speed time(seconds)
DEFINE_mInt32(download_low_speed_time, "300");
// whether to download small files in batch
DEFINE_mBool(enable_batch_download, "false");
DEFINE_mBool(enable_batch_download, "true");

DEFINE_String(sys_log_dir, "");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
Expand Down Expand Up @@ -1211,7 +1211,7 @@ DEFINE_Bool(exit_on_exception, "false");
DEFINE_Bool(enable_flush_file_cache_async, "true");

// cgroup
DEFINE_mString(doris_cgroup_cpu_path, "");
DEFINE_String(doris_cgroup_cpu_path, "");

DEFINE_mBool(enable_be_proc_monitor, "false");
DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");
Expand Down Expand Up @@ -1402,6 +1402,9 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
// Enable validation to check the correctness of table size.
DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");

DEFINE_mInt32(compaction_num_per_round, "1");

// clang-format off
#ifdef BE_TEST
Expand Down
Loading

0 comments on commit ba6cc5d

Please sign in to comment.