Skip to content

Commit

Permalink
added kafka-testing cases for seekEpoch and seekTimestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
a1shadows committed Feb 4, 2024
1 parent 101a270 commit f01e108
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package org.jsmart.zerocode.core.kafka.consume;


import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;


@Getter
@Builder
@ToString
@AllArgsConstructor
@Jacksonized
public class SeekTimestamp {

private final String timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,86 @@
public class ConsumerLocalConfigsWrapTest {
ObjectMapper objectMapper = new ObjectMapperProvider().get();

@Test
public void testSerDeser_seekEpoch() throws IOException {
ConsumerLocalConfigsWrap javaObject = new ConsumerLocalConfigsWrap(
new ConsumerLocalConfigs("RAW",
"RAW:/target/ttt",
true,
null,
true,
3,
50L,
false,
"$.JSON.Path",
null,
String.valueOf(System.currentTimeMillis()),
null));
ObjectMapper objectMapper = new ObjectMapperProvider().get();

String json = objectMapper.writeValueAsString(javaObject);
assertEquals("{\n" +
" \"consumerLocalConfigs\":\n" +
" {\n" +
" \"recordType\": \"RAW\",\n" +
" \"fileDumpTo\": \"RAW:/target/ttt\",\n" +
" \"commitAsync\": true,\n" +
" \"showRecordsConsumed\": true,\n" +
" \"maxNoOfRetryPollsOrTimeouts\": 3,\n" +
" \"pollingTime\": 50,\n" +
" \"cacheByTopic\": false,\n" +
" \"filterByJsonPath\": \"$.JSON.Path\",\n" +
" \"seekEpoch\": \"1706940293669\"\n" +
" }\n" +
"}",
json, LENIENT);

ConsumerLocalConfigsWrap javaPojo = objectMapper.readValue(json, ConsumerLocalConfigsWrap.class);
assertThat(javaPojo, is(javaObject));
}

@Test
public void testSerDeser_seekTimestamp() throws IOException {
ConsumerLocalConfigsWrap javaObject = new ConsumerLocalConfigsWrap(
new ConsumerLocalConfigs("RAW",
"RAW:/target/ttt",
true,
null,
true,
3,
50L,
false,
"$.JSON.Path",
null,
null,
new SeekTimestamp("2024-01-29T19:35:21.959340", "yyyy-MM-dd'T'HH:mm:ss.ssssss")));
ObjectMapper objectMapper = new ObjectMapperProvider().get();

String json = objectMapper.writeValueAsString(javaObject);
assertEquals("{\n" +
" \"consumerLocalConfigs\":\n" +
" {\n" +
" \"recordType\": \"RAW\",\n" +
" \"fileDumpTo\": \"RAW:/target/ttt\",\n" +
" \"commitAsync\": true,\n" +
" \"showRecordsConsumed\": true,\n" +
" \"maxNoOfRetryPollsOrTimeouts\": 3,\n" +
" \"pollingTime\": 50,\n" +
" \"cacheByTopic\": false,\n" +
" \"filterByJsonPath\": \"$.JSON.Path\",\n" +
" \"seekTimestamp\":\n" +
" {\n" +
" \"timestamp\": \"2024-01-29T19:35:21.959340\",\n" +
" \"format\": \"yyyy-MM-dd'T'HH:mm:ss.ssssss\"\n" +
" }\n" +
" }\n" +
"}",
json, LENIENT);

ConsumerLocalConfigsWrap javaPojo = objectMapper.readValue(json, ConsumerLocalConfigsWrap.class);
assertThat(javaPojo, is(javaObject));
}

@Test
public void testSerDeser() throws IOException {
ConsumerLocalConfigsWrap javaObject = new ConsumerLocalConfigsWrap(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package org.jsmart.zerocode.zerocodejavaexec.utils;

import org.jsmart.zerocode.core.kafka.consume.SeekTimestamp;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class ExampleUtils {

public String seekTimestampToEpoch(SeekTimestamp seekTimestamp) throws ParseException {
DateFormat dateFormat = new SimpleDateFormat(seekTimestamp.getFormat());
return String.valueOf(dateFormat.parse(seekTimestamp.getTimestamp()).toInstant().toEpochMilli());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ public void testKafkaConsume_seekOffset() throws Exception {
public void testKafkaConsume_seekOffsetLatest() throws Exception {
}

@Test
@Scenario("kafka/consume/test_kafka_consume_seek_epoch_and_timestamp.json")
public void testKafkaConsume_seekEpochAndTimestamp() {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
{
"scenarioName": "Consume message after timestamp/epoch",
"steps": [
{
"name": "load_kafka_before_timestamp",
"url": "kafka-topic:demo-seekTime",
"operation": "PRODUCE",
"request": {
"records": [
{
"key": "${RANDOM.NUMBER}",
"value": "Before Timestamp 1"
},
{
"key": "${RANDOM.NUMBER}",
"value": "Before Timestamp 2"
}
]
},
"assertions": {
"status": "Ok"
}
},
{
"name": "load_timestamp_and_epoch",
"url": "org.jsmart.zerocode.zerocodejavaexec.utils.ExampleUtils",
"operation": "seekTimestampToEpoch",
"request": {
"timestamp": "${LOCAL.DATETIME.NOW:yyyy-MM-dd'T'HH:mm:ss.SSS}",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSS"
},
"assertions": {}
},
{
"name": "load_kafka_after_timestamp",
"url": "kafka-topic:demo-seekTime",
"operation": "PRODUCE",
"request": {
"records": [
{
"key": "${RANDOM.NUMBER}",
"value": "After Timestamp 1"
},
{
"key": "${RANDOM.NUMBER}",
"value": "After Timestamp 2"
}
]
},
"assertions": {
"status": "Ok"
}
},
{
"name": "consume_seekEpoch",
"url": "kafka-topic:demo-seekTime",
"operation": "CONSUME",
"request": {
"consumerLocalConfigs": {
"seekEpoch": "${$.load_timestamp_and_epoch.response}",
"commitSync": true,
"recordType": "RAW",
"showRecordsConsumed": true,
"maxNoOfRetryPollsOrTimeouts": 3
}
},
"verify": {
"records": [
{
"value": "After Timestamp 1"
},
{
"value": "After Timestamp 2"
}
]
}},
{
"name": "consume_seekTimestamp",
"url": "kafka-topic:demo-seekTime",
"operation": "CONSUME",
"request": {
"consumerLocalConfigs": {
"seekTimestamp": {
"timestamp": "${$.load_timestamp_and_epoch.request.timestamp}",
"format": "${$.load_timestamp_and_epoch.request.format}"
},
"commitSync": true,
"recordType": "RAW",
"showRecordsConsumed": true,
"maxNoOfRetryPollsOrTimeouts": 3
}
},
"verify": {
"records": [
{
"value": "After Timestamp 1"
},
{
"value": "After Timestamp 2"
}
]
}
}
]
}

0 comments on commit f01e108

Please sign in to comment.