diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index d89d85473..67253ec29 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -59,6 +59,8 @@ public interface IntegrationBase { String DOCKER_IMAGE_KAFKA = "confluentinc/cp-kafka:7.7.0"; + String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/"; + String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-"; default AdminClient newAdminClient(final KafkaContainer kafka) { final Properties adminClientConfig = new Properties(); @@ -81,9 +83,9 @@ static void extractConnectorPlugin(File pluginDir) throws IOException, Interrupt } static File getPluginDir() throws IOException { - final File testDir = Files.createTempDirectory("s3-source-connector-for-apache-kafka-test-").toFile(); + final File testDir = Files.createTempDirectory(S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST).toFile(); - final File pluginDir = new File(testDir, "plugins/s3-source-connector-for-apache-kafka/"); + final File pluginDir = new File(testDir, PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA); assert pluginDir.mkdirs(); return pluginDir; } @@ -98,7 +100,7 @@ static KafkaContainer createKafkaContainer() { } static String topicName(final TestInfo testInfo) { - return testInfo.getTestMethod().get().getName();// + "-" + testInfo.getDisplayName().hashCode(); + return testInfo.getTestMethod().get().getName(); } static void createTopics(final AdminClient adminClient, final List topicNames) diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 0f4a3bf61..461e991ad 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -34,7 +34,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -50,7 +49,6 @@ import io.aiven.kafka.connect.s3.source.input.InputFormat; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.testutils.ContentUtils; -import io.aiven.kafka.connect.s3.source.testutils.S3OutputStream; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.PutObjectRequest; @@ -61,7 +59,6 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; -import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -202,20 +199,6 @@ void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo) assertThat(records.get(4)).isEqualTo("EE"); } - @Test - void multiPartUploadBytesTest(final TestInfo testInfo) throws ExecutionException, InterruptedException { - final var topicName = IntegrationBase.topicName(testInfo); - final Map connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName); - - connectRunner.createConnector(connectorConfig); - final String partition = "00001"; - final String key = topicName + "-" + partition + "-" + System.currentTimeMillis() + ".txt"; - multipartUpload(TEST_BUCKET_NAME, key); - // Poll messages from the Kafka topic and verify the consumed data - final List records = IntegrationBase.consumeMessages(topicName, 1, KAFKA_CONTAINER); - assertThat(records.get(0)).contains("performanceeeqjz"); - } - @Test void avroTest(final TestInfo testInfo) throws ExecutionException, InterruptedException, IOException { final var topicName = IntegrationBase.topicName(testInfo); @@ -384,18 +367,4 @@ public static void saveToS3(final String bucketName, final String folderName, fi final PutObjectRequest request = new PutObjectRequest(bucketName, folderName + fileNameInS3, fileToWrite); s3Client.putObject(request); } - - public void multipartUpload(final String bucketName, final String key) { - try (S3OutputStream s3OutputStream = new S3OutputStream(bucketName, key, S3OutputStream.DEFAULT_PART_SIZE, - s3Client); - InputStream resourceStream = Thread.currentThread() - .getContextClassLoader() - .getResourceAsStream(S3_FILE_NAME)) { - assert resourceStream != null; - final byte[] fileBytes = IOUtils.toByteArray(resourceStream); - s3OutputStream.write(fileBytes); - } catch (IOException e) { - LOGGER.error(e.getMessage()); - } - } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index eee1b36a2..087fd0451 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -39,10 +39,10 @@ import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.input.Transformer; import io.aiven.kafka.connect.s3.source.input.TransformerFactory; -import io.aiven.kafka.connect.s3.source.utils.AivenS3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.FileReader; import io.aiven.kafka.connect.s3.source.utils.OffsetManager; import io.aiven.kafka.connect.s3.source.utils.RecordProcessor; +import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; import io.aiven.kafka.connect.s3.source.utils.Version; @@ -72,7 +72,7 @@ public class S3SourceTask extends SourceTask { private S3SourceConfig s3SourceConfig; private AmazonS3 s3Client; - private Iterator sourceRecordIterator; + private Iterator sourceRecordIterator; private Optional keyConverter; private Converter valueConverter; @@ -144,24 +144,35 @@ private void prepareReaderFromOffsetStorageReader() { @Override public List poll() throws InterruptedException { - LOGGER.info("Polling again"); + LOGGER.info("Polling for new records..."); synchronized (pollLock) { final List results = new ArrayList<>(s3SourceConfig.getInt(MAX_POLL_RECORDS)); if (connectorStopped.get()) { + LOGGER.info("Connector has been stopped. Returning empty result list."); return results; } while (!connectorStopped.get()) { try { - LOGGER.info("Number of records sent {}", extractSourceRecords(results).size()); + extractSourceRecords(results); + LOGGER.info("Number of records extracted and sent: {}", results.size()); return results; - } catch (AmazonS3Exception | DataException exception) { - if (handleException(exception)) { + } catch (AmazonS3Exception exception) { + if (exception.isRetryable()) { + LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...", + exception); + pollLock.wait(ERROR_BACKOFF); + + prepareReaderFromOffsetStorageReader(); + } else { + LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception); return null; // NOPMD } + } catch (DataException exception) { + LOGGER.warn("DataException occurred during polling. No retries will be attempted.", exception); } catch (final Throwable t) { // NOPMD - // This task has failed, so close any resources (may be reopened if needed) before throwing + LOGGER.error("Unexpected error encountered. Closing resources and stopping task.", t); closeResources(); throw t; } @@ -170,23 +181,6 @@ public List poll() throws InterruptedException { } } - private boolean handleException(final RuntimeException exception) throws InterruptedException { - if (exception instanceof AmazonS3Exception) { - if (((AmazonS3Exception) exception).isRetryable()) { - LOGGER.warn("Retryable error while polling. Will sleep and try again.", exception); - Thread.sleep(ERROR_BACKOFF); - - prepareReaderFromOffsetStorageReader(); - } else { - return true; - } - } - if (exception instanceof DataException) { - LOGGER.warn("DataException. Will NOT try again.", exception); - } - return false; - } - private List extractSourceRecords(final List results) throws InterruptedException { waitForObjects(); if (connectorStopped.get()) { diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java index 620aee2d4..a781f6bd1 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java @@ -64,10 +64,12 @@ List readAvroRecords(final InputStream content, final DatumReader reader = new DataFileReader<>(sin, datumReader)) { reader.forEach(records::add); } catch (IOException e) { - LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e); + LOGGER.error("Failed to read records from DataFileReader for S3 object stream. Error: {}", + e.getMessage(), e); } } catch (IOException e) { - LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e); + LOGGER.error("Failed to initialize SeekableByteArrayInput for S3 object stream. Error: {}", e.getMessage(), + e); } return records; } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java index 7e1010fa8..5cda04f1a 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java @@ -78,7 +78,7 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou try { return objectMapper.writeValueAsBytes(record); } catch (JsonProcessingException e) { - LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e); + LOGGER.error("Failed to serialize record to JSON bytes. Error: {}", e.getMessage(), e); return new byte[0]; } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java index 337870a9f..40bf80bc4 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java @@ -42,7 +42,7 @@ private RecordProcessor() { } - public static List processRecords(final Iterator sourceRecordIterator, + public static List processRecords(final Iterator sourceRecordIterator, final List results, final S3SourceConfig s3SourceConfig, final Optional keyConverter, final Converter valueConverter, final AtomicBoolean connectorStopped, final Transformer transformer, final FileReader fileReader, @@ -52,9 +52,9 @@ public static List processRecords(final Iterator processRecords(final Iterator keyConverter, final Converter valueConverter, + static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord, final S3SourceConfig s3SourceConfig, + final Optional keyConverter, final Converter valueConverter, final Map conversionConfig, final Transformer transformer, final FileReader fileReader, final OffsetManager offsetManager) { - final String topic = aivenS3SourceRecord.getTopic(); - final Optional keyData = keyConverter - .map(c -> c.toConnectData(topic, aivenS3SourceRecord.key())); + final String topic = s3SourceRecord.getTopic(); + final Optional keyData = keyConverter.map(c -> c.toConnectData(topic, s3SourceRecord.key())); transformer.configureValueConverter(conversionConfig, s3SourceConfig); valueConverter.configure(conversionConfig, false); try { - final SchemaAndValue schemaAndValue = valueConverter.toConnectData(topic, aivenS3SourceRecord.value()); - offsetManager.updateCurrentOffsets(aivenS3SourceRecord.getPartitionMap(), - aivenS3SourceRecord.getOffsetMap()); - aivenS3SourceRecord.setOffsetMap(offsetManager.getOffsets().get(aivenS3SourceRecord.getPartitionMap())); - return aivenS3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue); + final SchemaAndValue schemaAndValue = valueConverter.toConnectData(topic, s3SourceRecord.value()); + offsetManager.updateCurrentOffsets(s3SourceRecord.getPartitionMap(), s3SourceRecord.getOffsetMap()); + s3SourceRecord.setOffsetMap(offsetManager.getOffsets().get(s3SourceRecord.getPartitionMap())); + return s3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue); } catch (DataException e) { LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e); - fileReader.addFailedObjectKeys(aivenS3SourceRecord.getObjectKey()); + fileReader.addFailedObjectKeys(s3SourceRecord.getObjectKey()); throw e; } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AivenS3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java similarity index 95% rename from s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AivenS3SourceRecord.java rename to s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 87803c636..7880bf868 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AivenS3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -24,7 +24,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.source.SourceRecord; -public class AivenS3SourceRecord { +public class S3SourceRecord { private final Map partitionMap; private Map offsetMap; private final String topic; @@ -34,7 +34,7 @@ public class AivenS3SourceRecord { private final String objectKey; - public AivenS3SourceRecord(final Map partitionMap, final Map offsetMap, + public S3SourceRecord(final Map partitionMap, final Map offsetMap, final String topic, final Integer topicPartition, final byte[] recordKey, final byte[] recordValue, final String objectKey) { this.partitionMap = new HashMap<>(partitionMap); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index e59e1a7c4..8c1fcb77d 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -44,7 +44,7 @@ * Iterator that processes S3 files and creates Kafka source records. Supports different output formats (Avro, JSON, * Parquet). */ -public final class SourceRecordIterator implements Iterator { +public final class SourceRecordIterator implements Iterator { private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class); public static final String PATTERN_TOPIC_KEY = "topicName"; public static final String PATTERN_PARTITION_KEY = "partitionId"; @@ -54,7 +54,7 @@ public final class SourceRecordIterator implements Iterator private String currentObjectKey; private final Iterator s3ObjectSummaryIterator; - private Iterator recordIterator = Collections.emptyIterator(); + private Iterator recordIterator = Collections.emptyIterator(); private final OffsetManager offsetManager; @@ -94,7 +94,7 @@ private void nextS3Object() { } } - private Iterator createIteratorForCurrentFile() throws IOException { + private Iterator createIteratorForCurrentFile() throws IOException { try (S3Object s3Object = s3Client.getObject(bucketName, currentObjectKey); S3ObjectInputStream inputStream = s3Object.getObjectContent()) { @@ -124,15 +124,15 @@ private Iterator createIteratorForCurrentFile() throws IOEx } @SuppressWarnings("PMD.CognitiveComplexity") - private Iterator getObjectIterator(final InputStream valueInputStream, final String topic, + private Iterator getObjectIterator(final InputStream valueInputStream, final String topic, final int topicPartition, final long startOffset, final Transformer transformer, final Map partitionMap) { return new Iterator<>() { - private final Iterator internalIterator = readNext().iterator(); + private final Iterator internalIterator = readNext().iterator(); - private List readNext() { + private List readNext() { final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8); - final List sourceRecords = new ArrayList<>(); + final List sourceRecords = new ArrayList<>(); int numOfProcessedRecs = 1; boolean checkOffsetMap = true; @@ -157,7 +157,7 @@ private List readNext() { return sourceRecords; } - private AivenS3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic, + private S3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic, final int topicPartition, final OffsetManager offsetManager, final long startOffset, final Map partitionMap) { @@ -175,8 +175,7 @@ private AivenS3SourceRecord getSourceRecord(final byte[] key, final byte[] value final Map offsetMap = offsetManager.getOffsetValueMap(currentObjectKey, currentOffset); - return new AivenS3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key, value, - currentObjectKey); + return new S3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key, value, currentObjectKey); } @Override @@ -185,7 +184,7 @@ public boolean hasNext() { } @Override - public AivenS3SourceRecord next() { + public S3SourceRecord next() { return internalIterator.next(); } }; @@ -197,7 +196,7 @@ public boolean hasNext() { } @Override - public AivenS3SourceRecord next() { + public S3SourceRecord next() { if (!recordIterator.hasNext()) { nextS3Object(); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index ef5dd3eec..c839a1269 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -40,7 +40,7 @@ import io.aiven.kafka.connect.s3.source.input.InputFormat; import io.aiven.kafka.connect.s3.source.input.Transformer; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; -import io.aiven.kafka.connect.s3.source.utils.AivenS3SourceRecord; +import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; import com.amazonaws.auth.AWSStaticCredentialsProvider; @@ -154,8 +154,8 @@ void testPoll() throws Exception { setPrivateField(s3SourceTask, "sourceRecordIterator", mockSourceRecordIterator); when(mockSourceRecordIterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false); - final AivenS3SourceRecord aivenS3SourceRecordList = getAivenS3SourceRecord(); - when(mockSourceRecordIterator.next()).thenReturn(aivenS3SourceRecordList); + final S3SourceRecord s3SourceRecordList = getAivenS3SourceRecord(); + when(mockSourceRecordIterator.next()).thenReturn(s3SourceRecordList); final List sourceRecordList = s3SourceTask.poll(); assertThat(sourceRecordList).isNotEmpty(); @@ -172,8 +172,8 @@ void testStop() { assertThat(s3SourceTask.getConnectorStopped()).isTrue(); } - private static AivenS3SourceRecord getAivenS3SourceRecord() { - return new AivenS3SourceRecord(new HashMap<>(), new HashMap<>(), "testtopic", 0, new byte[0], new byte[0], ""); + private static S3SourceRecord getAivenS3SourceRecord() { + return new S3SourceRecord(new HashMap<>(), new HashMap<>(), "testtopic", 0, new byte[0], new byte[0], ""); } @SuppressWarnings("PMD.AvoidAccessibilityAlteration") diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java index 9e1b65ec4..cc7d765c6 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java @@ -66,7 +66,7 @@ class RecordProcessorTest { private FileReader fileReader; private AtomicBoolean connectorStopped; - private Iterator sourceRecordIterator; + private Iterator sourceRecordIterator; @BeforeEach void setUp() { @@ -98,7 +98,7 @@ void testProcessRecordsWithRecords() throws ConnectException { when(s3SourceConfig.getInt(S3SourceConfig.MAX_POLL_RECORDS)).thenReturn(5); when(sourceRecordIterator.hasNext()).thenReturn(true, false); // One iteration with records - final AivenS3SourceRecord mockRecord = mock(AivenS3SourceRecord.class); + final S3SourceRecord mockRecord = mock(S3SourceRecord.class); when(sourceRecordIterator.next()).thenReturn(mockRecord); final List results = new ArrayList<>(); @@ -138,7 +138,7 @@ void testProcessRecordsConnectorStopped() { @Test void testCreateSourceRecords() { - final AivenS3SourceRecord mockRecord = mock(AivenS3SourceRecord.class); + final S3SourceRecord mockRecord = mock(S3SourceRecord.class); when(mockRecord.getTopic()).thenReturn("test-topic"); when(mockRecord.key()).thenReturn("mock-key".getBytes(StandardCharsets.UTF_8)); when(mockRecord.value()).thenReturn("mock-value".getBytes(StandardCharsets.UTF_8));