From 886af5c76cd8639ed333ab510dd82659b67c8564 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 14 May 2024 20:47:50 +0800 Subject: [PATCH 1/2] chore: minor refactor on etcd kvbackend --- src/common/meta/src/kv_backend/etcd.rs | 17 ++++------ src/common/meta/src/kv_backend/txn/etcd.rs | 37 +++++++++++----------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index c437f90a25c7..1cdd45bc5c13 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -32,11 +32,6 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; -fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue { - let (key, value) = kv.into_key_value(); - KeyValue { key, value } -} - pub struct EtcdStore { client: Client, // Maximum number of operations permitted in a transaction. @@ -123,7 +118,7 @@ impl KvBackend for EtcdStore { let kvs = res .take_kvs() .into_iter() - .map(convert_key_value) + .map(KeyValue::from) .collect::>(); Ok(RangeResponse { @@ -146,7 +141,7 @@ impl KvBackend for EtcdStore { .await .context(error::EtcdFailedSnafu)?; - let prev_kv = res.take_prev_key().map(convert_key_value); + let prev_kv = res.take_prev_key().map(KeyValue::from); Ok(PutResponse { prev_kv }) } @@ -165,7 +160,7 @@ impl KvBackend for EtcdStore { for op_res in txn_res.op_responses() { match op_res { TxnOpResponse::Put(mut put_res) => { - if let Some(prev_kv) = put_res.take_prev_key().map(convert_key_value) { + if let Some(prev_kv) = put_res.take_prev_key().map(KeyValue::from) { prev_kvs.push(prev_kv); } } @@ -194,7 +189,7 @@ impl KvBackend for EtcdStore { TxnOpResponse::Get(get_res) => get_res, _ => unreachable!(), }; - kvs.extend(get_res.take_kvs().into_iter().map(convert_key_value)); + kvs.extend(get_res.take_kvs().into_iter().map(KeyValue::from)); } } @@ -214,7 +209,7 @@ impl KvBackend for EtcdStore { let prev_kvs = res .take_prev_kvs() .into_iter() - .map(convert_key_value) + .map(KeyValue::from) .collect::>(); Ok(DeleteRangeResponse { @@ -242,7 +237,7 @@ impl KvBackend for EtcdStore { delete_res .take_prev_kvs() .into_iter() - .map(convert_key_value) + .map(KeyValue::from) .for_each(|kv| { prev_kvs.push(kv); }); diff --git a/src/common/meta/src/kv_backend/txn/etcd.rs b/src/common/meta/src/kv_backend/txn/etcd.rs index ccee8b6062f4..cb516ed51a4c 100644 --- a/src/common/meta/src/kv_backend/txn/etcd.rs +++ b/src/common/meta/src/kv_backend/txn/etcd.rs @@ -20,6 +20,7 @@ use etcd_client::{ use super::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnResponse}; use crate::error::{self, Result}; use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse}; +use crate::rpc::KeyValue; impl From for EtcdTxn { fn from(txn: Txn) -> Self { @@ -65,7 +66,7 @@ impl From for EtcdCompare { }; match cmp.target { Some(target) => EtcdCompare::value(cmp.key, etcd_cmp, target), - // create revision 0 means key was not exist + // create revision 0 means key does not exist None => EtcdCompare::create_revision(cmp.key, etcd_cmp, 0), } } @@ -86,28 +87,28 @@ impl TryFrom for TxnOpResponse { fn try_from(op_resp: EtcdTxnOpResponse) -> Result { match op_resp { - EtcdTxnOpResponse::Put(res) => { - let prev_kv = res.prev_key().cloned().map(Into::into); - let put_res = PutResponse { prev_kv }; - Ok(TxnOpResponse::ResponsePut(put_res)) + EtcdTxnOpResponse::Put(mut res) => { + let prev_kv = res.take_prev_key().map(KeyValue::from); + Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv })) } - EtcdTxnOpResponse::Get(res) => { - let kvs = res.kvs().iter().cloned().map(Into::into).collect(); - let range_res = RangeResponse { kvs, more: false }; - Ok(TxnOpResponse::ResponseGet(range_res)) + EtcdTxnOpResponse::Get(mut res) => { + let kvs = res.take_kvs().into_iter().map(KeyValue::from).collect(); + Ok(TxnOpResponse::ResponseGet(RangeResponse { + kvs, + more: false, + })) } - EtcdTxnOpResponse::Delete(res) => { + EtcdTxnOpResponse::Delete(mut res) => { + let deleted = res.deleted(); let prev_kvs = res - .prev_kvs() - .iter() - .cloned() - .map(Into::into) + .take_prev_kvs() + .into_iter() + .map(KeyValue::from) .collect::>(); - let delete_res = DeleteRangeResponse { + Ok(TxnOpResponse::ResponseDelete(DeleteRangeResponse { + deleted, prev_kvs, - deleted: res.deleted(), - }; - Ok(TxnOpResponse::ResponseDelete(delete_res)) + })) } EtcdTxnOpResponse::Txn(_) => error::EtcdTxnOpResponseSnafu { err_msg: "nested txn is not supported", From 919a56fc4dc3389c88055faa6eb1e493aa3a2e2a Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 14 May 2024 21:07:27 +0800 Subject: [PATCH 2/2] chore: avoid clone --- src/common/meta/src/kv_backend/memory.rs | 9 +++------ src/log-store/src/raft_engine/backend.rs | 7 ++----- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index b9c1dd00bbdb..256e31f93ed3 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -268,18 +268,15 @@ impl TxnService for MemoryKvBackend { let do_txn = |txn_op| match txn_op { TxnOp::Put(key, value) => { - kvs.insert(key.clone(), value); + kvs.insert(key, value); TxnOpResponse::ResponsePut(PutResponse { prev_kv: None }) } TxnOp::Get(key) => { - let value = kvs.get(&key); + let value = kvs.get(&key).cloned(); let kvs = value + .map(|value| KeyValue { key, value }) .into_iter() - .map(|value| KeyValue { - key: key.clone(), - value: value.clone(), - }) .collect(); TxnOpResponse::ResponseGet(RangeResponse { kvs, more: false }) } diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index fdc92cd49f19..e2cd65c8fa74 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -103,7 +103,7 @@ impl TxnService for RaftEngineBackend { let do_txn = |txn_op| match txn_op { TxnOp::Put(key, value) => { batch - .put(SYSTEM_NAMESPACE, key.clone(), value) + .put(SYSTEM_NAMESPACE, key, value) .context(RaftEngineSnafu) .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; @@ -113,11 +113,8 @@ impl TxnService for RaftEngineBackend { TxnOp::Get(key) => { let value = engine_get(&engine, &key)?.map(|kv| kv.value); let kvs = value + .map(|value| KeyValue { key, value }) .into_iter() - .map(|value| KeyValue { - key: key.clone(), - value, - }) .collect(); Ok(TxnOpResponse::ResponseGet(RangeResponse { kvs,