Skip to content

Commit

Permalink
Make Kafka KeyOrValueProducers public to enable external parsers. (#4538
Browse files Browse the repository at this point in the history
)

* Make enough of the Kafka KeyOrValueProducers public so you can actually pass in your own.

* spotless

* protected.
  • Loading branch information
cpwright committed Sep 22, 2023
1 parent 603bd69 commit bf2a3cf
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ public Optional<SchemaProvider> getSchemaProvider() {
}

@Override
Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
protected Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs) {
return new KafkaAvroDeserializer(Objects.requireNonNull(schemaRegistryClient));
}

@Override
KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
List<ColumnDefinition<?>> columnDefinitionsOut) {
KeyOrValueIngestData data = new KeyOrValueIngestData();
Expand All @@ -126,7 +126,7 @@ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
}

@Override
KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
return GenericRecordChunkAdapter.make(
tableDef,
ci -> StreamChunkUtils.chunkTypeForColumnIndex(tableDef, ci),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ public Optional<SchemaProvider> getSchemaProvider() {
}

@Override
Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
protected Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs) {
return new ByteArrayDeserializer();
}

@Override
KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
List<ColumnDefinition<?>> columnDefinitionsOut) {
return null;
}

@Override
KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ public Optional<SchemaProvider> getSchemaProvider() {
}

@Override
Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
protected Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs) {
return new StringDeserializer();
}

@Override
KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
List<ColumnDefinition<?>> columnDefinitionsOut) {
final KeyOrValueIngestData data = new KeyOrValueIngestData();
Expand Down Expand Up @@ -98,7 +98,7 @@ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
}

@Override
KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
return JsonNodeChunkAdapter.make(
tableDef,
ci -> StreamChunkUtils.chunkTypeForColumnIndex(tableDef, ci),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public static void avroSchemaToColumnDefinitions(
/**
* Enum to specify operations that may apply to either of Kafka KEY or VALUE fields.
*/
enum KeyOrValue {
public enum KeyOrValue {
KEY, VALUE
}

Expand Down Expand Up @@ -313,19 +313,19 @@ public static class Consume {
*/
public static abstract class KeyOrValueSpec implements SchemaProviderProvider {

abstract Deserializer<?> getDeserializer(
protected abstract Deserializer<?> getDeserializer(
KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs);

abstract KeyOrValueIngestData getIngestData(
protected abstract KeyOrValueIngestData getIngestData(
KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs,
MutableInt nextColumnIndexMut,
List<ColumnDefinition<?>> columnDefinitionsOut);

abstract KeyOrValueProcessor getProcessor(
protected abstract KeyOrValueProcessor getProcessor(
TableDefinition tableDef,
KeyOrValueIngestData data);
}
Expand Down Expand Up @@ -1502,7 +1502,7 @@ private static Table makeResultTable() {
}
}

static class KeyOrValueIngestData {
public static class KeyOrValueIngestData {
public Map<String, String> fieldPathToColumnName;
public int simpleColumnIndex = NULL_COLUMN_INDEX;
public Function<Object, Object> toObjectChunkMapper = Function.identity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ public Optional<SchemaProvider> getSchemaProvider() {
}

@Override
Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
protected Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs) {
return new KafkaProtobufDeserializer<>(Objects.requireNonNull(schemaRegistryClient));
}

@Override
KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs, MutableInt nextColumnIndexMut, List<ColumnDefinition<?>> columnDefinitionsOut) {
final Descriptor descriptor;
try {
Expand Down Expand Up @@ -154,7 +154,7 @@ private void add(
}

@Override
KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
// noinspection unchecked
return new KeyOrValueProcessorImpl(
MultiFieldChunkAdapter.chunkOffsets(tableDef, data.fieldPathToColumnName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ public Optional<SchemaProvider> getSchemaProvider() {
}

@Override
Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
protected Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs) {
return supplier.get();
}

@Override
KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
List<ColumnDefinition<?>> columnDefinitionsOut) {
final KeyOrValueIngestData data = new KeyOrValueIngestData();
Expand All @@ -71,7 +71,7 @@ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
}

@Override
KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Optional<SchemaProvider> getSchemaProvider() {
}

@Override
Deserializer<?> getDeserializer(
protected Deserializer<?> getDeserializer(
KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs) {
Expand All @@ -99,7 +99,7 @@ Deserializer<?> getDeserializer(
}

@Override
KeyOrValueIngestData getIngestData(
protected KeyOrValueIngestData getIngestData(
KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
List<ColumnDefinition<?>> columnDefinitionsOut) {
Expand All @@ -113,7 +113,7 @@ KeyOrValueIngestData getIngestData(
}

@Override
KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
return null;
}

Expand Down

0 comments on commit bf2a3cf

Please sign in to comment.