Skip to content

Commit

Permalink
colblk: cache KeySeeker in the block metadata
Browse files Browse the repository at this point in the history
We allow caching the KeySeeker in the block metadata, saving on
initialization time.

```
name                              old time/op  new time/op  delta
RandSeekInSST/v4/single-level-10  1.22µs ± 1%  1.22µs ± 1%    ~     (p=0.565 n=7+7)
RandSeekInSST/v4/two-level-10     2.07µs ± 3%  2.08µs ± 3%    ~     (p=0.744 n=8+8)
RandSeekInSST/v5/single-level-10  1.06µs ± 1%  1.01µs ± 1%  -5.22%  (p=0.000 n=8+8)
RandSeekInSST/v5/two-level-10     1.60µs ± 3%  1.54µs ± 5%  -3.17%  (p=0.021 n=8+7)
```
  • Loading branch information
RaduBerinde committed Oct 20, 2024
1 parent 24d6d75 commit 1ad5f2a
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 69 deletions.
31 changes: 11 additions & 20 deletions internal/crdbtest/crdbtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"io"
"math/rand/v2"
"slices"
"sync"
"time"
"unsafe"

Expand Down Expand Up @@ -533,8 +532,12 @@ var KeySchema = colblk.KeySchema{
kw.untypedVersions.Init()
return kw
},
NewKeySeeker: func() colblk.KeySeeker {
return cockroachKeySeekerPool.Get().(*cockroachKeySeeker)
InitKeySeekerMetadata: func(meta *colblk.KeySeekerMetadata, d *colblk.DataBlockDecoder) {
ks := (*cockroachKeySeeker)(unsafe.Pointer(meta))
ks.init(d)
},
KeySeeker: func(meta *colblk.KeySeekerMetadata) colblk.KeySeeker {
return (*cockroachKeySeeker)(unsafe.Pointer(meta))
},
}

Expand Down Expand Up @@ -662,10 +665,6 @@ func (kw *cockroachKeyWriter) Finish(
}
}

var cockroachKeySeekerPool = sync.Pool{
New: func() interface{} { return &cockroachKeySeeker{} },
}

