From 7bacd846c3d36632dae4bdbeda4c933e72d30c60 Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 4 Nov 2024 16:04:20 +0800 Subject: [PATCH 1/2] Minor: change is_txn_mode to txn_context_enabled --- src/cluster/slot_migrate.h | 2 +- src/commands/cmd_txn.cc | 4 ++++ src/server/redis_connection.cc | 2 +- src/storage/storage.cc | 34 +++++++++++++++++----------------- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 1114f2a1b55..6d323b6bdf2 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -45,7 +45,7 @@ enum class MigrationType { /// Use Redis commands to migrate data. - /// It will trying to extract commands from existing data and log, then replay + /// It will try to extract commands from existing data and log, then replay /// them on the destination node. kRedisCommand = 0, /// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data. diff --git a/src/commands/cmd_txn.cc b/src/commands/cmd_txn.cc index 4f3cb3a4d4f..3d88d9a2ec4 100644 --- a/src/commands/cmd_txn.cc +++ b/src/commands/cmd_txn.cc @@ -85,6 +85,10 @@ class CommandExec : public Commander { auto s = storage->BeginTxn(); if (s.IsOK()) { conn->ExecuteCommands(conn->GetMultiExecCommands()); + // In Redis, errors happening after EXEC instead are not handled in a special way: + // all the other commands will be executed even if some command fails during + // the transaction. + // So, if conn->IsMultiError(), the transaction should still be committed. s = storage->CommitTxn(); } return s; diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index 70abfe70b54..c7bf90afde6 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -464,7 +464,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { // We don't execute commands, but queue them, and then execute in EXEC command if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdEndMulti)) { - multi_cmds_.emplace_back(cmd_tokens); + multi_cmds_.emplace_back(std::move(cmd_tokens)); Reply(redis::SimpleString("QUEUED")); continue; } diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 2eead08ace6..c5ee60e7890 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -597,14 +597,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key, std::string *value) { - if (ctx.is_txn_mode) { + if (ctx.txn_context_enabled) { DCHECK_NOTNULL(options.snapshot); DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber()); } rocksdb::Status s; if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) { s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value); - } else if (ctx.batch && ctx.is_txn_mode) { + } else if (ctx.batch && ctx.txn_context_enabled) { s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key, value); } else { s = db_->Get(options, column_family, key, value); @@ -622,14 +622,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key, rocksdb::PinnableSlice *value) { - if (ctx.is_txn_mode) { + if (ctx.txn_context_enabled) { DCHECK_NOTNULL(options.snapshot); DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber()); } rocksdb::Status s; if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) { s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value); - } else if (ctx.is_txn_mode && ctx.batch) { + } else if (ctx.txn_context_enabled && ctx.batch) { s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key, value); } else { s = db_->Get(options, column_family, key, value); @@ -655,14 +655,14 @@ void Storage::recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_famil rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family) { - if (ctx.is_txn_mode) { + if (ctx.txn_context_enabled) { DCHECK_NOTNULL(options.snapshot); DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber()); } auto iter = db_->NewIterator(options, column_family); if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) { return txn_write_batch_->NewIteratorWithBase(column_family, iter, &options); - } else if (ctx.is_txn_mode && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) { + } else if (ctx.txn_context_enabled && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) { return ctx.batch->NewIteratorWithBase(column_family, iter, &options); } return iter; @@ -671,14 +671,14 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::Rea void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family, const size_t num_keys, const rocksdb::Slice *keys, rocksdb::PinnableSlice *values, rocksdb::Status *statuses) { - if (ctx.is_txn_mode) { + if (ctx.txn_context_enabled) { DCHECK_NOTNULL(options.snapshot); DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber()); } if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) { txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses, false); - } else if (ctx.is_txn_mode && ctx.batch) { + } else if (ctx.txn_context_enabled && ctx.batch) { ctx.batch->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses, false); } else { db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false); @@ -705,13 +705,13 @@ rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOpt updates->PutLogData(ServerLogData(kReplIdLog, replid_).Encode()); } - if (ctx.is_txn_mode) { - if (ctx.batch == nullptr) { - ctx.batch = std::make_unique(); + if (ctx.txn_context_enabled) { + if (ctx.batch != nullptr) { + WriteBatchIndexer handle(ctx); + auto s = updates->Iterate(&handle); + if (!s.ok()) return s; + ctx.batch = nullptr; } - WriteBatchIndexer handle(ctx); - auto s = updates->Iterate(&handle); - if (!s.ok()) return s; } return db_->Write(options, updates); @@ -1276,19 +1276,19 @@ bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &d [[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() { rocksdb::ReadOptions read_options; - if (is_txn_mode) read_options.snapshot = GetSnapshot(); + if (txn_context_enabled) read_options.snapshot = GetSnapshot(); return read_options; } [[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() { rocksdb::ReadOptions read_options = storage->DefaultScanOptions(); - if (is_txn_mode) read_options.snapshot = GetSnapshot(); + if (txn_context_enabled) read_options.snapshot = GetSnapshot(); return read_options; } [[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() { rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions(); - if (is_txn_mode) read_options.snapshot = GetSnapshot(); + if (txn_context_enabled) read_options.snapshot = GetSnapshot(); return read_options; } From 0440925b36124f3d37baeb58072caa04e1aa506d Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 4 Nov 2024 22:37:19 +0800 Subject: [PATCH 2/2] Fix ci and add comment --- src/config/config.h | 2 +- src/storage/storage.cc | 17 ++++++++------ src/storage/storage.h | 53 +++++++++++++++++++++++++++++------------- 3 files changed, 48 insertions(+), 24 deletions(-) diff --git a/src/config/config.h b/src/config/config.h index bc33ac97c82..d2d4e8c379e 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -54,7 +54,7 @@ constexpr const size_t GiB = 1024L * MiB; constexpr const uint32_t kDefaultPort = 6666; constexpr const char *kDefaultNamespace = "__namespace"; -constexpr const size_t KVROCKS_MAX_LSM_LEVEL = 7; +constexpr int KVROCKS_MAX_LSM_LEVEL = 7; enum class BlockCacheType { kCacheTypeLRU = 0, kCacheTypeHCC }; diff --git a/src/storage/storage.cc b/src/storage/storage.cc index c5ee60e7890..80e36d341bc 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -700,20 +700,23 @@ rocksdb::Status Storage::Write(engine::Context &ctx, const rocksdb::WriteOptions rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates) { - // Put replication id logdata at the end of write batch - if (replid_.length() == kReplIdLength) { - updates->PutLogData(ServerLogData(kReplIdLog, replid_).Encode()); - } - if (ctx.txn_context_enabled) { if (ctx.batch != nullptr) { + // Extract writes from the batch and append to the updates WriteBatchIndexer handle(ctx); auto s = updates->Iterate(&handle); if (!s.ok()) return s; + // cleanup the batch to avoid it + // being written to the db again ctx.batch = nullptr; } } + // Put replication id logdata at the end of `updates`. + if (replid_.length() == kReplIdLength) { + updates->PutLogData(ServerLogData(kReplIdLog, replid_).Encode()); + } + return db_->Write(options, updates); } @@ -869,8 +872,8 @@ Status Storage::BeginTxn() { // so it's fine to reset the global write batch without any lock. is_txn_mode_ = true; txn_write_batch_ = - std::make_unique(rocksdb::BytewiseComparator() /*default backup_index_comparator */, - 0 /* default reserved_bytes*/, GetWriteBatchMaxBytes()); + std::make_unique(/*backup_index_comparator=*/rocksdb::BytewiseComparator(), + /*reserved_bytes=*/0, GetWriteBatchMaxBytes()); return Status::OK(); } diff --git a/src/storage/storage.h b/src/storage/storage.h index c29b4689de5..d8ac4851426 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -248,11 +248,16 @@ class Storage { rocksdb::ColumnFamilyHandle *column_family); rocksdb::Iterator *NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options); - [[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const rocksdb::WriteOptions &options, - rocksdb::WriteBatch *updates); - const rocksdb::WriteOptions &DefaultWriteOptions() { return default_write_opts_; } + const rocksdb::WriteOptions &DefaultWriteOptions() const { return default_write_opts_; } rocksdb::ReadOptions DefaultScanOptions() const; rocksdb::ReadOptions DefaultMultiGetOptions() const; + + /// Write writes the batch to the storage. + /// + /// If `ctx` is in transactional mode, the batch in `ctx` will be added to the transactional + /// write batch. + [[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const rocksdb::WriteOptions &options, + rocksdb::WriteBatch *updates); [[nodiscard]] rocksdb::Status Delete(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::Slice &key); [[nodiscard]] rocksdb::Status DeleteRange(engine::Context &ctx, const rocksdb::WriteOptions &options, @@ -336,6 +341,9 @@ class Storage { void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ = yes_or_no; } bool IsDBInRetryableIOError() const { return db_in_retryable_io_error_; } + /// Redis PSYNC relies on a Unique Replication Sequence Id when use-rsid-psync + /// enabled. + /// ShiftReplId would generate an Id and write it to propagate cf. Status ShiftReplId(engine::Context &ctx); std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq); std::string GetReplIdFromDbEngine(); @@ -363,6 +371,8 @@ class Storage { std::atomic db_in_retryable_io_error_{false}; + // is_txn_mode_ is used to determine whether the current Storage is in transactional mode, + // .i.e, in "EXEC" command(CommandExec). std::atomic is_txn_mode_ = false; // txn_write_batch_ is used as the global write batch for the transaction mode, // all writes will be grouped in this write batch when entering the transaction mode, @@ -380,39 +390,48 @@ class Storage { /// Context passes fixed snapshot and batch between APIs /// -/// Limitations: Performing a large number of writes on the same Context may reduce performance. -/// Please choose to use the same Context or create a new Context based on the actual situation. +/// Limitations: Performing a large number of writes or apply operations like DeleteRange +/// on the same Context may reduce performance. +/// Please choose to use the same Context or create a new Context based on the actual +/// situation. /// /// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs. struct Context { engine::Storage *storage = nullptr; + /// batch can be nullptr if + /// 1. The Context is not in transactional mode. + /// 2. The Context is in transactional mode, but no write operation is performed. std::unique_ptr batch = nullptr; - /// is_txn_mode is used to determine whether the current Context is in transactional mode, + /// txn_context_enabled is used to determine whether the current Context is in transactional mode, /// if it is not transactional mode, then Context is equivalent to a Storage. /// If the configuration of txn-context-enabled is no, it is false. - bool is_txn_mode = true; + bool txn_context_enabled = true; /// NoTransactionContext returns a Context with a is_txn_mode of false - static Context NoTransactionContext(engine::Storage *storage) { return Context(storage, false); } + static Context NoTransactionContext(engine::Storage *storage) { return Context(storage, /*txn_mode=*/false); } - /// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true, then its snapshot is specified by the - /// Context + /// GetReadOptions returns a default ReadOptions, and if txn_context_enabled = true, + /// then its snapshot is specified by the Context. + /// Otherwise it is the same as Storage::DefaultReadOptions(). [[nodiscard]] rocksdb::ReadOptions GetReadOptions(); - /// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode = true, then its snapshot is specified by the - /// Context. Otherwise it is the same as Storage::DefaultScanOptions + /// DefaultScanOptions returns a DefaultScanOptions, and if txn_context_enabled = true, + /// then its snapshot is specified by the Context. + /// Otherwise it is the same as Storage::DefaultScanOptions(). [[nodiscard]] rocksdb::ReadOptions DefaultScanOptions(); - /// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if is_txn_mode = true, then its snapshot is specified - /// by the Context. Otherwise it is the same as Storage::DefaultMultiGetOptions + /// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if txn_context_enabled = true, + /// then its snapshot is specified by the Context. + /// Otherwise it is the same as Storage::DefaultMultiGetOptions [[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions(); void RefreshLatestSnapshot(); /// TODO: Change it to defer getting the context, and the snapshot is pinned after the first read operation explicit Context(engine::Storage *storage) - : storage(storage), is_txn_mode(storage->GetConfig()->txn_context_enabled) {} + : storage(storage), txn_context_enabled(storage->GetConfig()->txn_context_enabled) {} ~Context() { + // A moved-from object doesn't have `storage`. if (storage) { if (snapshot_ && storage->GetDB()) { storage->GetDB()->ReleaseSnapshot(snapshot_); @@ -441,6 +460,8 @@ struct Context { // and it's not a thread-safe operation. const rocksdb::Snapshot *GetSnapshot() { if (snapshot_ == nullptr) { + // Should not acquire a snapshot_ on a moved-from object. + DCHECK(storage != nullptr); snapshot_ = storage->GetDB()->GetSnapshot(); // NOLINT } return snapshot_; @@ -448,7 +469,7 @@ struct Context { private: /// It is only used by NonTransactionContext - explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), is_txn_mode(txn_mode) {} + explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), txn_context_enabled(txn_mode) {} /// If is_txn_mode is true, snapshot should be specified instead of nullptr when used, /// and should be consistent with snapshot in ReadOptions to avoid ambiguity.