Skip to content

Commit

Permalink
Add fix for no range work unit (#79)
Browse files Browse the repository at this point in the history
* Add fix for no range work unit

* add MSTAGE_AUX_KEYS to all properties
  • Loading branch information
dihu-linkedin authored Nov 29, 2022
1 parent e9127ff commit 2c1911d
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public JsonObject getDefaultValue() {
SecondaryInputProperties MSTAGE_SECONDARY_INPUT = new SecondaryInputProperties("ms.secondary.input");
StringProperties MSTAGE_SECRET_MANAGER_CLASS = new StringProperties("ms.secret.manager.class", "com.linkedin.cdi.util.GobblinSecretManager");
JsonObjectProperties MSTAGE_SESSION_KEY_FIELD = new JsonObjectProperties("ms.session.key.field");
// Auxiliary job settings supplied as a single JSON object under "ms.aux.keys";
// JobKeys reads the boolean "cleanseNoRangeWorkUnit" entry from it.
JsonObjectProperties MSTAGE_AUX_KEYS = new JsonObjectProperties("ms.aux.keys");

// default: 60 seconds, minimum: 0, maximum: -
IntegerProperties MSTAGE_SFTP_CONN_TIMEOUT_MILLIS = new IntegerProperties("ms.sftp.conn.timeout.millis", 60000);
Expand Down Expand Up @@ -360,6 +361,7 @@ protected String getValidNonblankWithDefault(State state) {
MSTAGE_SECONDARY_INPUT,
MSTAGE_SECRET_MANAGER_CLASS,
MSTAGE_SESSION_KEY_FIELD,
MSTAGE_AUX_KEYS,
MSTAGE_SFTP_CONN_TIMEOUT_MILLIS,
MSTAGE_SOURCE_DATA_CHARACTER_SET,
MSTAGE_SOURCE_FILES_PATTERN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public interface StaticConstants {
String REGEXP_HOUR_ONLY_DURATION_PATTERN = "P\\d+D(T\\d+H)(\\..*)?";

String DOC_BASE_URL = "https://github.com/linkedin/data-integration-library/blob/master/docs";
// Entry name inside the ms.aux.keys JSON object; when its value is true, work units
// whose low and high watermarks are equal are skipped during work unit generation.
String CLEANSE_NO_RANGE_WORK_UNIT = "cleanseNoRangeWorkUnit";

String EXCEPTION_WORK_UNIT_MINIMUM = "Job requires a minimum of %s work unit(s) to proceed because ms.work.unit.min.units = %s.";
String EXCEPTION_RECORD_MINIMUM = "Work unit requires a minimum of %s record(s) to succeed because ms.work.unit.min.records = %s.";
Expand Down
17 changes: 17 additions & 0 deletions cdi-core/src/main/java/com/linkedin/cdi/keys/JobKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ public class JobKeys {
private Boolean schemaCleansingNullable = false;
private long minWorkUnits = 0;
private long minWorkUnitRecords = 0;
// Auxiliary keys; starts as an empty object and is overwritten from ms.aux.keys in initialize()
private JsonObject auxKeys = new JsonObject();

public void initialize(State state) {
parsePaginationFields(state);
parsePaginationInitialValues(state);
setSessionKeyField(MSTAGE_SESSION_KEY_FIELD.get(state));
setAuxKeys(MSTAGE_AUX_KEYS.get(state));
setTotalCountField(MSTAGE_TOTAL_COUNT_FIELD.get(state));
setSourceUri(MSTAGE_SOURCE_URI.get(state));
setDefaultFieldTypes(parseDefaultFieldTypes(state));
Expand Down Expand Up @@ -159,6 +161,13 @@ public String getSessionStateCondition() {
return StringUtils.EMPTY;
}

/**
 * Tells whether work units whose low and high watermarks are equal should be dropped.
 *
 * The flag is read from the CLEANSE_NO_RANGE_WORK_UNIT entry of the auxiliary keys.
 * A missing auxiliary keys object, a missing entry, or an explicit JSON null all
 * mean the feature is off.
 *
 * @return true only when the entry is present, is not JSON null, and evaluates to true
 */
public boolean shouldCleanseNoRangeWorkUnit() {
  if (auxKeys == null || !auxKeys.has(CLEANSE_NO_RANGE_WORK_UNIT)) {
    return false;
  }
  // has() is true for an explicit JSON null, but JsonNull does not support
  // getAsBoolean() and would throw; treat it the same as "not configured"
  if (auxKeys.get(CLEANSE_NO_RANGE_WORK_UNIT).isJsonNull()) {
    return false;
  }
  return auxKeys.get(CLEANSE_NO_RANGE_WORK_UNIT).getAsBoolean();
}

/**
* failCondition is optional in the definition
* @return failCondition if it is defined
Expand Down Expand Up @@ -549,6 +558,14 @@ public void setSessionKeyField(JsonObject sessionKeyField) {
this.sessionKeyField = sessionKeyField;
}

/**
 * @return the auxiliary keys JSON object currently held by this instance
 */
public JsonObject getAuxKeys() {
  return this.auxKeys;
}

/**
 * Replaces the auxiliary keys object held by this instance.
 * @param auxKeys the new auxiliary keys JSON object
 */
public void setAuxKeys(JsonObject auxKeys) {
this.auxKeys = auxKeys;
}

/**
 * @return the total count field currently held by this instance
 */
public String getTotalCountField() {
  return this.totalCountField;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,13 @@ List<WorkUnit> generateWorkUnits(List<WatermarkDefinition> definitions, Map<Stri
ImmutablePair<Long, Long> dtPartitionModified = unitCutoffTime == -1L
? dtPartition : previousHighWatermarks.get(wuSignature).equals(dtPartition.left)
? dtPartition : new ImmutablePair<>(Long.max(unitCutoffTime, dtPartition.left), dtPartition.right);

LOG.info(String.format(MSG_WORK_UNIT_INFO, wuSignature, dtPartitionModified));
if (jobKeys.shouldCleanseNoRangeWorkUnit()
&& (long) dtPartitionModified.left == dtPartitionModified.right) {
LOG.info(String.format("Skipping no range work units with low watermark: %s, high watermark: %s",
dtPartitionModified.left, dtPartitionModified.right));
continue;
}
WorkUnit workUnit = WorkUnit.create(extract,
new WatermarkInterval(
new LongWatermark(dtPartitionModified.getLeft()),
Expand Down
11 changes: 10 additions & 1 deletion cdi-core/src/test/java/com/linkedin/cdi/keys/JobKeysTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.mockito.Mockito;
Expand Down Expand Up @@ -72,6 +71,16 @@ public void testIsSessionStateEnabled() {
Assert.assertEquals(jobKeys.getSessionStateFailCondition(), "testFailValue");
}

/**
 * Verifies that shouldCleanseNoRangeWorkUnit() follows the "cleanseNoRangeWorkUnit"
 * entry of the auxiliary keys and stays false whenever the entry is absent.
 */
@Test
public void testGetCleanseNoRangeWorkUnit() {
  JsonObject auxKeys = new JsonObject();
  // before this test sets any auxiliary keys
  Assert.assertFalse(jobKeys.shouldCleanseNoRangeWorkUnit());

  // an auxiliary keys object without the entry keeps the feature off
  jobKeys.setAuxKeys(auxKeys);
  Assert.assertFalse(jobKeys.shouldCleanseNoRangeWorkUnit());

  // the same object instance is mutated after being handed to jobKeys
  auxKeys.addProperty("cleanseNoRangeWorkUnit", true);
  Assert.assertTrue(jobKeys.shouldCleanseNoRangeWorkUnit());

  // an explicit false must be honored as well
  auxKeys.addProperty("cleanseNoRangeWorkUnit", false);
  Assert.assertFalse(jobKeys.shouldCleanseNoRangeWorkUnit());
}


@Test
public void testHasSourceSchema() {
JsonArray sourceSchema = SchemaBuilder.fromJsonData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,4 +685,65 @@ public void testAvoidWatermarkGoingBeyondLeftBoundary() {
Assert.assertEquals(actualWorkUnits.get(0).getLowWatermark(), expectedWorkUnit.getLowWatermark());
Assert.assertEquals(actualWorkUnits.get(0).getExpectedHighWatermark(), expectedWorkUnit.getExpectedHighWatermark());
}

/**
 * With cleanseNoRangeWorkUnit enabled, the scenario built by
 * generateNoRangeScenarioWorkUnits must not yield any work unit.
 */
@Test
public void testRemoveNoRangeWorkUnitEnabled() {
  List<WorkUnit> actualWorkUnits = generateNoRangeScenarioWorkUnits(true);
  // expected result should not contain any no range work units
  Assert.assertEquals(actualWorkUnits.size(), 0);
}

/**
 * With cleanseNoRangeWorkUnit disabled, the same scenario must yield exactly one
 * work unit whose low and expected high watermarks are both 2021-06-19T00:00:00.
 */
@Test
public void testRemoveNoRangeWorkUnitDisabled() {
  List<WorkUnit> actualWorkUnits = generateNoRangeScenarioWorkUnits(false);

  // expected result contains no range work units
  long watermarkMillis = DTF_PST_TIMEZONE.parseDateTime("2021-06-19T00:00:00").getMillis();
  WorkUnit expectedWorkUnit = WorkUnit.create(null,
      new WatermarkInterval(
          new LongWatermark(watermarkMillis),
          new LongWatermark(watermarkMillis)));

  Assert.assertEquals(actualWorkUnits.size(), 1);
  Assert.assertEquals(actualWorkUnits.get(0).getLowWatermark(), expectedWorkUnit.getLowWatermark());
  Assert.assertEquals(actualWorkUnits.get(0).getExpectedHighWatermark(), expectedWorkUnit.getExpectedHighWatermark());
}

/**
 * Shared setup for the no-range work unit tests: a daily datetime watermark from
 * 2021-06-18 to 2021-06-19, combined with a mocked state store that reports
 * 2021-06-19T00:00:00 as the previous high watermark for every key.
 *
 * @param cleanseNoRangeWorkUnit value written to the "cleanseNoRangeWorkUnit" entry of ms.aux.keys
 * @return the work units produced by MultistageSource.generateWorkUnits for this setup
 */
private List<WorkUnit> generateNoRangeScenarioWorkUnits(boolean cleanseNoRangeWorkUnit) {
  SourceState state = new SourceState();
  state.setProp("ms.aux.keys", "{\"cleanseNoRangeWorkUnit\": " + cleanseNoRangeWorkUnit + "}");
  MultistageSource<?, ?> source = new MultistageSource<>();
  source.setSourceState(state);
  source.jobKeys.initialize(state);

  // watermark definition: define from and to date watermark
  String jsonDef = "[{\"name\": \"system\",\"type\": \"datetime\", \"range\": {\"from\": \"2021-06-18\", \"to\": \"2021-06-19\"}}]";
  JsonArray defArray = new Gson().fromJson(jsonDef, JsonArray.class);
  WatermarkDefinition watermarkDefinition = new WatermarkDefinition(defArray.get(0).getAsJsonObject(),
      false, WorkUnitPartitionTypes.DAILY);
  List<WatermarkDefinition> definitions = ImmutableList.of(watermarkDefinition);

  // previous high watermarks: simulate state-store entry
  Map<String, Long> previousHighWatermarks = Mockito.mock(HashMap.class);
  when(previousHighWatermarks.containsKey(any())).thenReturn(true);
  when(previousHighWatermarks.get(any())).thenReturn(
      DTF_PST_TIMEZONE.parseDateTime("2021-06-19T00:00:00").getMillis());

  return source.generateWorkUnits(definitions, previousHighWatermarks);
}
}

0 comments on commit 2c1911d

Please sign in to comment.