Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE 615: Changes to support "seekEpoch", "seekTimestamp" in Kafka Consumer step, removing "seek" from ConsumerCommonConfigs #619

Merged
merged 10 commits into from
Feb 14, 2024
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public class ConsumerLocalConfigs {
private final String protoClassType;
private final Boolean cacheByTopic;
private final String filterByJsonPath;
private final String seekEpoch;
private final SeekTimestamp seekTimestamp;


@JsonCreator
public ConsumerLocalConfigs(
Expand All @@ -36,9 +39,11 @@ public ConsumerLocalConfigs(
@JsonProperty("pollingTime") Long pollingTime,
@JsonProperty("cacheByTopic") Boolean cacheByTopic,
@JsonProperty("filterByJsonPath") String filterByJsonPath,
@JsonProperty("seek") String seek) {
@JsonProperty("seek") String seek,
@JsonProperty("seekEpoch") String seekEpoch,
@JsonProperty("seekTimestamp") SeekTimestamp seekTimestamp) {
this.recordType = recordType;
this.protoClassType= protobufMessageClassType;
this.protoClassType = protobufMessageClassType;
this.fileDumpTo = fileDumpTo;
this.commitAsync = commitAsync;
this.commitSync = commitSync;
Expand All @@ -48,39 +53,45 @@ public ConsumerLocalConfigs(
this.cacheByTopic = cacheByTopic;
this.filterByJsonPath = filterByJsonPath;
this.seek = seek;
this.seekEpoch = seekEpoch;
this.seekTimestamp = seekTimestamp;
}


public ConsumerLocalConfigs(
String recordType,
String fileDumpTo,
Boolean commitAsync,
String recordType,
String fileDumpTo,
Boolean commitAsync,
Boolean commitSync,
Boolean showRecordsConsumed,
Integer maxNoOfRetryPollsOrTimeouts,
Long pollingTime,
Boolean cacheByTopic,
String filterByJsonPath,
String seek) {
this(recordType, null,
String seek,
String seekEpoch,
SeekTimestamp seekTimestamp) {
this(recordType, null,
fileDumpTo,
commitAsync,
commitSync,
showRecordsConsumed,
maxNoOfRetryPollsOrTimeouts,
pollingTime,
pollingTime,
cacheByTopic,
filterByJsonPath,
seek);
seek,
seekEpoch,
seekTimestamp);
}

/** Returns the configured record type, falling back to RAW when none was set. */
public String getRecordType() {
    return recordType == null ? RAW : recordType;
}
public String getProtoClassType() {
return protoClassType;
}

public String getProtoClassType() {
return protoClassType;
}


public String getFileDumpTo() {
Expand Down Expand Up @@ -119,6 +130,14 @@ public String getSeek() {
return seek;
}

// Epoch timestamp (millis, held as a String) to seek all partitions to;
// null/empty when 'seekEpoch' was not configured for the step.
public String getSeekEpoch() {
return seekEpoch;
}

// Human-readable timestamp + parse format to seek to; null when 'seekTimestamp'
// was not configured for the step.
public SeekTimestamp getSeekTimestamp() {
return seekTimestamp;
}

@JsonIgnore
public String[] getSeekTopicPartitionOffset() {
return seek.split(",");
Expand All @@ -130,7 +149,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
ConsumerLocalConfigs that = (ConsumerLocalConfigs) o;
return Objects.equals(recordType, that.recordType) &&
Objects.equals(protoClassType, that.protoClassType) &&
Objects.equals(protoClassType, that.protoClassType) &&
Objects.equals(fileDumpTo, that.fileDumpTo) &&
Objects.equals(commitAsync, that.commitAsync) &&
Objects.equals(commitSync, that.commitSync) &&
Expand All @@ -139,13 +158,14 @@ public boolean equals(Object o) {
Objects.equals(pollingTime, that.pollingTime) &&
Objects.equals(filterByJsonPath, that.filterByJsonPath) &&
Objects.equals(cacheByTopic, that.cacheByTopic) &&
Objects.equals(seek, that.seek);
Objects.equals(seek, that.seek) &&
Objects.equals(seekEpoch, that.seekEpoch);
}

@Override
public int hashCode() {
    // Keep in sync with equals(): protoClassType and seekEpoch are compared
    // there and were previously missing from the hash.
    return Objects.hash(recordType, protoClassType, fileDumpTo, commitAsync, commitSync,
            showRecordsConsumed, maxNoOfRetryPollsOrTimeouts, pollingTime,
            cacheByTopic, filterByJsonPath, seek, seekEpoch);
}

@Override
Expand All @@ -162,6 +182,8 @@ public String toString() {
", cacheByTopic=" + cacheByTopic +
", filterByJsonPath=" + filterByJsonPath +
", seek=" + seek +
", seekEpoch=" + seekEpoch +
", seekTimestamp=" + seekTimestamp +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.jsmart.zerocode.core.kafka.consume;


import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;


/**
 * Value object for the consumer-local 'seekTimestamp' config: a human-readable
 * timestamp plus the date pattern used to parse it (consumed via
 * java.text.SimpleDateFormat by the Kafka helper). Lombok generates the
 * getters, builder and toString; @Jacksonized wires the builder into Jackson
 * so this type deserializes straight from the test-step JSON.
 */
@Getter
@Builder
@ToString
@Jacksonized
public class SeekTimestamp {

    // e.g. "2024-02-01 10:15:30", parsed with 'format' below
    private final String timestamp;
    // SimpleDateFormat pattern, e.g. "yyyy-MM-dd HH:mm:ss"
    private final String format;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static java.lang.Long.parseLong;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNumeric;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.AVRO;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.DEFAULT_POLLING_TIME_MILLI_SEC;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.JSON;
Expand All @@ -17,22 +18,19 @@
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.*;
a1shadows marked this conversation as resolved.
Show resolved Hide resolved
import java.util.function.Function;
import java.util.stream.Collectors;

import com.jayway.jsonpath.JsonPath;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
Expand All @@ -41,6 +39,7 @@
import org.jsmart.zerocode.core.kafka.KafkaConstants;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigsWrap;
import org.jsmart.zerocode.core.kafka.consume.SeekTimestamp;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecord;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecords;
Expand Down Expand Up @@ -135,6 +134,31 @@ public static void validateLocalConfigs(ConsumerLocalConfigs localConfigs) {
validateCommitFlags(localCommitSync, localCommitAsync);

validateSeekConfig(localConfigs);
validateSeekToTimestamp(localConfigs);
}
}

/**
 * Validates the seek-related local configs.
 * <p>
 * - 'seekEpoch', when present, must be a numeric epoch/Unix timestamp in millis,
 *   non-negative and not in the future.
 * - 'seekTimestamp', when present, must parse with its own format and yield an
 *   epoch in the same valid range.
 * - At most one of 'seek', 'seekEpoch' and 'seekTimestamp' may be provided.
 *
 * @throws RuntimeException when any of the above constraints is violated
 */
private static void validateSeekToTimestamp(ConsumerLocalConfigs localConfigs) {
    String seekEpoch = localConfigs.getSeekEpoch();
    // BUGFIX: the guard was inverted (isEmpty instead of !isEmpty), so this
    // validation never ran and a bad 'seekEpoch' only failed later with a
    // raw NumberFormatException during seek handling.
    if (!isEmpty(seekEpoch)) {
        if (!isNumeric(seekEpoch)
                || Long.parseLong(seekEpoch) > System.currentTimeMillis()
                || Long.parseLong(seekEpoch) < 0L) {
            throw new RuntimeException("\n------> 'seekEpoch' is not a valid epoch/Unix timestamp");
        }
    }

    // The three seek strategies are mutually exclusive.
    int seekConfigsProvided = 0;
    if (!isEmpty(localConfigs.getSeek())) {
        seekConfigsProvided++;
    }
    if (!isEmpty(seekEpoch)) {
        seekConfigsProvided++;
    }
    if (Objects.nonNull(localConfigs.getSeekTimestamp())) {
        seekConfigsProvided++;
    }
    if (seekConfigsProvided > 1) {
        throw new RuntimeException("Only one of 'seek', 'seekEpoch' and 'seekTimestamp' should be provided. Please fix and rerun");
    }

    if (Objects.nonNull(localConfigs.getSeekTimestamp())) {
        DateFormat dateFormat = new SimpleDateFormat(localConfigs.getSeekTimestamp().getFormat());
        try {
            Date date = dateFormat.parse(localConfigs.getSeekTimestamp().getTimestamp());
            long epochMillis = date.toInstant().toEpochMilli();
            if (epochMillis > System.currentTimeMillis() || epochMillis < 0L) {
                throw new RuntimeException("\n------> 'seekTimestamp' is not a valid epoch/Unix timestamp " + epochMillis);
            }
        } catch (ParseException e) {
            throw new RuntimeException("Timestamp and format provided in 'seekTimestamp' cannot be parsed ", e);
        }
    }
}

Expand Down Expand Up @@ -163,7 +187,9 @@ public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consume
consumerCommon.getPollingTime(),
consumerCommon.getCacheByTopic(),
consumerCommon.getFilterByJsonPath(),
consumerCommon.getSeek());
null,
null,
null);
}

// Handle recordType
Expand All @@ -188,9 +214,6 @@ public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consume
// Handle pollingTime
String filterByJsonPath = ofNullable(consumerLocal.getFilterByJsonPath()).orElse(consumerCommon.getFilterByJsonPath());

// Handle pollingTime
String effectiveSeek = ofNullable(consumerLocal.getSeek()).orElse(consumerCommon.getSeek());

// Handle consumerCache by topic
Boolean effectiveConsumerCacheByTopic = ofNullable(consumerLocal.getCacheByTopic())
.orElse(consumerCommon.getCacheByTopic());
Expand Down Expand Up @@ -222,7 +245,9 @@ public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consume
effectivePollingTime,
effectiveConsumerCacheByTopic,
filterByJsonPath,
effectiveSeek);
consumerLocal.getSeek(),
consumerLocal.getSeekEpoch(),
consumerLocal.getSeekTimestamp());
}

