Skip to content

Commit

Permalink
Add multi-customizers to reactive reader and consumer
Browse files Browse the repository at this point in the history
* Also add builder to ReactivePulsarSenderFactory

See #432
  • Loading branch information
onobc committed Aug 26, 2023
1 parent 495de95 commit 74678fa
Show file tree
Hide file tree
Showing 11 changed files with 488 additions and 380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,37 @@
import java.util.List;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;

import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

/**
* Default implementation for {@link ReactivePulsarConsumerFactory}.
*
* @param <T> underlying payload type for the reactive consumer.
* @author Christophe Bornet
* @author Chris Bono
*/
public class DefaultReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {

private final ReactiveMessageConsumerSpec consumerSpec;

private final ReactivePulsarClient reactivePulsarClient;

@Nullable
private final List<ReactiveMessageConsumerBuilderCustomizer<T>> defaultConfigCustomizers;

/**
* Construct an instance.
* @param reactivePulsarClient the reactive client
* @param defaultConfigCustomizers the optional list of customizers that defines the
* default configuration for each created consumer.
*/
public DefaultReactivePulsarConsumerFactory(ReactivePulsarClient reactivePulsarClient,
ReactiveMessageConsumerSpec consumerSpec) {
this.consumerSpec = new ImmutableReactiveMessageConsumerSpec(
consumerSpec != null ? consumerSpec : new MutableReactiveMessageConsumerSpec());
List<ReactiveMessageConsumerBuilderCustomizer<T>> defaultConfigCustomizers) {
this.reactivePulsarClient = reactivePulsarClient;
this.defaultConfigCustomizers = defaultConfigCustomizers;
}

@Override
Expand All @@ -57,14 +62,19 @@ public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
List<ReactiveMessageConsumerBuilderCustomizer<T>> customizers) {

ReactiveMessageConsumerBuilder<T> consumer = this.reactivePulsarClient.messageConsumer(schema);
ReactiveMessageConsumerBuilder<T> consumerBuilder = this.reactivePulsarClient.messageConsumer(schema);

// Apply the default customizers
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(consumerBuilder)));
}

consumer.applySpec(this.consumerSpec);
// Apply the user specified customizers
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach((c) -> c.customize(consumer));
customizers.forEach((c) -> c.customize(consumerBuilder));
}

return consumer.build();
return consumerBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,35 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReader;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;

import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

/**
* Default implementation for {@link ReactivePulsarReaderFactory}.
*
* @param <T> underlying payload type for the reactive reader.
* @author Christophe Bornet
* @author Chris Bono
*/
public class DefaultReactivePulsarReaderFactory<T> implements ReactivePulsarReaderFactory<T> {

private final ReactiveMessageReaderSpec readerSpec;

private final ReactivePulsarClient reactivePulsarClient;

@Nullable
private final List<ReactiveMessageReaderBuilderCustomizer<T>> defaultConfigCustomizers;

/**
* Construct an instance.
* @param reactivePulsarClient the reactive client
* @param defaultConfigCustomizers the optional list of customizers that defines the
* default configuration for each created reader.
*/
public DefaultReactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
ReactiveMessageReaderSpec readerSpec) {
List<ReactiveMessageReaderBuilderCustomizer<T>> defaultConfigCustomizers) {
this.reactivePulsarClient = reactivePulsarClient;
this.readerSpec = readerSpec;
this.defaultConfigCustomizers = defaultConfigCustomizers;
}

@Override
Expand All @@ -54,12 +62,19 @@ public ReactiveMessageReader<T> createReader(Schema<T> schema) {
public ReactiveMessageReader<T> createReader(Schema<T> schema,
List<ReactiveMessageReaderBuilderCustomizer<T>> customizers) {

ReactiveMessageReaderBuilder<T> reader = this.reactivePulsarClient.messageReader(schema)
.applySpec(this.readerSpec);
ReactiveMessageReaderBuilder<T> readerBuilder = this.reactivePulsarClient.messageReader(schema);

// Apply the default customizers
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(readerBuilder)));
}

