Skip to content

Commit

Permalink
Merge branch 'master' into docker_build_readme
Browse files Browse the repository at this point in the history
  • Loading branch information
catpineapple authored Aug 16, 2024
2 parents 77d5452 + e4056df commit 8360a8c
Show file tree
Hide file tree
Showing 280 changed files with 6,227 additions and 1,861 deletions.
354 changes: 255 additions & 99 deletions be/src/agent/cgroup_cpu_ctl.cpp

Large diffs are not rendered by default.

87 changes: 76 additions & 11 deletions be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ namespace doris {

// cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit
const static int CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static std::string CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE = "max 100000";

class CgroupCpuCtl {
public:
virtual ~CgroupCpuCtl() = default;
CgroupCpuCtl() = default;
CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }

virtual Status init();
virtual Status init() = 0;

virtual Status add_thread_to_cgroup() = 0;

Expand All @@ -48,18 +48,36 @@ class CgroupCpuCtl {
// for log
void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);

virtual Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) = 0;
static void init_doris_cgroup_path();

static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);

static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);

static bool is_a_valid_cgroup_path(std::string cg_path);

static uint64_t cpu_soft_limit_default_value();

protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
Status write_cg_sys_file(std::string file_path, std::string value, std::string msg,
bool is_append);

virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;

virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;

std::string _doris_cgroup_cpu_path;
uint64_t _cpu_core_num = CpuInfo::num_cores();
uint64_t _cpu_cfs_period_us = 100000;
Status add_thread_to_cgroup(std::string task_file);

protected:
inline static uint64_t _cpu_core_num;
const static uint64_t _cpu_cfs_period_us = 100000;
inline static std::string _doris_cgroup_cpu_path = "";
inline static std::string _doris_cgroup_cpu_query_path = "";
inline static bool _is_enable_cgroup_v1_in_env = false;
inline static bool _is_enable_cgroup_v2_in_env = false;
inline static bool _is_cgroup_query_path_valid = false;

protected:
int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
Expand Down Expand Up @@ -96,20 +114,67 @@ class CgroupCpuCtl {
// CPU controller backed by cgroup v1.
// Writes quota/shares/task files under the v1 cpu hierarchy for one workload group.
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
    CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
    CgroupV1CpuCtl() = default;
    Status init() override;
    // Sets cpu.cfs_quota_us for the workload group (no lock taken; caller holds it).
    Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
    // Sets cpu.shares for the workload group (no lock taken; caller holds it).
    Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
    // Adds the calling thread to this group's tasks file.
    Status add_thread_to_cgroup() override;

    Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) override;

private:
    // Cached cgroup v1 file-system paths; filled in by init().
    std::string _cgroup_v1_cpu_query_path;
    std::string _cgroup_v1_cpu_tg_path; // workload group path
    std::string _cgroup_v1_cpu_tg_quota_file;  // cpu.cfs_quota_us (hard limit)
    std::string _cgroup_v1_cpu_tg_shares_file; // cpu.shares (soft limit)
    std::string _cgroup_v1_cpu_tg_task_file;   // tasks file threads are added to
};

/*
NOTE: cgroup v2 directory structure
1 root path:
/sys/fs/cgroup
2 doris home path:
/sys/fs/cgroup/{doris_home}/
3 doris home subtree_control file:
/sys/fs/cgroup/{doris_home}/cgroup.subtree_control
4 query path:
/sys/fs/cgroup/{doris_home}/query/
5 query path subtree_control file:
/sys/fs/cgroup/{doris_home}/query/cgroup.subtree_control
6 workload group path:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}
7 workload group cpu.max file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.max
8 workload group cpu.weight file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.weight
9 workload group cgroup type file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cgroup.type
*/
// CPU controller backed by cgroup v2 (unified hierarchy).
// Uses cpu.max / cpu.weight files and must enable the "cpu" controller via
// cgroup.subtree_control at each level of the directory tree described above.
class CgroupV2CpuCtl : public CgroupCpuCtl {
public:
    CgroupV2CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
    Status init() override;
    // Writes the workload group's cpu.max file (no lock taken; caller holds it).
    Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
    // Writes the workload group's cpu.weight file (no lock taken; caller holds it).
    Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
    // Adds the calling thread to this group's thread file.
    Status add_thread_to_cgroup() override;

private:
    // Enables the cpu controller in the given cgroup.subtree_control file.
    Status enable_cpu_controller(std::string file);

private:
    // Cached cgroup v2 file-system paths; filled in by init().
    std::string _doris_cgroup_cpu_path_subtree_ctl_file;
    std::string _cgroup_v2_query_path_subtree_ctl_file;
    std::string _cgroup_v2_query_wg_path;
    std::string _cgroup_v2_query_wg_cpu_max_file;    // hard limit
    std::string _cgroup_v2_query_wg_cpu_weight_file; // soft limit
    std::string _cgroup_v2_query_wg_thread_file;
    std::string _cgroup_v2_query_wg_type_file;
};

} // namespace doris
10 changes: 6 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2067,10 +2067,12 @@ void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
}