public static ConsumerLocalConfigs readConsumerLocalTestProperties(String requestJsonWithConfigWrapped) {
Expand Down Expand Up @@ -378,30 +403,72 @@ public static void handleCommitSyncAsync(Consumer<Long, String> consumer,
// --------------------------------------------------------
}

public static void handleSeekOffset(ConsumerLocalConfigs effectiveLocal, Consumer consumer) {
/**
 * Applies at most one configured seek strategy before polling, in priority
 * order: 'seek' (explicit topic,partition,offset), then 'seekEpoch', then
 * 'seekTimestamp'. Does nothing when none of the three is configured.
 */
public static void handleSeek(ConsumerLocalConfigs effectiveLocal, Consumer consumer, String topicName) {
    if (!isEmpty(effectiveLocal.getSeek())) {
        handleSeekByOffset(effectiveLocal, consumer);
        return;
    }

    String seekEpoch = effectiveLocal.getSeekEpoch();
    if (!isEmpty(seekEpoch)) {
        handleSeekByEpoch(Long.parseLong(seekEpoch), consumer, topicName);
        return;
    }

    SeekTimestamp seekTimestamp = effectiveLocal.getSeekTimestamp();
    if (seekTimestamp != null) {
        handleSeekByTimestamp(seekTimestamp, consumer, topicName);
    }
}

TopicPartition topicPartition = new TopicPartition(topic, partition);
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(topicPartition);
/**
 * Seeks to the offsets matching a human-readable timestamp: parses it with the
 * configured format, converts to epoch millis and delegates to handleSeekByEpoch.
 * No-op when seekTimestamp is null.
 * NOTE(review): SimpleDateFormat parses in the JVM default time zone — confirm intended.
 */
@SneakyThrows
private static void handleSeekByTimestamp(SeekTimestamp seekTimestamp, Consumer consumer, String topicName) {
    if (seekTimestamp == null) {
        return;
    }
    Date parsed = new SimpleDateFormat(seekTimestamp.getFormat()).parse(seekTimestamp.getTimestamp());
    handleSeekByEpoch(parsed.toInstant().toEpochMilli(), consumer, topicName);
}

private static void handleSeekByEpoch(Long epoch, Consumer consumer, String topicName) {
if (Objects.nonNull(epoch)) {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);

//fetch partitions on topic
List<TopicPartition> topicPartitions = partitionInfos.stream()
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());

//fetch offsets for each partition-timestamp pair
Map<TopicPartition, Long> topicPartitionTimestampMap = topicPartitions.stream()
.collect(Collectors.toMap(Function.identity(), ignore -> epoch));
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(topicPartitionTimestampMap);

//assign to fetched partitions
consumer.unsubscribe();
consumer.assign(topicPartitions);
consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());

if (offset <= -1) {
consumer.seekToEnd(topicPartitions);
consumer.seek(topicPartition, consumer.position(topicPartition) + offset);
} else {
consumer.seek(topicPartition, offset);
//seek to fetched offsets for partitions
for (Map.Entry<TopicPartition, OffsetAndTimestamp> topicOffsetEntry : topicPartitionOffsetAndTimestampMap.entrySet()) {
consumer.seek(topicOffsetEntry.getKey(), topicOffsetEntry.getValue().offset());
}
}
}

/**
 * Seeks a single topic-partition to an explicit offset taken from the 'seek'
 * config ("topic,partition,offset"). A non-negative offset is absolute; a
 * negative offset is relative to the end position (e.g. -1 = last record).
 */
private static void handleSeekByOffset(ConsumerLocalConfigs effectiveLocal, Consumer consumer) {
    String[] parts = effectiveLocal.getSeekTopicPartitionOffset();
    TopicPartition target = new TopicPartition(parts[0], parseInt(parts[1]));
    long offset = parseLong(parts[2]);

    Set<TopicPartition> assignment = new HashSet<>();
    assignment.add(target);

    consumer.unsubscribe();
    consumer.assign(assignment);

    if (offset > -1) {
        consumer.seek(target, offset);
    } else {
        consumer.seekToEnd(assignment);
        consumer.seek(target, consumer.position(target) + offset);
    }
}

private static void validateCommitFlags(Boolean commitSync, Boolean commitAsync) {
if ((commitSync != null && commitAsync != null) && commitSync == true && commitAsync == true) {
throw new RuntimeException("\n********* Both commitSync and commitAsync can not be true *********\n");
Expand Down
Loading
Loading