Skip to content

Commit

Permalink
Merge branch 'master' into patch-5
Browse files Browse the repository at this point in the history
  • Loading branch information
xiedeyantu committed Sep 18, 2024
2 parents 1fa8448 + b58570a commit 8fe40b0
Show file tree
Hide file tree
Showing 859 changed files with 32,394 additions and 8,809 deletions.
2 changes: 2 additions & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
Expand Down Expand Up @@ -86,6 +87,7 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- Build Broker
- ShellCheck
Expand Down
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@
# limitations under the License.
#
be/src/io/* @platoneko @gavinchou @dataroaring
be/src/agent/be_exec_version_manager.cpp @BiteTheDDDDt
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @dataroaring @CalvinKirs @morningman
**/pom.xml @CalvinKirs @morningman
fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @dataroaring @morningman @yiguolei @xiaokang
2 changes: 1 addition & 1 deletion .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
git clone https://github.com/DoozyX/clang-format-lint-action .github/actions/clang-format-lint-action
pushd .github/actions/clang-format-lint-action &>/dev/null
git checkout 6adbe14579e5b8e19eb3e31e5ff2479f3bd302c7
git checkout c71d0bf4e21876ebec3e5647491186f8797fde31 # v0.18.2
popd &>/dev/null
- name: Install Python dependencies
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/sonarcloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ jobs:
- 'fe/**'
- 'gensrc/proto/**'
- 'gensrc/thrift/**'
- name: Set up JDK 11
- name: Set up JDK 17
if: ${{ steps.filter.outputs.fe_changes == 'true' }}
uses: actions/setup-java@v3
with:
java-version: 11
java-version: 17
distribution: 'adopt'
- name: Cache SonarCloud packages
if: ${{ steps.filter.outputs.fe_changes == 'true' }}
Expand Down
32 changes: 32 additions & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ostream>
#include <string>

#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
#include "olap/storage_engine.h"
Expand Down Expand Up @@ -244,6 +245,37 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
_engine.notify_listeners();
}

if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
return Status::InvalidArgument(
"fe and be do not work in same mode, fe cloud mode: {},"
" be cloud mode: {}",
master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() &&
!master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint << " "
<< st;
}

if (master_info.__isset.cloud_instance_id) {
if (!config::cloud_instance_id.empty() &&
config::cloud_instance_id != master_info.cloud_instance_id) {
return Status::InvalidArgument(
"cloud_instance_id in fe.conf and be.conf are not same, fe: {}, be: {}",
master_info.cloud_instance_id, config::cloud_instance_id);
}

if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
config::set_cloud_unique_id(master_info.cloud_instance_id);
LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " "
<< st;
}
}

return Status::OK();
}

Expand Down
7 changes: 4 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ namespace {
std::mutex s_task_signatures_mtx;
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;

std::atomic_ulong s_report_version(time(nullptr) * 10000);
std::atomic_ulong s_report_version(time(nullptr) * 100000);

void increase_report_version() {
s_report_version.fetch_add(1, std::memory_order_relaxed);
Expand Down Expand Up @@ -189,7 +189,7 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
Expand Down Expand Up @@ -265,7 +265,7 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
if (status.ok()) {
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
Expand Down Expand Up @@ -1074,6 +1074,7 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;

increase_report_version();
uint64_t report_version;
for (int i = 0; i < 5; i++) {
request.tablets.clear();
Expand Down
14 changes: 13 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,19 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,

void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) {
std::string brpc_addr = fmt::format("{}:{}", request.host, request.brpc_port);
std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(request.host)) {
Status status = dns_cache->get(request.host, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
<< status.to_string();
return;
}
}
std::string brpc_addr = get_host_port(host, request.brpc_port);
Status st = Status::OK();
TStatus t_status;
std::shared_ptr<PBackendService_Stub> brpc_stub =
Expand Down
8 changes: 6 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn);
}

Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode) {
GetObjStoreInfoRequest req;
GetObjStoreInfoResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -916,6 +916,8 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
return s;
}

*is_vault_mode = resp.enable_storage_vault();

auto add_obj_store = [&vault_infos](const auto& obj_store) {
vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store),
StorageVaultPB_PathFormat {});
Expand All @@ -931,6 +933,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
}
});

// desensitization, hide secret
for (int i = 0; i < resp.obj_info_size(); ++i) {
resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx");
}
Expand All @@ -940,7 +943,8 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
}

LOG(INFO) << "get storage vault response: " << resp.ShortDebugString();
LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode
<< " response=" << resp.ShortDebugString();
return Status::OK();
}

Expand Down
9 changes: 8 additions & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,14 @@ class CloudMetaMgr {

Status precommit_txn(const StreamLoadContext& ctx);

Status get_storage_vault_info(StorageVaultInfos* vault_infos);
/**
* Gets storage vault (storage backends) from meta-service
*
* @param vault_info output param, all storage backends
* @param is_vault_mode output param, true for pure vault mode, false for legacy mode
* @return status
*/
Status get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode);

Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res);