void clean_udf_cache_callback(const TAgentTaskRequest& req) {
LOG(INFO) << "clean udf cache start: " << req.clean_udf_cache_req.function_signature;
static_cast<void>(
JniUtil::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature));
LOG(INFO) << "clean udf cache finish: " << req.clean_udf_cache_req.function_signature;
if (doris::config::enable_java_support) {
LOG(INFO) << "clean udf cache start: " << req.clean_udf_cache_req.function_signature;
static_cast<void>(
JniUtil::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature));
LOG(INFO) << "clean udf cache finish: " << req.clean_udf_cache_req.function_signature;
}
}

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
request.timeout, nullptr);
}

return Status::OK();
return st;
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
error_msg = cntl.ErrorText();
} else if (res->status().code() == MetaServiceCode::OK) {
return Status::OK();
} else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
res->status().msg());
} else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
res->status().msg());
Expand Down
13 changes: 0 additions & 13 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
// number of max scan keys
DEFINE_mInt32(doris_max_scan_key_num, "48");
// the max number of push down values of a single column.
// if exceed, no conditions will be pushed down for that column.
DEFINE_mInt32(max_pushdown_conditions_per_column, "1024");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down Expand Up @@ -810,14 +805,6 @@ DEFINE_Int32(load_stream_eagain_wait_seconds, "600");
DEFINE_Int32(load_stream_flush_token_max_tasks, "15");
// max wait flush token time in load stream
DEFINE_Int32(load_stream_max_wait_flush_token_time_ms, "600000");

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job
DEFINE_mInt32(max_send_batch_parallelism_per_job, "5");
DEFINE_Validator(max_send_batch_parallelism_per_job,
[](const int config) -> bool { return config >= 1; });

// number of send batch thread pool size
DEFINE_Int32(send_batch_thread_pool_thread_num, "64");
// number of send batch thread pool queue size
Expand Down
11 changes: 0 additions & 11 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@ DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
DECLARE_mInt32(min_bytes_in_scanner_queue);
// number of max scan keys
DECLARE_mInt32(doris_max_scan_key_num);
// the max number of push down values of a single column.
// if exceed, no conditions will be pushed down for that column.
DECLARE_mInt32(max_pushdown_conditions_per_column);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down Expand Up @@ -872,12 +867,6 @@ DECLARE_Int32(load_stream_eagain_wait_seconds);
DECLARE_Int32(load_stream_flush_token_max_tasks);
// max wait flush token time in load stream
DECLARE_Int32(load_stream_max_wait_flush_token_time_ms);

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job
DECLARE_mInt32(max_send_batch_parallelism_per_job);

// number of send batch thread pool size
DECLARE_Int32(send_batch_thread_pool_thread_num);
// number of send batch thread pool queue size
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
#include <fmt/format.h>
#include <gen_cpp/Status_types.h>

#include <cstdint>
#include <exception>
#include <memory>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>
Expand Down
60 changes: 59 additions & 1 deletion be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,11 @@ Status HttpClient::init(const std::string& url, bool set_fail_on_error) {
LOG(WARNING) << "fail to set CURLOPT_WRITEDATA, msg=" << _to_errmsg(code);
return Status::InternalError("fail to set CURLOPT_WRITEDATA");
}

std::string escaped_url;
RETURN_IF_ERROR(_escape_url(url, &escaped_url));
// set url
code = curl_easy_setopt(_curl, CURLOPT_URL, url.c_str());
code = curl_easy_setopt(_curl, CURLOPT_URL, escaped_url.c_str());
if (code != CURLE_OK) {
LOG(WARNING) << "failed to set CURLOPT_URL, errmsg=" << _to_errmsg(code);
return Status::InternalError("fail to set CURLOPT_URL");
Expand Down Expand Up @@ -290,4 +293,59 @@ Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
return status;
}

