Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Place offset manager in commons #373

Open
wants to merge 37 commits into
base: s3-source-release
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
3ee947a
initial work
Dec 10, 2024
6fc32f4
Updated OffsetManager and tests
Dec 16, 2024
7252107
updated tests
Dec 16, 2024
2579b09
fixed some tests
Dec 16, 2024
bb70786
fixes for some tests
Dec 16, 2024
dc434f0
fixes for iterator
Dec 16, 2024
688a731
partial fix
Dec 17, 2024
218bf20
partial fix
Dec 17, 2024
69ea274
partial fixes
Dec 17, 2024
640d007
Fixe commons
Dec 17, 2024
6bc1dec
fixed AWSV2ClientTests
Dec 17, 2024
9e43146
Fixed SourceClient tests
Dec 18, 2024
388b299
Fixed remaining tests
Dec 18, 2024
82a7451
common code cleanup
Dec 18, 2024
a524ff3
fixed s3-source-connector PMDMain errors
Dec 18, 2024
18e0eeb
fixed s3-source-connector PMTTest errors
Dec 18, 2024
e47556f
fixed commons checkstyle. it bilds
Dec 18, 2024
cc5e7c3
fixed issues with s3-source-connector build
Dec 18, 2024
934ab93
fixed spotlessApply issues
Dec 18, 2024
79d6723
fixed byte integration test
Dec 19, 2024
3bf46ae
fixes for jsontess and sourcetask reset
Dec 19, 2024
a81b01a
changes to speed up and verify processing
Dec 20, 2024
db868ac
fixed spotlessApply issues
Dec 20, 2024
6fd112f
fixed spotlessApply
Dec 20, 2024
aebc405
Updated as per reviews
Dec 20, 2024
b2361bc
Updated as per reviews
Dec 20, 2024
cf14b6f
Updated as per reviews
Dec 20, 2024
59178b6
Changes for OffsetManager
Dec 23, 2024
994bc9d
updated javadoc
Dec 23, 2024
a67f7cb
fixed pmd errors
Dec 23, 2024
16117af
added logging
Dec 24, 2024
8d14e7c
added removing entry from OffsetManager
Dec 24, 2024
2db3a30
added removing entry from OffsetManager
Dec 24, 2024
a75f466
Use ConcurrentMap in OffsetManager
Dec 24, 2024
c21c040
removed offsetverification test due to KAFKA-14947 issues
Dec 24, 2024
00052b0
modifications for KAFKA-14947
Dec 24, 2024
0eaf29a
spotlessApply
Dec 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An iterator that returns closable items and will close the last item returned in one of two cases:
* <ul>
* <li>when {@link #next} is called.</li>
* <li>when {@link #hasNext} returns false.</li>
* </ul>
*
* @param <T>
* The type of Closeable object to return.
*/
public final class ClosableIterator<T extends Closeable> implements Iterator<T>, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(ClosableIterator.class);
private final Iterator<T> delegate;
private T nextItem;
private T lastItem;

public static <T extends Closeable> ClosableIterator<T> wrap(final Iterator<T> iterator) {
return iterator instanceof ClosableIterator ? (ClosableIterator<T>) iterator : new ClosableIterator<>(iterator);
}
private ClosableIterator(final Iterator<T> iterator) {
delegate = iterator;
}

@Override
public boolean hasNext() {
if (nextItem == null) {
if (delegate.hasNext()) {
nextItem = delegate.next();
} else {
closeItem(lastItem);
}
}
return nextItem != null;
}

private void closeItem(final T item) {
if (item != null) {
try {
item.close();
} catch (IOException e) {
LOGGER.error(String.format("Error closing %s:", item), e);
}
}
}

@Override
public T next() {
closeItem(lastItem);
lastItem = nextItem;
nextItem = null; // NOPMD assign null
return lastItem;
}

@Override
public void close() {
closeItem(lastItem);
closeItem(nextItem);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import org.apache.kafka.connect.source.SourceTaskContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OffsetManager<E extends OffsetManager.OffsetManagerEntry<E>> {
/** The loger to write to */
private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);

/**
* The local manager data.
*/
private final ConcurrentMap<Map<String, Object>, Map<String, Object>> offsets;

/**
* The context in which this is running.
*/
private final SourceTaskContext context;

/**
* Constructor
*
* @param context
* the context for this instance to use.
*/
public OffsetManager(final SourceTaskContext context) {
this(context, new ConcurrentHashMap<>());
}

/**
* Package private for testing.
*
* @param context
* the context for this instance to use.
* @param offsets
* the offsets
*/
protected OffsetManager(final SourceTaskContext context,
final ConcurrentMap<Map<String, Object>, Map<String, Object>> offsets) {
this.context = context;
this.offsets = offsets;
}

/**
* Get an entry from the offset manager. This method will return the local copy if it has been created otherwise
* will get the data from Kafka. If there is not a local copy and not one from Kafka then an empty Optional is
* returned
*
* @param key
* the key for the entry.
* @param creator
* a function to create the connector defined offset entry from a Map of string to object.
* @return the entry.
*/
public Optional<E> getEntry(final OffsetManagerKey key, final Function<Map<String, Object>, E> creator) {
LOGGER.info("getEntry: {}", key.getPartitionMap());
final Map<String, Object> data = offsets.compute(key.getPartitionMap(), (k, v) -> {
if (v == null) {
final Map<String, Object> kafkaData = context.offsetStorageReader().offset(key.getPartitionMap());
LOGGER.info("Context stored offset map {}", kafkaData);
return kafkaData == null || kafkaData.isEmpty() ? null : kafkaData;
} else {
LOGGER.info("Previously stored offset map {}", v);
return v;
}
});
return data == null ? Optional.empty() : Optional.of(creator.apply(data));
}

/**
* Copies the entry into the offset manager data.
*
* @param entry
* the entry to update.
*/
public void updateCurrentOffsets(final E entry) {
LOGGER.debug("Updating current offsets: {}", entry.getManagerKey().getPartitionMap());
offsets.compute(entry.getManagerKey().getPartitionMap(), (k, v) -> {
if (v == null) {
return new HashMap<>(entry.getProperties());
} else {
v.putAll(entry.getProperties());
return v;
}
});
}

/**
* Removes the specified entry from the in memory table. Does not impact the records stored in the
* {@link SourceTaskContext}.
*
* @param key
* the key for the entry to remove.
*/
public void remove(final OffsetManagerKey key) {
LOGGER.info("Removing: {}", key.getPartitionMap());
offsets.remove(key.getPartitionMap());
}

/**
* The definition of an entry in the OffsetManager.
*/
public interface OffsetManagerEntry<T extends OffsetManagerEntry<T>> extends Comparable<T> {

/**
* Creates a new OffsetManagerEntry by wrapping the properties with the current implementation. This method may
* throw a RuntimeException if requried properties are not defined in the map.
*
* @param properties
* the properties to wrap. May be {@code null}.
* @return an OffsetManagerProperty
*/
T fromProperties(Map<String, Object> properties);

/**
* Extracts the data from the entry in the correct format to return to Kafka.
*
* @return the properties in a format to return to Kafka.
*/
Map<String, Object> getProperties();

/**
* Gets the value of the named property. The value returned from a {@code null} key is implementation dependant.
*
* @param key
* the property to retrieve.
* @return the value associated with the property or @{code null} if not set.
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
Object getProperty(String key);

/**
* Sets a key/value pair. Will overwrite any existing value. Implementations of OffsetManagerEntry may declare
* specific keys as restricted. These are generally keys that are managed internally by the OffsetManagerEntry
* and may not be set except through provided setter methods or the constructor.
*
* @param key
* the key to set.
* @param value
* the value to set.
* @throws IllegalArgumentException
* if the key is restricted.
*/
void setProperty(String key, Object value);

/**
* Gets the value of the named property as an {@code int}.
*
* @param key
* the property to retrieve.
* @return the value associated with the property or @{code null} if not set.
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default int getInt(final String key) {
return ((Number) getProperty(key)).intValue();
}

/**
* Gets the value of the named property as a {@code long}
*
* @param key
* the property to retrieve.
* @return the value associated with the property or @{code null} if not set.
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default long getLong(final String key) {
return ((Number) getProperty(key)).longValue();
}

/**
* Gets the value of the named property as a String.
*
* @param key
* the property to retrieve.
* @return the value associated with the property or @{code null} if not set.
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default String getString(final String key) {
return getProperty(key).toString();
}

/**
* ManagerKey getManagerKey
*
* @return The offset manager key for this entry.
*/
OffsetManagerKey getManagerKey();

/**
* Gets the Kafka topic for this entry.
*
* @return The Kafka topic for this entry.
*/
String getTopic();

/**
* Gets the Kafka partition for this entry.
*
* @return The Kafka partition for this entry.
*/
int getPartition();

/**
* Gets the number of records to skip to get to this record. This is the same as the zero-based index of this
* record if all records were in an array.
*
* @return The number of records to skip to get to this record.
*/
default long skipRecords() {
return 0;
}

/**
* Increments the record count.
*/
void incrementRecordCount();
}

/**
* The OffsetManager Key. Must override hashCode() and equals().
*/
@FunctionalInterface
public interface OffsetManagerKey {
/**
* gets the partition map used by Kafka to identify this Offset entry.
*
* @return The partition map used by Kafka to identify this Offset entry.
*/
Map<String, Object> getPartitionMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;

public class SourceCommonConfig extends CommonConfig {

Expand Down Expand Up @@ -64,11 +66,15 @@ public String getTargetTopicPartitions() {
}

public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(sourceConfigFragment.getErrorsTolerance());
return sourceConfigFragment.getErrorsTolerance();
}

public int getMaxPollRecords() {
return sourceConfigFragment.getMaxPollRecords();
}

public Transformer getTransformer() {
return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public int getExpectedMaxMessageBytes() {
return cfg.getInt(EXPECTED_MAX_MESSAGE_BYTES);
}

public String getErrorsTolerance() {
return cfg.getString(ERRORS_TOLERANCE);
public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
}

private static class ErrorsToleranceValidator implements ConfigDef.Validator {
Expand Down
Loading
Loading