diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index ad963e2e..b4e0758d 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -10,9 +10,6 @@ import java.util.List; import java.util.Optional; -import io.debezium.DebeziumException; -import io.debezium.util.Clock; -import io.debezium.util.Metronome; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.Dependent; @@ -26,19 +23,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine.RecordCommitter; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.CustomConsumerBuilder; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; -import software.amazon.awssdk.core.exception.SdkClientException; /** * Implementation of the consumer that delivers the messages into Amazon Kinesis destination. @@ -61,7 +61,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu private Optional endpointOverride; private Optional credentialsProfile; private static final int DEFAULT_RETRIES = 5; - private static final Long RETRY_INTERVAL = Integer.toUnsignedLong(1_000); + private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; @@ -115,7 +115,7 @@ public void handleBatch(List> records, RecordCommitt if (attempts >= DEFAULT_RETRIES) { throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record); } - Metronome.sleeper(Duration.ofMillis(RETRY_INTERVAL), Clock.SYSTEM).pause(); + Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); } committer.markProcessed(record); } @@ -123,8 +123,6 @@ public void handleBatch(List> records, RecordCommitt } private boolean recordSent(ChangeEvent record) { - boolean sent = false; - Object rv = record.value(); if (rv == null) { rv = ""; @@ -138,11 +136,11 @@ private boolean recordSent(ChangeEvent record) { try { client.putRecord(putRecord); - sent = true; + return true; } catch (SdkClientException exception) { - LOGGER.error("Failed to send record to {}:", record.destination(), exception); + LOGGER.error("Failed to send record to {}", record.destination(), exception); + return false; } - return sent; } }