Skip to content

Commit

Permalink
Allow multiple customizers for several components (#435)
Browse files Browse the repository at this point in the history
* DefaultPulsarReaderFactory accepts multiple customizers
* DefaultPulsarConsumerFactory accepts multiple customizers
* DefaultReactivePulsarSenderFactory accepts multiple customizers

See #432
  • Loading branch information
onobc authored Aug 20, 2023
1 parent 0b9baf8 commit 495de95
Show file tree
Hide file tree
Showing 17 changed files with 237 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,49 @@ public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSend
@Nullable
private final ReactiveMessageSenderCache reactiveMessageSenderCache;

@Nullable
private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers;

private TopicResolver topicResolver;

/**
* Construct an instance.
* @param pulsarClient the pulsar client to adapt into a reactive client
* @param reactiveMessageSenderSpec spec that defines the initial settings on the
* created senders
* @param reactiveMessageSenderCache cache used to cache created senders
* @param defaultSenderBuilderCustomizers optional list of sender builder customizers
* to apply to the created senders
*/
public DefaultReactivePulsarSenderFactory(PulsarClient pulsarClient,
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache) {
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers) {
this(AdaptedReactivePulsarClientFactory.create(pulsarClient), reactiveMessageSenderSpec,
reactiveMessageSenderCache, new DefaultTopicResolver());
reactiveMessageSenderCache, defaultSenderBuilderCustomizers, new DefaultTopicResolver());
}

/**
* Construct an instance.
* @param reactivePulsarClient the reactive client to use
* @param reactiveMessageSenderSpec spec that defines the initial settings on the
* created senders
* @param reactiveMessageSenderCache cache used to cache created senders
* @param defaultSenderBuilderCustomizers optional list of sender builder customizers
* to apply to the created senders
* @param topicResolver the topic resolver to use
*/
public DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, TopicResolver topicResolver) {
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers,
TopicResolver topicResolver) {
this.reactivePulsarClient = reactivePulsarClient;
this.reactiveMessageSenderSpec = new ImmutableReactiveMessageSenderSpec(
reactiveMessageSenderSpec != null ? reactiveMessageSenderSpec : new MutableReactiveMessageSenderSpec());
this.reactiveMessageSenderCache = reactiveMessageSenderCache;
this.topicResolver = topicResolver;
this.defaultSenderBuilderCustomizers = defaultSenderBuilderCustomizers;
}

@Override
Expand Down Expand Up @@ -102,6 +128,11 @@ private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema,

ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);
sender.applySpec(this.reactiveMessageSenderSpec);

