Improve logging and some exception handling for clarity
muralibasani authored Nov 6, 2024
1 parent ec96a72 commit 5084cbf
Showing 10 changed files with 62 additions and 98 deletions.
@@ -59,6 +59,8 @@
public interface IntegrationBase {

String DOCKER_IMAGE_KAFKA = "confluentinc/cp-kafka:7.7.0";
String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";

default AdminClient newAdminClient(final KafkaContainer kafka) {
final Properties adminClientConfig = new Properties();
@@ -81,9 +83,9 @@ static void extractConnectorPlugin(File pluginDir) throws IOException, InterruptedException {
}

static File getPluginDir() throws IOException {
final File testDir = Files.createTempDirectory("s3-source-connector-for-apache-kafka-test-").toFile();
final File testDir = Files.createTempDirectory(S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST).toFile();

final File pluginDir = new File(testDir, "plugins/s3-source-connector-for-apache-kafka/");
final File pluginDir = new File(testDir, PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA);
assert pluginDir.mkdirs();
return pluginDir;
}
@@ -98,7 +100,7 @@ static KafkaContainer createKafkaContainer() {
}

static String topicName(final TestInfo testInfo) {
return testInfo.getTestMethod().get().getName();// + "-" + testInfo.getDisplayName().hashCode();
return testInfo.getTestMethod().get().getName();
}

static void createTopics(final AdminClient adminClient, final List<String> topicNames)
@@ -34,7 +34,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -50,7 +49,6 @@
import io.aiven.kafka.connect.s3.source.input.InputFormat;
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.testutils.ContentUtils;
import io.aiven.kafka.connect.s3.source.testutils.S3OutputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -61,7 +59,6 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -202,20 +199,6 @@ void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo)
assertThat(records.get(4)).isEqualTo("EE");
}

@Test
void multiPartUploadBytesTest(final TestInfo testInfo) throws ExecutionException, InterruptedException {
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);

connectRunner.createConnector(connectorConfig);
final String partition = "00001";
final String key = topicName + "-" + partition + "-" + System.currentTimeMillis() + ".txt";
multipartUpload(TEST_BUCKET_NAME, key);
// Poll messages from the Kafka topic and verify the consumed data
final List<String> records = IntegrationBase.consumeMessages(topicName, 1, KAFKA_CONTAINER);
assertThat(records.get(0)).contains("performanceeeqjz");
}

@Test
void avroTest(final TestInfo testInfo) throws ExecutionException, InterruptedException, IOException {
final var topicName = IntegrationBase.topicName(testInfo);
@@ -384,18 +367,4 @@ public static void saveToS3(final String bucketName, final String folderName, final String fileNameInS3, final File fileToWrite) {
final PutObjectRequest request = new PutObjectRequest(bucketName, folderName + fileNameInS3, fileToWrite);
s3Client.putObject(request);
}

public void multipartUpload(final String bucketName, final String key) {
try (S3OutputStream s3OutputStream = new S3OutputStream(bucketName, key, S3OutputStream.DEFAULT_PART_SIZE,
s3Client);
InputStream resourceStream = Thread.currentThread()
.getContextClassLoader()
.getResourceAsStream(S3_FILE_NAME)) {
assert resourceStream != null;
final byte[] fileBytes = IOUtils.toByteArray(resourceStream);
s3OutputStream.write(fileBytes);
} catch (IOException e) {
LOGGER.error(e.getMessage());
}
}
}
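For orientation, saveToS3 above is a thin wrapper over the AWS SDK v1 PutObjectRequest API. A standalone sketch of that call (bucket name, key, and file are invented for illustration; region and credentials come from the environment):

import java.io.File;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.PutObjectRequest;

public class S3UploadSketch {
    public static void main(final String[] args) {
        final AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();
        final File fileToWrite = new File("record.txt");
        // Key layout mirrors the tests: folder prefix + file name.
        s3Client.putObject(new PutObjectRequest("test-bucket", "topic-00000/record.txt", fileToWrite));
    }
}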
@@ -39,10 +39,10 @@
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.input.Transformer;
import io.aiven.kafka.connect.s3.source.input.TransformerFactory;
import io.aiven.kafka.connect.s3.source.utils.AivenS3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.FileReader;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
import io.aiven.kafka.connect.s3.source.utils.RecordProcessor;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import io.aiven.kafka.connect.s3.source.utils.Version;

@@ -72,7 +72,7 @@ public class S3SourceTask extends SourceTask {
private S3SourceConfig s3SourceConfig;
private AmazonS3 s3Client;

private Iterator<AivenS3SourceRecord> sourceRecordIterator;
private Iterator<S3SourceRecord> sourceRecordIterator;
private Optional<Converter> keyConverter;

private Converter valueConverter;
@@ -144,24 +144,35 @@ private void prepareReaderFromOffsetStorageReader() {

@Override
public List<SourceRecord> poll() throws InterruptedException {
LOGGER.info("Polling again");
LOGGER.info("Polling for new records...");
synchronized (pollLock) {
final List<SourceRecord> results = new ArrayList<>(s3SourceConfig.getInt(MAX_POLL_RECORDS));

if (connectorStopped.get()) {
LOGGER.info("Connector has been stopped. Returning empty result list.");
return results;
}

while (!connectorStopped.get()) {
try {
LOGGER.info("Number of records sent {}", extractSourceRecords(results).size());
extractSourceRecords(results);
LOGGER.info("Number of records extracted and sent: {}", results.size());
return results;
} catch (AmazonS3Exception | DataException exception) {
if (handleException(exception)) {
} catch (AmazonS3Exception exception) {
if (exception.isRetryable()) {
LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...",
exception);
pollLock.wait(ERROR_BACKOFF);

prepareReaderFromOffsetStorageReader();
} else {
LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception);
return null; // NOPMD
}
} catch (DataException exception) {
LOGGER.warn("DataException occurred during polling. No retries will be attempted.", exception);
} catch (final Throwable t) { // NOPMD
// This task has failed, so close any resources (may be reopened if needed) before throwing
LOGGER.error("Unexpected error encountered. Closing resources and stopping task.", t);
closeResources();
throw t;
}
@@ -170,23 +181,6 @@ public List<SourceRecord> poll() throws InterruptedException {
}
}

private boolean handleException(final RuntimeException exception) throws InterruptedException {
if (exception instanceof AmazonS3Exception) {
if (((AmazonS3Exception) exception).isRetryable()) {
LOGGER.warn("Retryable error while polling. Will sleep and try again.", exception);
Thread.sleep(ERROR_BACKOFF);

prepareReaderFromOffsetStorageReader();
} else {
return true;
}
}
if (exception instanceof DataException) {
LOGGER.warn("DataException. Will NOT try again.", exception);
}
return false;
}

private List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) throws InterruptedException {
waitForObjects();
if (connectorStopped.get()) {
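Taken together, the hunks above drop the shared handleException helper in favour of explicit catch blocks with targeted log messages. A sketch of how the reshaped poll() reads after this commit — assembled from the diff; the loop tail is hidden behind collapsed context, so the final return here is an assumption, and pollLock, connectorStopped, s3SourceConfig, ERROR_BACKOFF, and LOGGER are the class members shown elsewhere in the diff:

@Override
public List<SourceRecord> poll() throws InterruptedException {
    LOGGER.info("Polling for new records...");
    synchronized (pollLock) {
        final List<SourceRecord> results = new ArrayList<>(s3SourceConfig.getInt(MAX_POLL_RECORDS));

        if (connectorStopped.get()) {
            LOGGER.info("Connector has been stopped. Returning empty result list.");
            return results;
        }

        while (!connectorStopped.get()) {
            try {
                extractSourceRecords(results);
                LOGGER.info("Number of records extracted and sent: {}", results.size());
                return results;
            } catch (AmazonS3Exception exception) {
                if (exception.isRetryable()) {
                    // Back off while holding pollLock, then rebuild the reader from stored offsets.
                    LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...", exception);
                    pollLock.wait(ERROR_BACKOFF);
                    prepareReaderFromOffsetStorageReader();
                } else {
                    LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception);
                    return null;
                }
            } catch (DataException exception) {
                // Bad record data is logged without retrying; the loop moves on.
                LOGGER.warn("DataException occurred during polling. No retries will be attempted.", exception);
            } catch (final Throwable t) {
                // The task has failed: release resources before propagating.
                LOGGER.error("Unexpected error encountered. Closing resources and stopping task.", t);
                closeResources();
                throw t;
            }
        }
        return results; // assumption: reached only once the connector is stopped
    }
}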
@@ -64,10 +64,12 @@ List<Object> readAvroRecords(final InputStream content, final DatumReader<GenericRecord> datumReader) {
try (DataFileReader<GenericRecord> reader = new DataFileReader<>(sin, datumReader)) {
reader.forEach(records::add);
} catch (IOException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
LOGGER.error("Failed to read records from DataFileReader for S3 object stream. Error: {}",
e.getMessage(), e);
}
} catch (IOException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
LOGGER.error("Failed to initialize SeekableByteArrayInput for S3 object stream. Error: {}", e.getMessage(),
e);
}
return records;
}
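The nested try-with-resources above is what lets the two new log messages distinguish a failure while opening the SeekableByteArrayInput from a failure while iterating the DataFileReader. A self-contained round trip through the same Avro classes (the schema and record contents are invented for illustration):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

public class AvroRoundTripSketch {
    public static void main(final String[] args) throws IOException {
        final Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Msg\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}");
        final GenericRecord record = new GenericData.Record(schema);
        record.put("body", "hello");

        // Write an Avro container file into memory, as a test fixture would before uploading to S3.
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        try (DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
            fileWriter.create(schema, out);
            fileWriter.append(record);
        }

        // Read it back with the same SeekableByteArrayInput + DataFileReader pair as readAvroRecords.
        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        try (SeekableByteArrayInput sin = new SeekableByteArrayInput(out.toByteArray());
                DataFileReader<GenericRecord> reader = new DataFileReader<>(sin, datumReader)) {
            reader.forEach(r -> System.out.println(r.get("body"))); // prints "hello"
        }
    }
}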
@@ -78,7 +78,7 @@ public byte[] getValueBytes(final Object record, final String topic, final S3SourceConfig s3SourceConfig) {
try {
return objectMapper.writeValueAsBytes(record);
} catch (JsonProcessingException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
LOGGER.error("Failed to serialize record to JSON bytes. Error: {}", e.getMessage(), e);
return new byte[0];
}
}
@@ -42,7 +42,7 @@ private RecordProcessor() {

}

public static List<SourceRecord> processRecords(final Iterator<AivenS3SourceRecord> sourceRecordIterator,
public static List<SourceRecord> processRecords(final Iterator<S3SourceRecord> sourceRecordIterator,
final List<SourceRecord> results, final S3SourceConfig s3SourceConfig,
final Optional<Converter> keyConverter, final Converter valueConverter,
final AtomicBoolean connectorStopped, final Transformer transformer, final FileReader fileReader,
@@ -52,9 +52,9 @@ public static List<SourceRecord> processRecords(final Iterator<AivenS3SourceRecord>
final int maxPollRecords = s3SourceConfig.getInt(S3SourceConfig.MAX_POLL_RECORDS);

for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && !connectorStopped.get(); i++) {
final AivenS3SourceRecord aivenS3SourceRecord = sourceRecordIterator.next();
if (aivenS3SourceRecord != null) {
final SourceRecord sourceRecord = createSourceRecord(aivenS3SourceRecord, s3SourceConfig, keyConverter,
final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
if (s3SourceRecord != null) {
final SourceRecord sourceRecord = createSourceRecord(s3SourceRecord, s3SourceConfig, keyConverter,
valueConverter, conversionConfig, transformer, fileReader, offsetManager);
results.add(sourceRecord);
}
@@ -63,26 +63,24 @@ public static List<SourceRecord> processRecords(final Iterator<AivenS3SourceRecord>
return results;
}

static SourceRecord createSourceRecord(final AivenS3SourceRecord aivenS3SourceRecord,
final S3SourceConfig s3SourceConfig, final Optional<Converter> keyConverter, final Converter valueConverter,
static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord, final S3SourceConfig s3SourceConfig,
final Optional<Converter> keyConverter, final Converter valueConverter,
final Map<String, String> conversionConfig, final Transformer transformer, final FileReader fileReader,
final OffsetManager offsetManager) {

final String topic = aivenS3SourceRecord.getTopic();
final Optional<SchemaAndValue> keyData = keyConverter
.map(c -> c.toConnectData(topic, aivenS3SourceRecord.key()));
final String topic = s3SourceRecord.getTopic();
final Optional<SchemaAndValue> keyData = keyConverter.map(c -> c.toConnectData(topic, s3SourceRecord.key()));

transformer.configureValueConverter(conversionConfig, s3SourceConfig);
valueConverter.configure(conversionConfig, false);
try {
final SchemaAndValue schemaAndValue = valueConverter.toConnectData(topic, aivenS3SourceRecord.value());
offsetManager.updateCurrentOffsets(aivenS3SourceRecord.getPartitionMap(),
aivenS3SourceRecord.getOffsetMap());
aivenS3SourceRecord.setOffsetMap(offsetManager.getOffsets().get(aivenS3SourceRecord.getPartitionMap()));
return aivenS3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue);
final SchemaAndValue schemaAndValue = valueConverter.toConnectData(topic, s3SourceRecord.value());
offsetManager.updateCurrentOffsets(s3SourceRecord.getPartitionMap(), s3SourceRecord.getOffsetMap());
s3SourceRecord.setOffsetMap(offsetManager.getOffsets().get(s3SourceRecord.getPartitionMap()));
return s3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue);
} catch (DataException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
fileReader.addFailedObjectKeys(aivenS3SourceRecord.getObjectKey());
fileReader.addFailedObjectKeys(s3SourceRecord.getObjectKey());
throw e;
}
}
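For reference, createSourceRecord drives whichever Converter the task is configured with through the standard configure/toConnectData sequence before offsets are updated. A minimal sketch using Kafka Connect's built-in ByteArrayConverter as a stand-in (topic name and payload are made up):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.data.SchemaAndValue;

public class ConverterSketch {
    public static void main(final String[] args) {
        final ByteArrayConverter valueConverter = new ByteArrayConverter();
        // false = configure as a value (not key) converter, as createSourceRecord does.
        valueConverter.configure(Map.of(), false);
        // toConnectData is the call RecordProcessor guards: a DataException from it
        // marks the S3 object key as failed and is rethrown.
        final SchemaAndValue schemaAndValue = valueConverter
                .toConnectData("my-topic", "payload".getBytes(StandardCharsets.UTF_8));
        System.out.println(schemaAndValue.schema()); // optional BYTES schema
    }
}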
@@ -24,7 +24,7 @@
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;

public class AivenS3SourceRecord {
public class S3SourceRecord {
private final Map<String, Object> partitionMap;
private Map<String, Object> offsetMap;
private final String topic;
@@ -34,7 +34,7 @@ public class AivenS3SourceRecord {

private final String objectKey;

public AivenS3SourceRecord(final Map<String, Object> partitionMap, final Map<String, Object> offsetMap,
public S3SourceRecord(final Map<String, Object> partitionMap, final Map<String, Object> offsetMap,
final String topic, final Integer topicPartition, final byte[] recordKey, final byte[] recordValue,
final String objectKey) {
this.partitionMap = new HashMap<>(partitionMap);
@@ -44,7 +44,7 @@
* Iterator that processes S3 files and creates Kafka source records. Supports different output formats (Avro, JSON,
* Parquet).
*/
public final class SourceRecordIterator implements Iterator<AivenS3SourceRecord> {
public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class);
public static final String PATTERN_TOPIC_KEY = "topicName";
public static final String PATTERN_PARTITION_KEY = "partitionId";
@@ -54,7 +54,7 @@ public final class SourceRecordIterator implements Iterator<AivenS3SourceRecord>
private String currentObjectKey;

private final Iterator<S3ObjectSummary> s3ObjectSummaryIterator;
private Iterator<AivenS3SourceRecord> recordIterator = Collections.emptyIterator();
private Iterator<S3SourceRecord> recordIterator = Collections.emptyIterator();

private final OffsetManager offsetManager;

@@ -94,7 +94,7 @@ private void nextS3Object() {
}
}

private Iterator<AivenS3SourceRecord> createIteratorForCurrentFile() throws IOException {
private Iterator<S3SourceRecord> createIteratorForCurrentFile() throws IOException {
try (S3Object s3Object = s3Client.getObject(bucketName, currentObjectKey);
S3ObjectInputStream inputStream = s3Object.getObjectContent()) {

@@ -124,15 +124,15 @@
}

@SuppressWarnings("PMD.CognitiveComplexity")
private Iterator<AivenS3SourceRecord> getObjectIterator(final InputStream valueInputStream, final String topic,
private Iterator<S3SourceRecord> getObjectIterator(final InputStream valueInputStream, final String topic,
final int topicPartition, final long startOffset, final Transformer transformer,
final Map<String, Object> partitionMap) {
return new Iterator<>() {
private final Iterator<AivenS3SourceRecord> internalIterator = readNext().iterator();
private final Iterator<S3SourceRecord> internalIterator = readNext().iterator();

private List<AivenS3SourceRecord> readNext() {
private List<S3SourceRecord> readNext() {
final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8);
final List<AivenS3SourceRecord> sourceRecords = new ArrayList<>();
final List<S3SourceRecord> sourceRecords = new ArrayList<>();

int numOfProcessedRecs = 1;
boolean checkOffsetMap = true;
@@ -157,7 +157,7 @@ private List<AivenS3SourceRecord> readNext() {
return sourceRecords;
}

private AivenS3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic,
private S3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic,
final int topicPartition, final OffsetManager offsetManager, final long startOffset,
final Map<String, Object> partitionMap) {

@@ -175,8 +175,7 @@ private AivenS3SourceRecord getSourceRecord(final byte[] key, final byte[] value

final Map<String, Object> offsetMap = offsetManager.getOffsetValueMap(currentObjectKey, currentOffset);

return new AivenS3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key, value,
currentObjectKey);
return new S3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key, value, currentObjectKey);
}

@Override
@@ -185,7 +184,7 @@ public boolean hasNext() {
}

@Override
public AivenS3SourceRecord next() {
public S3SourceRecord next() {
return internalIterator.next();
}
};
@@ -197,7 +196,7 @@
}

@Override
public AivenS3SourceRecord next() {
public S3SourceRecord next() {
if (!recordIterator.hasNext()) {
nextS3Object();
}
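The rename leaves the iterator's two-level design untouched: an outer iterator over S3 object summaries, each expanded lazily into an inner iterator of records. The pattern in isolation, with generic types standing in for the S3 classes:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public final class FlatteningIterator<T> implements Iterator<T> {
    private final Iterator<List<T>> outer; // stands in for the S3ObjectSummary iterator
    private Iterator<T> inner = Collections.emptyIterator(); // records of the current object

    public FlatteningIterator(final Iterator<List<T>> outer) {
        this.outer = outer;
    }

    @Override
    public boolean hasNext() {
        // Advance to the next "object" only when the current one is exhausted,
        // mirroring nextS3Object() in SourceRecordIterator.
        while (!inner.hasNext() && outer.hasNext()) {
            inner = outer.next().iterator();
        }
        return inner.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return inner.next();
    }
}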