// Apply the user specified customizers
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach((c) -> c.customize(reader));
customizers.forEach((c) -> c.customize(readerBuilder));
}
return reader.build();

return readerBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,16 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
Expand All @@ -50,54 +48,45 @@ public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSend

private final ReactivePulsarClient reactivePulsarClient;

private final ReactiveMessageSenderSpec reactiveMessageSenderSpec;
private final TopicResolver topicResolver;

@Nullable
private final ReactiveMessageSenderCache reactiveMessageSenderCache;

@Nullable
private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers;
private String defaultTopic;

private TopicResolver topicResolver;
@Nullable
private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;

private DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, TopicResolver topicResolver,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, @Nullable String defaultTopic,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers) {
this.reactivePulsarClient = reactivePulsarClient;
this.topicResolver = topicResolver;
this.reactiveMessageSenderCache = reactiveMessageSenderCache;
this.defaultTopic = defaultTopic;
this.defaultConfigCustomizers = defaultConfigCustomizers;
}

/**
* Construct an instance.
* @param pulsarClient the pulsar client to adapt into a reactive client
* @param reactiveMessageSenderSpec spec that defines the initial settings on the
* created senders
* @param reactiveMessageSenderCache cache used to cache created senders
* @param defaultSenderBuilderCustomizers optional list of sender builder customizers
* to apply to the created senders
* Create a builder that uses the specified Reactive pulsar client.
* @param reactivePulsarClient the reactive client
* @return the newly created builder instance
* @param <X> the reactive sender type
*/
public DefaultReactivePulsarSenderFactory(PulsarClient pulsarClient,
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers) {
this(AdaptedReactivePulsarClientFactory.create(pulsarClient), reactiveMessageSenderSpec,
reactiveMessageSenderCache, defaultSenderBuilderCustomizers, new DefaultTopicResolver());
public static <X> Builder<X> builderFor(ReactivePulsarClient reactivePulsarClient) {
return new Builder<>(reactivePulsarClient);
}

/**
* Construct an instance.
* @param reactivePulsarClient the reactive client to use
* @param reactiveMessageSenderSpec spec that defines the initial settings on the
* created senders
* @param reactiveMessageSenderCache cache used to cache created senders
* @param defaultSenderBuilderCustomizers optional list of sender builder customizers
* to apply to the created senders
* @param topicResolver the topic resolver to use
* Create a builder that adapts the specified pulsar client.
* @param pulsarClient the Pulsar client to adapt into a Reactive client.
* @return the newly created builder instance
* @param <T> the reactive sender type
*/
public DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers,
TopicResolver topicResolver) {
this.reactivePulsarClient = reactivePulsarClient;
this.reactiveMessageSenderSpec = new ImmutableReactiveMessageSenderSpec(
reactiveMessageSenderSpec != null ? reactiveMessageSenderSpec : new MutableReactiveMessageSenderSpec());
this.reactiveMessageSenderCache = reactiveMessageSenderCache;
this.topicResolver = topicResolver;
this.defaultSenderBuilderCustomizers = defaultSenderBuilderCustomizers;
public static <T> Builder<T> builderFor(PulsarClient pulsarClient) {
return new Builder<>(AdaptedReactivePulsarClientFactory.create(pulsarClient));
}

@Override
Expand All @@ -121,34 +110,123 @@ public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String
private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema, @Nullable String topic,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
Objects.requireNonNull(schema, "Schema must be specified");
String resolvedTopic = this.topicResolver
.resolveTopic(topic, () -> getReactiveMessageSenderSpec().getTopicName())
.orElseThrow();
String resolvedTopic = this.topicResolver.resolveTopic(topic, () -> getDefaultTopic()).orElseThrow();
this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic));

ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);
sender.applySpec(this.reactiveMessageSenderSpec);

