
Commit

partial fix
Claude committed Dec 17, 2024
2 parents 688a731 + 7ef4e31 commit 218bf20
Showing 18 changed files with 321 additions and 340 deletions.
@@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.source.SourceTaskContext;

public class OffsetManager<E extends OffsetManager.OffsetManagerEntry> {
@@ -135,6 +136,14 @@ public interface OffsetManagerEntry<T extends OffsetManagerEntry> extends Compar
* @return The offset manager key for this entry.
*/
OffsetManagerKey getManagerKey();

String getTopic();

Integer getPartition();

default long skipRecords() {
return 0;
}
}

/**
@@ -20,12 +20,16 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import io.confluent.connect.avro.AvroData;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -34,19 +38,31 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroTransformer extends Transformer<GenericRecord> {
public class AvroTransformer extends Transformer {

private final AvroData avroData;

private static final Logger LOGGER = LoggerFactory.getLogger(AvroTransformer.class);

AvroTransformer(final AvroData avroData) {
super();
this.avroData = avroData;
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

@Override
public StreamSpliterator<GenericRecord> createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
public Schema getKeySchema() {
return Schema.OPTIONAL_BYTES_SCHEMA;
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final String topic, final int topicPartition, final AbstractConfig sourceConfig) {
return new StreamSpliterator<GenericRecord>(LOGGER, inputStreamIOSupplier) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
private DataFileStream<GenericRecord> dataFileStream;
private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();

@@ -68,18 +84,14 @@ public void doClose() {
}

@Override
protected boolean doAdvance(final Consumer<? super GenericRecord> action) {
protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
if (dataFileStream.hasNext()) {
action.accept(dataFileStream.next());
GenericRecord record = dataFileStream.next();
action.accept(avroData.toConnectData(record.getSchema(), record));
return true;
}
return false;
}
};
}

@Override
public byte[] getValueBytes(final GenericRecord record, final String topic, final AbstractConfig sourceConfig) {
return TransformationUtils.serializeAvroRecordToBytes(Collections.singletonList(record), topic, sourceConfig);
}
}
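
Aside (not part of this commit): the rewritten doAdvance() above hands each Avro record to AvroData.toConnectData, which maps the Avro schema and value into Connect's SchemaAndValue. A minimal standalone sketch of that conversion, assuming an Avro data file path is passed as the first argument; the class name is illustrative only.

import java.io.File;
import java.io.IOException;

import org.apache.kafka.connect.data.SchemaAndValue;

import io.confluent.connect.avro.AvroData;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroToConnectExample {
    public static void main(final String[] args) throws IOException {
        // The cache size only bounds how many translated schemas AvroData memoizes.
        final AvroData avroData = new AvroData(100);
        try (DataFileReader<GenericRecord> reader =
                new DataFileReader<>(new File(args[0]), new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                final GenericRecord avroRecord = reader.next();
                // The same conversion the new doAdvance() performs per record.
                final SchemaAndValue connectData = avroData.toConnectData(avroRecord.getSchema(), avroRecord);
                System.out.println(connectData.schema() + " -> " + connectData.value());
            }
        }
    }
}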
@@ -18,18 +18,21 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.function.IOSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ByteArrayTransformer extends Transformer<byte[]> {
public class ByteArrayTransformer extends Transformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class);

private static final int MAX_BUFFER_SIZE = 4096;
@@ -39,10 +42,14 @@ public void configureValueConverter(final Map<String, String> config, final Abst
// For byte array transformations, ByteArrayConverter is the converter which is the default config.
}

public Schema getKeySchema() {
return null;
}

@Override
public StreamSpliterator<byte[]> createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final String topic, final int topicPartition, final AbstractConfig sourceConfig) {
return new StreamSpliterator<byte[]>(LOGGER, inputStreamIOSupplier) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
@Override
protected InputStream inputOpened(final InputStream input) {
return input;
@@ -54,17 +61,17 @@ protected void doClose() {
}

@Override
protected boolean doAdvance(final Consumer<? super byte[]> action) {
protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
final byte[] buffer = new byte[MAX_BUFFER_SIZE];
try {
final int bytesRead = IOUtils.read(inputStream, buffer);
if (bytesRead == 0) {
return false;
}
if (bytesRead < MAX_BUFFER_SIZE) {
action.accept(Arrays.copyOf(buffer, bytesRead));
action.accept(new SchemaAndValue(null, Arrays.copyOf(buffer, bytesRead)));
} else {
action.accept(buffer);
action.accept(new SchemaAndValue(null, buffer));
}
return true;
} catch (IOException e) {
@@ -74,9 +81,4 @@ protected boolean doAdvance(final Consumer<? super byte[]> action) {
}
};
}

@Override
public byte[] getValueBytes(final byte[] record, final String topic, final AbstractConfig sourceConfig) {
return record;
}
}
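
Aside (not part of this commit): with the type parameter gone, ByteArrayTransformer now wraps each chunk in a SchemaAndValue with a null schema, which is what Connect's default ByteArrayConverter expects. A tiny sketch of that wrapping; the class name and payload are illustrative only.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.connect.data.SchemaAndValue;

public class ByteArrayToConnectExample {
    public static void main(final String[] args) {
        final byte[] chunk = "raw payload".getBytes(StandardCharsets.UTF_8);
        // Raw bytes are emitted with a null schema, as in the new doAdvance() above.
        final SchemaAndValue connectData = new SchemaAndValue(null, chunk);
        System.out.println(((byte[]) connectData.value()).length);
    }
}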
@@ -16,8 +16,6 @@

package io.aiven.kafka.connect.common.source.input;

import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -27,30 +25,41 @@
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.function.IOSupplier;
import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonTransformer extends Transformer<JsonNode> {
public class JsonTransformer extends Transformer {

private final JsonConverter jsonConverter;

private static final Logger LOGGER = LoggerFactory.getLogger(JsonTransformer.class);

final ObjectMapper objectMapper = new ObjectMapper();

JsonTransformer(final JsonConverter jsonConverter) {
super();
this.jsonConverter = jsonConverter;
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMAS_ENABLE, "false");
}

public Schema getKeySchema() {
return null;
}

@Override
public StreamSpliterator<JsonNode> createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final String topic, final int topicPartition, final AbstractConfig sourceConfig) {
final StreamSpliterator<JsonNode> spliterator = new StreamSpliterator<JsonNode>(LOGGER, inputStreamIOSupplier) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
BufferedReader reader;

@Override
@@ -71,7 +80,7 @@ public void doClose() {
}

@Override
public boolean doAdvance(final Consumer<? super JsonNode> action) {
public boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
String line = null;
try {
// remove blank and empty lines.
@@ -83,30 +92,13 @@ public boolean doAdvance(final Consumer<? super JsonNode> action) {
}
}
line = line.trim();
try {
action.accept(objectMapper.readTree(line)); // Parse the JSON
} catch (IOException e) {
LOGGER.error("Error parsing JSON record: {}", e.getMessage(), e);
return false;
}
action.accept( jsonConverter.toConnectData(topic, line.getBytes(StandardCharsets.UTF_8)));
return true;
} catch (IOException e) {
LOGGER.error("Error reading input stream: {}", e.getMessage(), e);
return false;
}
}
};

return spliterator;
}

@Override
public byte[] getValueBytes(final JsonNode record, final String topic, final AbstractConfig sourceConfig) {
try {
return objectMapper.writeValueAsBytes(record);
} catch (JsonProcessingException e) {
LOGGER.error("Failed to serialize record to JSON bytes. Error: {}", e.getMessage(), e);
return new byte[0];
}
}
}
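
Aside (not part of this commit): instead of parsing lines with Jackson and re-serializing them in getValueBytes, the transformer now delegates to JsonConverter.toConnectData. A minimal standalone sketch of that call, assuming schemas.enable=false as configureValueConverter() sets above; the class name, topic, and payload are illustrative only.

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class JsonToConnectExample {
    public static void main(final String[] args) {
        final JsonConverter converter = new JsonConverter();
        // schemas.enable=false means the payload is plain JSON, not a schema/payload envelope.
        converter.configure(Collections.singletonMap("schemas.enable", "false"), false);
        final byte[] line = "{\"id\": 1, \"name\": \"example\"}".getBytes(StandardCharsets.UTF_8);
        final SchemaAndValue connectData = converter.toConnectData("example-topic", line);
        System.out.println(connectData.value()); // an untyped Map view of the JSON object
    }
}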
@@ -22,17 +22,20 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import io.aiven.kafka.connect.common.source.input.parquet.LocalInputFile;

import io.confluent.connect.avro.AvroData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.io.function.IOSupplier;
@@ -41,26 +44,31 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParquetTransformer extends Transformer<GenericRecord> {
public class ParquetTransformer extends Transformer {

private final AvroData avroData;

private static final Logger LOGGER = LoggerFactory.getLogger(ParquetTransformer.class);

ParquetTransformer(final AvroData avroData) {
super();
this.avroData = avroData;
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

@Override
public byte[] getValueBytes(final GenericRecord record, final String topic, final AbstractConfig sourceConfig) {
return TransformationUtils.serializeAvroRecordToBytes(Collections.singletonList(record), topic, sourceConfig);
public Schema getKeySchema() {
return null;
}

@Override
public StreamSpliterator<GenericRecord> createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final String topic, final int topicPartition, final AbstractConfig sourceConfig) {

final StreamSpliterator<GenericRecord> spliterator = new StreamSpliterator<GenericRecord>(LOGGER,
inputStreamIOSupplier) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {

private ParquetReader<GenericRecord> reader;
private File parquetFile;
@@ -99,11 +107,11 @@ protected void doClose() {
}

@Override
protected boolean doAdvance(final Consumer<? super GenericRecord> action) {
protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
try {
final GenericRecord record = reader.read();
if (record != null) {
action.accept(record); // Pass record to the stream
action.accept(avroData.toConnectData(record.getSchema(), record)); // Pass record to the stream
return true;
}
} catch (IOException e) {
@@ -112,7 +120,6 @@ protected boolean doAdvance(final Consumer<? super GenericRecord> action) {
return false;
}
};
return spliterator;
}

static void deleteTmpFile(final Path parquetFile) {