Skip to content

Commit

Permalink
feat: Add: 1.upgrade version to v1.4(3.7.0.1.4); 2.consumer support i…
Browse files Browse the repository at this point in the history
…solation level method etc.

Signed-off-by: photowey <photowey@gmail.com>
  • Loading branch information
photowey committed Apr 13, 2024
1 parent f583e40 commit c7ea3f2
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 11 deletions.
2 changes: 1 addition & 1 deletion kafka-plus-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<version>3.7.0.1.3</version>
<version>3.7.0.1.4-SNAPSHOT</version>
</parent>

<artifactId>kafka-plus-autoconfigure</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion kafka-plus-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<version>3.7.0.1.3</version>
<version>3.7.0.1.4-SNAPSHOT</version>
</parent>

<artifactId>kafka-plus-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.github.photowey.kafka.plus.core.enums.Kafka;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.*;
Expand All @@ -26,8 +27,8 @@
* {@code ConsumerBuilder}
*
* @author photowey
* @since 2024/04/05
* @version 1.0.0
* @since 2024/04/05
*/
public interface ConsumerBuilder {

Expand Down Expand Up @@ -103,6 +104,47 @@ default <V> ConsumerBuilder valueDeserializer(Class<V> valueDeserializer) {

// ----------------------------------------------------------------

/**
* The {@code isolation.level}.
*
* @param isolation the {@code isolation.level}
* @return {@link ConsumerBuilder}
* @see IsolationLevel
* @since 3.7.0.1.4
*/
ConsumerBuilder isolation(Kafka.Consumer.Isolation isolation);

/**
* The consumer group instance ID.
* |- {@code group.instance.id}
*
* @param instanceId the {@code group.instance.id}.
* @since 3.7.0.1.4
*/
ConsumerBuilder instanceId(String instanceId);

/**
* The partition assignment strategy.
* |- {@code partition.assignment.strategy}
*
* @param strategy the Class of strategy.
* @since 3.7.0.1.4
*/
default ConsumerBuilder strategy(Class<?> strategy) {
return this.strategy(strategy.getName());
}

/**
* The partition assignment strategy.
* |- {@code partition.assignment.strategy}
*
* @param strategy {@code partition.assignment.strategy}
* @since 3.7.0.1.4
*/
ConsumerBuilder strategy(String strategy);

// ----------------------------------------------------------------

/**
* The key deserializer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
* {@code ConsumerBuilderImpl}
*
* @author photowey
* @since 2024/04/05
* @version 1.0.0
* @since 2024/04/05
*/
public class ConsumerBuilderImpl extends AbstractBuilder implements ConsumerBuilder {

Expand Down Expand Up @@ -79,6 +79,34 @@ public ConsumerBuilder groupId(String groupId) {
return this;
}

// ----------------------------------------------------------------

@Override
public ConsumerBuilder isolation(Kafka.Consumer.Isolation isolation) {
super.initConfigsIfNecessary();
super.configs.put(Kafka.Consumer.ISOLATION_LEVEL.key(), isolation.value());

return this;
}

@Override
public ConsumerBuilder instanceId(String instanceId) {
super.initConfigsIfNecessary();
super.configs.put(Kafka.Consumer.GROUP_INSTANCE_ID.key(), instanceId);

return this;
}

@Override
public ConsumerBuilder strategy(String strategy) {
super.initConfigsIfNecessary();
super.configs.put(Kafka.Consumer.PARTITION_ASSIGNMENT_STRATEGY.key(), strategy);

return this;
}

// ----------------------------------------------------------------

@Override
public ConsumerBuilder autoCommit(boolean enabled) {
super.initConfigsIfNecessary();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;

/**
* {@code Kafka}
*
* @author photowey
* @since 2024/04/05
* @version 1.0.0
* @since 2024/04/05
*/
public enum Kafka {

Expand Down Expand Up @@ -78,8 +79,15 @@ public enum Consumer {
AUTO_OFFSET_RESET(ConsumerConfig.AUTO_OFFSET_RESET_DOC, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),

GROUP_ID(CommonClientConfigs.GROUP_ID_DOC, ConsumerConfig.GROUP_ID_CONFIG),

AUTO_COMMIT_ENABLED("If true the consumer's offset will be periodically committed in the background.", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),

// ---------------------------------------------------------------- 3.7.0.1.4

GROUP_INSTANCE_ID(CommonClientConfigs.GROUP_INSTANCE_ID_DOC, ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
ISOLATION_LEVEL(ConsumerConfig.ISOLATION_LEVEL_DOC, ConsumerConfig.ISOLATION_LEVEL_CONFIG),
PARTITION_ASSIGNMENT_STRATEGY(Document.Consumer.PARTITION_ASSIGNMENT_STRATEGY_DOC, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),

;

private final String doc;
Expand Down Expand Up @@ -125,6 +133,30 @@ public String value() {
return this.value;
}
}

public enum Isolation {

READ_COMMITTED("consumer.poll() will only return transactional messages which have been committed", IsolationLevel.READ_COMMITTED.toString()),
READ_UNCOMMITTED("consumer.poll() will return all messages, even transactional messages which have been aborted", IsolationLevel.READ_UNCOMMITTED.toString()),

;

private final String doc;
private final String value;

Isolation(String doc, String value) {
this.doc = doc;
this.value = value;
}

public String doc() {
return this.doc;
}

public String value() {
return this.value;
}
}
}

public enum Producer {
Expand Down Expand Up @@ -196,4 +228,31 @@ public String value() {
}
}

/**
* @since 3.7.0.1.4
*/
interface Document {

interface Server {}

interface Consumer {
String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
"ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
"partition ownership amongst consumer instances when group management is used. Available options are:" +
"<ul>" +
"<li><code>org.apache.kafka.clients.consumer.RangeAssignor</code>: Assigns partitions on a per-topic basis.</li>" +
"<li><code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.</li>" +
"<li><code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is " +
"maximally balanced while preserving as many existing partition assignments as possible.</li>" +
"<li><code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor " +
"logic, but allows for cooperative rebalancing.</li>" +
"</ul>" +
"<p>The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
"but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.</p>" +
"<p>Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> " +
"interface allows you to plug in a custom assignment strategy.</p>";
}

interface Producer {}
}
}
2 changes: 1 addition & 1 deletion kafka-plus-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<version>3.7.0.1.3</version>
<version>3.7.0.1.4-SNAPSHOT</version>
</parent>

<artifactId>kafka-plus-engine</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion kafka-plus-jackson/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<version>3.7.0.1.3</version>
<version>3.7.0.1.4-SNAPSHOT</version>
</parent>

<artifactId>kafka-plus-jackson</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion kafka-plus-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<version>3.7.0.1.3</version>
<version>3.7.0.1.4-SNAPSHOT</version>
</parent>

<artifactId>kafka-plus-runtime</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion kafkaplus-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<version>3.7.0.1.3</version>
<version>3.7.0.1.4-SNAPSHOT</version>
</parent>

<artifactId>kafkaplus-spring-boot-starter</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion kafkaplus-spring-boot3-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<version>3.7.0.1.3</version>
<version>3.7.0.1.4-SNAPSHOT</version>
</parent>

<artifactId>kafkaplus-spring-boot3-starter</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<!-- ${kafka.version}.x.y -->
<version>3.7.0.1.3</version>
<version>3.7.0.1.4-SNAPSHOT</version>
<packaging>pom</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down

0 comments on commit c7ea3f2

Please sign in to comment.