Skip to content

Commit

Permalink
sstable: lazily allocate ValueFetcher
Browse files Browse the repository at this point in the history
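Currently the type `valueBlockReader` implements both
`block.GetLazyValueForPrefixAndValueHandler` and `base.ValueFetcher`.
The lifetime of the `ValueFetcher` needs to extend beyond that of the
iterator, so the whole `valueBlockReader` has to be heap-allocated for
every iterator opened on an sstable that has value blocks.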

This commit splits it into two objects - `valueBlockReader` continues
to implement `block.GetLazyValueForPrefixAndValueHandler` but the new
`valueBlockFetcher` now implements `base.ValueFetcher`. The
`valueBlockReader` lazily allocates the `valueBlockFetcher` the first
time it is asked to produce a `LazyValue`. As a result, we avoid the
allocation entirely when the iterator is never positioned on a KV with a
non-in-place value (which is the norm when we get the latest version
of a single key).

```
name                              old time/op    new time/op    delta
RandSeekInSST/v4/single-level-10     691ns ± 2%     668ns ± 3%    -3.38%  (p=0.000 n=8+7)
RandSeekInSST/v4/two-level-10       1.57µs ± 2%    1.54µs ± 4%      ~     (p=0.067 n=7+8)
RandSeekInSST/v5/single-level-10     479ns ± 1%     425ns ± 1%   -11.28%  (p=0.001 n=7+7)
RandSeekInSST/v5/two-level-10        967ns ± 4%     868ns ± 4%   -10.25%  (p=0.001 n=7+7)

name                              old alloc/op   new alloc/op   delta
RandSeekInSST/v4/single-level-10      288B ± 0%      122B ± 2%   -57.81%  (p=0.000 n=8+8)
RandSeekInSST/v4/two-level-10         288B ± 0%      122B ± 1%   -57.74%  (p=0.000 n=8+7)
RandSeekInSST/v5/single-level-10      288B ± 0%       11B ± 0%   -96.18%  (p=0.000 n=8+7)
RandSeekInSST/v5/two-level-10         288B ± 0%       11B ± 0%   -96.18%  (p=0.000 n=8+8)

name                              old allocs/op  new allocs/op  delta
RandSeekInSST/v4/single-level-10      1.00 ± 0%      0.00       -100.00%  (p=0.000 n=8+8)
RandSeekInSST/v4/two-level-10         1.00 ± 0%      0.00       -100.00%  (p=0.000 n=8+8)
RandSeekInSST/v5/single-level-10      1.00 ± 0%      0.00       -100.00%  (p=0.000 n=8+8)
RandSeekInSST/v5/two-level-10         1.00 ± 0%      0.00       -100.00%  (p=0.000 n=8+8)
```
  • Loading branch information
RaduBerinde committed Oct 31, 2024
1 parent 405b4e1 commit 9cf3f64
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 93 deletions.
7 changes: 7 additions & 0 deletions sstable/block/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,12 @@ func InPlaceValuePrefix(setHasSameKeyPrefix bool) ValuePrefix {
// GetLazyValueForPrefixAndValueHandler is an interface for getting a LazyValue
// from a value prefix and value handle.
type GetLazyValueForPrefixAndValueHandler interface {
// GetLazyValueForPrefixAndValueHandle returns a LazyValue for the given value
// prefix and value handle, which are passed together in the single handle
// argument.
//
// NOTE(review): the valueBlockReader implementation interprets handle[0] as
// the ValuePrefix and handle[1:] as the encoded value handle; presumably all
// implementations must follow the same layout - confirm against callers.
//
// The result is only valid until the next call to
// GetLazyValueForPrefixAndValueHandle. Use LazyValue.Clone if the lifetime of
// the LazyValue needs to be extended. For more details, see the "memory
// management" comment where LazyValue is declared.
GetLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue
}
23 changes: 6 additions & 17 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
// loading. It may not actually have loaded the block, due to an error or
// because it was considered irrelevant.
dataBH block.Handle
vbReader *valueBlockReader
vbReader valueBlockReader
// vbRH is the read handle for value blocks, which are in a different
// part of the sstable than data blocks.
vbRH objstorage.ReadHandle
Expand Down Expand Up @@ -228,22 +228,13 @@ func newColumnBlockSingleLevelIterator(
)
var getLazyValuer block.GetLazyValueForPrefixAndValueHandler
if r.Properties.NumValueBlocks > 0 {
// NB: we cannot avoid this ~248 byte allocation, since valueBlockReader
// can outlive the singleLevelIterator due to being embedded in a
// LazyValue. This consumes ~2% in microbenchmark CPU profiles, but we
// should only optimize this if it shows up as significant in end-to-end
// CockroachDB benchmarks, since it is tricky to do so. One possibility
// is that if many sstable iterators only get positioned at latest
// versions of keys, and therefore never expose a LazyValue that is
// separated to their callers, they can put this valueBlockReader into a
// sync.Pool.
i.vbReader = &valueBlockReader{
i.vbReader = valueBlockReader{
bpOpen: i,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
getLazyValuer = i.vbReader
getLazyValuer = &i.vbReader
i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc)
}
i.data.InitOnce(r.keySchema, i.cmp, r.Split, getLazyValuer)
Expand Down Expand Up @@ -301,13 +292,13 @@ func newRowBlockSingleLevelIterator(
// versions of keys, and therefore never expose a LazyValue that is
// separated to their callers, they can put this valueBlockReader into a
// sync.Pool.
i.vbReader = &valueBlockReader{
i.vbReader = valueBlockReader{
bpOpen: i,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
(&i.data).SetGetLazyValuer(i.vbReader)
(&i.data).SetGetLazyValuer(&i.vbReader)
i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc)
}
i.data.SetHasValuePrefix(true)
Expand Down Expand Up @@ -1600,9 +1591,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) closeInternal() error {
if i.bpfs != nil {
releaseBlockPropertiesFilterer(i.bpfs)
}
if i.vbReader != nil {
i.vbReader.close()
}
i.vbReader.close()
if i.vbRH != nil {
err = firstError(err, i.vbRH.Close())
i.vbRH = nil
Expand Down
8 changes: 4 additions & 4 deletions sstable/reader_iter_two_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ func newColumnBlockTwoLevelIterator(
// versions of keys, and therefore never expose a LazyValue that is
// separated to their callers, they can put this valueBlockReader into a
// sync.Pool.
i.secondLevel.vbReader = &valueBlockReader{
i.secondLevel.vbReader = valueBlockReader{
bpOpen: &i.secondLevel,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
getLazyValuer = i.secondLevel.vbReader
getLazyValuer = &i.secondLevel.vbReader
i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc)
}
i.secondLevel.data.InitOnce(r.keySchema, r.Compare, r.Split, getLazyValuer)
Expand Down Expand Up @@ -249,13 +249,13 @@ func newRowBlockTwoLevelIterator(
// versions of keys, and therefore never expose a LazyValue that is
// separated to their callers, they can put this valueBlockReader into a
// sync.Pool.
i.secondLevel.vbReader = &valueBlockReader{
i.secondLevel.vbReader = valueBlockReader{
bpOpen: &i.secondLevel,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.secondLevel.data.SetGetLazyValuer(i.secondLevel.vbReader)
i.secondLevel.data.SetGetLazyValuer(&i.secondLevel.vbReader)
i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc)
}
i.secondLevel.data.SetHasValuePrefix(true)
Expand Down
193 changes: 122 additions & 71 deletions sstable/value_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,38 +721,41 @@ func (trp *trivialReaderProvider) GetReader(ctx context.Context) (*Reader, error
// Close implements ReaderProvider. It is a no-op: the method body is empty, so
// calling it has no effect on the provider.
func (trp *trivialReaderProvider) Close() {}

// valueBlockReader is used to retrieve values in value
// blocks. It is used when the sstable was written with
// Properties.ValueBlocksAreEnabled.
// valueBlockReader implements GetLazyValueForPrefixAndValueHandler; it is used
// to create LazyValues (each of which can be used to retrieve a value in a
// value block). It is used when the sstable was written with
// Properties.ValueBlocksAreEnabled. The lifetime of this object is tied to the
// lifetime of the sstable iterator.
type valueBlockReader struct {
bpOpen blockProviderWhenOpen
rp ReaderProvider
vbih valueBlocksIndexHandle
stats *base.InternalIteratorStats
// fetcher is allocated lazily the first time we create a LazyValue, in order
// to avoid the allocation if we never read a lazy value (which should be the
// case when we're reading the latest value of a key).
fetcher *valueBlockFetcher

// The value blocks index is lazily retrieved the first time the reader
// needs to read a value that resides in a value block.
vbiBlock []byte
vbiCache block.BufferHandle
// When sequentially iterating through all key-value pairs, the cost of
// repeatedly getting a block that is already in the cache and releasing the
// bufferHandle can be ~40% of the cpu overhead. So the reader remembers the
// last value block it retrieved, in case there is locality of access, and
// this value block can be used for the next value retrieval.
valueBlockNum uint32
valueBlock []byte
valueBlockPtr unsafe.Pointer
valueCache block.BufferHandle
lazyFetcher base.LazyFetcher
closed bool
bufToMangle []byte
// lazyFetcher is the LazyFetcher value embedded in any LazyValue that was returned.
lazyFetcher base.LazyFetcher
}

var _ block.GetLazyValueForPrefixAndValueHandler = (*valueBlockReader)(nil)

func (r *valueBlockReader) GetLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue {
if r.fetcher == nil {
// NB: we cannot avoid this allocation, since the valueBlockFetcher
// can outlive the singleLevelIterator due to being embedded in a
// LazyValue.
//
// Since it is a relatively small object, we could allocate multiple
// instances together, using a sync.Pool.
r.fetcher = newValueBlockFetcher(r.bpOpen, r.rp, r.vbih, r.stats)
}
fetcher := &r.lazyFetcher
valLen, h := decodeLenFromValueHandle(handle[1:])
*fetcher = base.LazyFetcher{
Fetcher: r,
Fetcher: r.fetcher,
Attribute: base.AttributeAndLen{
ValueLen: int32(valLen),
ShortAttribute: block.ValuePrefix(handle[0]).ShortAttribute(),
Expand All @@ -770,120 +773,168 @@ func (r *valueBlockReader) GetLazyValueForPrefixAndValueHandle(handle []byte) ba

func (r *valueBlockReader) close() {
r.bpOpen = nil
r.vbiBlock = nil
r.vbiCache.Release()
// Set the handle to empty since Release does not nil the Handle.value. If
// we were to reopen this valueBlockReader and retrieve the same
// Handle.value from the cache, we don't want to accidentally unref it when
// attempting to unref the old handle.
r.vbiCache = block.BufferHandle{}
r.valueBlock = nil
r.valueBlockPtr = nil
r.valueCache.Release()
// See comment above.
r.valueCache = block.BufferHandle{}
r.closed = true
// rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be
// implemented.
if r.fetcher != nil {
r.fetcher.close()
r.fetcher = nil
}
}

// valueBlockFetcher implements base.ValueFetcher and is used through LazyValue
// to fetch a value from a value block. The lifetime of this object is not tied
// to the lifetime of the iterator - a LazyValue can be accessed later.
//
// While the fetcher has not been closed, Fetch reads blocks through bpOpen and
// returns memory owned by the fetcher. Once close() has been called, Fetch
// opens a temporary block provider from rp for the duration of the call and
// returns a caller-owned copy of the value.
type valueBlockFetcher struct {
// bpOpen is the block provider used to read the value blocks index and the
// value blocks themselves.
bpOpen blockProviderWhenOpen
// rp is used by Fetch to construct a blockProviderWhenClosed when the
// fetcher has already been closed.
rp ReaderProvider
// vbih is the handle of the value blocks index; it also carries the byte
// lengths of the fields of each index entry.
vbih valueBlocksIndexHandle
// stats, if non-nil, is passed to block reads and accumulates
// SeparatedPointValue.ValueBytesFetched.
stats *base.InternalIteratorStats
// The value blocks index is lazily retrieved the first time the fetcher
// needs to read a value that resides in a value block.
vbiBlock []byte
vbiCache block.BufferHandle
// When sequentially iterating through all key-value pairs, the cost of
// repeatedly getting a block that is already in the cache and releasing the
// bufferHandle can be ~40% of the cpu overhead. So the fetcher remembers the
// last value block it retrieved, in case there is locality of access, and
// this value block can be used for the next value retrieval.
//
// valueBlockNum is the block number of valueBlock, valueBlockPtr points at
// the first byte of valueBlock, and valueCache is the handle that keeps
// valueBlock's memory alive.
valueBlockNum uint32
valueBlock []byte
valueBlockPtr unsafe.Pointer
valueCache block.BufferHandle
// closed is set by close(); Fetch consults it to decide whether bpOpen may
// still be used.
closed bool
// bufToMangle holds a copy of the most recent value returned by
// doValueMangling, which is only called when invariants.Enabled.
bufToMangle []byte
}

var _ base.ValueFetcher = (*valueBlockFetcher)(nil)

// newValueBlockFetcher heap-allocates a valueBlockFetcher wired up with the
// given block provider, reader provider, value blocks index handle and stats.
// All cached block state starts out empty and the fetcher starts out open
// (closed == false).
func newValueBlockFetcher(
	bpOpen blockProviderWhenOpen,
	rp ReaderProvider,
	vbih valueBlocksIndexHandle,
	stats *base.InternalIteratorStats,
) *valueBlockFetcher {
	fetcher := new(valueBlockFetcher)
	fetcher.bpOpen = bpOpen
	fetcher.rp = rp
	fetcher.vbih = vbih
	fetcher.stats = stats
	return fetcher
}

// Fetch implements base.ValueFetcher.
func (r *valueBlockReader) Fetch(
func (f *valueBlockFetcher) Fetch(
ctx context.Context, handle []byte, valLen int32, buf []byte,
) (val []byte, callerOwned bool, err error) {
if !r.closed {
val, err := r.getValueInternal(handle, valLen)
if !f.closed {
val, err := f.getValueInternal(handle, valLen)
if invariants.Enabled {
val = r.doValueMangling(val)
val = f.doValueMangling(val)
}
return val, false, err
}

bp := blockProviderWhenClosed{rp: r.rp}
bp := blockProviderWhenClosed{rp: f.rp}
err = bp.open(ctx)
if err != nil {
return nil, false, err
}
defer bp.close()
defer r.close()
r.bpOpen = bp
defer f.close()
f.bpOpen = bp
var v []byte
v, err = r.getValueInternal(handle, valLen)
v, err = f.getValueInternal(handle, valLen)
if err != nil {
return nil, false, err
}
buf = append(buf[:0], v...)
return buf, true, nil
}

// close releases the cached value blocks index and the cached value block, and
// marks the fetcher as closed. rp, vbih and stats are deliberately left intact
// so that a later Fetch can still retrieve values through a temporary block
// provider.
func (f *valueBlockFetcher) close() {
	// Release does not nil the Handle.value, so each handle is overwritten with
	// the empty handle right after it is released. Otherwise, if this
	// valueBlockFetcher were reopened and retrieved the same Handle.value from
	// the cache, unref-ing the stale handle would accidentally unref it.
	f.vbiCache.Release()
	f.vbiCache, f.vbiBlock = block.BufferHandle{}, nil

	f.valueCache.Release()
	f.valueCache, f.valueBlock, f.valueBlockPtr = block.BufferHandle{}, nil, nil

	f.closed = true
}

// doValueMangling attempts to uncover violations of the contract listed in
// the declaration comment of LazyValue. It is expensive, hence only called
// when invariants.Enabled.
func (r *valueBlockReader) doValueMangling(v []byte) []byte {
func (f *valueBlockFetcher) doValueMangling(v []byte) []byte {
// Randomly set the bytes in the previous retrieved value to 0, since
// property P1 only requires the valueBlockFetcher to maintain the memory of
// one fetched value.
if rand.IntN(2) == 0 {
clear(r.bufToMangle)
clear(f.bufToMangle)
}
// Store the current value in a new buffer for future mangling.
r.bufToMangle = append([]byte(nil), v...)
return r.bufToMangle
f.bufToMangle = append([]byte(nil), v...)
return f.bufToMangle
}

func (r *valueBlockReader) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
func (f *valueBlockFetcher) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
vh := decodeRemainingValueHandle(handle)
vh.valueLen = uint32(valLen)
if r.vbiBlock == nil {
ch, err := r.bpOpen.readBlockForVBR(r.vbih.h, r.stats)
if f.vbiBlock == nil {
ch, err := f.bpOpen.readBlockForVBR(f.vbih.h, f.stats)
if err != nil {
return nil, err
}
r.vbiCache = ch
r.vbiBlock = ch.BlockData()
f.vbiCache = ch
f.vbiBlock = ch.BlockData()
}
if r.valueBlock == nil || r.valueBlockNum != vh.blockNum {
vbh, err := r.getBlockHandle(vh.blockNum)
if f.valueBlock == nil || f.valueBlockNum != vh.blockNum {
vbh, err := f.getBlockHandle(vh.blockNum)
if err != nil {
return nil, err
}
vbCacheHandle, err := r.bpOpen.readBlockForVBR(vbh, r.stats)
vbCacheHandle, err := f.bpOpen.readBlockForVBR(vbh, f.stats)
if err != nil {
return nil, err
}
r.valueBlockNum = vh.blockNum
r.valueCache.Release()
r.valueCache = vbCacheHandle
r.valueBlock = vbCacheHandle.BlockData()
r.valueBlockPtr = unsafe.Pointer(&r.valueBlock[0])
f.valueBlockNum = vh.blockNum
f.valueCache.Release()
f.valueCache = vbCacheHandle
f.valueBlock = vbCacheHandle.BlockData()
f.valueBlockPtr = unsafe.Pointer(&f.valueBlock[0])
}
if r.stats != nil {
r.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
if f.stats != nil {
f.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
}
return r.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
return f.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
}

func (r *valueBlockReader) getBlockHandle(blockNum uint32) (block.Handle, error) {
func (f *valueBlockFetcher) getBlockHandle(blockNum uint32) (block.Handle, error) {
indexEntryLen :=
int(r.vbih.blockNumByteLength + r.vbih.blockOffsetByteLength + r.vbih.blockLengthByteLength)
int(f.vbih.blockNumByteLength + f.vbih.blockOffsetByteLength + f.vbih.blockLengthByteLength)
offsetInIndex := indexEntryLen * int(blockNum)
if len(r.vbiBlock) < offsetInIndex+indexEntryLen {
if len(f.vbiBlock) < offsetInIndex+indexEntryLen {
return block.Handle{}, base.AssertionFailedf(
"index entry out of bounds: offset %d length %d block length %d",
offsetInIndex, indexEntryLen, len(r.vbiBlock))
offsetInIndex, indexEntryLen, len(f.vbiBlock))
}
b := r.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
n := int(r.vbih.blockNumByteLength)
b := f.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
n := int(f.vbih.blockNumByteLength)
bn := littleEndianGet(b, n)
if uint32(bn) != blockNum {
return block.Handle{},
errors.Errorf("expected block num %d but found %d", blockNum, bn)
}
b = b[n:]
n = int(r.vbih.blockOffsetByteLength)
n = int(f.vbih.blockOffsetByteLength)
blockOffset := littleEndianGet(b, n)
b = b[n:]
n = int(r.vbih.blockLengthByteLength)
n = int(f.vbih.blockLengthByteLength)
blockLen := littleEndianGet(b, n)
return block.Handle{Offset: blockOffset, Length: blockLen}, nil
}
2 changes: 1 addition & 1 deletion sstable/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func TestWriterWithValueBlocks(t *testing.T) {
return err.Error()
}
forceRowIterIgnoreValueBlocks := func(i *singleLevelIteratorRowBlocks) {
i.vbReader = nil
i.vbReader = valueBlockReader{}
i.data.SetGetLazyValuer(nil)
i.data.SetHasValuePrefix(false)
}
Expand Down

0 comments on commit 9cf3f64

Please sign in to comment.