Expand Down
24 changes: 4 additions & 20 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,7 @@ struct RefreshFSVaultVisitor {
};

Status CloudStorageEngine::open() {
cloud::StorageVaultInfos vault_infos;
do {
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
if (st.ok()) {
break;
}

LOG(WARNING) << "failed to get vault info, retry after 5s, err=" << st;
std::this_thread::sleep_for(5s);
} while (vault_infos.empty());

for (auto& [id, vault_info, path_format] : vault_infos) {
if (auto st = std::visit(VaultCreateFSVisitor {id, path_format}, vault_info); !st.ok())
[[unlikely]] {
return vault_process_error(id, vault_info, std::move(st));
}
}
set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
sync_storage_vault();

// TODO(plat1ko): DeleteBitmapTxnManager

Expand Down Expand Up @@ -340,7 +323,8 @@ void CloudStorageEngine::_check_file_cache_ttl_block_valid() {

void CloudStorageEngine::sync_storage_vault() {
cloud::StorageVaultInfos vault_infos;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
bool enable_storage_vault = false;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
if (!st.ok()) {
LOG(WARNING) << "failed to get storage vault info. err=" << st;
return;
Expand All @@ -363,7 +347,7 @@ void CloudStorageEngine::sync_storage_vault() {
}

if (auto& id = std::get<0>(vault_infos.back());
latest_fs() == nullptr || latest_fs()->id() != id) {
(latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) {
set_latest_fs(get_filesystem(id));
}
}
Expand Down
21 changes: 17 additions & 4 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,27 @@ class CloudStorageEngine final : public BaseStorageEngine {
}
void _check_file_cache_ttl_block_valid();

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) const {
std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
if (vault_id.empty()) {
if (latest_fs() == nullptr) {
LOG(INFO) << "there is not latest fs";
return std::nullopt;
}
return StorageResource {latest_fs()};
}

if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
bool synced = false;
do {
if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
if (synced) {
break;
}
sync_storage_vault();
synced = true;
} while (true);

return std::nullopt;
}
Expand Down
10 changes: 5 additions & 5 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying again
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE);
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update &&
rowset_writer->num_rows() > 0) {
Expand All @@ -684,9 +684,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info);
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class CloudTabletMgr::TabletMap {
CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine)
: _engine(engine),
_tablet_map(std::make_unique<TabletMap>()),
_cache(std::make_unique<LRUCachePolicyTrackingManual>(
_cache(std::make_unique<LRUCachePolicy>(
CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity,
LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {}

Expand Down
42 changes: 25 additions & 17 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <shared_mutex>

#include "cloud/config.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "olap/olap_common.h"
Expand All @@ -32,8 +33,8 @@
namespace doris {

CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes)
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE,
size_in_bytes, LRUCacheType::SIZE, 86400, 4),
: LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, size_in_bytes,
LRUCacheType::SIZE, 86400, 4),
_stop_latch(1) {}

CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() {
Expand Down Expand Up @@ -119,12 +120,11 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration,
std::shared_ptr<PartialUpdateInfo> partial_update_info) {
if (txn_expiration <= 0) {
txn_expiration = duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count() +
120;
}
int64_t txn_expiration_min =
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count() +
config::tablet_txn_info_min_expired_seconds;
txn_expiration = std::max(txn_expiration_min, txn_expiration);
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
Expand Down Expand Up @@ -153,16 +153,21 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
.tag("delete_bitmap_size", charge);
}

void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status,
TxnPublishInfo publish_info) {
Status CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status,
TxnPublishInfo publish_info) {
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
CHECK(_txn_map.contains(txn_key));
if (!_txn_map.contains(txn_key)) {
return Status::Error<ErrorCode::NOT_FOUND, false>(
"not found txn info, tablet_id={}, transaction_id={}, may be expired and be "
"removed",
tablet_id, transaction_id);
}
TxnVal& txn_val = _txn_map[txn_key];
*(txn_val.publish_status) = publish_status;
if (publish_status == PublishStatus::SUCCEED) {
Expand All @@ -184,7 +189,9 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
LOG_INFO("update txn related delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablt_id", tablet_id)
.tag("delete_bitmap_size", charge);
.tag("delete_bitmap_size", charge)
.tag("publish_status", static_cast<int>(publish_status));
return Status::OK();
}

void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
Expand Down Expand Up @@ -238,7 +245,8 @@ void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId tra
void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
do {
remove_expired_tablet_txn_info();
} while (!_stop_latch.wait_for(std::chrono::seconds(300)));
} while (!_stop_latch.wait_for(
std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
}

} // namespace doris
Loading

0 comments on commit 8fe40b0

Please sign in to comment.