Skip to content

Commit

Permalink
(cloud) provide a conf to enable/disable streamload commit on be
Browse files Browse the repository at this point in the history
When to enable commit on be?
Bypassing FE during commit is good for load performance, especially when
doing freqent streamloading.

When to disable commit on be?
The current implementation of bypass will mess up the statistics, thus
the query plan optimizer. So disable it when you encoutering any
troubles. Such problem will be fixed in future work.

Signed-off-by: freemandealer <freeman.zhang1992@gmail.com>
  • Loading branch information
freemandealer authored and dataroaring committed Jul 15, 2024
1 parent 8d249a2 commit b1225b0
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 0 deletions.
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
Status st = Status::InternalError<false>("impossible branch reached, " + op_info);

if (ctx->txn_operation.compare("commit") == 0) {
if (!config::enable_stream_load_commit_txn_on_be) {
VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info;
st = StreamLoadExecutor::operate_txn_2pc(ctx);
}
if (topt == TxnOpParamType::WITH_TXN_ID) {
VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
Expand Down Expand Up @@ -93,6 +97,10 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
}

Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
if (!config::enable_stream_load_commit_txn_on_be) {
VLOG_DEBUG << "commit stream load txn with FE support";
return StreamLoadExecutor::commit_txn(ctx);
}
if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
return StreamLoadExecutor::commit_txn(ctx);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ DEFINE_mInt32(stream_load_record_batch_size, "50");
DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
// time interval to clean expired stream load records
DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false");
// The buffer size to store stream table function schema info
DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB

Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ DECLARE_mInt32(stream_load_record_batch_size);
DECLARE_Int32(stream_load_record_expire_time_secs);
// time interval to clean expired stream load records
DECLARE_mInt64(clean_stream_load_record_interval_secs);
// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
DECLARE_mBool(enable_stream_load_commit_txn_on_be);
// The buffer size to store stream table function schema info
DECLARE_Int64(stream_tvf_buffer_size);

Expand Down

0 comments on commit b1225b0

Please sign in to comment.