Skip to content

Commit

Permalink
Merge pull request apache#56 from lbendig/gobblin_dev
Browse files Browse the repository at this point in the history
Calculation of low watermark should honor source.timezone
  • Loading branch information
liyinan926 committed Mar 19, 2015
2 parents b40904e + cecf4b7 commit db20876
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ private long getLowWatermark(ExtractType extractType, WatermarkType watermarkTyp
int deltaForNextWatermark) {
long lowWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
if (this.isFullDump() || this.isWatermarkOverride()) {
lowWatermark = Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE));
String timeZone = this.state.getProp(ConfigurationKeys.SOURCE_TIMEZONE, ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE);
lowWatermark = Utils.getLongWithCurrentDate(
this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE), timeZone);
LOG.info("Overriding low water mark with the given start value: " + lowWatermark);
} else {
if (this.isSnapshot(extractType)) {
Expand All @@ -141,7 +143,7 @@ private long getLowWatermark(ExtractType extractType, WatermarkType watermarkTyp
*/
private long getSnapshotLowWatermark(WatermarkType watermarkType, long previousWatermark, int deltaForNextWatermark) {
LOG.debug("Getting snapshot low water mark");
String timeZone = this.state.getProp(ConfigurationKeys.SOURCE_TIMEZONE);
String timeZone = this.state.getProp(ConfigurationKeys.SOURCE_TIMEZONE, ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE);
if (this.isPreviousWatermarkExists(previousWatermark)) {
if (this.isSimpleWatermark(watermarkType)) {
return previousWatermark + deltaForNextWatermark
Expand All @@ -157,7 +159,7 @@ private long getSnapshotLowWatermark(WatermarkType watermarkType, long previousW
} else {
// if previous watermark is not found, override with the start value(irrespective of source.is.watermark.override flag)
long startValue =
Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE));
Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE), timeZone);
LOG.info("Overriding low water mark with the given start value: " + startValue);
return startValue;
}
Expand All @@ -184,7 +186,7 @@ private long getAppendLowWatermark(WatermarkType watermarkType, long previousWat
}
} else {
LOG.info("Overriding low water mark with start value: " + ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE);
return Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE));
return Utils.getLongWithCurrentDate(this.state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_START_VALUE), timeZone);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ public class Utils {
private static final String CURRENT_DAY = "CURRENTDAY";
private static final String CURRENT_HOUR = "CURRENTHOUR";

private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmss").withZone(
DateTimeZone.forID(ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE));
private static final String CURRENT_DATE_FORMAT = "yyyyMMddHHmmss";

public static String getClause(String clause, String datePredicate) {
String retStr = "";
Expand Down Expand Up @@ -227,21 +226,23 @@ public static String escapeSpecialCharacters(String columnName, String escapeCha
/**
* Helper method for getting a value containing CURRENTDAY-1 or CURRENTHOUR-1 in the form yyyyMMddHHmmss
* @param value
* @param timezone
* @return
*/
public static long getLongWithCurrentDate(String value) {
public static long getLongWithCurrentDate(String value, String timezone) {
if (Strings.isNullOrEmpty(value)) {
return 0;
}

DateTime time = new DateTime(DateTimeZone.forID(ConfigurationKeys.DEFAULT_SOURCE_TIMEZONE));
DateTime time = getCurrentTime(timezone);
DateTimeFormatter dtFormatter = DateTimeFormat.forPattern(CURRENT_DATE_FORMAT).withZone(time.getZone());
if (value.toUpperCase().startsWith(CURRENT_DAY)) {
return Long
.valueOf(DATE_FORMATTER.print(time.minusDays(Integer.parseInt(value.substring(CURRENT_DAY.length() + 1)))));
.valueOf(dtFormatter.print(time.minusDays(Integer.parseInt(value.substring(CURRENT_DAY.length() + 1)))));
}
if (value.toUpperCase().startsWith(CURRENT_HOUR)) {
return Long
.valueOf(DATE_FORMATTER.print(time.minusHours(Integer.parseInt(value.substring(CURRENT_HOUR.length() + 1)))));
.valueOf(dtFormatter.print(time.minusHours(Integer.parseInt(value.substring(CURRENT_HOUR.length() + 1)))));
}
return Long.parseLong(value);
}
Expand Down

0 comments on commit db20876

Please sign in to comment.