Skip to content

Commit

Permalink
[fix][dingo-executor] Fix the problem of job not being released
Browse files Browse the repository at this point in the history
  • Loading branch information
githubgxll authored and guojn1 committed Nov 6, 2024
1 parent f66262e commit 16a9bbe
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import io.dingodb.common.log.LogUtils;
import io.dingodb.exec.base.IdGenerator;
import io.dingodb.exec.base.Job;
import io.dingodb.exec.base.JobManager;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.impl.IdGeneratorImpl;
import io.dingodb.exec.transaction.base.ITransaction;
Expand Down Expand Up @@ -130,23 +131,28 @@ private DingoJobVisitor(Job job, IdGenerator idGenerator, Location currentLocati
this.executeVariables = executeVariables;
}

public static void renderJob(Job job, RelNode input, Location currentLocation) {
renderJob(job, input, currentLocation, false, null, null, new ExecuteVariables());
public static void renderJob(JobManager jobManager, Job job, RelNode input, Location currentLocation) {
renderJob(jobManager, job, input, currentLocation, false, null, null, new ExecuteVariables());
}

public static void renderJob(Job job, RelNode input, Location currentLocation,
public static void renderJob(JobManager jobManager, Job job, RelNode input, Location currentLocation,
boolean checkRoot, ITransaction transaction, SqlKind kind, ExecuteVariables executeVariables) {
IdGenerator idGenerator = new IdGeneratorImpl(job.getJobId().seq);
DingoJobVisitor visitor = new DingoJobVisitor(job, idGenerator, currentLocation, transaction, kind, executeVariables);
Collection<Vertex> outputs = dingo(input).accept(visitor);
if (checkRoot && !outputs.isEmpty()) {
throw new IllegalStateException("There root of plan must be `DingoRoot`.");
try {
IdGenerator idGenerator = new IdGeneratorImpl(job.getJobId().seq);
DingoJobVisitor visitor = new DingoJobVisitor(job, idGenerator, currentLocation, transaction, kind, executeVariables);
Collection<Vertex> outputs = dingo(input).accept(visitor);
if (checkRoot && !outputs.isEmpty()) {
throw new IllegalStateException("There root of plan must be `DingoRoot`.");
}
if (transaction != null) {
transaction.setJob(job);
}

LogUtils.debug(log, "job = {}", job);
} catch (Exception e) {
jobManager.removeJob(job.getJobId());
throw new RuntimeException(e);
}
if (transaction != null) {
transaction.setJob(job);
}

LogUtils.debug(log, "job = {}", job);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testScan(String sql) throws SqlParseException {
// To job.
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, relNode, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, relNode, currentLocation);
AssertJob assertJob = Assert.job(job).taskNum(tableTestPartNum);
assertJob.task(jobSeqId, 0).location(currentLocation).operatorNum(3);
}
Expand All @@ -121,7 +121,7 @@ public void testFilterScan() throws SqlParseException {
// To job.
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, relNode, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, relNode, currentLocation);
AssertJob assertJob = Assert.job(job).taskNum(tableTestPartNum);
assertJob.task(jobSeqId, 0).location(currentLocation).operatorNum(3);
}
Expand All @@ -145,7 +145,7 @@ public void testProjectScan() throws SqlParseException {
// To job.
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, relNode, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, relNode, currentLocation);
AssertJob assertJob = Assert.job(job).taskNum(tableTestPartNum);
assertJob.task(jobSeqId, 0).location(currentLocation).operatorNum(3);
}
Expand All @@ -170,7 +170,7 @@ public void testProjectFilterScan() throws SqlParseException {
// To job.
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, relNode, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, relNode, currentLocation);
AssertJob assertJob = Assert.job(job).taskNum(tableTestPartNum);
assertJob.task(jobSeqId, 0).location(currentLocation).operatorNum(3);
}
Expand All @@ -195,7 +195,7 @@ public void testFilterScanWithParameters() throws SqlParseException {
// To job.
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, relNode, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, relNode, currentLocation);
AssertJob assertJob = Assert.job(job).taskNum(tableTestPartNum);
assertJob.task(jobSeqId, 0).location(currentLocation).operatorNum(3);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testVisitTableScan() {
);
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, scan, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, scan, currentLocation);
AssertJob assertJob = Assert.job(job).taskNum(1);
/*CommonId tableId = MetaService.root()
.getSubMetaService(RootSnapshotSchema.DEFAULT_SCHEMA_NAME)
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testVisitDingoStreamingConverterNotRoot() {
);
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, converter, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, converter, currentLocation);
AssertJob assertJob = Assert.job(job).taskNum(1);
AssertTask assertTask =
assertJob.task(jobSeqId, 0).operatorNum(1).location(MockMetaServiceProvider.LOC_0).sourceNum(1);
Expand Down Expand Up @@ -175,7 +175,7 @@ public void testVisitDingoStreamingConverterRoot() {
);
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, converter, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, converter, currentLocation);
AssertJob assertJob = Assert.job(job).taskNum(1);
//AssertTask assertTask =
// assertJob.task(jobSeqId, 0).operatorNum(1).location(MockMetaServiceProvider.LOC_0).sourceNum(1);
Expand Down Expand Up @@ -207,7 +207,7 @@ public void testVisitValues() {
);
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, values, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, values, currentLocation);
Vertex vertex = Assert.job(job)
.soleTask().location(MockMetaServiceProvider.LOC_0).operatorNum(1)
.soleSource().isA(ValuesOperator.class)
Expand Down Expand Up @@ -247,7 +247,7 @@ public void testVisitPartModify() {
);
long jobSeqId = TsoService.getDefault().tso();
Job job = jobManager.createJob(jobSeqId, jobSeqId);
DingoJobVisitor.renderJob(job, partModify, currentLocation);
DingoJobVisitor.renderJob(jobManager, job, partModify, currentLocation);
Assert.job(job).taskNum(1)
.task(jobSeqId, 0).location(MockMetaServiceProvider.LOC_0).operatorNum(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ public Meta.Signature parseQuery(
statementType == Meta.StatementType.SELECT
);
DingoJobVisitor.renderJob(
jobManager,
job,
relNode,
currentLocation,
Expand Down Expand Up @@ -515,10 +516,16 @@ public Meta.Signature parseQuery(
);
} else {
List<ColumnMetaData> metaDataList = getExplainColMeta(typeFactory);
return new ExplainSignature(metaDataList, sql, createParameterList(parasType),
return new ExplainSignature(
metaDataList,
sql,
createParameterList(parasType),
null,
cursorFactory,
statementType, relNode);
statementType,
relNode,
job.getJobId()
);
}
}
if(trace && statementType == Meta.StatementType.IS_DML){
Expand Down Expand Up @@ -685,6 +692,7 @@ public Meta.Signature retryQuery(
false
);
DingoJobVisitor.renderJob(
jobManager,
job,
relNode,
currentLocation,
Expand Down Expand Up @@ -722,7 +730,7 @@ private static void runPessimisticPrimaryKeyJob(
Integer retry = Optional.mapOrGet(DingoConfiguration.instance().find("retry", int.class), __ -> __, () -> 30);
while (retry-- > 0) {
Job job = jobManager.createJob(transaction.getStartTs(), jobSeqId, transaction.getTxnId(), dingoType);
DingoJobVisitor.renderJob(job, relNode, currentLocation, true, transaction, sqlNode.getKind(), executeVariables);
DingoJobVisitor.renderJob(jobManager, job, relNode, currentLocation, true, transaction, sqlNode.getKind(), executeVariables);
try {
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public static void removeJobInSignature(JobManager jobManager, Meta.Signature si
LogUtils.debug(log, "Job id \"{}\" found in signature, remove it.", jobId);
jobManager.removeJob(jobId);
}
} else if (signature instanceof ExplainSignature) {
CommonId jobId = ((ExplainSignature) signature).getJobId();
if (jobId != null) {
LogUtils.debug(log, "Job id \"{}\" found in signature, remove it.", jobId);
jobManager.removeJob(jobId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@

package io.dingodb.driver;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.dingodb.calcite.visitor.DingoExplainVisitor;
import io.dingodb.common.CommonId;
import io.dingodb.common.mysql.Explain;
import lombok.Getter;
import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Meta;
Expand All @@ -33,15 +38,23 @@
public class ExplainSignature extends Meta.Signature {
RelNode relNode;

@JsonProperty("jobId")
@Getter
@JsonSerialize(using = CommonId.JacksonSerializer.class)
@JsonDeserialize(using = CommonId.JacksonDeserializer.class)
private final CommonId jobId;

public ExplainSignature(List<ColumnMetaData> columns,
String sql,
List<AvaticaParameter> parameters,
Map<String, Object> internalParameters,
Meta.CursorFactory cursorFactory,
Meta.StatementType statementType,
RelNode relNode) {
RelNode relNode,
CommonId jobId) {
super(columns, sql, parameters, internalParameters, cursorFactory, statementType);
this.relNode = relNode;
this.jobId = jobId;
}

public Iterator<Object[]> getIterator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public synchronized void commit(JobManager jobManager) {
long jobSeqId = TransactionManager.nextTimestamp();
job = jobManager.createJob(startTs, jobSeqId, txnId, null);
jobId.set(job.getJobId());
DingoTransactionRenderJob.renderPreWriteJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderPreWriteJob(jobManager, job, currentLocation, this, true);
// 3、run PreWrite
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()) {
Expand Down Expand Up @@ -518,7 +518,7 @@ private void cleanUpJobRun(JobManager jobManager, Location currentLocation) {
// 2、generator job、task、cleanCacheOperator
Job job = jobManager.createJob(startTs, cleanUpTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderCleanCacheJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderCleanCacheJob(jobManager, job, currentLocation, this, true);
// 3、run cleanCache
if (commitFuture != null) {
commitFuture.get();
Expand All @@ -545,7 +545,7 @@ private void cleanUpExtraDataJobRun(JobManager jobManager, Location currentLocat
// 2、generator job、task、cleanExtraDataCacheOperator
Job job = jobManager.createJob(startTs, cleanUpTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderCleanExtraDataCacheJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderCleanExtraDataCacheJob(jobManager, job, currentLocation, this, true);
// 3、run cleanCache
if (commitFuture != null) {
commitFuture.get();
Expand All @@ -570,7 +570,7 @@ private void commitJobRun(JobManager jobManager, Location currentLocation) {
// 5、generator job、task、CommitOperator
job = jobManager.createJob(startTs, commitTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderCommitJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderCommitJob(jobManager, job, currentLocation, this, true);
// 6、run Commit
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()) {
Expand Down Expand Up @@ -612,7 +612,7 @@ public synchronized void rollback(JobManager jobManager) {
// 2、generator job、task、RollBackOperator
job = jobManager.createJob(startTs, rollbackTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderRollBackJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderRollBackJob(jobManager, job, currentLocation, this, true);
// 3、run RollBack
jobManager.createIterator(job, null);
this.status = TransactionStatus.ROLLBACK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public synchronized void rollBackOptimisticCurrentJobData(JobManager jobManager)
// 2、generator job、task、rollBackOptimisticOperator
job = jobManager.createJob(startTs, rollBackTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderRollBackOptimisticData(job, currentLocation, this, true);
DingoTransactionRenderJob.renderRollBackOptimisticData(jobManager, job, currentLocation, this, true);
// 3、run RollBackOptimisticLock
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()) {
Expand Down Expand Up @@ -143,7 +143,7 @@ public synchronized void cleanOptimisticCurrentJobData(JobManager jobManager) {
// 2、generator job、task、rollBackOptimisticOperator
job = jobManager.createJob(startTs, rollBackTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderRollBackOptimisticData(job, currentLocation, this, true);
DingoTransactionRenderJob.renderRollBackOptimisticData(jobManager, job, currentLocation, this, true);
// 3、run RollBackOptimisticLock
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()) {
Expand Down Expand Up @@ -214,7 +214,7 @@ public Job createRetryJob(JobManager jobManager) {
return job;
}
public void retryRun(JobManager jobManager, Job job, Location currentLocation) {
DingoTransactionRenderJob.renderPreWriteJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderPreWriteJob(jobManager, job, currentLocation, this, true);
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()) {
Object[] next = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public synchronized void rollBackPessimisticLock(JobManager jobManager) {
// 2、generator job、task、rollBackPessimisticLockOperator
job = jobManager.createJob(startTs, rollBackTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderRollBackPessimisticLockJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderRollBackPessimisticLockJob(jobManager, job, currentLocation, this, true);
// 3、run RollBackPessimisticLock
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()) {
Expand Down Expand Up @@ -224,7 +224,7 @@ public void rollBackResidualPessimisticLock(JobManager jobManager) {
// 2、generator job、task、rollBackResidualPessimisticLock
job = jobManager.createJob(startTs, rollBackTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderRollBackResidualPessimisticLockJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderRollBackResidualPessimisticLockJob(jobManager, job, currentLocation, this, true);
// 3、run rollBackResidualPessimisticLock
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()) {
Expand Down Expand Up @@ -488,7 +488,7 @@ public synchronized void rollback(JobManager jobManager) {
// 2、generator job、task、RollBackOperator
job = jobManager.createJob(startTs, rollbackTs, txnId, null);
jobId = job.getJobId();
DingoTransactionRenderJob.renderRollBackJob(job, currentLocation, this, true);
DingoTransactionRenderJob.renderRollBackJob(jobManager, job, currentLocation, this, true);
// 3、run RollBack
Iterator<Object[]> iterator = jobManager.createIterator(job, null);
while (iterator.hasNext()){
Expand Down
Loading

0 comments on commit 16a9bbe

Please sign in to comment.