Skip to content

Commit

Permalink
Adapt RocksDB 8.9.1 (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu authored Jan 11, 2024
1 parent d95dda2 commit f0bad2d
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 5 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version}

# Note: if you don't have a good reason, please do not set -DPORTABLE=ON
# This one is set here on purpose of compatibility with github action runtime processor
rocksdb_version="8.8.1"
rocksdb_version="8.9.1"
cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \
mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \
-DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \
Expand Down
46 changes: 46 additions & 0 deletions c.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef struct rocksdb_lru_cache_options_t rocksdb_lru_cache_options_t;
typedef struct rocksdb_hyper_clock_cache_options_t
rocksdb_hyper_clock_cache_options_t;
typedef struct rocksdb_cache_t rocksdb_cache_t;
typedef struct rocksdb_write_buffer_manager_t rocksdb_write_buffer_manager_t;
typedef struct rocksdb_compactionfilter_t rocksdb_compactionfilter_t;
typedef struct rocksdb_compactionfiltercontext_t
rocksdb_compactionfiltercontext_t;
Expand Down Expand Up @@ -1060,6 +1061,8 @@ rocksdb_block_based_options_set_pin_top_level_index_and_filter(
rocksdb_block_based_table_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory(
rocksdb_options_t* opt, rocksdb_block_based_table_options_t* table_options);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_write_buffer_manager(
rocksdb_options_t* opt, rocksdb_write_buffer_manager_t* wbm);

/* Cuckoo table options */

Expand Down Expand Up @@ -1143,6 +1146,8 @@ extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_paranoid_checks(
rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_db_paths(
rocksdb_options_t*, const rocksdb_dbpath_t** path_values, size_t num_paths);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_cf_paths(
rocksdb_options_t*, const rocksdb_dbpath_t** path_values, size_t num_paths);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_env(rocksdb_options_t*,
rocksdb_env_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_info_log(rocksdb_options_t*,
Expand Down Expand Up @@ -1256,6 +1261,10 @@ rocksdb_options_set_max_bytes_for_level_multiplier_additional(
rocksdb_options_t*, int* level_values, size_t num_levels);
extern ROCKSDB_LIBRARY_API void rocksdb_options_enable_statistics(
rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_periodic_compaction_seconds(
rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_options_get_periodic_compaction_seconds(rocksdb_options_t*);

enum {
rocksdb_statistics_level_disable_all = 0,
Expand Down Expand Up @@ -1668,6 +1677,10 @@ extern ROCKSDB_LIBRARY_API int rocksdb_options_get_wal_compression(
/* RateLimiter */
extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t* rocksdb_ratelimiter_create(
int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness);
extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t*
rocksdb_ratelimiter_create_auto_tuned(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness);
extern ROCKSDB_LIBRARY_API void rocksdb_ratelimiter_destroy(
rocksdb_ratelimiter_t*);

Expand Down Expand Up @@ -2067,6 +2080,36 @@ rocksdb_cache_get_table_address_count(const rocksdb_cache_t* cache);
extern ROCKSDB_LIBRARY_API size_t
rocksdb_cache_get_occupancy_count(const rocksdb_cache_t* cache);

/* WriteBufferManager */

extern ROCKSDB_LIBRARY_API rocksdb_write_buffer_manager_t*
rocksdb_write_buffer_manager_create(size_t buffer_size, bool allow_stall);
extern ROCKSDB_LIBRARY_API rocksdb_write_buffer_manager_t*
rocksdb_write_buffer_manager_create_with_cache(size_t buffer_size,
const rocksdb_cache_t* cache,
bool allow_stall);

extern ROCKSDB_LIBRARY_API void rocksdb_write_buffer_manager_destroy(
rocksdb_write_buffer_manager_t* wbm);
extern ROCKSDB_LIBRARY_API bool rocksdb_write_buffer_manager_enabled(
rocksdb_write_buffer_manager_t* wbm);
extern ROCKSDB_LIBRARY_API bool rocksdb_write_buffer_manager_cost_to_cache(
rocksdb_write_buffer_manager_t* wbm);
extern ROCKSDB_LIBRARY_API size_t
rocksdb_write_buffer_manager_memory_usage(rocksdb_write_buffer_manager_t* wbm);
extern ROCKSDB_LIBRARY_API size_t
rocksdb_write_buffer_manager_mutable_memtable_memory_usage(
rocksdb_write_buffer_manager_t* wbm);
extern ROCKSDB_LIBRARY_API size_t
rocksdb_write_buffer_manager_dummy_entries_in_cache_usage(
rocksdb_write_buffer_manager_t* wbm);
extern ROCKSDB_LIBRARY_API size_t
rocksdb_write_buffer_manager_buffer_size(rocksdb_write_buffer_manager_t* wbm);
extern ROCKSDB_LIBRARY_API void rocksdb_write_buffer_manager_set_buffer_size(
rocksdb_write_buffer_manager_t* wbm, size_t new_size);
extern ROCKSDB_LIBRARY_API void rocksdb_write_buffer_manager_set_allow_stall(
rocksdb_write_buffer_manager_t* wbm, bool new_allow_stall);

/* HyperClockCache */

extern ROCKSDB_LIBRARY_API rocksdb_hyper_clock_cache_options_t*
Expand Down Expand Up @@ -2427,6 +2470,9 @@ extern ROCKSDB_LIBRARY_API char*
rocksdb_sst_file_metadata_get_relative_filename(
rocksdb_sst_file_metadata_t* file_meta);

extern ROCKSDB_LIBRARY_API char* rocksdb_sst_file_metadata_get_directory(
rocksdb_sst_file_metadata_t* file_meta);

extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_sst_file_metadata_get_size(rocksdb_sst_file_metadata_t* file_meta);

Expand Down
2 changes: 2 additions & 0 deletions cf_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (l *LevelMetadata) SstMetas() []SstMetadata {
// SstMetadata represents metadata of sst file.
type SstMetadata struct {
relativeFileName string
directory string
size uint64
smallestKey []byte
largestKey []byte
Expand All @@ -95,6 +96,7 @@ func sstMetas(c *C.rocksdb_level_metadata_t) []SstMetadata {
for i := range metas {
sm := C.rocksdb_level_metadata_get_sst_file_metadata(c, C.size_t(i))
metas[i].relativeFileName = C.GoString(C.rocksdb_sst_file_metadata_get_relative_filename(sm))
metas[i].directory = C.GoString(C.rocksdb_sst_file_metadata_get_directory(sm))
metas[i].size = uint64(C.rocksdb_sst_file_metadata_get_size(sm))

var ln C.size_t
Expand Down
4 changes: 3 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func newTestDBAndOpts(t *testing.T, applyOpts func(opts *Options)) (*DB, *Option

opts := NewDefaultOptions()
// test the ratelimiter
rateLimiter := NewRateLimiter(1024, 100*1000, 10)
rateLimiter := NewAutoTunedRateLimiter(1024, 100*1000, 10)
opts.SetRateLimiter(rateLimiter)
opts.SetCreateIfMissing(true)
opts.SetCompression(ZSTDCompression)
Expand Down Expand Up @@ -245,6 +245,8 @@ func newTestDBPathNames(t *testing.T, names []string, targetSizes []uint64, appl

opts := NewDefaultOptions()
opts.SetDBPaths(dbpaths)
opts.SetCFPaths(dbpaths)

// test the ratelimiter
rateLimiter := NewRateLimiter(1024, 100*1000, 10)
opts.SetRateLimiter(rateLimiter)
Expand Down
105 changes: 103 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ func (opts *Options) ParanoidChecks() bool {
return charToBool(C.rocksdb_options_get_paranoid_checks(opts.c))
}

// SetDBPaths sets the DBPaths of the options.
// SetDBPaths sets the db_paths option.
//
// A list of paths where SST files can be put into, with its target size.
// db_paths is a list of paths where SST files can be put into, with its target size.
// Newer data is placed into paths specified earlier in the vector while
// older data gradually moves to paths specified later in the vector.
//
Expand Down Expand Up @@ -314,6 +314,30 @@ func (opts *Options) SetDBPaths(dbpaths []*DBPath) {
}
}

// SetCFPaths sets cf_paths option. cf_paths is a list of paths where SST files
// for this column family can be put into, with its target size.
//
// Similar to db_paths, newer data is placed into paths specified earlier in the
// vector while older data gradually moves to paths specified later in the vector.
//
// Note that, if a path is supplied to multiple column
// families, it would have files and total size from all
// the column families combined. User should provision for the
// total size(from all the column families) in such cases.
//
// If left empty, db_paths will be used.
// Default: empty
func (opts *Options) SetCFPaths(dbpaths []*DBPath) {
if n := len(dbpaths); n > 0 {
cDbpaths := make([]*C.rocksdb_dbpath_t, n)
for i, v := range dbpaths {
cDbpaths[i] = v.c
}

C.rocksdb_options_set_cf_paths(opts.c, &cDbpaths[0], C.size_t(n))
}
}

// SetEnv sets the specified object to interact with the environment,
// e.g. to read/write files, schedule background work, etc.
//
Expand Down Expand Up @@ -1767,6 +1791,64 @@ func (opts *Options) EnableStatistics() {
C.rocksdb_options_enable_statistics(opts.c)
}

// SetPeriodicCompactionSeconds sets periodic_compaction_seconds option.
//
// This option has different meanings for different compaction styles:
//
// Leveled: files older than `periodic_compaction_seconds` will be picked up
//
// for compaction and will be re-written to the same level as they were
// before.
//
// FIFO: not supported. Setting this option has no effect for FIFO compaction.
//
// Universal: when there are files older than `periodic_compaction_seconds`,
//
// rocksdb will try to do as large a compaction as possible including the
// last level. Such compaction is only skipped if only last level is to
// be compacted and no file in last level is older than
// `periodic_compaction_seconds`. See more in
// UniversalCompactionBuilder::PickPeriodicCompaction().
// For backward compatibility, the effective value of this option takes
// into account the value of option `ttl`. The logic is as follows:
// - both options are set to 30 days if they have the default value.
// - if both options are zero, zero is picked. Otherwise, we take the min
// value among non-zero options values (i.e. takes the stricter limit).
//
// One main use of the feature is to make sure a file goes through compaction
// filters periodically. Users can also use the feature to clear up SST
// files using old format.
//
// A file's age is computed by looking at file_creation_time or creation_time
// table properties in order, if they have valid non-zero values; if not, the
// age is based on the file's last modified time (given by the underlying
// Env).
//
// This option only supports block based table format for any compaction
// style.
//
// unit: seconds. Ex: 7 days = 7 * 24 * 60 * 60
//
// Values:
// 0: Turn off Periodic compactions.
// UINT64_MAX - 1 (0xfffffffffffffffe) is special flag to allow RocksDB to
// pick default.
//
// Default: 30 days if using block based table format + compaction filter +
//
// leveled compaction or block based table format + universal compaction.
// 0 (disabled) otherwise.
//
// Dynamically changeable through SetOptions() API
func (opts *Options) SetPeriodicCompactionSeconds(v uint64) {
C.rocksdb_options_set_periodic_compaction_seconds(opts.c, C.uint64_t(v))
}

// GetPeriodicCompactionSeconds gets periodic periodic_compaction_seconds option.
func (opts *Options) GetPeriodicCompactionSeconds() uint64 {
return uint64(C.rocksdb_options_get_periodic_compaction_seconds(opts.c))
}

// SetStatisticsLevel set statistics level.
func (opts *Options) SetStatisticsLevel(level StatisticsLevel) {
C.rocksdb_options_set_statistics_level(opts.c, C.int(level))
Expand Down Expand Up @@ -1881,6 +1963,25 @@ func (opts *Options) SetPlainTableFactory(
)
}

// SetWriteBufferManager binds with a WriteBufferManager.
//
// The memory usage of memtable will report to this object. The same object
// can be passed into multiple DBs and it will track the sum of size of all
// the DBs. If the total size of all live memtables of all the DBs exceeds
// a limit, a flush will be triggered in the next DB to which the next write
// is issued, as long as there is one or more column family not already
// flushing.
//
// If the object is only passed to one DB, the behavior is the same as
// db_write_buffer_size. When write_buffer_manager is set, the value set will
// override db_write_buffer_size.
//
// This feature is disabled by default. Specify a non-zero value
// to enable it.
func (opts *Options) SetWriteBufferManager(wbm *WriteBufferManager) {
C.rocksdb_options_set_write_buffer_manager(opts.c, wbm.p)
}

// SetCreateIfMissingColumnFamilies specifies whether the column families
// should be created if they are missing.
func (opts *Options) SetCreateIfMissingColumnFamilies(value bool) {
Expand Down
9 changes: 9 additions & 0 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
func TestOptions(t *testing.T) {
t.Parallel()

wbm := NewWriteBufferManager(123456, true)
defer wbm.Destroy()

opts := NewDefaultOptions()
defer opts.Destroy()

Expand Down Expand Up @@ -388,6 +391,12 @@ func TestOptions(t *testing.T) {
hData := opts.GetHistogramData(HistogramType_BLOB_DB_MULTIGET_MICROS)
require.EqualValues(t, 0, hData.P99)

require.EqualValues(t, uint64(0xfffffffffffffffe), opts.GetPeriodicCompactionSeconds())
opts.SetPeriodicCompactionSeconds(123)
require.EqualValues(t, 123, opts.GetPeriodicCompactionSeconds())

opts.SetWriteBufferManager(wbm)

// cloning
cl := opts.Clone()
require.EqualValues(t, 5, cl.GetTableCacheNumshardbits())
Expand Down
1 change: 1 addition & 0 deletions options_wait_for_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func NewWaitForCompactOptions() *WaitForCompactOptions {
}
}

// Destroy the object.
func (w *WaitForCompactOptions) Destroy() {
C.rocksdb_wait_for_compact_options_destroy(w.p)
w.p = nil
Expand Down
34 changes: 33 additions & 1 deletion ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,27 @@ type RateLimiter struct {
c *C.rocksdb_ratelimiter_t
}

// NewRateLimiter creates a default RateLimiter object.
// NewRateLimiter creates a RateLimiter object, which can be shared among RocksDB instances to
// control write rate of flush and compaction.
//
// @rate_bytes_per_sec: this is the only parameter you want to set most of the
// time. It controls the total write rate of compaction and flush in bytes per
// second. Currently, RocksDB does not enforce rate limit for anything other
// than flush and compaction, e.g. write to WAL.
//
// @refill_period_us: this controls how often tokens are refilled. For example,
// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to
// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to
// burstier writes while smaller value introduces more CPU overhead.
// The default should work for most cases.
//
// @fairness: RateLimiter accepts high-pri requests and low-pri requests.
// A low-pri request is usually blocked in favor of hi-pri request. Currently,
// RocksDB assigns low-pri to request from compaction and high-pri to request
// from flush. Low-pri requests can get blocked if flush requests come in
// continuously. This fairness parameter grants low-pri requests permission by
// 1/fairness chance even though high-pri requests exist to avoid starvation.
// You should be good by leaving it at default 10.
func NewRateLimiter(rateBytesPerSec, refillPeriodMicros int64, fairness int32) *RateLimiter {
cR := C.rocksdb_ratelimiter_create(
C.int64_t(rateBytesPerSec),
Expand All @@ -20,6 +40,18 @@ func NewRateLimiter(rateBytesPerSec, refillPeriodMicros int64, fairness int32) *
return newNativeRateLimiter(cR)
}

// NewAutoTunedRateLimiter similar to NewRateLimiter, enables dynamic adjustment of rate
// limit within the range `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to
// the recent demand for background I/O.
func NewAutoTunedRateLimiter(rateBytesPerSec, refillPeriodMicros int64, fairness int32) *RateLimiter {
cR := C.rocksdb_ratelimiter_create_auto_tuned(
C.int64_t(rateBytesPerSec),
C.int64_t(refillPeriodMicros),
C.int32_t(fairness),
)
return newNativeRateLimiter(cR)
}

// NewNativeRateLimiter creates a native RateLimiter object.
func newNativeRateLimiter(c *C.rocksdb_ratelimiter_t) *RateLimiter {
return &RateLimiter{c: c}
Expand Down
Loading

0 comments on commit f0bad2d

Please sign in to comment.