diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java b/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java index 10164c28..b0f635df 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java @@ -24,33 +24,45 @@ import org.apache.kafka.connect.source.SourceTaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class OffsetManager { - private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class); - public static final String SEPARATOR = "_"; + /** + * The local manager data. + */ private final Map, Map> offsets; + /** + * The context in which this is running. + */ private final SourceTaskContext context; + /** + * Constructor + * @param context the context for this instance to use. + */ public OffsetManager(final SourceTaskContext context) { - this.context = context; - offsets = new ConcurrentHashMap<>(); + this(context, new ConcurrentHashMap<>()); } /** - * FOR TESTING ONLY + * Package private for testing. + * @param context the context for this instance to use. * @param offsets the offsets */ - protected OffsetManager(final Map, Map> offsets) { - this.context = null; + protected OffsetManager(final SourceTaskContext context, final Map, Map> offsets) { + this.context = context; this.offsets = offsets; } - public E getEntry(OffsetManagerKey key, Function, E> creator) { - Map data = offsets.compute(key.getPartitionMap(), (k, v) -> { + /** + * Get an entry from the offset manager. + * This method will return the local copy if it has been created otherwise will get the data from Kafka. + * @param key the key for the entry. + * @param creator a function to create the connector defined offset entry from a Map of string to object. + * @return the entry. + */ + public E getEntry(final OffsetManagerKey key, final Function, E> creator) { + final Map data = offsets.compute(key.getPartitionMap(), (k, v) -> { if (v == null) { return context.offsetStorageReader().offset(key.getPartitionMap()); } else { @@ -59,7 +71,11 @@ public E getEntry(OffsetManagerKey key, Function, E> creator return creator.apply(data); } - public void updateCurrentOffsets(E entry) { + /** + * Copies the entry into the offset manager data. + * @param entry the entry to update. + */ + public void updateCurrentOffsets(final E entry) { offsets.compute(entry.getManagerKey().getPartitionMap(), (k, v) -> { if (v == null) { return new HashMap<>(entry.getProperties()); diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/OffsetManagerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/OffsetManagerTest.java index d6ce3258..e443a563 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/OffsetManagerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/OffsetManagerTest.java @@ -75,13 +75,13 @@ void testUpdateCurrentOffsets() { final Map, Map> offsets = new HashMap<>(); offsets.put(offsetEntry.getManagerKey().getPartitionMap(), offsetEntry.getProperties()); - OffsetManager underTest = new OffsetManager<>(offsets); + OffsetManager underTest = new OffsetManager<>(mock(SourceTaskContext.class), offsets); offsetEntry.setProperty("MyProperty", "WOW"); underTest.updateCurrentOffsets(offsetEntry); - TestingOffsetManagerEntry result = offsetManager.getEntry(offsetEntry.getManagerKey(), TestingOffsetManagerEntry::new); + TestingOffsetManagerEntry result = underTest.getEntry(offsetEntry.getManagerKey(), TestingOffsetManagerEntry::new); // Map, Map> offsetMap = underTest.getOffsets(); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index b3c7631a..997a876a 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -40,7 +40,6 @@ import io.aiven.kafka.connect.common.source.input.TransformerFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; -import io.aiven.kafka.connect.s3.source.utils.RecordProcessor; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; import io.aiven.kafka.connect.s3.source.utils.Version; @@ -107,7 +106,8 @@ public void start(final Map props) { this.transformer = TransformerFactory.getTransformer(s3SourceConfig); offsetManager = new OffsetManager(context); awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys); - prepareReaderFromOffsetStorageReader(); + setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, + awsv2SourceClient)); this.taskInitialized = true; } @@ -126,11 +126,13 @@ private void initializeConverters() { } } - private void prepareReaderFromOffsetStorageReader() { - sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, - awsv2SourceClient); + // mostly for testing + protected void setSourceRecordIterator(Iterator iterator) { + sourceRecordIterator = iterator; } + + @Override public List poll() throws InterruptedException { LOGGER.info("Polling for new records..."); @@ -144,6 +146,7 @@ public List poll() throws InterruptedException { while (!connectorStopped.get()) { try { + waitForObjects(); extractSourceRecords(results); LOGGER.info("Number of records extracted and sent: {}", results.size()); return results; @@ -153,7 +156,9 @@ public List poll() throws InterruptedException { exception); pollLock.wait(ERROR_BACKOFF); - prepareReaderFromOffsetStorageReader(); + setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, + awsv2SourceClient)); + } else { LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception); return null; // NOPMD @@ -170,20 +175,35 @@ public List poll() throws InterruptedException { } } - private List extractSourceRecords(final List results) throws InterruptedException { - waitForObjects(); + // package private for testing + List extractSourceRecords(final List results) throws InterruptedException { if (connectorStopped.get()) { return results; } - return RecordProcessor.processRecords(sourceRecordIterator, results, s3SourceConfig, keyConverter, - valueConverter, connectorStopped, this.transformer, awsv2SourceClient, offsetManager); + final int maxPollRecords = s3SourceConfig.getMaxPollRecords(); + + for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && !connectorStopped.get(); i++) { + final S3SourceRecord s3SourceRecord = sourceRecordIterator.next(); + if (s3SourceRecord != null) { + try { + offsetManager.updateCurrentOffsets(s3SourceRecord.getOffsetManagerEntry()); + results.add(s3SourceRecord.getSourceRecord(keyConverter, valueConverter)); + } catch (DataException e) { + LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e); + awsv2SourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey()); + } + } + } + return results; } + private void waitForObjects() throws InterruptedException { while (!sourceRecordIterator.hasNext() && !connectorStopped.get()) { LOGGER.debug("Blocking until new S3 files are available."); Thread.sleep(S_3_POLL_INTERVAL_MS); - prepareReaderFromOffsetStorageReader(); + setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, + awsv2SourceClient)); } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java deleted file mode 100644 index 90b3a28d..00000000 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.source.utils; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; - -import io.aiven.kafka.connect.common.OffsetManager; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.errors.DataException; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.storage.Converter; - -import io.aiven.kafka.connect.common.source.input.Transformer; -import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class RecordProcessor { - - private static final Logger LOGGER = LoggerFactory.getLogger(RecordProcessor.class); - - private RecordProcessor() { - - } - - public static List processRecords(final Iterator sourceRecordIterator, - final List results, final S3SourceConfig s3SourceConfig, - final Optional keyConverter, final Converter valueConverter, - final AtomicBoolean connectorStopped, final Transformer transformer, final AWSV2SourceClient sourceClient, - final OffsetManager offsetManager) { - - final Map conversionConfig = new HashMap<>(); - final int maxPollRecords = s3SourceConfig.getMaxPollRecords(); - - for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && !connectorStopped.get(); i++) { - final S3SourceRecord s3SourceRecord = sourceRecordIterator.next(); - if (s3SourceRecord != null) { - final SourceRecord sourceRecord = createSourceRecord(s3SourceRecord, s3SourceConfig, keyConverter, - valueConverter, conversionConfig, transformer, sourceClient, offsetManager); - results.add(sourceRecord); - } - } - - return results; - } - - static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord, final S3SourceConfig s3SourceConfig, - final Optional keyConverter, final Converter valueConverter, - final Map conversionConfig, final Transformer transformer, - final AWSV2SourceClient sourceClient, final OffsetManager offsetManager) { - - - transformer.configureValueConverter(conversionConfig, s3SourceConfig); - valueConverter.configure(conversionConfig, false); - - try { - offsetManager.updateCurrentOffsets(s3SourceRecord.getOffsetManagerEntry()); - return s3SourceRecord.getSourceRecord(keyConverter, valueConverter); - } catch (DataException e) { - LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e); - sourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey()); - throw e; - } - } -} diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManager.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManager.java deleted file mode 100644 index 5a2b6957..00000000 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManager.java +++ /dev/null @@ -1,146 +0,0 @@ -///* -// * Copyright 2024 Aiven Oy -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package io.aiven.kafka.connect.s3.source.utils; -// -//import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY; -//import static java.util.stream.Collectors.toMap; -// -//import java.util.ArrayList; -//import java.util.Arrays; -//import java.util.Collections; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -//import java.util.Set; -//import java.util.stream.Collectors; -// -//import io.aiven.kafka.connect.common.OffsetManager; -//import org.apache.kafka.connect.source.SourceTaskContext; -// -//import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -// -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//public class S3OffsetManager extends OffsetManager { -// -// private static final Logger LOGGER = LoggerFactory.getLogger(S3OffsetManager.class); -// public static final String SEPARATOR = "_"; -// private final Map, Map> offsets; -// -// public S3OffsetManager(final SourceTaskContext context, final S3SourceConfig s3SourceConfig) { -// super(context, ) -// -// final String s3Bucket = s3SourceConfig.getAwsS3BucketName(); -// final Set partitions = parsePartitions(s3SourceConfig); -// final Set topics = parseTopics(s3SourceConfig); -// -// // Build the partition keys and fetch offsets from offset storage -// final List> partitionKeys = buildPartitionKeys(s3Bucket, partitions, topics); -// final Map, Map> offsetMap = context.offsetStorageReader() -// .offsets(partitionKeys); -// -// LOGGER.info(" ********** offsetMap ***** {}", offsetMap); -// this.offsets = offsetMap.entrySet() -// .stream() -// .filter(e -> e.getValue() != null) -// .collect(toMap(entry -> new HashMap<>(entry.getKey()), entry -> new HashMap<>(entry.getValue()))); -// LOGGER.info(" ********** offsets ***** {}", offsets); -// } -// -// public Map, Map> getOffsets() { -// return Collections.unmodifiableMap(offsets); -// } -// -// public long incrementAndUpdateOffsetMap(final Map partitionMap, final String currentObjectKey, -// final long startOffset) { -// if (offsets.containsKey(partitionMap)) { -// final Map offsetValue = new HashMap<>(offsets.get(partitionMap)); -// if (offsetValue.containsKey(getObjectMapKey(currentObjectKey))) { -// final long newOffsetVal = (long) offsetValue.get(getObjectMapKey(currentObjectKey)) + 1L; -// offsetValue.put(getObjectMapKey(currentObjectKey), newOffsetVal); -// offsets.put(partitionMap, offsetValue); -// return newOffsetVal; -// } else { -// offsetValue.put(getObjectMapKey(currentObjectKey), startOffset); -// offsets.put(partitionMap, offsetValue); -// return startOffset; -// } -// } -// return startOffset; -// } -// -// public String getObjectMapKey(final String currentObjectKey) { -// return OBJECT_KEY + SEPARATOR + currentObjectKey; -// } -// -// public boolean shouldSkipRecord(final Map partitionMap, final String currentObjectKey, -// final long numOfProcessedRecs) { -// if (offsets.containsKey(partitionMap)) { -// final Map offsetVal = offsets.get(partitionMap); -// final String objectMapKey = getObjectMapKey(currentObjectKey); -// -// if (offsetVal.containsKey(objectMapKey)) { -// final long offsetValue = (long) offsetVal.get(objectMapKey); -// return numOfProcessedRecs <= offsetValue; -// } -// } -// return false; -// } -// -// public void createNewOffsetMap(final Map partitionMap, final String objectKey, -// final long offsetId) { -// final Map offsetMap = getOffsetValueMap(objectKey, offsetId); -// offsets.put(partitionMap, offsetMap); -// } -// -// public Map getOffsetValueMap(final String currentObjectKey, final long offsetId) { -// final Map offsetMap = new HashMap<>(); -// offsetMap.put(getObjectMapKey(currentObjectKey), offsetId); -// -// return offsetMap; -// } -// -// void updateCurrentOffsets(final Map partitionMap, final Map offsetValueMap) { -// if (offsets.containsKey(partitionMap)) { -// final Map offsetMap = new HashMap<>(offsets.get(partitionMap)); -// offsetMap.putAll(offsetValueMap); -// offsets.put(partitionMap, offsetMap); -// } else { -// offsets.put(partitionMap, offsetValueMap); -// } -// } -// -// private static Set parsePartitions(final S3SourceConfig s3SourceConfig) { -// final String partitionString = s3SourceConfig.getTargetTopicPartitions(); -// return Arrays.stream(partitionString.split(",")).map(Integer::parseInt).collect(Collectors.toSet()); -// } -// -// private static Set parseTopics(final S3SourceConfig s3SourceConfig) { -// final String topicString = s3SourceConfig.getTargetTopics(); -// return Arrays.stream(topicString.split(",")).collect(Collectors.toSet()); -// } -// -// private static List> buildPartitionKeys(final String bucket, final Set partitions, -// final Set topics) { -// final List> partitionKeys = new ArrayList<>(); -// partitions.forEach(partition -> topics.forEach(topic -> { -// partitionKeys.add(ConnectUtils.getPartitionMap(topic, partition, bucket)); -// })); -// return partitionKeys; -// } -//} diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java index b9a8ffbd..3faa31d0 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java @@ -22,10 +22,10 @@ import java.util.List; import java.util.Map; -public class S3OffsetManagerEntry implements OffsetManager.OffsetManagerEntry { +public final class S3OffsetManagerEntry implements OffsetManager.OffsetManagerEntry { // package private statics for testing. - // TODO make this private after values in S3SourceTask are no longer needed + // TODO make this package private after values in S3SourceTask are no longer needed public static final String BUCKET = "bucket"; public static final String OBJECT_KEY = "objectKey"; public static final String TOPIC = "topic"; @@ -60,11 +60,11 @@ public S3OffsetManagerEntry(final String bucket, final String s3ObjectKey, final /** * Constructs an OffsetManagerEntry from an existing map. * used by {@link #fromProperties(Map)}. + * Package private for testing * @param properties the property map. */ - private S3OffsetManagerEntry(final Map properties) { - data = new HashMap<>(); - data.putAll(properties); + private S3OffsetManagerEntry(final Map properties) { + data = new HashMap<>(properties); for (String field : RESTRICTED_KEYS) { if (data.get(field) == null) { throw new IllegalArgumentException("Missing '"+field+"' property"); @@ -84,12 +84,11 @@ public S3OffsetManagerEntry fromProperties(final Map properties) if (properties == null) { return null; } - Long recordCount = (Long) properties.get(RECORD_COUNT); + Map ourProperties = new HashMap<>(properties); + Long recordCount = (Long) ourProperties.computeIfAbsent(RECORD_COUNT, s -> Long.valueOf(0L)); - S3OffsetManagerEntry result = new S3OffsetManagerEntry(properties); - if (recordCount != null) { - result.recordCount = recordCount; - } + S3OffsetManagerEntry result = new S3OffsetManagerEntry(ourProperties); + result.recordCount = recordCount; return result; } @@ -146,12 +145,19 @@ public Integer getPartition() { /** * Gets the Kafka topic for the current object. - * @return the Kafka topic for the current object.. + * @return the Kafka topic for the current object. */ public String getTopic() { return (String) data.get(TOPIC); } + /** + * Gets the S3 bucket for the current object. + * @return the S3 Bucket for the current object. + */ + public String getBucket() { + return (String) data.get(BUCKET); + } /** * Creates a new offset map. No defensive copy is necessary. * diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index fff12ffc..ef9530f2 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -25,7 +25,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.storage.Converter; -public class S3SourceRecord { +public final class S3SourceRecord { /** The S3OffsetManagerEntry for this source record */ private final S3OffsetManagerEntry offsetManagerEntry; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index b20b713f..8f435ba3 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -20,16 +20,25 @@ import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.lang.reflect.Field; +import java.net.ConnectException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; +import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry; import org.apache.kafka.connect.converters.ByteArrayConverter; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; @@ -55,28 +64,27 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -@ExtendWith(MockitoExtension.class) + final class S3SourceTaskTest { private static final Random RANDOM = new Random(); private Map properties; - private static BucketAccessor testBucketAccessor; private static final String TEST_BUCKET = "test-bucket"; + private static final String TOPIC = "TOPIC1"; + + private static final int PARTITION = 1; + + private static final String OBJECT_KEY = "object_key"; + private static S3Mock s3Api; private static AmazonS3 s3Client; private static Map commonProperties; - @Mock - private SourceTaskContext mockedSourceTaskContext; - @Mock private OffsetStorageReader mockedOffsetStorageReader; @BeforeAll @@ -103,7 +111,7 @@ public static void setUpClass() { s3Client = builder.build(); - testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET); + BucketAccessor testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET); testBucketAccessor.createBucket(); } @@ -116,8 +124,6 @@ public static void tearDownClass() { public void setUp() { properties = new HashMap<>(commonProperties); s3Client.createBucket(TEST_BUCKET); - mockedSourceTaskContext = mock(SourceTaskContext.class); - mockedOffsetStorageReader = mock(OffsetStorageReader.class); } @AfterEach @@ -174,7 +180,8 @@ void testStop() { } private static S3SourceRecord getAivenS3SourceRecord() { - return new S3SourceRecord(new HashMap<>(), new HashMap<>(), "testtopic", 0, new byte[0], new byte[0], ""); + S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); + return new S3SourceRecord(entry, new byte[0], new byte[0]); } @SuppressWarnings("PMD.AvoidAccessibilityAlteration") @@ -187,8 +194,10 @@ private void setPrivateField(final Object object, final String fieldName, final } private void startSourceTask(final S3SourceTask s3SourceTask) { - s3SourceTask.initialize(mockedSourceTaskContext); + SourceTaskContext mockedSourceTaskContext = mock(SourceTaskContext.class); + mockedOffsetStorageReader = mock(OffsetStorageReader.class); when(mockedSourceTaskContext.offsetStorageReader()).thenReturn(mockedOffsetStorageReader); + s3SourceTask.initialize(mockedSourceTaskContext); setBasicProperties(); s3SourceTask.start(properties); @@ -205,4 +214,80 @@ private void setBasicProperties() { properties.put(TARGET_TOPICS, "testtopic"); } + + @Test + void testExtractSourceRecordsWithEmptyIterator() throws InterruptedException { + final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); + when(s3SourceConfig.getMaxPollRecords()).thenReturn(5); + final Iterator sourceRecordIterator = Collections.emptyIterator(); + + final S3SourceTask s3SourceTask = new TestingS3SourceTask(sourceRecordIterator); + startSourceTask(s3SourceTask); + + final List results = s3SourceTask.extractSourceRecords(new ArrayList<>()); + assertThat(results).isEmpty(); + } + + private void assertEquals(S3SourceRecord s3Record, SourceRecord sourceRecord) { + assertThat(sourceRecord).isNotNull(); + S3OffsetManagerEntry offsetManagerEntry = s3Record.getOffsetManagerEntry(); + assertThat(sourceRecord.sourcePartition()).isEqualTo(offsetManagerEntry.getManagerKey().getPartitionMap()); + assertThat(sourceRecord.sourceOffset()).isEqualTo(offsetManagerEntry.getProperties()); + assertThat(sourceRecord.key()).isEqualTo(s3Record.key()); + assertThat(sourceRecord.value()).isEqualTo(s3Record.value()); + } + @Test + void testExtractSourceRecordsWithRecords() throws ConnectException, InterruptedException { + final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); + when(s3SourceConfig.getMaxPollRecords()).thenReturn(5); + final List lst = new ArrayList<>(); + S3OffsetManagerEntry offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); + lst.add(new S3SourceRecord(offsetManagerEntry, "Hello".getBytes(StandardCharsets.UTF_8), "Hello World".getBytes(StandardCharsets.UTF_8))); + offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY+"a", TOPIC, PARTITION); + lst.add(new S3SourceRecord(offsetManagerEntry, "Goodbye".getBytes(StandardCharsets.UTF_8), "Goodbye cruel World".getBytes(StandardCharsets.UTF_8))); + + final Iterator sourceRecordIterator = lst.iterator(); + + final S3SourceTask s3SourceTask = new TestingS3SourceTask(sourceRecordIterator); + startSourceTask(s3SourceTask); + + final List results = s3SourceTask.extractSourceRecords(new ArrayList<>()); + assertThat(results).hasSize(2); + assertEquals(lst.get(0), results.get(0)); + assertEquals(lst.get(1), results.get(1)); + } + + @Test + void testExtractSourceRecordsWhenConnectorStopped() throws InterruptedException { + final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); + when(s3SourceConfig.getMaxPollRecords()).thenReturn(5); + final List lst = new ArrayList<>(); + S3OffsetManagerEntry offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); + lst.add(new S3SourceRecord(offsetManagerEntry, "Hello".getBytes(StandardCharsets.UTF_8), "Hello World".getBytes(StandardCharsets.UTF_8))); + offsetManagerEntry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY+"a", TOPIC, PARTITION); + lst.add(new S3SourceRecord(offsetManagerEntry, "Goodbye".getBytes(StandardCharsets.UTF_8), "Goodbye cruel World".getBytes(StandardCharsets.UTF_8))); + + final Iterator sourceRecordIterator = lst.iterator(); + + final S3SourceTask s3SourceTask = new TestingS3SourceTask(sourceRecordIterator); + startSourceTask(s3SourceTask); + s3SourceTask.stop(); + + + final List results = s3SourceTask.extractSourceRecords(new ArrayList<>()); + assertThat(results).isEmpty(); + } + + private static class TestingS3SourceTask extends S3SourceTask { + + TestingS3SourceTask(Iterator realIterator) { + super(); + super.setSourceRecordIterator(realIterator); + } + + @Override + protected void setSourceRecordIterator(Iterator iterator) { + // do nothing. + } + } } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/OffsetManagerTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/OffsetManagerTest.java deleted file mode 100644 index 8154767e..00000000 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/OffsetManagerTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.source.utils; - -import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; -import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; -import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.kafka.connect.source.SourceTaskContext; -import org.apache.kafka.connect.storage.OffsetStorageReader; - -import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; - -final class OffsetManagerTest { - - private Map properties; - private static final String TEST_BUCKET = "test-bucket"; - - @Mock - private SourceTaskContext sourceTaskContext; - - private S3SourceConfig s3SourceConfig; - - private S3OffsetManager offsetManager; - - @BeforeEach - public void setUp() { - properties = new HashMap<>(); - setBasicProperties(); - s3SourceConfig = new S3SourceConfig(properties); - } - - @Test - void testWithOffsets() { - sourceTaskContext = mock(SourceTaskContext.class); - final OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class); - when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); - - final Map partitionKey = new HashMap<>(); - partitionKey.put("topic", "topic1"); - partitionKey.put("partition", 0); - partitionKey.put("bucket", TEST_BUCKET); - - final Map offsetValue = new HashMap<>(); - offsetValue.put("object_key_file", 5L); - final Map, Map> offsets = new HashMap<>(); - offsets.put(partitionKey, offsetValue); - - when(offsetStorageReader.offsets(any())).thenReturn(offsets); - - offsetManager = new S3OffsetManager(sourceTaskContext, s3SourceConfig); - - final Map, Map> retrievedOffsets = offsetManager.getOffsets(); - assertThat(retrievedOffsets.size()).isEqualTo(1); - assertThat(retrievedOffsets.values().iterator().next().get("object_key_file")).isEqualTo(5L); - } - - @Test - void testIncrementAndUpdateOffsetMapExistingOffset() { - sourceTaskContext = mock(SourceTaskContext.class); - final OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class); - when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); - - // Mock partition and offset values - final String objectKey = "testObject"; - final String offsetObjectKey = OBJECT_KEY + "_" + objectKey; - - final Map partitionKey = new HashMap<>(); - partitionKey.put("topic", "topic1"); - partitionKey.put("partition", 0); - partitionKey.put("bucket", "bucket"); - - final Map offsetValue = new HashMap<>(); - offsetValue.put(offsetObjectKey, 1L); // Existing offset value - final Map, Map> offsets = new HashMap<>(); - offsets.put(partitionKey, offsetValue); - - when(offsetStorageReader.offsets(any())).thenReturn(offsets); // Mock offset retrieval - - // Initialize offset manager - offsetManager = new S3OffsetManager(sourceTaskContext, s3SourceConfig); - - // Invoke method and assert new offset value - final long newOffset = offsetManager.incrementAndUpdateOffsetMap(partitionKey, objectKey, 2L); - - assertThat(newOffset).isEqualTo(2L); // Expect incremented offset - assertThat(offsetManager.getOffsets().get(partitionKey).get(offsetObjectKey)).isEqualTo(2L); // Verify updated - // offset in map - } - - @Test - void testIncrementAndUpdateOffsetMapNonExistingOffset() { - sourceTaskContext = mock(SourceTaskContext.class); - final OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class); - when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); - - // Mock partition without any existing offset - final Map partitionKey = new HashMap<>(); - partitionKey.put("topic", "topic1"); - partitionKey.put("partition", 0); - - when(offsetStorageReader.offsets(any())).thenReturn(Collections.emptyMap()); // No existing offset - - // Initialize offset manager - offsetManager = new S3OffsetManager(sourceTaskContext, s3SourceConfig); - - // Invoke method and assert new offset value - final long startOffset = 5L; - final long newOffset = offsetManager.incrementAndUpdateOffsetMap(partitionKey, "", startOffset); - - // Expect the startOffset to be returned when no existing offset is found - assertThat(newOffset).isEqualTo(startOffset); - } - - private void setBasicProperties() { - properties.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET); - properties.put(TARGET_TOPIC_PARTITIONS, "0,1"); - properties.put(TARGET_TOPICS, "topic1,topic2"); - } -} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java deleted file mode 100644 index 41d3b997..00000000 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.source.utils; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.internal.verification.VerificationModeFactory.times; - -import java.net.ConnectException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.storage.Converter; - -import io.aiven.kafka.connect.common.source.input.Transformer; -import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class RecordProcessorTest { - - @Mock - private S3SourceConfig s3SourceConfig; - @Mock - private Converter valueConverter; - @Mock - private Transformer transformer; - @Mock - private Converter keyConverter; - @Mock - private S3OffsetManager offsetManager; - - @Mock - private AWSV2SourceClient sourceClient; - - private AtomicBoolean connectorStopped; - private Iterator sourceRecordIterator; - - @BeforeEach - void setUp() { - connectorStopped = new AtomicBoolean(false); - sourceRecordIterator = mock(Iterator.class); - } - - @Test - void testProcessRecordsNoRecords() { - when(s3SourceConfig.getMaxPollRecords()).thenReturn(5); - when(sourceRecordIterator.hasNext()).thenReturn(false); - - final List results = new ArrayList<>(); - final List processedRecords = RecordProcessor.processRecords( - sourceRecordIterator, - results, - s3SourceConfig, - Optional.of(keyConverter), - valueConverter, - connectorStopped, - transformer, sourceClient, offsetManager - ); - - assertThat(processedRecords).as("Processed records should be empty when there are no records.").isEmpty(); - } - - @Test - void testProcessRecordsWithRecords() throws ConnectException { - when(s3SourceConfig.getMaxPollRecords()).thenReturn(5); - when(sourceRecordIterator.hasNext()).thenReturn(true, false); // One iteration with records - - final S3SourceRecord mockRecord = mock(S3SourceRecord.class); - when(sourceRecordIterator.next()).thenReturn(mockRecord); - - final List results = new ArrayList<>(); - RecordProcessor.processRecords( - sourceRecordIterator, - results, - s3SourceConfig, - Optional.of(keyConverter), - valueConverter, - connectorStopped, - transformer, sourceClient, offsetManager - ); - - assertThat(results).hasSize(1); - verify(sourceRecordIterator, times(1)).next(); - } - - @Test - void testProcessRecordsConnectorStopped() { - when(s3SourceConfig.getMaxPollRecords()).thenReturn(5); - connectorStopped.set(true); // Simulate connector stopped - - final List results = new ArrayList<>(); - final List processedRecords = RecordProcessor.processRecords( - sourceRecordIterator, - results, - s3SourceConfig, - Optional.of(keyConverter), - valueConverter, - connectorStopped, - transformer, sourceClient, offsetManager - ); - - assertThat(processedRecords).as("Processed records should be empty when connector is stopped.").isEmpty(); - verify(sourceRecordIterator, never()).next(); - } - - @Test - void testCreateSourceRecords() { - final S3SourceRecord mockRecord = mock(S3SourceRecord.class); - when(mockRecord.getTopic()).thenReturn("test-topic"); - when(mockRecord.key()).thenReturn("mock-key".getBytes(StandardCharsets.UTF_8)); - when(mockRecord.value()).thenReturn("mock-value".getBytes(StandardCharsets.UTF_8)); - - when(valueConverter.toConnectData(anyString(), any())) - .thenReturn(new SchemaAndValue(null, "mock-value-converted")); - when(mockRecord.getSourceRecord(anyString(), any(), any())).thenReturn(mock(SourceRecord.class)); - - final SourceRecord sourceRecords = RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, - Optional.of(keyConverter), valueConverter, new HashMap<>(), transformer, sourceClient, offsetManager); - - assertThat(sourceRecords).isNotNull(); - } -} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java new file mode 100644 index 00000000..44670aaf --- /dev/null +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.s3.source.utils; + +import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; +import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +import io.aiven.kafka.connect.common.OffsetManager; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; + +import io.aiven.kafka.connect.config.s3.S3ConfigFragment; +import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import scala.concurrent.java8.FuturesConvertersImpl; + +final class S3OffsetManagerEntryTest { + + private Map properties; + static final String TEST_BUCKET = "test-bucket"; + + static final String TOPIC = "TOPIC1"; + + static final int PARTITION = 1; + + static final String OBJECT_KEY = "object_key"; + + + private SourceTaskContext sourceTaskContext; + + private S3OffsetManagerEntry s3OffsetManagerEntry; + + private OffsetManager offsetManager; + + private OffsetStorageReader offsetStorageReader; + + @BeforeEach + public void setUp() { + properties = new HashMap<>(); + offsetStorageReader = mock(OffsetStorageReader.class); + sourceTaskContext = mock(SourceTaskContext.class); + when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); + offsetManager = new OffsetManager<>(sourceTaskContext); + } + + private Map createPartitionMap() { + final Map partitionKey = new HashMap<>(); + partitionKey.put(S3OffsetManagerEntry.TOPIC, TOPIC); + partitionKey.put(S3OffsetManagerEntry.PARTITION, PARTITION); + partitionKey.put(S3OffsetManagerEntry.BUCKET, TEST_BUCKET); + partitionKey.put(S3OffsetManagerEntry.OBJECT_KEY, OBJECT_KEY); + return partitionKey; + } + + public static S3OffsetManagerEntry newEntry() { + return new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); + } + + @Test + void getEntry() { + final Map storedData = new HashMap<>(); + storedData.putAll(createPartitionMap()); + storedData.put("random_entry", 5L); + + when(offsetStorageReader.offset(any())).thenReturn(storedData); + + S3OffsetManagerEntry keyEntry = newEntry(); + + final S3OffsetManagerEntry entry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties); + + assertThat(entry.getPartition()).isEqualTo(PARTITION); + assertThat(entry.getRecordCount()).isEqualTo(0); + assertThat(entry.getTopic()).isEqualTo(TOPIC); + assertThat(entry.getBucket()).isEqualTo(TEST_BUCKET); + assertThat(entry.getProperty("random_entry")).isEqualTo(5L); + verify(sourceTaskContext, times(1)).offsetStorageReader(); + + + // verify second read reads from local data + + final S3OffsetManagerEntry entry2 = offsetManager.getEntry(entry.getManagerKey(), entry::fromProperties); + assertThat(entry2.getPartition()).isEqualTo(PARTITION); + assertThat(entry2.getRecordCount()).isEqualTo(0); + assertThat(entry2.getTopic()).isEqualTo(TOPIC); + assertThat(entry2.getBucket()).isEqualTo(TEST_BUCKET); + assertThat(entry2.getProperty("random_entry")).isEqualTo(5L); + verify(sourceTaskContext, times(1)).offsetStorageReader(); + } + + @Test + void testUpdate() { + + S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); + assertThat(entry.getRecordCount()).isEqualTo(0L); + assertThat(entry.getProperty("random_entry")).isNull(); + + offsetManager.updateCurrentOffsets(entry); + + entry.setProperty("random_entry", 5L); + entry.incrementRecordCount(); + assertThat(entry.getRecordCount()).isEqualTo(1L); + + offsetManager.updateCurrentOffsets(entry); + + final S3OffsetManagerEntry entry2 = offsetManager.getEntry(entry.getManagerKey(), entry::fromProperties); + assertThat(entry2.getPartition()).isEqualTo(PARTITION); + assertThat(entry2.getRecordCount()).isEqualTo(1L); + assertThat(entry2.getTopic()).isEqualTo(TOPIC); + assertThat(entry2.getBucket()).isEqualTo(TEST_BUCKET); + assertThat(entry2.getProperty("random_entry")).isEqualTo(5L); + verify(sourceTaskContext.offsetStorageReader(), times(0)); + // offset in map + } +} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 4feae2f5..2e01e613 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -35,6 +35,8 @@ import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -46,10 +48,16 @@ final class SourceRecordIteratorTest { private AWSV2SourceClient mockSourceApiClient; + private OffsetStorageReader offsetStorageReader; + @BeforeEach public void setUp() { mockConfig = mock(S3SourceConfig.class); - offsetManager = new OffsetManager(new HashMap<>()); + + offsetStorageReader = mock(OffsetStorageReader.class); + SourceTaskContext taskContext = mock(SourceTaskContext.class); + when(taskContext.offsetStorageReader()).thenReturn(offsetStorageReader); + offsetManager = new OffsetManager(taskContext); mockTransformer = mock(Transformer.class); mockSourceApiClient = mock(AWSV2SourceClient.class); } @@ -72,10 +80,10 @@ void testIteratorProcessesS3Objects() throws Exception { when(mockTransformer.getValueBytes(any(), anyString(), any())) .thenReturn(outStr.getBytes(StandardCharsets.UTF_8)); - when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + when(offsetStorageReader.offset(any())).thenReturn(Collections.emptyMap()); when(mockSourceApiClient.getListOfObjectKeys(any())).thenReturn(Collections.emptyIterator()); - SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, + SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, offsetManager, mockTransformer, mockSourceApiClient); assertThat(iterator.hasNext()).isFalse(); @@ -84,7 +92,7 @@ void testIteratorProcessesS3Objects() throws Exception { when(mockSourceApiClient.getListOfObjectKeys(any())) .thenReturn(Collections.singletonList(key).listIterator()); - iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient); + iterator = new SourceRecordIterator(mockConfig, offsetManager, mockTransformer, mockSourceApiClient); assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull();