Skip to content

Commit

Permalink
[enhancement](random_sink) change tablet search algorithm from random…
Browse files Browse the repository at this point in the history
… to round-robin for random distribution table (apache#26611)

1. fix race condition problem when get tablet load index
2. change tablet search algorithm from random to round-robin for random distribution table when load_to_single_tablet set to false
  • Loading branch information
caiconghui authored Nov 15, 2023
1 parent 4e105e9 commit 83edcde
Show file tree
Hide file tree
Showing 12 changed files with 410 additions and 151 deletions.
18 changes: 9 additions & 9 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ struct VOlapTablePartition {
int64_t num_buckets = 0;
std::vector<OlapTableIndexTablets> indexes;
bool is_mutable;
// -1 indicates load_to_single_tablet = false
// -1 indicates partition with hash distribution
int64_t load_tablet_idx = -1;

VOlapTablePartition(vectorized::Block* partition_block)
Expand Down Expand Up @@ -187,7 +187,7 @@ class VOlapTablePartitionParam {
const std::vector<VOlapTablePartition*>& partitions,
std::vector<uint32_t>& tablet_indexes /*result*/,
/*TODO: check if flat hash map will be better*/
std::map<int64_t, int64_t>* partition_tablets_buffer = nullptr) const {
std::map<VOlapTablePartition*, int64_t>* partition_tablets_buffer = nullptr) const {
std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)>
compute_function;
if (!_distributed_slot_locs.empty()) {
Expand All @@ -212,10 +212,9 @@ class VOlapTablePartitionParam {
compute_function = [](vectorized::Block* block, uint32_t row,
const VOlapTablePartition& partition) -> uint32_t {
if (partition.load_tablet_idx == -1) {
// load_to_single_tablet = false, just do random
// for compatible with old version, just do random
return butil::fast_rand() % partition.num_buckets;
}
// load_to_single_tablet = ture, do round-robin
return partition.load_tablet_idx % partition.num_buckets;
};
}
Expand All @@ -226,14 +225,15 @@ class VOlapTablePartitionParam {
}
} else { // use buffer
for (auto index : indexes) {
auto& partition_id = partitions[index]->id;
if (auto it = partition_tablets_buffer->find(partition_id);
auto* partition = partitions[index];
if (auto it = partition_tablets_buffer->find(partition);
it != partition_tablets_buffer->end()) {
tablet_indexes[index] = it->second; // tablet
} else {
// compute and save in buffer
(*partition_tablets_buffer)[partition] = tablet_indexes[index] =
compute_function(block, index, *partitions[index]);
}
// compute and save in buffer
(*partition_tablets_buffer)[partition_id] = tablet_indexes[index] =
compute_function(block, index, *partitions[index]);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ Status BaseCompaction::execute_compact_impl() {

void BaseCompaction::_filter_input_rowset() {
// if dup_key and no delete predicate
// we skip big files too save resources
// we skip big files to save resources
if (_tablet->keys_type() != KeysType::DUP_KEYS) {
return;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ Status VRowDistribution::generate_rows_distribution(
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
_state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));

_tablet_finder->clear_for_new_batch();
_row_distribution_watch.start();
auto num_rows = block->rows();
_tablet_finder->filter_bitmap().Reset(num_rows);
Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/sink/vtablet_finder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,18 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index);
} else {
// for random distribution
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
&_partition_to_tablet_map);
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
for (auto it : _partition_to_tablet_map) {
// do round-robin for next batch
if (it.first->load_tablet_idx != -1) {
it.first->load_tablet_idx++;
}
}
_partition_to_tablet_map.clear();
}
}

return Status::OK();
Expand Down
17 changes: 6 additions & 11 deletions be/src/vec/sink/vtablet_finder.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ namespace doris::vectorized {

class OlapTabletFinder {
public:
// FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we
// FIND_TABLET_EVERY_ROW is used for hash distribution info, which indicates that we
// should compute tablet index for every row
// FIND_TABLET_EVERY_BATCH is only used for random distribution info, which indicates that we should
// FIND_TABLET_EVERY_BATCH is used for random distribution info, which indicates that we should
// compute tablet index for every row batch
// FIND_TABLET_EVERY_SINK is only used for random distribution info, which indicates that we should
// only compute tablet index in the corresponding partition once for the whole time in olap table sink
// FIND_TABLET_EVERY_SINK is used for random distribution info when load_to_single_tablet set to true,
// which indicates that we should only compute tablet index in the corresponding partition once for the
// whole time in olap table sink
enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK };

OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
Expand All @@ -50,12 +51,6 @@ class OlapTabletFinder {
return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
}

void clear_for_new_batch() {
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
_partition_to_tablet_map.clear();
}
}

bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }

const vectorized::flat_hash_set<int64_t>& partition_ids() { return _partition_ids; }
Expand All @@ -71,7 +66,7 @@ class OlapTabletFinder {
private:
VOlapTablePartitionParam* _vpartition;
FindTabletMode _find_tablet_mode;
std::map<int64_t, int64_t> _partition_to_tablet_map;
std::map<VOlapTablePartition*, int64_t> _partition_to_tablet_map;
vectorized::flat_hash_set<int64_t> _partition_ids;

int64_t _num_filtered_rows = 0;
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1603,7 +1603,6 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
_state->update_num_bytes_load_total(bytes);
DorisMetrics::instance()->load_rows->increment(rows);
DorisMetrics::instance()->load_bytes->increment(bytes);

// Random distribution and the block belongs to a single tablet, we could optimize to append the whole
// block into node channel.
bool load_block_to_single_tablet =
Expand Down
12 changes: 6 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
import org.apache.doris.persist.meta.MetaHeader;
import org.apache.doris.persist.meta.MetaReader;
import org.apache.doris.persist.meta.MetaWriter;
import org.apache.doris.planner.SingleTabletLoadRecorderMgr;
import org.apache.doris.planner.TabletLoadIndexRecorderMgr;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.plugin.PluginMgr;
import org.apache.doris.policy.PolicyMgr;
Expand Down Expand Up @@ -331,7 +331,7 @@ public class Env {
private LoadManager loadManager;
private ProgressManager progressManager;
private StreamLoadRecordMgr streamLoadRecordMgr;
private SingleTabletLoadRecorderMgr singleTabletLoadRecorderMgr;
private TabletLoadIndexRecorderMgr tabletLoadIndexRecorderMgr;
private RoutineLoadManager routineLoadManager;
private SqlBlockRuleMgr sqlBlockRuleMgr;
private ExportMgr exportMgr;
Expand Down Expand Up @@ -682,7 +682,7 @@ private Env(boolean isCheckpointCatalog) {
this.progressManager = new ProgressManager();
this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager",
Config.fetch_stream_load_record_interval_second * 1000L);
this.singleTabletLoadRecorderMgr = new SingleTabletLoadRecorderMgr();
this.tabletLoadIndexRecorderMgr = new TabletLoadIndexRecorderMgr();
this.loadEtlChecker = new LoadEtlChecker(loadManager);
this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
Expand Down Expand Up @@ -1549,7 +1549,7 @@ private void startMasterOnlyDaemonThreads() {
cooldownConfHandler.start();
}
streamLoadRecordMgr.start();
singleTabletLoadRecorderMgr.start();
tabletLoadIndexRecorderMgr.start();
getInternalCatalog().getIcebergTableCreationRecordMgr().start();
new InternalSchemaInitializer().start();
if (Config.enable_hms_events_incremental_sync) {
Expand Down Expand Up @@ -3758,8 +3758,8 @@ public StreamLoadRecordMgr getStreamLoadRecordMgr() {
return streamLoadRecordMgr;
}

public SingleTabletLoadRecorderMgr getSingleTabletLoadRecorderMgr() {
return singleTabletLoadRecorderMgr;
public TabletLoadIndexRecorderMgr getTabletLoadIndexRecorderMgr() {
return tabletLoadIndexRecorderMgr;
}

public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
Expand Down Expand Up @@ -109,8 +110,6 @@ public class OlapTableSink extends DataSink {

private boolean isStrictMode = false;

private boolean loadToSingleTablet;

public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
boolean singleReplicaLoad) {
this.dstTable = dstTable;
Expand All @@ -134,7 +133,6 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou
"if load_to_single_tablet set to true," + " the olap table must be with random distribution");
}
tSink.setLoadToSingleTablet(loadToSingleTablet);
this.loadToSingleTablet = loadToSingleTablet;
tDataSink = new TDataSink(getDataSinkType());
tDataSink.setOlapTableSink(tSink);

Expand Down Expand Up @@ -344,11 +342,12 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Ana
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId));
if (loadToSingleTablet) {
int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr()
.getCurrentLoadTabletIndex(dbId, table.getId(), partitionId);
if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) {
int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
.getCurrentTabletLoadIndex(dbId, table.getId(), partition);
tPartition.setLoadTabletIdx(tabletIndex);
}

partitionParam.addToPartitions(tPartition);

DistributionInfo distInfo = partition.getDistributionInfo();
Expand Down Expand Up @@ -403,9 +402,10 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Ana
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
tPartition.setNumBuckets(index.getTablets().size());
}
if (loadToSingleTablet) {
int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr()
.getCurrentLoadTabletIndex(dbId, table.getId(), partition.getId());

if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) {
int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
.getCurrentTabletLoadIndex(dbId, table.getId(), partition);
tPartition.setLoadTabletIdx(tabletIndex);
}
partitionParam.addToPartitions(tPartition);
Expand Down

This file was deleted.

Loading

0 comments on commit 83edcde

Please sign in to comment.