diff --git a/cdap-app-templates/cdap-etl/cdap-data-pipeline-base/src/test/java/io/cdap/cdap/datapipeline/DataPipelineTest.java b/cdap-app-templates/cdap-etl/cdap-data-pipeline-base/src/test/java/io/cdap/cdap/datapipeline/DataPipelineTest.java index 0982e3f55d37..e6776acd6818 100644 --- a/cdap-app-templates/cdap-etl/cdap-data-pipeline-base/src/test/java/io/cdap/cdap/datapipeline/DataPipelineTest.java +++ b/cdap-app-templates/cdap-etl/cdap-data-pipeline-base/src/test/java/io/cdap/cdap/datapipeline/DataPipelineTest.java @@ -154,6 +154,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -278,7 +279,7 @@ private void testActionFieldLineage(Engine engine) throws Exception { ApplicationManager appManager = deployApplication(appId, appRequest); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.startAndWaitForGoodRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); + startAndWaitForGoodRun(workflowManager); FieldLineageAdmin fieldAdmin = getFieldLineageAdmin(); @@ -420,7 +421,7 @@ private void testAlertPublisher(Engine engine) throws Exception { MockSource.writeInput(sourceTable, ImmutableList.of(record1, record2, alertRecord)); WorkflowManager manager = appManager.getWorkflowManager(SmartWorkflow.NAME); - manager.start(); + startProgram(manager); manager.waitForRun(ProgramRunStatus.COMPLETED, 3, TimeUnit.MINUTES); DataSetManager sinkTable = getDataset(sinkName); @@ -485,7 +486,7 @@ public void testExternalSparkProgramPipelines() throws Exception { ApplicationManager appManager = deployApplication(appId, appRequest); WorkflowManager manager = appManager.getWorkflowManager(SmartWorkflow.NAME); - manager.start(); + startProgram(manager); manager.waitForRun(ProgramRunStatus.COMPLETED, 3, TimeUnit.MINUTES); // check wordcount output @@ -578,7 +579,7 @@ private void runHeadTriggeringPipeline(Engine engine, String expectedValue1, Str ApplicationManager appManager = deployApplication(appId, appRequest); WorkflowManager manager = appManager.getWorkflowManager(SmartWorkflow.NAME); manager.setRuntimeArgs(runtimeArguments); - manager.start(ImmutableMap.of("logical.start.time", "0")); + startProgram(manager, ImmutableMap.of("logical.start.time", "0")); manager.waitForRun(ProgramRunStatus.COMPLETED, 3, TimeUnit.MINUTES); } @@ -693,7 +694,7 @@ private void testMacroEvaluationActionPipeline(Engine engine) throws Exception { ApplicationManager appManager = deployApplication(appId, appRequest); WorkflowManager manager = appManager.getWorkflowManager(SmartWorkflow.NAME); manager.setRuntimeArgs(runtimeArguments); - manager.start(ImmutableMap.of("logical.start.time", "0")); + startProgram(manager, ImmutableMap.of("logical.start.time", "0")); manager.waitForRun(ProgramRunStatus.COMPLETED, 3, TimeUnit.MINUTES); DataSetManager
actionTableDS = getDataset("actionTable"); @@ -781,8 +782,7 @@ private void testErrorTransform(Engine engine) throws Exception { WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); Map args = Collections.singletonMap(io.cdap.cdap.etl.common.Constants.CONSOLIDATE_STAGES, "true"); - workflowManager.startAndWaitForGoodRun(args, ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); - + startAndWaitForGoodRun(workflowManager, args); Schema flattenSchema = Schema.recordOf("erroruser", @@ -861,7 +861,7 @@ public void testPipelineWithAllActions() throws Exception { ApplicationManager appManager = deployApplication(appId, appRequest); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); DataSetManager
actionTableDS = getDataset(actionTable); @@ -951,7 +951,7 @@ private void testPipelineWithActions(Engine engine) throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check sink @@ -1033,7 +1033,7 @@ private void testSimpleCondition(Engine engine) throws Exception { for (String branch : Arrays.asList("true", "false")) { String sink = branch.equals("true") ? trueSink : falseSink; workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition.branch.to.execute", branch)); + startProgram(workflowManager, ImmutableMap.of("condition.branch.to.execute", branch)); if (branch.equals("true")) { workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); @@ -1125,7 +1125,7 @@ public void testSimpleConditionWithActions() throws Exception { = getDataset(NamespaceId.DEFAULT.dataset(source)); MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition.branch.to.execute", branch)); + startProgram(workflowManager, ImmutableMap.of("condition.branch.to.execute", branch)); if (branch.equals("true")) { workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); @@ -1199,7 +1199,7 @@ public void testSimpleConditionWithMultipleInputActions() throws Exception { DataSetManager
inputManager = getDataset(NamespaceId.DEFAULT.dataset(source)); MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition.branch.to.execute", branch)); + startProgram(workflowManager, ImmutableMap.of("condition.branch.to.execute", branch)); if (branch.equals("true")) { workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); @@ -1278,7 +1278,7 @@ public void testMultipleOrderedInputActions() throws Exception { DataSetManager
inputManager = getDataset(NamespaceId.DEFAULT.dataset(source)); MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition.branch.to.execute", branch)); + startProgram(workflowManager, ImmutableMap.of("condition.branch.to.execute", branch)); if (branch.equals("true")) { workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); @@ -1351,8 +1351,8 @@ public void testConditionsOnBranches() throws Exception { DataSetManager
inputManager = getDataset(sourceName); MockSource.writeInput(inputManager, records); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition1.branch.to.execute", "true", - "condition2.branch.to.execute", "false")); + startProgram(workflowManager, ImmutableMap.of("condition1.branch.to.execute", "true", + "condition2.branch.to.execute", "false")); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); DataSetManager
sink1Manager = getDataset(sink1Name); @@ -1415,7 +1415,7 @@ public void testSimpleConditionWithSingleOutputAction() throws Exception { DataSetManager
inputManager = getDataset(NamespaceId.DEFAULT.dataset(source)); MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition.branch.to.execute", branch)); + startProgram(workflowManager, ImmutableMap.of("condition.branch.to.execute", branch)); if (branch.equals("true")) { workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); @@ -1490,7 +1490,7 @@ public void testSimpleConditionWithMultipleOutputActions() throws Exception { DataSetManager
inputManager = getDataset(NamespaceId.DEFAULT.dataset(source)); MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition.branch.to.execute", branch)); + startProgram(workflowManager, ImmutableMap.of("condition.branch.to.execute", branch)); if (branch.equals("true")) { workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); @@ -1592,8 +1592,8 @@ private void testNestedCondition(Engine engine) throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition3.branch.to.execute", "true", - "condition4.branch.to.execute", "true")); + startProgram(workflowManager, ImmutableMap.of("condition3.branch.to.execute", "true", + "condition4.branch.to.execute", "true")); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check sink @@ -1650,7 +1650,7 @@ public void testSimpleControlOnlyDag() throws Exception { for (String branch : Arrays.asList("true", "false")) { String table = branch.equals("true") ? trueActionTable : falseActionTable; WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition.branch.to.execute", branch)); + startProgram(workflowManager, ImmutableMap.of("condition.branch.to.execute", branch)); if (branch.equals("true")) { workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); @@ -1703,8 +1703,8 @@ public void testMultiConditionControlOnlyDag() throws Exception { ApplicationManager appManager = deployApplication(appId, appRequest); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition2.branch.to.execute", "true", - "condition3.branch.to.execute", "true")); + startProgram(workflowManager, ImmutableMap.of("condition2.branch.to.execute", "true", + "condition3.branch.to.execute", "true")); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); DataSetManager
actionTableDS = getDataset(actionTable); @@ -1748,8 +1748,8 @@ public void testNoConnectorsForSourceCondition() throws Exception { DataSetManager
inputManager = getDataset(NamespaceId.DEFAULT.dataset("simpleNoConnectorConditionSource")); MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(ImmutableMap.of("condition1.branch.to.execute", "true", - "condition2.branch.to.execute", "true")); + startProgram(workflowManager, ImmutableMap.of("condition1.branch.to.execute", "true", + "condition2.branch.to.execute", "true")); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); @@ -1774,7 +1774,7 @@ public void testFailureToStartIncapableProgram() throws Exception { WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); // starting the workflow should throw incapable exception as the pipeline contains incapable plugins - workflowManager.start(); + startProgram(workflowManager); // the program should fail as it has incapable plugins workflowManager.waitForRun(ProgramRunStatus.FAILED, 5, TimeUnit.MINUTES); } @@ -1806,7 +1806,7 @@ public void testSinglePhase() throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check sink @@ -1871,7 +1871,7 @@ private void testSimpleMultiSource(Engine engine) throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordBob)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check sink @@ -1967,7 +1967,7 @@ private void testMultiSource(Engine engine) throws Exception { WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); Map args = Collections.singletonMap(io.cdap.cdap.etl.common.Constants.CONSOLIDATE_STAGES, "true"); - workflowManager.startAndWaitForGoodRun(args, ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); + startAndWaitForGoodRun(workflowManager, args); // sink1 should get records from source1 and source2 Set expected = ImmutableSet.of(recordSamuel, recordBob); @@ -2049,7 +2049,7 @@ private void testSequentialAggregators(Engine engine) throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob, recordJane)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check output @@ -2118,7 +2118,7 @@ private void testParallelAggregators(Engine engine) throws Exception { StructuredRecord.builder(inputSchema).set("user", "john").set("item", 3L).build())); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); Schema outputSchema1 = Schema.recordOf( @@ -2199,7 +2199,7 @@ public void testPostAction() throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob, recordJane)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); 
startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); DataSetManager
tokenTableManager = getDataset(NamespaceId.DEFAULT.dataset("tokenTable")); @@ -2271,7 +2271,7 @@ private void testSinglePhaseWithSparkSink() throws Exception { Map runtimeArgs = new HashMap<>(); FileSetArguments.setInputPath(runtimeArgs, "inputTexts"); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(runtimeArgs); + startProgram(workflowManager, runtimeArgs); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); DataSetManager classifiedTexts = getDataset(NaiveBayesTrainer.CLASSIFIED_TEXTS); @@ -2326,7 +2326,7 @@ private void testSinglePhaseWithSparkCompute() throws Exception { // manually trigger the pipeline WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); DataSetManager
classifiedTexts = getDataset(classifiedTextsTable); @@ -2460,7 +2460,7 @@ private void testInnerJoinWithMultiOutput(Engine engine) throws Exception { Map args = Collections.singletonMap(io.cdap.cdap.etl.common.Constants.CONSOLIDATE_STAGES, "true"); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.startAndWaitForGoodRun(args, ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); + startAndWaitForGoodRun(workflowManager, args); StructuredRecord joinRecordSamuel = StructuredRecord.builder(outSchema) .set("customer_id", "1").set("customer_name", "samuel") @@ -2589,7 +2589,7 @@ private void testOuterJoin(Engine engine) throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordTrasCar, recordTrasPlane, recordTrasBike)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); StructuredRecord joinRecordSamuel = StructuredRecord.builder(outSchema) @@ -2735,7 +2735,7 @@ private void testMultipleJoiner(Engine engine) throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordTrasCar, recordTrasBike, recordTrasPlane)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); StructuredRecord joinRecordSamuel = StructuredRecord.builder(outSchema2) @@ -2798,7 +2798,7 @@ private void testSecureStorePipeline(Engine engine, String prefix) throws Except Assert.assertNull(getDataset(prefix + "MockSecureSourceDataset").get()); Assert.assertNull(getDataset(prefix + "MockSecureSinkDataset").get()); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // now the datasets should exist @@ -2877,7 +2877,7 @@ private void testExternalDatasetTracking(Engine engine, boolean backwardsCompati Map args = Collections.singletonMap(io.cdap.cdap.etl.common.Constants.CONSOLIDATE_STAGES, "true"); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.startAndWaitForGoodRun(args, ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); + startAndWaitForGoodRun(workflowManager, args); List history = workflowManager.getHistory(); // there should be only one completed run Assert.assertEquals(1, history.size()); @@ -2924,7 +2924,7 @@ public void testMacrosMapReducePipeline() throws Exception { WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); workflowManager.setRuntimeArgs(runtimeArguments); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // now the datasets should exist @@ -2967,7 +2967,7 @@ public void testMacrosSparkPipeline() throws Exception { Assert.assertNull(getDataset("mockRuntimeSparkSourceDataset").get()); Assert.assertNull(getDataset("mockRuntimeSparkSinkDataset").get()); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // now the datasets should exist @@ -3009,7 +3009,7 @@ public void testNoMacroMapReduce() throws Exception { Assert.assertNotNull(getDataset("configTimeMockSinkDataset").get()); workflowManager.setRuntimeArgs(runtimeArguments); - workflowManager.start(); + 
startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); } @@ -3045,7 +3045,7 @@ public void testKVTableLookup() throws Exception { DataSetManager
inputTable = getDataset("inputTable"); MockSource.writeInput(inputTable, ImmutableList.of(recordSamuel, recordBob, recordJane)); - WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME).start(); + WorkflowManager workflowManager = startProgram(appManager.getWorkflowManager(SmartWorkflow.NAME)); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); Schema schema = Schema.recordOf( @@ -3149,7 +3149,7 @@ private void runPipelineForMetadata(MetadataAdmin metadataAdmin, WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); int numRuns = workflowManager.getHistory().size(); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRuns(ProgramRunStatus.COMPLETED, numRuns + 1, 5, TimeUnit.MINUTES); } @@ -3188,7 +3188,7 @@ private void testRuntimeArgs(Engine engine) throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordDwayne)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check sink @@ -3234,7 +3234,7 @@ public void testTableLookup() throws Exception { DataSetManager
inputTable = getDataset("inputTable"); MockSource.writeInput(inputTable, ImmutableList.of(recordSamuel, recordBob, recordJane)); - WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME).start(); + WorkflowManager workflowManager = startProgram(appManager.getWorkflowManager(SmartWorkflow.NAME)); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); Schema schema = Schema.recordOf( @@ -3336,7 +3336,7 @@ private void testServiceUrl(Engine engine) throws Exception { MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob, recordJane)); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check output @@ -3405,7 +3405,7 @@ private void testSplitterToConnector(Engine engine) throws Exception { // run pipeline WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check output @@ -3490,7 +3490,7 @@ private void testSplitterToJoiner(Engine engine) throws Exception { // run pipeline WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check output @@ -3585,7 +3585,7 @@ private void testSplitterToJoinerSelectedField(Engine engine) throws Exception { // run pipeline WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.start(); + startProgram(workflowManager); workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); // check output @@ -3683,8 +3683,7 @@ public void testStageConsolidation() throws Exception { // run pipeline Map args = Collections.singletonMap(io.cdap.cdap.etl.common.Constants.CONSOLIDATE_STAGES, "true"); WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); - workflowManager.startAndWaitForGoodRun(args, ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); - + startAndWaitForGoodRun(workflowManager, args); Schema errorSchema = Schema.recordOf("erroritem", @@ -4010,4 +4009,28 @@ public void testUpgradePipelinesWithNoChangeInPluginRange() throws Exception { // Verify that after upgrade, application upgrades artifact version to latest version available. 
     Assert.assertEquals(upgradedAppDetail.getArtifact().getVersion(), UPGRADE_APP_ARTIFACT_ID_3_SNAPSHOT.getVersion());
   }
+
+  protected Map<String, String> addRuntimeArguments(Map<String, String> arguments) {
+    return arguments;
+  }
+  private WorkflowManager startProgram(WorkflowManager workflowManager, Map<String, String> runtimeArgs) {
+    return workflowManager.start(addRuntimeArguments(runtimeArgs));
+  }
+
+  private WorkflowManager startProgram(WorkflowManager workflowManager) {
+    return startProgram(workflowManager, ImmutableMap.of());
+  }
+
+  private WorkflowManager startAndWaitForGoodRun(WorkflowManager workflowManager, Map<String, String> args)
+    throws InterruptedException, ExecutionException, TimeoutException {
+    return workflowManager.startAndWaitForGoodRun(addRuntimeArguments(args),
+        ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES);
+  }
+
+  private WorkflowManager startAndWaitForGoodRun(WorkflowManager workflowManager)
+    throws InterruptedException, ExecutionException, TimeoutException {
+    return startAndWaitForGoodRun(workflowManager, ImmutableMap.of());
+  }
+
 }
diff --git a/cdap-app-templates/cdap-etl/cdap-data-pipeline-base/src/test/java/io/cdap/cdap/datapipeline/DatasetDataPipelineTest.java b/cdap-app-templates/cdap-etl/cdap-data-pipeline-base/src/test/java/io/cdap/cdap/datapipeline/DatasetDataPipelineTest.java
new file mode 100644
index 000000000000..fbcad3cec1c9
--- /dev/null
+++ b/cdap-app-templates/cdap-etl/cdap-data-pipeline-base/src/test/java/io/cdap/cdap/datapipeline/DatasetDataPipelineTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.datapipeline;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.etl.common.Constants;
+import java.util.Map;
+
+/**
+ * This test runs all test cases of DataPipelineTest while enforcing maximum Dataset usage.
+ */
+public class DatasetDataPipelineTest extends DataPipelineTest {
+
+  @Override
+  protected Map<String, String> addRuntimeArguments(Map<String, String> arguments) {
+    return ImmutableMap.<String, String>builder().putAll(arguments)
+        .put(Constants.DATASET_FORCE, "true").build();
+  }
+}
diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/Constants.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/Constants.java
index c80986474a3b..3a3e9a8165f1 100644
--- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/Constants.java
+++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/Constants.java
@@ -38,6 +38,12 @@ public final class Constants {
   public static final String CONSOLIDATE_STAGES = "spark.cdap.pipeline.consolidate.stages";
   public static final String CACHE_FUNCTIONS = "spark.cdap.pipeline.functioncache.enable";
   public static final String DATASET_KRYO_ENABLED = "spark.cdap.pipeline.dataset.kryo.enable";
+
+  /**
+   * Force using Datasets instead of RDDs right out of BatchSource. Should mostly
+   * be used for testing.
+   */
+  public static final String DATASET_FORCE = "spark.cdap.pipeline.dataset.force";
   public static final String DATASET_AGGREGATE_ENABLED = "spark.cdap.pipeline.aggregate.dataset.enable";
   public static final String DISABLE_ELT_PUSHDOWN = "cdap.pipeline.pushdown.disable";
   public static final String DATASET_AGGREGATE_IGNORE_PARTITIONS =
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java
index 0e9e06355d74..f3cc3e25e797 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java
@@ -62,6 +62,7 @@
 import io.cdap.cdap.etl.spark.function.JoinOnFunction;
 import io.cdap.cdap.etl.spark.function.PluginFunctionContext;
 import io.cdap.cdap.internal.io.SchemaTypeAdapter;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.SQLContext;
@@ -141,15 +142,22 @@ protected SparkCollection<RecordInfo<Object>> getSource(StageSpec stageSpec,
       }
     }
 
-    // If SQL engine is not initiated : use default spark method (RDDCollection)
+    // If SQL engine is not initiated : use default spark method (RDDCollection or OpaqueDatasetCollection)
+    boolean shouldForceDatasets = Boolean.parseBoolean(
+        sec.getRuntimeArguments().getOrDefault(Constants.DATASET_FORCE, Boolean.FALSE.toString()));
     PluginFunctionContext pluginFunctionContext = new PluginFunctionContext(stageSpec, sec, collector);
     FlatMapFunction<Tuple2<Object, Object>, RecordInfo<Object>> sourceFunction =
       new BatchSourceFunction(pluginFunctionContext, functionCacheFactory.newCache());
     this.functionCacheFactory = functionCacheFactory;
+    JavaRDD<RecordInfo<Object>> rdd = sourceFactory
+        .createRDD(sec, jsc, stageSpec.getName(), Object.class, Object.class)
+        .flatMap(sourceFunction);
+    if (shouldForceDatasets) {
+      return OpaqueDatasetCollection.fromRdd(
+          rdd, sec, jsc, new SQLContext(jsc), datasetContext, sinkFactory, functionCacheFactory);
+    }
     return new RDDCollection<>(sec, functionCacheFactory, jsc,
-                               new SQLContext(jsc), datasetContext, sinkFactory, sourceFactory
-      .createRDD(sec, jsc, stageSpec.getName(), Object.class, Object.class)
-      .flatMap(sourceFunction));
+                               new SQLContext(jsc), datasetContext, sinkFactory, rdd);
   }
 
   @Override
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DataframeCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DataframeCollection.java
index ee41d255041a..5a6b0b755990 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DataframeCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DataframeCollection.java
@@ -84,6 +84,11 @@ public DataframeCollection(Schema schema, Dataset<Row> dataframe, JavaSparkExecu
     super(sec, jsc, sqlContext, datasetContext, sinkFactory, functionCacheFactory);
     this.schema = Objects.requireNonNull(schema);
     this.dataframe = dataframe;
+    if (!Row.class.isAssignableFrom(dataframe.encoder().clsTag().runtimeClass())) {
+      throw new IllegalArgumentException(
+          "Dataframe collection received dataset of " + dataframe.encoder().clsTag()
+              .runtimeClass());
+    }
   }
 
   /**
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DatasetCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DatasetCollection.java
index e586cd98b088..63e393600c98 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DatasetCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DatasetCollection.java
@@ -56,9 +56,11 @@ public abstract class DatasetCollection<T> extends DelegatingSparkCollection<T> implements BatchCollection<T> {
   private static final Encoder<Object> KRYO_OBJECT_ENCODER = Encoders.kryo(Object.class);
+  private static final Encoder<Object[]> KRYO_ARRAY_ENCODER = Encoders.kryo(Object[].class);
   private static final Encoder<Tuple2<Object, Object>> KRYO_TUPLE_ENCODER = Encoders.tuple(
     KRYO_OBJECT_ENCODER, KRYO_OBJECT_ENCODER);
   private static final Encoder<Object> JAVA_OBJECT_ENCODER = Encoders.javaSerialization(Object.class);
+  private static final Encoder<Object[]> JAVA_ARRAY_ENCODER = Encoders.javaSerialization(Object[].class);
   private static final Encoder<Tuple2<Object, Object>> JAVA_TUPLE_ENCODER = Encoders.tuple(
     JAVA_OBJECT_ENCODER, JAVA_OBJECT_ENCODER);
@@ -112,6 +114,13 @@ protected static Encoder objectEncoder(boolean useKryoForDatasets) {
     return useKryoForDatasets ? KRYO_OBJECT_ENCODER : JAVA_OBJECT_ENCODER;
   }
 
+  /**
+   * helper function to provide a generified encoder for array of serializable type
+   * @param useKryoForDatasets
+   */
+  protected static Encoder<Object[]> arrayEncoder(boolean useKryoForDatasets) {
+    return useKryoForDatasets ? KRYO_ARRAY_ENCODER : JAVA_ARRAY_ENCODER;
+  }
   /**
    * helper function to provide a generified encoder for tuple of two serializable types
    */
@@ -126,6 +135,12 @@ protected Encoder objectEncoder() {
     return objectEncoder(useKryoForDatasets);
   }
 
+  /**
+   * helper function to provide a generified encoder for array of serializable type
+   */
+  protected Encoder<Object[]> arrayEncoder() {
+    return arrayEncoder(useKryoForDatasets);
+  }
   @Override
   public <U> SparkCollection<U> map(Function<T, U> function) {
     MapFunction<T, U> mapFunction = function::call;
@@ -158,10 +173,25 @@ protected DatasetCollection<T> cache(StorageLevel cacheStorageLevel) {
     return wrap(getDataset().persist(cacheStorageLevel));
   }
+  private static <T> Object[] arrayWrapper(T value) {
+    return value == null ? null : new Object[]{value};
+  }
+
+  private static <T> T arrayUnWrapper(Object[] array) {
+    return array == null ? null : (T) array[0];
+  }
+
   @Override
   public SparkCollection<T> union(SparkCollection<T> other) {
     if (other instanceof DatasetCollection) {
-      return wrap(getDataset().unionAll(((DatasetCollection<T>) other).getDataset()));
+      // We need to work around https://issues.apache.org/jira/browse/SPARK-46176, which
+      // causes problems with union for Dataset[Object]. We wrap and unwrap each value in an array.
+      MapFunction<T, Object[]> wrapper = DatasetCollection::arrayWrapper;
+      MapFunction<Object[], T> unWrapper = DatasetCollection::arrayUnWrapper;
+      Dataset<Object[]> left = getDataset().map(wrapper, arrayEncoder());
+      Dataset<Object[]> right = ((DatasetCollection<T>) other).getDataset()
+          .map(wrapper, arrayEncoder());
+      return wrap(left.unionAll(right).map(unWrapper, objectEncoder()));
     }
     return super.union(other);
   }
 }
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/OpaqueDatasetCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/OpaqueDatasetCollection.java
index c74857272d33..f20a1b4d12de 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/OpaqueDatasetCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/OpaqueDatasetCollection.java
@@ -56,6 +56,11 @@ private OpaqueDatasetCollection(Dataset<T> dataset,
       FunctionCache.Factory functionCacheFactory) {
     super(sec, jsc, sqlContext, datasetContext, sinkFactory, functionCacheFactory);
     this.dataset = dataset;
+    if (Row.class.isAssignableFrom(dataset.encoder().clsTag().runtimeClass())) {
+      throw new IllegalArgumentException(
+          "Opaque collection received dataset of Row (" + dataset.encoder().clsTag()
+              .runtimeClass() + "). DataframeCollection should be used.");
+    }
   }
 
   @Override
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/function/TransformFunction.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/function/TransformFunction.java
index 838b1614b02e..c64a500af586 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/function/TransformFunction.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/function/TransformFunction.java
@@ -20,6 +20,7 @@
 import io.cdap.cdap.etl.common.RecordInfo;
 import io.cdap.cdap.etl.common.TrackedTransform;
 import io.cdap.cdap.etl.spark.CombinedEmitter;
+import java.util.concurrent.ExecutionException;
 import org.apache.spark.api.java.function.FlatMapFunction;
 
 import java.util.Iterator;
@@ -43,15 +44,21 @@ public TransformFunction(PluginFunctionContext pluginFunctionContext, FunctionCa
 
   @Override
   public Iterator<RecordInfo<Object>> call(T input) throws Exception {
-    if (transform == null) {
-      Transform<T, Object> plugin = pluginFunctionContext.createAndInitializePlugin(functionCache);
-      transform = new TrackedTransform<>(plugin, pluginFunctionContext.createStageMetrics(),
-                                         pluginFunctionContext.getDataTracer(),
-                                         pluginFunctionContext.getStageStatisticsCollector());
-      emitter = new CombinedEmitter<>(pluginFunctionContext.getStageName());
+    try {
+      if (transform == null) {
+        Transform<T, Object> plugin = pluginFunctionContext.createAndInitializePlugin(
+            functionCache);
+        transform = new TrackedTransform<>(plugin, pluginFunctionContext.createStageMetrics(),
+            pluginFunctionContext.getDataTracer(),
+            pluginFunctionContext.getStageStatisticsCollector());
+        emitter = new CombinedEmitter<>(pluginFunctionContext.getStageName());
+      }
+      emitter.reset();
+      transform.transform(input, emitter);
+      return emitter.getEmitted().iterator();
+    } catch (Exception e) {
+      throw new ExecutionException("Error when transforming stage "
+          + pluginFunctionContext.getStageName() + ": " + e, e);
     }
-    emitter.reset();
-    transform.transform(input, emitter);
-    return emitter.getEmitted().iterator();
   }
 }
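Reviewer note: outside of the DatasetDataPipelineTest subclass, the new flag can also be exercised for a single run by passing it as an ordinary runtime argument. The fragment below is only an illustration of that usage and not part of the patch; it assumes the existing DataPipelineTest fixtures (appManager, SmartWorkflow, MockSource data already written) are in scope.

// Illustrative fragment, assuming the surrounding DataPipelineTest fixtures.
// Force the Dataset-backed source collection for this one run by setting the
// runtime argument added in Constants.java.
Map<String, String> args = ImmutableMap.of(Constants.DATASET_FORCE, "true");
WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
workflowManager.start(args);
workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES);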
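The driver change hinges on wrapping the JavaRDD produced by the batch source in a Dataset with an opaque (kryo) encoder instead of keeping it in an RDDCollection. The standalone sketch below shows that underlying Spark conversion; class and variable names are illustrative, and this is not the OpaqueDatasetCollection.fromRdd implementation itself.

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class RddToDatasetSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").appName("rdd-to-dataset").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Records coming "out of a BatchSource" stand in here as plain objects.
    JavaRDD<Object> sourceRdd = jsc.parallelize(Arrays.<Object>asList("r1", "r2", "r3"));

    // Wrap the RDD in a Dataset with an opaque (kryo) encoder instead of keeping it as an RDD.
    Dataset<Object> dataset = spark.createDataset(sourceRdd.rdd(), Encoders.kryo(Object.class));

    System.out.println(dataset.count()); // prints 3
    spark.stop();
  }
}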
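The union change in DatasetCollection works around SPARK-46176 by never calling unionAll directly on two kryo- or Java-serialization-encoded Dataset<Object> instances: each element is first wrapped in an Object[], the wrapped datasets are unioned, and the result is mapped back. The self-contained sketch below demonstrates that shape in plain Spark; it mirrors the approach in the patch but is not the CDAP code, and all names are illustrative.

import java.util.Arrays;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ArrayWrapUnionSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").appName("union-sketch").getOrCreate();

    Encoder<Object> objectEncoder = Encoders.kryo(Object.class);
    Encoder<Object[]> arrayEncoder = Encoders.kryo(Object[].class);

    Dataset<Object> left = spark.createDataset(Arrays.<Object>asList("a", "b"), objectEncoder);
    Dataset<Object> right = spark.createDataset(Arrays.<Object>asList("c", "d"), objectEncoder);

    // Wrap each element into an Object[] so the union never operates on Dataset<Object> directly.
    MapFunction<Object, Object[]> wrap = v -> new Object[]{v};
    MapFunction<Object[], Object> unwrap = arr -> arr[0];

    // Union on the wrapped form, then unwrap back to Dataset<Object>.
    Dataset<Object> union = left.map(wrap, arrayEncoder)
        .unionAll(right.map(wrap, arrayEncoder))
        .map(unwrap, objectEncoder);

    System.out.println(union.count()); // prints 4
    spark.stop();
  }
}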