Skip to content

Commit

Permalink
increate load stream stub use cnt in sink prepare
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored and dataroaring committed Nov 3, 2023
1 parent 503f8a3 commit a9aa656
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 14 deletions.
15 changes: 11 additions & 4 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile) {
_num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_init.load()) {
return Status::OK();
Expand Down Expand Up @@ -190,15 +189,23 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64

// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
if (--_num_open > 0) {
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());
}
if (--_use_cnt > 0) {
return Status::OK();
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets_to_commit() = tablet;
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
for (const auto& tablet : _tablets_to_commit) {
*header.add_tablets_to_commit() = tablet;
}
}
return _encode_and_send(header);
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class LoadStreamStub {
#endif
~LoadStreamStub();

void prepare() { ++_use_cnt; };

// open_load_stream
Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
Expand Down Expand Up @@ -203,7 +205,10 @@ class LoadStreamStub {
std::atomic<bool> _is_init;
bthread::Mutex _mutex;

std::atomic<int> _num_open;
std::atomic<int> _use_cnt;

std::mutex _tablets_to_commit_mutex;
std::vector<PTabletID> _tablets_to_commit;

std::mutex _buffer_mutex;
std::mutex _send_mutex;
Expand Down
23 changes: 15 additions & 8 deletions be/src/vec/sink/vtablet_sink_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
} else {
_delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
}
_build_tablet_node_mapping();
_init_streams(state->backend_id());
return Status::OK();
}

Expand All @@ -171,21 +173,27 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
signal::set_signal_task_id(_load_id);

_build_tablet_node_mapping();
RETURN_IF_ERROR(_open_streams(state->backend_id()));

RETURN_IF_ERROR(_open_streams());
return Status::OK();
}

Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
void VOlapTableSinkV2::_init_streams(int64_t src_id) {
for (auto& [dst_id, _] : _tablets_for_node) {
auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, src_id, dst_id, _stream_per_node);
for (auto& stream : *streams) {
stream->prepare();
}
_streams_for_node[dst_id] = streams;
}
}

Status VOlapTableSinkV2::_open_streams() {
for (auto& [dst_id, streams] : _streams_for_node) {
auto node_info = _nodes_info->find_node(dst_id);
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", dst_id);
}
std::shared_ptr<Streams> streams;
streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, src_id, dst_id, _stream_per_node);
// get tablet schema from each backend only in the 1st stream
for (auto& stream : *streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
Expand All @@ -199,7 +207,6 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
*node_info, _txn_id, *_schema, {}, _total_streams,
_state->enable_profile()));
}
_streams_for_node[dst_id] = streams;
}
return Status::OK();
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/sink/vtablet_sink_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ class VOlapTableSinkV2 final : public DataSink {
Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;

private:
Status _open_streams(int64_t src_id);
void _init_streams(int64_t src_id);

Status _open_streams();

void _build_tablet_node_mapping();

Expand Down

0 comments on commit a9aa656

Please sign in to comment.