valueDeserializer) {
// ----------------------------------------------------------------
+ /**
+ * The {@code isolation.level}.
+ *
+ * @param isolation the {@code isolation.level}
+ * @return {@link ConsumerBuilder}
+ * @see IsolationLevel
+ * @since 3.7.0.1.4
+ */
+ ConsumerBuilder isolation(Kafka.Consumer.Isolation isolation);
+
+ /**
+ * The consumer group instance ID.
+ * |- {@code group.instance.id}
+ *
+ * @param instanceId the {@code group.instance.id}.
+ * @since 3.7.0.1.4
+ */
+ ConsumerBuilder instanceId(String instanceId);
+
+ /**
+ * The partition assignment strategy.
+ * |- {@code partition.assignment.strategy}
+ *
+ * @param strategy the Class of strategy.
+ * @since 3.7.0.1.4
+ */
+ default ConsumerBuilder strategy(Class> strategy) {
+ return this.strategy(strategy.getName());
+ }
+
+ /**
+ * The partition assignment strategy.
+ * |- {@code partition.assignment.strategy}
+ *
+ * @param strategy {@code partition.assignment.strategy}
+ * @since 3.7.0.1.4
+ */
+ ConsumerBuilder strategy(String strategy);
+
+ // ----------------------------------------------------------------
+
/**
* The key deserializer.
*
diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java
index 550f438..394946b 100644
--- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java
+++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java
@@ -29,8 +29,8 @@
* {@code ConsumerBuilderImpl}
*
* @author photowey
- * @since 2024/04/05
* @version 1.0.0
+ * @since 2024/04/05
*/
public class ConsumerBuilderImpl extends AbstractBuilder implements ConsumerBuilder {
@@ -79,6 +79,34 @@ public ConsumerBuilder groupId(String groupId) {
return this;
}
+ // ----------------------------------------------------------------
+
+ @Override
+ public ConsumerBuilder isolation(Kafka.Consumer.Isolation isolation) {
+ super.initConfigsIfNecessary();
+ super.configs.put(Kafka.Consumer.ISOLATION_LEVEL.key(), isolation.value());
+
+ return this;
+ }
+
+ @Override
+ public ConsumerBuilder instanceId(String instanceId) {
+ super.initConfigsIfNecessary();
+ super.configs.put(Kafka.Consumer.GROUP_INSTANCE_ID.key(), instanceId);
+
+ return this;
+ }
+
+ @Override
+ public ConsumerBuilder strategy(String strategy) {
+ super.initConfigsIfNecessary();
+ super.configs.put(Kafka.Consumer.PARTITION_ASSIGNMENT_STRATEGY.key(), strategy);
+
+ return this;
+ }
+
+ // ----------------------------------------------------------------
+
@Override
public ConsumerBuilder autoCommit(boolean enabled) {
super.initConfigsIfNecessary();
diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java
index 43bfd9f..5a8ac98 100644
--- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java
+++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java
@@ -19,13 +19,14 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.IsolationLevel;
/**
* {@code Kafka}
*
* @author photowey
- * @since 2024/04/05
* @version 1.0.0
+ * @since 2024/04/05
*/
public enum Kafka {
@@ -78,8 +79,15 @@ public enum Consumer {
AUTO_OFFSET_RESET(ConsumerConfig.AUTO_OFFSET_RESET_DOC, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
GROUP_ID(CommonClientConfigs.GROUP_ID_DOC, ConsumerConfig.GROUP_ID_CONFIG),
+
AUTO_COMMIT_ENABLED("If true the consumer's offset will be periodically committed in the background.", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
+ // ---------------------------------------------------------------- 3.7.0.1.4
+
+ GROUP_INSTANCE_ID(CommonClientConfigs.GROUP_INSTANCE_ID_DOC, ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
+ ISOLATION_LEVEL(ConsumerConfig.ISOLATION_LEVEL_DOC, ConsumerConfig.ISOLATION_LEVEL_CONFIG),
+ PARTITION_ASSIGNMENT_STRATEGY(Document.Consumer.PARTITION_ASSIGNMENT_STRATEGY_DOC, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+
;
private final String doc;
@@ -125,6 +133,30 @@ public String value() {
return this.value;
}
}
+
+ public enum Isolation {
+
+ READ_COMMITTED("consumer.poll() will only return transactional messages which have been committed", IsolationLevel.READ_COMMITTED.toString()),
+ READ_UNCOMMITTED("consumer.poll() will return all messages, even transactional messages which have been aborted", IsolationLevel.READ_UNCOMMITTED.toString()),
+
+ ;
+
+ private final String doc;
+ private final String value;
+
+ Isolation(String doc, String value) {
+ this.doc = doc;
+ this.value = value;
+ }
+
+ public String doc() {
+ return this.doc;
+ }
+
+ public String value() {
+ return this.value;
+ }
+ }
}
public enum Producer {
@@ -196,4 +228,31 @@ public String value() {
}
}
+ /**
+ * @since 3.7.0.1.4
+ */
+ interface Document {
+
+ interface Server {}
+
+ interface Consumer {
+ String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
+ "ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
+ "partition ownership amongst consumer instances when group management is used. Available options are:" +
+ "" +
+ "org.apache.kafka.clients.consumer.RangeAssignor
: Assigns partitions on a per-topic basis. " +
+ "org.apache.kafka.clients.consumer.RoundRobinAssignor
: Assigns partitions to consumers in a round-robin fashion. " +
+ "org.apache.kafka.clients.consumer.StickyAssignor
: Guarantees an assignment that is " +
+ "maximally balanced while preserving as many existing partition assignments as possible. " +
+ "org.apache.kafka.clients.consumer.CooperativeStickyAssignor
: Follows the same StickyAssignor " +
+ "logic, but allows for cooperative rebalancing. " +
+ "
" +
+ "The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
+ "but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.
" +
+ "Implementing the org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
" +
+ "interface allows you to plug in a custom assignment strategy.
";
+ }
+
+ interface Producer {}
+ }
}
diff --git a/kafka-plus-engine/pom.xml b/kafka-plus-engine/pom.xml
index e8f6500..50e3d53 100644
--- a/kafka-plus-engine/pom.xml
+++ b/kafka-plus-engine/pom.xml
@@ -7,7 +7,7 @@
io.github.photowey
kafka-plus
- 3.7.0.1.3
+ 3.7.0.1.4-SNAPSHOT
kafka-plus-engine
diff --git a/kafka-plus-jackson/pom.xml b/kafka-plus-jackson/pom.xml
index 06a4678..cc5ccb4 100644
--- a/kafka-plus-jackson/pom.xml
+++ b/kafka-plus-jackson/pom.xml
@@ -7,7 +7,7 @@
io.github.photowey
kafka-plus
- 3.7.0.1.3
+ 3.7.0.1.4-SNAPSHOT
kafka-plus-jackson
diff --git a/kafka-plus-runtime/pom.xml b/kafka-plus-runtime/pom.xml
index ec95f4a..dcd9d7a 100644
--- a/kafka-plus-runtime/pom.xml
+++ b/kafka-plus-runtime/pom.xml
@@ -7,7 +7,7 @@
io.github.photowey
kafka-plus
- 3.7.0.1.3
+ 3.7.0.1.4-SNAPSHOT
kafka-plus-runtime
diff --git a/kafkaplus-spring-boot-starter/pom.xml b/kafkaplus-spring-boot-starter/pom.xml
index df3a7ae..bafaa8d 100644
--- a/kafkaplus-spring-boot-starter/pom.xml
+++ b/kafkaplus-spring-boot-starter/pom.xml
@@ -7,7 +7,7 @@
io.github.photowey
kafka-plus
- 3.7.0.1.3
+ 3.7.0.1.4-SNAPSHOT
kafkaplus-spring-boot-starter
diff --git a/kafkaplus-spring-boot3-starter/pom.xml b/kafkaplus-spring-boot3-starter/pom.xml
index 9985a6e..0431108 100644
--- a/kafkaplus-spring-boot3-starter/pom.xml
+++ b/kafkaplus-spring-boot3-starter/pom.xml
@@ -7,7 +7,7 @@
io.github.photowey
kafka-plus
- 3.7.0.1.3
+ 3.7.0.1.4-SNAPSHOT
kafkaplus-spring-boot3-starter
diff --git a/pom.xml b/pom.xml
index d3bbfde..a8fef79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
io.github.photowey
kafka-plus
- 3.7.0.1.3
+ 3.7.0.1.4-SNAPSHOT
pom
${project.groupId}:${project.artifactId}