diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index a62d603fe0e2d9..c8b7391f8e24f7 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1529,6 +1529,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { std::map succ_tablets; // partition_id, tablet_id, publish_version std::vector> discontinuous_version_tablets; + std::map tablet_id_to_num_delta_rows; uint32_t retry_time = 0; Status status; bool is_task_timeout = false; @@ -1536,7 +1537,8 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { succ_tablets.clear(); error_tablet_ids.clear(); EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, - &succ_tablets, &discontinuous_version_tablets); + &succ_tablets, &discontinuous_version_tablets, + &tablet_id_to_num_delta_rows); status = StorageEngine::instance()->execute_task(&engine_task); if (status.ok()) { break; @@ -1621,7 +1623,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { finish_task_request.__set_succ_tablets(succ_tablets); finish_task_request.__set_error_tablet_ids( std::vector(error_tablet_ids.begin(), error_tablet_ids.end())); - + finish_task_request.__set_tablet_id_to_delta_num_rows(tablet_id_to_num_delta_rows); _finish_task(finish_task_request); _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index a56430fbb16c9c..d46cf3f7b82f5a 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -71,11 +71,13 @@ void TabletPublishStatistics::record_in_bvar() { EnginePublishVersionTask::EnginePublishVersionTask( const TPublishVersionRequest& publish_version_req, std::set* error_tablet_ids, std::map* succ_tablets, - std::vector>* discontinuous_version_tablets) + std::vector>* discontinuous_version_tablets, + std::map* tablet_id_to_num_delta_rows) : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids), _succ_tablets(succ_tablets), - _discontinuous_version_tablets(discontinuous_version_tablets) {} + _discontinuous_version_tablets(discontinuous_version_tablets), + _tablet_id_to_num_delta_rows(tablet_id_to_num_delta_rows) {} void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) { std::lock_guard lck(_tablet_ids_mutex); @@ -186,6 +188,9 @@ Status EnginePublishVersionTask::finish() { continue; } } + auto rowset_meta_ptr = rowset->rowset_meta(); + _tablet_id_to_num_delta_rows->insert( + {rowset_meta_ptr->tablet_id(), rowset_meta_ptr->num_rows()}); auto tablet_publish_txn_ptr = std::make_shared( this, tablet, rowset, partition_id, transaction_id, version, tablet_info); auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); }); diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 0a270c93d2a418..46ad5f9949458d 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -87,10 +87,11 @@ class EnginePublishVersionTask : public EngineTask { EnginePublishVersionTask( const TPublishVersionRequest& publish_version_req, std::set* error_tablet_ids, std::map* succ_tablets, - std::vector>* discontinous_version_tablets); - ~EnginePublishVersionTask() {} + std::vector>* discontinous_version_tablets, + std::map* tablet_id_to_num_delta_rows); + ~EnginePublishVersionTask() override = default; - virtual Status finish() override; + Status finish() override; void add_error_tablet_id(int64_t tablet_id); @@ -102,6 +103,7 @@ class EnginePublishVersionTask : public EngineTask { std::set* _error_tablet_ids; std::map* _succ_tablets; std::vector>* _discontinuous_version_tablets; + std::map* _tablet_id_to_num_delta_rows; }; class AsyncTabletPublishTask { diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 23118647ccafa5..4cffd1b38b6c01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -491,6 +491,9 @@ private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) { // not remove the task from queue and be will retry return; } + if (request.isSetTabletIdToDeltaNumRows()) { + publishVersionTask.setTabletIdToDeltaNumRows(request.getTabletIdToDeltaNumRows()); + } AgentTaskQueue.removeTask(publishVersionTask.getBackendId(), publishVersionTask.getTaskType(), publishVersionTask.getSignature()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java index 8461b1db4f5efe..998733657a73c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java @@ -21,6 +21,7 @@ import org.apache.doris.thrift.TPublishVersionRequest; import org.apache.doris.thrift.TTaskType; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,6 +39,11 @@ public class PublishVersionTask extends AgentTask { // tabletId => version, current version = 0 private Map succTablets; + /** + * To collect loaded rows for each tablet from each BE + */ + private final Map tabletIdToDeltaNumRows = Maps.newHashMap(); + public PublishVersionTask(long backendId, long transactionId, long dbId, List partitionVersionInfos, long createTime) { super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime); @@ -81,4 +87,12 @@ public synchronized void addErrorTablets(List errorTablets) { } this.errorTablets.addAll(errorTablets); } + + public void setTabletIdToDeltaNumRows(Map tabletIdToDeltaNumRows) { + this.tabletIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows); + } + + public Map getTabletIdToDeltaNumRows() { + return tabletIdToDeltaNumRows; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 7974cb6a89ae24..bbf847d9b2c408 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -51,6 +51,7 @@ import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.ClearTransactionTask; @@ -1787,6 +1788,9 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat } } } + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + LOG.debug("table id to loaded rows:{}", transactionState.getTableIdToNumDeltaRows()); + transactionState.getTableIdToNumDeltaRows().forEach(analysisManager::updateUpdatedRows); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 33ea8de07eb454..af8c4a2e1b041a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -18,6 +18,8 @@ package org.apache.doris.transaction; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.metric.MetricRepo; @@ -29,13 +31,17 @@ import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TTaskType; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Stream; public class PublishVersionDaemon extends MasterDaemon { @@ -121,12 +127,39 @@ private void publishVersion() { AgentTaskExecutor.submit(batchTask); } + TabletInvertedIndex tabletInvertedIndex = Env.getCurrentEnv().getTabletInvertedIndex(); + Set tabletIdFilter = Sets.newHashSet(); + Map tableIdToNumDeltaRows = Maps.newHashMap(); // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { - boolean hasBackendAliveAndUnfinishTask = transactionState.getPublishVersionTasks().values().stream() + Stream publishVersionTaskStream = transactionState + .getPublishVersionTasks() + .values() + .stream() + .peek(task -> { + if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { + Map tabletIdToDeltaNumRows = + task.getTabletIdToDeltaNumRows(); + tabletIdToDeltaNumRows.forEach((tabletId, numRows) -> { + if (!tabletIdFilter.add(tabletId)) { + // means the delta num rows for this tablet id has been collected + return; + } + TabletMeta tabletMeta = tabletInvertedIndex.getTabletMeta(tabletId); + if (tabletMeta == null) { + // for delete, drop, schema change etc. here may be a null value + return; + } + long tableId = tabletMeta.getTableId(); + tableIdToNumDeltaRows.computeIfPresent(tableId, (tblId, orgNum) -> orgNum + numRows); + tableIdToNumDeltaRows.putIfAbsent(tableId, numRows); + }); + } + }); + boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); - boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask || transactionState.isPublishTimeout(); + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout(); if (shouldFinishTxn) { try { // one transaction exception should not affect other transaction diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 897bc3b63b8ff7..4d72490f77ea99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -253,6 +253,8 @@ public String toString() { // tbl id -> (index ids) private Map> loadedTblIndexes = Maps.newHashMap(); + private Map tableIdToNumDeltaRows = Maps.newHashMap(); + private String errorLogUrl = null; // record some error msgs during the transaction operation. @@ -701,6 +703,14 @@ public void readFields(DataInput in) throws IOException { } } + public Map getTableIdToNumDeltaRows() { + return tableIdToNumDeltaRows; + } + + public void setTableIdToNumDeltaRows(Map tableIdToNumDeltaRows) { + this.tableIdToNumDeltaRows.putAll(tableIdToNumDeltaRows); + } + public void setErrorMsg(String errMsg) { this.errMsg = errMsg; } diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index dedc454d33f259..0e56c0e658a5c6 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -66,6 +66,7 @@ struct TFinishTaskRequest { 15: optional i64 copy_size 16: optional i64 copy_time_ms 17: optional map succ_tablets + 18: optional map tablet_id_to_delta_num_rows } struct TTablet {