Skip to content

Commit

Permalink
partial fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
¨Claude committed Dec 17, 2024
1 parent 218bf20 commit 69ea274
Show file tree
Hide file tree
Showing 7 changed files with 2 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ public class AvroTransformer extends Transformer {
this.avroData = avroData;
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

@Override
public Schema getKeySchema() {
return Schema.OPTIONAL_BYTES_SCHEMA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ public class ByteArrayTransformer extends Transformer {

private static final int MAX_BUFFER_SIZE = 4096;

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
// For byte array transformations, ByteArrayConverter is the converter which is the default config.
}

public Schema getKeySchema() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ public class JsonTransformer extends Transformer {
this.jsonConverter = jsonConverter;
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
}

public Schema getKeySchema() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ public class ParquetTransformer extends Transformer {
this.avroData = avroData;
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

public Schema getKeySchema() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

public abstract class Transformer {

public abstract void configureValueConverter(Map<String, String> config, AbstractConfig sourceConfig);

public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, AbstractConfig config) {

final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, offsetManagerEntry.getTopic(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,6 @@ void setUp() {

}

@Test
void testConfigureValueConverter() {
final String value = "http://localhost:8081";
when(sourceCommonConfig.getString(SCHEMA_REGISTRY_URL)).thenReturn(value);
avroTransformer.configureValueConverter(config, sourceCommonConfig);
assertThat(config.get(SCHEMA_REGISTRY_URL)).isEqualTo("http://localhost:8081")
.describedAs("The schema registry URL should be correctly set in the config.");
}

@Test
void testReadAvroRecordsInvalidData() {
final InputStream inputStream = new ByteArrayInputStream("mock-avro-data".getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ void testGetRecordsSingleChunk() {

final Stream<SchemaAndValue> records = byteArrayTransformer.getRecords(inputStreamIOSupplier, offsetManagerEntry, sourceCommonConfig);

final List<Object> recs = records.collect(Collectors.toList());
final List<SchemaAndValue> recs = records.collect(Collectors.toList());
assertThat(recs).hasSize(1);
assertThat((byte[]) recs.get(0)).isEqualTo(data);
assertThat(recs.get(0).value()).isEqualTo(data);
}

@Test
Expand All @@ -78,13 +78,4 @@ void testGetRecordsEmptyInputStream() {

assertThat(records).hasSize(0);
}

// @Test
// void testGetValueBytes() {
// final byte[] record = { 1, 2, 3 };
// final byte[] result = (byte[]) byteArrayTransformer.getValueData(record, TEST_TOPIC, sourceCommonConfig)
// .value();
//
// assertThat(result).containsExactlyInAnyOrder(record);
// }
}

0 comments on commit 69ea274

Please sign in to comment.