Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE 615: Changes to support "seekEpoch", "seekTimestamp" in Kafka Consumer step, removing "seek" from ConsumerCommonConfigs #619

Merged
merged 10 commits into from
Feb 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public class ConsumerLocalConfigs {
private final String protoClassType;
private final Boolean cacheByTopic;
private final String filterByJsonPath;
private final String seekEpoch;
private final SeekTimestamp seekTimestamp;


@JsonCreator
public ConsumerLocalConfigs(
Expand All @@ -36,9 +39,11 @@ public ConsumerLocalConfigs(
@JsonProperty("pollingTime") Long pollingTime,
@JsonProperty("cacheByTopic") Boolean cacheByTopic,
@JsonProperty("filterByJsonPath") String filterByJsonPath,
@JsonProperty("seek") String seek) {
@JsonProperty("seek") String seek,
@JsonProperty("seekEpoch") String seekEpoch,
@JsonProperty("seekTimestamp") SeekTimestamp seekTimestamp) {
this.recordType = recordType;
this.protoClassType= protobufMessageClassType;
this.protoClassType = protobufMessageClassType;
this.fileDumpTo = fileDumpTo;
this.commitAsync = commitAsync;
this.commitSync = commitSync;
Expand All @@ -48,39 +53,45 @@ public ConsumerLocalConfigs(
this.cacheByTopic = cacheByTopic;
this.filterByJsonPath = filterByJsonPath;
this.seek = seek;
this.seekEpoch = seekEpoch;
this.seekTimestamp = seekTimestamp;
}


public ConsumerLocalConfigs(
String recordType,
String fileDumpTo,
Boolean commitAsync,
String recordType,
String fileDumpTo,
Boolean commitAsync,
Boolean commitSync,
Boolean showRecordsConsumed,
Integer maxNoOfRetryPollsOrTimeouts,
Long pollingTime,
Boolean cacheByTopic,
String filterByJsonPath,
String seek) {
this(recordType, null,
String seek,
String seekEpoch,
SeekTimestamp seekTimestamp) {
this(recordType, null,
fileDumpTo,
commitAsync,
commitSync,
showRecordsConsumed,
maxNoOfRetryPollsOrTimeouts,
pollingTime,
pollingTime,
cacheByTopic,
filterByJsonPath,
seek);
seek,
seekEpoch,
seekTimestamp);
}

public String getRecordType() {
return recordType != null ? recordType : RAW;
}
public String getProtoClassType() {
return protoClassType;
}

public String getProtoClassType() {
return protoClassType;
}


public String getFileDumpTo() {
Expand Down Expand Up @@ -119,6 +130,14 @@ public String getSeek() {
return seek;
}

public String getSeekEpoch() {
return seekEpoch;
}

public SeekTimestamp getSeekTimestamp() {
return seekTimestamp;
}

@JsonIgnore
public String[] getSeekTopicPartitionOffset() {
return seek.split(",");
Expand All @@ -130,7 +149,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
ConsumerLocalConfigs that = (ConsumerLocalConfigs) o;
return Objects.equals(recordType, that.recordType) &&
Objects.equals(protoClassType, that.protoClassType) &&
Objects.equals(protoClassType, that.protoClassType) &&
Objects.equals(fileDumpTo, that.fileDumpTo) &&
Objects.equals(commitAsync, that.commitAsync) &&
Objects.equals(commitSync, that.commitSync) &&
Expand All @@ -139,13 +158,14 @@ public boolean equals(Object o) {
Objects.equals(pollingTime, that.pollingTime) &&
Objects.equals(filterByJsonPath, that.filterByJsonPath) &&
Objects.equals(cacheByTopic, that.cacheByTopic) &&
Objects.equals(seek, that.seek);
Objects.equals(seek, that.seek) &&
Objects.equals(seekEpoch, that.seekEpoch);
}

@Override
public int hashCode() {

return Objects.hash(recordType, fileDumpTo, commitAsync, commitSync, showRecordsConsumed, maxNoOfRetryPollsOrTimeouts, pollingTime,cacheByTopic, filterByJsonPath, seek);
return Objects.hash(recordType, fileDumpTo, commitAsync, commitSync, showRecordsConsumed, maxNoOfRetryPollsOrTimeouts, pollingTime, cacheByTopic, filterByJsonPath, seek);
}

@Override
Expand All @@ -162,6 +182,8 @@ public String toString() {
", cacheByTopic=" + cacheByTopic +
", filterByJsonPath=" + filterByJsonPath +
", seek=" + seek +
", seekEpoch=" + seekEpoch +
", seekTimestamp=" + seekTimestamp +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.jsmart.zerocode.core.kafka.consume;


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class SeekTimestamp {

private final String timestamp;
private final String format;


@JsonCreator
public SeekTimestamp(
@JsonProperty("timestamp") String timestamp,
@JsonProperty("format") String format) {
this.timestamp = timestamp;
this.format = format;
}

public String getTimestamp() {
return timestamp;
}

public String getFormat() {
return format;
}

@Override
public String toString() {
return "SeekTimestamp{" +
"timestamp='" + timestamp + '\'' +
", format='" + format + '\'' +
'}';
}
}
Loading
Loading