// Apply the default config customizer (preserve the topic)
if (!CollectionUtils.isEmpty(this.defaultSenderBuilderCustomizers)) {
this.defaultSenderBuilderCustomizers.forEach((customizer -> customizer.customize(sender)));
}
sender.topic(resolvedTopic);
if (this.reactiveMessageSenderCache != null) {
sender.cache(this.reactiveMessageSenderCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* @author Christophe Bornet
* @author Chris Bono
*/
class DefaultReactiveMessageConsumerFactoryTests {
class DefaultReactivePulsarConsumerFactoryTests {

private static final Schema<String> SCHEMA = Schema.STRING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
* Tests for {@link DefaultReactivePulsarReaderFactory}.
*
* @author Christophe Bornet
* @author Chris Bono
*/
class DefaultReactiveMessageReaderFactoryTests {
class DefaultReactivePulsarReaderFactoryTests {

private static final Schema<String> schema = Schema.STRING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,35 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

/**
* Unit tests for {@link DefaultReactivePulsarSenderFactory}.
*
* @author Christophe Bornet
* @author Chris Bono
*/
class DefaultReactiveMessageSenderFactoryTests {
class DefaultReactivePulsarSenderFactoryTests {

protected final Schema<String> schema = Schema.STRING;

Expand All @@ -66,17 +71,17 @@ private void assertThatSenderSpecSatisfies(ReactiveMessageSender<String> sender,
}

private ReactivePulsarSenderFactory<String> newSenderFactory() {
return new DefaultReactivePulsarSenderFactory<>((PulsarClient) null, null, null);
return new DefaultReactivePulsarSenderFactory<>(null, null, null, null);
}

private ReactivePulsarSenderFactory<String> newSenderFactoryWithDefaultTopic(String defaultTopic) {
MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec();
senderSpec.setTopicName(defaultTopic);
return new DefaultReactivePulsarSenderFactory<>((PulsarClient) null, senderSpec, null);
return new DefaultReactivePulsarSenderFactory<>(null, senderSpec, null, null);
}

private ReactivePulsarSenderFactory<String> newSenderFactoryWithCache(ReactiveMessageSenderCache cache) {
return new DefaultReactivePulsarSenderFactory<>((PulsarClient) null, null, cache);
return new DefaultReactivePulsarSenderFactory<>(null, null, cache, null);
}

@Nested
Expand Down Expand Up @@ -150,4 +155,44 @@ void customizerThatSetsTopicHasNoEffect() {

}

@Nested
@SuppressWarnings("unchecked")
class DefaultConfigCustomizerApi {

private ReactiveMessageSenderBuilderCustomizer<String> configCustomizer1 = mock(
ReactiveMessageSenderBuilderCustomizer.class);

private ReactiveMessageSenderBuilderCustomizer<String> configCustomizer2 = mock(
ReactiveMessageSenderBuilderCustomizer.class);

private ReactiveMessageSenderBuilderCustomizer<String> createSenderCustomizer = mock(
ReactiveMessageSenderBuilderCustomizer.class);

@Test
void singleConfigCustomizer() {
newSenderFactoryWithCustomizers(List.of(configCustomizer1)).createSender(schema, "topic1",
List.of(createSenderCustomizer));
InOrder inOrder = inOrder(configCustomizer1, createSenderCustomizer);
inOrder.verify(configCustomizer1).customize(any(ReactiveMessageSenderBuilder.class));
inOrder.verify(createSenderCustomizer).customize(any(ReactiveMessageSenderBuilder.class));
}

@Test
void multipleConfigCustomizers() {
newSenderFactoryWithCustomizers(List.of(configCustomizer2, configCustomizer1)).createSender(schema,
"topic1", List.of(createSenderCustomizer));
InOrder inOrder = inOrder(configCustomizer1, configCustomizer2, createSenderCustomizer);
inOrder.verify(configCustomizer2).customize(any(ReactiveMessageSenderBuilder.class));
inOrder.verify(configCustomizer1).customize(any(ReactiveMessageSenderBuilder.class));
inOrder.verify(createSenderCustomizer).customize(any(ReactiveMessageSenderBuilder.class));
}

private ReactivePulsarSenderFactory<String> newSenderFactoryWithCustomizers(
List<ReactiveMessageSenderBuilderCustomizer<String>> customizers) {
MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec();
return new DefaultReactivePulsarSenderFactory<>(null, senderSpec, null, customizers);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaul
if (producerFactoryHasDefaultTopic) {
spec.setTopicName("fake-topic");
}
ReactivePulsarSenderFactory<Foo> producerFactory = new DefaultReactivePulsarSenderFactory<>(client, spec, null);
ReactivePulsarSenderFactory<Foo> producerFactory = new DefaultReactivePulsarSenderFactory<>(client, spec, null,
null);
// Topic mappings allows not specifying the topic when sending (nor having
// default on producer)
DefaultTopicResolver topicResolver = new DefaultTopicResolver();
Expand All @@ -198,7 +199,7 @@ void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaul
@Test
void sendMessageWithoutTopicFails() {
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client,
new MutableReactiveMessageSenderSpec(), null);
new MutableReactiveMessageSenderSpec(), null, null);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send("test-message").subscribe())
.withMessage("Topic must be specified when no default topic is configured");
Expand All @@ -211,7 +212,7 @@ private <T> Message<T> sendAndConsume(Consumer<ReactivePulsarTemplate<T>> sendFu
senderSpec.setTopicName(topic);
}
ReactivePulsarSenderFactory<T> senderFactory = new DefaultReactivePulsarSenderFactory<>(client, senderSpec,
null);
null, null);

ReactivePulsarTemplate<T> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);

Expand Down Expand Up @@ -260,7 +261,7 @@ void withSchemaInferredByTypeMappings() throws Exception {
MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec();
spec.setTopicName(topic);
ReactivePulsarSenderFactory<Foo> producerFactory = new DefaultReactivePulsarSenderFactory<>(client, spec,
null);
null, null);
// Custom schema resolver allows not specifying the schema when sending
DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver();
schemaResolver.addCustomSchemaMapping(Foo.class, Schema.JSON(Foo.class));
Expand All @@ -282,7 +283,7 @@ void sendNullWithDefaultTopicFails() {
MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec();
spec.setTopicName("sendNullWithDefaultTopicFails");
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client, spec,
null);
null, null);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
assertThatIllegalArgumentException()
.isThrownBy(() -> pulsarTemplate.send((String) null, Schema.STRING).subscribe())
Expand All @@ -292,7 +293,7 @@ void sendNullWithDefaultTopicFails() {
@Test
void sendNullWithoutSchemaFails() {
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client,
new MutableReactiveMessageSenderSpec(), null);
new MutableReactiveMessageSenderSpec(), null, null);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
assertThatIllegalArgumentException()
.isThrownBy(() -> pulsarTemplate.send("sendNullWithoutSchemaFails", (String) null, null).subscribe())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void messageHandlerListener() throws Exception {
MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec();
prodConfig.setTopicName(topic);
DefaultReactivePulsarSenderFactory<String> pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>(
reactivePulsarClient, prodConfig, null, new DefaultTopicResolver());
reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver());
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory);
pulsarTemplate.send("hello john doe").subscribe();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
Expand Down Expand Up @@ -116,7 +116,7 @@ void streamingHandlerListener() throws Exception {
MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec();
prodConfig.setTopicName(topic);
DefaultReactivePulsarSenderFactory<String> pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>(
reactivePulsarClient, prodConfig, null, new DefaultTopicResolver());
reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver());
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory);
Flux.range(0, 5).map(i -> MessageSpec.of("hello john doe" + i)).as(pulsarTemplate::send).subscribe();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
Expand Down Expand Up @@ -157,7 +157,7 @@ void containerProperties() throws Exception {
MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec();
prodConfig.setTopicName(topic);
DefaultReactivePulsarSenderFactory<String> pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>(
reactivePulsarClient, prodConfig, null, new DefaultTopicResolver());
reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver());
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory);
pulsarTemplate.send("hello john doe").subscribe();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
Expand Down Expand Up @@ -271,7 +271,7 @@ void containerTopicsPattern() throws Exception {
MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec();
prodConfig.setTopicName(topic);
DefaultReactivePulsarSenderFactory<String> pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>(
reactivePulsarClient, prodConfig, null, new DefaultTopicResolver());
reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver());
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory);
pulsarTemplate.send("hello john doe").subscribe();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
Expand Down Expand Up @@ -321,7 +321,7 @@ void consumerCustomizer() throws Exception {
prodConfig.setBatchingEnabled(false);
prodConfig.setTopicName(topic);
DefaultReactivePulsarSenderFactory<String> pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>(
reactivePulsarClient, prodConfig, null, new DefaultTopicResolver());
reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver());
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory);
Flux.range(0, 5).map(i -> MessageSpec.of("hello john doe" + i)).as(pulsarTemplate::send).subscribe();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,18 @@ public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T>
private final PulsarClient pulsarClient;

