Skip to content

Commit

Permalink
Ryan's feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Sep 19, 2023
1 parent 41b3c54 commit 760da38
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ public synchronized long consumeRecords(@NotNull final List<? extends ConsumerRe
: null;

for (ConsumerRecord<?, ?> record : records) {
// Note we only flush if we are about to overflow the chunks.
if (--remaining < 0) {
if (remaining == 0) {
if (keyChunk != null) {
flushKeyChunk(keyChunk, chunks);
}
Expand All @@ -218,9 +217,8 @@ public synchronized long consumeRecords(@NotNull final List<? extends ConsumerRe
chunks = getChunksToFill();
checkChunkSizes(chunks);

// Note that remaining should account for the row we are about to write.
remaining = chunks[0].capacity() - chunks[0].size() - 1;
Assert.geqZero(remaining, "remaining");
remaining = chunks[0].capacity() - chunks[0].size();
Assert.gtZero(remaining, "remaining");

if (kafkaPartitionColumnIndex >= 0) {
partitionChunk = chunks[kafkaPartitionColumnIndex].asWritableIntChunk();
Expand Down Expand Up @@ -275,6 +273,8 @@ public synchronized long consumeRecords(@NotNull final List<? extends ConsumerRe
bytesProcessed += valueBytes;
}
}

--remaining;
}
if (keyChunk != null) {
flushKeyChunk(keyChunk, chunks);
Expand Down

0 comments on commit 760da38

Please sign in to comment.