From 06ea35fc54e082ce78323a8162c95045a054523f Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Sat, 4 Nov 2023 14:24:00 +0800 Subject: [PATCH] fix --- be/src/vec/sink/vtablet_sink_v2.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index cf5e929ec16555..3700455599302d 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -370,6 +370,11 @@ Status VOlapTableSinkV2::_cancel(Status status) { _delta_writer_for_tablet->cancel(status); _delta_writer_for_tablet.reset(); } + for (const auto& [_, streams] : _streams_for_node) { + for (const auto& stream : *streams) { + RETURN_IF_ERROR(stream->close_wait(1)); + } + } return Status::OK(); } @@ -410,6 +415,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(_close_load_timer); for (const auto& [_, streams] : _streams_for_node) { for (const auto& stream : *streams) { + // TODO: remaining timeout RETURN_IF_ERROR(stream->close_wait()); } } @@ -447,7 +453,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { } _close_status = status; - static_cast(DataSink::close(state, exec_status)); + RETURN_IF_ERROR(DataSink::close(state, exec_status)); return status; }