sstable: optimize allocation of uncompressed buffers
When we don't compress a data buffer, the `RawColumnWriter` makes a
clone of the data buffer. In the sysbench workload, we see this
allocation taking place for almost half of the blocks (suggesting that
the data isn't very compressible).

Use the buffer that we would have used for the compressed data instead
of allocating a new one.
RaduBerinde committed Dec 13, 2024
1 parent 924a669 commit d556bba
Showing 6 changed files with 41 additions and 23 deletions.
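The core idea, as a minimal standalone sketch (the helper below mirrors the new `CloneUsingBuf` but is a hypothetical simplification, not the actual Pebble API):

```go
package main

import "fmt"

// cloneUsingBuf copies src into buf, reusing buf's capacity when it is
// large enough and allocating only when it is not. It returns the copy
// and the (possibly grown) buffer so the caller can keep reusing it.
func cloneUsingBuf(buf, src []byte) (clone, newBuf []byte) {
	newBuf = append(buf[:0], src...)
	return newBuf, newBuf
}

func main() {
	// Previously: clone := append([]byte(nil), data...) on every
	// uncompressed block. Now the copy lands in the scratch buffer that
	// compression would otherwise have filled.
	scratch := make([]byte, 0, 64)
	var clone []byte
	for _, blockData := range [][]byte{[]byte("abc"), []byte("defgh")} {
		clone, scratch = cloneUsingBuf(scratch, blockData)
		fmt.Printf("%s (cap %d)\n", clone, cap(scratch))
	}
}
```

Once the first block grows the buffer to its steady-state size, subsequent uncompressed blocks copy without allocating.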
15 changes: 13 additions & 2 deletions sstable/block/compression.go
@@ -208,6 +208,13 @@ func (b PhysicalBlock) Clone() PhysicalBlock {
return PhysicalBlock{data: data, trailer: b.trailer}
}

// CloneUsingBuf makes a copy of the block data, using the given slice if it has
// enough capacity.
func (b PhysicalBlock) CloneUsingBuf(buf []byte) (_ PhysicalBlock, newBuf []byte) {
newBuf = append(buf[:0], b.data...)
return PhysicalBlock{data: newBuf, trailer: b.trailer}, newBuf
}

// IsCompressed returns true if the block is compressed.
func (b *PhysicalBlock) IsCompressed() bool {
return CompressionIndicator(b.trailer[0]) != NoCompressionIndicator
@@ -260,14 +267,18 @@ func CompressAndChecksum(
}

// compress compresses a sstable block, using dstBuf as the desired destination.
//
// The result is aliased to dstBuf if that buffer had enough capacity, otherwise
// it is a newly-allocated buffer.
func compress(
compression Compression, b []byte, dstBuf []byte,
) (indicator CompressionIndicator, compressed []byte) {
switch compression {
case SnappyCompression:
// snappy relies on the length of the buffer, and not the capacity to
// determine if it needs to make an allocation.
dstBuf = dstBuf[:cap(dstBuf):cap(dstBuf)]
return SnappyCompressionIndicator, snappy.Encode(dstBuf, b)
case NoCompression:
return NoCompressionIndicator, b
case ZstdCompression:
if len(dstBuf) < binary.MaxVarintLen64 {
dstBuf = append(dstBuf, make([]byte, binary.MaxVarintLen64-len(dstBuf))...)
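The full-capacity reslice in the snappy case above exists because `snappy.Encode` decides whether to allocate a new slice based on `len(dst)`, not `cap(dst)`. A small illustration of that behavior (assuming the `github.com/golang/snappy` package):

```go
package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	src := make([]byte, 4096)
	scratch := make([]byte, 0, snappy.MaxEncodedLen(len(src)))

	// len(scratch) == 0, so Encode allocates a new slice even though
	// the capacity would have been sufficient.
	out := snappy.Encode(scratch, src)
	fmt.Println("reused:", &out[0] == &scratch[:1][0]) // false

	// Reslicing to full capacity lets Encode compress in place.
	scratch = scratch[:cap(scratch):cap(scratch)]
	out = snappy.Encode(scratch, src)
	fmt.Println("reused:", &out[0] == &scratch[0]) // true
}
```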
3 changes: 3 additions & 0 deletions sstable/block/compression_test.go
@@ -23,6 +23,9 @@ func TestCompressionRoundtrip(t *testing.T) {
rng := rand.New(rand.NewPCG(0, seed))

for compression := DefaultCompression + 1; compression < NCompression; compression++ {
if compression == NoCompression {
continue
}
t.Run(compression.String(), func(t *testing.T) {
payload := make([]byte, 1+rng.IntN(10<<10 /* 10 KiB */))
for i := range payload {
20 changes: 13 additions & 7 deletions sstable/colblk_writer.go
@@ -391,7 +391,10 @@ func (w *RawColumnWriter) AddWithForceObsolete(
size := w.dataBlock.Size()
if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
// Flush the data block excluding the key we just added.
w.flushDataBlockWithoutNextKey(key.UserKey)
if err := w.flushDataBlockWithoutNextKey(key.UserKey); err != nil {
w.err = err
return err
}
// flushDataBlockWithoutNextKey reset the data block builder, and we can
// add the key to this next block now.
w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
@@ -601,16 +604,19 @@ type compressedBlock struct {
blockBuf blockBuf
}

func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) {
func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) error {
serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
// Compute the separator that will be written to the index block alongside
// this data block's end offset. It is the separator between the last key in
// the finished block and the [nextKey] that was excluded from the block.
w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf)
if err := w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf); err != nil {
return err
}
w.dataBlock.Reset()
w.pendingDataBlockSize = 0
return nil
}

// maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
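The flush path above now propagates the enqueue error rather than discarding it, and `AddWithForceObsolete` records it on the writer (`w.err = err`) so later operations fail fast. A minimal sketch of that record-and-return idiom (hypothetical writer type, not Pebble's):

```go
package main

import (
	"errors"
	"fmt"
)

type writer struct {
	err error // sticky: first failure poisons all later operations
}

func (w *writer) flush() error {
	return errors.New("disk full") // pretend the enqueue failed
}

func (w *writer) add() error {
	if w.err != nil {
		return w.err // fail fast on a previously recorded error
	}
	if err := w.flush(); err != nil {
		w.err = err // record so every later call sees the failure
		return err
	}
	return nil
}

func main() {
	w := &writer{}
	fmt.Println(w.add()) // disk full
	fmt.Println(w.add()) // disk full (from the sticky error)
}
```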
@@ -652,7 +658,7 @@ func (w *RawColumnWriter) enqueueDataBlock(
cb := compressedBlockPool.Get().(*compressedBlock)
cb.blockBuf.checksummer.Type = w.opts.Checksum
cb.physical = block.CompressAndChecksum(
&cb.blockBuf.compressedBuf,
&cb.blockBuf.dataBuf,
serializedBlock,
w.opts.Compression,
&cb.blockBuf.checksummer,
@@ -663,7 +669,7 @@
// it to the write queue to be asynchronously written to disk.
// TODO(jackson): Should we try to avoid this clone by tracking the
// lifetime of the DataBlockWriters?
cb.physical = cb.physical.Clone()
cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
}
return w.enqueuePhysicalBlock(cb, separator)
}
@@ -1147,7 +1153,7 @@ func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProper
cb := compressedBlockPool.Get().(*compressedBlock)
cb.blockBuf.checksummer.Type = w.opts.Checksum
cb.physical = block.CompressAndChecksum(
&cb.blockBuf.compressedBuf,
&cb.blockBuf.dataBuf,
b,
w.opts.Compression,
&cb.blockBuf.checksummer,
@@ -1158,7 +1164,7 @@
// it to the write queue to be asynchronously written to disk.
// TODO(jackson): Should we try to avoid this clone by tracking the
// lifetime of the DataBlockWriters?
cb.physical = cb.physical.Clone()
cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
}
if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
return err
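For context on where `dataBuf` lives: the column writer recycles `compressedBlock` scratch state through a `sync.Pool`, so the buffer survives from block to block. A pared-down sketch (field names taken from the diff, everything else simplified) of why the uncompressed path now amortizes to zero allocations:

```go
package main

import (
	"fmt"
	"sync"
)

type compressedBlock struct {
	// dataBuf serves as the compression destination, or, when the block
	// is left uncompressed, as the target of the clone.
	dataBuf []byte
}

var compressedBlockPool = sync.Pool{
	New: func() any { return &compressedBlock{} },
}

func enqueueBlock(serialized []byte) {
	cb := compressedBlockPool.Get().(*compressedBlock)
	// Uncompressed path: copy into the pooled buffer instead of
	// allocating a fresh slice for every block.
	cb.dataBuf = append(cb.dataBuf[:0], serialized...)
	fmt.Println("queued", len(cb.dataBuf), "bytes")
	// In the real writer the write queue returns cb to the pool after
	// the block hits disk; we do it inline for the sketch.
	compressedBlockPool.Put(cb)
}

func main() {
	for i := 0; i < 3; i++ {
		enqueueBlock([]byte("incompressible payload"))
	}
}
```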
2 changes: 1 addition & 1 deletion sstable/layout.go
@@ -770,7 +770,7 @@ func (w *layoutWriter) writeBlock(
b []byte, compression block.Compression, buf *blockBuf,
) (block.Handle, error) {
return w.writePrecompressedBlock(block.CompressAndChecksum(
&buf.compressedBuf, b, compression, &buf.checksummer))
&buf.dataBuf, b, compression, &buf.checksummer))
}

// writePrecompressedBlock writes a pre-compressed block and its
20 changes: 9 additions & 11 deletions sstable/rowblk_writer.go
@@ -461,19 +461,17 @@ type blockBuf struct {
// blockTrailerLen bytes, (5 * binary.MaxVarintLen64) bytes, and most
// likely large enough for a block handle with properties.
tmp [blockHandleLikelyMaxLen]byte
// compressedBuf is the destination buffer for compression. It is re-used over the
// lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
compressedBuf []byte
checksummer block.Checksummer
// dataBuf is the destination buffer for compression, or (in some cases where
// compression is not used) for storing a copy of the data. It is re-used over
// the lifetime of the blockBuf, avoiding the allocation of a temporary buffer
// for each block.
dataBuf []byte
checksummer block.Checksummer
}

func (b *blockBuf) clear() {
// We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
// on the length of the buffer, and not the capacity to determine if it needs
// to make an allocation.
*b = blockBuf{
compressedBuf: b.compressedBuf, checksummer: b.checksummer,
}
b.tmp = [blockHandleLikelyMaxLen]byte{}
b.dataBuf = b.dataBuf[:0]
}

// A dataBlockBuf holds all the state required to compress and write a data block to disk.
@@ -545,7 +543,7 @@ func (d *dataBlockBuf) finish() {
}

func (d *dataBlockBuf) compressAndChecksum(c block.Compression) {
d.physical = block.CompressAndChecksum(&d.compressedBuf, d.uncompressed, c, &d.checksummer)
d.physical = block.CompressAndChecksum(&d.dataBuf, d.uncompressed, c, &d.checksummer)
}

func (d *dataBlockBuf) shouldFlush(
4 changes: 2 additions & 2 deletions sstable/writer_test.go
@@ -531,15 +531,15 @@ func TestBlockBufClear(t *testing.T) {
defer leaktest.AfterTest(t)()
b1 := &blockBuf{}
b1.tmp[0] = 1
b1.compressedBuf = make([]byte, 1)
b1.dataBuf = make([]byte, 1)
b1.clear()
testBlockBufClear(t, b1, &blockBuf{})
}

func TestClearDataBlockBuf(t *testing.T) {
defer leaktest.AfterTest(t)()
d := newDataBlockBuf(1, block.ChecksumTypeCRC32c)
d.blockBuf.compressedBuf = make([]byte, 1)
d.blockBuf.dataBuf = make([]byte, 1)
d.dataBlock.Add(ikey("apple"), nil)
d.dataBlock.Add(ikey("banana"), nil)

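A quick way to convince yourself the reuse pays off — a hedged sketch using `testing.AllocsPerRun` against the simplified clone pattern from the first example:

```go
package main

import (
	"fmt"
	"testing"
)

func main() {
	buf := make([]byte, 0, 1024)
	block := make([]byte, 512)

	// Warm up once so the buffer has grown to its steady-state capacity.
	buf = append(buf[:0], block...)

	allocs := testing.AllocsPerRun(100, func() {
		// The pattern from CloneUsingBuf: copy into the reused buffer.
		buf = append(buf[:0], block...)
	})
	fmt.Println("allocs per clone:", allocs) // expect 0
}
```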
