diff --git a/modules/grpc/clickhouse/clickhouse-dest-worker.cpp b/modules/grpc/clickhouse/clickhouse-dest-worker.cpp index d48c9b33f..4641128c9 100644 --- a/modules/grpc/clickhouse/clickhouse-dest-worker.cpp +++ b/modules/grpc/clickhouse/clickhouse-dest-worker.cpp @@ -24,6 +24,7 @@ #include "clickhouse-dest-worker.hpp" #include "clickhouse-dest.hpp" +#include using syslogng::grpc::clickhouse::DestWorker; using syslogng::grpc::clickhouse::DestDriver; @@ -31,18 +32,180 @@ using syslogng::grpc::clickhouse::DestDriver; DestWorker::DestWorker(GrpcDestWorker *s) : syslogng::grpc::DestWorker(s) { + std::shared_ptr<::grpc::ChannelCredentials> credentials = this->create_credentials(); + if (!credentials) + { + msg_error("Error querying ClickHouse credentials", + evt_tag_str("url", this->owner.get_url().c_str()), + log_pipe_location_tag(&this->super->super.owner->super.super.super)); + throw std::runtime_error("Error querying ClickHouse credentials"); + } + + ::grpc::ChannelArguments args = this->create_channel_args(); + + this->channel = ::grpc::CreateCustomChannel(this->owner.get_url(), credentials, args); + this->stub = ::clickhouse::grpc::ClickHouse::NewStub(this->channel); +} + +bool +DestWorker::should_initiate_flush() +{ + return this->current_batch_bytes >= this->get_owner()->batch_bytes; } LogThreadedResult DestWorker::insert(LogMessage *msg) { + DestDriver *owner_ = this->get_owner(); + std::streampos last_pos = this->query_data.tellp(); + size_t row_bytes = 0; + + google::protobuf::Message *message = owner_->schema.format(msg, this->super->super.seq_num); + if (!message) + goto drop; + + this->batch_size++; + + if (!google::protobuf::util::SerializeDelimitedToOstream(*message, &this->query_data)) + goto drop; + + row_bytes = this->query_data.tellp() - last_pos; + this->current_batch_bytes += row_bytes; + log_threaded_dest_driver_insert_msg_length_stats(this->super->super.owner, row_bytes); + + msg_trace("Message added to ClickHouse batch", log_pipe_location_tag(&this->super->super.owner->super.super.super)); + + delete message; + + if (!this->client_context.get()) + { + this->client_context = std::make_unique<::grpc::ClientContext>(); + prepare_context_dynamic(*this->client_context, msg); + } + + if (this->should_initiate_flush()) + return log_threaded_dest_worker_flush(&this->super->super, LTF_FLUSH_NORMAL); + + return LTR_QUEUED; + +drop: + if (!(owner_->template_options.on_error & ON_ERROR_SILENT)) + { + msg_error("Failed to format message for ClickHouse, dropping message", + log_pipe_location_tag(&this->super->super.owner->super.super.super)); + } + + /* LTR_DROP currently drops the entire batch */ + return LTR_QUEUED; +} + +void +DestWorker::prepare_query_info(::clickhouse::grpc::QueryInfo &query_info) +{ + DestDriver *owner_ = this->get_owner(); + + query_info.set_database(owner_->get_database()); + query_info.set_user_name(owner_->get_user()); + query_info.set_password(owner_->get_password()); + query_info.set_query(owner_->get_query()); + query_info.set_input_data(this->query_data.str()); +} + +static LogThreadedResult +_map_grpc_status_to_log_threaded_result(const ::grpc::Status &status) +{ + // TODO: this is based on OTLP, we should check how the ClickHouse gRPC server behaves + + switch (status.error_code()) + { + case ::grpc::StatusCode::OK: + return LTR_SUCCESS; + case ::grpc::StatusCode::UNAVAILABLE: + case ::grpc::StatusCode::CANCELLED: + case ::grpc::StatusCode::DEADLINE_EXCEEDED: + case ::grpc::StatusCode::ABORTED: + case ::grpc::StatusCode::OUT_OF_RANGE: + case ::grpc::StatusCode::DATA_LOSS: + goto temporary_error; + case ::grpc::StatusCode::UNKNOWN: + case ::grpc::StatusCode::INVALID_ARGUMENT: + case ::grpc::StatusCode::NOT_FOUND: + case ::grpc::StatusCode::ALREADY_EXISTS: + case ::grpc::StatusCode::PERMISSION_DENIED: + case ::grpc::StatusCode::UNAUTHENTICATED: + case ::grpc::StatusCode::FAILED_PRECONDITION: + case ::grpc::StatusCode::UNIMPLEMENTED: + case ::grpc::StatusCode::INTERNAL: + goto permanent_error; + case ::grpc::StatusCode::RESOURCE_EXHAUSTED: + if (status.error_details().length() > 0) + goto temporary_error; + goto permanent_error; + default: + g_assert_not_reached(); + } + +temporary_error: + msg_debug("ClickHouse server responded with a temporary error status code, retrying after time-reopen() seconds", + evt_tag_int("error_code", status.error_code()), + evt_tag_str("error_message", status.error_message().c_str()), + evt_tag_str("error_details", status.error_details().c_str())); return LTR_NOT_CONNECTED; + +permanent_error: + msg_error("ClickHouse server responded with a permanent error status code, dropping batch", + evt_tag_int("error_code", status.error_code()), + evt_tag_str("error_message", status.error_message().c_str()), + evt_tag_str("error_details", status.error_details().c_str())); + return LTR_DROP; +} + +void +DestWorker::prepare_batch() +{ + this->query_data.str(""); + this->batch_size = 0; + this->current_batch_bytes = 0; + this->client_context.reset(); } LogThreadedResult DestWorker::flush(LogThreadedFlushMode mode) { - return LTR_ERROR; + if (this->batch_size == 0) + return LTR_SUCCESS; + + ::clickhouse::grpc::QueryInfo query_info; + ::clickhouse::grpc::Result query_result; + + this->prepare_query_info(query_info); + + ::grpc::Status status = this->stub->ExecuteQuery(this->client_context.get(), query_info, &query_result); + LogThreadedResult result = _map_grpc_status_to_log_threaded_result(status); + if (result != LTR_SUCCESS) + goto exit; + + if (query_result.has_exception()) + { + const ::clickhouse::grpc::Exception &exception = query_result.exception(); + msg_error("ClickHouse server responded with an exception, dropping batch", + evt_tag_int("code", exception.code()), + evt_tag_str("name", exception.name().c_str()), + evt_tag_str("display_text", exception.display_text().c_str()), + evt_tag_str("stack_trace", exception.stack_trace().c_str())); + result = LTR_DROP; + goto exit; + } + + log_threaded_dest_worker_written_bytes_add(&this->super->super, this->current_batch_bytes); + log_threaded_dest_driver_insert_batch_length_stats(this->super->super.owner, this->current_batch_bytes); + + msg_debug("ClickHouse batch delivered", log_pipe_location_tag(&this->super->super.owner->super.super.super)); + +exit: + this->get_owner()->metrics.insert_grpc_request_stats(status); + this->prepare_batch(); + return result; } DestDriver * diff --git a/modules/grpc/clickhouse/clickhouse-dest-worker.hpp b/modules/grpc/clickhouse/clickhouse-dest-worker.hpp index 051c3df92..22ba8fe43 100644 --- a/modules/grpc/clickhouse/clickhouse-dest-worker.hpp +++ b/modules/grpc/clickhouse/clickhouse-dest-worker.hpp @@ -27,6 +27,10 @@ #include "clickhouse-dest.hpp" #include "grpc-dest-worker.hpp" +#include + +#include "clickhouse_grpc.grpc.pb.h" + namespace syslogng { namespace grpc { namespace clickhouse { @@ -40,7 +44,19 @@ class DestWorker final : public syslogng::grpc::DestWorker LogThreadedResult flush(LogThreadedFlushMode mode); private: + bool should_initiate_flush(); + void prepare_query_info(::clickhouse::grpc::QueryInfo &query_info); + void prepare_batch(); DestDriver *get_owner(); + +private: + std::shared_ptr<::grpc::Channel> channel; + std::unique_ptr<::clickhouse::grpc::ClickHouse::Stub> stub; + std::unique_ptr<::grpc::ClientContext> client_context; + + std::ostringstream query_data; + size_t batch_size = 0; + size_t current_batch_bytes = 0; }; } diff --git a/modules/grpc/clickhouse/clickhouse-dest.cpp b/modules/grpc/clickhouse/clickhouse-dest.cpp index 2e26a5c26..308d4395b 100644 --- a/modules/grpc/clickhouse/clickhouse-dest.cpp +++ b/modules/grpc/clickhouse/clickhouse-dest.cpp @@ -40,6 +40,7 @@ DestDriver::DestDriver(GrpcDestDriver *s) &this->template_options, &this->super->super.super.super.super) { this->url = "localhost:9100"; + this->enable_dynamic_headers(); } bool