Skip to content

Commit

Permalink
Implemented support for custom runId (#305)
Browse files Browse the repository at this point in the history
* Implemented support for custom `runId`

* Apply suggestions from code review

* Review comments

---------

Co-authored-by: Madhavan <msmygit@users.noreply.github.com>
  • Loading branch information
pravinbhat and msmygit authored Sep 12, 2024
1 parent 8ca2666 commit e6f3811
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 59 deletions.
5 changes: 4 additions & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Release Notes
## [4.3.10] - 2024-09-12
- Added property `spark.cdm.trackRun.runId` to support a custom unique identifier for the current run. This can be used by wrapper scripts to pass a known `runId` and then use it to query the `cdm_run_info` and `cdm_run_details` tables.

## [4.3.9] - 2024-09-11
- Added new `status` value of `DIFF_CORRECTED` on `cdm_run_details` table to specifically mark partitions that were corrected during the CDM validation run.
- Upgraded Validation job skip partitions with `DIFF_CORRECTED` status on rerun with a previous `runId`.
- Upgraded Validation job to skip partitions with `DIFF_CORRECTED` status on rerun with a previous `runId`.

## [4.3.8] - 2024-09-09
- Upgraded `spark.cdm.trackRun` feature to include `status` on `cdm_run_info` table. Also improved the code to handle rerun of previous run which may have exited before being correctly initialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public class TargetUpsertRunDetailsStatement {
private CqlSession session;
private String keyspaceName;
private String tableName;
private long runId;
private long prevRunId;
private BoundStatement boundInitInfoStatement;
private BoundStatement boundInitStatement;
private BoundStatement boundEndInfoStatement;
Expand All @@ -60,16 +58,18 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
String cdmKsTabInfo = this.keyspaceName + ".cdm_run_info";
String cdmKsTabDetails = this.keyspaceName + ".cdm_run_details";

this.session.execute("create table if not exists " + cdmKsTabInfo
+ " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, status text, primary key (table_name, run_id))");
this.session.execute("CREATE TABLE IF NOT EXISTS " + cdmKsTabInfo
+ " (table_name TEXT, run_id BIGINT, run_type TEXT, prev_run_id BIGINT, start_time TIMESTAMP, end_time TIMESTAMP, run_info TEXT, status TEXT, PRIMARY KEY (table_name, run_id))");

// TODO: Remove this code block after a few releases, its only added for backward compatibility
try {
this.session.execute("alter table " + cdmKsTabInfo + " add status text");
this.session.execute("ALTER TABLE " + cdmKsTabInfo + " ADD status TEXT");
} catch (Exception e) {
// ignore if column already exists
logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo);
}
this.session.execute("CREATE TABLE IF NOT EXISTS " + cdmKsTabDetails
+ " (table_name TEXT, run_id BIGINT, start_time TIMESTAMP, token_min BIGINT, token_max BIGINT, status TEXT, PRIMARY KEY ((table_name, run_id), token_min))");

boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo
+ " (table_name, run_id, run_type, prev_run_id, start_time, status) VALUES (?, ?, ?, ?, dateof(now()), ?)");
Expand All @@ -88,7 +88,6 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
}

public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
this.prevRunId = prevRunId;
final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
if (prevRunId == 0) {
return pendingParts;
Expand Down Expand Up @@ -117,31 +116,34 @@ public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId
return pendingParts;
}

public long initCdmRun(Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
runId = System.currentTimeMillis();
public void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
ResultSet rsInfo = session
.execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId));
if (null != rsInfo.one()) {
throw new RuntimeException("Run id " + runId + " already exists for table " + tableName);
}
session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
.setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
parts.forEach(part -> initCdmRun(part));
parts.forEach(part -> initCdmRun(runId, part));
session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
.setString("status", TrackRun.RUN_STATUS.STARTED.toString()));
return runId;
}

