diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 8d1be2c3c554..a36333c9c483 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -27,7 +27,7 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameValue}; use common_meta::key::table_region::{TableRegionKey, TableRegionValue}; use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue}; -use common_meta::key::{RegionDistribution, TableMetaKey, TableMetaValue}; +use common_meta::key::{MetaKey, RegionDistribution, TableMetaValue}; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::KvBackendRef; use common_meta::range_stream::PaginationStream; @@ -137,7 +137,7 @@ impl MigrateTableMetadata { while let Some((key, value)) = stream.try_next().await.context(error::IterStreamSnafu)? { let table_id = self.migrate_table_route_key(value).await?; keys.push(key); - keys.push(TableRegionKey::new(table_id).as_raw_key()) + keys.push(TableRegionKey::new(table_id).to_bytes()) } info!("Total migrated TableRouteKeys: {}", keys.len() / 2); @@ -165,7 +165,7 @@ impl MigrateTableMetadata { self.etcd_store .put( PutRequest::new() - .with_key(new_key.as_raw_key()) + .with_key(new_key.to_bytes()) .with_value(new_table_value.try_as_raw_value().unwrap()), ) .await @@ -217,7 +217,7 @@ impl MigrateTableMetadata { self.etcd_store .put( PutRequest::new() - .with_key(new_key.as_raw_key()) + .with_key(new_key.to_bytes()) .with_value(schema_name_value.try_as_raw_value().unwrap()), ) .await @@ -269,7 +269,7 @@ impl MigrateTableMetadata { self.etcd_store .put( PutRequest::new() - .with_key(new_key.as_raw_key()) + .with_key(new_key.to_bytes()) .with_value(catalog_name_value.try_as_raw_value().unwrap()), ) .await @@ -346,11 +346,11 @@ impl MigrateTableMetadata { .batch_put( BatchPutRequest::new() .add_kv( - table_info_key.as_raw_key(), + table_info_key.to_bytes(), table_info_value.try_as_raw_value().unwrap(), ) .add_kv( - table_region_key.as_raw_key(), + table_region_key.to_bytes(), table_region_value.try_as_raw_value().unwrap(), ), ) @@ -378,7 +378,7 @@ impl MigrateTableMetadata { self.etcd_store .put( PutRequest::new() - .with_key(table_name_key.as_raw_key()) + .with_key(table_name_key.to_bytes()) .with_value(table_name_value.try_as_raw_value().unwrap()), ) .await @@ -425,7 +425,7 @@ impl MigrateTableMetadata { } else { let mut req = BatchPutRequest::new(); for (key, value) in datanode_table_kvs { - req = req.add_kv(key.as_raw_key(), value.try_as_raw_value().unwrap()); + req = req.add_kv(key.to_bytes(), value.try_as_raw_value().unwrap()); } self.etcd_store.batch_put(req).await.unwrap(); } diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 2e7afd37efc7..fb62e7a61a25 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -22,7 +22,7 @@ use crate::key::schema_name::SchemaNameKey; use crate::key::table_info::TableInfoKey; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteKey; -use crate::key::TableMetaKey; +use crate::key::MetaKey; /// KvBackend cache invalidator #[async_trait::async_trait] @@ -99,18 +99,18 @@ where match cache { CacheIdent::TableId(table_id) => { let key = TableInfoKey::new(table_id); - self.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.to_bytes()).await; let key = &TableRouteKey { table_id }; - self.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.to_bytes()).await; } CacheIdent::TableName(table_name) => { let key: TableNameKey = (&table_name).into(); - self.invalidate_key(&key.as_raw_key()).await + self.invalidate_key(&key.to_bytes()).await } CacheIdent::SchemaName(schema_name) => { let key: SchemaNameKey = (&schema_name).into(); - self.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.to_bytes()).await; } } } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 7e09b3dc33d7..5155572f939e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -160,6 +160,16 @@ pub type FlowId = u32; /// The partition of flow. pub type FlowPartitionId = u32; +lazy_static! { + static ref TABLE_INFO_KEY_PATTERN: Regex = + Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); +} + +lazy_static! { + static ref TABLE_ROUTE_KEY_PATTERN: Regex = + Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap(); +} + lazy_static! { static ref DATANODE_TABLE_KEY_PATTERN: Regex = Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap(); @@ -188,15 +198,11 @@ lazy_static! { .unwrap(); } -pub trait TableMetaKey { - fn as_raw_key(&self) -> Vec; -} - /// The key of metadata. -pub trait MetaKey { +pub trait MetaKey<'a, T> { fn to_bytes(&self) -> Vec; - fn from_bytes(bytes: &[u8]) -> Result; + fn from_bytes(bytes: &'a [u8]) -> Result; } #[derive(Debug, Clone, PartialEq)] @@ -208,12 +214,12 @@ impl From> for BytesAdapter { } } -impl MetaKey for BytesAdapter { +impl<'a> MetaKey<'a, BytesAdapter> for BytesAdapter { fn to_bytes(&self) -> Vec { self.0.clone() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(BytesAdapter(bytes.to_vec())) } } @@ -227,24 +233,6 @@ pub(crate) trait TableMetaKeyGetTxnOp { ); } -impl TableMetaKey for String { - fn as_raw_key(&self) -> Vec { - self.as_bytes().to_vec() - } -} - -impl TableMetaKeyGetTxnOp for String { - fn build_get_op( - &self, - ) -> ( - TxnOp, - impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option>, - ) { - let key = self.as_raw_key(); - (TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key)) - } -} - pub trait TableMetaValue { fn try_from_raw_value(raw_value: &[u8]) -> Result where @@ -674,11 +662,11 @@ impl TableMetadataManager { .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id)) .collect::>(); - keys.push(table_name.as_raw_key()); - keys.push(table_info_key.as_raw_key()); - keys.push(table_route_key.as_raw_key()); + keys.push(table_name.to_bytes()); + keys.push(table_info_key.to_bytes()); + keys.push(table_route_key.to_bytes()); for key in &datanode_table_keys { - keys.push(key.as_raw_key()); + keys.push(key.to_bytes()); } Ok(keys) } @@ -991,21 +979,6 @@ impl TableMetadataManager { } } -#[macro_export] -macro_rules! impl_table_meta_key { - ($($val_ty: ty), *) => { - $( - impl std::fmt::Display for $val_ty { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", String::from_utf8_lossy(&self.as_raw_key())) - } - } - )* - } -} - -impl_table_meta_key!(TableNameKey<'_>, TableInfoKey, DatanodeTableKey); - #[macro_export] macro_rules! impl_table_meta_value { ($($val_ty: ty), *) => { @@ -1023,7 +996,7 @@ macro_rules! impl_table_meta_value { } } -macro_rules! impl_table_meta_key_get_txn_op { +macro_rules! impl_meta_key_get_txn_op { ($($key: ty), *) => { $( impl $crate::key::TableMetaKeyGetTxnOp for $key { @@ -1037,7 +1010,7 @@ macro_rules! impl_table_meta_key_get_txn_op { &'a mut TxnOpGetResponseSet, ) -> Option>, ) { - let raw_key = self.as_raw_key(); + let raw_key = self.to_bytes(); ( TxnOp::Get(raw_key.clone()), TxnOpGetResponseSet::filter(raw_key), @@ -1048,7 +1021,7 @@ macro_rules! impl_table_meta_key_get_txn_op { } } -impl_table_meta_key_get_txn_op! { +impl_meta_key_get_txn_op! { TableNameKey<'_>, TableInfoKey, TableRouteKey, diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 63873177b1b7..7b5ee2568a53 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -21,12 +21,15 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Error, InvalidTableMetadataSnafu, Result}; -use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX}; +use crate::key::{MetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; +/// The catalog name key, indices all catalog names +/// +/// The layout: `__catalog_name/{catalog_name}` #[derive(Debug, Clone, Copy, PartialEq)] pub struct CatalogNameKey<'a> { pub catalog: &'a str, @@ -53,15 +56,28 @@ impl<'a> CatalogNameKey<'a> { } } -impl Display for CatalogNameKey<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog) +impl<'a> MetaKey<'a, CatalogNameKey<'a>> for CatalogNameKey<'_> { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result> { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "CatalogNameKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + CatalogNameKey::try_from(key) } } -impl TableMetaKey for CatalogNameKey<'_> { - fn as_raw_key(&self) -> Vec { - self.to_string().into_bytes() +impl Display for CatalogNameKey<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog) } } @@ -103,7 +119,7 @@ impl CatalogManager { pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> { let _timer = crate::metrics::METRIC_META_CREATE_CATALOG.start_timer(); - let raw_key = catalog.as_raw_key(); + let raw_key = catalog.to_bytes(); let raw_value = CatalogNameValue.try_as_raw_value()?; if self .kv_backend @@ -117,7 +133,7 @@ impl CatalogManager { } pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result { - let raw_key = catalog.as_raw_key(); + let raw_key = catalog.to_bytes(); self.kv_backend.exists(&raw_key).await } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 96bebb74662e..5d927270bc94 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Display; use std::sync::Arc; use futures::stream::BoxStream; @@ -21,10 +22,10 @@ use snafu::OptionExt; use store_api::storage::RegionNumber; use table::metadata::TableId; +use super::MetaKey; use crate::error::{InvalidTableMetadataSnafu, Result}; use crate::key::{ - RegionDistribution, TableMetaKey, TableMetaValue, DATANODE_TABLE_KEY_PATTERN, - DATANODE_TABLE_KEY_PREFIX, + RegionDistribution, TableMetaValue, DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX, }; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; @@ -54,6 +55,9 @@ pub struct RegionInfo { pub region_wal_options: HashMap, } +/// The key mapping {datanode_id} to {table_id} +/// +/// The layout: `__dn_table/{datanode_id}/{table_id}`. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct DatanodeTableKey { pub datanode_id: DatanodeId, @@ -98,9 +102,40 @@ impl DatanodeTableKey { } } -impl TableMetaKey for DatanodeTableKey { - fn as_raw_key(&self) -> Vec { - format!("{}/{}", Self::prefix(self.datanode_id), self.table_id).into_bytes() +impl<'a> MetaKey<'a, DatanodeTableKey> for DatanodeTableKey { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "DatanodeTableKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + DATANODE_TABLE_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid DatanodeTableKey '{key}'"), + })?; + // Safety: pass the regex check above + let datanode_id = captures[2].parse::().unwrap(); + let table_id = captures[2].parse::().unwrap(); + Ok(DatanodeTableKey { + datanode_id, + table_id, + }) + } +} + +impl Display for DatanodeTableKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", Self::prefix(self.datanode_id), self.table_id) } } @@ -140,7 +175,7 @@ impl DatanodeTableManager { pub async fn get(&self, key: &DatanodeTableKey) -> Result> { self.kv_backend - .get(&key.as_raw_key()) + .get(&key.to_bytes()) .await? .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value)) .transpose() @@ -190,7 +225,7 @@ impl DatanodeTableManager { }, ); - Ok(TxnOp::Put(key.as_raw_key(), val.try_as_raw_value()?)) + Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?)) }) .collect::>>()?; @@ -215,7 +250,7 @@ impl DatanodeTableManager { for current_datanode in current_region_distribution.keys() { if !new_region_distribution.contains_key(current_datanode) { let key = DatanodeTableKey::new(*current_datanode, table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); opts.push(TxnOp::Delete(raw_key)) } } @@ -233,7 +268,7 @@ impl DatanodeTableManager { }; if need_update { let key = DatanodeTableKey::new(datanode, table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); // FIXME(weny): add unit tests. let mut new_region_info = region_info.clone(); if need_update_options { @@ -266,7 +301,7 @@ impl DatanodeTableManager { .into_keys() .map(|datanode_id| { let key = DatanodeTableKey::new(datanode_id, table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); Ok(TxnOp::Delete(raw_key)) }) @@ -288,7 +323,7 @@ mod tests { datanode_id: 1, table_id: 2, }; - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); assert_eq!(raw_key, b"__dn_table/1/2"); let value = DatanodeTableValue { diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 8d3660a42e51..44fae286815f 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -58,7 +58,7 @@ impl FlowScoped { } } -impl> MetaKey> for FlowScoped { +impl<'a, T: MetaKey<'a, T>> MetaKey<'a, FlowScoped> for FlowScoped { fn to_bytes(&self) -> Vec { let prefix = FlowScoped::::PREFIX.as_bytes(); let inner = self.inner.to_bytes(); @@ -68,7 +68,7 @@ impl> MetaKey> for FlowScoped { bytes } - fn from_bytes(bytes: &[u8]) -> Result> { + fn from_bytes(bytes: &'a [u8]) -> Result> { let prefix = FlowScoped::::PREFIX.as_bytes(); ensure!( bytes.starts_with(prefix), @@ -224,12 +224,12 @@ mod tests { inner: Vec, } - impl MetaKey for MockKey { + impl<'a> MetaKey<'a, MockKey> for MockKey { fn to_bytes(&self) -> Vec { self.inner.clone() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(MockKey { inner: bytes.to_vec(), }) diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 2ae432d484b4..d84a7b0ec470 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -43,12 +43,12 @@ lazy_static! { /// The layout: `__flow/{catalog}/info/{flow_id}`. pub struct FlowInfoKey(FlowScoped); -impl MetaKey for FlowInfoKey { +impl<'a> MetaKey<'a, FlowInfoKey> for FlowInfoKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(FlowInfoKey(FlowScoped::::from_bytes( bytes, )?)) @@ -81,12 +81,12 @@ impl FlowInfoKeyInner { } } -impl MetaKey for FlowInfoKeyInner { +impl<'a> MetaKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner { fn to_bytes(&self) -> Vec { format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 63af877d406e..e6b73012a794 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -59,12 +59,12 @@ impl FlowNameKey { } } -impl MetaKey for FlowNameKey { +impl<'a> MetaKey<'a, FlowNameKey> for FlowNameKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(FlowNameKey(FlowScoped::::from_bytes( bytes, )?)) @@ -78,7 +78,7 @@ pub struct FlowNameKeyInner { pub flow_name: String, } -impl MetaKey for FlowNameKeyInner { +impl<'a> MetaKey<'a, FlowNameKeyInner> for FlowNameKeyInner { fn to_bytes(&self) -> Vec { format!( "{FLOW_NAME_KEY_PREFIX}/{}/{}", @@ -87,7 +87,7 @@ impl MetaKey for FlowNameKeyInner { .into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs index d9ae8a215fb5..bcd25960d7fb 100644 --- a/src/common/meta/src/key/flow/flownode_flow.rs +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -44,12 +44,12 @@ const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode"; /// The layout `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}` pub struct FlownodeFlowKey(FlowScoped); -impl MetaKey for FlownodeFlowKey { +impl<'a> MetaKey<'a, FlownodeFlowKey> for FlownodeFlowKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(FlownodeFlowKey( FlowScoped::::from_bytes(bytes)?, )) @@ -118,7 +118,7 @@ impl FlownodeFlowKeyInner { } } -impl MetaKey for FlownodeFlowKeyInner { +impl<'a> MetaKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner { fn to_bytes(&self) -> Vec { format!( "{FLOWNODE_FLOW_KEY_PREFIX}/{}/{}/{}", @@ -127,7 +127,7 @@ impl MetaKey for FlownodeFlowKeyInner { .into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index f05866447823..18a27a3b20e9 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -54,12 +54,12 @@ struct TableFlowKeyInner { #[derive(Debug, PartialEq)] pub struct TableFlowKey(FlowScoped); -impl MetaKey for TableFlowKey { +impl<'a> MetaKey<'a, TableFlowKey> for TableFlowKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(TableFlowKey(FlowScoped::::from_bytes( bytes, )?)) @@ -132,7 +132,7 @@ impl TableFlowKeyInner { } } -impl MetaKey for TableFlowKeyInner { +impl<'a> MetaKey<'a, TableFlowKeyInner> for TableFlowKeyInner { fn to_bytes(&self) -> Vec { format!( "{TABLE_FLOW_KEY_PREFIX}/{}/{}/{}/{}", @@ -141,7 +141,7 @@ impl MetaKey for TableFlowKeyInner { .into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index f56adbaec440..db33b969aa2f 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Result}; -use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; +use crate::key::{MetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; @@ -32,6 +32,9 @@ use crate::rpc::KeyValue; const OPT_KEY_TTL: &str = "ttl"; +/// The schema name key, indices all schema names belong to the {catalog_name} +/// +/// The layout: `__schema_name/{catalog_name}/{schema_name}`. #[derive(Debug, Clone, Copy, PartialEq)] pub struct SchemaNameKey<'a> { pub catalog: &'a str, @@ -95,13 +98,26 @@ impl Display for SchemaNameKey<'_> { } } -impl TableMetaKey for SchemaNameKey<'_> { - fn as_raw_key(&self) -> Vec { +impl<'a> MetaKey<'a, SchemaNameKey<'a>> for SchemaNameKey<'_> { + fn to_bytes(&self) -> Vec { self.to_string().into_bytes() } + + fn from_bytes(bytes: &'a [u8]) -> Result> { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "SchemaNameKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + SchemaNameKey::try_from(key) + } } -/// Decodes `KeyValue` to ({schema},()) +/// Decodes `KeyValue` to {schema} pub fn schema_decoder(kv: KeyValue) -> Result { let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?; let schema_name = SchemaNameKey::try_from(str)?; @@ -145,7 +161,7 @@ impl SchemaManager { ) -> Result<()> { let _timer = crate::metrics::METRIC_META_CREATE_SCHEMA.start_timer(); - let raw_key = schema.as_raw_key(); + let raw_key = schema.to_bytes(); let raw_value = value.unwrap_or_default().try_as_raw_value()?; if self .kv_backend @@ -159,13 +175,13 @@ impl SchemaManager { } pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result { - let raw_key = schema.as_raw_key(); + let raw_key = schema.to_bytes(); self.kv_backend.exists(&raw_key).await } pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result> { - let raw_key = schema.as_raw_key(); + let raw_key = schema.to_bytes(); let value = self.kv_backend.get(&raw_key).await?; value .and_then(|v| SchemaNameValue::try_from_raw_value(v.value.as_ref()).transpose()) @@ -174,7 +190,7 @@ impl SchemaManager { /// Deletes a [SchemaNameKey]. pub async fn delete(&self, schema: SchemaNameKey<'_>) -> Result<()> { - let raw_key = schema.as_raw_key(); + let raw_key = schema.to_bytes(); self.kv_backend.delete(&raw_key, false).await?; Ok(()) diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 798dbf1beb68..28a7855699af 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -13,34 +13,67 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Display; use serde::{Deserialize, Serialize}; +use snafu::OptionExt; use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; -use super::txn_helper::TxnOpGetResponseSet; -use crate::error::Result; +use super::TABLE_INFO_KEY_PATTERN; +use crate::error::{InvalidTableMetadataSnafu, Result}; +use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ - txn_helper, DeserializedValueWithBytes, TableMetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX, + txn_helper, DeserializedValueWithBytes, MetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX, }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::rpc::store::BatchGetRequest; use crate::table_name::TableName; +/// The key stores the metadata of the table. +/// +/// The layout: `__table_info/{table_id}`. pub struct TableInfoKey { table_id: TableId, } impl TableInfoKey { + /// Returns a new [TableInfoKey]. pub fn new(table_id: TableId) -> Self { Self { table_id } } } -impl TableMetaKey for TableInfoKey { - fn as_raw_key(&self) -> Vec { - format!("{}/{}", TABLE_INFO_KEY_PREFIX, self.table_id).into_bytes() +impl Display for TableInfoKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", TABLE_INFO_KEY_PREFIX, self.table_id) + } +} + +impl<'a> MetaKey<'a, TableInfoKey> for TableInfoKey { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "TableInfoKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = TABLE_INFO_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableInfoKey '{key}'"), + })?; + // Safety: pass the regex check above + let table_id = captures[1].parse::().unwrap(); + Ok(TableInfoKey { table_id }) } } @@ -115,7 +148,7 @@ impl TableInfoManager { ) -> Result>>, )> { let key = TableInfoKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let txn = txn_helper::build_put_if_absent_txn( raw_key.clone(), @@ -142,7 +175,7 @@ impl TableInfoManager { ) -> Result>>, )> { let key = TableInfoKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let raw_value = current_table_info_value.get_raw_bytes(); let new_raw_value: Vec = new_table_info_value.try_as_raw_value()?; @@ -159,7 +192,7 @@ impl TableInfoManager { table_id: TableId, ) -> Result>> { let key = TableInfoKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); self.kv_backend .get(&raw_key) .await? @@ -173,7 +206,7 @@ impl TableInfoManager { ) -> Result> { let lookup_table = table_ids .iter() - .map(|id| (TableInfoKey::new(*id).as_raw_key(), id)) + .map(|id| (TableInfoKey::new(*id).to_bytes(), id)) .collect::>(); let resp = self @@ -205,7 +238,7 @@ impl TableInfoManager { ) -> Result>> { let lookup_table = table_ids .iter() - .map(|id| (TableInfoKey::new(*id).as_raw_key(), id)) + .map(|id| (TableInfoKey::new(*id).to_bytes(), id)) .collect::>(); let resp = self @@ -250,7 +283,7 @@ mod tests { #[test] fn test_key_serde() { let key = TableInfoKey::new(42); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); assert_eq!(raw_key, b"__table_info/42"); } diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 83e1cb7fb254..52947227b802 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Display; use std::sync::Arc; use futures_util::stream::BoxStream; @@ -20,9 +21,8 @@ use serde::{Deserialize, Serialize}; use snafu::OptionExt; use table::metadata::TableId; -use super::{TableMetaValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX}; +use super::{MetaKey, TableMetaValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX}; use crate::error::{Error, InvalidTableMetadataSnafu, Result}; -use crate::key::TableMetaKey; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; @@ -72,14 +72,45 @@ impl<'a> TableNameKey<'a> { } } -impl TableMetaKey for TableNameKey<'_> { - fn as_raw_key(&self) -> Vec { - format!( +impl Display for TableNameKey<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, "{}/{}", Self::prefix_to_table(self.catalog, self.schema), self.table ) - .into_bytes() + } +} + +impl<'a> MetaKey<'a, TableNameKey<'a>> for TableNameKey<'_> { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result> { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "TableNameKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = TABLE_NAME_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableNameKey '{key}'"), + })?; + let catalog = captures.get(1).unwrap().as_str(); + let schema = captures.get(2).unwrap().as_str(); + let table = captures.get(3).unwrap().as_str(); + Ok(TableNameKey { + catalog, + schema, + table, + }) } } @@ -166,7 +197,7 @@ impl TableNameManager { key: &TableNameKey<'_>, table_id: TableId, ) -> Result { - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let value = TableNameValue::new(table_id); let raw_value = value.try_as_raw_value()?; @@ -182,8 +213,8 @@ impl TableNameManager { new_key: &TableNameKey<'_>, table_id: TableId, ) -> Result { - let raw_key = key.as_raw_key(); - let new_raw_key = new_key.as_raw_key(); + let raw_key = key.to_bytes(); + let new_raw_key = new_key.to_bytes(); let value = TableNameValue::new(table_id); let raw_value = value.try_as_raw_value()?; @@ -195,7 +226,7 @@ impl TableNameManager { } pub async fn get(&self, key: TableNameKey<'_>) -> Result> { - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); self.kv_backend .get(&raw_key) .await? @@ -209,7 +240,7 @@ impl TableNameManager { ) -> Result>> { let raw_keys = keys .into_iter() - .map(|key| key.as_raw_key()) + .map(|key| key.to_bytes()) .collect::>(); let req = BatchGetRequest::new().with_keys(raw_keys.clone()); let res = self.kv_backend.batch_get(req).await?; @@ -229,7 +260,7 @@ impl TableNameManager { } pub async fn exists(&self, key: TableNameKey<'_>) -> Result { - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); self.kv_backend.exists(&raw_key).await } @@ -293,7 +324,7 @@ mod tests { #[test] fn test_serde() { let key = TableNameKey::new("my_catalog", "my_schema", "my_table"); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); assert_eq!( b"__table_name/my_catalog/my_schema/my_table", raw_key.as_slice() diff --git a/src/common/meta/src/key/table_region.rs b/src/common/meta/src/key/table_region.rs index e51e1a547194..044161a6ccb5 100644 --- a/src/common/meta/src/key/table_region.rs +++ b/src/common/meta/src/key/table_region.rs @@ -13,16 +13,18 @@ // limitations under the License. use std::collections::BTreeMap; +use std::fmt::Display; +use lazy_static::lazy_static; +use regex::Regex; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionNumber; use table::metadata::TableId; -use super::TABLE_REGION_KEY_PREFIX; -use crate::error::{Result, SerdeJsonSnafu}; -use crate::key::TableMetaKey; -use crate::{impl_table_meta_key, impl_table_meta_value, DatanodeId}; +use super::{MetaKey, TABLE_REGION_KEY_PREFIX}; +use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; +use crate::{impl_table_meta_value, DatanodeId}; pub type RegionDistribution = BTreeMap>; @@ -34,19 +36,49 @@ pub struct TableRegionKey { table_id: TableId, } +lazy_static! { + static ref TABLE_REGION_KEY_PATTERN: Regex = + Regex::new(&format!("^{TABLE_REGION_KEY_PREFIX}/([0-9]+)$")).unwrap(); +} + impl TableRegionKey { pub fn new(table_id: TableId) -> Self { Self { table_id } } } -impl TableMetaKey for TableRegionKey { - fn as_raw_key(&self) -> Vec { - format!("{}/{}", TABLE_REGION_KEY_PREFIX, self.table_id).into_bytes() +impl Display for TableRegionKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", TABLE_REGION_KEY_PREFIX, self.table_id) } } -impl_table_meta_key! {TableRegionKey} +impl<'a> MetaKey<'a, TableRegionKey> for TableRegionKey { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "TableRegionKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + TABLE_REGION_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableRegionKey '{key}'"), + })?; + // Safety: pass the regex check above + let table_id = captures[1].parse::().unwrap(); + Ok(TableRegionKey { table_id }) + } +} #[deprecated( since = "0.4.0", @@ -77,7 +109,7 @@ mod tests { #[test] fn test_serde() { let key = TableRegionKey::new(1); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); assert_eq!(raw_key, b"__table_region/1"); let value = TableRegionValue { diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index a6745f8db29b..effdd4bfe632 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -21,19 +21,22 @@ use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::error::{ - self, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, - UnexpectedLogicalRouteTableSnafu, + self, InvalidTableMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, + TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, }; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ - txn_helper, DeserializedValueWithBytes, RegionDistribution, TableMetaKey, TableMetaValue, - TABLE_ROUTE_PREFIX, + txn_helper, DeserializedValueWithBytes, MetaKey, RegionDistribution, TableMetaValue, + TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX, }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, RegionRoute}; use crate::rpc::store::BatchGetRequest; +/// The key stores table routes +/// +/// The layout: `__table_route/{table_id}`. pub struct TableRouteKey { pub table_id: TableId, } @@ -239,10 +242,31 @@ impl LogicalTableRouteValue { } } -impl TableMetaKey for TableRouteKey { - fn as_raw_key(&self) -> Vec { +impl<'a> MetaKey<'a, TableRouteKey> for TableRouteKey { + fn to_bytes(&self) -> Vec { self.to_string().into_bytes() } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "TableRouteKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + TABLE_ROUTE_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableRouteKey '{key}'"), + })?; + // Safety: pass the regex check above + let table_id = captures[1].parse::().unwrap(); + Ok(TableRouteKey { table_id }) + } } impl Display for TableRouteKey { @@ -462,7 +486,7 @@ impl TableRouteStorage { ) -> Result>>, )> { let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let txn = txn_helper::build_put_if_absent_txn( raw_key.clone(), @@ -490,7 +514,7 @@ impl TableRouteStorage { ) -> Result>>, )> { let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let raw_value = current_table_route_value.get_raw_bytes(); let new_raw_value: Vec = new_table_route_value.try_as_raw_value()?; @@ -506,7 +530,7 @@ impl TableRouteStorage { pub async fn get(&self, table_id: TableId) -> Result> { let key = TableRouteKey::new(table_id); self.kv_backend - .get(&key.as_raw_key()) + .get(&key.to_bytes()) .await? .map(|kv| TableRouteValue::try_from_raw_value(&kv.value)) .transpose() @@ -519,7 +543,7 @@ impl TableRouteStorage { ) -> Result>> { let key = TableRouteKey::new(table_id); self.kv_backend - .get(&key.as_raw_key()) + .get(&key.to_bytes()) .await? .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value)) .transpose() @@ -560,7 +584,7 @@ impl TableRouteStorage { pub async fn batch_get(&self, table_ids: &[TableId]) -> Result>> { let keys = table_ids .iter() - .map(|id| TableRouteKey::new(*id).as_raw_key()) + .map(|id| TableRouteKey::new(*id).to_bytes()) .collect::>(); let resp = self .kv_backend diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index c5b5b4ecde81..9bbe8d903ae2 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -24,7 +24,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::key::table_info::TableInfoKey; -use common_meta::key::TableMetaKey; +use common_meta::key::MetaKey; use partition::manager::TableRouteCacheInvalidator; use table::metadata::TableId; use tokio::sync::mpsc; @@ -79,7 +79,7 @@ async fn handle_instruction( async fn test_invalidate_table_cache_handler() { let table_id = 1; let table_info_key = TableInfoKey::new(table_id); - let inner = HashMap::from([(table_info_key.as_raw_key(), 1)]); + let inner = HashMap::from([(table_info_key.to_bytes(), 1)]); let backend = Arc::new(MockKvCacheInvalidator { inner: Mutex::new(inner), }); @@ -103,7 +103,7 @@ async fn test_invalidate_table_cache_handler() { .inner .lock() .unwrap() - .contains_key(&table_info_key.as_raw_key())); + .contains_key(&table_info_key.to_bytes())); // removes a invalid key handle_instruction( @@ -118,7 +118,7 @@ async fn test_invalidate_table_cache_handler() { async fn test_invalidate_schema_key_handler() { let (catalog, schema) = ("foo", "bar"); let schema_key = SchemaNameKey { catalog, schema }; - let inner = HashMap::from([(schema_key.as_raw_key(), 1)]); + let inner = HashMap::from([(schema_key.to_bytes(), 1)]); let backend = Arc::new(MockKvCacheInvalidator { inner: Mutex::new(inner), }); @@ -146,7 +146,7 @@ async fn test_invalidate_schema_key_handler() { .inner .lock() .unwrap() - .contains_key(&schema_key.as_raw_key())); + .contains_key(&schema_key.to_bytes())); // removes a invalid key handle_instruction( diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 8ca53a6f463a..f2430a0d48f0 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -19,7 +19,7 @@ use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; use client::OutputData; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::key::table_route::TableRouteKey; -use common_meta::key::{RegionDistribution, TableMetaKey}; +use common_meta::key::{MetaKey, RegionDistribution}; use common_meta::peer::Peer; use common_meta::{distributed_time_constants, RegionIdent}; use common_procedure::{watcher, ProcedureWithId}; @@ -176,7 +176,7 @@ async fn has_route_cache(instance: &Arc, table_id: TableId) -> bool { .cache(); cache - .get(TableRouteKey::new(table_id).as_raw_key().as_slice()) + .get(TableRouteKey::new(table_id).to_bytes().as_slice()) .await .is_some() }