diff --git a/core/src/main/java/org/jsmart/zerocode/core/kafka/consume/SeekTimestamp.java b/core/src/main/java/org/jsmart/zerocode/core/kafka/consume/SeekTimestamp.java index 5ce72b408..371ad21bb 100644 --- a/core/src/main/java/org/jsmart/zerocode/core/kafka/consume/SeekTimestamp.java +++ b/core/src/main/java/org/jsmart/zerocode/core/kafka/consume/SeekTimestamp.java @@ -1,14 +1,16 @@ package org.jsmart.zerocode.core.kafka.consume; -import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Getter; import lombok.ToString; +import lombok.extern.jackson.Jacksonized; @Getter +@Builder @ToString -@AllArgsConstructor +@Jacksonized public class SeekTimestamp { private final String timestamp; diff --git a/core/src/test/java/org/jsmart/zerocode/core/kafka/consume/ConsumerLocalConfigsWrapTest.java b/core/src/test/java/org/jsmart/zerocode/core/kafka/consume/ConsumerLocalConfigsWrapTest.java index 39041a984..4c592f2aa 100644 --- a/core/src/test/java/org/jsmart/zerocode/core/kafka/consume/ConsumerLocalConfigsWrapTest.java +++ b/core/src/test/java/org/jsmart/zerocode/core/kafka/consume/ConsumerLocalConfigsWrapTest.java @@ -15,6 +15,86 @@ public class ConsumerLocalConfigsWrapTest { ObjectMapper objectMapper = new ObjectMapperProvider().get(); + @Test + public void testSerDeser_seekEpoch() throws IOException { + ConsumerLocalConfigsWrap javaObject = new ConsumerLocalConfigsWrap( + new ConsumerLocalConfigs("RAW", + "RAW:/target/ttt", + true, + null, + true, + 3, + 50L, + false, + "$.JSON.Path", + null, + String.valueOf(System.currentTimeMillis()), + null)); + ObjectMapper objectMapper = new ObjectMapperProvider().get(); + + String json = objectMapper.writeValueAsString(javaObject); + assertEquals("{\n" + + " \"consumerLocalConfigs\":\n" + + " {\n" + + " \"recordType\": \"RAW\",\n" + + " \"fileDumpTo\": \"RAW:/target/ttt\",\n" + + " \"commitAsync\": true,\n" + + " \"showRecordsConsumed\": true,\n" + + " \"maxNoOfRetryPollsOrTimeouts\": 3,\n" + + " \"pollingTime\": 50,\n" + + " \"cacheByTopic\": false,\n" + + " \"filterByJsonPath\": \"$.JSON.Path\",\n" + + " \"seekEpoch\": \"1706940293669\"\n" + + " }\n" + + "}", + json, LENIENT); + + ConsumerLocalConfigsWrap javaPojo = objectMapper.readValue(json, ConsumerLocalConfigsWrap.class); + assertThat(javaPojo, is(javaObject)); + } + + @Test + public void testSerDeser_seekTimestamp() throws IOException { + ConsumerLocalConfigsWrap javaObject = new ConsumerLocalConfigsWrap( + new ConsumerLocalConfigs("RAW", + "RAW:/target/ttt", + true, + null, + true, + 3, + 50L, + false, + "$.JSON.Path", + null, + null, + new SeekTimestamp("2024-01-29T19:35:21.959340", "yyyy-MM-dd'T'HH:mm:ss.ssssss"))); + ObjectMapper objectMapper = new ObjectMapperProvider().get(); + + String json = objectMapper.writeValueAsString(javaObject); + assertEquals("{\n" + + " \"consumerLocalConfigs\":\n" + + " {\n" + + " \"recordType\": \"RAW\",\n" + + " \"fileDumpTo\": \"RAW:/target/ttt\",\n" + + " \"commitAsync\": true,\n" + + " \"showRecordsConsumed\": true,\n" + + " \"maxNoOfRetryPollsOrTimeouts\": 3,\n" + + " \"pollingTime\": 50,\n" + + " \"cacheByTopic\": false,\n" + + " \"filterByJsonPath\": \"$.JSON.Path\",\n" + + " \"seekTimestamp\":\n" + + " {\n" + + " \"timestamp\": \"2024-01-29T19:35:21.959340\",\n" + + " \"format\": \"yyyy-MM-dd'T'HH:mm:ss.ssssss\"\n" + + " }\n" + + " }\n" + + "}", + json, LENIENT); + + ConsumerLocalConfigsWrap javaPojo = objectMapper.readValue(json, ConsumerLocalConfigsWrap.class); + assertThat(javaPojo, is(javaObject)); + } + @Test public void testSerDeser() throws IOException { ConsumerLocalConfigsWrap javaObject = new ConsumerLocalConfigsWrap( diff --git a/kafka-testing/src/main/java/org/jsmart/zerocode/zerocodejavaexec/utils/ExampleUtils.java b/kafka-testing/src/main/java/org/jsmart/zerocode/zerocodejavaexec/utils/ExampleUtils.java index 860646189..90be7b93d 100644 --- a/kafka-testing/src/main/java/org/jsmart/zerocode/zerocodejavaexec/utils/ExampleUtils.java +++ b/kafka-testing/src/main/java/org/jsmart/zerocode/zerocodejavaexec/utils/ExampleUtils.java @@ -1,5 +1,16 @@ package org.jsmart.zerocode.zerocodejavaexec.utils; +import org.jsmart.zerocode.core.kafka.consume.SeekTimestamp; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; + public class ExampleUtils { + public String seekTimestampToEpoch(SeekTimestamp seekTimestamp) throws ParseException { + DateFormat dateFormat = new SimpleDateFormat(seekTimestamp.getFormat()); + return String.valueOf(dateFormat.parse(seekTimestamp.getTimestamp()).toInstant().toEpochMilli()); + } + } diff --git a/kafka-testing/src/test/java/org/jsmart/zerocode/integration/tests/kafka/consume/KafkaConsumeSeekOffsetTest.java b/kafka-testing/src/test/java/org/jsmart/zerocode/integration/tests/kafka/consume/KafkaConsumeSeekOffsetTest.java index bb48d0f15..690be202b 100644 --- a/kafka-testing/src/test/java/org/jsmart/zerocode/integration/tests/kafka/consume/KafkaConsumeSeekOffsetTest.java +++ b/kafka-testing/src/test/java/org/jsmart/zerocode/integration/tests/kafka/consume/KafkaConsumeSeekOffsetTest.java @@ -26,4 +26,10 @@ public void testKafkaConsume_seekOffset() throws Exception { public void testKafkaConsume_seekOffsetLatest() throws Exception { } + @Test + @Scenario("kafka/consume/test_kafka_consume_seek_epoch_and_timestamp.json") + public void testKafkaConsume_seekEpochAndTimestamp() { + + } + } diff --git a/kafka-testing/src/test/resources/kafka/consume/test_kafka_consume_seek_epoch_and_timestamp.json b/kafka-testing/src/test/resources/kafka/consume/test_kafka_consume_seek_epoch_and_timestamp.json new file mode 100755 index 000000000..b71aac928 --- /dev/null +++ b/kafka-testing/src/test/resources/kafka/consume/test_kafka_consume_seek_epoch_and_timestamp.json @@ -0,0 +1,105 @@ +{ + "scenarioName": "Consume message after timestamp/epoch", + "steps": [ + { + "name": "load_kafka_before_timestamp", + "url": "kafka-topic:demo-seekTime", + "operation": "PRODUCE", + "request": { + "records": [ + { + "key": "${RANDOM.NUMBER}", + "value": "Before Timestamp 1" + }, + { + "key": "${RANDOM.NUMBER}", + "value": "Before Timestamp 2" + } + ] + }, + "assertions": { + "status": "Ok" + } + }, + { + "name": "load_timestamp_and_epoch", + "url": "org.jsmart.zerocode.zerocodejavaexec.utils.ExampleUtils", + "operation": "seekTimestampToEpoch", + "request": { + "timestamp": "${LOCAL.DATETIME.NOW:yyyy-MM-dd'T'HH:mm:ss.SSS}", + "format": "yyyy-MM-dd'T'HH:mm:ss.SSS" + }, + "assertions": {} + }, + { + "name": "load_kafka_after_timestamp", + "url": "kafka-topic:demo-seekTime", + "operation": "PRODUCE", + "request": { + "records": [ + { + "key": "${RANDOM.NUMBER}", + "value": "After Timestamp 1" + }, + { + "key": "${RANDOM.NUMBER}", + "value": "After Timestamp 2" + } + ] + }, + "assertions": { + "status": "Ok" + } + }, + { + "name": "consume_seekEpoch", + "url": "kafka-topic:demo-seekTime", + "operation": "CONSUME", + "request": { + "consumerLocalConfigs": { + "seekEpoch": "${$.load_timestamp_and_epoch.response}", + "commitSync": true, + "recordType": "RAW", + "showRecordsConsumed": true, + "maxNoOfRetryPollsOrTimeouts": 3 + } + }, + "verify": { + "records": [ + { + "value": "After Timestamp 1" + }, + { + "value": "After Timestamp 2" + } + ] + }}, + { + "name": "consume_seekTimestamp", + "url": "kafka-topic:demo-seekTime", + "operation": "CONSUME", + "request": { + "consumerLocalConfigs": { + "seekTimestamp": { + "timestamp": "${$.load_timestamp_and_epoch.request.timestamp}", + "format": "${$.load_timestamp_and_epoch.request.format}" + }, + "commitSync": true, + "recordType": "RAW", + "showRecordsConsumed": true, + "maxNoOfRetryPollsOrTimeouts": 3 + } + }, + "verify": { + "records": [ + { + "value": "After Timestamp 1" + }, + { + "value": "After Timestamp 2" + } + ] + } + } + ] +}