private void initCdmRun(Partition partition) {
private void initCdmRun(long runId, Partition partition) {
session.execute(boundInitStatement.setString("table_name", tableName).setLong("run_id", runId)
.setLong("token_min", partition.getMin().longValue())
.setLong("token_max", partition.getMax().longValue())
.setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
}

public void endCdmRun(String runInfo) {
public void endCdmRun(long runId, String runInfo) {
session.execute(boundEndInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_info", runInfo).setString("status", TrackRun.RUN_STATUS.ENDED.toString()));
}

public void updateCdmRun(BigInteger min, TrackRun.RUN_STATUS status) {
public void updateCdmRun(long runId, BigInteger min, TrackRun.RUN_STATUS status) {
if (TrackRun.RUN_STATUS.STARTED.equals(status)) {
session.execute(boundUpdateStartStatement.setString("table_name", tableName).setLong("run_id", runId)
.setLong("token_min", min.longValue()).setString("status", status.toString()));
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/com/datastax/cdm/feature/TrackRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,16 @@ public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId
return pendingParts;
}

public long initCdmRun(Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
long runId = runStatement.initCdmRun(parts, runType);
public void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
runStatement.initCdmRun(runId, prevRunId, parts, runType);
logger.info("###################### Run Id for this job is: {} ######################", runId);

return runId;
}

public void updateCdmRun(BigInteger min, RUN_STATUS status) {
runStatement.updateCdmRun(min, status);
public void updateCdmRun(long runId, BigInteger min, RUN_STATUS status) {
runStatement.updateCdmRun(runId, min, status);
}

public void endCdmRun(String runInfo) {
runStatement.endCdmRun(runInfo);
public void endCdmRun(long runId, String runInfo) {
runStatement.endCdmRun(runId, runInfo);
}
}
10 changes: 8 additions & 2 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
protected JobCounter jobCounter;
protected Long printStatsAfter;
protected TrackRun trackRunFeature;
protected long runId;

protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
this(originSession, targetSession, sc, false);
Expand Down Expand Up @@ -102,12 +103,17 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,

public abstract void processSlice(T slice);

public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts, TrackRun trackRunFeature) {
public synchronized void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts,
TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) {
this.runId = runId;
this.trackRunFeature = trackRunFeature;
if (null != trackRunFeature)
trackRunFeature.initCdmRun(runId, prevRunId, parts, runType);
}

public synchronized void printCounts(boolean isFinal) {
if (isFinal) {
jobCounter.printFinal(trackRunFeature);
jobCounter.printFinal(runId, trackRunFeature);
} else {
jobCounter.printProgress();
}
Expand Down
12 changes: 3 additions & 9 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,11 @@ public void processSlice(SplitPartitions.Partition slice) {
this.getDataAndInsert(slice.getMin(), slice.getMax());
}

public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts, TrackRun trackRunFeature) {
this.trackRunFeature = trackRunFeature;
if (null != trackRunFeature)
trackRunFeature.initCdmRun(parts, TrackRun.RUN_TYPE.MIGRATE);
}

private void getDataAndInsert(BigInteger min, BigInteger max) {
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.STARTED);
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED);

BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
String guardrailCheck;
Expand Down Expand Up @@ -139,13 +133,13 @@ private void getDataAndInsert(BigInteger min, BigInteger max) {
jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS);
} catch (Exception e) {
jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE)
- jobCounter.getCount(JobCounter.CounterType.SKIPPED));
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL);
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
Thread.currentThread().getId(), min, max, e);
logger.error("Error stats " + jobCounter.getThreadCounters(false));
Expand Down
18 changes: 5 additions & 13 deletions src/main/java/com/datastax/cdm/job/DiffJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -124,18 +123,11 @@ public void processSlice(SplitPartitions.Partition slice) {
this.getDataAndDiff(slice.getMin(), slice.getMax());
}

@Override
public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts, TrackRun trackRunFeature) {
this.trackRunFeature = trackRunFeature;
if (null != trackRunFeature)
trackRunFeature.initCdmRun(parts, TrackRun.RUN_TYPE.DIFF_DATA);
}

private void getDataAndDiff(BigInteger min, BigInteger max) {
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.STARTED);
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED);

