From 063761381bb7f214af051700c9494659f2d464f3 Mon Sep 17 00:00:00 2001 From: Linh Tran Tuan Date: Mon, 4 Sep 2023 10:40:38 +0900 Subject: [PATCH] Adapt RocksDB 8.5.3 (#126) --- array_test.go | 2 ++ backup_test.go | 18 +++++----- build.sh | 2 +- c.h | 19 +++++++++- cache_test.go | 8 +++++ cf_test.go | 22 ++++++++++-- cf_ts_test.go | 4 +++ checkpoint_test.go | 1 + compaction_filter_test.go | 2 ++ comparator_test.go | 2 ++ comparator_ts_test.go | 2 ++ cow_test.go | 4 +++ db.go | 33 ++++++++++++++++++ db_external_file_test.go | 4 +++ db_test.go | 36 +++++++++++++++++++ db_ts_test.go | 4 +++ env_test.go | 2 ++ filter_policy_test.go | 18 +++++++--- iterator_test.go | 6 ++++ iterator_ts_test.go | 8 +++++ jemalloc_test.go | 2 ++ memory_usage_test.go | 2 ++ merge_operator_test.go | 9 ++++- options.go | 58 ++++++++++++++++++++++++++----- options_backup_test.go | 2 ++ options_blob_test.go | 2 ++ options_block_based_table_test.go | 2 ++ options_compaction_test.go | 6 ++++ options_env_test.go | 2 ++ options_flush.go | 8 ++--- options_flush_test.go | 24 +++++++------ options_read_test.go | 2 ++ options_restore_test.go | 2 ++ options_test.go | 16 +++++++-- options_write_test.go | 2 ++ slice_transform_test.go | 9 +++-- transactiondb.go | 34 ++++++++++++++++++ transactiondb_test.go | 11 ++++++ write_batch_test.go | 4 +++ write_batch_ts_test.go | 4 +++ write_batch_wi_test.go | 6 ++++ 41 files changed, 357 insertions(+), 47 deletions(-) diff --git a/array_test.go b/array_test.go index f04195f..7bcfd9e 100644 --- a/array_test.go +++ b/array_test.go @@ -7,6 +7,8 @@ import ( ) func TestBytesToCSlice(t *testing.T) { + t.Parallel() + v, err := byteSlicesToCSlices(nil) require.Nil(t, v) require.Nil(t, err) diff --git a/backup_test.go b/backup_test.go index b3e2189..c77cd6d 100644 --- a/backup_test.go +++ b/backup_test.go @@ -7,6 +7,8 @@ import ( ) func TestBackupEngine(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -63,7 +65,7 @@ func TestBackupEngine(t *testing.T) { opts.Destroy() }() - t.Run("createBackupAndVerify", func(t *testing.T) { + { infos := engine.GetInfo() require.Empty(t, infos) @@ -80,25 +82,25 @@ func TestBackupEngine(t *testing.T) { require.True(t, infos[i].Size > 0) require.True(t, infos[i].NumFiles > 0) } - }) + } - t.Run("purge", func(t *testing.T) { + { require.Nil(t, engine.PurgeOldBackups(1)) infos := engine.GetInfo() require.Equal(t, 1, len(infos)) - }) + } - t.Run("restoreFromLatest", func(t *testing.T) { + { dir := t.TempDir() ro := NewRestoreOptions() defer ro.Destroy() require.Nil(t, engine.RestoreDBFromLatestBackup(dir, dir, ro)) require.Nil(t, engine.RestoreDBFromLatestBackup(dir, dir, ro)) - }) + } - t.Run("restoreFromBackup", func(t *testing.T) { + { infos := engine.GetInfo() require.Equal(t, 1, len(infos)) @@ -127,5 +129,5 @@ func TestBackupEngine(t *testing.T) { require.False(t, v4.Exists()) v4.Destroy() } - }) + } } diff --git a/build.sh b/build.sh index c14c794..90127ab 100755 --- a/build.sh +++ b/build.sh @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version} # Note: if you don't have a good reason, please do not set -DPORTABLE=ON # This one is set here on purpose of compatibility with github action runtime processor -rocksdb_version="8.4.4" +rocksdb_version="8.5.3" cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \ mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \ -DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \ diff --git a/c.h b/c.h index cb0c224..407eb47 100644 --- a/c.h +++ b/c.h @@ -411,6 +411,16 @@ rocksdb_create_column_family(rocksdb_t* db, const rocksdb_options_t* column_family_options, const char* column_family_name, char** errptr); +extern ROCKSDB_LIBRARY_API rocksdb_column_family_handle_t** +rocksdb_create_column_families(rocksdb_t* db, + const rocksdb_options_t* column_family_options, + int num_column_families, + const char* const* column_family_names, + size_t* lencfs, char** errptr); + +extern ROCKSDB_LIBRARY_API void rocksdb_create_column_families_destroy( + rocksdb_column_family_handle_t** list); + extern ROCKSDB_LIBRARY_API rocksdb_column_family_handle_t* rocksdb_create_column_family_with_ttl( rocksdb_t* db, const rocksdb_options_t* column_family_options, @@ -1518,7 +1528,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_hash_skip_list_rep( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_hash_link_list_rep( rocksdb_options_t*, size_t); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_plain_table_factory( - rocksdb_options_t*, uint32_t, int, double, size_t); + rocksdb_options_t*, uint32_t, int, double, size_t, size_t, char, + unsigned char, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_min_level_to_compress( rocksdb_options_t* opt, int level); @@ -2446,6 +2457,12 @@ extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_property_value( extern ROCKSDB_LIBRARY_API int rocksdb_transactiondb_property_int( rocksdb_transactiondb_t* db, const char* propname, uint64_t* out_val); +extern ROCKSDB_LIBRARY_API rocksdb_t* rocksdb_transactiondb_get_base_db( + rocksdb_transactiondb_t* txn_db); + +extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_close_base_db( + rocksdb_t* base_db); + extern ROCKSDB_LIBRARY_API rocksdb_transaction_t* rocksdb_transaction_begin( rocksdb_transactiondb_t* txn_db, const rocksdb_writeoptions_t* write_options, diff --git a/cache_test.go b/cache_test.go index f9c8d8c..8b4ec9f 100644 --- a/cache_test.go +++ b/cache_test.go @@ -7,6 +7,8 @@ import ( ) func TestLRUCache(t *testing.T) { + t.Parallel() + cache := NewLRUCache(19) defer cache.Destroy() @@ -18,6 +20,8 @@ func TestLRUCache(t *testing.T) { } func TestHyperClockCache(t *testing.T) { + t.Parallel() + cache := NewHyperClockCache(100, 10) defer cache.Destroy() @@ -29,6 +33,8 @@ func TestHyperClockCache(t *testing.T) { } func TestLRUCacheWithOpts(t *testing.T) { + t.Parallel() + opts := NewLRUCacheOptions() opts.SetCapacity(19) opts.SetNumShardBits(2) @@ -45,6 +51,8 @@ func TestLRUCacheWithOpts(t *testing.T) { } func TestHyperClockCacheWithOpts(t *testing.T) { + t.Parallel() + opts := NewHyperClockCacheOptions(100, 10) opts.SetCapacity(19) opts.SetEstimatedEntryCharge(10) diff --git a/cf_test.go b/cf_test.go index 1a8f9a5..8b1d84d 100644 --- a/cf_test.go +++ b/cf_test.go @@ -8,6 +8,8 @@ import ( ) func TestColumnFamilyOpen(t *testing.T) { + t.Parallel() + dir := t.TempDir() givenNames := []string{"default", "guide"} @@ -22,12 +24,18 @@ func TestColumnFamilyOpen(t *testing.T) { cfh[0].Destroy() cfh[1].Destroy() - actualNames, err := ListColumnFamilies(opts, dir) - require.Nil(t, err) - require.EqualValues(t, actualNames, givenNames) + for i := 0; i < 10; i++ { + actualNames, err := ListColumnFamilies(opts, dir) + require.Nil(t, err) + require.EqualValues(t, actualNames, givenNames) + + runtime.GC() + } } func TestColumnFamilyCreateDrop(t *testing.T) { + t.Parallel() + dir := t.TempDir() opts := NewDefaultOptions() @@ -53,6 +61,8 @@ func TestColumnFamilyCreateDrop(t *testing.T) { } func TestColumnFamilyBatchPutGet(t *testing.T) { + t.Parallel() + dir := t.TempDir() givenNames := []string{"default", "guide"} @@ -139,6 +149,8 @@ func TestColumnFamilyBatchPutGet(t *testing.T) { } func TestColumnFamilyPutGetDelete(t *testing.T) { + t.Parallel() + dir := t.TempDir() givenNames := []string{"default", "guide"} @@ -244,6 +256,8 @@ func newTestDBCF(t *testing.T) (db *DB, cfh []*ColumnFamilyHandle, cleanup func( } func TestColumnFamilyMultiGet(t *testing.T) { + t.Parallel() + db, cfh, cleanup := newTestDBCF(t) defer cleanup() @@ -314,6 +328,8 @@ func TestColumnFamilyMultiGet(t *testing.T) { } func TestCFMetadata(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() meta := db.GetColumnFamilyMetadata() diff --git a/cf_ts_test.go b/cf_ts_test.go index e79c7e7..f62de31 100644 --- a/cf_ts_test.go +++ b/cf_ts_test.go @@ -7,6 +7,8 @@ import ( ) func TestColumnFamilyPutGetDeleteWithTS(t *testing.T) { + t.Parallel() + dir := t.TempDir() givenNames := []string{"default", "guide"} @@ -96,6 +98,8 @@ func TestColumnFamilyPutGetDeleteWithTS(t *testing.T) { } func TestColumnFamilyMultiGetWithTS(t *testing.T) { + t.Parallel() + db, cfh, cleanup := newTestDBMultiCF(t, []string{"default", "custom"}, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) diff --git a/checkpoint_test.go b/checkpoint_test.go index 67aa564..c5e6f65 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -8,6 +8,7 @@ import ( ) func TestCheckpoint(t *testing.T) { + t.Parallel() dir := t.TempDir() err := os.RemoveAll(dir) diff --git a/compaction_filter_test.go b/compaction_filter_test.go index 1e6abc5..604f1d8 100644 --- a/compaction_filter_test.go +++ b/compaction_filter_test.go @@ -8,6 +8,8 @@ import ( ) func TestCompactionFilter(t *testing.T) { + t.Parallel() + var ( changeKey = []byte("change") changeValOld = []byte("old") diff --git a/comparator_test.go b/comparator_test.go index 9f3be9a..b491b83 100644 --- a/comparator_test.go +++ b/comparator_test.go @@ -9,6 +9,8 @@ import ( ) func TestComparator(t *testing.T) { + t.Parallel() + db, opts := newTestDBAndOpts(t, func(opts *Options) { opts.SetComparator(NewComparator("rev", func(a, b []byte) int { return bytes.Compare(a, b) * -1 diff --git a/comparator_ts_test.go b/comparator_ts_test.go index ad7490b..7a3ca56 100644 --- a/comparator_ts_test.go +++ b/comparator_ts_test.go @@ -9,6 +9,8 @@ import ( ) func TestComparatorWithTS(t *testing.T) { + t.Parallel() + db, opts := newTestDBAndOpts(t, func(opts *Options) { comp := newComparatorWithTimeStamp( "rev", diff --git a/cow_test.go b/cow_test.go index cabd617..1ab1e4b 100644 --- a/cow_test.go +++ b/cow_test.go @@ -9,6 +9,8 @@ import ( ) func TestCOWList(t *testing.T) { + t.Parallel() + cl := NewCOWList() cl.Append("hello") cl.Append("world") @@ -19,6 +21,8 @@ func TestCOWList(t *testing.T) { } func TestCOWListMT(t *testing.T) { + t.Parallel() + cl := NewCOWList() expectedRes := make([]int, 3) var wg sync.WaitGroup diff --git a/db.go b/db.go index c4933b1..28214bd 100644 --- a/db.go +++ b/db.go @@ -1269,6 +1269,39 @@ func (db *DB) CreateColumnFamily(opts *Options, name string) (handle *ColumnFami return } +// CreateColumnFamilies creates new column families. +func (db *DB) CreateColumnFamilies(opts *Options, names []string) (handles []*ColumnFamilyHandle, err error) { + if len(names) == 0 { + return nil, nil + } + + var cErr *C.char + + n := len(names) + cNames := make([]*C.char, 0, n) + cSizes := make([]C.size_t, 0, n) + for i := range names { + cNames = append(cNames, C.CString(names[i])) + cSizes = append(cSizes, C.size_t(len(names[i]))) + } + + cHandles := C.rocksdb_create_column_families(db.c, opts.c, C.int(n), &cNames[0], &cSizes[0], &cErr) + if err = fromCError(cErr); err == nil { + tmp := unsafe.Slice(cHandles, n) + + handles = make([]*ColumnFamilyHandle, 0, n) + for i := range tmp { + handles = append(handles, newNativeColumnFamilyHandle(tmp[i])) + } + } + + for i := range cNames { + C.free(unsafe.Pointer(cNames[i])) + } + + return +} + // CreateColumnFamilyWithTTL create a new column family along with its ttl. // // BEHAVIOUR: diff --git a/db_external_file_test.go b/db_external_file_test.go index 4c2140a..a4290e7 100644 --- a/db_external_file_test.go +++ b/db_external_file_test.go @@ -8,6 +8,8 @@ import ( ) func TestExternalFile(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -56,6 +58,8 @@ func TestExternalFile(t *testing.T) { } func TestExternalFileWithTS(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) diff --git a/db_test.go b/db_test.go index 7cb211f..eebf580 100644 --- a/db_test.go +++ b/db_test.go @@ -10,6 +10,8 @@ import ( ) func TestOpenDb(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -20,6 +22,8 @@ func TestOpenDb(t *testing.T) { } func TestDBCRUD(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -81,6 +85,8 @@ func TestDBCRUD(t *testing.T) { } func TestDBCRUDDBPaths(t *testing.T) { + t.Parallel() + names := make([]string, 4) targetSizes := make([]uint64, len(names)) @@ -253,6 +259,8 @@ func newTestDBPathNames(t *testing.T, names []string, targetSizes []uint64, appl } func TestDBMultiGet(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -285,6 +293,8 @@ func TestDBMultiGet(t *testing.T) { } func TestLoadLatestOpts(t *testing.T) { + t.Parallel() + dir := t.TempDir() opts := NewDefaultOptions() @@ -315,6 +325,8 @@ func TestLoadLatestOpts(t *testing.T) { } func TestDBGetApproximateSizes(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -335,6 +347,8 @@ func TestDBGetApproximateSizes(t *testing.T) { } func TestDBGetApproximateSizesCF(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -358,3 +372,25 @@ func TestDBGetApproximateSizesCF(t *testing.T) { require.EqualValues(t, sizes, []uint64{0}) require.NoError(t, err) } + +func TestCreateCFs(t *testing.T) { + t.Parallel() + + db := newTestDB(t, nil) + defer db.Close() + + o := NewDefaultOptions() + + cfs, err := db.CreateColumnFamilies(o, []string{"other1", "other2"}) + require.Nil(t, err) + require.Len(t, cfs, 2) + + err = db.PutCF(NewDefaultWriteOptions(), cfs[0], []byte{1, 2, 3}, []byte{4, 5, 6}) + require.NoError(t, err) + + _, err = db.CreateColumnFamilies(o, nil) + require.NoError(t, err) + + _, err = db.CreateColumnFamilies(o, []string{}) + require.NoError(t, err) +} diff --git a/db_ts_test.go b/db_ts_test.go index 7b6db01..fc36626 100644 --- a/db_ts_test.go +++ b/db_ts_test.go @@ -9,6 +9,8 @@ import ( ) func TestDBCRUDWithTS(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) @@ -85,6 +87,8 @@ func TestDBCRUDWithTS(t *testing.T) { } func TestDBMultiGetWithTS(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) diff --git a/env_test.go b/env_test.go index 483d4c9..0a9d854 100644 --- a/env_test.go +++ b/env_test.go @@ -7,6 +7,8 @@ import ( ) func TestEnv(t *testing.T) { + t.Parallel() + env := NewDefaultEnv() defer env.Destroy() diff --git a/filter_policy_test.go b/filter_policy_test.go index 68b310e..739afdd 100644 --- a/filter_policy_test.go +++ b/filter_policy_test.go @@ -5,22 +5,32 @@ import ( ) func TestFilterPolicy(t *testing.T) { - t.Run("Bloom", func(*testing.T) { + t.Parallel() + + t.Run("Bloom", func(t *testing.T) { + t.Parallel() + flt := NewBloomFilter(1.2) defer flt.Destroy() }) - t.Run("BloomFull", func(*testing.T) { + t.Run("BloomFull", func(t *testing.T) { + t.Parallel() + flt := NewBloomFilterFull(1.2) defer flt.Destroy() }) - t.Run("Ribbon", func(*testing.T) { + t.Run("Ribbon", func(t *testing.T) { + t.Parallel() + flt := NewRibbonFilterPolicy(1.2) defer flt.Destroy() }) - t.Run("RibbonHybrid", func(*testing.T) { + t.Run("RibbonHybrid", func(t *testing.T) { + t.Parallel() + flt := NewRibbonHybridFilterPolicy(1.2, 1) defer flt.Destroy() }) diff --git a/iterator_test.go b/iterator_test.go index 42ef4d2..3de6122 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -8,6 +8,8 @@ import ( ) func TestIterator(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -32,6 +34,8 @@ func TestIterator(t *testing.T) { } func TestIteratorWriteManyThenIter(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -61,6 +65,8 @@ func TestIteratorWriteManyThenIter(t *testing.T) { } func TestIteratorCF(t *testing.T) { + t.Parallel() + db, cfs, cleanup := newTestDBMultiCF(t, []string{"default", "c1", "c2", "c3"}, nil) defer cleanup() diff --git a/iterator_ts_test.go b/iterator_ts_test.go index 4af1dd5..26a1add 100644 --- a/iterator_ts_test.go +++ b/iterator_ts_test.go @@ -8,6 +8,8 @@ import ( ) func TestIteratorWithTS(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) @@ -86,6 +88,8 @@ func TestIteratorWithTS(t *testing.T) { } func TestIteratorWriteManyThenIterWithTS(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) @@ -120,6 +124,8 @@ func TestIteratorWriteManyThenIterWithTS(t *testing.T) { } func TestIteratorCFWithTS(t *testing.T) { + t.Parallel() + db, cfs, cleanup := newTestDBMultiCF(t, []string{"default", "c1", "c2", "c3"}, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) @@ -186,6 +192,8 @@ func TestIteratorCFWithTS(t *testing.T) { } func TestIteratorRangeWithTS(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) diff --git a/jemalloc_test.go b/jemalloc_test.go index 45b7f4e..802bcbb 100644 --- a/jemalloc_test.go +++ b/jemalloc_test.go @@ -9,6 +9,8 @@ import ( ) func TestMemAlloc(t *testing.T) { + t.Parallel() + m, err := CreateJemallocNodumpAllocator() require.NoError(t, err) m.Destroy() diff --git a/memory_usage_test.go b/memory_usage_test.go index bfedefa..3956c26 100644 --- a/memory_usage_test.go +++ b/memory_usage_test.go @@ -9,6 +9,8 @@ import ( ) func TestMemoryUsage(t *testing.T) { + t.Parallel() + // create database with cache cache := NewLRUCache(8 * 1024 * 1024) cache.SetCapacity(90) diff --git a/merge_operator_test.go b/merge_operator_test.go index 50e1ea0..c7cc989 100644 --- a/merge_operator_test.go +++ b/merge_operator_test.go @@ -7,6 +7,8 @@ import ( ) func TestMergeOperator(t *testing.T) { + t.Parallel() + var ( givenKey = []byte("hello") givenVal1 = []byte("foo") @@ -41,6 +43,8 @@ func TestMergeOperator(t *testing.T) { } func TestPartialMergeOperator(t *testing.T) { + t.Parallel() + var ( givenKey = []byte("hello") startingVal = []byte("foo") @@ -91,10 +95,11 @@ func TestPartialMergeOperator(t *testing.T) { defer v1.Free() require.Nil(t, err) require.EqualValues(t, v1.Data(), fMergeResult) - } func TestMergeMultiOperator(t *testing.T) { + t.Parallel() + var ( givenKey = []byte("hello") startingVal = []byte("foo") @@ -166,6 +171,7 @@ func (m *mockMergeMultiOperator) Name() string { return "grocksdb.multi" } func (m *mockMergeMultiOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { return m.fullMerge(key, existingValue, operands) } + func (m *mockMergeMultiOperator) PartialMergeMulti(key []byte, operands [][]byte) ([]byte, bool) { return m.partialMergeMulti(key, operands) } @@ -179,6 +185,7 @@ func (m *mockMergePartialOperator) Name() string { return "grocksdb.partial" } func (m *mockMergePartialOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { return m.fullMerge(key, existingValue, operands) } + func (m *mockMergePartialOperator) PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) { return m.partialMerge(key, leftOperand, rightOperand) } diff --git a/options.go b/options.go index 914774d..7b122fb 100644 --- a/options.go +++ b/options.go @@ -1797,27 +1797,43 @@ func (opts *Options) SetHashLinkListRep(bucketCount uint) { // hash bucket found, a binary search is executed for hash conflicts. Finally, // a linear search is used. // -// keyLen: plain table has optimization for fix-sized keys, +// keyLen: plain table has optimization for fix-sized keys, which can be specified via keyLen. // -// which can be specified via keyLen. +// bloomBitsPerKey: the number of bits used for bloom filer per prefix. You may disable it by passing a zero. // -// bloomBitsPerKey: the number of bits used for bloom filer per prefix. You +// hashTableRatio: the desired utilization of the hash table used for prefix hashing. +// hashTableRatio = number of prefixes / #buckets in the hash table // -// may disable it by passing a zero. +// indexSparseness: inside each prefix, need to build one index record for how +// many keys for binary search inside each hash bucket. // -// hashTableRatio: the desired utilization of the hash table used for prefix +// hugePageTlbSize: if <=0, allocate hash indexes and blooms from malloc. +// Otherwise from huge page TLB. The user needs to reserve huge pages for it to be allocated, like: sysctl -w vm.nr_hugepages=20 +// See linux doc Documentation/vm/hugetlbpage.txt // -// hashing. hashTableRatio = number of prefixes / #buckets -// in the hash table +// encodeType: how to encode the keys. See enum EncodingType above for +// the choices. The value will determine how to encode keys +// when writing to a new SST file. This value will be stored +// inside the SST file which will be used when reading from +// the file, which makes it possible for users to choose +// different encoding type when reopening a DB. Files with +// different encoding types can co-exist in the same DB and +// can be read. // -// indexSparseness: inside each prefix, need to build one index record for how +// fullScanMode: mode for reading the whole file one record by one without +// using the index. // -// many keys for binary search inside each hash bucket. +// storeIndexInFile: compute plain table index and bloom filter during +// file building and store it in file. When reading file, index will be mapped instead of recomputation. func (opts *Options) SetPlainTableFactory( keyLen uint32, bloomBitsPerKey int, hashTableRatio float64, indexSparseness uint, + hugePageTlbSize int, + encodeType EncodingType, + fullScanMode bool, + storeIndexInFile bool, ) { C.rocksdb_options_set_plain_table_factory( opts.c, @@ -1825,6 +1841,10 @@ func (opts *Options) SetPlainTableFactory( C.int(bloomBitsPerKey), C.double(hashTableRatio), C.size_t(indexSparseness), + C.size_t(hugePageTlbSize), + C.char(encodeType), + boolToChar(fullScanMode), + boolToChar(storeIndexInFile), ) } @@ -2574,3 +2594,23 @@ func (l *LatestOptions) Destroy() { C.rocksdb_load_latest_options_destroy(l.opts.c, l.cfNames_, l.cfOptions_, C.size_t(len(l.cfNames))) l.opts.c = nil } + +type EncodingType byte + +const ( + // EncodingTypePlain always write full keys without any special encoding. + EncodingTypePlain EncodingType = iota + + // EncodingTypePrefix find opportunity to write the same prefix once for multiple rows. + // In some cases, when a key follows a previous key with the same prefix, + // instead of writing out the full key, it just writes out the size of the + // shared prefix, as well as other bytes, to save some bytes. + // + // When using this option, the user is required to use the same prefix + // extractor to make sure the same prefix will be extracted from the same key. + // The Name() value of the prefix extractor will be stored in the file. When + // reopening the file, the name of the options.prefix_extractor given will be + // bitwise compared to the prefix extractors stored in the file. An error + // will be returned if the two don't match. + EncodingTypePrefix +) diff --git a/options_backup_test.go b/options_backup_test.go index 42d1f8f..d87a2d3 100644 --- a/options_backup_test.go +++ b/options_backup_test.go @@ -8,6 +8,8 @@ import ( ) func TestBackupableDBOptions(t *testing.T) { + t.Parallel() + opts := NewBackupableDBOptions("/tmp/v1") defer opts.Destroy() diff --git a/options_blob_test.go b/options_blob_test.go index adaf476..3c5e1fc 100644 --- a/options_blob_test.go +++ b/options_blob_test.go @@ -7,6 +7,8 @@ import ( ) func TestOptionBlobFile(t *testing.T) { + t.Parallel() + opt := NewDefaultOptions() defer opt.Destroy() diff --git a/options_block_based_table_test.go b/options_block_based_table_test.go index a1b54ba..f50caba 100644 --- a/options_block_based_table_test.go +++ b/options_block_based_table_test.go @@ -5,6 +5,8 @@ import ( ) func TestBBT(t *testing.T) { + t.Parallel() + b := NewDefaultBlockBasedTableOptions() defer b.Destroy() diff --git a/options_compaction_test.go b/options_compaction_test.go index 6b4ea24..5b20aaa 100644 --- a/options_compaction_test.go +++ b/options_compaction_test.go @@ -7,6 +7,8 @@ import ( ) func TestOptionCompactions(t *testing.T) { + t.Parallel() + co := NewCompactRangeOptions() defer co.Destroy() @@ -30,6 +32,8 @@ func TestOptionCompactions(t *testing.T) { } func TestFifoCompactOption(t *testing.T) { + t.Parallel() + fo := NewDefaultFIFOCompactionOptions() defer fo.Destroy() @@ -42,6 +46,8 @@ func TestFifoCompactOption(t *testing.T) { } func TestUniversalCompactOption(t *testing.T) { + t.Parallel() + uo := NewDefaultUniversalCompactionOptions() defer uo.Destroy() diff --git a/options_env_test.go b/options_env_test.go index ba9a868..b69de57 100644 --- a/options_env_test.go +++ b/options_env_test.go @@ -3,6 +3,8 @@ package grocksdb import "testing" func TestOptEnv(t *testing.T) { + t.Parallel() + opt := NewDefaultEnvOptions() defer opt.Destroy() } diff --git a/options_flush.go b/options_flush.go index 90a53d9..4a8cf70 100644 --- a/options_flush.go +++ b/options_flush.go @@ -26,10 +26,10 @@ func (opts *FlushOptions) SetWait(value bool) { C.rocksdb_flushoptions_set_wait(opts.c, boolToChar(value)) } -// // IsWait returns if the flush will wait until the flush is done. -// func (opts *FlushOptions) IsWait() bool { -// return charToBool(C.rocksdb_flushoptions_get_wait(opts.c)) -// } +// IsWait returns if the flush will wait until the flush is done. +func (opts *FlushOptions) IsWait() bool { + return charToBool(C.rocksdb_flushoptions_get_wait(opts.c)) +} // Destroy deallocates the FlushOptions object. func (opts *FlushOptions) Destroy() { diff --git a/options_flush_test.go b/options_flush_test.go index c34b5a1..da5647e 100644 --- a/options_flush_test.go +++ b/options_flush_test.go @@ -1,16 +1,18 @@ package grocksdb -// import ( -// "testing" +import ( + "testing" -// "github.com/stretchr/testify/require" -// ) + "github.com/stretchr/testify/require" +) -// func TestFlushOption(t *testing.T) { -// fo := NewDefaultFlushOptions() -// defer fo.Destroy() +func TestFlushOption(t *testing.T) { + t.Parallel() -// require.EqualValues(t, true, fo.IsWait()) -// fo.SetWait(false) -// require.EqualValues(t, false, fo.IsWait()) -// } + fo := NewDefaultFlushOptions() + defer fo.Destroy() + + require.EqualValues(t, true, fo.IsWait()) + fo.SetWait(false) + require.EqualValues(t, false, fo.IsWait()) +} diff --git a/options_read_test.go b/options_read_test.go index 1cb86ad..418e71d 100644 --- a/options_read_test.go +++ b/options_read_test.go @@ -7,6 +7,8 @@ import ( ) func TestReadOptions(t *testing.T) { + t.Parallel() + ro := NewDefaultReadOptions() defer ro.Destroy() diff --git a/options_restore_test.go b/options_restore_test.go index f932829..8c3c9c3 100644 --- a/options_restore_test.go +++ b/options_restore_test.go @@ -3,6 +3,8 @@ package grocksdb import "testing" func TestRestoreOption(t *testing.T) { + t.Parallel() + ro := NewRestoreOptions() defer ro.Destroy() diff --git a/options_test.go b/options_test.go index 92ff7f2..18eff7a 100644 --- a/options_test.go +++ b/options_test.go @@ -7,6 +7,8 @@ import ( ) func TestOptions(t *testing.T) { + t.Parallel() + opts := NewDefaultOptions() defer opts.Destroy() @@ -310,7 +312,7 @@ func TestOptions(t *testing.T) { opts.SetMemtableVectorRep() opts.SetHashLinkListRep(12) opts.SetHashSkipListRep(1, 2, 3) - opts.SetPlainTableFactory(1, 2, 3.1, 12) + opts.SetPlainTableFactory(1, 2, 3.1, 12, 58922, EncodingTypePlain, true, true) opts.SetUint64AddMergeOperator() opts.SetDumpMallocStats(true) opts.SetMemtableWholeKeyFiltering(true) @@ -384,14 +386,20 @@ func TestOptions(t *testing.T) { } func TestOptions2(t *testing.T) { - t.Run("SetUniversalCompactionOpts", func(*testing.T) { + t.Parallel() + + t.Run("SetUniversalCompactionOpts", func(t *testing.T) { + t.Parallel() + opts := NewDefaultOptions() defer opts.Destroy() opts.SetUniversalCompactionOptions(NewDefaultUniversalCompactionOptions()) }) - t.Run("SetFifoCompactionOpts", func(*testing.T) { + t.Run("SetFifoCompactionOpts", func(t *testing.T) { + t.Parallel() + opts := NewDefaultOptions() defer opts.Destroy() @@ -399,6 +407,8 @@ func TestOptions2(t *testing.T) { }) t.Run("StatisticString", func(t *testing.T) { + t.Parallel() + opts := NewDefaultOptions() defer opts.Destroy() diff --git a/options_write_test.go b/options_write_test.go index 4738912..5bdc454 100644 --- a/options_write_test.go +++ b/options_write_test.go @@ -7,6 +7,8 @@ import ( ) func TestWriteOptions(t *testing.T) { + t.Parallel() + wo := NewDefaultWriteOptions() defer wo.Destroy() diff --git a/slice_transform_test.go b/slice_transform_test.go index dcfa266..7100b8e 100644 --- a/slice_transform_test.go +++ b/slice_transform_test.go @@ -7,6 +7,8 @@ import ( ) func TestSliceTransform(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetPrefixExtractor(&testSliceTransform{}) }) @@ -29,6 +31,8 @@ func TestSliceTransform(t *testing.T) { } func TestFixedPrefixTransformOpen(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetPrefixExtractor(NewFixedPrefixTransform(3)) }) @@ -36,14 +40,15 @@ func TestFixedPrefixTransformOpen(t *testing.T) { } func TestNewNoopPrefixTransform(t *testing.T) { + t.Parallel() + db := newTestDB(t, func(opts *Options) { opts.SetPrefixExtractor(NewNoopPrefixTransform()) }) defer db.Close() } -type testSliceTransform struct { -} +type testSliceTransform struct{} func (st *testSliceTransform) Name() string { return "grocksdb.test" } func (st *testSliceTransform) Transform(src []byte) []byte { return src[0:3] } diff --git a/transactiondb.go b/transactiondb.go index 529c6b0..ef6cace 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -112,6 +112,40 @@ func (db *TransactionDB) ReleaseSnapshot(snapshot *Snapshot) { snapshot.c = nil } +// GetProperty returns the value of a database property. +func (db *TransactionDB) GetProperty(propName string) (value string) { + cprop := C.CString(propName) + cValue := C.rocksdb_transactiondb_property_value(db.c, cprop) + + value = C.GoString(cValue) + + C.rocksdb_free(unsafe.Pointer(cValue)) + C.free(unsafe.Pointer(cprop)) + return +} + +// GetIntProperty similar to `GetProperty`, but only works for a subset of properties whose +// return value is an integer. Return the value by integer. +func (db *TransactionDB) GetIntProperty(propName string) (value uint64, success bool) { + cProp := C.CString(propName) + success = C.rocksdb_transactiondb_property_int(db.c, cProp, (*C.uint64_t)(&value)) == 0 + C.free(unsafe.Pointer(cProp)) + return +} + +// GetBaseDB gets base db. +func (db *TransactionDB) GetBaseDB() *DB { + base := C.rocksdb_transactiondb_get_base_db(db.c) + return &DB{c: base} +} + +// CloseBaseDBOfTransactionDB closes base db of TransactionDB. +func CloseBaseDBOfTransactionDB(db *DB) { + if db != nil && db.c != nil { + C.rocksdb_transactiondb_close_base_db(db.c) + } +} + // TransactionBegin begins a new transaction // with the WriteOptions and TransactionOptions given. func (db *TransactionDB) TransactionBegin( diff --git a/transactiondb_test.go b/transactiondb_test.go index ecdd334..702b110 100644 --- a/transactiondb_test.go +++ b/transactiondb_test.go @@ -7,11 +7,15 @@ import ( ) func TestOpenTransactionDb(t *testing.T) { + t.Parallel() + db := newTestTransactionDB(t, nil) defer db.Close() } func TestTransactionDBCRUD(t *testing.T) { + t.Parallel() + db := newTestTransactionDB(t, nil) defer db.Close() @@ -141,6 +145,8 @@ func TestTransactionDBCRUD(t *testing.T) { } func TestTransactionDBGetForUpdate(t *testing.T) { + t.Parallel() + lockTimeoutMilliSec := int64(50) applyOpts := func(_ *Options, transactionDBOpts *TransactionDBOptions) { transactionDBOpts.SetTransactionLockTimeout(lockTimeoutMilliSec) @@ -167,6 +173,9 @@ func TestTransactionDBGetForUpdate(t *testing.T) { if err := db.Put(wo, givenKey, givenVal); err == nil { t.Error("expect locktime out error, got nil error") } + + base := db.GetBaseDB() + defer CloseBaseDBOfTransactionDB(base) } func newTestTransactionDB(t *testing.T, applyOpts func(opts *Options, transactionDBOpts *TransactionDBOptions)) *TransactionDB { @@ -185,6 +194,8 @@ func newTestTransactionDB(t *testing.T, applyOpts func(opts *Options, transactio } func TestTransactionDBColumnFamilyBatchPutGet(t *testing.T) { + t.Parallel() + dir := t.TempDir() givenNames := []string{"default", "guide"} diff --git a/write_batch_test.go b/write_batch_test.go index d0235f3..49b3a8b 100644 --- a/write_batch_test.go +++ b/write_batch_test.go @@ -7,6 +7,8 @@ import ( ) func TestWriteBatch(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -54,6 +56,8 @@ func TestWriteBatch(t *testing.T) { } func TestWriteBatchIterator(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() diff --git a/write_batch_ts_test.go b/write_batch_ts_test.go index e2bf1bb..c236691 100644 --- a/write_batch_ts_test.go +++ b/write_batch_ts_test.go @@ -7,6 +7,8 @@ import ( ) func TestWriteBatchWithTS(t *testing.T) { + t.Parallel() + db, cfs, cleanup := newTestDBMultiCF(t, []string{"default"}, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) @@ -55,6 +57,8 @@ func TestWriteBatchWithTS(t *testing.T) { } func TestWriteBatchIteratorWithTS(t *testing.T) { + t.Parallel() + _, cfs, cleanup := newTestDBMultiCF(t, []string{"default"}, func(opts *Options) { opts.SetComparator(newDefaultComparatorWithTS()) }) diff --git a/write_batch_wi_test.go b/write_batch_wi_test.go index ebccac9..abe6ec5 100644 --- a/write_batch_wi_test.go +++ b/write_batch_wi_test.go @@ -7,6 +7,8 @@ import ( ) func TestWriteBatchWI(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -86,6 +88,8 @@ func TestWriteBatchWI(t *testing.T) { } func TestWriteBatchWIIterator(t *testing.T) { + t.Parallel() + db := newTestDB(t, nil) defer db.Close() @@ -119,6 +123,8 @@ func TestWriteBatchWIIterator(t *testing.T) { } func TestWriteBatchWIIteratorWithBase(t *testing.T) { + t.Parallel() + db, cfs, closeup := newTestDBMultiCF(t, []string{"default", "custom"}, nil) defer closeup()