From 74678fa39fe2ef50353f66a6132d1eb37e26391c Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sat, 26 Aug 2023 00:57:55 -0500 Subject: [PATCH] Add multi-customizers to reactive reader and consumer * Also add builder to ReactivePulsarSenderFactory See #432 --- .../DefaultReactivePulsarConsumerFactory.java | 34 +- .../DefaultReactivePulsarReaderFactory.java | 33 +- .../DefaultReactivePulsarSenderFactory.java | 174 ++++-- .../core/ReactivePulsarSenderFactory.java | 8 +- .../reactive/core/ReactivePulsarTemplate.java | 2 +- ...ultReactivePulsarConsumerFactoryTests.java | 9 +- ...faultReactivePulsarReaderFactoryTests.java | 12 +- ...faultReactivePulsarSenderFactoryTests.java | 31 +- .../core/ReactivePulsarTemplateTests.java | 48 +- ...vePulsarMessageListenerContainerTests.java | 513 +++++++++--------- .../listener/ReactivePulsarListenerTests.java | 4 +- 11 files changed, 488 insertions(+), 380 deletions(-) diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java index f0ba308b..8a51a675 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java @@ -20,13 +20,11 @@ import java.util.List; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec; import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; +import org.springframework.lang.Nullable; import org.springframework.util.CollectionUtils; /** @@ -34,18 +32,25 @@ * * @param underlying payload type for the reactive consumer. * @author Christophe Bornet + * @author Chris Bono */ public class DefaultReactivePulsarConsumerFactory implements ReactivePulsarConsumerFactory { - private final ReactiveMessageConsumerSpec consumerSpec; - private final ReactivePulsarClient reactivePulsarClient; + @Nullable + private final List> defaultConfigCustomizers; + + /** + * Construct an instance. + * @param reactivePulsarClient the reactive client + * @param defaultConfigCustomizers the optional list of customizers that defines the + * default configuration for each created consumer. + */ public DefaultReactivePulsarConsumerFactory(ReactivePulsarClient reactivePulsarClient, - ReactiveMessageConsumerSpec consumerSpec) { - this.consumerSpec = new ImmutableReactiveMessageConsumerSpec( - consumerSpec != null ? consumerSpec : new MutableReactiveMessageConsumerSpec()); + List> defaultConfigCustomizers) { this.reactivePulsarClient = reactivePulsarClient; + this.defaultConfigCustomizers = defaultConfigCustomizers; } @Override @@ -57,14 +62,19 @@ public ReactiveMessageConsumer createConsumer(Schema schema) { public ReactiveMessageConsumer createConsumer(Schema schema, List> customizers) { - ReactiveMessageConsumerBuilder consumer = this.reactivePulsarClient.messageConsumer(schema); + ReactiveMessageConsumerBuilder consumerBuilder = this.reactivePulsarClient.messageConsumer(schema); + + // Apply the default customizers + if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) { + this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(consumerBuilder))); + } - consumer.applySpec(this.consumerSpec); + // Apply the user specified customizers if (!CollectionUtils.isEmpty(customizers)) { - customizers.forEach((c) -> c.customize(consumer)); + customizers.forEach((c) -> c.customize(consumerBuilder)); } - return consumer.build(); + return consumerBuilder.build(); } } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactory.java index a772a78f..374f39ce 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactory.java @@ -22,9 +22,9 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.ReactiveMessageReader; import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder; -import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec; import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; +import org.springframework.lang.Nullable; import org.springframework.util.CollectionUtils; /** @@ -32,17 +32,25 @@ * * @param underlying payload type for the reactive reader. * @author Christophe Bornet + * @author Chris Bono */ public class DefaultReactivePulsarReaderFactory implements ReactivePulsarReaderFactory { - private final ReactiveMessageReaderSpec readerSpec; - private final ReactivePulsarClient reactivePulsarClient; + @Nullable + private final List> defaultConfigCustomizers; + + /** + * Construct an instance. + * @param reactivePulsarClient the reactive client + * @param defaultConfigCustomizers the optional list of customizers that defines the + * default configuration for each created reader. + */ public DefaultReactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient, - ReactiveMessageReaderSpec readerSpec) { + List> defaultConfigCustomizers) { this.reactivePulsarClient = reactivePulsarClient; - this.readerSpec = readerSpec; + this.defaultConfigCustomizers = defaultConfigCustomizers; } @Override @@ -54,12 +62,19 @@ public ReactiveMessageReader createReader(Schema schema) { public ReactiveMessageReader createReader(Schema schema, List> customizers) { - ReactiveMessageReaderBuilder reader = this.reactivePulsarClient.messageReader(schema) - .applySpec(this.readerSpec); + ReactiveMessageReaderBuilder readerBuilder = this.reactivePulsarClient.messageReader(schema); + + // Apply the default customizers + if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) { + this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(readerBuilder))); + } + + // Apply the user specified customizers if (!CollectionUtils.isEmpty(customizers)) { - customizers.forEach((c) -> c.customize(reader)); + customizers.forEach((c) -> c.customize(readerBuilder)); } - return reader.build(); + + return readerBuilder.build(); } } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java index 929a6091..adce08c7 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java @@ -23,18 +23,16 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; -import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec; import org.apache.pulsar.reactive.client.api.ReactiveMessageSender; import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder; import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache; -import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec; import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; /** @@ -50,54 +48,45 @@ public class DefaultReactivePulsarSenderFactory implements ReactivePulsarSend private final ReactivePulsarClient reactivePulsarClient; - private final ReactiveMessageSenderSpec reactiveMessageSenderSpec; + private final TopicResolver topicResolver; @Nullable private final ReactiveMessageSenderCache reactiveMessageSenderCache; @Nullable - private final List> defaultSenderBuilderCustomizers; + private String defaultTopic; - private TopicResolver topicResolver; + @Nullable + private final List> defaultConfigCustomizers; + + private DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, TopicResolver topicResolver, + @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, @Nullable String defaultTopic, + @Nullable List> defaultConfigCustomizers) { + this.reactivePulsarClient = reactivePulsarClient; + this.topicResolver = topicResolver; + this.reactiveMessageSenderCache = reactiveMessageSenderCache; + this.defaultTopic = defaultTopic; + this.defaultConfigCustomizers = defaultConfigCustomizers; + } /** - * Construct an instance. - * @param pulsarClient the pulsar client to adapt into a reactive client - * @param reactiveMessageSenderSpec spec that defines the initial settings on the - * created senders - * @param reactiveMessageSenderCache cache used to cache created senders - * @param defaultSenderBuilderCustomizers optional list of sender builder customizers - * to apply to the created senders + * Create a builder that uses the specified Reactive pulsar client. + * @param reactivePulsarClient the reactive client + * @return the newly created builder instance + * @param the reactive sender type */ - public DefaultReactivePulsarSenderFactory(PulsarClient pulsarClient, - @Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec, - @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, - @Nullable List> defaultSenderBuilderCustomizers) { - this(AdaptedReactivePulsarClientFactory.create(pulsarClient), reactiveMessageSenderSpec, - reactiveMessageSenderCache, defaultSenderBuilderCustomizers, new DefaultTopicResolver()); + public static Builder builderFor(ReactivePulsarClient reactivePulsarClient) { + return new Builder<>(reactivePulsarClient); } /** - * Construct an instance. - * @param reactivePulsarClient the reactive client to use - * @param reactiveMessageSenderSpec spec that defines the initial settings on the - * created senders - * @param reactiveMessageSenderCache cache used to cache created senders - * @param defaultSenderBuilderCustomizers optional list of sender builder customizers - * to apply to the created senders - * @param topicResolver the topic resolver to use + * Create a builder that adapts the specified pulsar client. + * @param pulsarClient the Pulsar client to adapt into a Reactive client. + * @return the newly created builder instance + * @param the reactive sender type */ - public DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, - @Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec, - @Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, - @Nullable List> defaultSenderBuilderCustomizers, - TopicResolver topicResolver) { - this.reactivePulsarClient = reactivePulsarClient; - this.reactiveMessageSenderSpec = new ImmutableReactiveMessageSenderSpec( - reactiveMessageSenderSpec != null ? reactiveMessageSenderSpec : new MutableReactiveMessageSenderSpec()); - this.reactiveMessageSenderCache = reactiveMessageSenderCache; - this.topicResolver = topicResolver; - this.defaultSenderBuilderCustomizers = defaultSenderBuilderCustomizers; + public static Builder builderFor(PulsarClient pulsarClient) { + return new Builder<>(AdaptedReactivePulsarClientFactory.create(pulsarClient)); } @Override @@ -121,34 +110,123 @@ public ReactiveMessageSender createSender(Schema schema, @Nullable String private ReactiveMessageSender doCreateReactiveMessageSender(Schema schema, @Nullable String topic, @Nullable List> customizers) { Objects.requireNonNull(schema, "Schema must be specified"); - String resolvedTopic = this.topicResolver - .resolveTopic(topic, () -> getReactiveMessageSenderSpec().getTopicName()) - .orElseThrow(); + String resolvedTopic = this.topicResolver.resolveTopic(topic, () -> getDefaultTopic()).orElseThrow(); this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic)); ReactiveMessageSenderBuilder sender = this.reactivePulsarClient.messageSender(schema); - sender.applySpec(this.reactiveMessageSenderSpec); - // Apply the default config customizer (preserve the topic) - if (!CollectionUtils.isEmpty(this.defaultSenderBuilderCustomizers)) { - this.defaultSenderBuilderCustomizers.forEach((customizer -> customizer.customize(sender))); + // Apply the default customizers (preserve the topic) + if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) { + this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(sender))); } sender.topic(resolvedTopic); + if (this.reactiveMessageSenderCache != null) { sender.cache(this.reactiveMessageSenderCache); } + + // Apply the user specified customizers (preserve the topic) if (!CollectionUtils.isEmpty(customizers)) { customizers.forEach((c) -> c.customize(sender)); } - // make sure the customizer do not override the topic sender.topic(resolvedTopic); return sender.build(); } @Override - public ReactiveMessageSenderSpec getReactiveMessageSenderSpec() { - return this.reactiveMessageSenderSpec; + public String getDefaultTopic() { + return this.defaultTopic; + } + + /** + * Builder for {@link DefaultReactivePulsarSenderFactory}. + * + * @param the reactive sender type + */ + public static class Builder { + + private final ReactivePulsarClient reactivePulsarClient; + + private TopicResolver topicResolver = new DefaultTopicResolver(); + + @Nullable + private ReactiveMessageSenderCache messageSenderCache; + + @Nullable + private String defaultTopic; + + @Nullable + private List> defaultConfigCustomizers; + + private Builder(ReactivePulsarClient reactivePulsarClient) { + Assert.notNull(reactivePulsarClient, "Reactive client is required"); + this.reactivePulsarClient = reactivePulsarClient; + } + + /** + * Provide the topic resolver to use. + * @param topicResolver the topic resolver to use + * @return this same builder instance + */ + public Builder withTopicResolver(TopicResolver topicResolver) { + this.topicResolver = topicResolver; + return this; + } + + /** + * Provide the message sender cache to use. + * @param messageSenderCache the message sender cache to use + * @return this same builder instance + */ + public Builder withMessageSenderCache(ReactiveMessageSenderCache messageSenderCache) { + this.messageSenderCache = messageSenderCache; + return this; + } + + /** + * Provide the default topic to use when one is not specified. + * @param defaultTopic the default topic to use + * @return this same builder instance + */ + public Builder withDefaultTopic(String defaultTopic) { + this.defaultTopic = defaultTopic; + return this; + } + + /** + * Provide a customizer to apply to the sender builder. + * @param customizer the customizer to apply to the builder before creating + * senders + * @return this same builder instance + */ + public Builder withDefaultConfigCustomizer(ReactiveMessageSenderBuilderCustomizer customizer) { + this.defaultConfigCustomizers = List.of(customizer); + return this; + } + + /** + * Provide an optional list of sender builder customizers to apply to the builder + * before creating the senders. + * @param customizers optional list of sender builder customizers to apply to the + * builder before creating the senders. + * @return this same builder instance + */ + public Builder withDefaultConfigCustomizers(List> customizers) { + this.defaultConfigCustomizers = customizers; + return this; + } + + /** + * Construct the sender factory using the specified settings. + * @return pulsar sender factory + */ + public DefaultReactivePulsarSenderFactory build() { + Assert.notNull(this.topicResolver, "Topic resolver is required"); + return new DefaultReactivePulsarSenderFactory<>(this.reactivePulsarClient, this.topicResolver, + this.messageSenderCache, this.defaultTopic, this.defaultConfigCustomizers); + } + } } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarSenderFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarSenderFactory.java index 6955c7e8..090f15bd 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarSenderFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarSenderFactory.java @@ -20,7 +20,6 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.ReactiveMessageSender; -import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec; import org.springframework.lang.Nullable; @@ -64,9 +63,10 @@ ReactiveMessageSender createSender(Schema schema, @Nullable String topic, @Nullable List> customizers); /** - * Return the ReactiveMessageSenderSpec to use when creating reactive senders. - * @return the ReactiveMessageSenderSpec + * Get the default topic to use for all created senders. + * @return the default topic to use for all created senders or null if no default set. */ - ReactiveMessageSenderSpec getReactiveMessageSenderSpec(); + @Nullable + String getDefaultTopic(); } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java index 4dfedb54..9662347e 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplate.java @@ -157,7 +157,7 @@ private Flux> doSendMany(@Nullable String topic, Flux defaultTopic).orElseThrow(); } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java index 4afe27dd..040d62a3 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java @@ -19,11 +19,11 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Collections; +import java.util.List; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec; import org.assertj.core.api.InstanceOfAssertFactories; @@ -44,7 +44,7 @@ class DefaultReactivePulsarConsumerFactoryTests { @Nested class FactoryCreatedWithoutSpec { - private org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory consumerFactory = new org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory<>( + private DefaultReactivePulsarConsumerFactory consumerFactory = new DefaultReactivePulsarConsumerFactory<>( AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null); @Test @@ -76,10 +76,9 @@ class FactoryCreatedWithSpec { @BeforeEach void createConsumerFactory() { - MutableReactiveMessageConsumerSpec spec = new MutableReactiveMessageConsumerSpec(); - spec.setConsumerName("test-consumer"); consumerFactory = new DefaultReactivePulsarConsumerFactory<>( - AdaptedReactivePulsarClientFactory.create((PulsarClient) null), spec); + AdaptedReactivePulsarClientFactory.create((PulsarClient) null), + List.of((builder) -> builder.consumerName("test-consumer"))); } @Test diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactoryTests.java index d0064fb7..08f60894 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarReaderFactoryTests.java @@ -19,11 +19,11 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Collections; +import java.util.List; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageReaderSpec; import org.apache.pulsar.reactive.client.api.ReactiveMessageReader; import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec; import org.assertj.core.api.InstanceOfAssertFactories; @@ -41,10 +41,9 @@ class DefaultReactivePulsarReaderFactoryTests { @Test void createReader() { - MutableReactiveMessageReaderSpec spec = new MutableReactiveMessageReaderSpec(); - spec.setReaderName("test-reader"); DefaultReactivePulsarReaderFactory readerFactory = new DefaultReactivePulsarReaderFactory<>( - AdaptedReactivePulsarClientFactory.create((PulsarClient) null), spec); + AdaptedReactivePulsarClientFactory.create((PulsarClient) null), + List.of((builder) -> builder.readerName("test-reader"))); ReactiveMessageReader reader = readerFactory.createReader(schema); @@ -55,10 +54,9 @@ void createReader() { @Test void createReaderWithCustomizer() { - MutableReactiveMessageReaderSpec spec = new MutableReactiveMessageReaderSpec(); - spec.setReaderName("test-reader"); DefaultReactivePulsarReaderFactory readerFactory = new DefaultReactivePulsarReaderFactory<>( - AdaptedReactivePulsarClientFactory.create((PulsarClient) null), spec); + AdaptedReactivePulsarClientFactory.create((PulsarClient) null), + List.of((builder) -> builder.readerName("test-reader"))); ReactiveMessageReader reader = readerFactory.createReader(schema, Collections.singletonList(builder -> builder.readerName("new-test-reader"))); diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactoryTests.java index e8ba85e9..e4f3c258 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactoryTests.java @@ -28,19 +28,22 @@ import java.util.List; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec; import org.apache.pulsar.reactive.client.api.ReactiveMessageSender; import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder; import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache; import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec; +import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.api.ThrowingConsumer; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.mockito.InOrder; +import org.springframework.pulsar.core.TopicResolver; + /** * Unit tests for {@link DefaultReactivePulsarSenderFactory}. * @@ -58,6 +61,15 @@ void createSenderWithCache() { assertThat(sender).extracting("producerCache").isSameAs(cache); } + @Test + void createSenderWithTopicResolver() { + var customTopicResolver = mock(TopicResolver.class); + var senderFactory = DefaultReactivePulsarSenderFactory.builderFor(mock(ReactivePulsarClient.class)) + .withTopicResolver(customTopicResolver) + .build(); + assertThat(senderFactory).hasFieldOrPropertyWithValue("topicResolver", customTopicResolver); + } + private void assertThatSenderHasTopic(ReactiveMessageSender sender, String expectedTopic) { assertThatSenderSpecSatisfies(sender, (senderSpec) -> assertThat(senderSpec).extracting(ReactiveMessageSenderSpec::getTopicName) @@ -71,17 +83,19 @@ private void assertThatSenderSpecSatisfies(ReactiveMessageSender sender, } private ReactivePulsarSenderFactory newSenderFactory() { - return new DefaultReactivePulsarSenderFactory<>(null, null, null, null); + return DefaultReactivePulsarSenderFactory.builderFor(mock(PulsarClient.class)).build(); } private ReactivePulsarSenderFactory newSenderFactoryWithDefaultTopic(String defaultTopic) { - MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec(); - senderSpec.setTopicName(defaultTopic); - return new DefaultReactivePulsarSenderFactory<>(null, senderSpec, null, null); + return DefaultReactivePulsarSenderFactory.builderFor(mock(PulsarClient.class)) + .withDefaultTopic(defaultTopic) + .build(); } private ReactivePulsarSenderFactory newSenderFactoryWithCache(ReactiveMessageSenderCache cache) { - return new DefaultReactivePulsarSenderFactory<>(null, null, cache, null); + return DefaultReactivePulsarSenderFactory.builderFor(mock(PulsarClient.class)) + .withMessageSenderCache(cache) + .build(); } @Nested @@ -189,8 +203,9 @@ void multipleConfigCustomizers() { private ReactivePulsarSenderFactory newSenderFactoryWithCustomizers( List> customizers) { - MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec(); - return new DefaultReactivePulsarSenderFactory<>(null, senderSpec, null, customizers); + return DefaultReactivePulsarSenderFactory.builderFor(mock(PulsarClient.class)) + .withDefaultConfigCustomizers(customizers) + .build(); } } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java index 96213ad3..63852a96 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java @@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.MessageSpec; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -47,6 +46,7 @@ import org.springframework.lang.Nullable; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; +import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory.Builder; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; import org.springframework.util.function.ThrowingConsumer; @@ -178,12 +178,9 @@ void sendMessageWithSenderCustomizer() throws Exception { @ValueSource(booleans = { true, false }) void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaultTopic) throws Exception { String topic = "ptt-topicInferred-" + producerFactoryHasDefaultTopic + "-topic"; - MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec(); - if (producerFactoryHasDefaultTopic) { - spec.setTopicName("fake-topic"); - } - ReactivePulsarSenderFactory producerFactory = new DefaultReactivePulsarSenderFactory<>(client, spec, null, - null); + ReactivePulsarSenderFactory producerFactory = DefaultReactivePulsarSenderFactory.builderFor(client) + .withDefaultTopic(producerFactoryHasDefaultTopic ? "fake-topic" : null) + .build(); // Topic mappings allows not specifying the topic when sending (nor having // default on producer) DefaultTopicResolver topicResolver = new DefaultTopicResolver(); @@ -198,24 +195,18 @@ void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaul @Test void sendMessageWithoutTopicFails() { - ReactivePulsarSenderFactory senderFactory = new DefaultReactivePulsarSenderFactory<>(client, - new MutableReactiveMessageSenderSpec(), null, null); - ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory); + ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>( + DefaultReactivePulsarSenderFactory.builderFor(client).build()); assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send("test-message").subscribe()) .withMessage("Topic must be specified when no default topic is configured"); } private Message sendAndConsume(Consumer> sendFunction, String topic, Schema schema, @Nullable T expectedValue, Boolean withDefaultTopic) throws Exception { - MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec(); - if (withDefaultTopic) { - senderSpec.setTopicName(topic); - } - ReactivePulsarSenderFactory senderFactory = new DefaultReactivePulsarSenderFactory<>(client, senderSpec, - null, null); - + ReactivePulsarSenderFactory senderFactory = DefaultReactivePulsarSenderFactory.builderFor(client) + .withDefaultTopic(withDefaultTopic ? topic : null) + .build(); ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory); - return sendAndConsume(pulsarTemplate, sendFunction, topic, schema, expectedValue); } @@ -258,10 +249,10 @@ void withSchemaInferredByMessageType() throws Exception { @Test void withSchemaInferredByTypeMappings() throws Exception { String topic = "ptt-schemaInferred-topic"; - MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec(); - spec.setTopicName(topic); - ReactivePulsarSenderFactory producerFactory = new DefaultReactivePulsarSenderFactory<>(client, spec, - null, null); + ReactivePulsarSenderFactory producerFactory = DefaultReactivePulsarSenderFactory + .builderFor(client) + .withDefaultTopic(topic) + .build(); // Custom schema resolver allows not specifying the schema when sending DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver(); schemaResolver.addCustomSchemaMapping(Foo.class, Schema.JSON(Foo.class)); @@ -280,10 +271,10 @@ class SendNullTests { @Test void sendNullWithDefaultTopicFails() { - MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec(); - spec.setTopicName("sendNullWithDefaultTopicFails"); - ReactivePulsarSenderFactory senderFactory = new DefaultReactivePulsarSenderFactory<>(client, spec, - null, null); + ReactivePulsarSenderFactory senderFactory = DefaultReactivePulsarSenderFactory + .builderFor(client) + .withDefaultConfigCustomizer((builder) -> builder.topic("sendNullWithDefaultTopicFails")) + .build(); ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory); assertThatIllegalArgumentException() .isThrownBy(() -> pulsarTemplate.send((String) null, Schema.STRING).subscribe()) @@ -292,8 +283,9 @@ void sendNullWithDefaultTopicFails() { @Test void sendNullWithoutSchemaFails() { - ReactivePulsarSenderFactory senderFactory = new DefaultReactivePulsarSenderFactory<>(client, - new MutableReactiveMessageSenderSpec(), null, null); + ReactivePulsarSenderFactory senderFactory = DefaultReactivePulsarSenderFactory + .builderFor(client) + .build(); ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory); assertThatIllegalArgumentException() .isThrownBy(() -> pulsarTemplate.send("sendNullWithoutSchemaFails", (String) null, null).subscribe()) diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java index f745e3c9..4e80735f 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainerTests.java @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; -import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -32,17 +31,15 @@ import org.apache.pulsar.reactive.client.adapter.DefaultMessageGroupingFunction; import org.apache.pulsar.reactive.client.api.MessageResult; import org.apache.pulsar.reactive.client.api.MessageSpec; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec; -import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline; import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; -import org.springframework.pulsar.core.DefaultTopicResolver; +import org.springframework.core.log.LogAccessor; import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory; import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory; +import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; @@ -54,290 +51,294 @@ * Tests for {@link DefaultReactivePulsarMessageListenerContainer} * * @author Christophe Bornet + * @author Chris Bono */ class DefaultReactivePulsarMessageListenerContainerTests implements PulsarTestContainerSupport { + private final LogAccessor logger = new LogAccessor(this.getClass()); + @Test - void messageHandlerListener() throws Exception { - String topic = "drpmlct-012"; - MutableReactiveMessageConsumerSpec config = new MutableReactiveMessageConsumerSpec(); - config.setTopicNames(Collections.singletonList(topic)); - config.setSubscriptionName("drpmlct-sb-012"); - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); - DefaultReactivePulsarConsumerFactory pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>( - reactivePulsarClient, config); - // Ensure subscription is created - pulsarConsumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(10)); - CountDownLatch latch = new CountDownLatch(1); - ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); - pulsarContainerProperties.setMessageHandler( - (ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.fromRunnable(latch::countDown)); - pulsarContainerProperties.setSchema(Schema.STRING); - DefaultReactivePulsarMessageListenerContainer container = new DefaultReactivePulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.start(); - MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec(); - prodConfig.setTopicName(topic); - DefaultReactivePulsarSenderFactory pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>( - reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver()); - ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory); - pulsarTemplate.send("hello john doe").subscribe(); - assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - container.stop(); - pulsarClient.close(); + void oneByOneMessageHandler() throws Exception { + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + ReactivePulsarMessageListenerContainer container = null; + try { + var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); + var topic = topicNameForTest("1"); + var consumerFactory = createAndPrepareConsumerFactory(topic, reactivePulsarClient); + var latch = new CountDownLatch(1); + var containerProperties = new ReactivePulsarContainerProperties(); + containerProperties.setSchema(Schema.STRING); + containerProperties.setMessageHandler( + (ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.fromRunnable(latch::countDown)); + container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties); + container.start(); + createPulsarTemplate(topic, reactivePulsarClient).send("hello john doe").subscribe(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } } @Test - void streamingHandlerListener() throws Exception { - String topic = "drpmlct-013"; - MutableReactiveMessageConsumerSpec config = new MutableReactiveMessageConsumerSpec(); - config.setTopicNames(Collections.singletonList(topic)); - config.setSubscriptionName("drpmlct-sb-013"); - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); - DefaultReactivePulsarConsumerFactory pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>( - reactivePulsarClient, config); - // Ensure subscription is created - pulsarConsumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(10)); - CountDownLatch latch = new CountDownLatch(5); - ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); - pulsarContainerProperties - .setMessageHandler((ReactivePulsarStreamingHandler) (msg) -> msg.doOnNext((m) -> latch.countDown()) - .map(MessageResult::acknowledge)); - pulsarContainerProperties.setSchema(Schema.STRING); - DefaultReactivePulsarMessageListenerContainer container = new DefaultReactivePulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.start(); - MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec(); - prodConfig.setTopicName(topic); - DefaultReactivePulsarSenderFactory pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>( - reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver()); - ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory); - Flux.range(0, 5).map(i -> MessageSpec.of("hello john doe" + i)).as(pulsarTemplate::send).subscribe(); - assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - container.stop(); - pulsarClient.close(); + void streamingMessageHandler() throws Exception { + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + ReactivePulsarMessageListenerContainer container = null; + try { + var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); + var topic = topicNameForTest("2"); + var consumerFactory = createAndPrepareConsumerFactory(topic, reactivePulsarClient); + var latch = new CountDownLatch(5); + var containerProperties = new ReactivePulsarContainerProperties(); + containerProperties.setSchema(Schema.STRING); + containerProperties.setMessageHandler( + (ReactivePulsarStreamingHandler) (msg) -> msg.doOnNext((m) -> latch.countDown()) + .map(MessageResult::acknowledge)); + container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties); + container.start(); + createPulsarTemplate(topic, reactivePulsarClient) + .newMessages(Flux.range(0, 5).map(i -> MessageSpec.of("hello john doe" + i))) + .send() + .subscribe(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } } @Test - void containerProperties() throws Exception { - String topic = "drpmlct-sb-014"; - String subscriptionName = "drpmlct-sb-014"; - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); - DefaultReactivePulsarConsumerFactory pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>( - reactivePulsarClient, null); - // Ensure subscription is created - pulsarConsumerFactory - .createConsumer(Schema.STRING, - Collections.singletonList( - c -> c.topics(Collections.singletonList(topic)).subscriptionName(subscriptionName))) - .consumeNothing() - .block(Duration.ofSeconds(10)); - CountDownLatch latch = new CountDownLatch(1); - ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); - pulsarContainerProperties.setMessageHandler( - (ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.fromRunnable(latch::countDown)); - pulsarContainerProperties.setSchema(Schema.STRING); - pulsarContainerProperties.setTopics(List.of(topic)); - pulsarContainerProperties.setSubscriptionName(subscriptionName); - pulsarContainerProperties.setConcurrency(5); - pulsarContainerProperties.setUseKeyOrderedProcessing(true); - pulsarContainerProperties.setHandlingTimeout(Duration.ofMillis(7)); - DefaultReactivePulsarMessageListenerContainer container = new DefaultReactivePulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.start(); - MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec(); - prodConfig.setTopicName(topic); - DefaultReactivePulsarSenderFactory pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>( - reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver()); - ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory); - pulsarTemplate.send("hello john doe").subscribe(); - assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + void containerPropertiesAreRespected() throws Exception { + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + ReactivePulsarMessageListenerContainer container = null; + try { + var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); + var topic = topicNameForTest("3"); + var consumerFactory = createAndPrepareConsumerFactory(topic, reactivePulsarClient); + var latch = new CountDownLatch(1); + var containerProperties = new ReactivePulsarContainerProperties(); + containerProperties.setSchema(Schema.STRING); + containerProperties.setMessageHandler( + (ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.fromRunnable(latch::countDown)); + containerProperties.setConcurrency(5); + containerProperties.setUseKeyOrderedProcessing(true); + containerProperties.setHandlingTimeout(Duration.ofMillis(7)); + container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties); + container.start(); + createPulsarTemplate(topic, reactivePulsarClient).send("hello john doe").subscribe(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(container).extracting("pipeline", InstanceOfAssertFactories.type(ReactiveMessagePipeline.class)) + .hasFieldOrPropertyWithValue("concurrency", 5) + .hasFieldOrPropertyWithValue("handlingTimeout", Duration.ofMillis(7)) + .extracting("groupingFunction") + .isInstanceOf(DefaultMessageGroupingFunction.class); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } + } - assertThat(container).extracting("pipeline", InstanceOfAssertFactories.type(ReactiveMessagePipeline.class)) - .hasFieldOrPropertyWithValue("concurrency", 5) - .hasFieldOrPropertyWithValue("handlingTimeout", Duration.ofMillis(7)) - .extracting("groupingFunction") - .isInstanceOf(DefaultMessageGroupingFunction.class); + @Test + void createConsumerWithSharedSubTypeOnFactoryWithExclusiveSubType() throws Exception { + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + ReactivePulsarMessageListenerContainer container = null; + try { + var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); + var topic = topicNameForTest("4"); + ReactiveMessageConsumerBuilderCustomizer defaultConfig = (builder) -> { + builder.topic(topic); + builder.subscriptionName(topic + "-sub"); + }; + var consumerFactory = new DefaultReactivePulsarConsumerFactory<>(reactivePulsarClient, + List.of(defaultConfig)); + var containerProperties = new ReactivePulsarContainerProperties(); + containerProperties.setSchema(Schema.STRING); + containerProperties.setMessageHandler((ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.empty()); + container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties); + container.start(); - container.stop(); - pulsarClient.close(); + Thread.sleep(2_000); + StepVerifier + .create(consumerFactory.createConsumer(Schema.STRING, + List.of(builder -> builder.subscriptionType(SubscriptionType.Shared))) + .consumeNothing()) + .expectErrorMatches((ex) -> { + return true; + }) + .verify(Duration.ofSeconds(10)); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } } @Test - void defaultSubscriptionType() throws Exception { - String topic = "drpmlct-015"; - MutableReactiveMessageConsumerSpec config = new MutableReactiveMessageConsumerSpec(); - config.setTopicNames(Collections.singletonList(topic)); - config.setSubscriptionName("drpmlct-sb-015"); - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); - DefaultReactivePulsarConsumerFactory pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>( - reactivePulsarClient, config); + void createConsumerWithSharedSubTypeOnFactoryWithSharedSubType() throws Exception { + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + ReactivePulsarMessageListenerContainer container = null; + try { + var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); + var topic = topicNameForTest("5"); + ReactiveMessageConsumerBuilderCustomizer defaultConfig = (builder) -> { + builder.topic(topic); + builder.subscriptionName(topic + "-sub"); + }; + var consumerFactory = new DefaultReactivePulsarConsumerFactory<>(reactivePulsarClient, + List.of(defaultConfig)); + var containerProperties = new ReactivePulsarContainerProperties(); + containerProperties.setSchema(Schema.STRING); + containerProperties.setSubscriptionType(SubscriptionType.Shared); + containerProperties.setMessageHandler((ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.empty()); + container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties); + container.start(); - ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); - pulsarContainerProperties - .setMessageHandler((ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.empty()); - pulsarContainerProperties.setSchema(Schema.STRING); - DefaultReactivePulsarMessageListenerContainer container = new DefaultReactivePulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.start(); - - Thread.sleep(2_000); + Thread.sleep(2_000); + StepVerifier + .create(consumerFactory.createConsumer(Schema.STRING, + List.of(builder -> builder.subscriptionType(SubscriptionType.Shared))) + .consumeNothing()) + .expectComplete() + .verify(Duration.ofSeconds(10)); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } + } - StepVerifier - .create(pulsarConsumerFactory.createConsumer(Schema.STRING, - Collections.singletonList(c -> c.subscriptionType(SubscriptionType.Shared))) - .consumeNothing()) - .expectError() - .verify(Duration.ofSeconds(10)); + @Test + void containerPropertiesTopicsPattern() throws Exception { + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + ReactivePulsarMessageListenerContainer container = null; + try { + var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); + var topic = "drpmlct-6-foo"; + var subscription = topic + "-sub"; + ReactiveMessageConsumerBuilderCustomizer customizer = (builder) -> { + builder.topic(topic); + builder.subscriptionName(topic + "-sub"); + }; + var consumerFactory = new DefaultReactivePulsarConsumerFactory(reactivePulsarClient, null); + // Ensure subscription is created + consumerFactory.createConsumer(Schema.STRING, List.of(customizer)) + .consumeNothing() + .block(Duration.ofSeconds(5)); + var latch = new CountDownLatch(1); + var containerProperties = new ReactivePulsarContainerProperties(); + containerProperties.setSchema(Schema.STRING); + containerProperties.setTopicsPattern("persistent://public/default/drpmlct-6-.*"); + containerProperties.setSubscriptionName(subscription); + containerProperties.setMessageHandler( + (ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.fromRunnable(latch::countDown)); + container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties); + container.start(); - container.stop(); - pulsarClient.close(); + createPulsarTemplate(topic, reactivePulsarClient).send("hello john doe").subscribe(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } } @Test - void containerSubscriptionType() throws Exception { - String topic = "drpmlct-016"; - MutableReactiveMessageConsumerSpec config = new MutableReactiveMessageConsumerSpec(); - config.setTopicNames(Collections.singletonList(topic)); - config.setSubscriptionName("drpmlct-sb-016"); - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); - DefaultReactivePulsarConsumerFactory pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>( - reactivePulsarClient, config); + void deadLetterTopicCustomizer() throws Exception { + var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + ReactivePulsarMessageListenerContainer container = null; + try { + var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); + var topic = "drpmlct-7"; + var deadLetterTopic = topic + "-dlt"; + ReactiveMessageConsumerBuilderCustomizer defaultConfig = (builder) -> { + builder.topic(topic); + builder.subscriptionName(topic + "-sub"); + builder.negativeAckRedeliveryDelay(Duration.ZERO); + }; + var consumerFactory = new DefaultReactivePulsarConsumerFactory<>(reactivePulsarClient, + List.of(defaultConfig)); + var dlqConsumer = consumerFactory.createConsumer(Schema.STRING, + List.of((builder) -> builder.topics(List.of(deadLetterTopic)))); + // Ensure subscriptions are created + consumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(5)); + dlqConsumer.consumeNothing().block(Duration.ofSeconds(5)); + var latch = new CountDownLatch(6); + var containerProperties = new ReactivePulsarContainerProperties(); + containerProperties.setSchema(Schema.STRING); + containerProperties.setMessageHandler( + (ReactivePulsarStreamingHandler) (msg) -> msg.doOnNext((m) -> latch.countDown()) + .map((m) -> m.getValue().endsWith("4") ? MessageResult.negativeAcknowledge(m) + : MessageResult.acknowledge(m))); + containerProperties.setSubscriptionType(SubscriptionType.Shared); - ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); - pulsarContainerProperties - .setMessageHandler((ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.empty()); - pulsarContainerProperties.setSchema(Schema.STRING); - pulsarContainerProperties.setSubscriptionType(SubscriptionType.Shared); - DefaultReactivePulsarMessageListenerContainer container = new DefaultReactivePulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.start(); + var deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(1) + .deadLetterTopic(deadLetterTopic) + .build(); + container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties); + container.setConsumerCustomizer(b -> b.deadLetterPolicy(deadLetterPolicy)); + container.start(); - Thread.sleep(2_000); + var producerFactory = DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient) + .withDefaultTopic(topic) + .withDefaultConfigCustomizer((builder) -> builder.batchingEnabled(false)) + .build(); + var pulsarTemplate = new ReactivePulsarTemplate<>(producerFactory); + Flux.range(0, 5).map(i -> MessageSpec.of("hello john doe" + i)).as(pulsarTemplate::send).subscribe(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - StepVerifier - .create(pulsarConsumerFactory.createConsumer(Schema.STRING, - Collections.singletonList(c -> c.subscriptionType(SubscriptionType.Shared))) - .consumeNothing()) - .expectComplete() - .verify(Duration.ofSeconds(10)); + var dlqLatch = new CountDownLatch(1); + dlqConsumer.consumeOne(message -> { + if (message.getValue().endsWith("4")) { + dlqLatch.countDown(); + } + return Mono.just(MessageResult.acknowledge(message)); + }).block(); + assertThat(dlqLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + finally { + safeStopContainer(container); + pulsarClient.close(); + } + } - container.stop(); - pulsarClient.close(); + private String topicNameForTest(String suffix) { + return "drpmlct-" + suffix; } - @Test - void containerTopicsPattern() throws Exception { - String topic = "drpmlct-017-foo"; - String subscriptionName = "drpmlct-sb-017"; - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); - DefaultReactivePulsarConsumerFactory pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>( - reactivePulsarClient, null); + private DefaultReactivePulsarConsumerFactory createAndPrepareConsumerFactory(String topic, + ReactivePulsarClient reactivePulsarClient) { + ReactiveMessageConsumerBuilderCustomizer defaultConfig = (builder) -> { + builder.topic(topic); + builder.subscriptionName(topic + "-sub"); + }; + var consumerFactory = new DefaultReactivePulsarConsumerFactory<>(reactivePulsarClient, List.of(defaultConfig)); // Ensure subscription is created - pulsarConsumerFactory - .createConsumer(Schema.STRING, - Collections.singletonList( - c -> c.topics(Collections.singletonList(topic)).subscriptionName(subscriptionName))) - .consumeNothing() - .block(Duration.ofSeconds(10)); - CountDownLatch latch = new CountDownLatch(1); - ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); - pulsarContainerProperties.setMessageHandler( - (ReactivePulsarOneByOneMessageHandler) (msg) -> Mono.fromRunnable(latch::countDown)); - pulsarContainerProperties.setSchema(Schema.STRING); - pulsarContainerProperties.setTopicsPattern("persistent://public/default/drpmlct-017-.*"); - pulsarContainerProperties.setSubscriptionName(subscriptionName); - DefaultReactivePulsarMessageListenerContainer container = new DefaultReactivePulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - container.start(); - MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec(); - prodConfig.setTopicName(topic); - DefaultReactivePulsarSenderFactory pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>( - reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver()); - ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory); - pulsarTemplate.send("hello john doe").subscribe(); - assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - - container.stop(); - pulsarClient.close(); + consumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(5)); + return consumerFactory; } - @Test - void consumerCustomizer() throws Exception { - String topic = "drpmlct-018"; - String deadLetterTopic = "drpmlct-018-dlq-topic"; - MutableReactiveMessageConsumerSpec config = new MutableReactiveMessageConsumerSpec(); - config.setTopicNames(Collections.singletonList(topic)); - config.setSubscriptionName("drpmlct-sb-018"); - config.setNegativeAckRedeliveryDelay(Duration.ZERO); - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()) - .build(); - ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); - DefaultReactivePulsarConsumerFactory pulsarConsumerFactory = new DefaultReactivePulsarConsumerFactory<>( - reactivePulsarClient, config); - ReactiveMessageConsumer dlqConsumer = pulsarConsumerFactory.createConsumer(Schema.STRING, - Collections.singletonList(b -> b.topics(Collections.singletonList(deadLetterTopic)))); - - // Ensure subscriptions are created - pulsarConsumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(10)); - dlqConsumer.consumeNothing().block(Duration.ofSeconds(10)); - - CountDownLatch latch = new CountDownLatch(6); - ReactivePulsarContainerProperties pulsarContainerProperties = new ReactivePulsarContainerProperties<>(); - pulsarContainerProperties - .setMessageHandler((ReactivePulsarStreamingHandler) (msg) -> msg.doOnNext((m) -> latch.countDown()) - .map((m) -> m.getValue().endsWith("4") ? MessageResult.negativeAcknowledge(m) - : MessageResult.acknowledge(m))); - pulsarContainerProperties.setSchema(Schema.STRING); - pulsarContainerProperties.setSubscriptionType(SubscriptionType.Shared); - DefaultReactivePulsarMessageListenerContainer container = new DefaultReactivePulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); - DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder() - .maxRedeliverCount(1) - .deadLetterTopic(deadLetterTopic) + private ReactivePulsarTemplate createPulsarTemplate(String topic, + ReactivePulsarClient reactivePulsarClient) { + var producerFactory = DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient) + .withDefaultTopic(topic) .build(); - container.setConsumerCustomizer(b -> b.deadLetterPolicy(deadLetterPolicy)); - container.start(); - MutableReactiveMessageSenderSpec prodConfig = new MutableReactiveMessageSenderSpec(); - prodConfig.setBatchingEnabled(false); - prodConfig.setTopicName(topic); - DefaultReactivePulsarSenderFactory pulsarProducerFactory = new DefaultReactivePulsarSenderFactory<>( - reactivePulsarClient, prodConfig, null, null, new DefaultTopicResolver()); - ReactivePulsarTemplate pulsarTemplate = new ReactivePulsarTemplate<>(pulsarProducerFactory); - Flux.range(0, 5).map(i -> MessageSpec.of("hello john doe" + i)).as(pulsarTemplate::send).subscribe(); - assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + return new ReactivePulsarTemplate<>(producerFactory); + } - CountDownLatch dlqLatch = new CountDownLatch(1); - dlqConsumer.consumeOne(message -> { - if (message.getValue().endsWith("4")) { - dlqLatch.countDown(); + private void safeStopContainer(ReactivePulsarMessageListenerContainer container) { + try { + if (container != null) { + container.stop(); } - return Mono.just(MessageResult.acknowledge(message)); - }).block(); - - assertThat(dlqLatch.await(10, TimeUnit.SECONDS)).isTrue(); - - container.stop(); - pulsarClient.close(); + } + catch (Exception ex) { + logger.warn(ex, "Failed to stop container %s: %s".formatted(container, ex.getMessage())); + } } } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index 7c0d6bed..fc68e814 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.BlockingQueue; @@ -44,7 +45,6 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; import org.apache.pulsar.reactive.client.api.MessageResult; -import org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec; import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -124,7 +124,7 @@ public PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsa @Bean public ReactivePulsarConsumerFactory pulsarConsumerFactory(ReactivePulsarClient pulsarClient) { - return new DefaultReactivePulsarConsumerFactory<>(pulsarClient, new MutableReactiveMessageConsumerSpec()); + return new DefaultReactivePulsarConsumerFactory<>(pulsarClient, Collections.emptyList()); } @Bean