diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index ebc3269151143f..5a9821697b4a05 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -109,6 +109,7 @@ void LoadStreamStub::prepare() { return Status::InternalError("stream {} is already inited by {}", _stream_id, _load_id); } ++_use_cnt; + LOG(WARNING) << "prepare stream " << _stream_id << " load_id " << _load_id << " use_cnt " << _use_cnt; } // open_load_stream @@ -210,11 +211,16 @@ Status LoadStreamStub::close_load(const std::vector& tablets_to_commi if (--_use_cnt > 0) { return Status::OK(); } + if (_use_cnt <= 0) { + return Status::InternalError("stream {} already closed", _stream_id); + } + + DCHECK(_use_cnt == 0); PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); - LOG(INFO) << "issue close load stream " << _stream_id; + LOG(INFO) << "issue close load stream " << _stream_id << " load_id " << _load_id; { std::lock_guard lock(_tablets_to_commit_mutex); for (const auto& tablet : _tablets_to_commit) {