Skip to content

Commit

Permalink
[improvement](query) prefer to chose tablet on alive disk (apache#39467)
Browse files Browse the repository at this point in the history
improvement:
1.  when query, prefer to chose tablets on alive disks;
2. when be report tablets, if report version fall behind, try report
again;
3. when be restart, it report its tablets and disks immedidately, no
wait 1min;
4. when fe handle tablet report, even if this report is stale, but if
there exists other health tablets and this tablet is on bad disk, still
process this tablet;
  • Loading branch information
yujun777 authored Aug 20, 2024
1 parent 3e8c19f commit 5280c18
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 22 deletions.
19 changes: 10 additions & 9 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1009,13 +1009,6 @@ void report_task_callback(const TMasterInfo& master_info) {
}

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
if (config::report_random_wait) {
random_sleep(5);
}

TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;
Expand Down Expand Up @@ -1081,8 +1074,16 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;

uint64_t report_version = s_report_version;
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
uint64_t report_version;
for (int i = 0; i < 5; i++) {
request.tablets.clear();
report_version = s_report_version;
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
if (report_version == s_report_version) {
break;
}
}

if (report_version < s_report_version) {
// TODO llj This can only reduce the possibility for report error, but can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
Expand Down
2 changes: 2 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,8 @@ int main(int argc, char** argv) {
stop_work_if_error(
status, "Arrow Flight Service did not start correctly, exiting, " + status.to_string());

exec_env->storage_engine().notify_listeners();

while (!doris::k_doris_exit) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ public boolean hasPathHash() {
return pathHash != 0;
}

public boolean isAlive() {
return state == DiskState.ONLINE;
}

public boolean isStorageMediumMatch(TStorageMedium storageMedium) {
return this.storageMedium == storageMedium;
}
Expand Down
28 changes: 20 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,11 @@ protected Multimap<Long, Long> getNormalReplicaBackendPathMapCloud(String beEndp
}

// for query
public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) {
public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Long>> backendAlivePathHashs,
boolean allowFailedVersion) {
List<Replica> allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size());
List<Replica> auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size());
List<Replica> deadPathReplica = Lists.newArrayList();
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
Expand All @@ -317,21 +319,31 @@ public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFail
continue;
}

if (!replica.checkVersionCatchUp(visibleVersion, false)) {
continue;
}

Set<Long> thisBeAlivePaths = backendAlivePathHashs.get(replica.getBackendId());
ReplicaState state = replica.getState();
if (state.canQuery()) {
if (replica.checkVersionCatchUp(visibleVersion, false)) {
allQueryableReplica.add(replica);
}
// if thisBeAlivePaths contains pathHash = 0, it mean this be hadn't report disks state.
// should ignore this case.
if (replica.getPathHash() != -1 && thisBeAlivePaths != null
&& !thisBeAlivePaths.contains(replica.getPathHash())
&& !thisBeAlivePaths.contains(0L)) {
deadPathReplica.add(replica);
} else if (state.canQuery()) {
allQueryableReplica.add(replica);
} else if (state == ReplicaState.DECOMMISSION) {
if (replica.checkVersionCatchUp(visibleVersion, false)) {
auxiliaryReplica.add(replica);
}
auxiliaryReplica.add(replica);
}
}

if (allQueryableReplica.isEmpty()) {
allQueryableReplica = auxiliaryReplica;
}
if (allQueryableReplica.isEmpty()) {
allQueryableReplica = deadPathReplica;
}