@Nullable
private final ConsumerBuilderCustomizer<T> defaultConfigCustomizer;
private final List<ConsumerBuilderCustomizer<T>> defaultConfigCustomizers;

/**
* Construct a consumer factory instance.
* @param pulsarClient the client used to consume
* @param defaultConfigCustomizer the default configuration to apply to the consumers
* or null to use no default configuration
* @param defaultConfigCustomizers the optional list of customizers to apply to the
* created consumers or null to use no default configuration
*/
public DefaultPulsarConsumerFactory(PulsarClient pulsarClient,
ConsumerBuilderCustomizer<T> defaultConfigCustomizer) {
List<ConsumerBuilderCustomizer<T>> defaultConfigCustomizers) {
this.pulsarClient = pulsarClient;
this.defaultConfigCustomizer = defaultConfigCustomizer;
this.defaultConfigCustomizers = defaultConfigCustomizers;
}

@Override
Expand All @@ -77,8 +77,8 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);

// Apply the default config customizer (preserve the topic)
if (this.defaultConfigCustomizer != null) {
this.defaultConfigCustomizer.customize(consumerBuilder);
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(consumerBuilder)));
}
if (topics != null) {
replaceTopicsOnBuilder(consumerBuilder, topics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic,
var producerBuilder = this.pulsarClient.newProducer(schema);

// Apply the default config customizer (preserve the topic)
if (this.defaultConfigCustomizers != null) {
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer) -> customizer.customize(producerBuilder));
}
producerBuilder.topic(resolvedTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class DefaultPulsarReaderFactory<T> implements PulsarReaderFactory<T> {
private final PulsarClient pulsarClient;

@Nullable
private final ReaderBuilderCustomizer<T> defaultConfigCustomizer;
private final List<ReaderBuilderCustomizer<T>> defaultConfigCustomizers;

/**
* Construct a reader factory instance with no default configuration.
Expand All @@ -56,13 +56,13 @@ public DefaultPulsarReaderFactory(PulsarClient pulsarClient) {
/**
* Construct a reader factory instance.
* @param pulsarClient the client used to consume
* @param defaultConfigCustomizer the default configuration to apply to the readers or
* null to use no default configuration
* @param defaultConfigCustomizers the optional list of customizers to apply to the
* readers or null to use no default configuration
*/
public DefaultPulsarReaderFactory(PulsarClient pulsarClient,
@Nullable ReaderBuilderCustomizer<T> defaultConfigCustomizer) {
@Nullable List<ReaderBuilderCustomizer<T>> defaultConfigCustomizers) {
this.pulsarClient = pulsarClient;
this.defaultConfigCustomizer = defaultConfigCustomizer;
this.defaultConfigCustomizers = defaultConfigCustomizers;
}

@Override
Expand All @@ -72,8 +72,8 @@ public Reader<T> createReader(@Nullable List<String> topics, @Nullable MessageId
ReaderBuilder<T> readerBuilder = this.pulsarClient.newReader(schema);

// Apply the default config customizer (preserve the topics)
if (this.defaultConfigCustomizer != null) {
this.defaultConfigCustomizer.customize(readerBuilder);
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(readerBuilder)));
}

if (!CollectionUtils.isEmpty(topics)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,11 @@ void messagesAreProperlyAckdOnContainerStopBeforeExitingListenerThread() throws
pulsarClient.close();
}

private <T> ConsumerBuilderCustomizer<T> defaultConfig(String topicName, String subscriptionName) {
return (consumerBuilder) -> {
private <T> List<ConsumerBuilderCustomizer<T>> defaultConfig(String topicName, String subscriptionName) {
return List.of((consumerBuilder) -> {
consumerBuilder.topic(topicName);
consumerBuilder.subscriptionName(subscriptionName);
};
});
}

}
Loading

0 comments on commit 495de95

Please sign in to comment.