PIP 13: Subscribe to topics represented by regular expressions

Matteo Merli edited this page Jan 24, 2018 · 2 revisions
  • Status: Proposal
  • Author: Jia Zhai - Streamlio
  • Pull Request: []
  • Mailing List discussion:

Motivation

The consumer needs to handle subscription to topics represented by regular expressions. In the first stage the scope is a single namespace: all topics and patterns must target the same namespace, which simplifies authentication and authorization control.

To support this, we should add and implement a series of new methods in PulsarClient.java:

Consumer subscribe(Collection<String> topics, String subscription);
Consumer subscribe(Pattern topicsPattern, String subscription);

The goals to be achieved are listed below; they can be delivered one by one:

  • support subscription to multiple topics in the same namespace (no guarantee on ordering between topics)
  • support regex based subscription
  • auto-discover topic addition/deletion

Design

support subscription to multiple topics

This needs a new implementation of ConsumerBase that wraps multiple single-topic consumers; let’s name it TopicsConsumerImpl. When the user calls the new method Consumer subscribe(Collection<String> topics, String subscription), it creates a ConsumerImpl for each topic in turn and returns a TopicsConsumerImpl. The main work is:

  1. The TopicsConsumerImpl class should implement the abstract methods in ConsumerBase, and should also provide some specific methods such as:
// maintain a map for all the <Topic, Consumer>, after we subscribe all the topics.
private final ConcurrentMap<String, ConsumerImpl> consumers = new ConcurrentHashMap<>();
// get topics
Set<String> getTopics();
// get consumers
List<ConsumerImpl> getConsumers();

// subscribe a topic
void subscribeTopic(String topic);
// unSubscribe a topic
void unSubscribeTopic(String topic);
  2. When receiving or acknowledging a message, the message identifier is needed. The implementation must handle the message identifier (MessageId) differently in some of the abstract methods of ConsumerBase, because the MessageId has to carry an additional String topic or consumer id; alternatively, we may need to change MessageIdData in PulsarApi.proto.
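
The first option in item 2 (a MessageId that also carries the topic) can be illustrated with a minimal sketch. Everything here is hypothetical and only models the idea: SingleTopicConsumer stands in for ConsumerImpl, TopicMessageId and AckRouter are invented names, and the inner message id is reduced to a plain String rather than the real MessageId type.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a per-topic consumer; not the Pulsar ConsumerImpl.
interface SingleTopicConsumer {
    void acknowledge(String innerMessageId);
}

// Hypothetical message id that remembers which topic the message came from.
final class TopicMessageId {
    private final String topic;
    private final String innerMessageId;

    TopicMessageId(String topic, String innerMessageId) {
        this.topic = Objects.requireNonNull(topic);
        this.innerMessageId = Objects.requireNonNull(innerMessageId);
    }

    String getTopic() {
        return topic;
    }

    String getInnerMessageId() {
        return innerMessageId;
    }
}

// Routes an acknowledgement to the per-topic consumer that owns the topic.
final class AckRouter {
    private final Map<String, SingleTopicConsumer> consumers = new ConcurrentHashMap<>();

    void register(String topic, SingleTopicConsumer consumer) {
        consumers.put(topic, consumer);
    }

    void acknowledge(TopicMessageId id) {
        SingleTopicConsumer consumer = consumers.get(id.getTopic());
        if (consumer == null) {
            throw new IllegalStateException("Not subscribed to topic: " + id.getTopic());
        }
        // Only the inner id is passed on; the topic was needed just for routing.
        consumer.acknowledge(id.getInnerMessageId());
    }
}
```

With this shape the per-topic consumers stay unaware of the wrapper: the multi-topic consumer strips the topic before delegating, so no change to MessageIdData would be required for this option.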

support regex based subscription.

As mentioned before, the scope is namespace. The main work is:

  1. The TopicsConsumerImpl class above needs to keep the Pattern that was passed in through the API for the subscription.
  2. Leverage the existing Pulsar admin API getList to get the list of topics. In interface PersistentTopics:
List<String> getList(String namespace) throws PulsarAdminException;
List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException;
  3. The new method Consumer subscribe(String namespace, Pattern topicsPattern, String subscription) should work like this:
  • call List<String> getList(String namespace) to get all the topics;
  • use topicsPattern to select the matching sub-topics-list;
  • construct the TopicsConsumerImpl with the sub-topics-list.
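
The filtering step can be sketched as follows. This is only an illustration: TopicFilter and filterTopics are hypothetical names, and the sketch assumes the pattern must match the whole topic name (Matcher.matches() rather than find()), which the proposal does not specify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper; not part of the Pulsar client.
final class TopicFilter {

    private TopicFilter() {
    }

    // Returns the topics whose full name matches the pattern, preserving input order.
    static List<String> filterTopics(List<String> allTopics, Pattern topicsPattern) {
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            // Assumption: a topic qualifies only if the entire name matches.
            if (topicsPattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

The input list would come from getList(namespace), and the returned list is what would be handed to TopicsConsumerImpl.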

auto-discover topic addition/deletion

The main work is:

  1. Provide a listener that reacts to topic changes by subscribing to or unsubscribing from individual topics when target topics are added or removed.
interface TopicsChangeListener {
	// unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on removed topics.
	void onTopicsRemoved(Collection<String> topics);
	// subscribe and create a list of new ConsumerImpl, add them to the `consumers` map in `TopicsConsumerImpl`.
	void onTopicsAdded(Collection<String> topics);
}

Add a method void registerListener(TopicsChangeListener listener) to TopicsConsumerImpl.

  2. Based on the above, use a timer to periodically call List<String> getList(String namespace). Compare the freshly filtered sub-topics-list with the topics currently held in TopicsConsumerImpl to produce two lists: newAddedTopicsList and removedTopicsList.
  3. If the two lists are not empty, call TopicsChangeListener.onTopicsAdded(newAddedTopicsList) and TopicsChangeListener.onTopicsRemoved(removedTopicsList) to subscribe and unsubscribe, and update the consumers map in TopicsConsumerImpl.
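
The comparison in steps 2 and 3 amounts to two set differences. The sketch below is a rough illustration under stated assumptions: TopicsDiff and notifyChanges are hypothetical names, the listener interface is repeated from the proposal so the block stands alone, and each callback is invoked only when its own list is non-empty.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Listener interface as described in the proposal.
interface TopicsChangeListener {
    void onTopicsRemoved(Collection<String> topics);

    void onTopicsAdded(Collection<String> topics);
}

// Hypothetical helper that compares the held topics with a fresh listing.
final class TopicsDiff {

    private TopicsDiff() {
    }

    static void notifyChanges(Set<String> currentTopics,
                              Collection<String> freshTopics,
                              TopicsChangeListener listener) {
        // Topics in the fresh listing that are not yet subscribed.
        Set<String> added = new HashSet<>(freshTopics);
        added.removeAll(currentTopics);

        // Topics currently subscribed that no longer appear in the listing.
        Set<String> removed = new HashSet<>(currentTopics);
        removed.removeAll(freshTopics);

        // Assumption: skip a callback when its list is empty.
        if (!removed.isEmpty()) {
            listener.onTopicsRemoved(removed);
        }
        if (!added.isEmpty()) {
            listener.onTopicsAdded(added);
        }
    }
}
```

A periodic check could run this from a task scheduled with java.util.concurrent.ScheduledExecutorService.scheduleAtFixedRate, passing in the filtered result of getList(namespace) as freshTopics; the polling interval is left open by the proposal.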

Changes

The changes are mostly at the API surface and on the client side:

  1. Add and implement a series of new methods in org.apache.pulsar.client.api.PulsarClient.java:
Consumer subscribe(Collection<String> topics, String subscription);
Consumer subscribe(Pattern topicsPattern, String subscription);
  2. Add and implement the new Consumer, TopicsConsumerImpl, returned by the subscribe methods above.