PIP 45: Pluggable metadata interface

Matteo Merli edited this page Oct 9, 2019 · 2 revisions

Goals

Provide a unified pluggable interface that can abstract all the Pulsar metadata interactions.

After the refactoring, the default implementation will still be based on ZooKeeper and will be 100% compatible with the existing metadata. The metadata will be kept in the same location and in exactly the same format.

Once we have the interface defined we could have multiple backend implementations:

  • ZooKeeper
  • Etcd
  • In memory - for unit testing purposes
  • On local disk - for usage in Pulsar standalone

Context

Pulsar currently uses ZooKeeper for metadata and coordination purposes. ZooKeeper is accessed from Pulsar brokers and from some administrative CLI tools. BookKeeper already supports a pluggable metadata store.

Additionally, the ZooKeeper client API is accessed from several places in the codebase, so we first need to consolidate all these accesses behind a single generic MetadataStore interface.

This interface is based on the needs that Pulsar has in interacting with metadata and on the semantics offered by existing metadata stores (e.g. ZooKeeper, Etcd and others). The API will be considered "Beta" (meaning it could evolve in breaking ways) until we have at least a few concrete implementations. Therefore, at least initially, this will be an internal Pulsar API and it will not be open to user plugins.

Refactoring steps

1. Define metadata store API

The metadata store is modeled after a basic Key-Value interface with compareAndSet() updates based on the version of a particular value.

public interface MetadataStore extends AutoCloseable {

    /**
     * Read the value of one key, identified by the path
     *
     * The async call will return a future that yields a {@link GetResult} that will contain the value and the
     * associated {@link Stat} object.
     *
     * If the value is not found, the future will yield an empty {@link Optional}.
     *
     * @param path
     *            the path of the key to get from the store
     * @return a future to track the async request
     */
    CompletableFuture<Optional<GetResult>> get(String path);

    /**
     * Return all the nodes (lexicographically sorted) that are children of the specified path.
     *
     * If the path itself does not exist, it will return an empty list.
     *
     * @param path
     *            the path of the key to check on the store
     * @return a future to track the async request
     */
    CompletableFuture<List<String>> getChildren(String path);

    /**
     * Read whether a specific path exists.
     *
     * Note: In case of keys with multiple levels (eg: '/a/b/c'), checking the existence of a parent (eg. '/a') might
     * not necessarily return true, unless the key had been explicitly created.
     *
     * @param path
     *            the path of the key to check on the store
     * @return a future to track the async request
     */
    CompletableFuture<Boolean> exists(String path);

    /**
     * Put a new value for a given key.
     *
     * The caller can specify an expected version to be atomically checked against the current version of the stored
     * data.
     *
     * The future will return the {@link Stat} object associated with the newly inserted value.
     *
     *
     * @param path
     *            the path of the key to store the value under
     * @param value
     *            the value to store
     * @param expectedVersion
     *            if present, the version will have to match with the currently stored value for the operation to
     *            succeed. Use -1 to enforce a non-existing value.
     * @throws BadVersionException
     *             if the expected version doesn't match the actual version of the data
     * @return a future to track the async request
     */
    CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion);

    /**
     * Delete the key identified by the path.
     *
     * @param path
     *            the path of the key to delete from the store
     * @param expectedVersion
     *            if present, the version will have to match with the currently stored value for the operation to
     *            succeed
     * @throws NotFoundException
     *             if the path is not found
     * @throws BadVersionException
     *             if the expected version doesn't match the actual version of the data
     * @return a future to track the async request
     */
    CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion);
}
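
The versioned put() contract can be illustrated with a minimal in-memory sketch. This is not the proposed implementation: the class name InMemoryKvSketch, the Versioned holder (standing in for the value plus its Stat) and the choice of 0 as the first version are assumptions made for illustration only.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory sketch of the versioned put() semantics.
class InMemoryKvSketch {
    // Hypothetical stand-in for a value together with its version.
    static final class Versioned {
        final byte[] value;
        final long version;

        Versioned(byte[] value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    // Hypothetical stand-in for the BadVersionException named in the Javadoc.
    static final class BadVersionException extends RuntimeException {
        BadVersionException(String message) {
            super(message);
        }
    }

    private final Map<String, Versioned> data = new ConcurrentHashMap<>();

    CompletableFuture<Optional<Versioned>> get(String path) {
        return CompletableFuture.completedFuture(Optional.ofNullable(data.get(path)));
    }

    // Returns the new version. An empty expectedVersion means "unconditional";
    // -1 means "the key must not exist yet".
    CompletableFuture<Long> put(String path, byte[] value, Optional<Long> expectedVersion) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        try {
            // compute() runs atomically per key, so the check and the update cannot interleave.
            Versioned updated = data.compute(path, (key, current) -> {
                long currentVersion = (current == null) ? -1L : current.version;
                if (expectedVersion.isPresent() && expectedVersion.get() != currentVersion) {
                    throw new BadVersionException(
                            "expected " + expectedVersion.get() + " but was " + currentVersion);
                }
                return new Versioned(value.clone(), currentVersion + 1);
            });
            result.complete(updated.version);
        } catch (BadVersionException e) {
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

In this sketch, a writer that read version 0 and calls put() with Optional.of(0L) succeeds only if nobody else has updated the key in the meantime; otherwise the returned future completes exceptionally and the caller has to re-read and retry.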

Additionally, when the MetadataStore is created, it should be possible to specify an observer function that will be triggered whenever there are changes on a sub-tree of the specified keys.

This will be used to keep local caches updated without any polling.
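
As a rough sketch of how such an observer could drive cache invalidation, consider the following. The proposal does not define the observer signature, so the Consumer<String> callback and the class names NotifyingStoreSketch and InvalidatingCacheSketch are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical store that notifies an observer about changes under a watched prefix.
class NotifyingStoreSketch {
    private final Map<String, byte[]> data = new ConcurrentHashMap<>();
    private final String watchedPrefix;
    private final Consumer<String> observer;

    NotifyingStoreSketch(String watchedPrefix, Consumer<String> observer) {
        this.watchedPrefix = watchedPrefix;
        this.observer = observer;
    }

    void put(String path, byte[] value) {
        data.put(path, value);
        if (path.startsWith(watchedPrefix)) {
            observer.accept(path); // notify only after the change has been applied
        }
    }

    byte[] get(String path) {
        return data.get(path);
    }
}

// Hypothetical local cache that drops stale entries when notified, instead of polling.
class InvalidatingCacheSketch {
    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();
    private final NotifyingStoreSketch store;

    InvalidatingCacheSketch(String watchedPrefix) {
        // The observer simply evicts the changed path; the next read reloads it.
        this.store = new NotifyingStoreSketch(watchedPrefix, path -> cache.remove(path));
    }

    NotifyingStoreSketch store() {
        return store;
    }

    byte[] get(String path) {
        // Load from the store on a miss; a missing key is not cached.
        return cache.computeIfAbsent(path, store::get);
    }
}
```

A real backend would deliver notifications asynchronously (for example through ZooKeeper watches), so a cache built this way is eventually consistent rather than strictly up to date.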

2. Define coordination interface

The Pulsar broker uses "coordination" in several different places. Examples are:

  • The list of active brokers and their current load data reports
  • Acquiring ownership of a portion of a namespace's topics (a bundle)
  • Leader election (load manager)
  • Counters for generating unique prefix identifiers

While in general these can be implemented through a Key-Value interface with the help of flags (eg. "ephemeral" nodes in ZooKeeper), each backend system might have a more direct way to implement these.

/**
 * Interface for the coordination service. Provides abstraction for distributed locks and leader election.
 */
public interface CoordinationService extends AutoCloseable {

    /**
     * Read the content of an existing lock.
     *
     * Warning: because of the distributed nature of the lock, having acquired a lock will never provide a strong
     * guarantee that no one else also thinks it owns the same resource. The caller will have to deal with these race
     * conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
     *
     * @param path
     *            the path of the resource whose lock is to be read
     * @return a future that will track the completion of the operation
     * @throws NotFoundException
     *             if the lock is not taken
     * @throws CoordinationServiceException
     *             if there's a failure in reading the lock
     */
    CompletableFuture<Optional<byte[]>> readLock(String path);

    /**
     * Acquire a lock on a shared resource.
     *
     * If the lock is already taken, this operation will fail immediately.
     *
     * Warning: because of the distributed nature of the lock, having acquired a lock will never provide a strong
     * guarantee that no one else also thinks it owns the same resource. The caller will have to deal with these race
     * conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
     *
     * @param path
     *            the path of the resource on which to acquire the lock
     * @param content
     *            the payload of the lock
     * @return a future that will track the completion of the operation
     * @throws ResourceBusyException
     *             if the lock is already taken
     * @throws CoordinationServiceException
     *             if there's a failure in acquiring the lock
     */
    CompletableFuture<ResourceLock> acquireLock(String path, byte[] content);

    /**
     * List all the locks that are children of a specific path.
     *
     * For example, given locks: <code>/a/b/lock-1</code> and <code>/a/b/lock-2</code>, the <code>listLocks()</code>
     * will return a list of <code>["lock-1", "lock-2"]</code>.
     *
     * @param path
     *            the prefix path to get the list of locks
     * @return a future that will track the completion of the operation
     * @throws CoordinationServiceException
     *             if there's a failure in getting the list of locks
     */
    CompletableFuture<List<String>> listLocks(String path);

    /**
     * Try to become the leader for the specified resource.
     *
     * If there's already a leader, this request will be kept pending until the current process becomes the
     * leader.
     *
     *
     * Warning: because of the distributed nature of the leader election, having been promoted to "leader" status will
     * never provide a strong guarantee that no one else also thinks it's the leader. The caller will have to deal with
     * these race conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
     *
     * @param path
     *            the path of the resource of which to become the leader
     * @param content
     *            the payload of the lock
     * @return a future that will track the completion of the operation
     * @throws CoordinationServiceException
     *             if there's a failure in the leader election
     */
    CompletableFuture<ResourceLock> becomeLeader(String path, byte[] content);

    /**
     * Increment a counter identified by the specified path and return the current value.
     *
     * The counter value will be guaranteed to be unique within the context of the path.
     *
     * @param path
     *            the path that identifies a particular counter
     * @return a future that will track the completion of the operation
     * @throws CoordinationServiceException
     *             if there's a failure in incrementing the counter
     */
    CompletableFuture<Long> getNextCounterValue(String path);
}


/**
 * Represent a lock that the current process has on a shared resource.
 */
public interface ResourceLock {

    /**
     * @return the content associated with the lock
     */
    byte[] getContent();

    /**
     * Release the lock on the resource.
     *
     * @return a future to track when the release operation is complete
     */
    CompletableFuture<Void> release();

    /**
     * Get a future that can be used to get notified when the lock is no longer valid.
     *
     * Note: the future will not be triggered when the lock is voluntarily released.
     *
     * @return a future to get notification if the lock is expired
     */
    CompletableFuture<Void> getLockExpiredFuture();
}
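
The fail-fast behavior of acquireLock() and the release() contract can be sketched with a single-process, in-memory stand-in. The names InMemoryLocksSketch and HeldLock are hypothetical, and the sketch deliberately ignores session expiry, which a real backend (for example ZooKeeper ephemeral nodes) would have to handle.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical single-process sketch of the acquireLock()/release() contract.
class InMemoryLocksSketch {
    // Hypothetical stand-in for the ResourceBusyException named in the Javadoc.
    static final class ResourceBusyException extends RuntimeException {
        ResourceBusyException(String path) {
            super("Lock already taken: " + path);
        }
    }

    // Hypothetical stand-in for ResourceLock (without the expiry future).
    final class HeldLock {
        private final String path;
        private final byte[] content;

        HeldLock(String path, byte[] content) {
            this.path = path;
            this.content = content;
        }

        byte[] getContent() {
            return content;
        }

        CompletableFuture<Void> release() {
            // Two-argument remove(): only removes the entry if it still holds this lock's
            // content array (arrays compare by identity), so a stale release is a no-op.
            locks.remove(path, content);
            return CompletableFuture.completedFuture(null);
        }
    }

    private final ConcurrentMap<String, byte[]> locks = new ConcurrentHashMap<>();

    CompletableFuture<HeldLock> acquireLock(String path, byte[] content) {
        CompletableFuture<HeldLock> result = new CompletableFuture<>();
        if (locks.putIfAbsent(path, content) == null) {
            result.complete(new HeldLock(path, content));
        } else {
            // Fail immediately instead of waiting for the current owner.
            result.completeExceptionally(new ResourceBusyException(path));
        }
        return result;
    }

    CompletableFuture<Optional<byte[]>> readLock(String path) {
        return CompletableFuture.completedFuture(Optional.ofNullable(locks.get(path)));
    }
}
```

Even with a correct lock implementation, the warning in the Javadoc still applies: the owner should guard its writes with compareAndSet() or fencing, because two processes may briefly both believe they hold the lock.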

3. Port ManagedLedger to use MetadataStore

ManagedLedger is already using an abstraction for metadata access (see MetaStore). It will be easy to convert that to the MetadataStore API.

4. Define metadata cache API

Currently, most metadata read accesses are happening through the ZooKeeperCache and additional classes based on it, like ZooKeeperDataCache and ZooKeeperChildrenCache.

The cache needs to be ported to use MetadataStore API, along with the support for receiving notifications and invalidating the stale entries.

An additional concept that should be added to the cache is the atomic "read-modify-update" operation.

This is currently being performed from many places in the code base and it should be consolidated into a single implementation. For example:

public interface TypedMetadataCache<T> {
    // ...

    /**
     * Perform an atomic read-modify-update of the value.
     *
     * The modify function can potentially be called multiple times if there are concurrent updates happening.
     *
     * @param path
     *            the path of the value
     * @param modifyFunction
     *            a function that will be passed the current value and returns a modified value to be stored
     * @return a future to track the completion of the operation
     */
    CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modifyFunction);
}
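
One plausible way to implement readModifyUpdate() is an optimistic retry loop over a versioned compare-and-set, which is also why the modify function may run more than once. The sketch below assumes an in-memory versioned map in place of the MetadataStore; the class name ReadModifyUpdateSketch and its helpers are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of readModifyUpdate() as an optimistic retry loop.
class ReadModifyUpdateSketch<T> {
    static final class Versioned<V> {
        final V value;
        final long version;

        Versioned(V value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Versioned<T>> store = new ConcurrentHashMap<>();

    // Succeeds only if the stored version still equals expectedVersion (-1 means "absent").
    private boolean compareAndSet(String path, long expectedVersion, T newValue) {
        boolean[] updated = {false};
        store.compute(path, (key, current) -> {
            long currentVersion = (current == null) ? -1L : current.version;
            if (currentVersion != expectedVersion) {
                return current; // someone else changed the value: leave it untouched
            }
            updated[0] = true;
            return new Versioned<T>(newValue, currentVersion + 1);
        });
        return updated[0];
    }

    CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modifyFunction) {
        return CompletableFuture.runAsync(() -> {
            while (true) {
                Versioned<T> current = store.get(path);
                long version = (current == null) ? -1L : current.version;
                // The function receives null when the path has no value yet.
                T modified = modifyFunction.apply(current == null ? null : current.value);
                if (compareAndSet(path, version, modified)) {
                    return;
                }
                // Lost a race with a concurrent writer: re-read and apply the function again.
            }
        });
    }

    T get(String path) {
        Versioned<T> current = store.get(path);
        return current == null ? null : current.value;
    }
}
```

Because the function can be re-applied, it should be free of side effects: it should only derive the new value from the value it is given.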

5. Define higher level abstraction for metadata

While the ZooKeeperCache is already typed (in order to store the object already deserialized), we should have an additional abstraction layer to mediate the access to the metadata.

For example, to get the list of tenants in a cluster, we shouldn't use MetadataStore.getChildren() directly from multiple places. Rather, we need to provide a ConfigurationStore interface such as:

public interface ConfigurationStore {
    CompletableFuture<List<String>> getTenants();

    CompletableFuture<List<String>> getNamespaces(String tenant);

    // ....
}
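
A thin implementation of this layer could delegate to getChildren() while keeping the path layout in one place. The sketch below is illustrative only: the class name ConfigurationStoreSketch, the injected getChildren function (standing in for MetadataStore.getChildren()) and the /admin/policies root path are assumptions, not part of the proposal.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of a ConfigurationStore built on top of getChildren().
class ConfigurationStoreSketch {
    // Assumed path layout, for illustration only.
    private static final String TENANTS_ROOT = "/admin/policies";

    // Stand-in for MetadataStore.getChildren(), injected to keep the sketch self-contained.
    private final Function<String, CompletableFuture<List<String>>> getChildren;

    ConfigurationStoreSketch(Function<String, CompletableFuture<List<String>>> getChildren) {
        this.getChildren = getChildren;
    }

    CompletableFuture<List<String>> getTenants() {
        return getChildren.apply(TENANTS_ROOT);
    }

    CompletableFuture<List<String>> getNamespaces(String tenant) {
        return getChildren.apply(TENANTS_ROOT + "/" + tenant);
    }
}
```

With this arrangement, callers never build metadata paths themselves, so a change to the layout only touches this one class.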