diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java index e33e41052b9..cf49d8c83bd 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java @@ -105,13 +105,13 @@ public Optional getSchemaProvider() { } @Override - Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, + protected Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs) { return new KafkaAvroDeserializer(Objects.requireNonNull(schemaRegistryClient)); } @Override - KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, + protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs, MutableInt nextColumnIndexMut, List> columnDefinitionsOut) { KeyOrValueIngestData data = new KeyOrValueIngestData(); @@ -126,7 +126,7 @@ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, } @Override - KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { + protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { return GenericRecordChunkAdapter.make( tableDef, ci -> StreamChunkUtils.chunkTypeForColumnIndex(tableDef, ci), diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/IgnoreImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/IgnoreImpl.java index 1337786e1e1..11e8f15937c 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/IgnoreImpl.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/IgnoreImpl.java @@ -34,20 +34,20 @@ public Optional getSchemaProvider() { } @Override - Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, + protected Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs) { return new ByteArrayDeserializer(); } @Override - KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, + protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs, MutableInt nextColumnIndexMut, List> columnDefinitionsOut) { return null; } @Override - KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { + protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { return null; } } diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/JsonImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/JsonImpl.java index 246b547edd3..26c82f88f7f 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/JsonImpl.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/JsonImpl.java @@ -64,13 +64,13 @@ public Optional getSchemaProvider() { } @Override - Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, + protected Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs) { return new StringDeserializer(); } @Override - KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, + protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs, MutableInt nextColumnIndexMut, List> columnDefinitionsOut) { final KeyOrValueIngestData data = new KeyOrValueIngestData(); @@ -98,7 +98,7 @@ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, } @Override - KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { + protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { return JsonNodeChunkAdapter.make( tableDef, ci -> StreamChunkUtils.chunkTypeForColumnIndex(tableDef, ci), diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 5cf0a97e7e2..5f01fbe51ee 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -244,7 +244,7 @@ public static void avroSchemaToColumnDefinitions( /** * Enum to specify operations that may apply to either of Kafka KEY or VALUE fields. */ - enum KeyOrValue { + public enum KeyOrValue { KEY, VALUE } @@ -313,19 +313,19 @@ public static class Consume { */ public static abstract class KeyOrValueSpec implements SchemaProviderProvider { - abstract Deserializer getDeserializer( + protected abstract Deserializer getDeserializer( KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs); - abstract KeyOrValueIngestData getIngestData( + protected abstract KeyOrValueIngestData getIngestData( KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs, MutableInt nextColumnIndexMut, List> columnDefinitionsOut); - abstract KeyOrValueProcessor getProcessor( + protected abstract KeyOrValueProcessor getProcessor( TableDefinition tableDef, KeyOrValueIngestData data); } @@ -1502,7 +1502,7 @@ private static Table makeResultTable() { } } - static class KeyOrValueIngestData { + public static class KeyOrValueIngestData { public Map fieldPathToColumnName; public int simpleColumnIndex = NULL_COLUMN_INDEX; public Function toObjectChunkMapper = Function.identity(); diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ProtobufImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ProtobufImpl.java index 93f60275d2c..b53d60696d7 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ProtobufImpl.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ProtobufImpl.java @@ -110,13 +110,13 @@ public Optional getSchemaProvider() { } @Override - Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, + protected Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs) { return new KafkaProtobufDeserializer<>(Objects.requireNonNull(schemaRegistryClient)); } @Override - KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, + protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs, MutableInt nextColumnIndexMut, List> columnDefinitionsOut) { final Descriptor descriptor; try { @@ -154,7 +154,7 @@ private void add( } @Override - KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { + protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { // noinspection unchecked return new KeyOrValueProcessorImpl( MultiFieldChunkAdapter.chunkOffsets(tableDef, data.fieldPathToColumnName), diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/RawImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/RawImpl.java index 167965c6440..daa8606dff0 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/RawImpl.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/RawImpl.java @@ -55,13 +55,13 @@ public Optional getSchemaProvider() { } @Override - Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, + protected Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs) { return supplier.get(); } @Override - KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, + protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs, MutableInt nextColumnIndexMut, List> columnDefinitionsOut) { final KeyOrValueIngestData data = new KeyOrValueIngestData(); @@ -71,7 +71,7 @@ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, } @Override - KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { + protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { return null; } } diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/SimpleImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/SimpleImpl.java index 7096c1b7f2d..7c95c3916da 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/SimpleImpl.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/SimpleImpl.java @@ -84,7 +84,7 @@ public Optional getSchemaProvider() { } @Override - Deserializer getDeserializer( + protected Deserializer getDeserializer( KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs) { @@ -99,7 +99,7 @@ Deserializer getDeserializer( } @Override - KeyOrValueIngestData getIngestData( + protected KeyOrValueIngestData getIngestData( KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map configs, MutableInt nextColumnIndexMut, List> columnDefinitionsOut) { @@ -113,7 +113,7 @@ KeyOrValueIngestData getIngestData( } @Override - KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { + protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { return null; }