Skip to content

Commit

Permalink
[fix][benchmark] Fixup vector index benchmark issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
rock-git authored and ketor committed Jan 31, 2024
1 parent 1c660d6 commit 4d377fa
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 82 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ include(rocksdb)
include(bdb)
include(brpc)
include(braft)
include(hdf5)

if(BUILD_BENCHMARK)
include(hdf5)
endif()

message(STATUS "protoc: ${PROTOBUF_PROTOC_EXECUTABLE}")
message(STATUS "protoc lib: ${PROTOBUF_PROTOC_LIBRARY}")
Expand Down
2 changes: 1 addition & 1 deletion contrib/hdf5
Submodule hdf5 updated 1216 files
73 changes: 47 additions & 26 deletions src/benchmark/benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ DEFINE_validator(vector_metric_type, [](const char*, const std::string& value) -
auto metric_type = dingodb::Helper::ToUpper(value);
return metric_type == "NONE" || metric_type == "L2" || metric_type == "IP" || metric_type == "COSINE";
});
DEFINE_string(vector_partition_vector_ids, "", "Vector id used by partition");

DEFINE_uint32(hnsw_ef_construction, 500, "HNSW ef construction");
DEFINE_uint32(hnsw_nlink_num, 64, "HNSW nlink number");
Expand All @@ -98,6 +99,9 @@ DEFINE_bool(vector_search_use_brute_force, false, "Vector search flag use_brute_
DEFINE_bool(vector_search_enable_range_search, false, "Vector search flag enable_range_search");
DEFINE_double(vector_search_radius, 0.1, "Vector search flag radius");

DECLARE_uint32(vector_put_batch_size);
DECLARE_uint32(vector_arrange_concurrency);

DECLARE_string(benchmark);
DECLARE_uint32(key_size);
DECLARE_uint32(value_size);
Expand All @@ -110,7 +114,7 @@ namespace benchmark {

static const std::string kClientRaw = "w";

static const std::string kNamePrefix = "Benchmark_";
static const std::string kNamePrefix = "Benchmark";

static std::string EncodeRawKey(const std::string& str) { return kClientRaw + str; }

Expand Down Expand Up @@ -194,7 +198,7 @@ void Stats::Report(bool is_cumulative, size_t milliseconds) const {
latency_recorder_->latency_percentile(0.99))
<< '\n';
} else {
std::cout << fmt::format("{:>8}{:>8}{:>8}{:>8.0f}{:>8.2f}{:>16}{:>16}{:>16}{:>16}{:>16}{:>12.2f}", epoch_, req_num_,
std::cout << fmt::format("{:>8}{:>8}{:>8}{:>8.0f}{:>8.2f}{:>16}{:>16}{:>16}{:>16}{:>16}{:>16.2f}", epoch_, req_num_,
error_count_, (req_num_ / seconds), (write_bytes_ / seconds / 1048576),
latency_recorder_->latency(), latency_recorder_->max_latency(),
latency_recorder_->latency_percentile(0.5), latency_recorder_->latency_percentile(0.95),
Expand All @@ -209,7 +213,7 @@ std::string Stats::Header() {
"MB/s", "LATENCY_AVG(us)", "LATENCY_MAX(us)", "LATENCY_P50(us)", "LATENCY_P95(us)",
"LATENCY_P99(us)");
} else {
return fmt::format("{:>8}{:>8}{:>8}{:>8}{:>8}{:>16}{:>16}{:>16}{:>16}{:>16}{:>12}", "EPOCH", "REQ_NUM", "ERRORS",
return fmt::format("{:>8}{:>8}{:>8}{:>8}{:>8}{:>16}{:>16}{:>16}{:>16}{:>16}{:>16}", "EPOCH", "REQ_NUM", "ERRORS",
"QPS", "MB/s", "LATENCY_AVG(us)", "LATENCY_MAX(us)", "LATENCY_P50(us)", "LATENCY_P95(us)",
"LATENCY_P99(us)", "RECALL_AVG(%)");
}
Expand Down Expand Up @@ -266,7 +270,7 @@ bool Benchmark::Arrange() {
FLAGS_vector_dimension = dataset_->GetDimension();
}
vector_index_entries_ = ArrangeVectorIndex(FLAGS_vector_index_num);
if (vector_index_entries_.size() != FLAGS_region_num) {
if (vector_index_entries_.size() != FLAGS_vector_index_num) {
return false;
}
} else {
Expand Down Expand Up @@ -298,15 +302,15 @@ std::vector<RegionEntryPtr> Benchmark::ArrangeRegion(int num) {
threads.reserve(num);
for (int thread_no = 0; thread_no < num; ++thread_no) {
threads.emplace_back([this, thread_no, &region_entries, &mutex]() {
auto name = fmt::format("{}{:014}_{}", kNamePrefix, Helper::TimestampMs(), thread_no);
auto name = fmt::format("{}_{}_{}", kNamePrefix, Helper::TimestampMs(), thread_no + 1);
std::string prefix = fmt::format("{}{:06}", FLAGS_prefix, thread_no);
auto region_id = CreateRegion(name, prefix, Helper::PrefixNext(prefix), GetRawEngineType(), FLAGS_replica);
if (region_id == 0) {
LOG(ERROR) << fmt::format("Create region failed, region_name: {}", name);
LOG(ERROR) << fmt::format("create region failed, name: {}", name);
return;
}

std::cout << fmt::format("Create region name({}) id({}) prefix({}) done", name, region_id, prefix) << '\n';
std::cout << fmt::format("create region name({}) id({}) prefix({}) done", name, region_id, prefix) << '\n';

auto region_entry = std::make_shared<RegionEntry>();
region_entry->prefix = prefix;
Expand All @@ -325,23 +329,34 @@ std::vector<RegionEntryPtr> Benchmark::ArrangeRegion(int num) {
}

std::vector<VectorIndexEntryPtr> Benchmark::ArrangeVectorIndex(int num) {
std::mutex mutex;
std::vector<VectorIndexEntryPtr> vector_index_entries;

for (int i = 0; i < num; ++i) {
std::string name = kNamePrefix + fmt::format("{:014}_{}", Helper::TimestampMs(), i + 1);
auto index_id = CreateVectorIndex(name, FLAGS_vector_index_type);
if (index_id == 0) {
return vector_index_entries;
}
std::vector<std::thread> threads;
threads.reserve(num);
for (int thread_no = 0; thread_no < num; ++thread_no) {
threads.emplace_back([this, thread_no, &vector_index_entries, &mutex]() {
std::string name = fmt::format("{}_{}_{}", kNamePrefix, Helper::TimestampMs(), thread_no + 1);
auto index_id = CreateVectorIndex(name, FLAGS_vector_index_type);
if (index_id == 0) {
LOG(ERROR) << fmt::format("create vector index failed, name: {}", name);
return;
}

std::cout << fmt::format("Create vector index name({}) id({}) type({}) done", name, index_id,
FLAGS_vector_index_type)
<< '\n';
std::cout << fmt::format("create vector index name({}) id({}) type({}) done", name, index_id,
FLAGS_vector_index_type)
<< '\n';

auto entry = std::make_shared<VectorIndexEntry>();
entry->index_id = index_id;
auto entry = std::make_shared<VectorIndexEntry>();
entry->index_id = index_id;

vector_index_entries.push_back(entry);
std::lock_guard lock(mutex);
vector_index_entries.push_back(entry);
});
}

for (auto& thread : threads) {
thread.join();
}

return vector_index_entries;
Expand Down Expand Up @@ -393,9 +408,9 @@ void Benchmark::Clean() {
}

// Drop vector index
// for (auto& vector_index_entry : vector_index_entries_) {
// DropVectorIndex(vector_index_entry->index_id);
// }
for (auto& vector_index_entry : vector_index_entries_) {
DropVectorIndex(vector_index_entry->index_id);
}
}

int64_t Benchmark::CreateRegion(const std::string& name, const std::string& start_key, const std::string& end_key,
Expand All @@ -411,7 +426,7 @@ int64_t Benchmark::CreateRegion(const std::string& name, const std::string& star
.SetRange(EncodeRawKey(start_key), EncodeRawKey(end_key))
.Create(region_id);
if (!status.IsOK()) {
LOG(ERROR) << fmt::format("Create region failed, {}", status.ToString());
LOG(ERROR) << fmt::format("create region failed, {}", status.ToString());
return 0;
}
if (region_id == 0) {
Expand Down Expand Up @@ -490,7 +505,10 @@ int64_t Benchmark::CreateVectorIndex(const std::string& name, const std::string&
CHECK(status.ok()) << fmt::format("new vector index creator failed, {}", status.ToString());

int64_t vector_index_id = 0;
std::vector<int64_t> separator_id = {70001};
std::vector<int64_t> separator_id;
if (!FLAGS_vector_partition_vector_ids.empty()) {
Helper::SplitString(FLAGS_vector_partition_vector_ids, ',', separator_id);
}

creator->SetName(name).SetSchemaId(1).SetRangePartitions(separator_id).SetReplicaNum(3);

Expand All @@ -512,7 +530,7 @@ int64_t Benchmark::CreateVectorIndex(const std::string& name, const std::string&

status = creator->Create(vector_index_id);
if (!status.IsOK()) {
LOG(ERROR) << fmt::format("Create vector index failed, {}", status.ToString());
LOG(ERROR) << fmt::format("create vector index failed, {}", status.ToString());
return 0;
}
if (vector_index_id == 0) {
Expand All @@ -527,7 +545,7 @@ int64_t Benchmark::CreateVectorIndex(const std::string& name, const std::string&
void Benchmark::DropVectorIndex(int64_t vector_index_id) {
CHECK(vector_index_id != 0) << "vector_index_id is invalid";
auto status = client_->DropIndex(vector_index_id);
CHECK(status.IsOK()) << fmt::format("Drop vector index failed, {}", status.ToString());
CHECK(status.IsOK()) << fmt::format("drop vector index failed, {}", status.ToString());
}

static std::vector<std::string> ExtractPrefixs(const std::vector<RegionEntryPtr>& region_entries) {
Expand Down Expand Up @@ -769,7 +787,10 @@ void Environment::PrintParam() {
std::cout << fmt::format("{:<34}: {:>32}", "vector_value_type", FLAGS_vector_value_type) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "vector_max_element_num", FLAGS_vector_max_element_num) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "vector_metric_type", FLAGS_vector_metric_type) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "vector_partition_vector_ids", FLAGS_vector_partition_vector_ids) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "vector_dataset", FLAGS_vector_dataset) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "vector_arrange_concurrency", FLAGS_vector_arrange_concurrency) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "vector_put_batch_size", FLAGS_vector_put_batch_size) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "hnsw_ef_construction", FLAGS_hnsw_ef_construction) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "hnsw_nlink_num", FLAGS_hnsw_nlink_num) << '\n';
std::cout << fmt::format("{:<34}: {:>32}", "ivf_ncentroids", FLAGS_ivf_ncentroids) << '\n';
Expand Down
19 changes: 11 additions & 8 deletions src/benchmark/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "benchmark/dataset.h"

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <exception>
Expand All @@ -29,6 +30,7 @@
#include "gflags/gflags_declare.h"

DECLARE_uint32(vector_put_batch_size);
DECLARE_uint32(vector_search_topk);

namespace dingodb {
namespace benchmark {
Expand All @@ -44,16 +46,11 @@ std::shared_ptr<Dataset> Dataset::New(std::string filepath) {
return std::make_shared<GistDataset>(filepath);

} else if (filepath.find("kosarak") == std::string::npos) {
return std::make_shared<KosarakDataset>(filepath);

} else if (filepath.find("lastfm") == std::string::npos) {
return std::make_shared<LastfmDataset>(filepath);

} else if (filepath.find("mnist") == std::string::npos) {
return std::make_shared<MnistDataset>(filepath);

} else if (filepath.find("movielens10m") == std::string::npos) {
return std::make_shared<Movielens10mDataset>(filepath);
}

std::cout << "Not support dataset, path: " << filepath << std::endl;
Expand All @@ -66,6 +63,8 @@ BaseDataset::BaseDataset(std::string filepath) : filepath_(filepath) {}
BaseDataset::~BaseDataset() { h5file_->close(); }

bool BaseDataset::Init() {
std::lock_guard lock(mutex_);

try {
h5file_ = std::make_shared<H5::H5File>(filepath_, H5F_ACC_RDONLY);
{
Expand Down Expand Up @@ -140,6 +139,8 @@ uint32_t BaseDataset::GetTrainDataCount() const { return train_row_count_; }
uint32_t BaseDataset::GetTestDataCount() const { return test_row_count_; }

void BaseDataset::GetBatchTrainData(uint32_t batch_num, std::vector<sdk::VectorWithId>& vector_with_ids, bool& is_eof) {
std::lock_guard lock(mutex_);

is_eof = false;
H5::DataSet dataset = h5file_->openDataSet("train");
H5::DataSpace dataspace = dataset.getSpace();
Expand Down Expand Up @@ -204,6 +205,8 @@ static void PrintVector(const std::vector<T>& vec) {
}

std::vector<BaseDataset::TestEntryPtr> BaseDataset::GetTestData() {
std::lock_guard lock(mutex_);

H5::DataSet dataset = h5file_->openDataSet("test");
H5::DataSpace dataspace = dataset.getSpace();

Expand All @@ -228,7 +231,7 @@ std::vector<BaseDataset::TestEntryPtr> BaseDataset::GetTestData() {

// gernerate test entries
std::vector<BaseDataset::TestEntryPtr> test_entries;
for (int i = 0; i < 3; ++i) {
for (int i = 0; i < row_count; ++i) {
auto test_entry = std::make_shared<BaseDataset::TestEntry>();
test_entry->vector_with_id.id = 0;
test_entry->vector_with_id.vector.dimension = dimension;
Expand All @@ -238,7 +241,6 @@ std::vector<BaseDataset::TestEntryPtr> BaseDataset::GetTestData() {
vec.resize(dimension);
memcpy(vec.data(), buf.data() + i * dimension, dimension * sizeof(float));
test_entry->vector_with_id.vector.float_values.swap(vec);
// PrintVector(test_entry->vector_with_id.vector.float_values);

test_entry->neighbors = GetTestVectorNeighbors(i);

Expand All @@ -257,8 +259,9 @@ std::unordered_map<int64_t, float> BaseDataset::GetTestVectorNeighbors(uint32_t
auto distances = GetDistances(index);
assert(vector_ids.size() == distances.size());

uint32_t size = std::min(static_cast<uint32_t>(vector_ids.size()), FLAGS_vector_search_topk);
std::unordered_map<int64_t, float> neighbors;
for (int i = 0; i < vector_ids.size(); ++i) {
for (uint32_t i = 0; i < size; ++i) {
neighbors.insert(std::make_pair(vector_ids[i] + 1, distances[i]));
}

Expand Down
2 changes: 2 additions & 0 deletions src/benchmark/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -89,6 +90,7 @@ class BaseDataset : public Dataset {
uint32_t test_row_count_{0};

uint32_t dimension_{0};
std::mutex mutex_;
};

// sift/glove/gist/mnist is same
Expand Down
4 changes: 4 additions & 0 deletions src/benchmark/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "benchmark/benchmark.h"
#include "benchmark/dataset.h"
#include "gflags/gflags.h"
#include "glog/logging.h"

const std::string kVersion = "0.1.0";

Expand Down Expand Up @@ -48,6 +49,9 @@ static std::string GetUsageMessage() {
message += "\n --vector_value_type vector value type float/uint8, default(float)";
message += "\n --vector_max_element_num vector index contain max element number, default(100000)";
message += "\n --vector_metric_type calcute vector distance method L2/IP/COSINE, default(L2)";
message += "\n --vector_partition_vector_ids vector id used by partition, default()";
message += "\n --vector_arrange_concurrency vector arrange concurrency, default(10)";
message += "\n --vector_put_batch_size vector put batch size, default(512)";
message += "\n --hnsw_ef_construction HNSW ef construction, default(true)";
message += "\n --hnsw_nlink_num HNSW nlink number, default(32)";
message += "\n --ivf_ncentroids IVF ncentroids, default(2048)";
Expand Down
Loading

0 comments on commit 4d377fa

Please sign in to comment.