Skip to content

Commit

Permalink
Remove null record condition
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyasahsan123 committed Oct 15, 2023
1 parent b610ffa commit 713c181
Showing 1 changed file with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,15 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received event '{}'", record);

if (record.value() != null) {
int attempts = 0;
if (!recordSent(record)) {
attempts++;
if (attempts >= retries) {
throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record);
}
Metronome.sleeper(retryInterval, Clock.SYSTEM).pause();
int attempts = 0;
if (!recordSent(record)) {
attempts++;
if (attempts >= retries) {
throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record);
}
committer.markProcessed(record);
Metronome.sleeper(retryInterval, Clock.SYSTEM).pause();
}
committer.markProcessed(record);
}
committer.markBatchFinished();
}
Expand All @@ -141,6 +139,7 @@ private boolean recordSent(ChangeEvent<Object, Object> record) throws Interrupte
if (rv == null) {
rv = "";
}

final PutRecordRequest putRecord = PutRecordRequest.builder()
.partitionKey((record.key() != null) ? getString(record.key()) : nullKey)
.streamName(streamNameMapper.map(record.destination()))
Expand Down

0 comments on commit 713c181

Please sign in to comment.