diff --git a/gobblin-core/src/main/java/gobblin/source/extractor/partition/Partitioner.java b/gobblin-core/src/main/java/gobblin/source/extractor/partition/Partitioner.java index 8b70469b14a..c613c6b9122 100644 --- a/gobblin-core/src/main/java/gobblin/source/extractor/partition/Partitioner.java +++ b/gobblin-core/src/main/java/gobblin/source/extractor/partition/Partitioner.java @@ -119,7 +119,8 @@ private long getLowWatermark(ExtractType extractType, WatermarkType watermarkTyp int deltaForNextWatermark) { long lowWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE; if (this.isFullDump() || this.isWatermarkOverride()) { - lowWatermark = Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE)); + lowWatermark = Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE), + this.state.getProp(ConfigurationKeys.SOURCE_TIMEZONE)); LOG.info("Overriding low water mark with the given start value: " + lowWatermark); } else { if (this.isSnapshot(extractType)) { @@ -157,7 +158,7 @@ private long getSnapshotLowWatermark(WatermarkType watermarkType, long previousW } else { // if previous watermark is not found, override with the start value(irrespective of source.is.watermark.override flag) long startValue = - Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE)); + Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE), timeZone); LOG.info("Overriding low water mark with the given start value: " + startValue); return startValue; } @@ -184,7 +185,7 @@ private long getAppendLowWatermark(WatermarkType watermarkType, long previousWat } } else { LOG.info("Overriding low water mark with start value: " + ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE); - return Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE)); + return Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE), timeZone); } } diff --git a/gobblin-core/src/main/java/gobblin/source/extractor/utils/Utils.java b/gobblin-core/src/main/java/gobblin/source/extractor/utils/Utils.java index 2ac908eb4cf..942158ec90d 100644 --- a/gobblin-core/src/main/java/gobblin/source/extractor/utils/Utils.java +++ b/gobblin-core/src/main/java/gobblin/source/extractor/utils/Utils.java @@ -44,8 +44,7 @@ public class Utils { private static final String CURRENT_DAY = "CURRENTDAY"; private static final String CURRENT_HOUR = "CURRENTHOUR"; - private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmss").withZone( - DateTimeZone.forID(ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE)); + private static final String CURRENT_DATE_FORMAT = "yyyyMMddHHmmss"; public static String getClause(String clause, String datePredicate) { String retStr = ""; @@ -227,21 +226,23 @@ public static String escapeSpecialCharacters(String columnName, String escapeCha /** * Helper method for getting a value containing CURRENTDAY-1 or CURRENTHOUR-1 in the form yyyyMMddHHmmss * @param value + * @param timezone * @return */ - public static long getLongWithCurrentDate(String value) { + public static long getLongWithCurrentDate(String value, String timezone) { if (Strings.isNullOrEmpty(value)) { return 0; } - DateTime time = new DateTime(DateTimeZone.forID(ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE)); + DateTime time = getCurrentTime(timezone); + DateTimeFormatter dtFormatter = DateTimeFormat.forPattern(CURRENT_DATE_FORMAT).withZone(time.getZone()); if (value.toUpperCase().startsWith(CURRENT_DAY)) { return Long - .valueOf(DATE_FORMATTER.print(time.minusDays(Integer.parseInt(value.substring(CURRENT_DAY.length() + 1))))); + .valueOf(dtFormatter.print(time.minusDays(Integer.parseInt(value.substring(CURRENT_DAY.length() + 1))))); } if (value.toUpperCase().startsWith(CURRENT_HOUR)) { return Long - .valueOf(DATE_FORMATTER.print(time.minusHours(Integer.parseInt(value.substring(CURRENT_HOUR.length() + 1))))); + .valueOf(dtFormatter.print(time.minusHours(Integer.parseInt(value.substring(CURRENT_HOUR.length() + 1))))); } return Long.parseLong(value); }