AtomicBoolean hasDiff = new AtomicBoolean(false);
try {
Expand Down Expand Up @@ -196,12 +188,12 @@ private void getDataAndDiff(BigInteger min, BigInteger max) {
.getCount(JobCounter.CounterType.CORRECTED_MISSING)
&& jobCounter.getCount(JobCounter.CounterType.MISMATCH) == jobCounter
.getCount(JobCounter.CounterType.CORRECTED_MISMATCH)) {
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF_CORRECTED);
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF_CORRECTED);
} else {
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF);
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF);
}
} else if (null != trackRunFeature) {
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS);
}
} catch (Exception e) {
jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
Expand All @@ -212,7 +204,7 @@ private void getDataAndDiff(BigInteger min, BigInteger max) {
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
Thread.currentThread().getId(), min, max, e);
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL);
} finally {
jobCounter.globalIncrement();
printCounts(false);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/datastax/cdm/job/JobCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ protected void printAndLogProgress(String message, boolean global) {
logger.info(fullMessage);
}

public void printFinal(TrackRun trackRunFeature) {
public void printFinal(long runId, TrackRun trackRunFeature) {
if (null != trackRunFeature) {
StringBuilder sb = new StringBuilder();
if (counterMap.containsKey(CounterType.READ))
Expand All @@ -202,7 +202,7 @@ public void printFinal(TrackRun trackRunFeature) {
if (counterMap.containsKey(CounterType.LARGE))
sb.append("; Large: " + counterMap.get(CounterType.LARGE).getGlobalCounter());

trackRunFeature.endCdmRun(sb.toString());
trackRunFeature.endCdmRun(runId, sb.toString());
}
logger.info("################################################################################################");
if (counterMap.containsKey(CounterType.READ))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public enum PropertyType {
public static final String AUTOCORRECT_MISMATCH = "spark.cdm.autocorrect.mismatch"; // false
public static final String AUTOCORRECT_MISSING_COUNTER = "spark.cdm.autocorrect.missing.counter"; // false
public static final String TRACK_RUN = "spark.cdm.trackRun";
public static final String RUN_ID = "spark.cdm.trackRun.runId";
public static final String PREV_RUN_ID = "spark.cdm.trackRun.previousRunId";

public static final String PERF_NUM_PARTS = "spark.cdm.perfops.numParts"; // 5000, was spark.splitSize
Expand All @@ -131,6 +132,8 @@ public enum PropertyType {
defaults.put(AUTOCORRECT_MISSING_COUNTER, "false");
types.put(TRACK_RUN, PropertyType.BOOLEAN);
defaults.put(TRACK_RUN, "false");
types.put(RUN_ID, PropertyType.NUMBER);
defaults.put(RUN_ID, "0");
types.put(PREV_RUN_ID, PropertyType.NUMBER);
defaults.put(PREV_RUN_ID, "0");

Expand Down
12 changes: 10 additions & 2 deletions src/main/scala/com/datastax/cdm/job/BaseJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ abstract class BaseJob[T: ClassTag] extends App {
var coveragePercent: Int = _
var numSplits: Int = _
var trackRun: Boolean = _
var runId: Long = _
var prevRunId: Long = _

var parts: util.Collection[T] = _
Expand Down Expand Up @@ -80,14 +81,21 @@ abstract class BaseJob[T: ClassTag] extends App {
maxPartition = getMaxPartition(propertyHelper.getString(KnownProperties.PARTITION_MAX), hasRandomPartitioner)
coveragePercent = propertyHelper.getInteger(KnownProperties.TOKEN_COVERAGE_PERCENT)
numSplits = propertyHelper.getInteger(KnownProperties.PERF_NUM_PARTS)
runId = propertyHelper.getLong(KnownProperties.RUN_ID)
prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID)
trackRun = if (0 != prevRunId) true else propertyHelper.getBoolean(KnownProperties.TRACK_RUN)
trackRun = if (0 != prevRunId || 0 != runId) true else propertyHelper.getBoolean(KnownProperties.TRACK_RUN)
if (trackRun == true && runId == 0) {
runId = System.nanoTime();
}

abstractLogger.info("PARAM -- Min Partition: " + minPartition)
abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
abstractLogger.info("PARAM -- Number of Splits : " + numSplits)
abstractLogger.info("PARAM -- Track Run : " + trackRun)
abstractLogger.info("PARAM -- Previous RunId : " + prevRunId)
if (trackRun == true) {
abstractLogger.info("PARAM -- RunId : " + runId)
abstractLogger.info("PARAM -- Previous RunId : " + prevRunId)
}
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
this.parts = getParts(numSplits)
abstractLogger.info("PARAM Calculated -- Total Partitions: " + parts.size())
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/com/datastax/cdm/job/DiffData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datastax.cdm.job

import com.datastax.cdm.feature.TrackRun

object DiffData extends BasePartitionJob {
setup("Data Validation Job", new DiffJobSessionFactory())
execute()
Expand All @@ -24,7 +26,7 @@ object DiffData extends BasePartitionJob {
if (!parts.isEmpty()) {
originConnection.withSessionDo(originSession =>
targetConnection.withSessionDo(targetSession =>
jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature)));
jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.DIFF_DATA)));

slices.foreach(slice => {
originConnection.withSessionDo(originSession =>
Expand Down
4 changes: 0 additions & 4 deletions src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ object GuardrailCheck extends BasePartitionJob {

protected def execute(): Unit = {
if (!parts.isEmpty()) {
originConnection.withSessionDo(originSession =>
targetConnection.withSessionDo(targetSession =>
jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature)));

slices.foreach(slice => {
originConnection.withSessionDo(originSession =>
targetConnection.withSessionDo(targetSession =>
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/com/datastax/cdm/job/Migrate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datastax.cdm.job

import com.datastax.cdm.feature.TrackRun

object Migrate extends BasePartitionJob {
setup("Migrate Job", new CopyJobSessionFactory())
execute()
Expand All @@ -24,7 +26,7 @@ object Migrate extends BasePartitionJob {
if (!parts.isEmpty()) {
originConnection.withSessionDo(originSession =>
targetConnection.withSessionDo(targetSession =>
jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature)));
jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.MIGRATE)));

slices.foreach(slice => {
originConnection.withSessionDo(originSession =>
Expand Down
Loading

0 comments on commit e6f3811

Please sign in to comment.