Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-customizers to reactive reader and consumer #436

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,37 @@
import java.util.List;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;

import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

/**
* Default implementation for {@link ReactivePulsarConsumerFactory}.
*
* @param <T> underlying payload type for the reactive consumer.
* @author Christophe Bornet
* @author Chris Bono
*/
public class DefaultReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {

private final ReactiveMessageConsumerSpec consumerSpec;

private final ReactivePulsarClient reactivePulsarClient;

@Nullable
private final List<ReactiveMessageConsumerBuilderCustomizer<T>> defaultConfigCustomizers;

/**
* Construct an instance.
* @param reactivePulsarClient the reactive client
* @param defaultConfigCustomizers the optional list of customizers that defines the
* default configuration for each created consumer.
*/
public DefaultReactivePulsarConsumerFactory(ReactivePulsarClient reactivePulsarClient,
ReactiveMessageConsumerSpec consumerSpec) {
this.consumerSpec = new ImmutableReactiveMessageConsumerSpec(
consumerSpec != null ? consumerSpec : new MutableReactiveMessageConsumerSpec());
List<ReactiveMessageConsumerBuilderCustomizer<T>> defaultConfigCustomizers) {
this.reactivePulsarClient = reactivePulsarClient;
this.defaultConfigCustomizers = defaultConfigCustomizers;
}

@Override
Expand All @@ -57,14 +62,19 @@ public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
List<ReactiveMessageConsumerBuilderCustomizer<T>> customizers) {

ReactiveMessageConsumerBuilder<T> consumer = this.reactivePulsarClient.messageConsumer(schema);
ReactiveMessageConsumerBuilder<T> consumerBuilder = this.reactivePulsarClient.messageConsumer(schema);

// Apply the default customizers
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(consumerBuilder)));
}

consumer.applySpec(this.consumerSpec);
// Apply the user specified customizers
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach((c) -> c.customize(consumer));
customizers.forEach((c) -> c.customize(consumerBuilder));
}

return consumer.build();
return consumerBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,35 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReader;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;

import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

/**
* Default implementation for {@link ReactivePulsarReaderFactory}.
*
* @param <T> underlying payload type for the reactive reader.
* @author Christophe Bornet
* @author Chris Bono
*/
public class DefaultReactivePulsarReaderFactory<T> implements ReactivePulsarReaderFactory<T> {

private final ReactiveMessageReaderSpec readerSpec;

private final ReactivePulsarClient reactivePulsarClient;

@Nullable
private final List<ReactiveMessageReaderBuilderCustomizer<T>> defaultConfigCustomizers;

/**
* Construct an instance.
* @param reactivePulsarClient the reactive client
* @param defaultConfigCustomizers the optional list of customizers that defines the
* default configuration for each created reader.
*/
public DefaultReactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
ReactiveMessageReaderSpec readerSpec) {
List<ReactiveMessageReaderBuilderCustomizer<T>> defaultConfigCustomizers) {
this.reactivePulsarClient = reactivePulsarClient;
this.readerSpec = readerSpec;
this.defaultConfigCustomizers = defaultConfigCustomizers;
}

@Override
Expand All @@ -54,12 +62,19 @@ public ReactiveMessageReader<T> createReader(Schema<T> schema) {
public ReactiveMessageReader<T> createReader(Schema<T> schema,
List<ReactiveMessageReaderBuilderCustomizer<T>> customizers) {

ReactiveMessageReaderBuilder<T> reader = this.reactivePulsarClient.messageReader(schema)
.applySpec(this.readerSpec);
ReactiveMessageReaderBuilder<T> readerBuilder = this.reactivePulsarClient.messageReader(schema);

// Apply the default customizers
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(readerBuilder)));
}

// Apply the user specified customizers
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach((c) -> c.customize(reader));
customizers.forEach((c) -> c.customize(readerBuilder));
}
return reader.build();

return readerBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,81 +23,70 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
* Default implementation of {@link ReactivePulsarSenderFactory}.
*
* @param <T> reactive sender type.
* @param <T> underlying payload type for the reactive sender.
* @author Christophe Bornet
* @author Chris Bono
*/
public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSenderFactory<T> {
public final class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSenderFactory<T> {

private final LogAccessor logger = new LogAccessor(this.getClass());

private final ReactivePulsarClient reactivePulsarClient;

private final ReactiveMessageSenderSpec reactiveMessageSenderSpec;
private final TopicResolver topicResolver;

@Nullable
private final ReactiveMessageSenderCache reactiveMessageSenderCache;

@Nullable
private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers;
private String defaultTopic;

private TopicResolver topicResolver;
@Nullable
private final List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;

private DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, TopicResolver topicResolver,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache, @Nullable String defaultTopic,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers) {
this.reactivePulsarClient = reactivePulsarClient;
this.topicResolver = topicResolver;
this.reactiveMessageSenderCache = reactiveMessageSenderCache;
this.defaultTopic = defaultTopic;
this.defaultConfigCustomizers = defaultConfigCustomizers;
}

/**
* Construct an instance.
* @param pulsarClient the pulsar client to adapt into a reactive client
* @param reactiveMessageSenderSpec spec that defines the initial settings on the
* created senders
* @param reactiveMessageSenderCache cache used to cache created senders
* @param defaultSenderBuilderCustomizers optional list of sender builder customizers
* to apply to the created senders
* Create a builder that uses the specified Reactive pulsar client.
* @param reactivePulsarClient the reactive client
* @param <T> underlying payload type for the reactive sender
* @return the newly created builder instance
*/
public DefaultReactivePulsarSenderFactory(PulsarClient pulsarClient,
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers) {
this(AdaptedReactivePulsarClientFactory.create(pulsarClient), reactiveMessageSenderSpec,
reactiveMessageSenderCache, defaultSenderBuilderCustomizers, new DefaultTopicResolver());
public static <T> Builder<T> builderFor(ReactivePulsarClient reactivePulsarClient) {
return new Builder<>(reactivePulsarClient);
}

/**
* Construct an instance.
* @param reactivePulsarClient the reactive client to use
* @param reactiveMessageSenderSpec spec that defines the initial settings on the
* created senders
* @param reactiveMessageSenderCache cache used to cache created senders
* @param defaultSenderBuilderCustomizers optional list of sender builder customizers
* to apply to the created senders
* @param topicResolver the topic resolver to use
* Create a builder that adapts the specified pulsar client.
* @param pulsarClient the Pulsar client to adapt into a Reactive client
* @param <T> underlying payload type for the reactive sender
* @return the newly created builder instance
*/
public DefaultReactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
@Nullable ReactiveMessageSenderSpec reactiveMessageSenderSpec,
@Nullable ReactiveMessageSenderCache reactiveMessageSenderCache,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> defaultSenderBuilderCustomizers,
TopicResolver topicResolver) {
this.reactivePulsarClient = reactivePulsarClient;
this.reactiveMessageSenderSpec = new ImmutableReactiveMessageSenderSpec(
reactiveMessageSenderSpec != null ? reactiveMessageSenderSpec : new MutableReactiveMessageSenderSpec());
this.reactiveMessageSenderCache = reactiveMessageSenderCache;
this.topicResolver = topicResolver;
this.defaultSenderBuilderCustomizers = defaultSenderBuilderCustomizers;
public static <T> Builder<T> builderFor(PulsarClient pulsarClient) {
return new Builder<>(AdaptedReactivePulsarClientFactory.create(pulsarClient));
}

@Override
Expand All @@ -121,34 +110,123 @@ public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String
private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema, @Nullable String topic,
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
Objects.requireNonNull(schema, "Schema must be specified");
String resolvedTopic = this.topicResolver
.resolveTopic(topic, () -> getReactiveMessageSenderSpec().getTopicName())
.orElseThrow();
String resolvedTopic = this.topicResolver.resolveTopic(topic, () -> getDefaultTopic()).orElseThrow();
this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic));

ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);
sender.applySpec(this.reactiveMessageSenderSpec);

// Apply the default config customizer (preserve the topic)
if (!CollectionUtils.isEmpty(this.defaultSenderBuilderCustomizers)) {
this.defaultSenderBuilderCustomizers.forEach((customizer -> customizer.customize(sender)));
// Apply the default customizers (preserve the topic)
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(sender)));
}
sender.topic(resolvedTopic);

if (this.reactiveMessageSenderCache != null) {
sender.cache(this.reactiveMessageSenderCache);
}

// Apply the user specified customizers (preserve the topic)
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach((c) -> c.customize(sender));
}
// make sure the customizer do not override the topic
sender.topic(resolvedTopic);

return sender.build();
}

@Override
public ReactiveMessageSenderSpec getReactiveMessageSenderSpec() {
return this.reactiveMessageSenderSpec;
public String getDefaultTopic() {
return this.defaultTopic;
}

/**
* Builder for {@link DefaultReactivePulsarSenderFactory}.
*
* @param <T> the reactive sender type
*/
public static final class Builder<T> {

private final ReactivePulsarClient reactivePulsarClient;

private TopicResolver topicResolver = new DefaultTopicResolver();

@Nullable
private ReactiveMessageSenderCache messageSenderCache;

@Nullable
private String defaultTopic;

@Nullable
private List<ReactiveMessageSenderBuilderCustomizer<T>> defaultConfigCustomizers;

private Builder(ReactivePulsarClient reactivePulsarClient) {
Assert.notNull(reactivePulsarClient, "Reactive client is required");
this.reactivePulsarClient = reactivePulsarClient;
}

/**
* Provide the topic resolver to use.
* @param topicResolver the topic resolver to use
* @return this same builder instance
*/
public Builder<T> withTopicResolver(TopicResolver topicResolver) {
this.topicResolver = topicResolver;
return this;
}

/**
* Provide the message sender cache to use.
* @param messageSenderCache the message sender cache to use
* @return this same builder instance
*/
public Builder<T> withMessageSenderCache(ReactiveMessageSenderCache messageSenderCache) {
this.messageSenderCache = messageSenderCache;
return this;
}

/**
* Provide the default topic to use when one is not specified.
* @param defaultTopic the default topic to use
* @return this same builder instance
*/
public Builder<T> withDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
return this;
}

/**
* Provide a customizer to apply to the sender builder.
* @param customizer the customizer to apply to the builder before creating
* senders
* @return this same builder instance
*/
public Builder<T> withDefaultConfigCustomizer(ReactiveMessageSenderBuilderCustomizer<T> customizer) {
this.defaultConfigCustomizers = List.of(customizer);
return this;
}

/**
* Provide an optional list of sender builder customizers to apply to the builder
* before creating the senders.
* @param customizers optional list of sender builder customizers to apply to the
* builder before creating the senders.
* @return this same builder instance
*/
public Builder<T> withDefaultConfigCustomizers(List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
this.defaultConfigCustomizers = customizers;
return this;
}

/**
* Construct the sender factory using the specified settings.
* @return pulsar sender factory
*/
public DefaultReactivePulsarSenderFactory<T> build() {
Assert.notNull(this.topicResolver, "Topic resolver is required");
return new DefaultReactivePulsarSenderFactory<>(this.reactivePulsarClient, this.topicResolver,
this.messageSenderCache, this.defaultTopic, this.defaultConfigCustomizers);
}

}

}
Loading
Loading