if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) {
long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.MaterializedIndex;
Expand Down Expand Up @@ -822,6 +823,15 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
Set<Long> backendHealthPathHashs;
if (backend == null) {
backendHealthPathHashs = Sets.newHashSet();
} else {
backendHealthPathHashs = backend.getDisks().values().stream()
.filter(DiskInfo::isAlive)
.map(DiskInfo::getPathHash).collect(Collectors.toSet());
}
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
Expand Down Expand Up @@ -877,7 +887,24 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
long currentBackendReportVersion = Env.getCurrentSystemInfo()
.getBackendReportVersion(backendId);
if (backendReportVersion < currentBackendReportVersion) {
continue;

// if backendHealthPathHashs contains health path hash 0,
// it means this backend hadn't reported disks state,
// should ignore this case.
boolean thisReplicaOnBadDisk = replica.getPathHash() != -1L
&& !backendHealthPathHashs.contains(replica.getPathHash())
&& !backendHealthPathHashs.contains(0L);

boolean existsOtherHealthReplica = tablet.getReplicas().stream()
.anyMatch(r -> r.getBackendId() != replica.getBackendId()
&& r.getVersion() >= replica.getVersion()
&& r.getLastFailedVersion() == -1L
&& !r.isBad());

// if replica is on bad disks and there are other health replicas, still delete it.
if (!(thisReplicaOnBadDisk && existsOtherHealthReplica)) {
continue;
}
}

BinlogConfig binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
Expand Down Expand Up @@ -751,7 +752,7 @@ public Long getTabletSingleReplicaSize(Long tabletId) {
}

private void addScanRangeLocations(Partition partition,
List<Tablet> tablets) throws UserException {
List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs) throws UserException {
long visibleVersion = Partition.PARTITION_INIT_VERSION;

// For cloud mode, set scan range visible version in Coordinator.exec so that we could
Expand Down Expand Up @@ -804,7 +805,8 @@ private void addScanRangeLocations(Partition partition,
//
// ATTN: visibleVersion is not used in cloud mode, see CloudReplica.checkVersionCatchup
// for details.
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion,
backendAlivePathHashs, skipMissingVersion);
if (replicas.isEmpty()) {
if (ConnectContext.get().getSessionVariable().skipBadTablet) {
continue;
Expand Down Expand Up @@ -1168,6 +1170,12 @@ private void computeTabletInfo() throws UserException {
*/
Preconditions.checkState(scanBackendIds.size() == 0);
Preconditions.checkState(scanTabletIds.size() == 0);
Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream()
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
}

for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
Expand Down Expand Up @@ -1209,7 +1217,7 @@ private void computeTabletInfo() throws UserException {

totalTabletsNum += selectedTable.getTablets().size();
selectedSplitNum += tablets.size();
addScanRangeLocations(partition, tablets);
addScanRangeLocations(partition, tablets, backendAlivePathHashs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ public void checkAvailableCapacity() throws DdlException {
}
}

private ImmutableMap<Long, Backend> getAllClusterBackendsNoException() {
public ImmutableMap<Long, Backend> getAllClusterBackendsNoException() {
try {
return getAllBackendsByAllCluster();
} catch (AnalysisException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.system.Backend;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class QueryTabletTest extends TestWithFeService {

@Override
protected int backendNum() {
return 3;
}

@Test
public void testTabletOnBadDisks() throws Exception {
createDatabase("db1");
createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1"
+ " properties('replication_num' = '3')");

Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("db1");
OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
Assertions.assertNotNull(tbl);
Tablet tablet = tbl.getPartitions().iterator().next()
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
.getTablets().iterator().next();

List<Replica> replicas = tablet.getReplicas();
Assertions.assertEquals(3, replicas.size());
for (Replica replica : replicas) {
Assertions.assertTrue(replica.getPathHash() != -1L);
}

Assertions.assertEquals(replicas,
tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));

// disk mark as bad
Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId())
.getDisks().values().forEach(disk -> disk.setState(DiskInfo.DiskState.OFFLINE));

// lost disk
replicas.get(1).setPathHash(-123321L);

Assertions.assertEquals(Lists.newArrayList(replicas.get(2)),
tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
}

private Map<Long, Set<Long>> getAlivePathHashs() {
Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream()
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
}

return backendAlivePathHashs;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TCloneReq;
import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TDropTabletReq;
import org.apache.doris.thrift.TExecPlanFragmentParams;
Expand Down Expand Up @@ -95,7 +96,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

/*
* This class is used to create mock backends.
Expand Down Expand Up @@ -203,13 +206,17 @@ public void run() {
TTaskType taskType = request.getTaskType();
switch (taskType) {
case CREATE:
++reportVersion;
handleCreateTablet(request, finishTaskRequest);
break;
case ALTER:
++reportVersion;
break;
case DROP:
handleDropTablet(request, finishTaskRequest);
break;
case CLONE:
++reportVersion;
handleCloneTablet(request, finishTaskRequest);
break;
case STORAGE_MEDIUM_MIGRATE:
Expand All @@ -235,6 +242,30 @@ public void run() {
}
}

private void handleCreateTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
TCreateTabletReq req = request.getCreateTabletReq();
List<DiskInfo> candDisks = backendInFe.getDisks().values().stream()
.filter(disk -> req.storage_medium == disk.getStorageMedium() && disk.isAlive())
.collect(Collectors.toList());
if (candDisks.isEmpty()) {
candDisks = backendInFe.getDisks().values().stream()
.filter(DiskInfo::isAlive)
.collect(Collectors.toList());
}
DiskInfo choseDisk = candDisks.isEmpty() ? null
: candDisks.get(new Random().nextInt(candDisks.size()));

List<TTabletInfo> tabletInfos = Lists.newArrayList();
TTabletInfo tabletInfo = new TTabletInfo();
tabletInfo.setTabletId(req.tablet_id);
tabletInfo.setVersion(req.version);
tabletInfo.setPathHash(choseDisk == null ? -1L : choseDisk.getPathHash());
tabletInfo.setReplicaId(req.replica_id);
tabletInfo.setUsed(true);
tabletInfos.add(tabletInfo);
finishTaskRequest.setFinishTabletInfos(tabletInfos);
}

private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
TDropTabletReq req = request.getDropTabletReq();
long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id));
Expand Down

0 comments on commit 5280c18

Please sign in to comment.