From 9187f1ba34824b76358ef2493ef80693da36400e Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Mon, 9 Sep 2024 15:19:01 -0400 Subject: [PATCH] Feature/track run (#302) * Upgraded trackRun feature to include status on Info table & also fixed rerun of uninitialized jobs * Added release notes * Added trace log --- RELEASE.md | 3 + .../TargetUpsertRunDetailsStatement.java | 63 +++++++++++---- .../com/datastax/cdm/data/DataUtility.java | 14 +++- .../com/datastax/cdm/feature/TrackRun.java | 7 +- .../cdm/job/RunNotStartedException.java | 26 ++++++ .../datastax/cdm/job/BasePartitionJob.scala | 6 +- .../TargetUpsertRunDetailsStatementTest.java | 79 +++++++++++++++++++ .../datastax/cdm/data/DataUtilityTest.java | 17 +++- .../datastax/cdm/feature/TrackRunTest.java | 2 +- 9 files changed, 193 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/datastax/cdm/job/RunNotStartedException.java create mode 100644 src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java diff --git a/RELEASE.md b/RELEASE.md index ae0b9429..ec7f7229 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,7 @@ # Release Notes +## [4.3.8] - 2024-09-09 +- Upgraded `spark.cdm.trackRun` feature to include `status` on `cdm_run_info` table. Also improved the code to handle rerun of previous run which may have exited before being correctly initialized. + ## [4.3.7] - 2024-09-03 - Added property `spark.cdm.transform.custom.ttl` to allow a custom constant value to be set for TTL instead of using the values from `origin` rows. - Repo wide code formating & imports organization diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java index 13d5f337..5bb57ff0 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java @@ -20,13 +20,18 @@ import java.util.ArrayList; import java.util.Collection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.datastax.cdm.feature.TrackRun; import com.datastax.cdm.feature.TrackRun.RUN_TYPE; +import com.datastax.cdm.job.RunNotStartedException; import com.datastax.cdm.job.SplitPartitions; import com.datastax.cdm.job.SplitPartitions.Partition; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; public class TargetUpsertRunDetailsStatement { private CqlSession session; @@ -36,45 +41,71 @@ public class TargetUpsertRunDetailsStatement { private long prevRunId; private BoundStatement boundInitInfoStatement; private BoundStatement boundInitStatement; - private BoundStatement boundUpdateInfoStatement; + private BoundStatement boundEndInfoStatement; private BoundStatement boundUpdateStatement; private BoundStatement boundUpdateStartStatement; + private BoundStatement boundSelectInfoStatement; private BoundStatement boundSelectStatement; + public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); + public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable) { this.session = session; String[] ksTab = keyspaceTable.split("\\."); + if (ksTab.length != 2) { + throw new RuntimeException("Invalid keyspace.table format: " + keyspaceTable); + } this.keyspaceName = ksTab[0]; this.tableName = ksTab[1]; String cdmKsTabInfo = this.keyspaceName + ".cdm_run_info"; String cdmKsTabDetails = this.keyspaceName + ".cdm_run_details"; this.session.execute("create table if not exists " + cdmKsTabInfo - + " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, primary key (table_name, run_id))"); - this.session.execute("create table if not exists " + cdmKsTabDetails - + " (table_name text, run_id bigint, start_time timestamp, token_min bigint, token_max bigint, status text, primary key ((table_name, run_id), token_min))"); + + " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, status text, primary key (table_name, run_id))"); + + // TODO: Remove this code block after a few releases, its only added for backward compatibility + try { + this.session.execute("alter table " + cdmKsTabInfo + " add status text"); + } catch (Exception e) { + // ignore if column already exists + logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo); + } boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo - + " (table_name, run_id, run_type, prev_run_id, start_time) VALUES (?, ?, ?, ?, dateof(now()))"); + + " (table_name, run_id, run_type, prev_run_id, start_time, status) VALUES (?, ?, ?, ?, dateof(now()), ?)"); boundInitStatement = bindStatement("INSERT INTO " + cdmKsTabDetails + " (table_name, run_id, token_min, token_max, status) VALUES (?, ?, ?, ?, ?)"); - boundUpdateInfoStatement = bindStatement("UPDATE " + cdmKsTabInfo - + " SET end_time = dateof(now()), run_info = ? WHERE table_name = ? AND run_id = ?"); + boundEndInfoStatement = bindStatement("UPDATE " + cdmKsTabInfo + + " SET end_time = dateof(now()), run_info = ?, status = ? WHERE table_name = ? AND run_id = ?"); boundUpdateStatement = bindStatement( "UPDATE " + cdmKsTabDetails + " SET status = ? WHERE table_name = ? AND run_id = ? AND token_min = ?"); boundUpdateStartStatement = bindStatement("UPDATE " + cdmKsTabDetails + " SET start_time = dateof(now()), status = ? WHERE table_name = ? AND run_id = ? AND token_min = ?"); + boundSelectInfoStatement = bindStatement( + "SELECT status FROM " + cdmKsTabInfo + " WHERE table_name = ? AND run_id = ?"); boundSelectStatement = bindStatement("SELECT token_min, token_max FROM " + cdmKsTabDetails + " WHERE table_name = ? AND run_id = ? and status in ('NOT_STARTED', 'STARTED', 'FAIL', 'DIFF') ALLOW FILTERING"); } - public Collection getPendingPartitions(long prevRunId) { + public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { this.prevRunId = prevRunId; + final Collection pendingParts = new ArrayList(); if (prevRunId == 0) { - return new ArrayList(); + return pendingParts; + } + + ResultSet rsInfo = session + .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", prevRunId)); + Row cdmRunStatus = rsInfo.one(); + if (cdmRunStatus == null) { + return pendingParts; + } else { + String status = cdmRunStatus.getString("status"); + if (TrackRun.RUN_STATUS.NOT_STARTED.toString().equals(status)) { + throw new RunNotStartedException("Run not started for run_id: " + prevRunId); + } } - final Collection pendingParts = new ArrayList(); ResultSet rs = session .execute(boundSelectStatement.setString("table_name", tableName).setLong("run_id", prevRunId)); rs.forEach(row -> { @@ -89,8 +120,12 @@ public Collection getPendingPartitions(long prevRunId public long initCdmRun(Collection parts, RUN_TYPE runType) { runId = System.currentTimeMillis(); session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId) - .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)); + .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId) + .setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString())); parts.forEach(part -> initCdmRun(part)); + session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId) + .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId) + .setString("status", TrackRun.RUN_STATUS.STARTED.toString())); return runId; } @@ -101,9 +136,9 @@ private void initCdmRun(Partition partition) { .setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString())); } - public void updateCdmRunInfo(String runInfo) { - session.execute(boundUpdateInfoStatement.setString("table_name", tableName).setLong("run_id", runId) - .setString("run_info", runInfo)); + public void endCdmRun(String runInfo) { + session.execute(boundEndInfoStatement.setString("table_name", tableName).setLong("run_id", runId) + .setString("run_info", runInfo).setString("status", TrackRun.RUN_STATUS.ENDED.toString())); } public void updateCdmRun(BigInteger min, TrackRun.RUN_STATUS status) { diff --git a/src/main/java/com/datastax/cdm/data/DataUtility.java b/src/main/java/com/datastax/cdm/data/DataUtility.java index 87a475c6..7ce2cbd4 100644 --- a/src/main/java/com/datastax/cdm/data/DataUtility.java +++ b/src/main/java/com/datastax/cdm/data/DataUtility.java @@ -131,10 +131,16 @@ public static String getMyClassMethodLine(Exception e) { break; } } - String className = targetStackTraceElement.getClassName(); - String methodName = targetStackTraceElement.getMethodName(); - int lineNumber = targetStackTraceElement.getLineNumber(); + if (null == targetStackTraceElement && null != stackTraceElements && stackTraceElements.length > 0) { + targetStackTraceElement = stackTraceElements[0]; + } + if (null != targetStackTraceElement) { + String className = targetStackTraceElement.getClassName(); + String methodName = targetStackTraceElement.getMethodName(); + int lineNumber = targetStackTraceElement.getLineNumber(); + return className + "." + methodName + ":" + lineNumber; + } - return className + "." + methodName + ":" + lineNumber; + return "Unknown"; } } diff --git a/src/main/java/com/datastax/cdm/feature/TrackRun.java b/src/main/java/com/datastax/cdm/feature/TrackRun.java index b0a4aa84..a94dfc03 100644 --- a/src/main/java/com/datastax/cdm/feature/TrackRun.java +++ b/src/main/java/com/datastax/cdm/feature/TrackRun.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement; +import com.datastax.cdm.job.RunNotStartedException; import com.datastax.cdm.job.SplitPartitions; import com.datastax.oss.driver.api.core.CqlSession; @@ -31,7 +32,7 @@ public enum RUN_TYPE { } public enum RUN_STATUS { - NOT_STARTED, STARTED, PASS, FAIL, DIFF + NOT_STARTED, STARTED, PASS, FAIL, DIFF, ENDED } public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); @@ -41,7 +42,7 @@ public TrackRun(CqlSession session, String keyspaceTable) { this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable); } - public Collection getPendingPartitions(long prevRunId) { + public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { Collection pendingParts = runStatement.getPendingPartitions(prevRunId); logger.info("###################### {} partitions pending from previous run id {} ######################", pendingParts.size(), prevRunId); @@ -60,6 +61,6 @@ public void updateCdmRun(BigInteger min, RUN_STATUS status) { } public void endCdmRun(String runInfo) { - runStatement.updateCdmRunInfo(runInfo); + runStatement.endCdmRun(runInfo); } } diff --git a/src/main/java/com/datastax/cdm/job/RunNotStartedException.java b/src/main/java/com/datastax/cdm/job/RunNotStartedException.java new file mode 100644 index 00000000..ed590e7f --- /dev/null +++ b/src/main/java/com/datastax/cdm/job/RunNotStartedException.java @@ -0,0 +1,26 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.cdm.job; + +public class RunNotStartedException extends Exception { + + private static final long serialVersionUID = -4108800389847708120L; + + public RunNotStartedException(String message) { + super(message); + } + +} diff --git a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala index 411cecd5..592157ee 100644 --- a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala @@ -38,7 +38,11 @@ abstract class BasePartitionJob extends BaseJob[SplitPartitions.Partition] { } if (prevRunId != 0) { - trackRunFeature.getPendingPartitions(prevRunId) + try { + trackRunFeature.getPendingPartitions(prevRunId) + } catch { + case e: RunNotStartedException => SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent) + } } else { SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent) } diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java new file mode 100644 index 00000000..a3418ecf --- /dev/null +++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java @@ -0,0 +1,79 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.cdm.cql.statement; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import com.datastax.cdm.cql.CommonMocks; +import com.datastax.cdm.job.RunNotStartedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.type.DataTypes; + +public class TargetUpsertRunDetailsStatementTest extends CommonMocks { + @Mock + PreparedStatement preparedStatement; + + @Mock + CqlSession cqlSession; + + @Mock + ResultSet rs; + + @Mock + Row row; + + @Mock + BoundStatement bStatement; + + TargetUpsertRunDetailsStatement targetUpsertRunDetailsStatement; + + @BeforeEach + public void setup() { + // UPDATE is needed by counters, though the class should handle non-counter updates + commonSetup(false, false, true); + when(cqlSession.prepare(anyString())).thenReturn(preparedStatement); + when(preparedStatement.bind(any())).thenReturn(bStatement); + when(cqlSession.execute(bStatement)).thenReturn(rs); + when(rs.all()).thenReturn(List.of(row)); + + } + + @Test + public void init() throws RunNotStartedException { + targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1"); + assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(0)); + } + + @Test + public void incorrectKsTable() throws RunNotStartedException { + assertThrows(RuntimeException.class, () -> new TargetUpsertRunDetailsStatement(cqlSession, "table1")); + } + +} diff --git a/src/test/java/com/datastax/cdm/data/DataUtilityTest.java b/src/test/java/com/datastax/cdm/data/DataUtilityTest.java index 6c116c72..509223c5 100644 --- a/src/test/java/com/datastax/cdm/data/DataUtilityTest.java +++ b/src/test/java/com/datastax/cdm/data/DataUtilityTest.java @@ -137,11 +137,26 @@ public void extractObjectsFromCollectionTest() { } @Test - public void getMyClassMethodLineTest() { + public void getMyClassMethodLineTestCDMClass() { Exception ex = new Exception(); ex.setStackTrace(new StackTraceElement[] { new StackTraceElement("com.datastax.cdm.data.DataUtilityTest", "getMyClassMethodLineTest", "DataUtilityTest.java", 0) }); assertEquals("com.datastax.cdm.data.DataUtilityTest.getMyClassMethodLineTest:0", DataUtility.getMyClassMethodLine(ex)); } + + @Test + public void getMyClassMethodLineTestOtherClass() { + Exception ex = new Exception(); + ex.setStackTrace(new StackTraceElement[] { new StackTraceElement("com.datastax.other.SomeClass", + "getMyClassMethodLineTest", "SomeClass.java", 0) }); + assertEquals("com.datastax.other.SomeClass.getMyClassMethodLineTest:0", DataUtility.getMyClassMethodLine(ex)); + } + + @Test + public void getMyClassMethodLineTestUnknown() { + Exception ex = new Exception(); + ex.setStackTrace(new StackTraceElement[] {}); + assertEquals("Unknown", DataUtility.getMyClassMethodLine(ex)); + } } diff --git a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java index 248cf6d6..ac597d7a 100644 --- a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java +++ b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java @@ -27,7 +27,7 @@ void test() { assertEquals("DIFF_DATA", TrackRun.RUN_TYPE.DIFF_DATA.name()); assertEquals(2, TrackRun.RUN_TYPE.values().length); - assertEquals(5, TrackRun.RUN_STATUS.values().length); + assertEquals(6, TrackRun.RUN_STATUS.values().length); } }