Skip to content

PIP 17: Tiered storage for Pulsar topics

Matteo Merli edited this page Apr 5, 2018 · 5 revisions

Motivation

Pulsar stores topics in bookkeeper ledgers, henceforth referred to as segments to avoid confusion with “ManagedLedger”. BookKeeper segments are normally replicated to three nodes. This can get expensive if you want to store the data for a long time. Even more so, if these are fast disks.

Bookkeeper segments have 3 primary access patterns.

  • Writes: Need to have low latency
  • Tailing reads: Need to have low latency
  • Catchup reads: Latency is unimportant. Throughput is important but how important depends on the use case.

BookKeeper uses the same type of storage for all three patterns. Moveover, bookkeeper uses the simplest form of replication available. It just makes 3(or N) copies and puts them on three different disks. The 3 copies are important while writing as it makes the voting protocol simple.

However, once a segment has been sealed it is immutable. I.e. The set of entries, the content of those entries and the order of the entries can never change. As a result once a segment is sealed you don’t need store it on your expensive 3X replicated SSD cluster. It can be transferred to some object store using Reed-Solomon on HDDs.

The remainder of this document covers how we will do this (in the context of Pulsar).

Background

Pulsar topics are stored in managed ledgers. Each topic (or partition of the topic, if the topic is partitioned), is stored on a single managed ledger. A managed ledger consists of a list of log segments, in a fixed order, oldest first. All segments in this list, apart from the most recent, are sealed. The most recent segment is the segment messages are currently being written to.

For tiered storage, the unit of storage will be a segment.

Proposal

High level

To implement tiered storage, Pulsar will copy segments as a whole from bookkeeper to an object store. Once the segment has been copied, we tag the segment in the managed ledger’s segment list with a key that identifies the segment in the object store. Once the tag has been added to the segment list, the segment can be deleted from bookkeeper.

For Pulsar to read back and serve messages from the object store, we will provide a ReadHandle implementation which reads data from the object store. With this, Pulsar only needs to know whether it is reading from bookkeeper or the object store when it is constructing the ReadHandle.

Detail

Tiered storage requires the following changes:

  • An interface for offloading
  • Modifications to managed ledger to use offloading
  • Triggering mechanism
  • Offloading implementation for S3

SegmentOffloader interface

interface SegmentOffloader {
    CompletableFuture<byte[]> offload(ReadHandle segment,
                                      Map<String, String> extraMetadata);
    CompletableFuture<ReadHandle> readOffloaded(long ledgerId,
                                                byte[] offloadContext);
    CompletableFuture<Void> deleteOffloaded(long ledgerId,
                                            byte[] offloadContext);
}

The offload method should take a ReadHandle to a segment, and copy all the data in that segment to an object store. The method returns a context, containing extra information which can be used to lookup the object on read. For example, in the case of S3, this context could be used to specify a region or a bucket. From Pulsar’s point of view, this data is opaque. It is up to the implementation of the SegmentOffloader interface to serialize and deserialize the context. The context is stored in the ManagedLedger metadata as part of LedgerInfo (see below). The context can be an empty byte array, in which case only the ledgerId is used for lookup.

The extraMetadata parameter is used to add some extra information to the object in the data store. For example, it can be used to carry the name of the managed ledger the segment is part of and the version of the software doing the offloading. The metadata is not intended to be read back by Pulsar, but rather to be used in inspection and debugging of the object store.

The readOffloaded method takes a ledger ID and a context and returns a ReadHandle which is backed by an object in the object store. Reading from this handle is identical to reading from the bookkeeper-backed ReadHandle passed into offload.

The deleteOffloaded method deletes the segment from the object store.

A SegmentOffloader implementation is passed as a parameter in a ManagedLedgerConfig object when opening a managed ledger via the managed ledger factory. If no implementation is specified, a default implementation is used which simply throws an exception on any usage.

Segment offloading is implemented as an interface rather than directly in managed ledger to allow implementations to inspect and massage the data before it goes to the object store. For example, it may make sense for Pulsar to break out batch messages into individual messages before storing in the object store, so that the object can be used directly by some other system. Managed ledgers know nothing of these batches, nor should they, so the interface allows these concerns to be kept separate.

Modifications to ManagedLedger

From the perspective of ManagedLedger, offloading looks very similar to trimming the head of the log, so it will follow the same pattern.

However, trimming occurs in the background, after a specified retention time, whereas offloading is triggered directly by the client. Offloading after a specified retention time will likely be added in a later iteration.

A new managed ledger call is provided.

interface ManagedLedger {
    void asyncOffloadPrefix(Position pos, OffloadCallback cb, Object ctx);
}

This call selects all sealed segments in the managed ledger before the passed in position, offloads them to an object store, updates the metadata, and deletes the key.

Updating the metadata involves setting a string field, offloadKey, in the LedgerInfo object associated with the segment. At the end of the operation, the metadata is written to zookeeper.

The offload prefix operation is mutually exclusive with trimming operations.

The offloadKey is used when creating opening segments for reading. If an offloadKey exists, then readOffloaded is called with the key. Otherwise the segment is opened from bookkeeper as is currently always the case.

It is also used when an offloaded segment is trimmed from the head of the log.

Triggering an offload

There will be a admin rest endpoint on the broker, to which a managed ledger position can be passed, and this will call asyncOffloadPrefix on the managed ledger.

