Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p1: improve bpm bench #604

Merged
merged 10 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/buffer/lru_k_replacer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

namespace bustub {

enum class AccessType { Unknown = 0, Get, Scan };
enum class AccessType { Unknown = 0, Lookup, Scan, Index };

class LRUKNode {
private:
Expand Down
61 changes: 49 additions & 12 deletions src/include/storage/disk/disk_manager_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//
//===----------------------------------------------------------------------===//
#include <array>
#include <chrono> // NOLINT
#include <cstring>
#include <fstream>
#include <future> // NOLINT
Expand All @@ -25,6 +26,7 @@
#include "common/config.h"
#include "common/exception.h"
#include "common/logger.h"
#include "fmt/core.h"
#include "storage/disk/disk_manager.h"

namespace bustub {
Expand Down Expand Up @@ -63,17 +65,15 @@ class DiskManagerMemory : public DiskManager {
*/
class DiskManagerUnlimitedMemory : public DiskManager {
public:
DiskManagerUnlimitedMemory() = default;
DiskManagerUnlimitedMemory() { std::fill(recent_access_.begin(), recent_access_.end(), -1); }

/**
* Write a page to the database file.
* @param page_id id of the page
* @param page_data raw page data
*/
void WritePage(page_id_t page_id, const char *page_data) override {
if (latency_ > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(latency_));
}
ProcessLatency(page_id);

std::unique_lock<std::mutex> l(mutex_);
if (page_id >= static_cast<int>(data_.size())) {
Expand All @@ -87,6 +87,8 @@ class DiskManagerUnlimitedMemory : public DiskManager {
l.unlock();

memcpy(ptr->first.data(), page_data, BUSTUB_PAGE_SIZE);

PostProcessLatency(page_id);
}

/**
Expand All @@ -95,34 +97,69 @@ class DiskManagerUnlimitedMemory : public DiskManager {
* @param[out] page_data output buffer
*/
void ReadPage(page_id_t page_id, char *page_data) override {
if (latency_ > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(latency_));
}
ProcessLatency(page_id);

std::unique_lock<std::mutex> l(mutex_);
if (page_id >= static_cast<int>(data_.size()) || page_id < 0) {
LOG_WARN("page not exist");
fmt::println(stderr, "page {} not in range", page_id);
std::terminate();
return;
}
if (data_[page_id] == nullptr) {
LOG_WARN("page not exist");
fmt::println(stderr, "page {} not exist", page_id);
std::terminate();
return;
}
std::shared_ptr<ProtectedPage> ptr = data_[page_id];
std::shared_lock<std::shared_mutex> l_page(ptr->second);
l.unlock();

memcpy(page_data, ptr->first.data(), BUSTUB_PAGE_SIZE);

PostProcessLatency(page_id);
}

void SetLatency(size_t latency_ms) { latency_ = latency_ms; }
void ProcessLatency(page_id_t page_id) {
uint64_t sleep_micro_sec = 1000; // for random access, 1ms latency
if (latency_simulator_enabled_) {
std::unique_lock<std::mutex> lck(latency_processor_mutex_);
for (auto &recent_page_id : recent_access_) {
if ((recent_page_id & (~0x3)) == (page_id & (~0x3))) {
sleep_micro_sec = 100; // for access in the same "block", 0.1ms latency
break;
}
if (page_id >= recent_page_id && page_id <= recent_page_id + 3) {
sleep_micro_sec = 100; // for sequential access, 0.1ms latency
break;
}
}
lck.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(sleep_micro_sec));
}
}

void PostProcessLatency(page_id_t page_id) {
if (latency_simulator_enabled_) {
std::scoped_lock<std::mutex> lck(latency_processor_mutex_);
recent_access_[access_ptr_] = page_id;
access_ptr_ = (access_ptr_ + 1) % recent_access_.size();
}
}

void EnableLatencySimulator(bool enabled) { latency_simulator_enabled_ = enabled; }

private:
std::mutex mutex_;
bool latency_simulator_enabled_{false};

std::mutex latency_processor_mutex_;
std::array<page_id_t, 4> recent_access_;
uint64_t access_ptr_{0};

using Page = std::array<char, BUSTUB_PAGE_SIZE>;
using ProtectedPage = std::pair<Page, std::shared_mutex>;

std::mutex mutex_;
std::vector<std::shared_ptr<ProtectedPage>> data_;
size_t latency_{0};
};

} // namespace bustub
3 changes: 2 additions & 1 deletion tools/bpm_bench/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
set(BPM_BENCH_SOURCES bpm_bench.cpp)
add_executable(bpm-bench ${BPM_BENCH_SOURCES})
add_executable(bpm-bench ${BPM_BENCH_SOURCES} "${PROJECT_SOURCE_DIR}/tools/backtrace.cpp")
add_backward(bpm-bench)

target_link_libraries(bpm-bench bustub)
set_target_properties(bpm-bench PROPERTIES OUTPUT_NAME bustub-bpm-bench)
143 changes: 107 additions & 36 deletions tools/bpm_bench/bpm_bench.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include <chrono>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex> // NOLINT
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <cpp_random_distributions/zipfian_int_distribution.h>
Expand All @@ -29,12 +31,6 @@ auto ClockMs() -> uint64_t {
return static_cast<uint64_t>(tm.tv_sec * 1000) + static_cast<uint64_t>(tm.tv_usec / 1000);
}

static const size_t BUSTUB_SCAN_THREAD = 8;
static const size_t BUSTUB_GET_THREAD = 8;
static const size_t LRU_K_SIZE = 16;
static const size_t BUSTUB_PAGE_CNT = 6400;
static const size_t BUSTUB_BPM_SIZE = 64;

struct BpmTotalMetrics {
uint64_t scan_cnt_{0};
uint64_t get_cnt_{0};
Expand Down Expand Up @@ -100,6 +96,45 @@ struct BpmMetrics {
}
};

struct BustubBenchPageHeader {
uint64_t seed_;
uint64_t page_id_;
char data_[0];
};

/// Modify the page and save some data inside
auto ModifyPage(char *data, size_t page_idx, uint64_t seed) -> void {
auto *pg = reinterpret_cast<BustubBenchPageHeader *>(data);
pg->seed_ = seed;
pg->page_id_ = page_idx;
pg->data_[pg->seed_ % 4000] = pg->seed_ % 256;
}

/// Check the page and verify the data inside
auto CheckPageConsistentNoSeed(const char *data, size_t page_idx) -> void {
const auto *pg = reinterpret_cast<const BustubBenchPageHeader *>(data);
if (pg->page_id_ != page_idx) {
fmt::println(stderr, "page header not consistent: page_id_={} page_idx={}", pg->page_id_, page_idx);
std::terminate();
}
auto left = static_cast<unsigned int>(static_cast<unsigned char>(pg->data_[pg->seed_ % 4000]));
auto right = static_cast<unsigned int>(pg->seed_ % 256);
if (left != right) {
fmt::println(stderr, "page content not consistent: data_[{}]={} seed_ % 256={}", pg->seed_ % 4000, left, right);
std::terminate();
}
}

/// Check the page and verify the data inside
auto CheckPageConsistent(const char *data, size_t page_idx, uint64_t seed) -> void {
const auto *pg = reinterpret_cast<const BustubBenchPageHeader *>(data);
if (pg->seed_ != seed) {
fmt::println(stderr, "page seed not consistent: seed_={} seed={}", pg->seed_, seed);
std::terminate();
}
CheckPageConsistentNoSeed(data, page_idx);
}

// NOLINTNEXTLINE
auto main(int argc, char **argv) -> int {
using bustub::AccessType;
Expand All @@ -109,7 +144,12 @@ auto main(int argc, char **argv) -> int {

argparse::ArgumentParser program("bustub-bpm-bench");
program.add_argument("--duration").help("run bpm bench for n milliseconds");
program.add_argument("--latency").help("set disk latency to n milliseconds");
program.add_argument("--latency").help("enable disk latency");
program.add_argument("--scan-thread-n").help("number of scan threads");
program.add_argument("--get-thread-n").help("number of lookup threads");
program.add_argument("--bpm-size").help("buffer pool size");
program.add_argument("--db-size").help("number of pages");
program.add_argument("--lru-k-size").help("lru-k size");

try {
program.parse_args(argc, argv);
Expand All @@ -124,102 +164,133 @@ auto main(int argc, char **argv) -> int {
duration_ms = std::stoi(program.get("--duration"));
}

uint64_t latency_ms = 0;
uint64_t enable_latency = 0;
if (program.present("--latency")) {
latency_ms = std::stoi(program.get("--latency"));
enable_latency = std::stoi(program.get("--latency"));
}

uint64_t scan_thread_n = 8;
if (program.present("--scan-thread-n")) {
scan_thread_n = std::stoi(program.get("--scan-thread-n"));
}

uint64_t get_thread_n = 8;
if (program.present("--get-thread-n")) {
get_thread_n = std::stoi(program.get("--get-thread-n"));
}

uint64_t bustub_page_cnt = 6400;
if (program.present("--db-size")) {
bustub_page_cnt = std::stoi(program.get("--db-size"));
}

uint64_t bustub_bpm_size = 64;
if (program.present("--bpm-size")) {
bustub_bpm_size = std::stoi(program.get("--bpm-size"));
}

uint64_t lru_k_size = 16;
if (program.present("--lru-k-size")) {
bustub_page_cnt = std::stoi(program.get("--lru-k-size"));
}

auto disk_manager = std::make_unique<DiskManagerUnlimitedMemory>();
auto bpm = std::make_unique<BufferPoolManager>(BUSTUB_BPM_SIZE, disk_manager.get(), LRU_K_SIZE);
auto bpm = std::make_unique<BufferPoolManager>(bustub_bpm_size, disk_manager.get(), lru_k_size);
std::vector<page_id_t> page_ids;

fmt::print(stderr, "[info] total_page={}, duration_ms={}, latency_ms={}, lru_k_size={}, bpm_size={}\n",
BUSTUB_PAGE_CNT, duration_ms, latency_ms, LRU_K_SIZE, BUSTUB_BPM_SIZE);
fmt::print(stderr,
"[info] total_page={}, duration_ms={}, latency={}, lru_k_size={}, bpm_size={}, scan_thread_cnt={}, "
"get_thread_cnt={}\n",
bustub_page_cnt, duration_ms, enable_latency, lru_k_size, bustub_bpm_size, scan_thread_n, get_thread_n);

for (size_t i = 0; i < BUSTUB_PAGE_CNT; i++) {
for (size_t i = 0; i < bustub_page_cnt; i++) {
page_id_t page_id;
auto *page = bpm->NewPage(&page_id);
if (page == nullptr) {
throw std::runtime_error("new page failed");
}
char &ch = page->GetData()[i % 1024];
ch = 1;

ModifyPage(page->GetData(), i, 0);

bpm->UnpinPage(page_id, true);
page_ids.push_back(page_id);
}

// enable disk latency after creating all pages
disk_manager->SetLatency(latency_ms);
disk_manager->EnableLatencySimulator(enable_latency != 0);

fmt::print(stderr, "[info] benchmark start\n");

BpmTotalMetrics total_metrics;
total_metrics.Begin();

std::vector<std::thread> threads;
using ModifyRecord = std::unordered_map<page_id_t, uint64_t>;

for (size_t thread_id = 0; thread_id < scan_thread_n; thread_id++) {
threads.emplace_back([bustub_page_cnt, scan_thread_n, thread_id, &page_ids, &bpm, duration_ms, &total_metrics] {
ModifyRecord records;

for (size_t thread_id = 0; thread_id < BUSTUB_SCAN_THREAD; thread_id++) {
threads.emplace_back(std::thread([thread_id, &page_ids, &bpm, duration_ms, &total_metrics] {
BpmMetrics metrics(fmt::format("scan {:>2}", thread_id), duration_ms);
metrics.Begin();

size_t page_idx = BUSTUB_PAGE_CNT * thread_id / BUSTUB_SCAN_THREAD;
size_t page_idx_start = bustub_page_cnt * thread_id / scan_thread_n;
size_t page_idx_end = bustub_page_cnt * (thread_id + 1) / scan_thread_n;
size_t page_idx = page_idx_start;

while (!metrics.ShouldFinish()) {
auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Scan);
if (page == nullptr) {
continue;
}

char &ch = page->GetData()[page_idx % 1024];
page->WLatch();
ch += 1;
if (ch == 0) {
ch = 1;
}
auto &seed = records[page_idx];
CheckPageConsistent(page->GetData(), page_idx, seed);
seed = seed + 1;
ModifyPage(page->GetData(), page_idx, seed);
page->WUnlatch();

bpm->UnpinPage(page->GetPageId(), true, AccessType::Scan);
page_idx = (page_idx + 1) % BUSTUB_PAGE_CNT;
page_idx += 1;
if (page_idx >= page_idx_end) {
page_idx = page_idx_start;
}
metrics.Tick();
metrics.Report();
}

total_metrics.ReportScan(metrics.cnt_);
}));
});
}

for (size_t thread_id = 0; thread_id < BUSTUB_GET_THREAD; thread_id++) {
threads.emplace_back(std::thread([thread_id, &page_ids, &bpm, duration_ms, &total_metrics] {
for (size_t thread_id = 0; thread_id < get_thread_n; thread_id++) {
threads.emplace_back([thread_id, &page_ids, &bpm, bustub_page_cnt, duration_ms, &total_metrics] {
std::random_device r;
std::default_random_engine gen(r());
zipfian_int_distribution<size_t> dist(0, BUSTUB_PAGE_CNT - 1, 0.8);
zipfian_int_distribution<size_t> dist(0, bustub_page_cnt - 1, 0.8);

BpmMetrics metrics(fmt::format("get {:>2}", thread_id), duration_ms);
metrics.Begin();

while (!metrics.ShouldFinish()) {
auto page_idx = dist(gen);
auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Get);
auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Lookup);
if (page == nullptr) {
continue;
}

page->RLatch();
char ch = page->GetData()[page_idx % 1024];
CheckPageConsistentNoSeed(page->GetData(), page_idx);
page->RUnlatch();
if (ch == 0) {
throw std::runtime_error("invalid data");
}

bpm->UnpinPage(page->GetPageId(), false, AccessType::Get);
bpm->UnpinPage(page->GetPageId(), false, AccessType::Lookup);
metrics.Tick();
metrics.Report();
}

total_metrics.ReportGet(metrics.cnt_);
}));
});
}

for (auto &thread : threads) {
Expand Down
Loading