diff --git a/src/yb/rocksdb/db/readahead_test.cc b/src/yb/rocksdb/db/readahead_test.cc
index 8b2d43192a43..f3c6422196b6 100644
--- a/src/yb/rocksdb/db/readahead_test.cc
+++ b/src/yb/rocksdb/db/readahead_test.cc
@@ -18,8 +18,10 @@
 #include "yb/rocksutil/yb_rocksdb_logger.h"
 
 #include "yb/util/compare_util.h"
+#include "yb/util/random_util.h"
 #include "yb/util/size_literals.h"
 
+DECLARE_uint64(rocksdb_iterator_sequential_disk_reads_factor);
 DECLARE_uint64(rocksdb_iterator_sequential_disk_reads_for_auto_readahead);
 DECLARE_uint64(rocksdb_iterator_init_readahead_size);
 DECLARE_uint64(rocksdb_iterator_max_readahead_size);
@@ -100,6 +102,14 @@ class TestRandomAccessFile : public yb::RandomAccessFileWrapper {
     env_->OnRandomAccessFileDestroy(this);
   }
 
+  void ResetInfo() {
+    read_count = 0;
+    last_read_offset = 0;
+    last_read_length = 0;
+    last_readahead_offset = 0;
+    last_readahead_length = 0;
+  }
+
   Status Read(uint64_t offset, size_t n, Slice* result, uint8_t* scratch) const override {
     auto s = target()->Read(offset, n, result, scratch);
     last_read_offset.store(offset);
@@ -110,6 +120,8 @@ class TestRandomAccessFile : public yb::RandomAccessFileWrapper {
   }
 
   void Readahead(size_t offset, size_t length) override {
+    LOG(INFO) << "Readahead call at: " << offset << " length: " << length
+              << " filename: " << target()->filename();
     target()->Readahead(offset, length);
     last_readahead_offset.store(offset);
     last_readahead_length.store(length);
@@ -147,6 +159,8 @@ void TestEnv::OnRandomAccessFileDestroy(TestRandomAccessFile* file) {
   random_access_files_.erase(file->filename());
 }
 
+constexpr auto kMaxSequentialDiskReadsFactorForTests = 3;
+
 class ReadaheadTest : public DBTestBase {
  public:
   ReadaheadTest() :
@@ -201,10 +215,13 @@ class ReadaheadTest : public DBTestBase {
     SCHECK_EQ(props.size(), 1, InternalError, "Expected single SST file");
     sst_props_ = *props.begin()->second;
 
-    avg_compressed_data_block_size_ = sst_props_.data_size / sst_props_.num_data_blocks;
-    LOG(INFO) << "avg_compressed_data_block_size: " << avg_compressed_data_block_size_;
-    avg_keys_per_block_ = num_keys_ / sst_props_.num_data_blocks;
-
+    avg_compressed_data_block_size_ = sst_props_.data_size / sst_props_.num_data_blocks;
+    avg_compressed_row_size_ = sst_props_.data_size / num_keys_;
+    avg_rows_per_block_ = num_keys_ / sst_props_.num_data_blocks;
+    LOG(INFO) << "num_keys: " << num_keys_ << " num_data_blocks: " << sst_props_.num_data_blocks
+              << " avg_compressed_row_size: " << avg_compressed_row_size_
+              << " avg_compressed_data_block_size: " << avg_compressed_data_block_size_
+              << " avg_rows_per_block: " << avg_rows_per_block_;
     return Status::OK();
   }
 
@@ -249,7 +266,7 @@ class ReadaheadTest : public DBTestBase {
         break;
       }
       RETURN_NOT_OK(ExpectReadaheadStats(stats));
-      *current_key_idx += avg_keys_per_block_;
+      *current_key_idx += avg_rows_per_block_;
      if (*current_key_idx >= num_keys_) {
        return false;
      }
@@ -270,7 +287,8 @@ class ReadaheadTest : public DBTestBase {
   std::optional<LiveFileMetaData> sst_metadata_;
   TableProperties sst_props_;
   size_t avg_compressed_data_block_size_;
-  int avg_keys_per_block_;
+  size_t avg_compressed_row_size_;
+  int avg_rows_per_block_;
 };
 
 namespace {
@@ -297,63 +315,105 @@ TEST_F(ReadaheadTest, SequentialScan) {
 
   ASSERT_OK(WriteData());
 
-  for (auto seq_disk_reads_for_readahead : {0, 1, 2, 3, 4, 5, 8, 16}) {
-    LOG(INFO) << "Setting FLAGS_rocksdb_iterator_sequential_disk_reads_for_auto_readahead = "
-              << seq_disk_reads_for_readahead;
-    FLAGS_rocksdb_iterator_sequential_disk_reads_for_auto_readahead = seq_disk_reads_for_readahead;
-    for (bool purge_block_cache : {true, false}) {
-      if (purge_block_cache) {
-        PurgeBlockCache();
-      }
+  TestRandomAccessFile* data_file = test_env_->GetRandomAccessFile(sst_metadata_->DataFilePath());
 
-      auto* stats = options.statistics.get();
-      stats->resetTickersForTest();
+  for (auto seq_disk_reads_factor = 0;
+       seq_disk_reads_factor < kMaxSequentialDiskReadsFactorForTests; ++seq_disk_reads_factor) {
+    LOG(INFO) << "Setting FLAGS_rocksdb_iterator_sequential_disk_reads_factor = "
+              << seq_disk_reads_factor;
+    FLAGS_rocksdb_iterator_sequential_disk_reads_factor = seq_disk_reads_factor;
+    for (auto seq_disk_reads_for_readahead : {0, 1, 2, 3, 4, 5, 8, 16}) {
+      LOG(INFO) << "Setting FLAGS_rocksdb_iterator_sequential_disk_reads_for_auto_readahead = "
+                << seq_disk_reads_for_readahead;
+      FLAGS_rocksdb_iterator_sequential_disk_reads_for_auto_readahead =
+          seq_disk_reads_for_readahead;
+      std::vector<bool> purge_block_cache_options = {true};
+      if (seq_disk_reads_factor == 0) {
+        // Scan is loading the whole file into block cache - test reading from block cache.
+        purge_block_cache_options.push_back(false);
+      }
+      for (bool purge_block_cache : purge_block_cache_options) {
+        if (purge_block_cache) {
+          PurgeBlockCache();
+        }
 
-      auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
-      size_t num_keys_read = 0;
-      for (iter->SeekToFirst(); ASSERT_RESULT(iter->CheckedValid());
-           iter->Next(), ++num_keys_read) {
-        if ((seq_disk_reads_for_readahead > 1) &&
-            (num_keys_read ==
-             (seq_disk_reads_for_readahead - 1) * num_keys_ / sst_props_.num_data_blocks)) {
-          LOG(INFO) << "num_keys: " << num_keys_
-                    << " num_data_blocks: " << sst_props_.num_data_blocks
-                    << " num_keys_read: " << num_keys_read;
-          // We are about to reach seq_disk_reads_for_readahead disk reads. Should be no readaheads
-          // till now.
-          ASSERT_OK(ExpectReadaheadStats(ReadaheadStats()));
+        auto* stats = options.statistics.get();
+        stats->resetTickersForTest();
+        data_file->ResetInfo();
+
+        auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
+        int row_index = 0;
+        for (iter->SeekToFirst(); ASSERT_RESULT(iter->CheckedValid());
+             iter->Next(), ++row_index) {
+          if ((seq_disk_reads_for_readahead > 1) &&
+              std::cmp_equal(
+                  row_index,
+                  (seq_disk_reads_for_readahead - 1) * num_keys_ / sst_props_.num_data_blocks)) {
+            LOG(INFO) << "num_keys: " << num_keys_
+                      << " num_data_blocks: " << sst_props_.num_data_blocks
+                      << " row_index: " << row_index;
+            // We are about to reach seq_disk_reads_for_readahead disk reads. Should be no
+            // readaheads till now.
+            ASSERT_OK(ExpectReadaheadStats(ReadaheadStats()));
+          }
+          if (seq_disk_reads_factor > 0 && data_file->last_readahead_length > 0) {
+            // Skipping data within readahead window shouldn't reset readahead.
+            const auto jump_to_row_idx = std::min(
+                num_keys_ - 1,
+                yb::RandomUniformInt(
+                    row_index, static_cast<int>(
+                                   (data_file->last_readahead_offset +
+                                    data_file->last_readahead_length * seq_disk_reads_factor) /
+                                   avg_compressed_row_size_) -
+                               1));
+            if (jump_to_row_idx > row_index) {
+              row_index = jump_to_row_idx;
+              ASSERT_OK(SeekToKey(iter, row_index));
+            }
+          }
         }
 
-      size_t expected_num_readaheads = 0;
-      size_t expected_readahead_bytes_read = 0;
-      if (seq_disk_reads_for_readahead > 0 && purge_block_cache) {
-        const auto bytes_should_read_before_readahead =
-            (seq_disk_reads_for_readahead - 1) * avg_compressed_data_block_size_;
-        const auto data_size = sst_props_.data_size;
-        if (data_size > bytes_should_read_before_readahead) {
-          AddWholeFileReadaheads(
-              data_size - bytes_should_read_before_readahead, &expected_num_readaheads,
-              &expected_readahead_bytes_read);
+        size_t expected_num_readaheads = 0;
+        size_t expected_readahead_bytes_read = 0;
+        if (seq_disk_reads_for_readahead > 0 && purge_block_cache) {
+          const auto bytes_should_read_before_readahead =
+              (seq_disk_reads_for_readahead - 1) * avg_compressed_data_block_size_;
+          const auto data_size = sst_props_.data_size;
+          if (data_size > bytes_should_read_before_readahead) {
+            AddWholeFileReadaheads(
+                data_size - bytes_should_read_before_readahead, &expected_num_readaheads,
+                &expected_readahead_bytes_read);
+          }
+          LOG(INFO) << "data_size: " << data_size
+                    << " bytes_should_read_before_readahead: " << bytes_should_read_before_readahead
+                    << " expected_num_readaheads: " << expected_num_readaheads
+                    << " expected_readahead_bytes_read: " << expected_readahead_bytes_read;
+        } else {
+          LOG(INFO) << "seq_disk_reads_for_readahead: " << seq_disk_reads_for_readahead
+                    << " purge_block_cache: " << purge_block_cache
+                    << " expected_num_readaheads: " << expected_num_readaheads
+                    << " expected_readahead_bytes_read: " << expected_readahead_bytes_read;
         }
-        LOG(INFO) << " data_size: " << data_size
-                  << " bytes_should_read_before_readahead: " << bytes_should_read_before_readahead;
-      }
 
-      const auto num_readaheads = stats->getTickerCount(Tickers::READAHEAD_CALLS);
-      const auto readahead_bytes_read = stats->getTickerCount(Tickers::READAHEAD_BYTES_READ);
+        const auto num_readaheads = stats->getTickerCount(Tickers::READAHEAD_CALLS);
+        const auto readahead_bytes_read = stats->getTickerCount(Tickers::READAHEAD_BYTES_READ);
 
-      ASSERT_GE(num_readaheads, expected_num_readaheads);
-      ASSERT_GE(readahead_bytes_read, expected_readahead_bytes_read);
+        if (seq_disk_reads_factor <= 1) {
+          // If seq_disk_reads_factor > 1, we can have fewer readahead calls because of skipping
+          // more data than was read ahead.
+          ASSERT_GE(num_readaheads, expected_num_readaheads);
+          ASSERT_GE(readahead_bytes_read, expected_readahead_bytes_read);
+        }
 
-      // We can readahead more in reality due to blocks located on readahead window boundary.
-      ASSERT_LE(num_readaheads, expected_num_readaheads * 1.1);
-      ASSERT_LE(
-          readahead_bytes_read,
-          expected_readahead_bytes_read + (num_readaheads - expected_num_readaheads) *
-              FLAGS_rocksdb_iterator_max_readahead_size);
+        // We can readahead more in reality due to blocks located on readahead window boundary.
+        ASSERT_LE(num_readaheads, expected_num_readaheads * 1.1);
+        ASSERT_LE(
+            readahead_bytes_read,
+            expected_readahead_bytes_read + (num_readaheads - expected_num_readaheads) *
+                                                FLAGS_rocksdb_iterator_max_readahead_size);
 
-      ASSERT_EQ(stats->getTickerCount(Tickers::READAHEAD_RESET), 0);
+        ASSERT_EQ(stats->getTickerCount(Tickers::READAHEAD_RESET), 0);
+      }
     }
   }
 }
@@ -366,69 +426,83 @@ TEST_F(ReadaheadTest, MixedReadsWith1SeqDiskReadsForReadahead) {
 
   ASSERT_OK(WriteData());
 
-  TestRandomAccessFile* data_file = test_env_->GetRandomAccessFile(sst_metadata_->DataFilePath());
+  for (auto seq_disk_reads_factor = 0;
+       seq_disk_reads_factor < kMaxSequentialDiskReadsFactorForTests; ++seq_disk_reads_factor) {
+    LOG(INFO) << "Setting FLAGS_rocksdb_iterator_sequential_disk_reads_factor = "
+              << seq_disk_reads_factor;
+    FLAGS_rocksdb_iterator_sequential_disk_reads_factor = seq_disk_reads_factor;
 
-  auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
+    PurgeBlockCache();
 
-  size_t expected_readahead_size = FLAGS_rocksdb_iterator_init_readahead_size;
-  ReadaheadStats expected_stats;
+    auto* stats = options.statistics.get();
+    stats->resetTickersForTest();
 
-  int current_key_idx = 0;
-  ASSERT_OK(SeekToKey(iter, current_key_idx));
+    TestRandomAccessFile* data_file = test_env_->GetRandomAccessFile(sst_metadata_->DataFilePath());
 
-  expected_stats.AddReadaheadCall(expected_readahead_size);
-  ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
 
-  ASSERT_OK(ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx));
+    size_t expected_readahead_size = FLAGS_rocksdb_iterator_init_readahead_size;
+    ReadaheadStats expected_stats;
+
+    int current_key_idx = 0;
+    ASSERT_OK(SeekToKey(iter, current_key_idx));
 
-  expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
-  ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    expected_stats.AddReadaheadCall(expected_readahead_size);
+    ASSERT_OK(ExpectReadaheadStats(expected_stats));
 
-  ASSERT_OK(ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx));
+    ASSERT_OK(ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx));
 
-  expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
-  ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
+    ASSERT_OK(ExpectReadaheadStats(expected_stats));
 
-  constexpr auto kBlocksToJump = 4;
+    ASSERT_OK(ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx));
 
-  // Jump forward.
-  current_key_idx += kBlocksToJump * avg_keys_per_block_;
-  ASSERT_OK(SeekToKey(iter, current_key_idx));
+    expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
+    ASSERT_OK(ExpectReadaheadStats(expected_stats));
 
-  expected_stats.AddReadaheadReset(&expected_readahead_size);
-  expected_stats.AddReadaheadCall(expected_readahead_size);
-  ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    constexpr auto kBlocksToJump = 4;
 
-  ASSERT_OK(ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx));
-  expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
-  ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    // Jump forward.
+    current_key_idx += kBlocksToJump * avg_rows_per_block_ + data_file->last_readahead_length * 2 *
+                                                                 seq_disk_reads_factor /
+                                                                 avg_compressed_row_size_;
+    ASSERT_OK(SeekToKey(iter, current_key_idx));
 
-  // Jump backward.
-  current_key_idx -= kBlocksToJump * avg_keys_per_block_;
-  ASSERT_OK(SeekToKey(iter, current_key_idx));
+    expected_stats.AddReadaheadReset(&expected_readahead_size);
+    expected_stats.AddReadaheadCall(expected_readahead_size);
+    ASSERT_OK(ExpectReadaheadStats(expected_stats));
 
-  // No disk reads, served from block cache but still should reset readahead.
-  expected_stats.AddReadaheadReset(&expected_readahead_size);
-  ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    ASSERT_OK(ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx));
+    expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
+    ASSERT_OK(ExpectReadaheadStats(expected_stats));
 
-  // Read next blocks, served from block cache, no disk reads => no readahead.
-  for (int i = 0; i < kBlocksToJump; ++i) {
-    current_key_idx += avg_keys_per_block_;
+    // Jump backward.
+    current_key_idx -= kBlocksToJump * avg_rows_per_block_;
     ASSERT_OK(SeekToKey(iter, current_key_idx));
+
+    // No disk reads, served from block cache but still should reset readahead.
+    expected_stats.AddReadaheadReset(&expected_readahead_size);
     ASSERT_OK(ExpectReadaheadStats(expected_stats));
-  }
 
-  PurgeBlockCache();
+    // Read next blocks, served from block cache, no disk reads => no readahead.
+    for (int i = 0; i < kBlocksToJump; ++i) {
+      current_key_idx += avg_rows_per_block_;
+      ASSERT_OK(SeekToKey(iter, current_key_idx));
+      ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    }
+
+    PurgeBlockCache();
 
-  // Read next block after purging block cache, should do readahead.
-  current_key_idx += avg_keys_per_block_;
-  ASSERT_OK(SeekToKey(iter, current_key_idx));
-  expected_stats.AddReadaheadCall(expected_readahead_size);
-  ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    // Read next block after purging block cache, should do readahead.
+    current_key_idx += avg_rows_per_block_ + 1;
+    ASSERT_OK(SeekToKey(iter, current_key_idx));
+    expected_stats.AddReadaheadCall(expected_readahead_size);
+    ASSERT_OK(ExpectReadaheadStats(expected_stats));
 
-  ASSERT_OK(ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx));
-  expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
-  ASSERT_OK(ExpectReadaheadStats(expected_stats));
+    ASSERT_OK(ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx));
+    expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
+    ASSERT_OK(ExpectReadaheadStats(expected_stats));
+  }
 }
 
 TEST_F(ReadaheadTest, MixedReads) {
@@ -441,62 +515,72 @@ TEST_F(ReadaheadTest, MixedReads) {
 
   TestRandomAccessFile* data_file = test_env_->GetRandomAccessFile(sst_metadata_->DataFilePath());
 
-  for (auto seq_disk_reads_for_readahead : {2, 3, 4, 5, 8, 16}) {
-    LOG(INFO) << "Setting FLAGS_rocksdb_iterator_sequential_disk_reads_for_auto_readahead = "
-              << seq_disk_reads_for_readahead;
-    FLAGS_rocksdb_iterator_sequential_disk_reads_for_auto_readahead = seq_disk_reads_for_readahead;
-    PurgeBlockCache();
+  for (auto seq_disk_reads_factor = 0;
+       seq_disk_reads_factor < kMaxSequentialDiskReadsFactorForTests; ++seq_disk_reads_factor) {
+    LOG(INFO) << "Setting FLAGS_rocksdb_iterator_sequential_disk_reads_factor = "
+              << seq_disk_reads_factor;
+    FLAGS_rocksdb_iterator_sequential_disk_reads_factor = seq_disk_reads_factor;
+    for (auto seq_disk_reads_for_readahead : {2, 3, 4, 5, 8, 16}) {
+      LOG(INFO) << "Setting FLAGS_rocksdb_iterator_sequential_disk_reads_for_auto_readahead = "
+                << seq_disk_reads_for_readahead;
+      FLAGS_rocksdb_iterator_sequential_disk_reads_for_auto_readahead =
+          seq_disk_reads_for_readahead;
+      PurgeBlockCache();
 
-    auto* stats = options.statistics.get();
-    stats->resetTickersForTest();
-
-    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
+      auto* stats = options.statistics.get();
+      stats->resetTickersForTest();
 
-    size_t expected_readahead_size = FLAGS_rocksdb_iterator_init_readahead_size;
-    ReadaheadStats expected_stats;
+      auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
 
-    int current_key_idx = -1;
-    auto prev_num_disk_reads = data_file->read_count.load();
-    for (int random_seek_iter = 0; random_seek_iter < kNumRandomSeeks; ++random_seek_iter) {
-      // Seek to another random block (and not the next one).
-      const auto prev_key_idx = current_key_idx;
-      do {
-        current_key_idx = rnd_.Uniform(num_keys_);
-      } while (prev_key_idx >= 0 && current_key_idx >= prev_key_idx - avg_keys_per_block_ &&
-               current_key_idx <= prev_key_idx + 2 * avg_keys_per_block_);
-
-      auto num_disk_reads = data_file->read_count.load();
-      if (num_disk_reads > prev_num_disk_reads) {
-        expected_stats.AddReadaheadReset(&expected_readahead_size);
-        prev_num_disk_reads = num_disk_reads;
-      }
-      LOG(INFO) << "Disk read count: " << num_disk_reads
-                << ". Moving to random key: " << current_key_idx;
+      size_t expected_readahead_size = FLAGS_rocksdb_iterator_init_readahead_size;
+      ReadaheadStats expected_stats;
+
+      int current_key_idx = -1;
+      auto prev_num_disk_reads = data_file->read_count.load();
+      for (int random_seek_iter = 0; random_seek_iter < kNumRandomSeeks; ++random_seek_iter) {
+        // Seek to another random block (and not the next one; also make sure to skip
+        // seq_disk_reads_factor * readahead window size).
+        const auto prev_key_idx = current_key_idx;
+        do {
+          current_key_idx = rnd_.Uniform(num_keys_);
+        } while (prev_key_idx >= 0 && current_key_idx >= prev_key_idx - avg_rows_per_block_ &&
+                 std::cmp_less_equal(
+                     current_key_idx, prev_key_idx + 2 * avg_rows_per_block_ +
+                                          data_file->last_readahead_length * 2 *
+                                              seq_disk_reads_factor / avg_compressed_row_size_));
+
+        auto num_disk_reads = data_file->read_count.load();
+        LOG(INFO) << "Disk read count: " << num_disk_reads
+                  << ". Moving to random key: " << current_key_idx;
+        if (num_disk_reads > prev_num_disk_reads) {
+          expected_stats.AddReadaheadReset(&expected_readahead_size);
+          prev_num_disk_reads = num_disk_reads;
+        }
 
-      for (; current_key_idx < num_keys_;
-           current_key_idx += avg_keys_per_block_) {
-        ASSERT_OK(SeekToKey(iter, current_key_idx));
-        LOG(INFO) << "Disk read count: " << data_file->read_count;
+        for (; current_key_idx < num_keys_; current_key_idx += avg_rows_per_block_) {
+          ASSERT_OK(SeekToKey(iter, current_key_idx));
+          LOG(INFO) << "Disk read count: " << data_file->read_count;
 
-        // No readahead until Nth seq disk reads.
-        if (data_file->read_count == num_disk_reads + seq_disk_reads_for_readahead) {
-          expected_stats.AddReadaheadCall(expected_readahead_size);
+          // No readahead until Nth seq disk reads.
+          if (data_file->read_count == num_disk_reads + seq_disk_reads_for_readahead) {
+            expected_stats.AddReadaheadCall(expected_readahead_size);
+            ASSERT_OK(ExpectReadaheadStats(expected_stats));
+            break;
+          }
           ASSERT_OK(ExpectReadaheadStats(expected_stats));
-          break;
         }
-        ASSERT_OK(ExpectReadaheadStats(expected_stats));
-      }
 
-      for (int readahead_window_iter = 0; readahead_window_iter < 2; ++readahead_window_iter) {
-        num_disk_reads = data_file->read_count.load();
-        if (!ASSERT_RESULT(
-            ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx))) {
-          continue;
-        }
-        if (data_file->read_count > num_disk_reads) {
-          expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
+        for (int readahead_window_iter = 0; readahead_window_iter < 2; ++readahead_window_iter) {
+          num_disk_reads = data_file->read_count.load();
+          if (!ASSERT_RESULT(
+                  ReadOneKeyPerBlockUntilOutOfReadaheadWindow(iter, data_file, &current_key_idx))) {
+            continue;
+          }
+          if (data_file->read_count > num_disk_reads) {
+            expected_stats.IncreaseWindowAndAddReadaheadCall(&expected_readahead_size);
+          }
+          ASSERT_OK(ExpectReadaheadStats(expected_stats));
         }
-        ASSERT_OK(ExpectReadaheadStats(expected_stats));
-      }
     }
   }
 }
diff --git a/src/yb/rocksdb/table/block_based_table_reader.cc b/src/yb/rocksdb/table/block_based_table_reader.cc
index fefa76844b32..3980537adfd9 100644
--- a/src/yb/rocksdb/table/block_based_table_reader.cc
+++ b/src/yb/rocksdb/table/block_based_table_reader.cc
@@ -63,6 +63,7 @@
 #include "yb/util/atomic.h"
 #include "yb/util/bytes_formatter.h"
+#include "yb/util/debug-util.h"
 #include "yb/util/logging.h"
 #include "yb/util/mem_tracker.h"
 #include "yb/util/scope_exit.h"
@@ -80,6 +81,10 @@ DEFINE_RUNTIME_uint64(rocksdb_iterator_sequential_disk_reads_for_auto_readahead,
     "will be enabled with the first disk read. If set to N > 1, iterator readahead will be used "
     "with the Nth sequential disk read.");
+DEFINE_RUNTIME_uint64(
+    rocksdb_iterator_sequential_disk_reads_factor, 1,
+    "Treat a read as sequential for readahead purposes if the next read operation skips up to "
+    "readahead window size multiplied by rocksdb_iterator_sequential_disk_reads_factor bytes.");
 TAG_FLAG(rocksdb_iterator_sequential_disk_reads_for_auto_readahead, advanced);
 
 DEFINE_RUNTIME_uint64(rocksdb_iterator_init_readahead_size, 32_KB,
@@ -344,15 +349,18 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
     }
     auto block_res = table_->GetBlockFromCache(read_options_, block_info);
     const auto& handle = block_info.handle;
+    const auto is_sequential_read = IsSequentialRead(handle);
     VLOG_WITH_FUNC(5) << "handle: " << handle.ToDebugString()
                       << " num_sequential_disk_reads_: " << num_sequential_disk_reads_
                       << " prev_offset_: " << prev_offset_ << " prev_length_: " << prev_length_
+                      << " skip_size: "
+                      << static_cast<int64_t>(handle.offset() - prev_offset_ - prev_length_)
+                      << " is_sequential_read: " << is_sequential_read
                       << " block from cache: "
                       << (block_res.ok() ? yb::AsString(static_cast<void*>(block_res->value))
                                          : AsString(block_res.status()))
                       << " for file: " << block_info.file_reader->file()->filename();
-    const auto is_sequential_read = IsSequentialRead(handle);
     if (!is_sequential_read && (num_sequential_disk_reads_ > 0 || readahead_limit_ > 0)) {
       VLOG_WITH_FUNC(4) << "handle: " << handle.ToDebugString()
                         << " prev_offset_: " << prev_offset_
@@ -361,6 +369,9 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
                         << " readahead_size_: " << readahead_size_
                         << ". Resetting readahead for iterator for file: "
                         << block_info.file_reader->file()->filename();
+      if (VLOG_IS_ON(6)) {
+        YB_LOG_EVERY_N_SECS(INFO, 1) << "Resetting readahead at: " << yb::GetStackTrace();
+      }
       ResetReadahead();
     }
@@ -437,7 +448,10 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
   }
 
   bool IsSequentialRead(BlockHandle handle) {
-    return handle.offset() == prev_offset_ + prev_length_;
+    const auto prev_read_end = prev_offset_ + prev_length_;
+    return (handle.offset() >= prev_read_end) &&
+           (handle.offset() <=
+            prev_read_end + readahead_size_ * FLAGS_rocksdb_iterator_sequential_disk_reads_factor);
   }
 
   // Don't own table_. BlockEntryIteratorState should only be stored in iterators or in
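For a quick mental model of the behavioral change above, here is a minimal, self-contained sketch (not the YugabyteDB implementation; the struct, field names, and defaults are illustrative) of the relaxed sequential-read check: a read now counts as sequential if it starts at or after the end of the previous read and skips at most `readahead window size * rocksdb_iterator_sequential_disk_reads_factor` bytes, so a factor of 0 keeps the old strict "contiguous only" behavior.

```cpp
#include <cstdint>
#include <iostream>

// Illustrative stand-in for the per-iterator readahead state.
struct ReadaheadState {
  uint64_t prev_offset = 0;              // offset of the previous block read
  uint64_t prev_length = 0;              // length of the previous block read
  uint64_t readahead_size = 32 * 1024;   // mirrors rocksdb_iterator_init_readahead_size
  uint64_t seq_disk_reads_factor = 1;    // mirrors rocksdb_iterator_sequential_disk_reads_factor

  // A read is "sequential" if it does not go backwards and its forward skip
  // stays within seq_disk_reads_factor readahead windows.
  bool IsSequentialRead(uint64_t offset) const {
    const uint64_t prev_read_end = prev_offset + prev_length;
    return offset >= prev_read_end &&
           offset <= prev_read_end + readahead_size * seq_disk_reads_factor;
  }
};

int main() {
  ReadaheadState state;
  state.prev_offset = 0;
  state.prev_length = 4096;

  std::cout << state.IsSequentialRead(4096) << "\n";   // 1: exactly contiguous
  std::cout << state.IsSequentialRead(20000) << "\n";  // 1: skip fits within one window
  std::cout << state.IsSequentialRead(2048) << "\n";   // 0: backward read, readahead resets

  state.seq_disk_reads_factor = 0;                     // factor 0 => strict pre-patch behavior
  std::cout << state.IsSequentialRead(20000) << "\n";  // 0: any forward skip breaks sequentiality
  return 0;
}
```

This also explains two choices in the tests above: `purge_block_cache = false` is only exercised when the factor is 0 (with a wider window the scan behavior is otherwise identical), and the lower bound on readahead calls is only asserted for factors <= 1, since jumps that land inside the widened window no longer reset readahead but can consume it faster than it is replenished.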