Skip to content

Commit

Permalink
sequence add segment and flush
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Nov 3, 2023
1 parent a9aa656 commit 0378719
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 13 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ Status BetaRowsetWriter::_check_segment_number_limit() {
return Status::OK();
}

Status BetaRowsetWriter::add_segment(uint32_t segment_id, SegmentStatistics& segstat) {
Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
uint32_t segid_offset = segment_id - _segment_start_id;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class BetaRowsetWriter : public RowsetWriter {

Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override;

Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) override;
Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override;

Status flush() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite
return Status::OK();
}

Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, SegmentStatistics& segstat) {
Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
for (const auto& stream : _streams) {
RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id,
_context.tablet_id, segment_id, segstat));
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class BetaRowsetWriterV2 : public RowsetWriter {
return Status::OK();
}

Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) override;
Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override;

// Hands out the next segment id: returns the current value of
// _next_segment_id and advances it by one via fetch_add(1).
int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); };

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct SegmentStatistics {
index_size(pb.index_size()),
key_bounds(pb.key_bounds()) {}

void to_pb(SegmentStatisticsPB* segstat_pb) {
void to_pb(SegmentStatisticsPB* segstat_pb) const {
segstat_pb->set_row_num(row_num);
segstat_pb->set_data_size(data_size);
segstat_pb->set_index_size(index_size);
Expand Down Expand Up @@ -114,7 +114,7 @@ class RowsetWriter {
"RowsetWriter not support flush_single_block");
}

virtual Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) {
// Default implementation: ignores both arguments and always returns
// Status::NotSupported. Subclasses that support add_segment
// (e.g. BetaRowsetWriter, BetaRowsetWriterV2) override this.
virtual Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
return Status::NotSupported("RowsetWriter does not support add_segment");
}

Expand Down
11 changes: 9 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
LOG(INFO) << "write data failed " << *this;
}
};
return _flush_tokens[segid % _flush_tokens.size()]->submit_func(flush_func);
return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(flush_func);
}

Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
Expand All @@ -151,7 +151,14 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());

return _load_stream_writer->add_segment(new_segid, stat);
auto flush_func = [this, new_segid, stat]() {
auto st = _load_stream_writer->add_segment(new_segid, stat);
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
LOG(INFO) << "add segment failed " << *this;
}
};
return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(flush_func);
}

Status TabletStream::close() {
Expand Down
15 changes: 13 additions & 2 deletions be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,16 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
return Status::OK();
}

Status LoadStreamWriter::add_segment(uint32_t segid, SegmentStatistics& stat) {
// Hands segment `segid` and its statistics to the underlying rowset writer,
// after verifying that the segment's file has received all of its data.
//
// Returns Status::Corruption when:
//   - `segid` does not refer to a file writer held in _segment_file_writers, or
//   - the bytes appended to that segment's file differ from `stat.data_size`.
// Otherwise returns whatever `_rowset_writer->add_segment` returns.
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) {
    // Validate the id before indexing: an out-of-range subscript or a null
    // writer would otherwise be undefined behaviour rather than an error Status.
    if (segid >= _segment_file_writers.size() || !_segment_file_writers[segid]) {
        LOG(WARNING) << "segment " << segid << " has no file writer, total segment file writers "
                     << _segment_file_writers.size();
        return Status::Corruption("segment {} has no file writer, total segment file writers {}",
                                  segid, _segment_file_writers.size());
    }

    const auto& file_writer = _segment_file_writers[segid];
    // Read the counter once so the comparison, the log line and the returned
    // Status all report the same value.
    const auto bytes_appended = file_writer->bytes_appended();
    if (bytes_appended != stat.data_size) {
        LOG(WARNING) << file_writer->path() << " is added without all data, "
                     << "actual " << bytes_appended << " expected " << stat.data_size;
        return Status::Corruption("segment {} is added without all data, actual {} expected {}",
                                  file_writer->path().native(), bytes_appended, stat.data_size);
    }
    return _rowset_writer->add_segment(segid, stat);
}

Expand All @@ -145,7 +154,9 @@ Status LoadStreamWriter::close() {

for (size_t i = 0; i < _segment_file_writers.size(); i++) {
if (!_segment_file_writers[i]->is_closed()) {
return Status::Corruption("segment {} is not eos", i);
LOG(WARNING) << _segment_file_writers[i]->path() << " is not eos";
return Status::Corruption("segment {} is not eos",
_segment_file_writers[i]->path().native());
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class LoadStreamWriter {

Status close_segment(uint32_t segid);

Status add_segment(uint32_t segid, SegmentStatistics& stat);
Status add_segment(uint32_t segid, const SegmentStatistics& stat);

// wait for all memtables to be flushed.
Status close();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64

// ADD_SEGMENT
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, SegmentStatistics& segment_stat) {
int64_t segment_id, const SegmentStatistics& segment_stat) {
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class LoadStreamStub {

// ADD_SEGMENT
Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, SegmentStatistics& segment_stat);
int64_t segment_id, const SegmentStatistics& segment_stat);

// CLOSE_LOAD
Status close_load(const std::vector<PTabletID>& tablets_to_commit);
Expand Down

0 comments on commit 0378719

Please sign in to comment.