Skip to content

Commit

Permalink
Spotless.
Browse files Browse the repository at this point in the history
  • Loading branch information
cpwright committed Sep 20, 2023
1 parent 7a19ccd commit c1064b9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public interface ConsumerRecordToStreamPublisherAdapter extends StreamPublisher
* Consume a List of Kafka records, producing zero or more rows in the output.
*
* @param receiveTime the time, in nanoseconds since the epoch, the records were received in this process
* @param records the records received from {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}.
* @param records the records received from {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}.
* @return the number of bytes processed
* @throws IOException if there was an error writing to the output table
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public interface KafkaRecordConsumer extends StreamFailureConsumer {
* Consume a list of ConsumerRecords coming from Kafka.
*
* @param receiveTime the time, in nanoseconds since the epoch, the records were received in this process
* @param records the records to consume
* @param records the records to consume
* @return the total number of message bytes processed, according whether any key and/or value fields were
* processed, and the corresponding values for
* {@code org.apache.kafka.clients.consumer.ConsumerRecord.serializedKeySize}
* {@code org.apache.kafka.clients.consumer.ConsumerRecord.serializedValueSize}
* processed, and the corresponding values for
* {@code org.apache.kafka.clients.consumer.ConsumerRecord.serializedKeySize}
* {@code org.apache.kafka.clients.consumer.ConsumerRecord.serializedValueSize}
*/
long consume(long receiveTime, @NotNull List<? extends ConsumerRecord<?, ?>> records);
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ public void propagateFailure(@NotNull final Throwable cause) {
}

@Override
public synchronized long consumeRecords(long receiveTime, @NotNull final List<? extends ConsumerRecord<?, ?>> records) {
public synchronized long consumeRecords(long receiveTime,
@NotNull final List<? extends ConsumerRecord<?, ?>> records) {
WritableChunk<Values>[] chunks = getChunksToFill();
checkChunkSizes(chunks);
int remaining = chunks[0].capacity() - chunks[0].size();
Expand Down

0 comments on commit c1064b9

Please sign in to comment.