From d4b94004aa3c0848d34a0dbe87673bc68d62cee3 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Sat, 4 Nov 2023 12:55:33 +0800 Subject: [PATCH] fix --- be/src/vec/sink/load_stream_stub.cpp | 3 +++ be/src/vec/sink/load_stream_stub.h | 20 ++++++++++++++++---- be/src/vec/sink/load_stream_stub_pool.h | 2 +- be/src/vec/sink/vtablet_sink_v2.h | 17 ----------------- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 386a9183cf5671..66369a5d6fdf37 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -87,6 +87,7 @@ LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id) : _is_init(false), _load_id(load_id), _src_id(src_id), + _handler(load_id), _tablet_schema_for_index(std::make_shared()), _enable_unique_mow_for_index(std::make_shared()) {} @@ -94,6 +95,7 @@ LoadStreamStub::LoadStreamStub(LoadStreamStub& stub) : _is_init(stub._is_init.load()), _load_id(stub._load_id), _src_id(stub._src_id), + _handler(stub._load_id), _tablet_schema_for_index(stub._tablet_schema_for_index), _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {} @@ -135,6 +137,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, if (int ret = StreamCreate(&_stream_id, cntl, &opt)) { return Status::Error(ret, "Failed to create stream"); } + _handler.set_stream_id(_stream_id); cntl.set_timeout_ms(config::open_load_stream_timeout_ms); POpenStreamSinkRequest request; *request.mutable_load_id() = _load_id; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index dbbd1b4ffcded7..19c9f31ccf9517 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -85,6 +85,10 @@ class LoadStreamStub { private: class LoadStreamReplyHandler : public brpc::StreamInputHandler { public: + LoadStreamReplyHandler(PUniqueId load_id) { + _load_id = load_id; + } + int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], size_t size) override; @@ -99,13 +103,17 @@ class LoadStreamStub { if (_is_closed) { return Status::OK(); } + int ret = 0; if (timeout_ms > 0) { - int ret = _close_cv.wait_for(lock, timeout_ms * 1000); - return ret == 0 ? Status::OK() - : Status::Error(ret, "stream close_wait timeout"); + ret = _close_cv.wait_for(lock, timeout_ms * 1000); + if (!_is_closed) { + // avoid use after free on on_closed + brpc::StreamClose(_stream_id); + } } _close_cv.wait(lock); - return Status::OK(); + return ret == 0 ? Status::OK() + : Status::Error(ret, "stream close_wait timeout"); }; std::vector success_tablets() { @@ -120,7 +128,11 @@ class LoadStreamStub { void set_dst_id(int64_t dst_id) { _dst_id = dst_id; } + void set_stream_id(brpc::StreamId stream_id) { _stream_id = stream_id; } + private: + PUniqueId _load_id; + brpc::StreamId _stream_id; int64_t _dst_id = -1; // for logging std::atomic _is_closed; bthread::Mutex _mutex; diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index 73b41fdd61adc8..3d7caf34222a35 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -99,4 +99,4 @@ class LoadStreamStubPool { }; } // namespace stream_load -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 5bacffbcea36f0..cc556009b8e0c0 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -87,21 +87,6 @@ using NodeIdForStream = std::unordered_map; using NodePartitionTabletMapping = std::unordered_map>>; -class StreamSinkHandler : public brpc::StreamInputHandler { -public: - StreamSinkHandler(VOlapTableSinkV2* sink) : _sink(sink) {} - - int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], - size_t size) override; - - void on_idle_timeout(brpc::StreamId id) override {} - - void on_closed(brpc::StreamId id) override; - -private: - VOlapTableSinkV2* _sink; -}; - struct Rows { int64_t partition_id; int64_t index_id; @@ -226,8 +211,6 @@ class VOlapTableSinkV2 final : public DataSink { std::unordered_map> _tablet_failure_map; bthread::Mutex _tablet_success_map_mutex; bthread::Mutex _tablet_failure_map_mutex; - - friend class StreamSinkHandler; }; } // namespace vectorized