Offloading can also be enabled to occur automatically for all topics within a namespace. There are two new configurations on a namespace, offloadTimeInMinutes and offloadSizeInMB similar to the corresponding configurations for retention.

Setting offloadTimeInMinutes will cause a segment to be offloaded to longterm storage when it has been sealed for that many minutes. This value must be lower than retentionTimeInMinutes, as otherwise the segment would be deleted before being offloaded.

Setting offloadSizeInMB will cause segments to be offloaded to longterm storage when the tail of the log reaches the specified size in MB. This value must be lower than retentionSizeInMB, as otherwise the segment would be deleted before being offloaded.

S3 Implementation of SegmentOffloader

A single segment in bookkeeper maps to a single object in S3. The object has two parts, an index and payload. The payload contains all the entries of the segment.

The schema version is stored as part of the S3 object user metadata.

The object is written to S3 using a multipart upload. The index is stored in part 1. The payload is split into 64MB blocks, and each block is uploaded as a “part”.

S3’s API requires that the size of the uploaded part is specified before sending the data. Therefore, all blocks apart from the final block must be exactly 64MB. Blocks can only contain complete entries, so if an entry doesn’t fit into the end of a block, the remaining bytes must be padded, with the pattern 0xDEAD1234.

The size of the final block can be calculated exactly before writing. The calculation is as follows.

(block header size)
 + ((length of segment) - (sum of length of already written entries))
 + ((number of entries to be written) * 4)

Payload format

A block has a short header, followed by payload data.

[ magic_word ][ block_len ][ block_entry_count ][ first_entry_id ]
  • magic_word : 4 bytes, a sequence of bytes used to identify the start of a block
  • `block_len : 4 bytes, the length of the block, including the header
  • block_entry_count : 4 bytes, the number of entries contained in the block
  • first_entry_id : 8 bytes, Entry ID of first entry in the block

The payload data is a sequence of entries of the format

[ entry_len ][ entry_data ]
  • entry_len : 4 bytes, the length of the entry
  • entry_data : as many bytes as specified in entry_len

Padding may be added at the end to bring the block to the correct size.

Index format

The index is a short header and then sequence of mappings.

The index header is

[ index_magic_word ][ index_len ][ index_entry_count ][ segment metadata ]
  • index_magic_word : 4 bytes, a sequence of bytes to identify the index
  • index_len : 4 bytes, the total length of the index in bytes, including the header
  • index_entry_count : 4 bytes, the total number of mappings in the index
  • segment_metadata_len : 4 bytes, the length of the segment metadata
  • segment_metadata : the binary representation of the segment metadata, stored in protobuf format

The body of the index contains mappings from the entry id of the first entry in a block to the block id, and the offset of the block within the S3 object. For example:

[ entryId:   0 ] -> [ block: 2, offset: 0]
[ entryId: 100 ] -> [ block: 3, offset: 12345 ]
...
  • entryId is 8 bytes
  • block is 4 bytes
  • offset is 8 bytes

Block id corresponds to S3 object part number. For the payload they start at 2. The index is stored in part 1. The offset is the byte offset of block, from the start of the file.

The offset for a block can be calculated as

(index length) + ((number of preceeding blocks) * (block size, 64MB in this case))

Writing to S3

We use the S3 multipart API to upload a segment to S3. Entries are written to 2MB byte buffers, and these are uploaded as parts, with part id starting from 2. The first entry id of each block is recorded locally along with the part id and the length of the block.

void writeToS3(ReadHandle segment) {
  var uploadId = initiateS3MultiPartUpload();

  var bytesWritten = 0;
  var nextEntry = 0;
  while (nextEntry <= segment.getLastAddConfirmed()) {
    var blockSize = calcBlockSizeForRemainingEntries(
            segment, bytesWritten, nextEntry);

    var is = new BlockAwareSegmentInputStream(
            segment, nextEntry, blockSize);
    var partId = pushPartToS3(uploadId, blockSize, is);

    addToIndex(firstEntry, partId, blockSize);

    nextEntry = is.lastEntryWritten() + 1;
    bytesWritten += is.entryBytesWritten();
  }

  buildAndWriteIndex(uploadId);
  finalizeUpload(uploadId);
}

The algorithm has roughly the form of the pseudocode above. As S3 doesn’t seem to have an asynchronous API which gives you fine grained access to multipart, the upload should run in a dedicated threadpool.

The most important component when writing is BlockAwareSegmentInputStream. This allows the reader to read exactly blockSize bytes. It first returns the header, followed by complete entries from the segment in the format specified above. Once it can no longer writer a complete entry due to the blockSize limit, it outputs bytes the pattern 0xDEAD1234 until it reaches the limit. It tracks the ID of the last entry it has written as well as the sum of the cumulative entry bytes written, which can be used by the caller calculate the size of the next block.

Reading from S3

To read back from S3 we make a read request for part 1 of the object to load the index. Then when a client makes request to read entries, we do a binary search into the index to find which blocks needed to be loaded and request those blocks.

We don’t load the whole block each time while reading, as this would mean holding many 64MB blocks in memory when only a fraction of that is currently useful. Instead we read 1MB at a time from S3 using by setting the range on the get object request. To read an entries in the middle of a block, we read from the start of the block until we find that block. The offset of some of the read entries are cached and these are used to enhance the index, so we can quickly find these entries, or nearby entries.

Clone this wiki locally