Skip to content

Commit

Permalink
DBZ-7032 Use Duration type
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane committed Oct 30, 2023
1 parent cfd134f commit f69c2e6
Showing 1 changed file with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
import java.util.List;
import java.util.Optional;

import io.debezium.DebeziumException;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
Expand All @@ -26,19 +23,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.core.exception.SdkClientException;

/**
* Implementation of the consumer that delivers the messages into Amazon Kinesis destination.
Expand All @@ -61,7 +61,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu
private Optional<String> endpointOverride;
private Optional<String> credentialsProfile;
private static final int DEFAULT_RETRIES = 5;
private static final Long RETRY_INTERVAL = Integer.toUnsignedLong(1_000);
private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1);

@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
String nullKey;
Expand Down Expand Up @@ -115,16 +115,14 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
if (attempts >= DEFAULT_RETRIES) {
throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record);
}
Metronome.sleeper(Duration.ofMillis(RETRY_INTERVAL), Clock.SYSTEM).pause();
Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause();
}
committer.markProcessed(record);
}
committer.markBatchFinished();
}

private boolean recordSent(ChangeEvent<Object, Object> record) {
boolean sent = false;

Object rv = record.value();
if (rv == null) {
rv = "";
Expand All @@ -138,11 +136,11 @@ private boolean recordSent(ChangeEvent<Object, Object> record) {

try {
client.putRecord(putRecord);
sent = true;
return true;
}
catch (SdkClientException exception) {
LOGGER.error("Failed to send record to {}:", record.destination(), exception);
LOGGER.error("Failed to send record to {}", record.destination(), exception);
return false;
}
return sent;
}
}

0 comments on commit f69c2e6

Please sign in to comment.