diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 7eb8f1821c57191..9e50ca189bcbc8f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -691,7 +691,7 @@ Status BetaRowsetWriter::_check_segment_number_limit() { return Status::OK(); } -Status BetaRowsetWriter::add_segment(uint32_t segment_id, SegmentStatistics& segstat) { +Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { uint32_t segid_offset = segment_id - _segment_start_id; { std::lock_guard lock(_segid_statistics_map_mutex); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 9c615048a7fdc21..8918e30f3e24386 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -78,7 +78,7 @@ class BetaRowsetWriter : public RowsetWriter { Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override; - Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) override; + Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override; Status flush() override; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index abd7af2aabaf6b2..e16ce5fa9fa0a83 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -88,7 +88,7 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite return Status::OK(); } -Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, SegmentStatistics& segstat) { +Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { for (const auto& stream : _streams) { RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id, _context.tablet_id, segment_id, segstat)); diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h 
b/be/src/olap/rowset/beta_rowset_writer_v2.h index d35f4b2486f31ae..1a6db6df4092f75 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -120,7 +120,7 @@ class BetaRowsetWriterV2 : public RowsetWriter { return Status::OK(); } - Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) override; + Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override; int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); }; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 7958cc2ce0727a6..8ba5666bf56f93b 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -48,7 +48,7 @@ struct SegmentStatistics { index_size(pb.index_size()), key_bounds(pb.key_bounds()) {} - void to_pb(SegmentStatisticsPB* segstat_pb) { + void to_pb(SegmentStatisticsPB* segstat_pb) const { segstat_pb->set_row_num(row_num); segstat_pb->set_data_size(data_size); segstat_pb->set_index_size(index_size); @@ -114,7 +114,7 @@ class RowsetWriter { "RowsetWriter not support flush_single_block"); } - virtual Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) { + virtual Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { return Status::NotSupported("RowsetWriter does not support add_segment"); } diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 29df93386c47d6a..242fa9b8c8ba84a 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -129,7 +129,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data LOG(INFO) << "write data failed " << *this; } }; - return _flush_tokens[segid % _flush_tokens.size()]->submit_func(flush_func); + return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(flush_func); } Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) { @@ -151,7 
+151,14 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data } DCHECK(new_segid != std::numeric_limits<uint32_t>::max()); - return _load_stream_writer->add_segment(new_segid, stat); + auto flush_func = [this, new_segid, stat]() { + auto st = _load_stream_writer->add_segment(new_segid, stat); + if (!st.ok() && _failed_st->ok()) { + _failed_st = std::make_shared<Status>(st); + LOG(INFO) << "add segment failed " << *this; + } + }; + return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(flush_func); } Status TabletStream::close() { diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index e44a682b25babd0..c4732363beccff0 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -121,7 +121,16 @@ Status LoadStreamWriter::close_segment(uint32_t segid) { return Status::OK(); } -Status LoadStreamWriter::add_segment(uint32_t segid, SegmentStatistics& stat) { +Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) { + if (_segment_file_writers[segid]->bytes_appended() != stat.data_size) { + LOG(WARNING) << _segment_file_writers[segid]->path() << " is added without all data, " + << "actual " << _segment_file_writers[segid]->bytes_appended() + << " expected " << stat.data_size; + return Status::Corruption("segment {} is added without all data, actual {} expected {}", + _segment_file_writers[segid]->path().native(), + _segment_file_writers[segid]->bytes_appended(), + stat.data_size); + } return _rowset_writer->add_segment(segid, stat); } @@ -145,7 +154,9 @@ Status LoadStreamWriter::close() { for (size_t i = 0; i < _segment_file_writers.size(); i++) { if (!_segment_file_writers[i]->is_closed()) { - return Status::Corruption("segment {} is not eos", i); + LOG(WARNING) << _segment_file_writers[i]->path() << " is not eos"; + return Status::Corruption("segment {} is not eos", + _segment_file_writers[i]->path().native()); } } diff --git
a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h index e4a3341cea38d7b..dc952d2c18da48e 100644 --- a/be/src/runtime/load_stream_writer.h +++ b/be/src/runtime/load_stream_writer.h @@ -71,7 +71,7 @@ class LoadStreamWriter { Status close_segment(uint32_t segid); - Status add_segment(uint32_t segid, SegmentStatistics& stat); + Status add_segment(uint32_t segid, const SegmentStatistics& stat); // wait for all memtables to be flushed. Status close(); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 4e37b02ea4a27df..ecaae54d7a76d2c 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -174,7 +174,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 // ADD_SEGMENT Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, - int64_t segment_id, SegmentStatistics& segment_stat) { + int64_t segment_id, const SegmentStatistics& segment_stat) { PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 696397bd281b08f..45be25e6c1676ad 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -164,7 +164,7 @@ class LoadStreamStub { // ADD_SEGMENT Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, - int64_t segment_id, SegmentStatistics& segment_stat); + int64_t segment_id, const SegmentStatistics& segment_stat); // CLOSE_LOAD Status close_load(const std::vector& tablets_to_commit);