Skip to content

Commit

Permalink
p1: improve bpm bench (#604)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <iskyzh@gmail.com>
Co-authored-by: Xu <xzhseh@gmail.com>
  • Loading branch information
skyzh and xzhseh authored Sep 7, 2023
1 parent 04537ff commit 29cfa2a
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/include/buffer/lru_k_replacer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

namespace bustub {

enum class AccessType { Unknown = 0, Get, Scan };
enum class AccessType { Unknown = 0, Lookup, Scan, Index };

class LRUKNode {
private:
Expand Down
61 changes: 49 additions & 12 deletions src/include/storage/disk/disk_manager_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//
//===----------------------------------------------------------------------===//
#include <array>
#include <chrono> // NOLINT
#include <cstring>
#include <fstream>
#include <future> // NOLINT
Expand All @@ -25,6 +26,7 @@
#include "common/config.h"
#include "common/exception.h"
#include "common/logger.h"
#include "fmt/core.h"
#include "storage/disk/disk_manager.h"

namespace bustub {
Expand Down Expand Up @@ -63,17 +65,15 @@ class DiskManagerMemory : public DiskManager {
*/
class DiskManagerUnlimitedMemory : public DiskManager {
public:
DiskManagerUnlimitedMemory() = default;
DiskManagerUnlimitedMemory() { std::fill(recent_access_.begin(), recent_access_.end(), -1); }

/**
* Write a page to the database file.
* @param page_id id of the page
* @param page_data raw page data
*/
void WritePage(page_id_t page_id, const char *page_data) override {
if (latency_ > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(latency_));
}
ProcessLatency(page_id);

std::unique_lock<std::mutex> l(mutex_);
if (page_id >= static_cast<int>(data_.size())) {
Expand All @@ -87,6 +87,8 @@ class DiskManagerUnlimitedMemory : public DiskManager {
l.unlock();

memcpy(ptr->first.data(), page_data, BUSTUB_PAGE_SIZE);

PostProcessLatency(page_id);
}

/**
Expand All @@ -95,34 +97,69 @@ class DiskManagerUnlimitedMemory : public DiskManager {
* @param[out] page_data output buffer
*/
void ReadPage(page_id_t page_id, char *page_data) override {
if (latency_ > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(latency_));
}
ProcessLatency(page_id);

std::unique_lock<std::mutex> l(mutex_);
if (page_id >= static_cast<int>(data_.size()) || page_id < 0) {
LOG_WARN("page not exist");
fmt::println(stderr, "page {} not in range", page_id);
std::terminate();
return;
}
if (data_[page_id] == nullptr) {
LOG_WARN("page not exist");
fmt::println(stderr, "page {} not exist", page_id);
std::terminate();
return;
}
std::shared_ptr<ProtectedPage> ptr = data_[page_id];
std::shared_lock<std::shared_mutex> l_page(ptr->second);
l.unlock();

memcpy(page_data, ptr->first.data(), BUSTUB_PAGE_SIZE);

PostProcessLatency(page_id);
}

void SetLatency(size_t latency_ms) { latency_ = latency_ms; }
void ProcessLatency(page_id_t page_id) {
uint64_t sleep_micro_sec = 1000; // for random access, 1ms latency
if (latency_simulator_enabled_) {
std::unique_lock<std::mutex> lck(latency_processor_mutex_);
for (auto &recent_page_id : recent_access_) {
if ((recent_page_id & (~0x3)) == (page_id & (~0x3))) {
sleep_micro_sec = 100; // for access in the same "block", 0.1ms latency
break;
}
if (page_id >= recent_page_id && page_id <= recent_page_id + 3) {
sleep_micro_sec = 100; // for sequential access, 0.1ms latency
break;
}
}
lck.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(sleep_micro_sec));
}
}

void PostProcessLatency(page_id_t page_id) {
if (latency_simulator_enabled_) {
std::scoped_lock<std::mutex> lck(latency_processor_mutex_);
recent_access_[access_ptr_] = page_id;
access_ptr_ = (access_ptr_ + 1) % recent_access_.size();
}
}

void EnableLatencySimulator(bool enabled) { latency_simulator_enabled_ = enabled; }

private:
std::mutex mutex_;
bool latency_simulator_enabled_{false};

std::mutex latency_processor_mutex_;
std::array<page_id_t, 4> recent_access_;
uint64_t access_ptr_{0};

using Page = std::array<char, BUSTUB_PAGE_SIZE>;
using ProtectedPage = std::pair<Page, std::shared_mutex>;

std::mutex mutex_;
std::vector<std::shared_ptr<ProtectedPage>> data_;
size_t latency_{0};
};

} // namespace bustub
3 changes: 2 additions & 1 deletion tools/bpm_bench/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
set(BPM_BENCH_SOURCES bpm_bench.cpp)
add_executable(bpm-bench ${BPM_BENCH_SOURCES})
add_executable(bpm-bench ${BPM_BENCH_SOURCES} "${PROJECT_SOURCE_DIR}/tools/backtrace.cpp")
add_backward(bpm-bench)

target_link_libraries(bpm-bench bustub)
set_target_properties(bpm-bench PROPERTIES OUTPUT_NAME bustub-bpm-bench)
143 changes: 107 additions & 36 deletions tools/bpm_bench/bpm_bench.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include <chrono>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex> // NOLINT
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <cpp_random_distributions/zipfian_int_distribution.h>
Expand All @@ -29,12 +31,6 @@ auto ClockMs() -> uint64_t {
return static_cast<uint64_t>(tm.tv_sec * 1000) + static_cast<uint64_t>(tm.tv_usec / 1000);
}

static const size_t BUSTUB_SCAN_THREAD = 8;
static const size_t BUSTUB_GET_THREAD = 8;
static const size_t LRU_K_SIZE = 16;
static const size_t BUSTUB_PAGE_CNT = 6400;
static const size_t BUSTUB_BPM_SIZE = 64;

struct BpmTotalMetrics {
uint64_t scan_cnt_{0};
uint64_t get_cnt_{0};
Expand Down Expand Up @@ -100,6 +96,45 @@ struct BpmMetrics {
}
};

struct BustubBenchPageHeader {
uint64_t seed_;
uint64_t page_id_;
char data_[0];
};

/// Modify the page and save some data inside
auto ModifyPage(char *data, size_t page_idx, uint64_t seed) -> void {
auto *pg = reinterpret_cast<BustubBenchPageHeader *>(data);
pg->seed_ = seed;
pg->page_id_ = page_idx;
pg->data_[pg->seed_ % 4000] = pg->seed_ % 256;
}

/// Check the page and verify the data inside
auto CheckPageConsistentNoSeed(const char *data, size_t page_idx) -> void {
const auto *pg = reinterpret_cast<const BustubBenchPageHeader *>(data);
if (pg->page_id_ != page_idx) {
fmt::println(stderr, "page header not consistent: page_id_={} page_idx={}", pg->page_id_, page_idx);
std::terminate();
}
auto left = static_cast<unsigned int>(static_cast<unsigned char>(pg->data_[pg->seed_ % 4000]));
auto right = static_cast<unsigned int>(pg->seed_ % 256);
if (left != right) {
fmt::println(stderr, "page content not consistent: data_[{}]={} seed_ % 256={}", pg->seed_ % 4000, left, right);
std::terminate();
}
}

/// Check the page and verify the data inside
auto CheckPageConsistent(const char *data, size_t page_idx, uint64_t seed) -> void {
const auto *pg = reinterpret_cast<const BustubBenchPageHeader *>(data);
if (pg->seed_ != seed) {
fmt::println(stderr, "page seed not consistent: seed_={} seed={}", pg->seed_, seed);
std::terminate();
}
CheckPageConsistentNoSeed(data, page_idx);
}

// NOLINTNEXTLINE
auto main(int argc, char **argv) -> int {
using bustub::AccessType;
Expand All @@ -109,7 +144,12 @@ auto main(int argc, char **argv) -> int {

argparse::ArgumentParser program("bustub-bpm-bench");
program.add_argument("--duration").help("run bpm bench for n milliseconds");
program.add_argument("--latency").help("set disk latency to n milliseconds");
program.add_argument("--latency").help("enable disk latency");
program.add_argument("--scan-thread-n").help("number of scan threads");
program.add_argument("--get-thread-n").help("number of lookup threads");
program.add_argument("--bpm-size").help("buffer pool size");
program.add_argument("--db-size").help("number of pages");
program.add_argument("--lru-k-size").help("lru-k size");

try {
program.parse_args(argc, argv);
Expand All @@ -124,102 +164,133 @@ auto main(int argc, char **argv) -> int {
duration_ms = std::stoi(program.get("--duration"));
}

uint64_t latency_ms = 0;
uint64_t enable_latency = 0;
if (program.present("--latency")) {
latency_ms = std::stoi(program.get("--latency"));
enable_latency = std::stoi(program.get("--latency"));
}

uint64_t scan_thread_n = 8;
if (program.present("--scan-thread-n")) {
scan_thread_n = std::stoi(program.get("--scan-thread-n"));
}

uint64_t get_thread_n = 8;
if (program.present("--get-thread-n")) {
get_thread_n = std::stoi(program.get("--get-thread-n"));
}

uint64_t bustub_page_cnt = 6400;
if (program.present("--db-size")) {
bustub_page_cnt = std::stoi(program.get("--db-size"));
}

uint64_t bustub_bpm_size = 64;
if (program.present("--bpm-size")) {
bustub_bpm_size = std::stoi(program.get("--bpm-size"));
}

uint64_t lru_k_size = 16;
if (program.present("--lru-k-size")) {
bustub_page_cnt = std::stoi(program.get("--lru-k-size"));
}

auto disk_manager = std::make_unique<DiskManagerUnlimitedMemory>();
auto bpm = std::make_unique<BufferPoolManager>(BUSTUB_BPM_SIZE, disk_manager.get(), LRU_K_SIZE);
auto bpm = std::make_unique<BufferPoolManager>(bustub_bpm_size, disk_manager.get(), lru_k_size);
std::vector<page_id_t> page_ids;

fmt::print(stderr, "[info] total_page={}, duration_ms={}, latency_ms={}, lru_k_size={}, bpm_size={}\n",
BUSTUB_PAGE_CNT, duration_ms, latency_ms, LRU_K_SIZE, BUSTUB_BPM_SIZE);
fmt::print(stderr,
"[info] total_page={}, duration_ms={}, latency={}, lru_k_size={}, bpm_size={}, scan_thread_cnt={}, "
"get_thread_cnt={}\n",
bustub_page_cnt, duration_ms, enable_latency, lru_k_size, bustub_bpm_size, scan_thread_n, get_thread_n);

for (size_t i = 0; i < BUSTUB_PAGE_CNT; i++) {
for (size_t i = 0; i < bustub_page_cnt; i++) {
page_id_t page_id;
auto *page = bpm->NewPage(&page_id);
if (page == nullptr) {
throw std::runtime_error("new page failed");
}
char &ch = page->GetData()[i % 1024];
ch = 1;

ModifyPage(page->GetData(), i, 0);

bpm->UnpinPage(page_id, true);
page_ids.push_back(page_id);
}

// enable disk latency after creating all pages
disk_manager->SetLatency(latency_ms);
disk_manager->EnableLatencySimulator(enable_latency != 0);

fmt::print(stderr, "[info] benchmark start\n");

BpmTotalMetrics total_metrics;
total_metrics.Begin();

std::vector<std::thread> threads;
using ModifyRecord = std::unordered_map<page_id_t, uint64_t>;

for (size_t thread_id = 0; thread_id < scan_thread_n; thread_id++) {
threads.emplace_back([bustub_page_cnt, scan_thread_n, thread_id, &page_ids, &bpm, duration_ms, &total_metrics] {
ModifyRecord records;

for (size_t thread_id = 0; thread_id < BUSTUB_SCAN_THREAD; thread_id++) {
threads.emplace_back(std::thread([thread_id, &page_ids, &bpm, duration_ms, &total_metrics] {
BpmMetrics metrics(fmt::format("scan {:>2}", thread_id), duration_ms);
metrics.Begin();

size_t page_idx = BUSTUB_PAGE_CNT * thread_id / BUSTUB_SCAN_THREAD;
size_t page_idx_start = bustub_page_cnt * thread_id / scan_thread_n;
size_t page_idx_end = bustub_page_cnt * (thread_id + 1) / scan_thread_n;
size_t page_idx = page_idx_start;

while (!metrics.ShouldFinish()) {
auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Scan);
if (page == nullptr) {
continue;
}

char &ch = page->GetData()[page_idx % 1024];
page->WLatch();
ch += 1;
if (ch == 0) {
ch = 1;
}
auto &seed = records[page_idx];
CheckPageConsistent(page->GetData(), page_idx, seed);
seed = seed + 1;
ModifyPage(page->GetData(), page_idx, seed);
page->WUnlatch();

bpm->UnpinPage(page->GetPageId(), true, AccessType::Scan);
page_idx = (page_idx + 1) % BUSTUB_PAGE_CNT;
page_idx += 1;
if (page_idx >= page_idx_end) {
page_idx = page_idx_start;
}
metrics.Tick();
metrics.Report();
}

total_metrics.ReportScan(metrics.cnt_);
}));
});
}

for (size_t thread_id = 0; thread_id < BUSTUB_GET_THREAD; thread_id++) {
threads.emplace_back(std::thread([thread_id, &page_ids, &bpm, duration_ms, &total_metrics] {
for (size_t thread_id = 0; thread_id < get_thread_n; thread_id++) {
threads.emplace_back([thread_id, &page_ids, &bpm, bustub_page_cnt, duration_ms, &total_metrics] {
std::random_device r;
std::default_random_engine gen(r());
zipfian_int_distribution<size_t> dist(0, BUSTUB_PAGE_CNT - 1, 0.8);
zipfian_int_distribution<size_t> dist(0, bustub_page_cnt - 1, 0.8);

BpmMetrics metrics(fmt::format("get {:>2}", thread_id), duration_ms);
metrics.Begin();

while (!metrics.ShouldFinish()) {
auto page_idx = dist(gen);
auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Get);
auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Lookup);
if (page == nullptr) {
continue;
}

page->RLatch();
char ch = page->GetData()[page_idx % 1024];
CheckPageConsistentNoSeed(page->GetData(), page_idx);
page->RUnlatch();
if (ch == 0) {
throw std::runtime_error("invalid data");
}

bpm->UnpinPage(page->GetPageId(), false, AccessType::Get);
bpm->UnpinPage(page->GetPageId(), false, AccessType::Lookup);
metrics.Tick();
metrics.Report();
}

total_metrics.ReportGet(metrics.cnt_);
}));
});
}

for (auto &thread : threads) {
Expand Down

0 comments on commit 29cfa2a

Please sign in to comment.