Skip to content

Commit

Permalink
[feature](merge-cloud) Add CloudGlobalTransactionMgr
Browse files Browse the repository at this point in the history
* Abstract GlobalTransactionMgrIface interface
* Implement CloudGlobalTransactionMgr
  • Loading branch information
SWJTU-ZhangLei committed Jan 19, 2024
1 parent 0fd46fb commit 8fa3ffa
Show file tree
Hide file tree
Showing 34 changed files with 2,176 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public synchronized void run() {
default:
break;
}
} catch (AlterCancelException e) {
} catch (Exception e) {
cancelImpl(e.getMessage());
}
}
Expand Down Expand Up @@ -260,7 +260,7 @@ protected boolean checkTableStable(Database db) throws AlterCancelException {
}
}

protected abstract void runPendingJob() throws AlterCancelException;
protected abstract void runPendingJob() throws Exception;

protected abstract void runWaitingTxnJob() throws AlterCancelException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,15 @@ public IndexChangeJob() {
this.jobState = JobState.WAITING_TXN;
}

public IndexChangeJob(long jobId, long dbId, long tableId, String tableName) {
public IndexChangeJob(long jobId, long dbId, long tableId, String tableName) throws Exception {
this.jobId = jobId;
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;

this.createTimeMs = System.currentTimeMillis();
this.jobState = JobState.WAITING_TXN;
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
}

public long getJobId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private void initAnalyzer() throws AnalysisException {
* 3. Get a new transaction id, then set job's state to WAITING_TXN
*/
@Override
protected void runPendingJob() throws AlterCancelException {
protected void runPendingJob() throws Exception {
Preconditions.checkState(jobState == JobState.PENDING, jobState);

LOG.info("begin to send create rollup replica tasks. job: {}", jobId);
Expand Down Expand Up @@ -331,8 +331,7 @@ protected void runPendingJob() throws AlterCancelException {
tbl.writeUnlock();
}

this.watershedTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
this.jobState = JobState.WAITING_TXN;

// write edit log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2891,45 +2891,50 @@ public void buildOrDeleteTableInvertedIndices(Database db, OlapTable olapTable,
throw new DdlException("Nothing is changed. please check your alter stmt.");
}

for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
long originIndexId = entry.getKey();
for (Partition partition : olapTable.getPartitions()) {
// create job
long jobId = Env.getCurrentEnv().getNextId();
IndexChangeJob indexChangeJob = new IndexChangeJob(
jobId, db.getId(), olapTable.getId(), olapTable.getName());
indexChangeJob.setOriginIndexId(originIndexId);
indexChangeJob.setAlterInvertedIndexInfo(isDropOp, alterIndexes);
long partitionId = partition.getId();
String partitionName = partition.getName();
boolean found = false;
for (Set<String> partitions : invertedIndexOnPartitions.values()) {
if (partitions.contains(partitionName)) {
found = true;
break;
try {
for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
long originIndexId = entry.getKey();
for (Partition partition : olapTable.getPartitions()) {
// create job
long jobId = Env.getCurrentEnv().getNextId();
IndexChangeJob indexChangeJob = new IndexChangeJob(
jobId, db.getId(), olapTable.getId(), olapTable.getName());
indexChangeJob.setOriginIndexId(originIndexId);
indexChangeJob.setAlterInvertedIndexInfo(isDropOp, alterIndexes);
long partitionId = partition.getId();
String partitionName = partition.getName();
boolean found = false;
for (Set<String> partitions : invertedIndexOnPartitions.values()) {
if (partitions.contains(partitionName)) {
found = true;
break;
}
}
if (!found) {
continue;
}
}
if (!found) {
continue;
}

if (hasIndexChangeJobOnPartition(originIndexId, db.getId(), olapTable.getId(),
partitionName, alterIndexes, isDropOp)) {
throw new DdlException("partition " + partitionName + " has been built specified index."
+ " please check your build stmt.");
}
if (hasIndexChangeJobOnPartition(originIndexId, db.getId(), olapTable.getId(),
partitionName, alterIndexes, isDropOp)) {
throw new DdlException("partition " + partitionName + " has been built specified index."
+ " please check your build stmt.");
}

indexChangeJob.setPartitionId(partitionId);
indexChangeJob.setPartitionName(partitionName);
indexChangeJob.setPartitionId(partitionId);
indexChangeJob.setPartitionName(partitionName);

addIndexChangeJob(indexChangeJob);
addIndexChangeJob(indexChangeJob);

// write edit log
Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
LOG.info("finish create table's inverted index job. table: {}, partition: {}, job: {}",
olapTable.getName(), partitionName, jobId);
} // end for partition
} // end for index
// write edit log
Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
LOG.info("finish create table's inverted index job. table: {}, partition: {}, job: {}",
olapTable.getName(), partitionName, jobId);
} // end for partition
} // end for index
} catch (Exception e) {
LOG.warn("Exception:", e);
throw new UserException(e.getMessage());
}
}

public boolean hasIndexChangeJobOnPartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private void pruneMeta() {
* 3. Get a new transaction id, then set job's state to WAITING_TXN
*/
@Override
protected void runPendingJob() throws AlterCancelException {
protected void runPendingJob() throws Exception {
Preconditions.checkState(jobState == JobState.PENDING, jobState);
LOG.info("begin to send create replica tasks. job: {}", jobId);
Database db = Env.getCurrentInternalCatalog()
Expand Down Expand Up @@ -343,8 +343,7 @@ protected void runPendingJob() throws AlterCancelException {
tbl.writeUnlock();
}

this.watershedTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
this.jobState = JobState.WAITING_TXN;

// write edit log
Expand Down
21 changes: 17 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.apache.doris.clone.TabletChecker;
import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.clone.TabletSchedulerStat;
import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
Expand Down Expand Up @@ -270,6 +271,7 @@
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.PublishVersionDaemon;

import com.google.common.base.Joiner;
Expand Down Expand Up @@ -441,7 +443,7 @@ public class Env {
private BrokerMgr brokerMgr;
private ResourceMgr resourceMgr;

private GlobalTransactionMgr globalTransactionMgr;
private GlobalTransactionMgrIface globalTransactionMgr;

private DeployManager deployManager;

Expand Down Expand Up @@ -688,7 +690,11 @@ public Env(boolean isCheckpointCatalog) {
this.brokerMgr = new BrokerMgr();
this.resourceMgr = new ResourceMgr();

this.globalTransactionMgr = new GlobalTransactionMgr(this);
if (Config.isCloudMode()) {
this.globalTransactionMgr = new CloudGlobalTransactionMgr();
} else {
this.globalTransactionMgr = new GlobalTransactionMgr(this);
}

this.tabletStatMgr = new TabletStatMgr();

Expand Down Expand Up @@ -795,11 +801,11 @@ public ResourceMgr getResourceMgr() {
return resourceMgr;
}

public static GlobalTransactionMgr getCurrentGlobalTransactionMgr() {
public static GlobalTransactionMgrIface getCurrentGlobalTransactionMgr() {
return getCurrentEnv().globalTransactionMgr;
}

public GlobalTransactionMgr getGlobalTransactionMgr() {
public GlobalTransactionMgrIface getGlobalTransactionMgr() {
return globalTransactionMgr;
}

Expand Down Expand Up @@ -2044,6 +2050,10 @@ public long loadAuth(DataInputStream dis, long checksum) throws IOException {
}

public long loadTransactionState(DataInputStream dis, long checksum) throws IOException {
if (Config.isCloudMode()) {
//for CloudGlobalTransactionMgr do nothing.
return checksum;
}
int size = dis.readInt();
long newChecksum = checksum ^ size;
globalTransactionMgr.readFields(dis);
Expand Down Expand Up @@ -2358,6 +2368,9 @@ public long saveAuth(CountingDataOutputStream dos, long checksum) throws IOExcep
}

public long saveTransactionState(CountingDataOutputStream dos, long checksum) throws IOException {
if (Config.isCloudMode()) {
return checksum;
}
int size = globalTransactionMgr.getTransactionNum();
checksum ^= size;
dos.writeInt(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.doris.thrift.TTablet;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTabletMetaInfo;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.PartitionCommitInfo;
import org.apache.doris.transaction.TableCommitInfo;
import org.apache.doris.transaction.TransactionState;
Expand Down Expand Up @@ -232,7 +232,7 @@ && isLocal(tabletMeta.getStorageMedium())) {
// check if should clear transactions
if (backendTabletInfo.isSetTransactionIds()) {
List<Long> transactionIds = backendTabletInfo.getTransactionIds();
GlobalTransactionMgr transactionMgr = Env.getCurrentGlobalTransactionMgr();
GlobalTransactionMgrIface transactionMgr = Env.getCurrentGlobalTransactionMgr();
for (Long transactionId : transactionIds) {
TransactionState transactionState
= transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
replica.setNeedFurtherRepair(true);
try {
long furtherRepairWatermarkTxnTd = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
.getNextTransactionId();
replica.setFurtherRepairWatermarkTxnTd(furtherRepairWatermarkTxnTd);
LOG.info("new replica {} of tablet {} set further repair watermark id {}",
replica, tabletId, furtherRepairWatermarkTxnTd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,41 +1126,36 @@ private void deleteReplicaInternal(TabletSchedCtx tabletCtx,
LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason);
}
try {
long preWatermarkTxnId = replica.getPreWatermarkTxnId();
if (preWatermarkTxnId == -1) {
preWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
replica.setPreWatermarkTxnId(preWatermarkTxnId);
LOG.info("set decommission replica {} on backend {} of tablet {} pre watermark txn id {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), preWatermarkTxnId);
}

long preWatermarkTxnId = replica.getPreWatermarkTxnId();
if (preWatermarkTxnId == -1) {
preWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
replica.setPreWatermarkTxnId(preWatermarkTxnId);
LOG.info("set decommission replica {} on backend {} of tablet {} pre watermark txn id {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), preWatermarkTxnId);
}

long postWatermarkTxnId = replica.getPostWatermarkTxnId();
if (postWatermarkTxnId == -1) {
try {
long postWatermarkTxnId = replica.getPostWatermarkTxnId();
if (postWatermarkTxnId == -1) {
if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(preWatermarkTxnId,
tabletCtx.getDbId(), tabletCtx.getTblId(), tabletCtx.getPartitionId())) {
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
"wait txn before pre watermark txn " + preWatermarkTxnId + " to be finished");
}
} catch (AnalysisException e) {
throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
postWatermarkTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();

replica.setPostWatermarkTxnId(postWatermarkTxnId);
LOG.info("set decommission replica {} on backend {} of tablet {} post watermark txn id {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), postWatermarkTxnId);
}
postWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
replica.setPostWatermarkTxnId(postWatermarkTxnId);
LOG.info("set decommission replica {} on backend {} of tablet {} post watermark txn id {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), postWatermarkTxnId);
}

try {
if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(postWatermarkTxnId,
tabletCtx.getDbId(), tabletCtx.getTblId(), tabletCtx.getPartitionId())) {
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
"wait txn before post watermark txn " + postWatermarkTxnId + " to be finished");
}
} catch (AnalysisException e) {
} catch (Exception e) {
throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
}
}
Expand Down
Loading

0 comments on commit 8fa3ffa

Please sign in to comment.