// Apply the default config customizer (preserve the topic)
if (!CollectionUtils.isEmpty(this.defaultSenderBuilderCustomizers)) {
this.defaultSenderBuilderCustomizers.forEach((customizer -> customizer.customize(sender)));
// Apply the default customizers (preserve the topic)
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(sender)));
}
sender.topic(resolvedTopic);

if (this.reactiveMessageSenderCache != null) {
sender.cache(this.reactiveMessageSenderCache);
}

// Apply the user specified customizers (preserve the topic)
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach((c) -> c.customize(sender));
}
// make sure the customizer do not override the topic
sender.topic(resolvedTopic);

return sender.build();
}

@Override
public ReactiveMessageSenderSpec getReactiveMessageSenderSpec() {
return this.reactiveMessageSenderSpec;
public String getDefaultTopic() {
return this.defaultTopic;
}

/**
* Builder for {@link DefaultReactivePulsarSenderFactory}.
*
* @param <T> the reactive sender type
*/
public static class Builder<T> {

private final ReactivePulsarClient reactivePulsarClient;

private TopicResolver topicResolver = new DefaultTopicResolver();

@Nullable
private ReactiveMessageSenderCache messageSenderCache;

@Nullable
private String defaultTopic;

@Nullable
private List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;

private Builder(ReactivePulsarClient reactivePulsarClient) {
Assert.notNull(reactivePulsarClient, "Reactive client is required");
this.reactivePulsarClient = reactivePulsarClient;
}

/**
* Provide the topic resolver to use.
* @param topicResolver the topic resolver to use
* @return this same builder instance
*/
public Builder<T> withTopicResolver(TopicResolver topicResolver) {
this.topicResolver = topicResolver;
return this;
}

/**
* Provide the message sender cache to use.
* @param messageSenderCache the message sender cache to use
* @return this same builder instance
*/
public Builder<T> withMessageSenderCache(ReactiveMessageSenderCache messageSenderCache) {
this.messageSenderCache = messageSenderCache;
return this;
}

/**
* Provide the default topic to use when one is not specified.
* @param defaultTopic the default topic to use
* @return this same builder instance
*/
public Builder<T> withDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
return this;
}

/**
* Provide a customizer to apply to the sender builder.
* @param customizer the customizer to apply to the builder before creating
* senders
* @return this same builder instance
*/
public Builder<T> withDefaultConfigCustomizer(ReactiveMessageSenderBuilderCustomizer<T> customizer) {
this.defaultConfigCustomizers = List.of(customizer);
return this;
}

/**
* Provide an optional list of sender builder customizers to apply to the builder
* before creating the senders.
* @param customizers optional list of sender builder customizers to apply to the
* builder before creating the senders.
* @return this same builder instance
*/
public Builder<T> withDefaultConfigCustomizers(List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
this.defaultConfigCustomizers = customizers;
return this;
}

/**
* Construct the sender factory using the specified settings.
* @return pulsar sender factory
*/
public DefaultReactivePulsarSenderFactory<T> build() {
Assert.notNull(this.topicResolver, "Topic resolver is required");
return new DefaultReactivePulsarSenderFactory<>(this.reactivePulsarClient, this.topicResolver,
this.messageSenderCache, this.defaultTopic, this.defaultConfigCustomizers);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;

import org.springframework.lang.Nullable;

Expand Down Expand Up @@ -64,9 +63,10 @@ ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String topic,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers);

/**
* Return the ReactiveMessageSenderSpec to use when creating reactive senders.
* @return the ReactiveMessageSenderSpec
* Get the default topic to use for all created senders.
* @return the default topic to use for all created senders or null if no default set.
*/
ReactiveMessageSenderSpec getReactiveMessageSenderSpec();
@Nullable
String getDefaultTopic();

}
Loading

0 comments on commit 74678fa

Please sign in to comment.