Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Nov 4, 2023
1 parent 8f57fd9 commit d4b9400
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 22 deletions.
3 changes: 3 additions & 0 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
: _is_init(false),
_load_id(load_id),
_src_id(src_id),
_handler(load_id),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {}

LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
: _is_init(stub._is_init.load()),
_load_id(stub._load_id),
_src_id(stub._src_id),
_handler(stub._load_id),
_tablet_schema_for_index(stub._tablet_schema_for_index),
_enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {}

Expand Down Expand Up @@ -135,6 +137,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
if (int ret = StreamCreate(&_stream_id, cntl, &opt)) {
return Status::Error<true>(ret, "Failed to create stream");
}
_handler.set_stream_id(_stream_id);
cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
POpenStreamSinkRequest request;
*request.mutable_load_id() = _load_id;
Expand Down
20 changes: 16 additions & 4 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class LoadStreamStub {
private:
class LoadStreamReplyHandler : public brpc::StreamInputHandler {
public:
LoadStreamReplyHandler(PUniqueId load_id) {
_load_id = load_id;
}

int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
size_t size) override;

Expand All @@ -99,13 +103,17 @@ class LoadStreamStub {
if (_is_closed) {
return Status::OK();
}
int ret = 0;
if (timeout_ms > 0) {
int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
return ret == 0 ? Status::OK()
: Status::Error<true>(ret, "stream close_wait timeout");
ret = _close_cv.wait_for(lock, timeout_ms * 1000);
if (!_is_closed) {
// avoid use after free on on_closed
brpc::StreamClose(_stream_id);
}
}
_close_cv.wait(lock);
return Status::OK();
return ret == 0 ? Status::OK()
: Status::Error<true>(ret, "stream close_wait timeout");
};

std::vector<int64_t> success_tablets() {
Expand All @@ -120,7 +128,11 @@ class LoadStreamStub {

void set_dst_id(int64_t dst_id) { _dst_id = dst_id; }

void set_stream_id(brpc::StreamId stream_id) { _stream_id = stream_id; }

private:
PUniqueId _load_id;
brpc::StreamId _stream_id;
int64_t _dst_id = -1; // for logging
std::atomic<bool> _is_closed;
bthread::Mutex _mutex;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,4 @@ class LoadStreamStubPool {
};

} // namespace stream_load
} // namespace doris
} // namespace doris
17 changes: 0 additions & 17 deletions be/src/vec/sink/vtablet_sink_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,6 @@ using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>;
using NodePartitionTabletMapping =
std::unordered_map<int64_t, std::unordered_map<int64_t, std::unordered_set<int64_t>>>;

class StreamSinkHandler : public brpc::StreamInputHandler {
public:
StreamSinkHandler(VOlapTableSinkV2* sink) : _sink(sink) {}

int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
size_t size) override;

void on_idle_timeout(brpc::StreamId id) override {}

void on_closed(brpc::StreamId id) override;

private:
VOlapTableSinkV2* _sink;
};

struct Rows {
int64_t partition_id;
int64_t index_id;
Expand Down Expand Up @@ -226,8 +211,6 @@ class VOlapTableSinkV2 final : public DataSink {
std::unordered_map<int64_t, std::vector<int64_t>> _tablet_failure_map;
bthread::Mutex _tablet_success_map_mutex;
bthread::Mutex _tablet_failure_map_mutex;

friend class StreamSinkHandler;
};

} // namespace vectorized
Expand Down

0 comments on commit d4b9400

Please sign in to comment.