// http://example.com/page?param1=value1&param2=value+with+spaces#section
// Percent-encode the *values* of a URL's query string, leaving the scheme,
// path, parameter keys and fragment untouched.
// e.g. http://example.com/page?param1=value1&param2=value+with+spaces#section
//
// Returns OK with *escaped_url set; InternalError if curl fails to escape a value.
Status HttpClient::_escape_url(const std::string& url, std::string* escaped_url) {
    size_t query_pos = url.find('?');
    size_t fragment_pos = url.find('#');
    // No query string — or the '#' precedes the '?', in which case the '?' is
    // part of the fragment and there is no query at all. Nothing to encode.
    if (query_pos == std::string::npos ||
        (fragment_pos != std::string::npos && fragment_pos < query_pos)) {
        *escaped_url = url;
        return Status::OK();
    }

    std::string query;
    std::string fragment;
    if (fragment_pos == std::string::npos) {
        query = url.substr(query_pos + 1);
    } else {
        query = url.substr(query_pos + 1, fragment_pos - query_pos - 1);
        fragment = url.substr(fragment_pos); // keep the leading '#'
    }

    std::string encoded_query;
    size_t start = 0;
    while (start <= query.length()) {
        size_t ampersand_pos = query.find('&', start);
        size_t param_end =
                (ampersand_pos == std::string::npos) ? query.length() : ampersand_pos;
        // Only look for '=' inside the current parameter. Searching the whole
        // remaining query (as the previous version did) mis-parsed inputs like
        // "a&b=c": the '=' belonging to a later parameter was attributed to the
        // key-less parameter "a", and `param_end - equal_pos - 1` underflowed.
        size_t equal_pos = query.find('=', start);
        if (equal_pos != std::string::npos && equal_pos < param_end) {
            std::string key = query.substr(start, equal_pos - start);
            std::string value = query.substr(equal_pos + 1, param_end - equal_pos - 1);

            auto encoded_value = std::unique_ptr<char, decltype(&curl_free)>(
                    curl_easy_escape(_curl, value.c_str(), value.length()), &curl_free);
            if (!encoded_value) {
                return Status::InternalError("escape url failed, url={}", url);
            }
            encoded_query += key + "=" + std::string(encoded_value.get());
        } else {
            // Parameter without '=' — copy it through unchanged.
            encoded_query += query.substr(start, param_end - start);
        }

        if (ampersand_pos == std::string::npos) {
            break;
        }
        encoded_query += "&";
        start = ampersand_pos + 1;
    }
    *escaped_url = url.substr(0, query_pos + 1) + encoded_query + fragment;
    return Status::OK();
}

} // namespace doris
9 changes: 9 additions & 0 deletions be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ class HttpClient {

size_t on_response_data(const void* data, size_t length);

// The file name of the variant column with the inverted index contains %
// such as: 020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx
// {rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx
// We need to handle %, otherwise it will cause an HTTP 404 error.
// Because the percent ("%") character serves as the indicator for percent-encoded octets,
// it must be percent-encoded as "%25" for that octet to be used as data within a URI.
// https://datatracker.ietf.org/doc/html/rfc3986
Status _escape_url(const std::string& url, std::string* escaped_url);

private:
const char* _to_errmsg(CURLcode code);

Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,20 @@ Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
} else {
// write sub predicate v1 for compatibility
std::string condition_str = construct_sub_predicate(condition);
if (TCondition tmp; !DeleteHandler::parse_condition(condition_str, &tmp)) {
LOG(WARNING) << "failed to parse condition_str, condtion="
<< ThriftDebugString(condition);
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"failed to parse condition_str, condtion={}", ThriftDebugString(condition));
}
VLOG_NOTICE << __PRETTY_FUNCTION__ << " condition_str: " << condition_str;
del_pred->add_sub_predicates(condition_str);
DeleteSubPredicatePB* sub_predicate = del_pred->add_sub_predicates_v2();
if (condition.__isset.column_unique_id) {
// only light schema change capable table set this field
sub_predicate->set_column_unique_id(condition.column_unique_id);
} else if (TCondition tmp; !DeleteHandler::parse_condition(condition_str, &tmp)) {
// for non light schema change tables, check regex match for condition str
LOG(WARNING) << "failed to parse condition_str, condtion="
<< ThriftDebugString(condition);
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"failed to parse condition_str, condtion={}", ThriftDebugString(condition));
}

sub_predicate->set_column_name(condition.column_name);
sub_predicate->set_op(trans_op(condition.condition_op));
sub_predicate->set_cond_value(condition.condition_values[0]);
Expand Down
17 changes: 17 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,23 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
}
}
start_off += array_elem_size;
// here to make debug for array field with current doc which should has expected number of fields
DBUG_EXECUTE_IF("array_inverted_index.write_index", {
auto single_array_field_count =
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
"array_inverted_index.write_index", "single_array_field_count",
0);
if (single_array_field_count < 0) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"indexes count cannot be negative");
}
if (_doc->getFields()->size() != single_array_field_count) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"array field has fields count {} not equal to expected {}",
_doc->getFields()->size(), single_array_field_count);
}
})

if (!_doc->getFields()->empty()) {
// if this array is null, we just ignore to write inverted index
RETURN_IF_ERROR(add_document());
Expand Down
15 changes: 1 addition & 14 deletions be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,20 +404,7 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
return Status::InternalError("single compaction init curl failed");
}
for (auto& file_name : file_name_list) {
// The file name of the variant column with the inverted index contains %
// such as: 020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx
// {rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx
// We need to handle %, otherwise it will cause an HTTP 404 error.
// Because the percent ("%") character serves as the indicator for percent-encoded octets,
// it must be percent-encoded as "%25" for that octet to be used as data within a URI.
// https://datatracker.ietf.org/doc/html/rfc3986
auto output = std::unique_ptr<char, decltype(&curl_free)>(
curl_easy_escape(curl.get(), file_name.c_str(), file_name.length()), &curl_free);
if (!output) {
return Status::InternalError("escape file name failed, file name={}", file_name);
}
std::string encoded_filename(output.get());
auto remote_file_url = remote_url_prefix + encoded_filename;
auto remote_file_url = remote_url_prefix + file_name;

// get file length
uint64_t file_size = 0;
Expand Down
Loading

0 comments on commit 8360a8c

Please sign in to comment.