Receive time for Kafka messages.
cpwright committed Sep 20, 2023
1 parent bcb240f commit 7a19ccd
Showing 5 changed files with 137 additions and 18 deletions.
31 changes: 27 additions & 4 deletions extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -138,6 +138,12 @@ public class KafkaTools {
public static final String OFFSET_COLUMN_NAME_DEFAULT = "KafkaOffset";
public static final String TIMESTAMP_COLUMN_NAME_PROPERTY = "deephaven.timestamp.column.name";
public static final String TIMESTAMP_COLUMN_NAME_DEFAULT = "KafkaTimestamp";
public static final String RECEIVETIME_COLUMN_NAME_PROPERTY = "deephaven.receivetime.column.name";
public static final String RECEIVETIME_COLUMN_NAME_DEFAULT = null;
public static final String KEY_BYTES_COLUMN_NAME_PROPERTY = "deephaven.keybytes.column.name";
public static final String KEY_BYTES_COLUMN_NAME_DEFAULT = null;
public static final String VALUE_BYTES_COLUMN_NAME_PROPERTY = "deephaven.valuebytes.column.name";
public static final String VALUE_BYTES_COLUMN_NAME_DEFAULT = null;
public static final String KEY_COLUMN_NAME_PROPERTY = "deephaven.key.column.name";
public static final String KEY_COLUMN_NAME_DEFAULT = "KafkaKey";
public static final String VALUE_COLUMN_NAME_PROPERTY = "deephaven.value.column.name";
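For context: like the existing partition, offset, and timestamp properties, each new property names an optional output column, but the new defaults are null, so the receive-time, key-bytes, and value-bytes columns are only emitted when a name is supplied in the consumer properties (see the getDefinition change below). A minimal configuration sketch; the column names and broker address are illustrative assumptions, not part of this commit:

// Sketch: opt in to the new columns by naming them; unset properties leave them out of the table.
// Assumes java.util.Properties and io.deephaven.kafka.KafkaTools are imported.
final Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
kafkaProps.setProperty(KafkaTools.RECEIVETIME_COLUMN_NAME_PROPERTY, "ReceiveTime"); // when each record was received in-process
kafkaProps.setProperty(KafkaTools.KEY_BYTES_COLUMN_NAME_PROPERTY, "KeyBytes");      // serialized key size
kafkaProps.setProperty(KafkaTools.VALUE_BYTES_COLUMN_NAME_PROPERTY, "ValueBytes");  // serialized value size
// kafkaProps is then passed to the usual KafkaTools consume entry point (not shown here).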
@@ -1532,15 +1538,30 @@ private enum CommonColumn {
Timestamp(
TIMESTAMP_COLUMN_NAME_PROPERTY,
TIMESTAMP_COLUMN_NAME_DEFAULT,
ColumnDefinition::ofTime);
ColumnDefinition::ofTime),

ReceiveTime(
RECEIVETIME_COLUMN_NAME_PROPERTY,
RECEIVETIME_COLUMN_NAME_DEFAULT,
ColumnDefinition::ofTime),

KeyBytes(
KEY_BYTES_COLUMN_NAME_PROPERTY,
KEY_BYTES_COLUMN_NAME_DEFAULT,
ColumnDefinition::ofLong),

ValueBytes(
VALUE_BYTES_COLUMN_NAME_PROPERTY,
VALUE_BYTES_COLUMN_NAME_DEFAULT,
ColumnDefinition::ofLong);
// @formatter:on

private final String nameProperty;
private final String nameDefault;
private final Function<String, ColumnDefinition<?>> definitionFactory;

CommonColumn(@NotNull final String nameProperty,
@NotNull final String nameDefault,
@Nullable final String nameDefault,
@NotNull final Function<String, ColumnDefinition<?>> definitionFactory) {
this.nameProperty = nameProperty;
this.nameDefault = nameDefault;
@@ -1557,6 +1578,8 @@ private ColumnDefinition<?> getDefinition(@NotNull final Properties consumerProp
result = definitionFactory.apply(partitionColumnName);
}
consumerProperties.remove(nameProperty);
} else if (nameDefault == null) {
result = null;
} else {
result = definitionFactory.apply(nameDefault);
}
Expand Down Expand Up @@ -1631,9 +1654,9 @@ private SimpleKafkaRecordConsumer(@NotNull final ConsumerRecordToStreamPublisher
}

@Override
public long consume(@NotNull final List<? extends ConsumerRecord<?, ?>> consumerRecords) {
public long consume(long receiveTime, @NotNull final List<? extends ConsumerRecord<?, ?>> consumerRecords) {
try {
return adapter.consumeRecords(consumerRecords);
return adapter.consumeRecords(receiveTime, consumerRecords);
} catch (Exception e) {
acceptFailure(e);
return 0;
@@ -26,9 +26,10 @@ public interface ConsumerRecordToStreamPublisherAdapter extends StreamPublisher
/**
* Consume a List of Kafka records, producing zero or more rows in the output.
*
* @param receiveTime the time, in nanoseconds since the epoch, the records were received in this process
* @param records the records received from {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}.
* @return the number of bytes processed
* @throws IOException if there was an error writing to the output table
*/
long consumeRecords(@NotNull List<? extends ConsumerRecord<?, ?>> records) throws IOException;
long consumeRecords(long receiveTime, @NotNull List<? extends ConsumerRecord<?, ?>> records) throws IOException;
}
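A hedged sketch of the calling pattern implied by the new signature; the helper name and its arguments are assumptions for illustration, while the clock call matches the one added to the ingester below:

// Sketch only: stamp the receive time once per poll, in nanoseconds since the epoch,
// and pass it through to the adapter alongside the polled records.
// Assumes imports of DateTimeUtils, ConsumerRecord, List, and IOException as in the files shown here.
static long consumePolled(final ConsumerRecordToStreamPublisherAdapter adapter,
        final List<? extends ConsumerRecord<?, ?>> records) throws IOException {
    final long receiveTime = DateTimeUtils.currentClock().currentTimeNanos();
    return adapter.consumeRecords(receiveTime, records);
}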
@@ -11,6 +11,7 @@
import io.deephaven.io.logger.Logger;
import io.deephaven.kafka.KafkaTools.ConsumerLoopCallback;
import io.deephaven.kafka.KafkaTools.InitialOffsetLookup;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.annotations.InternalUseOnly;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -185,7 +186,7 @@ public String toString() {
* @param partitionFilter A predicate indicating which partitions we should replicate
* @param partitionToStreamConsumer A function implementing a mapping from partition to its consumer of records. The
* function will be invoked once per partition at construction; implementations should internally defer
* resource allocation until first call to {@link KafkaRecordConsumer#consume(List)} or
* resource allocation until first call to {@link KafkaRecordConsumer#consume(long, List)} or
* {@link KafkaRecordConsumer#acceptFailure(Throwable)} if appropriate.
* @param partitionToInitialSeekOffset A function implementing a mapping from partition to its initial seek offset,
* or -1 if seek to beginning is intended.
@@ -350,9 +351,11 @@ private void consumerLoop() {
*/
private boolean pollOnce(final Duration timeout) {
final ConsumerRecords<?, ?> records;
final long receiveTime;
try {
++pollCalls;
records = kafkaConsumer.poll(timeout);
receiveTime = DateTimeUtils.currentClock().currentTimeNanos();
} catch (WakeupException we) {
// we interpret a wakeup as a signal to stop /this/ poll.
return true;
@@ -380,7 +383,7 @@ private boolean pollOnce(final Duration timeout) {
}

try {
bytesProcessed += streamConsumer.consume(partitionRecords);
bytesProcessed += streamConsumer.consume(receiveTime, partitionRecords);
} catch (Throwable ex) {
++messagesWithErr;
log.error().append(logPrefix).append("Exception while processing Kafka message:").append(ex).endl();
@@ -18,11 +18,12 @@ public interface KafkaRecordConsumer extends StreamFailureConsumer {
/**
* Consume a list of ConsumerRecords coming from Kafka.
*
* @param receiveTime the time, in nanoseconds since the epoch, the records were received in this process
* @param records the records to consume
* @return the total number of message bytes processed, according to whether any key and/or value fields were
* processed, and the corresponding values for
* {@code org.apache.kafka.clients.consumer.ConsumerRecord.serializedKeySize}
* {@code org.apache.kafka.clients.consumer.ConsumerRecord.serializedValueSize}
*/
long consume(@NotNull List<? extends ConsumerRecord<?, ?>> records);
long consume(long receiveTime, @NotNull List<? extends ConsumerRecord<?, ?>> records);
}
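The @return contract above counts serialized key and value bytes only for the fields that were actually processed; a hedged sketch of that accounting, assuming both key and value are processed for every record:

// Sketch of the byte accounting described by the @return javadoc,
// inside an implementation of consume(receiveTime, records).
long bytesProcessed = 0;
for (final ConsumerRecord<?, ?> record : records) {
    final int keySize = record.serializedKeySize();     // -1 when the record has no key
    final int valueSize = record.serializedValueSize(); // -1 when the record has no value
    if (keySize > 0) {
        bytesProcessed += keySize;
    }
    if (valueSize > 0) {
        bytesProcessed += valueSize;
    }
}
return bytesProcessed;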
@@ -37,6 +37,10 @@ public class KafkaStreamPublisher extends StreamPublisherBase implements Consume
private final int simpleKeyColumnIndex;
private final int simpleValueColumnIndex;

private final int receiveTimeColumnIndex;
private final int keyBytesColumnIndex;
private final int valueBytesColumnIndex;

private final boolean keyIsSimpleObject;
private final boolean valueIsSimpleObject;

@@ -56,7 +60,10 @@ private KafkaStreamPublisher(
final int simpleKeyColumnIndex,
final int simpleValueColumnIndex,
final Function<Object, Object> keyToChunkObjectMapper,
final Function<Object, Object> valueToChunkObjectMapper) {
final Function<Object, Object> valueToChunkObjectMapper,
final int receiveTimeColumnIndex,
final int keyBytesColumnIndex,
final int valueBytesColumnIndex) {
super(tableDefinition);
this.shutdownCallback = shutdownCallback;
this.kafkaPartitionColumnIndex = kafkaPartitionColumnIndex;
@@ -68,6 +75,9 @@ private KafkaStreamPublisher(
this.valueProcessor = valueProcessor;
this.keyToChunkObjectMapper = keyToChunkObjectMapper;
this.valueToChunkObjectMapper = valueToChunkObjectMapper;
this.receiveTimeColumnIndex = receiveTimeColumnIndex;
this.keyBytesColumnIndex = keyBytesColumnIndex;
this.valueBytesColumnIndex = valueBytesColumnIndex;

keyIsSimpleObject = this.simpleKeyColumnIndex >= 0;
if (keyIsSimpleObject && keyProcessor != null) {
@@ -127,7 +137,10 @@ public static ConsumerRecordToStreamPublisherAdapter make(
simpleKeyColumnIndex,
simpleValueColumnIndex,
parameters.getKeyToChunkObjectMapper(),
parameters.getValueToChunkObjectMapper());
parameters.getValueToChunkObjectMapper(),
parameters.getReceiveTimeColumnIndex(),
parameters.getKeyBytesColumnIndex(),
parameters.getValueBytesColumnIndex());
}

@NotNull
@@ -159,7 +172,7 @@ public void propagateFailure(@NotNull final Throwable cause) {
}

@Override
public synchronized long consumeRecords(@NotNull final List<? extends ConsumerRecord<?, ?>> records) {
public synchronized long consumeRecords(long receiveTime, @NotNull final List<? extends ConsumerRecord<?, ?>> records) {
WritableChunk<Values>[] chunks = getChunksToFill();
checkChunkSizes(chunks);
int remaining = chunks[0].capacity() - chunks[0].size();
@@ -202,6 +215,18 @@ public synchronized long consumeRecords(@NotNull final List<? extends ConsumerRe
? chunks[timestampColumnIndex].asWritableLongChunk()
: null;

WritableLongChunk<Values> receiveTimeChunk = receiveTimeColumnIndex >= 0
? chunks[receiveTimeColumnIndex].asWritableLongChunk()
: null;

WritableIntChunk<Values> keyBytesChunk = keyBytesColumnIndex >= 0
? chunks[keyBytesColumnIndex].asWritableIntChunk()
: null;

WritableIntChunk<Values> valueBytesChunk = valueBytesColumnIndex >= 0
? chunks[valueBytesColumnIndex].asWritableIntChunk()
: null;

for (ConsumerRecord<?, ?> record : records) {
if (remaining == 0) {
if (keyChunk != null) {
@@ -235,6 +260,21 @@ public synchronized long consumeRecords(@NotNull final List<? extends ConsumerRe
} else {
timestampChunk = null;
}
if (receiveTimeColumnIndex >= 0) {
receiveTimeChunk = chunks[receiveTimeColumnIndex].asWritableLongChunk();
} else {
receiveTimeChunk = null;
}
if (keyBytesColumnIndex >= 0) {
keyBytesChunk = chunks[keyBytesColumnIndex].asWritableIntChunk();
} else {
keyBytesChunk = null;
}
if (valueBytesColumnIndex >= 0) {
valueBytesChunk = chunks[valueBytesColumnIndex].asWritableIntChunk();
} else {
valueBytesChunk = null;
}
if (keyIsSimpleObject) {
keyChunk = chunks[simpleKeyColumnIndex].asWritableObjectChunk();
}
@@ -258,6 +298,15 @@ public synchronized long consumeRecords(@NotNull final List<? extends ConsumerRe
timestampChunk.add(DateTimeUtils.millisToNanos(timestamp));
}
}
if (receiveTimeChunk != null) {
receiveTimeChunk.add(receiveTime);
}
if (keyBytesChunk != null) {
keyBytesChunk.add(record.serializedKeySize());
}
if (valueBytesChunk != null) {
valueBytesChunk.add(record.serializedValueSize());
}

if (keyChunk != null) {
keyChunk.add(keyToChunkObjectMapper.apply(record.key()));
@@ -344,6 +393,9 @@ public static class Parameters {
private final int kafkaPartitionColumnIndex;
private final int offsetColumnIndex;
private final int timestampColumnIndex;
private final int receiveTimeColumnIndex;
private final int keyBytesColumnIndex;
private final int valueBytesColumnIndex;
private final KeyOrValueProcessor keyProcessor;
private final KeyOrValueProcessor valueProcessor;
private final int simpleKeyColumnIndex;
@@ -361,7 +413,10 @@ private Parameters(
final int simpleKeyColumnIndex,
final int simpleValueColumnIndex,
final Function<Object, Object> keyToChunkObjectMapper,
final Function<Object, Object> valueToChunkObjectMapper) {
final Function<Object, Object> valueToChunkObjectMapper,
int receiveTimeColumnIndex,
int keyBytesColumnIndex,
int valueBytesColumnIndex) {
this.tableDefinition = tableDefinition;
this.kafkaPartitionColumnIndex = kafkaPartitionColumnIndex;
this.offsetColumnIndex = offsetColumnIndex;
@@ -372,6 +427,9 @@ private Parameters(
this.simpleValueColumnIndex = simpleValueColumnIndex;
this.keyToChunkObjectMapper = keyToChunkObjectMapper;
this.valueToChunkObjectMapper = valueToChunkObjectMapper;
this.receiveTimeColumnIndex = receiveTimeColumnIndex;
this.keyBytesColumnIndex = keyBytesColumnIndex;
this.valueBytesColumnIndex = valueBytesColumnIndex;
}

@NotNull
@@ -407,6 +465,18 @@ public int getSimpleValueColumnIndex() {
return simpleValueColumnIndex;
}

public int getReceiveTimeColumnIndex() {
return receiveTimeColumnIndex;
}

public int getKeyBytesColumnIndex() {
return keyBytesColumnIndex;
}

public int getValueBytesColumnIndex() {
return valueBytesColumnIndex;
}

public Function<Object, Object> getKeyToChunkObjectMapper() {
return keyToChunkObjectMapper;
}
@@ -426,6 +496,9 @@ public static class Builder {
private int kafkaPartitionColumnIndex = NULL_COLUMN_INDEX;
private int offsetColumnIndex = NULL_COLUMN_INDEX;
private int timestampColumnIndex = NULL_COLUMN_INDEX;
private int receiveTimeColumnIndex = NULL_COLUMN_INDEX;
private int keyBytesColumnIndex = NULL_COLUMN_INDEX;
private int valueBytesColumnIndex = NULL_COLUMN_INDEX;
private KeyOrValueProcessor keyProcessor;
private KeyOrValueProcessor valueProcessor;
private int simpleKeyColumnIndex = NULL_COLUMN_INDEX;
@@ -455,6 +528,21 @@ public Builder setTimestampColumnIndex(final int timestampColumnIndex) {
return this;
}

public Builder setReceiveTimeColumnIndex(final int receiveTimeColumnIndex) {
this.receiveTimeColumnIndex = receiveTimeColumnIndex;
return this;
}

public Builder setKeyBytesColumnIndex(final int keyBytesColumnIndex) {
this.keyBytesColumnIndex = keyBytesColumnIndex;
return this;
}

public Builder setValueBytesColumnIndex(final int valueBytesColumnIndex) {
this.valueBytesColumnIndex = valueBytesColumnIndex;
return this;
}

public Builder setKeyProcessor(final KeyOrValueProcessor keyProcessor) {
this.keyProcessor = keyProcessor;
return this;
@@ -505,7 +593,10 @@ public KafkaStreamPublisher.Parameters build() {
simpleKeyColumnIndex,
simpleValueColumnIndex,
keyToChunkObjectMapper,
valueToChunkObjectMapper);
valueToChunkObjectMapper,
receiveTimeColumnIndex,
keyBytesColumnIndex,
valueBytesColumnIndex);
}
}
}
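To tie the Builder additions together, a hedged usage sketch: the column indices below are hypothetical positions in an output table definition, and the remaining required Builder configuration (table definition, key/value handling, other indices) is elided.

// Hypothetical wiring of the new columns; other required Builder calls are omitted.
final KafkaStreamPublisher.Parameters.Builder builder = new KafkaStreamPublisher.Parameters.Builder();
builder.setReceiveTimeColumnIndex(5)   // receive time, nanoseconds since the epoch
        .setKeyBytesColumnIndex(6)     // ConsumerRecord.serializedKeySize()
        .setValueBytesColumnIndex(7);  // ConsumerRecord.serializedValueSize()
// ... set the table definition, processors, and other column indices, then call builder.build() ...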