Updated OffsetManager and tests
Claude committed Dec 16, 2024
1 parent 3afadf4 commit b5278e0
Showing 12 changed files with 333 additions and 587 deletions.
@@ -24,33 +24,45 @@

import org.apache.kafka.connect.source.SourceTaskContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OffsetManager<E extends OffsetManager.OffsetManagerEntry> {

private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);
public static final String SEPARATOR = "_";
/**
* The local manager data.
*/
private final Map<Map<String, Object>, Map<String, Object>> offsets;

/**
* The context in which this is running.
*/
private final SourceTaskContext context;

/**
* Constructor
* @param context the context for this instance to use.
*/
public OffsetManager(final SourceTaskContext context) {
- this.context = context;
- offsets = new ConcurrentHashMap<>();
+ this(context, new ConcurrentHashMap<>());
}

/**
- * FOR TESTING ONLY
+ * Package private for testing.
+ * @param context the context for this instance to use.
* @param offsets the offsets
*/
- protected OffsetManager(final Map<Map<String, Object>, Map<String, Object>> offsets) {
- this.context = null;
+ protected OffsetManager(final SourceTaskContext context, final Map<Map<String, Object>, Map<String, Object>> offsets) {
+ this.context = context;
this.offsets = offsets;
}

- public E getEntry(OffsetManagerKey key, Function<Map<String, Object>, E> creator) {
- Map<String, Object> data = offsets.compute(key.getPartitionMap(), (k, v) -> {
+ /**
+ * Gets an entry from the offset manager.
+ * Returns the local copy if one has been created; otherwise reads the data from Kafka.
+ * @param key the key for the entry.
+ * @param creator a function to create the connector-defined offset entry from a Map of String to Object.
+ * @return the entry.
+ */
+ public E getEntry(final OffsetManagerKey key, final Function<Map<String, Object>, E> creator) {
+ final Map<String, Object> data = offsets.compute(key.getPartitionMap(), (k, v) -> {
if (v == null) {
return context.offsetStorageReader().offset(key.getPartitionMap());
} else {
@@ -59,7 +71,11 @@ public E getEntry(OffsetManagerKey key, Function<Map<String, Object>, E> creator
return creator.apply(data);
}

- public void updateCurrentOffsets(E entry) {
+ /**
+ * Copies the entry into the offset manager data.
+ * @param entry the entry to update.
+ */
+ public void updateCurrentOffsets(final E entry) {
offsets.compute(entry.getManagerKey().getPartitionMap(), (k, v) -> {
if (v == null) {
return new HashMap<>(entry.getProperties());
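
For orientation, a minimal usage sketch of the reworked OffsetManager API follows. It is not part of this commit: MyOffsetManagerEntry and partitionKey are illustrative stand-ins for a connector-defined entry type and its key, and setProperty is borrowed from the test entry below and may not exist on the base OffsetManagerEntry interface (the S3 task further down uses S3OffsetManagerEntry).

    // Hypothetical connector code (illustrative names, not from this commit).
    final OffsetManager<MyOffsetManagerEntry> offsetManager = new OffsetManager<>(context);

    // Returns the locally cached entry, or lazily loads it via the offset storage reader.
    final MyOffsetManagerEntry entry = offsetManager.getEntry(partitionKey, MyOffsetManagerEntry::new);

    // ... process a record and advance whatever the connector tracks ...
    entry.setProperty("processedRecords", 42L);

    // Copy the updated properties back into the manager's local data.
    offsetManager.updateCurrentOffsets(entry);
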
@@ -75,13 +75,13 @@ void testUpdateCurrentOffsets() {
final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
offsets.put(offsetEntry.getManagerKey().getPartitionMap(), offsetEntry.getProperties());

- OffsetManager<TestingOffsetManagerEntry> underTest = new OffsetManager<>(offsets);
+ OffsetManager<TestingOffsetManagerEntry> underTest = new OffsetManager<>(mock(SourceTaskContext.class), offsets);

offsetEntry.setProperty("MyProperty", "WOW");

underTest.updateCurrentOffsets(offsetEntry);

- TestingOffsetManagerEntry result = offsetManager.getEntry(offsetEntry.getManagerKey(), TestingOffsetManagerEntry::new);
+ TestingOffsetManagerEntry result = underTest.getEntry(offsetEntry.getManagerKey(), TestingOffsetManagerEntry::new);


// Map<Map<String, Object>, Map<String, Object>> offsetMap = underTest.getOffsets();
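
Because the offsets map passed to the constructor above already contains the partition key, the mocked SourceTaskContext is never consulted in this test. A test that exercised the fall-through to Kafka would also need to stub the offset storage reader; a rough sketch using Mockito and the names from the test above (the stubbing itself is illustrative, not from this commit):

    final OffsetStorageReader reader = mock(OffsetStorageReader.class);
    final SourceTaskContext context = mock(SourceTaskContext.class);
    when(context.offsetStorageReader()).thenReturn(reader);
    when(reader.offset(offsetEntry.getManagerKey().getPartitionMap())).thenReturn(offsetEntry.getProperties());

    // With an empty local map, getEntry falls through to the (mocked) reader.
    final OffsetManager<TestingOffsetManagerEntry> fromKafka = new OffsetManager<>(context, new HashMap<>());
    final TestingOffsetManagerEntry restored = fromKafka.getEntry(offsetEntry.getManagerKey(), TestingOffsetManagerEntry::new);
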
@@ -40,7 +40,6 @@
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
- import io.aiven.kafka.connect.s3.source.utils.RecordProcessor;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import io.aiven.kafka.connect.s3.source.utils.Version;
@@ -107,7 +106,8 @@ public void start(final Map<String, String> props) {
this.transformer = TransformerFactory.getTransformer(s3SourceConfig);
offsetManager = new OffsetManager<S3OffsetManagerEntry>(context);
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
- prepareReaderFromOffsetStorageReader();
+ setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
+ awsv2SourceClient));
this.taskInitialized = true;
}

@@ -126,11 +126,13 @@ private void initializeConverters() {
}
}

- private void prepareReaderFromOffsetStorageReader() {
- sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
- awsv2SourceClient);
+ // mostly for testing
+ protected void setSourceRecordIterator(final Iterator<S3SourceRecord> iterator) {
+ sourceRecordIterator = iterator;
}
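
The protected setter lets a test drive the task with a canned iterator instead of a live S3 listing. A hedged sketch, assuming the test lives in the same package as the task class (shown here as S3SourceTask; the record mock and task setup are illustrative, not from this commit):

    final S3SourceRecord stubRecord = mock(S3SourceRecord.class);
    final S3SourceTask task = new S3SourceTask();
    // ... start or otherwise initialize the task as the test requires ...
    task.setSourceRecordIterator(List.of(stubRecord).iterator());
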



@Override
public List<SourceRecord> poll() throws InterruptedException {
LOGGER.info("Polling for new records...");
@@ -144,6 +146,7 @@ public List<SourceRecord> poll() throws InterruptedException {

while (!connectorStopped.get()) {
try {
+ waitForObjects();
extractSourceRecords(results);
LOGGER.info("Number of records extracted and sent: {}", results.size());
return results;
@@ -153,7 +156,9 @@ public List<SourceRecord> poll() throws InterruptedException {
exception);
pollLock.wait(ERROR_BACKOFF);

- prepareReaderFromOffsetStorageReader();
+ setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
+ awsv2SourceClient));

} else {
LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception);
return null; // NOPMD
@@ -170,20 +175,35 @@ public List<SourceRecord> poll() throws InterruptedException {
}
}

- private List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) throws InterruptedException {
- waitForObjects();
+ // package private for testing
+ List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) throws InterruptedException {
if (connectorStopped.get()) {
return results;
}
- return RecordProcessor.processRecords(sourceRecordIterator, results, s3SourceConfig, keyConverter,
- valueConverter, connectorStopped, this.transformer, awsv2SourceClient, offsetManager);
+ final int maxPollRecords = s3SourceConfig.getMaxPollRecords();

+ for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && !connectorStopped.get(); i++) {
+ final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
+ if (s3SourceRecord != null) {
+ try {
+ offsetManager.updateCurrentOffsets(s3SourceRecord.getOffsetManagerEntry());
+ results.add(s3SourceRecord.getSourceRecord(keyConverter, valueConverter));
+ } catch (DataException e) {
+ LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
+ awsv2SourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey());
+ }
+ }
+ }
+ return results;
}
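
With the processing loop now in a package-private method, its per-poll batching bound can be checked directly in a unit test. A rough sketch, assuming the test can supply the task's collaborators; the wiring of s3SourceConfig, the converters, and the task instance is elided and illustrative:

    when(s3SourceConfig.getMaxPollRecords()).thenReturn(2);
    task.setSourceRecordIterator(List.of(recordA, recordB, recordC).iterator());

    final List<SourceRecord> batch = task.extractSourceRecords(new ArrayList<>());

    // Only maxPollRecords records are drained per call; recordC stays queued for the next poll.
    assertEquals(2, batch.size());
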


private void waitForObjects() throws InterruptedException {
while (!sourceRecordIterator.hasNext() && !connectorStopped.get()) {
LOGGER.debug("Blocking until new S3 files are available.");
Thread.sleep(S_3_POLL_INTERVAL_MS);
- prepareReaderFromOffsetStorageReader();
+ setSourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
+ awsv2SourceClient));
}
}


This file was deleted.
