Skip to content

Commit

Permalink
Merge branch 'unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanlo99 authored Nov 7, 2024
2 parents 786908f + 2fa2de7 commit 838a4ab
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/commands/cmd_search.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ static StatusOr<std::unique_ptr<kqir::Node>> ParseRediSearchQuery(const std::vec

kqir::ParamMap param_map;
while (parser.Good()) {
if (parser.EatEqICase("RETURNS")) {
if (parser.EatEqICase("RETURN")) {
auto count = GET_OR_RET(parser.TakeInt<size_t>());

for (size_t i = 0; i < count; ++i) {
Expand Down
12 changes: 6 additions & 6 deletions src/search/index_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ struct IndexManager {
info->Add(kqir::FieldInfo(field_name.ToString(), std::move(field_meta)));
}

IndexUpdater updater(info.get());
indexer->Add(updater);
auto updater = std::make_unique<IndexUpdater>(info.get());
indexer->Add(std::move(updater));
index_map.Insert(std::move(info));
}

Expand Down Expand Up @@ -180,12 +180,12 @@ struct IndexManager {
return {Status::NotOK, fmt::format("failed to write index metadata: {}", s.ToString())};
}

IndexUpdater updater(info.get());
indexer->Add(updater);
auto updater = std::make_unique<IndexUpdater>(info.get());
indexer->Add(std::move(updater));
index_map.Insert(std::move(info));

for (auto updater : indexer->updater_list) {
GET_OR_RET(updater.Build(ctx));
for (const auto &updater : indexer->updater_list) {
GET_OR_RET(updater->Build(ctx));
}

return Status::OK();
Expand Down
35 changes: 20 additions & 15 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,14 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context &ctx, std::string_view k

Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const kqir::Value &current, const SearchKey &search_key,
HnswVectorFieldMetadata *vector) const {
HnswVectorFieldMetadata *vector) {
CHECK(original.IsNull() || original.Is<kqir::NumericArray>());
CHECK(current.IsNull() || current.Is<kqir::NumericArray>());

// TODO: we can remove the lock if we solve the race problem
// inside the HNSW indexer, refer to #2481 and #2489
std::unique_lock lock(update_mutex);

auto storage = indexer->storage;
auto hnsw = HnswIndex(search_key, vector, storage);

Expand All @@ -305,7 +309,7 @@ Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx, std::string_vie
}

Status IndexUpdater::UpdateIndex(engine::Context &ctx, const std::string &field, std::string_view key,
const kqir::Value &original, const kqir::Value &current) const {
const kqir::Value &original, const kqir::Value &current) {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
Expand All @@ -331,7 +335,7 @@ Status IndexUpdater::UpdateIndex(engine::Context &ctx, const std::string &field,
return Status::OK();
}

Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, std::string_view key) const {
Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, std::string_view key) {
auto current = GET_OR_RET(Record(ctx, key));

for (const auto &[field, i] : info->fields) {
Expand All @@ -354,7 +358,7 @@ Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, s
return Status::OK();
}

Status IndexUpdater::Build(engine::Context &ctx) const {
Status IndexUpdater::Build(engine::Context &ctx) {
auto storage = indexer->storage;
util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(), ColumnFamilyID::Metadata);

Expand All @@ -380,26 +384,27 @@ Status IndexUpdater::Build(engine::Context &ctx) const {
return Status::OK();
}

void GlobalIndexer::Add(IndexUpdater updater) {
updater.indexer = this;
for (const auto &prefix : updater.info->prefixes) {
prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, false), updater);
void GlobalIndexer::Add(std::unique_ptr<IndexUpdater> updater) {
updater->indexer = this;
for (const auto &prefix : updater->info->prefixes) {
prefix_map.insert(ComposeNamespaceKey(updater->info->ns, prefix, false), updater.get());
}
updater_list.push_back(updater);
updater_list.push_back(std::move(updater));
}

void GlobalIndexer::Remove(const kqir::IndexInfo *index) {
for (auto iter = prefix_map.begin(); iter != prefix_map.end();) {
if (iter->info == index) {
if ((*iter)->info == index) {
iter = prefix_map.erase(iter);
} else {
++iter;
}
}

updater_list.erase(std::remove_if(updater_list.begin(), updater_list.end(),
[index](IndexUpdater updater) { return updater.info == index; }),
updater_list.end());
updater_list.erase(
std::remove_if(updater_list.begin(), updater_list.end(),
[index](const std::unique_ptr<IndexUpdater> &updater) { return updater->info == index; }),
updater_list.end());
}

StatusOr<GlobalIndexer::RecordResult> GlobalIndexer::Record(engine::Context &ctx, std::string_view key,
Expand All @@ -411,14 +416,14 @@ StatusOr<GlobalIndexer::RecordResult> GlobalIndexer::Record(engine::Context &ctx
auto iter = prefix_map.longest_prefix(ComposeNamespaceKey(ns, key, false));
if (iter != prefix_map.end()) {
auto updater = iter.value();
return RecordResult{updater, std::string(key.begin(), key.end()), GET_OR_RET(updater.Record(ctx, key))};
return RecordResult{updater, std::string(key.begin(), key.end()), GET_OR_RET(updater->Record(ctx, key))};
}

return {Status::NoPrefixMatched};
}

Status GlobalIndexer::Update(engine::Context &ctx, const RecordResult &original) {
return original.updater.Update(ctx, original.fields, original.key);
return original.updater->Update(ctx, original.fields, original.key);
}

} // namespace redis
17 changes: 9 additions & 8 deletions src/search/indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,16 @@ struct IndexUpdater {

const kqir::IndexInfo *info = nullptr;
GlobalIndexer *indexer = nullptr;
std::mutex update_mutex;

explicit IndexUpdater(const kqir::IndexInfo *info) : info(info) {}

StatusOr<FieldValues> Record(engine::Context &ctx, std::string_view key) const;
Status UpdateIndex(engine::Context &ctx, const std::string &field, std::string_view key, const kqir::Value &original,
const kqir::Value &current) const;
Status Update(engine::Context &ctx, const FieldValues &original, std::string_view key) const;
const kqir::Value &current);
Status Update(engine::Context &ctx, const FieldValues &original, std::string_view key);

Status Build(engine::Context &ctx) const;
Status Build(engine::Context &ctx);

Status UpdateTagIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const kqir::Value &current, const SearchKey &search_key, const TagFieldMetadata *tag) const;
Expand All @@ -92,25 +93,25 @@ struct IndexUpdater {
const NumericFieldMetadata *num) const;
Status UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const kqir::Value &current, const SearchKey &search_key,
HnswVectorFieldMetadata *vector) const;
HnswVectorFieldMetadata *vector);
};

struct GlobalIndexer {
using FieldValues = IndexUpdater::FieldValues;
struct RecordResult {
IndexUpdater updater;
IndexUpdater *updater;
std::string key;
FieldValues fields;
};

tsl::htrie_map<char, IndexUpdater> prefix_map;
std::vector<IndexUpdater> updater_list;
tsl::htrie_map<char, IndexUpdater *> prefix_map;
std::vector<std::unique_ptr<IndexUpdater>> updater_list;

engine::Storage *storage = nullptr;

explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {}

void Add(IndexUpdater updater);
void Add(std::unique_ptr<IndexUpdater> updater);
void Remove(const kqir::IndexInfo *index);

StatusOr<RecordResult> Record(engine::Context &ctx, std::string_view key, const std::string &ns);
Expand Down
2 changes: 1 addition & 1 deletion src/search/redis_query_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ struct Transformer : ir::TreeTransformer {
if (Is<UnsignedInteger>(knn_search->children[1])) {
k = *ParseInt(knn_search->children[1]->string());
} else {
k = *ParseInt(GET_OR_RET(GetParam(node)));
k = *ParseInt(GET_OR_RET(GetParam(knn_search->children[1])));
}

return std::make_unique<VectorKnnExpr>(std::make_unique<FieldRef>(knn_search->children[2]->string()),
Expand Down
16 changes: 8 additions & 8 deletions tests/cppunit/indexer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct IndexerTest : TestBase {

map.emplace("hashtest", std::move(hash_info));

redis::IndexUpdater hash_updater{map.at("hashtest").get()};
auto hash_updater = std::make_unique<redis::IndexUpdater>(map.at("hashtest").get());

redis::IndexMetadata json_field_meta;
json_field_meta.on_data_type = redis::IndexOnDataType::JSON;
Expand All @@ -65,7 +65,7 @@ struct IndexerTest : TestBase {

map.emplace("jsontest", std::move(json_info));

redis::IndexUpdater json_updater{map.at("jsontest").get()};
auto json_updater = std::make_unique<redis::IndexUpdater>(map.at("jsontest").get());

indexer.Add(std::move(hash_updater));
indexer.Add(std::move(json_updater));
Expand All @@ -87,7 +87,7 @@ TEST_F(IndexerTest, HashTag) {
{
auto s = indexer.Record(*ctx_, key1, ns);
ASSERT_EQ(s.Msg(), Status::ok_msg);
ASSERT_EQ(s->updater.info->name, idxname);
ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_TRUE(s->fields.empty());

uint64_t cnt = 0;
Expand Down Expand Up @@ -120,7 +120,7 @@ TEST_F(IndexerTest, HashTag) {
{
auto s = indexer.Record(*ctx_, key1, ns);
ASSERT_TRUE(s);
ASSERT_EQ(s->updater.info->name, idxname);
ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_EQ(s->fields.size(), 1);
ASSERT_EQ(s->fields["x"], T("food,kitChen,Beauty"));

Expand Down Expand Up @@ -178,7 +178,7 @@ TEST_F(IndexerTest, JsonTag) {
{
auto s = indexer.Record(*ctx_, key1, ns);
ASSERT_TRUE(s);
ASSERT_EQ(s->updater.info->name, idxname);
ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_TRUE(s->fields.empty());

auto s_set = db.Set(*ctx_, key1, "$", R"({"x": "food,kitChen,Beauty"})");
Expand Down Expand Up @@ -210,7 +210,7 @@ TEST_F(IndexerTest, JsonTag) {
{
auto s = indexer.Record(*ctx_, key1, ns);
ASSERT_TRUE(s);
ASSERT_EQ(s->updater.info->name, idxname);
ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_EQ(s->fields.size(), 1);
ASSERT_EQ(s->fields["$.x"], T("food,kitChen,Beauty"));

Expand Down Expand Up @@ -262,7 +262,7 @@ TEST_F(IndexerTest, JsonTagBuildIndex) {
auto s_set = db.Set(*ctx_, key1, "$", R"({"x": "food,kitChen,Beauty"})");
ASSERT_TRUE(s_set.ok());

auto s2 = indexer.updater_list[1].Build(*ctx_);
auto s2 = indexer.updater_list[1]->Build(*ctx_);
ASSERT_EQ(s2.Msg(), Status::ok_msg);

auto key = redis::SearchKey(ns, idxname, "$.x").ConstructTagFieldData("food", key1);
Expand Down Expand Up @@ -301,7 +301,7 @@ TEST_F(IndexerTest, JsonHnswVector) {
{
auto s = indexer.Record(*ctx_, key3, ns);
ASSERT_TRUE(s);
ASSERT_EQ(s->updater.info->name, idxname);
ASSERT_EQ(s->updater->info->name, idxname);
ASSERT_TRUE(s->fields.empty());

auto s_set = db.Set(*ctx_, key3, "$", R"({"z": [1,2,3]})");
Expand Down
6 changes: 3 additions & 3 deletions tests/cppunit/plan_executor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ std::vector<std::unique_ptr<ScopedUpdate>> ScopedUpdates(engine::Context& ctx, r

TEST_F(PlanExecutorTestC, NumericFieldScan) {
redis::GlobalIndexer indexer(storage_.get());
indexer.Add(redis::IndexUpdater(IndexI()));
indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));

{
engine::Context ctx(storage_.get());
Expand Down Expand Up @@ -428,7 +428,7 @@ TEST_F(PlanExecutorTestC, NumericFieldScan) {

TEST_F(PlanExecutorTestC, TagFieldScan) {
redis::GlobalIndexer indexer(storage_.get());
indexer.Add(redis::IndexUpdater(IndexI()));
indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));

{
engine::Context ctx(storage_.get());
Expand Down Expand Up @@ -467,7 +467,7 @@ TEST_F(PlanExecutorTestC, TagFieldScan) {

TEST_F(PlanExecutorTestC, HnswVectorFieldScans) {
redis::GlobalIndexer indexer(storage_.get());
indexer.Add(redis::IndexUpdater(IndexI()));
indexer.Add(std::make_unique<redis::IndexUpdater>(IndexI()));

{
auto updates = ScopedUpdates(*ctx_, indexer,
Expand Down

0 comments on commit 838a4ab

Please sign in to comment.