type cockroachKeySeeker struct {
roachKeys colblk.PrefixBytes
roachKeyChanged colblk.Bitmap
Expand All @@ -674,23 +673,21 @@ type cockroachKeySeeker struct {
untypedVersions colblk.RawBytes
}

// Assert that the cockroachKeySeeker fits inside KeySeekerMetadata.
var _ uint = colblk.KeySeekerMetadataSize - uint(unsafe.Sizeof(cockroachKeySeeker{}))

var _ colblk.KeySeeker = (*cockroachKeySeeker)(nil)

// Init is part of the KeySeeker interface.
func (ks *cockroachKeySeeker) Init(d *colblk.DataBlockDecoder) error {
func (ks *cockroachKeySeeker) init(d *colblk.DataBlockDecoder) {
bd := d.BlockDecoder()
ks.roachKeys = bd.PrefixBytes(cockroachColRoachKey)
ks.roachKeyChanged = d.PrefixChanged()
ks.mvccWallTimes = bd.Uints(cockroachColMVCCWallTime)
ks.mvccLogical = bd.Uints(cockroachColMVCCLogical)
ks.untypedVersions = bd.RawBytes(cockroachColUntypedVersion)
return nil
}

// CompareFirstUserKey compares the provided key to the first user key
// contained within the data block. It's equivalent to performing
//
// Compare(firstUserKey, k)
// IsLowerBound is part of the KeySeeker interface.
func (ks *cockroachKeySeeker) IsLowerBound(k []byte, syntheticSuffix []byte) bool {
roachKey, untypedVersion, wallTime, logicalTime := DecodeEngineKey(k)
if v := Compare(ks.roachKeys.UnsafeFirstSlice(), roachKey); v != 0 {
Expand Down Expand Up @@ -876,11 +873,5 @@ func (ks *cockroachKeySeeker) MaterializeUserKeyWithSyntheticSuffix(
return res
}

// Release is part of the KeySeeker interface.
func (ks *cockroachKeySeeker) Release() {
*ks = cockroachKeySeeker{}
cockroachKeySeekerPool.Put(ks)
}

//go:linkname memmove runtime.memmove
func memmove(to, from unsafe.Pointer, n uintptr)
103 changes: 62 additions & 41 deletions sstable/colblk/data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"fmt"
"io"
"math"
"sync"
"unsafe"

"github.com/cockroachdb/crlib/crbytes"
Expand All @@ -37,9 +36,35 @@ type KeySchema struct {
Name string
ColumnTypes []DataType
NewKeyWriter func() KeyWriter
NewKeySeeker func() KeySeeker

// InitKeySeekerMetadata initializes the provided KeySeekerMetadata. This
// happens once when a block enters the block cache and can be used to save
// computation in KeySeeker.
InitKeySeekerMetadata func(meta *KeySeekerMetadata, d *DataBlockDecoder)

// KeySeeker returns a KeySeeker using metadata that was previously
// initialized with InitKeySeekerMetadata. The returned key seeker can be an
// unsafe cast of the metadata itself.
KeySeeker func(meta *KeySeekerMetadata) KeySeeker
}

// KeySeekerMetadata is an in-memory buffer that stores metadata for a block. It
// is allocated together with the buffer storing the block and is initialized
// once when the block is read from disk. It is always 8-byte aligned.
//
// Portions of this buffer can be cast to the structures we need (through
// unsafe.Pointer), but note that any pointers in these structures will be
// invisible to the GC. Pointers to the block's data buffer are ok, since the
// metadata and the data have the same lifetime (sharing the underlying
// allocation).
//
// KeySeekerMetadata is stored inside block.Metadata.
type KeySeekerMetadata [KeySeekerMetadataSize]byte

// KeySeekerMetadataSize is chosen to fit the CockroachDB key seeker
// implementation.
const KeySeekerMetadataSize = 168

// A KeyWriter maintains ColumnWriters for a data block for writing user keys
// into the database-specific key schema. Users may define their own key schema
// and implement KeyWriter to encode keys into custom columns that are aware of
Expand Down Expand Up @@ -99,8 +124,6 @@ func (kcmp KeyComparison) PrefixEqual() bool { return kcmp.PrefixLen == kcmp.Com
// goroutines. In practice, multiple DataBlockIterators may use the same
// KeySeeker.
type KeySeeker interface {
// Init initializes the iterator to read from the provided DataBlockDecoder.
Init(b *DataBlockDecoder) error
// IsLowerBound returns true if all keys in the data block (after suffix
// replacement if syntheticSuffix is not empty) are >= the given key. If the
// data block contains no keys, returns true.
Expand Down Expand Up @@ -137,9 +160,6 @@ type KeySeeker interface {
MaterializeUserKeyWithSyntheticSuffix(
keyIter *PrefixBytesIter, syntheticSuffix []byte, prevRow, row int,
) []byte
// Release releases the KeySeeker. It's called when the seeker is no longer
// in use. Implementations may pool KeySeeker objects.
Release()
}

const (
Expand All @@ -152,12 +172,6 @@ var defaultSchemaColumnTypes = []DataType{
defaultKeySchemaColumnSuffix: DataTypeBytes,
}

var defaultKeySeekerPool = sync.Pool{
New: func() interface{} {
return &defaultKeySeeker{}
},
}

// DefaultKeySchema returns the default key schema that decomposes a user key
// into its prefix and suffix. Prefixes are sorted in lexicographical order.
func DefaultKeySchema(comparer *base.Comparer, prefixBundleSize int) KeySchema {
Expand All @@ -170,9 +184,13 @@ func DefaultKeySchema(comparer *base.Comparer, prefixBundleSize int) KeySchema {
kw.suffixes.Init()
return kw
},
NewKeySeeker: func() KeySeeker {
ks := defaultKeySeekerPool.Get().(*defaultKeySeeker)
InitKeySeekerMetadata: func(meta *KeySeekerMetadata, d *DataBlockDecoder) {
ks := (*defaultKeySeeker)(unsafe.Pointer(&meta[0]))
ks.comparer = comparer
ks.init(d)
},
KeySeeker: func(meta *KeySeekerMetadata) KeySeeker {
ks := (*defaultKeySeeker)(unsafe.Pointer(&meta[0]))
return ks
},
}
Expand Down Expand Up @@ -296,6 +314,9 @@ func (w *defaultKeyWriter) Finish(col, rows int, offset uint32, buf []byte) (nex
// Assert that *defaultKeySeeker implements KeySeeker.
var _ KeySeeker = (*defaultKeySeeker)(nil)

// Assert that the default key seeker fits inside KeySeekerMetadata.
var _ uint = KeySeekerMetadataSize - uint(unsafe.Sizeof(defaultKeySeeker{}))

type defaultKeySeeker struct {
comparer *base.Comparer
decoder *DataBlockDecoder
Expand All @@ -304,12 +325,11 @@ type defaultKeySeeker struct {
sharedPrefix []byte
}

func (ks *defaultKeySeeker) Init(d *DataBlockDecoder) error {
func (ks *defaultKeySeeker) init(d *DataBlockDecoder) {
ks.decoder = d
ks.prefixes = d.d.PrefixBytes(defaultKeySchemaColumnPrefix)
ks.suffixes = d.d.RawBytes(defaultKeySchemaColumnSuffix)
ks.sharedPrefix = ks.prefixes.SharedPrefix()
return nil
}

// IsLowerBound is part of the KeySeeker interface.
Expand Down Expand Up @@ -401,11 +421,6 @@ func (ks *defaultKeySeeker) MaterializeUserKeyWithSyntheticSuffix(
return res
}

func (ks *defaultKeySeeker) Release() {
*ks = defaultKeySeeker{}
defaultKeySeekerPool.Put(ks)
}

// DataBlockEncoder encodes columnar data blocks using a user-defined schema.
type DataBlockEncoder struct {
Schema KeySchema
Expand Down Expand Up @@ -666,7 +681,6 @@ func (rw *DataBlockRewriter) RewriteSuffixes(
) (start, end base.InternalKey, rewritten []byte, err error) {
if !rw.initialized {
rw.iter.InitOnce(rw.KeySchema, rw.compare, rw.split, assertNoExternalValues{})
rw.keySeeker = rw.KeySchema.NewKeySeeker()
rw.encoder.Init(rw.KeySchema)
rw.initialized = true
}
Expand All @@ -693,7 +707,9 @@ func (rw *DataBlockRewriter) RewriteSuffixes(
// we're performing here and instead use a read-time IterTransform.

rw.decoder.Init(rw.KeySchema, input)
rw.keySeeker.Init(&rw.decoder)
meta := &KeySeekerMetadata{}
rw.KeySchema.InitKeySeekerMetadata(meta, &rw.decoder)
rw.keySeeker = rw.KeySchema.KeySeeker(meta)
rw.encoder.Reset()
if err = rw.iter.Init(&rw.decoder, block.IterTransforms{}); err != nil {
return base.InternalKey{}, base.InternalKey{}, nil, err
Expand Down Expand Up @@ -737,8 +753,15 @@ func (rw *DataBlockRewriter) RewriteSuffixes(
return start, end, rewritten, nil
}

// Assert that a DataBlockDecoder can fit inside block.Metadata.
const _ uint = block.MetadataSize - uint(unsafe.Sizeof(DataBlockDecoder{}))
// dataBlockDecoderSize is the size of DataBlockDecoder, rounded up to a multiple of 8 bytes.
const dataBlockDecoderSize = (unsafe.Sizeof(DataBlockDecoder{}) + 7) &^ 7

// Assert that dataBlockDecoderSize is a multiple of 8 bytes (so that
// KeySeekerMetadata is also aligned).
const _ uint = uint(-(dataBlockDecoderSize % 8))

// Assert that a DataBlockDecoder and a KeySeekerMetadata can fit inside block.Metadata.
const _ uint = block.MetadataSize - uint(dataBlockDecoderSize) - KeySeekerMetadataSize

// Assert that an IndexBlockDecoder can fit inside block.Metadata.
const _ uint = block.MetadataSize - uint(unsafe.Sizeof(IndexBlockDecoder{}))
Expand All @@ -757,7 +780,8 @@ func InitDataBlockMetadata(schema KeySchema, md *block.Metadata, data []byte) (e
}
}()
d.Init(schema, data)
// TODO(radu): Initialize the KeySeeker here as well.
keySchemaMeta := (*KeySeekerMetadata)(unsafe.Pointer(&md[dataBlockDecoderSize]))
schema.InitKeySeekerMetadata(keySchemaMeta, d)
return nil
}

Expand Down Expand Up @@ -927,9 +951,10 @@ func (i *DataBlockIter) Init(d *DataBlockDecoder, transforms block.IterTransform
}
i.noTransforms = i.transforms.NoTransforms()

if i.keySeeker == nil {
i.keySeeker = i.keySchema.NewKeySeeker()
}
// TODO(radu): see if this allocation can be a problem for the suffix rewriter.
meta := &KeySeekerMetadata{}
i.keySchema.InitKeySeekerMetadata(meta, d)
i.keySeeker = i.keySchema.KeySeeker(meta)

// The worst case is when the largest key in the block has no suffix.
maxKeyLength := len(i.transforms.SyntheticPrefix) + int(d.maximumKeyLength) + len(i.transforms.SyntheticSuffix)
Expand All @@ -938,7 +963,7 @@ func (i *DataBlockIter) Init(d *DataBlockDecoder, transforms block.IterTransform
i.kv = base.InternalKV{}
i.kvRow = math.MinInt
i.nextObsoletePoint = 0
return i.keySeeker.Init(d)
return nil
}

// InitHandle initializes the block from the provided buffer handle. InitHandle
Expand All @@ -949,7 +974,9 @@ func (i *DataBlockIter) InitHandle(
) error {
i.cmp = cmp
i.split = split
i.d = (*DataBlockDecoder)(unsafe.Pointer(h.BlockMetadata()))
blockMeta := h.BlockMetadata()
i.d = (*DataBlockDecoder)(unsafe.Pointer(blockMeta))
keySeekerMeta := (*KeySeekerMetadata)(blockMeta[unsafe.Sizeof(DataBlockDecoder{}):])
i.h.Release()
i.h = h

Expand All @@ -963,18 +990,15 @@ func (i *DataBlockIter) InitHandle(
}
i.noTransforms = i.transforms.NoTransforms()

if i.keySeeker == nil {
i.keySeeker = i.keySchema.NewKeySeeker()
}

// The worst case is when the largest key in the block has no suffix.
maxKeyLength := len(i.transforms.SyntheticPrefix) + int(i.d.maximumKeyLength) + len(i.transforms.SyntheticSuffix)
i.keyIter.Init(maxKeyLength, i.transforms.SyntheticPrefix)
i.row = -1
i.kv = base.InternalKV{}
i.kvRow = math.MinInt
i.nextObsoletePoint = 0
return i.keySeeker.Init(i.d)
i.keySeeker = i.keySchema.KeySeeker(keySeekerMeta)
return nil
}

// Handle returns the handle to the block.
Expand Down Expand Up @@ -1402,10 +1426,7 @@ var _ = (*DataBlockIter).decodeKey

// Close implements the base.InternalIterator interface.
func (i *DataBlockIter) Close() error {
if i.keySeeker != nil {
i.keySeeker.Release()
i.keySeeker = nil
}
i.keySeeker = nil
i.d = nil
i.h.Release()
i.h = block.BufferHandle{}
Expand Down
2 changes: 1 addition & 1 deletion testdata/ingest
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Virtual tables: 0 (0B)
Local tables size: 569B
Compression types: snappy: 1
Block cache: 3 entries (1.0KB) hit rate: 18.2%
Table cache: 1 entries (808B) hit rate: 50.0%
Table cache: 1 entries (816B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down
14 changes: 7 additions & 7 deletions testdata/metrics
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Virtual tables: 0 (0B)
Local tables size: 589B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 0.0%
Table cache: 1 entries (808B) hit rate: 0.0%
Table cache: 1 entries (816B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 1
Expand Down Expand Up @@ -218,7 +218,7 @@ Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 33.3%
Table cache: 1 entries (808B) hit rate: 66.7%
Table cache: 1 entries (816B) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 1
Expand Down Expand Up @@ -496,7 +496,7 @@ Virtual tables: 0 (0B)
Local tables size: 4.3KB
Compression types: snappy: 7
Block cache: 8 entries (2.8KB) hit rate: 9.1%
Table cache: 1 entries (808B) hit rate: 53.8%
Table cache: 1 entries (816B) hit rate: 53.8%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -560,7 +560,7 @@ Virtual tables: 0 (0B)
Local tables size: 6.1KB
Compression types: snappy: 10
Block cache: 8 entries (2.8KB) hit rate: 9.1%
Table cache: 1 entries (808B) hit rate: 53.8%
Table cache: 1 entries (816B) hit rate: 53.8%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -835,7 +835,7 @@ Virtual tables: 0 (0B)
Local tables size: 0B
Compression types: snappy: 1
Block cache: 0 entries (0B) hit rate: 0.0%
Table cache: 1 entries (808B) hit rate: 0.0%
Table cache: 1 entries (816B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -883,7 +883,7 @@ Virtual tables: 0 (0B)
Local tables size: 0B
Compression types: snappy: 2
Block cache: 4 entries (1.4KB) hit rate: 0.0%
Table cache: 1 entries (808B) hit rate: 50.0%
Table cache: 1 entries (816B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -932,7 +932,7 @@ Virtual tables: 0 (0B)
Local tables size: 589B
Compression types: snappy: 3
Block cache: 4 entries (1.4KB) hit rate: 0.0%
Table cache: 1 entries (808B) hit rate: 50.0%
Table cache: 1 entries (816B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down

0 comments on commit 1ad5f2a

Please sign in to comment.