
Fixes based on review of PR 317 #329

Merged · 2 commits · Nov 6, 2024
========================================
@@ -59,6 +59,8 @@
public interface IntegrationBase {

String DOCKER_IMAGE_KAFKA = "confluentinc/cp-kafka:7.7.0";
String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
Contributor: Should it really be S_3? This even complicates search in the code.

Contributor: Yeah, that's a fair point.

Contributor (Author): We will have more reviews on the parent PR anyway; I will make sure this is addressed. There are a couple more vars like these.

Contributor: And then we will end up with a PR with hundreds (if not thousands) of comments?
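For illustration, here is how these constants might read without the S_3 split — a sketch only; the names below are hypothetical, since the actual renaming was deferred to the parent PR:

    // Hypothetical naming, assuming the S_3 tokens are collapsed to a plain S3 prefix.
    String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
    String S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";

A plain S3 prefix keeps the constants greppable, which is the search concern raised above.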

String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";

default AdminClient newAdminClient(final KafkaContainer kafka) {
final Properties adminClientConfig = new Properties();
@@ -81,9 +83,9 @@ static void extractConnectorPlugin(File pluginDir) throws IOException, Interrupt
}

static File getPluginDir() throws IOException {
final File testDir = Files.createTempDirectory("s3-source-connector-for-apache-kafka-test-").toFile();
final File testDir = Files.createTempDirectory(S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST).toFile();

final File pluginDir = new File(testDir, "plugins/s3-source-connector-for-apache-kafka/");
final File pluginDir = new File(testDir, PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA);
assert pluginDir.mkdirs();
return pluginDir;
}
@@ -98,7 +100,7 @@ static KafkaContainer createKafkaContainer() {
}

static String topicName(final TestInfo testInfo) {
return testInfo.getTestMethod().get().getName();// + "-" + testInfo.getDisplayName().hashCode();
return testInfo.getTestMethod().get().getName();
}

static void createTopics(final AdminClient adminClient, final List<String> topicNames)
========================================
@@ -34,7 +34,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -50,7 +49,6 @@
import io.aiven.kafka.connect.s3.source.input.InputFormat;
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.testutils.ContentUtils;
import io.aiven.kafka.connect.s3.source.testutils.S3OutputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -61,7 +59,6 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -202,20 +199,6 @@ void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo)
assertThat(records.get(4)).isEqualTo("EE");
}

@Test
void multiPartUploadBytesTest(final TestInfo testInfo) throws ExecutionException, InterruptedException {
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);

connectRunner.createConnector(connectorConfig);
final String partition = "00001";
final String key = topicName + "-" + partition + "-" + System.currentTimeMillis() + ".txt";
multipartUpload(TEST_BUCKET_NAME, key);
// Poll messages from the Kafka topic and verify the consumed data
final List<String> records = IntegrationBase.consumeMessages(topicName, 1, KAFKA_CONTAINER);
assertThat(records.get(0)).contains("performanceeeqjz");
}

@Test
void avroTest(final TestInfo testInfo) throws ExecutionException, InterruptedException, IOException {
final var topicName = IntegrationBase.topicName(testInfo);
@@ -384,18 +367,4 @@ public static void saveToS3(final String bucketName, final String folderName, fi
final PutObjectRequest request = new PutObjectRequest(bucketName, folderName + fileNameInS3, fileToWrite);
s3Client.putObject(request);
}

public void multipartUpload(final String bucketName, final String key) {
try (S3OutputStream s3OutputStream = new S3OutputStream(bucketName, key, S3OutputStream.DEFAULT_PART_SIZE,
s3Client);
InputStream resourceStream = Thread.currentThread()
.getContextClassLoader()
.getResourceAsStream(S3_FILE_NAME)) {
assert resourceStream != null;
final byte[] fileBytes = IOUtils.toByteArray(resourceStream);
s3OutputStream.write(fileBytes);
} catch (IOException e) {
LOGGER.error(e.getMessage());
}
}
AnatolyPopov marked this conversation as resolved.
}
========================================
@@ -39,10 +39,10 @@
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.input.Transformer;
import io.aiven.kafka.connect.s3.source.input.TransformerFactory;
import io.aiven.kafka.connect.s3.source.utils.AivenS3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.FileReader;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
import io.aiven.kafka.connect.s3.source.utils.RecordProcessor;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import io.aiven.kafka.connect.s3.source.utils.Version;

@@ -72,7 +72,7 @@ public class S3SourceTask extends SourceTask {
private S3SourceConfig s3SourceConfig;
private AmazonS3 s3Client;

private Iterator<AivenS3SourceRecord> sourceRecordIterator;
private Iterator<S3SourceRecord> sourceRecordIterator;
private Optional<Converter> keyConverter;

private Converter valueConverter;
@@ -144,24 +144,35 @@ private void prepareReaderFromOffsetStorageReader() {

@Override
public List<SourceRecord> poll() throws InterruptedException {
LOGGER.info("Polling again");
LOGGER.info("Polling for new records...");
synchronized (pollLock) {
final List<SourceRecord> results = new ArrayList<>(s3SourceConfig.getInt(MAX_POLL_RECORDS));

if (connectorStopped.get()) {
LOGGER.info("Connector has been stopped. Returning empty result list.");
return results;
}

while (!connectorStopped.get()) {
try {
LOGGER.info("Number of records sent {}", extractSourceRecords(results).size());
extractSourceRecords(results);
LOGGER.info("Number of records extracted and sent: {}", results.size());
return results;
} catch (AmazonS3Exception | DataException exception) {
if (handleException(exception)) {
} catch (AmazonS3Exception exception) {
if (exception.isRetryable()) {
LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...",
exception);
pollLock.wait(ERROR_BACKOFF);

prepareReaderFromOffsetStorageReader();
} else {
LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception);
return null; // NOPMD
}
} catch (DataException exception) {
LOGGER.warn("DataException occurred during polling. No retries will be attempted.", exception);
Contributor: Is this correct? The way I read it, it will immediately retry the while loop without waiting (both before and after the change).

Contributor (Author): Yes, we are ignoring the exception here with a warning. We shall revisit this section in the handle-malformed-records ticket, I believe.

Contributor: Thanks for the explanation! 👍
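For illustration only, a sketch of what backing off on DataException could look like, reusing the pollLock/ERROR_BACKOFF machinery from the retryable branch above — hypothetical, since the PR deliberately keeps the immediate retry and defers this to the malformed-records ticket:

    } catch (DataException exception) {
        // Hypothetical variant: wait on pollLock before re-entering the loop,
        // mirroring the retryable AmazonS3Exception branch. Legal here because
        // poll() holds the pollLock monitor and already throws InterruptedException.
        LOGGER.warn("DataException occurred during polling. Backing off before continuing.", exception);
        pollLock.wait(ERROR_BACKOFF);
    }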

} catch (final Throwable t) { // NOPMD
// This task has failed, so close any resources (may be reopened if needed) before throwing
LOGGER.error("Unexpected error encountered. Closing resources and stopping task.", t);
closeResources();
throw t;
}
@@ -170,23 +181,6 @@ public List<SourceRecord> poll() throws InterruptedException {
}
}

private boolean handleException(final RuntimeException exception) throws InterruptedException {
if (exception instanceof AmazonS3Exception) {
if (((AmazonS3Exception) exception).isRetryable()) {
LOGGER.warn("Retryable error while polling. Will sleep and try again.", exception);
Thread.sleep(ERROR_BACKOFF);

prepareReaderFromOffsetStorageReader();
} else {
return true;
}
}
if (exception instanceof DataException) {
LOGGER.warn("DataException. Will NOT try again.", exception);
}
return false;
}

private List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) throws InterruptedException {
waitForObjects();
if (connectorStopped.get()) {
========================================
@@ -64,10 +64,12 @@ List<Object> readAvroRecords(final InputStream content, final DatumReader<Generi
try (DataFileReader<GenericRecord> reader = new DataFileReader<>(sin, datumReader)) {
reader.forEach(records::add);
} catch (IOException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
LOGGER.error("Failed to read records from DataFileReader for S3 object stream. Error: {}",
e.getMessage(), e);
}
} catch (IOException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
LOGGER.error("Failed to initialize SeekableByteArrayInput for S3 object stream. Error: {}", e.getMessage(),
e);
}
return records;
}
========================================
@@ -78,7 +78,7 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
try {
return objectMapper.writeValueAsBytes(record);
} catch (JsonProcessingException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
LOGGER.error("Failed to serialize record to JSON bytes. Error: {}", e.getMessage(), e);
return new byte[0];
}
}
========================================
@@ -42,7 +42,7 @@ private RecordProcessor() {

}

public static List<SourceRecord> processRecords(final Iterator<AivenS3SourceRecord> sourceRecordIterator,
public static List<SourceRecord> processRecords(final Iterator<S3SourceRecord> sourceRecordIterator,
final List<SourceRecord> results, final S3SourceConfig s3SourceConfig,
final Optional<Converter> keyConverter, final Converter valueConverter,
final AtomicBoolean connectorStopped, final Transformer transformer, final FileReader fileReader,
@@ -52,9 +52,9 @@ public static List<SourceRecord> processRecords(final Iterator<AivenS3SourceReco
final int maxPollRecords = s3SourceConfig.getInt(S3SourceConfig.MAX_POLL_RECORDS);

for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && !connectorStopped.get(); i++) {
final AivenS3SourceRecord aivenS3SourceRecord = sourceRecordIterator.next();
if (aivenS3SourceRecord != null) {
final SourceRecord sourceRecord = createSourceRecord(aivenS3SourceRecord, s3SourceConfig, keyConverter,
final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
if (s3SourceRecord != null) {
final SourceRecord sourceRecord = createSourceRecord(s3SourceRecord, s3SourceConfig, keyConverter,
valueConverter, conversionConfig, transformer, fileReader, offsetManager);
results.add(sourceRecord);
}
@@ -63,26 +63,24 @@ public static List<SourceRecord> processRecords(final Iterator<AivenS3SourceReco
return results;
}

static SourceRecord createSourceRecord(final AivenS3SourceRecord aivenS3SourceRecord,
final S3SourceConfig s3SourceConfig, final Optional<Converter> keyConverter, final Converter valueConverter,
static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord, final S3SourceConfig s3SourceConfig,
final Optional<Converter> keyConverter, final Converter valueConverter,
final Map<String, String> conversionConfig, final Transformer transformer, final FileReader fileReader,
final OffsetManager offsetManager) {

final String topic = aivenS3SourceRecord.getTopic();
final Optional<SchemaAndValue> keyData = keyConverter
.map(c -> c.toConnectData(topic, aivenS3SourceRecord.key()));
final String topic = s3SourceRecord.getTopic();
final Optional<SchemaAndValue> keyData = keyConverter.map(c -> c.toConnectData(topic, s3SourceRecord.key()));

transformer.configureValueConverter(conversionConfig, s3SourceConfig);
valueConverter.configure(conversionConfig, false);
try {
final SchemaAndValue schemaAndValue = valueConverter.toConnectData(topic, aivenS3SourceRecord.value());
offsetManager.updateCurrentOffsets(aivenS3SourceRecord.getPartitionMap(),
aivenS3SourceRecord.getOffsetMap());
aivenS3SourceRecord.setOffsetMap(offsetManager.getOffsets().get(aivenS3SourceRecord.getPartitionMap()));
return aivenS3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue);
final SchemaAndValue schemaAndValue = valueConverter.toConnectData(topic, s3SourceRecord.value());
offsetManager.updateCurrentOffsets(s3SourceRecord.getPartitionMap(), s3SourceRecord.getOffsetMap());
s3SourceRecord.setOffsetMap(offsetManager.getOffsets().get(s3SourceRecord.getPartitionMap()));
return s3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue);
} catch (DataException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
fileReader.addFailedObjectKeys(aivenS3SourceRecord.getObjectKey());
fileReader.addFailedObjectKeys(s3SourceRecord.getObjectKey());
throw e;
}
}
========================================
@@ -24,7 +24,7 @@
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;

public class AivenS3SourceRecord {
public class S3SourceRecord {
private final Map<String, Object> partitionMap;
private Map<String, Object> offsetMap;
private final String topic;
@@ -34,7 +34,7 @@ public class AivenS3SourceRecord {

private final String objectKey;

public AivenS3SourceRecord(final Map<String, Object> partitionMap, final Map<String, Object> offsetMap,
public S3SourceRecord(final Map<String, Object> partitionMap, final Map<String, Object> offsetMap,
final String topic, final Integer topicPartition, final byte[] recordKey, final byte[] recordValue,
final String objectKey) {
this.partitionMap = new HashMap<>(partitionMap);
========================================
@@ -44,7 +44,7 @@
* Iterator that processes S3 files and creates Kafka source records. Supports different output formats (Avro, JSON,
* Parquet).
*/
public final class SourceRecordIterator implements Iterator<AivenS3SourceRecord> {
public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class);
public static final String PATTERN_TOPIC_KEY = "topicName";
public static final String PATTERN_PARTITION_KEY = "partitionId";
@@ -54,7 +54,7 @@ public final class SourceRecordIterator implements Iterator<AivenS3SourceRecord>
private String currentObjectKey;

private final Iterator<S3ObjectSummary> s3ObjectSummaryIterator;
private Iterator<AivenS3SourceRecord> recordIterator = Collections.emptyIterator();
private Iterator<S3SourceRecord> recordIterator = Collections.emptyIterator();

private final OffsetManager offsetManager;

@@ -94,7 +94,7 @@ private void nextS3Object() {
}
}

private Iterator<AivenS3SourceRecord> createIteratorForCurrentFile() throws IOException {
private Iterator<S3SourceRecord> createIteratorForCurrentFile() throws IOException {
try (S3Object s3Object = s3Client.getObject(bucketName, currentObjectKey);
S3ObjectInputStream inputStream = s3Object.getObjectContent()) {

@@ -124,15 +124,15 @@
}

@SuppressWarnings("PMD.CognitiveComplexity")
private Iterator<AivenS3SourceRecord> getObjectIterator(final InputStream valueInputStream, final String topic,
private Iterator<S3SourceRecord> getObjectIterator(final InputStream valueInputStream, final String topic,
final int topicPartition, final long startOffset, final Transformer transformer,
final Map<String, Object> partitionMap) {
return new Iterator<>() {
private final Iterator<AivenS3SourceRecord> internalIterator = readNext().iterator();
private final Iterator<S3SourceRecord> internalIterator = readNext().iterator();

private List<AivenS3SourceRecord> readNext() {
private List<S3SourceRecord> readNext() {
final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8);
final List<AivenS3SourceRecord> sourceRecords = new ArrayList<>();
final List<S3SourceRecord> sourceRecords = new ArrayList<>();

int numOfProcessedRecs = 1;
boolean checkOffsetMap = true;
Expand All @@ -157,7 +157,7 @@ private List<AivenS3SourceRecord> readNext() {
return sourceRecords;
}

private AivenS3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic,
private S3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic,
final int topicPartition, final OffsetManager offsetManager, final long startOffset,
final Map<String, Object> partitionMap) {

@@ -175,8 +175,7 @@ private AivenS3SourceRecord getSourceRecord(final byte[] key, final byte[] value

final Map<String, Object> offsetMap = offsetManager.getOffsetValueMap(currentObjectKey, currentOffset);

return new AivenS3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key, value,
currentObjectKey);
return new S3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key, value, currentObjectKey);
}

@Override
@@ -185,7 +184,7 @@ public boolean hasNext() {
}

@Override
public AivenS3SourceRecord next() {
public S3SourceRecord next() {
return internalIterator.next();
}
};
@@ -197,7 +196,7 @@
}

@Override
public AivenS3SourceRecord next() {
public S3SourceRecord next() {
if (!recordIterator.hasNext()) {
nextS3Object();
}