diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 8d1be2c3c554..a36333c9c483 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -27,7 +27,7 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameValue}; use common_meta::key::table_region::{TableRegionKey, TableRegionValue}; use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue}; -use common_meta::key::{RegionDistribution, TableMetaKey, TableMetaValue}; +use common_meta::key::{MetaKey, RegionDistribution, TableMetaValue}; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::KvBackendRef; use common_meta::range_stream::PaginationStream; @@ -137,7 +137,7 @@ impl MigrateTableMetadata { while let Some((key, value)) = stream.try_next().await.context(error::IterStreamSnafu)? { let table_id = self.migrate_table_route_key(value).await?; keys.push(key); - keys.push(TableRegionKey::new(table_id).as_raw_key()) + keys.push(TableRegionKey::new(table_id).to_bytes()) } info!("Total migrated TableRouteKeys: {}", keys.len() / 2); @@ -165,7 +165,7 @@ impl MigrateTableMetadata { self.etcd_store .put( PutRequest::new() - .with_key(new_key.as_raw_key()) + .with_key(new_key.to_bytes()) .with_value(new_table_value.try_as_raw_value().unwrap()), ) .await @@ -217,7 +217,7 @@ impl MigrateTableMetadata { self.etcd_store .put( PutRequest::new() - .with_key(new_key.as_raw_key()) + .with_key(new_key.to_bytes()) .with_value(schema_name_value.try_as_raw_value().unwrap()), ) .await @@ -269,7 +269,7 @@ impl MigrateTableMetadata { self.etcd_store .put( PutRequest::new() - .with_key(new_key.as_raw_key()) + .with_key(new_key.to_bytes()) .with_value(catalog_name_value.try_as_raw_value().unwrap()), ) .await @@ -346,11 +346,11 @@ impl MigrateTableMetadata { .batch_put( BatchPutRequest::new() .add_kv( - table_info_key.as_raw_key(), + table_info_key.to_bytes(), table_info_value.try_as_raw_value().unwrap(), ) .add_kv( - table_region_key.as_raw_key(), + table_region_key.to_bytes(), table_region_value.try_as_raw_value().unwrap(), ), ) @@ -378,7 +378,7 @@ impl MigrateTableMetadata { self.etcd_store .put( PutRequest::new() - .with_key(table_name_key.as_raw_key()) + .with_key(table_name_key.to_bytes()) .with_value(table_name_value.try_as_raw_value().unwrap()), ) .await @@ -425,7 +425,7 @@ impl MigrateTableMetadata { } else { let mut req = BatchPutRequest::new(); for (key, value) in datanode_table_kvs { - req = req.add_kv(key.as_raw_key(), value.try_as_raw_value().unwrap()); + req = req.add_kv(key.to_bytes(), value.try_as_raw_value().unwrap()); } self.etcd_store.batch_put(req).await.unwrap(); } diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 2e7afd37efc7..fb62e7a61a25 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -22,7 +22,7 @@ use crate::key::schema_name::SchemaNameKey; use crate::key::table_info::TableInfoKey; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteKey; -use crate::key::TableMetaKey; +use crate::key::MetaKey; /// KvBackend cache invalidator #[async_trait::async_trait] @@ -99,18 +99,18 @@ where match cache { CacheIdent::TableId(table_id) => { let key = TableInfoKey::new(table_id); - self.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.to_bytes()).await; let key = &TableRouteKey { table_id }; - self.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.to_bytes()).await; } CacheIdent::TableName(table_name) => { let key: TableNameKey = (&table_name).into(); - self.invalidate_key(&key.as_raw_key()).await + self.invalidate_key(&key.to_bytes()).await } CacheIdent::SchemaName(schema_name) => { let key: SchemaNameKey = (&schema_name).into(); - self.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.to_bytes()).await; } } } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index c6b11ee57ef7..7fb1fedeff64 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -161,6 +161,16 @@ pub type FlowId = u32; /// The partition of flow. pub type FlowPartitionId = u32; +lazy_static! { + static ref TABLE_INFO_KEY_PATTERN: Regex = + Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); +} + +lazy_static! { + static ref TABLE_ROUTE_KEY_PATTERN: Regex = + Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap(); +} + lazy_static! { static ref DATANODE_TABLE_KEY_PATTERN: Regex = Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap(); @@ -189,15 +199,11 @@ lazy_static! { .unwrap(); } -pub trait TableMetaKey { - fn as_raw_key(&self) -> Vec; -} - /// The key of metadata. -pub trait MetaKey { +pub trait MetaKey<'a, T> { fn to_bytes(&self) -> Vec; - fn from_bytes(bytes: &[u8]) -> Result; + fn from_bytes(bytes: &'a [u8]) -> Result; } #[derive(Debug, Clone, PartialEq)] @@ -209,12 +215,12 @@ impl From> for BytesAdapter { } } -impl MetaKey for BytesAdapter { +impl<'a> MetaKey<'a, BytesAdapter> for BytesAdapter { fn to_bytes(&self) -> Vec { self.0.clone() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(BytesAdapter(bytes.to_vec())) } } @@ -228,24 +234,6 @@ pub(crate) trait TableMetaKeyGetTxnOp { ); } -impl TableMetaKey for String { - fn as_raw_key(&self) -> Vec { - self.as_bytes().to_vec() - } -} - -impl TableMetaKeyGetTxnOp for String { - fn build_get_op( - &self, - ) -> ( - TxnOp, - impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option>, - ) { - let key = self.as_raw_key(); - (TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key)) - } -} - pub trait TableMetaValue { fn try_from_raw_value(raw_value: &[u8]) -> Result where @@ -675,11 +663,11 @@ impl TableMetadataManager { .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id)) .collect::>(); - keys.push(table_name.as_raw_key()); - keys.push(table_info_key.as_raw_key()); - keys.push(table_route_key.as_raw_key()); + keys.push(table_name.to_bytes()); + keys.push(table_info_key.to_bytes()); + keys.push(table_route_key.to_bytes()); for key in &datanode_table_keys { - keys.push(key.as_raw_key()); + keys.push(key.to_bytes()); } Ok(keys) } @@ -992,21 +980,6 @@ impl TableMetadataManager { } } -#[macro_export] -macro_rules! impl_table_meta_key { - ($($val_ty: ty), *) => { - $( - impl std::fmt::Display for $val_ty { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", String::from_utf8_lossy(&self.as_raw_key())) - } - } - )* - } -} - -impl_table_meta_key!(TableNameKey<'_>, TableInfoKey, DatanodeTableKey); - #[macro_export] macro_rules! impl_table_meta_value { ($($val_ty: ty), *) => { @@ -1024,7 +997,7 @@ macro_rules! impl_table_meta_value { } } -macro_rules! impl_table_meta_key_get_txn_op { +macro_rules! impl_meta_key_get_txn_op { ($($key: ty), *) => { $( impl $crate::key::TableMetaKeyGetTxnOp for $key { @@ -1038,7 +1011,7 @@ macro_rules! impl_table_meta_key_get_txn_op { &'a mut TxnOpGetResponseSet, ) -> Option>, ) { - let raw_key = self.as_raw_key(); + let raw_key = self.to_bytes(); ( TxnOp::Get(raw_key.clone()), TxnOpGetResponseSet::filter(raw_key), @@ -1049,7 +1022,7 @@ macro_rules! impl_table_meta_key_get_txn_op { } } -impl_table_meta_key_get_txn_op! { +impl_meta_key_get_txn_op! { TableNameKey<'_>, TableInfoKey, TableRouteKey, diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 648d84f259cc..fd67acf34800 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -21,12 +21,15 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Error, InvalidTableMetadataSnafu, Result}; -use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX}; +use crate::key::{MetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; +/// The catalog name key, indices all catalog names +/// +/// The layout: `__catalog_name/{catalog_name}` #[derive(Debug, Clone, Copy, PartialEq)] pub struct CatalogNameKey<'a> { pub catalog: &'a str, @@ -53,15 +56,28 @@ impl<'a> CatalogNameKey<'a> { } } -impl Display for CatalogNameKey<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog) +impl<'a> MetaKey<'a, CatalogNameKey<'a>> for CatalogNameKey<'_> { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result> { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "CatalogNameKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + CatalogNameKey::try_from(key) } } -impl TableMetaKey for CatalogNameKey<'_> { - fn as_raw_key(&self) -> Vec { - self.to_string().into_bytes() +impl Display for CatalogNameKey<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog) } } @@ -103,7 +119,7 @@ impl CatalogManager { pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> { let _timer = crate::metrics::METRIC_META_CREATE_CATALOG.start_timer(); - let raw_key = catalog.as_raw_key(); + let raw_key = catalog.to_bytes(); let raw_value = CatalogNameValue.try_as_raw_value()?; if self .kv_backend @@ -117,7 +133,7 @@ impl CatalogManager { } pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result { - let raw_key = catalog.as_raw_key(); + let raw_key = catalog.to_bytes(); self.kv_backend.exists(&raw_key).await } @@ -148,7 +164,7 @@ mod tests { assert_eq!(key.to_string(), "__catalog_name/my-catalog"); - let parsed: CatalogNameKey = "__catalog_name/my-catalog".try_into().unwrap(); + let parsed = CatalogNameKey::from_bytes(b"__catalog_name/my-catalog").unwrap(); assert_eq!(key, parsed); } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 96bebb74662e..c20243bfd7d4 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Display; use std::sync::Arc; use futures::stream::BoxStream; @@ -21,10 +22,10 @@ use snafu::OptionExt; use store_api::storage::RegionNumber; use table::metadata::TableId; +use super::MetaKey; use crate::error::{InvalidTableMetadataSnafu, Result}; use crate::key::{ - RegionDistribution, TableMetaKey, TableMetaValue, DATANODE_TABLE_KEY_PATTERN, - DATANODE_TABLE_KEY_PREFIX, + RegionDistribution, TableMetaValue, DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX, }; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; @@ -54,6 +55,9 @@ pub struct RegionInfo { pub region_wal_options: HashMap, } +/// The key mapping {datanode_id} to {table_id} +/// +/// The layout: `__dn_table/{datanode_id}/{table_id}`. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct DatanodeTableKey { pub datanode_id: DatanodeId, @@ -75,32 +79,42 @@ impl DatanodeTableKey { pub fn range_start_key(datanode_id: DatanodeId) -> String { format!("{}/", Self::prefix(datanode_id)) } +} + +impl<'a> MetaKey<'a, DatanodeTableKey> for DatanodeTableKey { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } - pub fn strip_table_id(raw_key: &[u8]) -> Result { - let key = String::from_utf8(raw_key.to_vec()).map_err(|e| { + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { InvalidTableMetadataSnafu { err_msg: format!( "DatanodeTableKey '{}' is not a valid UTF8 string: {e}", - String::from_utf8_lossy(raw_key) + String::from_utf8_lossy(bytes) ), } .build() })?; let captures = DATANODE_TABLE_KEY_PATTERN - .captures(&key) + .captures(key) .context(InvalidTableMetadataSnafu { err_msg: format!("Invalid DatanodeTableKey '{key}'"), })?; // Safety: pass the regex check above + let datanode_id = captures[1].parse::().unwrap(); let table_id = captures[2].parse::().unwrap(); - Ok(table_id) + Ok(DatanodeTableKey { + datanode_id, + table_id, + }) } } -impl TableMetaKey for DatanodeTableKey { - fn as_raw_key(&self) -> Vec { - format!("{}/{}", Self::prefix(self.datanode_id), self.table_id).into_bytes() +impl Display for DatanodeTableKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", Self::prefix(self.datanode_id), self.table_id) } } @@ -140,7 +154,7 @@ impl DatanodeTableManager { pub async fn get(&self, key: &DatanodeTableKey) -> Result> { self.kv_backend - .get(&key.as_raw_key()) + .get(&key.to_bytes()) .await? .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value)) .transpose() @@ -190,7 +204,7 @@ impl DatanodeTableManager { }, ); - Ok(TxnOp::Put(key.as_raw_key(), val.try_as_raw_value()?)) + Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?)) }) .collect::>>()?; @@ -215,7 +229,7 @@ impl DatanodeTableManager { for current_datanode in current_region_distribution.keys() { if !new_region_distribution.contains_key(current_datanode) { let key = DatanodeTableKey::new(*current_datanode, table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); opts.push(TxnOp::Delete(raw_key)) } } @@ -233,7 +247,7 @@ impl DatanodeTableManager { }; if need_update { let key = DatanodeTableKey::new(datanode, table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); // FIXME(weny): add unit tests. let mut new_region_info = region_info.clone(); if need_update_options { @@ -266,7 +280,7 @@ impl DatanodeTableManager { .into_keys() .map(|datanode_id| { let key = DatanodeTableKey::new(datanode_id, table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); Ok(TxnOp::Delete(raw_key)) }) @@ -283,12 +297,12 @@ mod tests { use super::*; #[test] - fn test_serde() { + fn test_serialization() { let key = DatanodeTableKey { datanode_id: 1, table_id: 2, }; - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); assert_eq!(raw_key, b"__dn_table/1/2"); let value = DatanodeTableValue { @@ -402,9 +416,9 @@ mod tests { } #[test] - fn test_strip_table_id() { + fn test_deserialization() { fn test_err(raw_key: &[u8]) { - let result = DatanodeTableKey::strip_table_id(raw_key); + let result = DatanodeTableKey::from_bytes(raw_key); assert!(result.is_err()); } @@ -417,7 +431,7 @@ mod tests { test_err(b"__dn_table/invalid_node_id/2"); test_err(b"__dn_table/1/invalid_table_id"); - let table_id = DatanodeTableKey::strip_table_id(b"__dn_table/1/2").unwrap(); - assert_eq!(table_id, 2); + let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap(); + assert_eq!(DatanodeTableKey::new(11, 21), key); } } diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 8d3660a42e51..44fae286815f 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -58,7 +58,7 @@ impl FlowScoped { } } -impl> MetaKey> for FlowScoped { +impl<'a, T: MetaKey<'a, T>> MetaKey<'a, FlowScoped> for FlowScoped { fn to_bytes(&self) -> Vec { let prefix = FlowScoped::::PREFIX.as_bytes(); let inner = self.inner.to_bytes(); @@ -68,7 +68,7 @@ impl> MetaKey> for FlowScoped { bytes } - fn from_bytes(bytes: &[u8]) -> Result> { + fn from_bytes(bytes: &'a [u8]) -> Result> { let prefix = FlowScoped::::PREFIX.as_bytes(); ensure!( bytes.starts_with(prefix), @@ -224,12 +224,12 @@ mod tests { inner: Vec, } - impl MetaKey for MockKey { + impl<'a> MetaKey<'a, MockKey> for MockKey { fn to_bytes(&self) -> Vec { self.inner.clone() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(MockKey { inner: bytes.to_vec(), }) diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index b1f07845edb7..20d0e4598780 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -43,12 +43,12 @@ lazy_static! { /// The layout: `__flow/info/{flow_id}`. pub struct FlowInfoKey(FlowScoped); -impl MetaKey for FlowInfoKey { +impl<'a> MetaKey<'a, FlowInfoKey> for FlowInfoKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(FlowInfoKey(FlowScoped::::from_bytes( bytes, )?)) @@ -81,12 +81,12 @@ impl FlowInfoKeyInner { } } -impl MetaKey for FlowInfoKeyInner { +impl<'a> MetaKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner { fn to_bytes(&self) -> Vec { format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 031b19ce64af..24608498c4f1 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -39,32 +39,32 @@ lazy_static! { /// The key of mapping {flow_name} to [FlowId]. /// /// The layout: `__flow/name/{catalog_name}/{flow_name}`. -pub struct FlowNameKey(FlowScoped); +pub struct FlowNameKey<'a>(FlowScoped>); -impl FlowNameKey { +impl<'a> FlowNameKey<'a> { /// Returns the [FlowNameKey] - pub fn new(catalog: String, flow_name: String) -> FlowNameKey { + pub fn new(catalog: &'a str, flow_name: &'a str) -> FlowNameKey<'a> { let inner = FlowNameKeyInner::new(catalog, flow_name); FlowNameKey(FlowScoped::new(inner)) } /// Returns the catalog. pub fn catalog(&self) -> &str { - &self.0.catalog_name + self.0.catalog_name } /// Return the `flow_name` pub fn flow_name(&self) -> &str { - &self.0.flow_name + self.0.flow_name } } -impl MetaKey for FlowNameKey { +impl<'a> MetaKey<'a, FlowNameKey<'a>> for FlowNameKey<'a> { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result> { Ok(FlowNameKey(FlowScoped::::from_bytes( bytes, )?)) @@ -73,12 +73,12 @@ impl MetaKey for FlowNameKey { /// The key of mapping name to [FlowId] #[derive(Debug, Clone, PartialEq, Eq)] -pub struct FlowNameKeyInner { - pub catalog_name: String, - pub flow_name: String, +pub struct FlowNameKeyInner<'a> { + pub catalog_name: &'a str, + pub flow_name: &'a str, } -impl MetaKey for FlowNameKeyInner { +impl<'a> MetaKey<'a, FlowNameKeyInner<'a>> for FlowNameKeyInner<'_> { fn to_bytes(&self) -> Vec { format!( "{FLOW_NAME_KEY_PREFIX}/{}/{}", @@ -87,7 +87,7 @@ impl MetaKey for FlowNameKeyInner { .into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( @@ -104,8 +104,8 @@ impl MetaKey for FlowNameKeyInner { err_msg: format!("Invalid FlowNameKeyInner '{key}'"), })?; // Safety: pass the regex check above - let catalog_name = captures[1].to_string(); - let flow_name = captures[2].to_string(); + let catalog_name = captures.get(1).unwrap().as_str(); + let flow_name = captures.get(2).unwrap().as_str(); Ok(FlowNameKeyInner { catalog_name, flow_name, @@ -113,9 +113,9 @@ impl MetaKey for FlowNameKeyInner { } } -impl FlowNameKeyInner { +impl<'a> FlowNameKeyInner<'a> { /// Returns a [FlowNameKeyInner]. - pub fn new(catalog_name: String, flow_name: String) -> Self { + pub fn new(catalog_name: &'a str, flow_name: &'a str) -> Self { Self { catalog_name, flow_name, @@ -154,7 +154,7 @@ impl FlowNameManager { /// Returns the [FlowNameValue] of specified `catalog.flow`. pub async fn get(&self, catalog: &str, flow: &str) -> Result> { - let key = FlowNameKey::new(catalog.to_string(), flow.to_string()); + let key = FlowNameKey::new(catalog, flow); let raw_key = key.to_bytes(); self.kv_backend .get(&raw_key) @@ -165,7 +165,7 @@ impl FlowNameManager { /// Returns true if the `flow` exists. pub async fn exists(&self, catalog: &str, flow: &str) -> Result { - let key = FlowNameKey::new(catalog.to_string(), flow.to_string()); + let key = FlowNameKey::new(catalog, flow); let raw_key = key.to_bytes(); self.kv_backend.exists(&raw_key).await } @@ -184,7 +184,7 @@ impl FlowNameManager { &mut TxnOpGetResponseSet, ) -> Result>>, )> { - let key = FlowNameKey::new(catalog_name.to_string(), flow_name.to_string()); + let key = FlowNameKey::new(catalog_name, flow_name); let raw_key = key.to_bytes(); let flow_flow_name_value = FlowNameValue::new(flow_id); let txn = txn_helper::build_put_if_absent_txn( @@ -205,7 +205,7 @@ mod tests { #[test] fn test_key_serialization() { - let key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string()); + let key = FlowNameKey::new("my_catalog", "my_task"); assert_eq!(b"__flow/name/my_catalog/my_task".to_vec(), key.to_bytes(),); } diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs index d584c8b25776..cffafaa870c7 100644 --- a/src/common/meta/src/key/flow/flownode_flow.rs +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -44,12 +44,12 @@ const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode"; /// The layout `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}` pub struct FlownodeFlowKey(FlowScoped); -impl MetaKey for FlownodeFlowKey { +impl<'a> MetaKey<'a, FlownodeFlowKey> for FlownodeFlowKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(FlownodeFlowKey( FlowScoped::::from_bytes(bytes)?, )) @@ -118,7 +118,7 @@ impl FlownodeFlowKeyInner { } } -impl MetaKey for FlownodeFlowKeyInner { +impl<'a> MetaKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner { fn to_bytes(&self) -> Vec { format!( "{FLOWNODE_FLOW_KEY_PREFIX}/{}/{}/{}", @@ -127,7 +127,7 @@ impl MetaKey for FlownodeFlowKeyInner { .into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index 9eff61d10e81..9fdde5c95c8c 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -54,12 +54,12 @@ struct TableFlowKeyInner { #[derive(Debug, PartialEq)] pub struct TableFlowKey(FlowScoped); -impl MetaKey for TableFlowKey { +impl<'a> MetaKey<'a, TableFlowKey> for TableFlowKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { Ok(TableFlowKey(FlowScoped::::from_bytes( bytes, )?)) @@ -132,7 +132,7 @@ impl TableFlowKeyInner { } } -impl MetaKey for TableFlowKeyInner { +impl<'a> MetaKey<'a, TableFlowKeyInner> for TableFlowKeyInner { fn to_bytes(&self) -> Vec { format!( "{TABLE_FLOW_KEY_PREFIX}/{}/{}/{}/{}", @@ -141,7 +141,7 @@ impl MetaKey for TableFlowKeyInner { .into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &'a [u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index f56adbaec440..91c4c74bc104 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Result}; -use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; +use crate::key::{MetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; @@ -32,6 +32,9 @@ use crate::rpc::KeyValue; const OPT_KEY_TTL: &str = "ttl"; +/// The schema name key, indices all schema names belong to the {catalog_name} +/// +/// The layout: `__schema_name/{catalog_name}/{schema_name}`. #[derive(Debug, Clone, Copy, PartialEq)] pub struct SchemaNameKey<'a> { pub catalog: &'a str, @@ -95,13 +98,26 @@ impl Display for SchemaNameKey<'_> { } } -impl TableMetaKey for SchemaNameKey<'_> { - fn as_raw_key(&self) -> Vec { +impl<'a> MetaKey<'a, SchemaNameKey<'a>> for SchemaNameKey<'_> { + fn to_bytes(&self) -> Vec { self.to_string().into_bytes() } + + fn from_bytes(bytes: &'a [u8]) -> Result> { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "SchemaNameKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + SchemaNameKey::try_from(key) + } } -/// Decodes `KeyValue` to ({schema},()) +/// Decodes `KeyValue` to {schema} pub fn schema_decoder(kv: KeyValue) -> Result { let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?; let schema_name = SchemaNameKey::try_from(str)?; @@ -145,7 +161,7 @@ impl SchemaManager { ) -> Result<()> { let _timer = crate::metrics::METRIC_META_CREATE_SCHEMA.start_timer(); - let raw_key = schema.as_raw_key(); + let raw_key = schema.to_bytes(); let raw_value = value.unwrap_or_default().try_as_raw_value()?; if self .kv_backend @@ -159,13 +175,13 @@ impl SchemaManager { } pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result { - let raw_key = schema.as_raw_key(); + let raw_key = schema.to_bytes(); self.kv_backend.exists(&raw_key).await } pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result> { - let raw_key = schema.as_raw_key(); + let raw_key = schema.to_bytes(); let value = self.kv_backend.get(&raw_key).await?; value .and_then(|v| SchemaNameValue::try_from_raw_value(v.value.as_ref()).transpose()) @@ -174,7 +190,7 @@ impl SchemaManager { /// Deletes a [SchemaNameKey]. pub async fn delete(&self, schema: SchemaNameKey<'_>) -> Result<()> { - let raw_key = schema.as_raw_key(); + let raw_key = schema.to_bytes(); self.kv_backend.delete(&raw_key, false).await?; Ok(()) @@ -222,7 +238,8 @@ mod tests { let key = SchemaNameKey::new("my-catalog", "my-schema"); assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema"); - let parsed: SchemaNameKey<'_> = "__schema_name/my-catalog/my-schema".try_into().unwrap(); + let parsed = SchemaNameKey::from_bytes(b"__schema_name/my-catalog/my-schema").unwrap(); + assert_eq!(key, parsed); let value = SchemaNameValue { diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 798dbf1beb68..7283281c14ab 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -13,34 +13,68 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Display; use serde::{Deserialize, Serialize}; +use snafu::OptionExt; use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; -use super::txn_helper::TxnOpGetResponseSet; -use crate::error::Result; +use super::TABLE_INFO_KEY_PATTERN; +use crate::error::{InvalidTableMetadataSnafu, Result}; +use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ - txn_helper, DeserializedValueWithBytes, TableMetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX, + txn_helper, DeserializedValueWithBytes, MetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX, }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::rpc::store::BatchGetRequest; use crate::table_name::TableName; +/// The key stores the metadata of the table. +/// +/// The layout: `__table_info/{table_id}`. +#[derive(Debug, PartialEq)] pub struct TableInfoKey { table_id: TableId, } impl TableInfoKey { + /// Returns a new [TableInfoKey]. pub fn new(table_id: TableId) -> Self { Self { table_id } } } -impl TableMetaKey for TableInfoKey { - fn as_raw_key(&self) -> Vec { - format!("{}/{}", TABLE_INFO_KEY_PREFIX, self.table_id).into_bytes() +impl Display for TableInfoKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", TABLE_INFO_KEY_PREFIX, self.table_id) + } +} + +impl<'a> MetaKey<'a, TableInfoKey> for TableInfoKey { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "TableInfoKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = TABLE_INFO_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableInfoKey '{key}'"), + })?; + // Safety: pass the regex check above + let table_id = captures[1].parse::().unwrap(); + Ok(TableInfoKey { table_id }) } } @@ -115,7 +149,7 @@ impl TableInfoManager { ) -> Result>>, )> { let key = TableInfoKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let txn = txn_helper::build_put_if_absent_txn( raw_key.clone(), @@ -142,7 +176,7 @@ impl TableInfoManager { ) -> Result>>, )> { let key = TableInfoKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let raw_value = current_table_info_value.get_raw_bytes(); let new_raw_value: Vec = new_table_info_value.try_as_raw_value()?; @@ -159,7 +193,7 @@ impl TableInfoManager { table_id: TableId, ) -> Result>> { let key = TableInfoKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); self.kv_backend .get(&raw_key) .await? @@ -173,7 +207,7 @@ impl TableInfoManager { ) -> Result> { let lookup_table = table_ids .iter() - .map(|id| (TableInfoKey::new(*id).as_raw_key(), id)) + .map(|id| (TableInfoKey::new(*id).to_bytes(), id)) .collect::>(); let resp = self @@ -205,7 +239,7 @@ impl TableInfoManager { ) -> Result>> { let lookup_table = table_ids .iter() - .map(|id| (TableInfoKey::new(*id).as_raw_key(), id)) + .map(|id| (TableInfoKey::new(*id).to_bytes(), id)) .collect::>(); let resp = self @@ -248,14 +282,21 @@ mod tests { } #[test] - fn test_key_serde() { + fn test_key_serialization() { let key = TableInfoKey::new(42); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); assert_eq!(raw_key, b"__table_info/42"); } #[test] - fn test_value_serde() { + fn test_key_deserialization() { + let expected = TableInfoKey::new(42); + let key = TableInfoKey::from_bytes(b"__table_info/42").unwrap(); + assert_eq!(key, expected); + } + + #[test] + fn test_value_serialization() { let value = TableInfoValue { table_info: new_table_info(42), version: 1, diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 83e1cb7fb254..2eb30381fcdf 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Display; use std::sync::Arc; use futures_util::stream::BoxStream; @@ -20,9 +21,8 @@ use serde::{Deserialize, Serialize}; use snafu::OptionExt; use table::metadata::TableId; -use super::{TableMetaValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX}; +use super::{MetaKey, TableMetaValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX}; use crate::error::{Error, InvalidTableMetadataSnafu, Result}; -use crate::key::TableMetaKey; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; @@ -50,45 +50,56 @@ impl<'a> TableNameKey<'a> { pub fn prefix_to_table(catalog: &str, schema: &str) -> String { format!("{}/{}/{}", TABLE_NAME_KEY_PREFIX, catalog, schema) } +} - fn strip_table_name(raw_key: &[u8]) -> Result { - let key = String::from_utf8(raw_key.to_vec()).map_err(|e| { +impl Display for TableNameKey<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}/{}", + Self::prefix_to_table(self.catalog, self.schema), + self.table + ) + } +} + +impl<'a> MetaKey<'a, TableNameKey<'a>> for TableNameKey<'_> { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result> { + let key = std::str::from_utf8(bytes).map_err(|e| { InvalidTableMetadataSnafu { err_msg: format!( "TableNameKey '{}' is not a valid UTF8 string: {e}", - String::from_utf8_lossy(raw_key) + String::from_utf8_lossy(bytes) ), } .build() })?; - let captures = - TABLE_NAME_KEY_PATTERN - .captures(&key) - .context(InvalidTableMetadataSnafu { - err_msg: format!("Invalid TableNameKey '{key}'"), - })?; - // Safety: pass the regex check above - Ok(captures[3].to_string()) - } -} - -impl TableMetaKey for TableNameKey<'_> { - fn as_raw_key(&self) -> Vec { - format!( - "{}/{}", - Self::prefix_to_table(self.catalog, self.schema), - self.table - ) - .into_bytes() + let captures = TABLE_NAME_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableNameKey '{key}'"), + })?; + let catalog = captures.get(1).unwrap().as_str(); + let schema = captures.get(2).unwrap().as_str(); + let table = captures.get(3).unwrap().as_str(); + Ok(TableNameKey { + catalog, + schema, + table, + }) } } /// Decodes `KeyValue` to ({table_name}, TableNameValue) pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> { - let table_name = TableNameKey::strip_table_name(kv.key())?; + let table_name_key = TableNameKey::from_bytes(&kv.key)?; let table_name_value = TableNameValue::try_from_raw_value(&kv.value)?; - Ok((table_name, table_name_value)) + Ok((table_name_key.table.to_string(), table_name_value)) } impl<'a> From<&'a TableName> for TableNameKey<'a> { @@ -166,7 +177,7 @@ impl TableNameManager { key: &TableNameKey<'_>, table_id: TableId, ) -> Result { - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let value = TableNameValue::new(table_id); let raw_value = value.try_as_raw_value()?; @@ -182,8 +193,8 @@ impl TableNameManager { new_key: &TableNameKey<'_>, table_id: TableId, ) -> Result { - let raw_key = key.as_raw_key(); - let new_raw_key = new_key.as_raw_key(); + let raw_key = key.to_bytes(); + let new_raw_key = new_key.to_bytes(); let value = TableNameValue::new(table_id); let raw_value = value.try_as_raw_value()?; @@ -195,7 +206,7 @@ impl TableNameManager { } pub async fn get(&self, key: TableNameKey<'_>) -> Result> { - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); self.kv_backend .get(&raw_key) .await? @@ -209,7 +220,7 @@ impl TableNameManager { ) -> Result>> { let raw_keys = keys .into_iter() - .map(|key| key.as_raw_key()) + .map(|key| key.to_bytes()) .collect::>(); let req = BatchGetRequest::new().with_keys(raw_keys.clone()); let res = self.kv_backend.batch_get(req).await?; @@ -229,7 +240,7 @@ impl TableNameManager { } pub async fn exists(&self, key: TableNameKey<'_>) -> Result { - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); self.kv_backend.exists(&raw_key).await } @@ -259,8 +270,8 @@ mod tests { #[test] fn test_strip_table_name() { - fn test_err(raw_key: &[u8]) { - assert!(TableNameKey::strip_table_name(raw_key).is_err()); + fn test_err(bytes: &[u8]) { + assert!(TableNameKey::from_bytes(bytes).is_err()); } test_err(b""); @@ -277,10 +288,11 @@ mod tests { fn test_ok(table_name: &str) { assert_eq!( table_name, - TableNameKey::strip_table_name( + TableNameKey::from_bytes( format!("__table_name/my_catalog/my_schema/{}", table_name).as_bytes() ) .unwrap() + .table ); } test_ok("my_table"); @@ -291,13 +303,18 @@ mod tests { } #[test] - fn test_serde() { + fn test_serialization() { let key = TableNameKey::new("my_catalog", "my_schema", "my_table"); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); assert_eq!( b"__table_name/my_catalog/my_schema/my_table", raw_key.as_slice() ); + let table_name_key = + TableNameKey::from_bytes(b"__table_name/my_catalog/my_schema/my_table").unwrap(); + assert_eq!(table_name_key.catalog, "my_catalog"); + assert_eq!(table_name_key.schema, "my_schema"); + assert_eq!(table_name_key.table, "my_table"); let value = TableNameValue::new(1); let literal = br#"{"table_id":1}"#; diff --git a/src/common/meta/src/key/table_region.rs b/src/common/meta/src/key/table_region.rs index e51e1a547194..4ccc99ba513d 100644 --- a/src/common/meta/src/key/table_region.rs +++ b/src/common/meta/src/key/table_region.rs @@ -13,16 +13,18 @@ // limitations under the License. use std::collections::BTreeMap; +use std::fmt::Display; +use lazy_static::lazy_static; +use regex::Regex; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionNumber; use table::metadata::TableId; -use super::TABLE_REGION_KEY_PREFIX; -use crate::error::{Result, SerdeJsonSnafu}; -use crate::key::TableMetaKey; -use crate::{impl_table_meta_key, impl_table_meta_value, DatanodeId}; +use super::{MetaKey, TABLE_REGION_KEY_PREFIX}; +use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; +use crate::{impl_table_meta_value, DatanodeId}; pub type RegionDistribution = BTreeMap>; @@ -30,23 +32,54 @@ pub type RegionDistribution = BTreeMap>; since = "0.4.0", note = "Please use the TableRouteManager's get_region_distribution method instead" )] +#[derive(Debug, PartialEq)] pub struct TableRegionKey { table_id: TableId, } +lazy_static! { + static ref TABLE_REGION_KEY_PATTERN: Regex = + Regex::new(&format!("^{TABLE_REGION_KEY_PREFIX}/([0-9]+)$")).unwrap(); +} + impl TableRegionKey { pub fn new(table_id: TableId) -> Self { Self { table_id } } } -impl TableMetaKey for TableRegionKey { - fn as_raw_key(&self) -> Vec { - format!("{}/{}", TABLE_REGION_KEY_PREFIX, self.table_id).into_bytes() +impl Display for TableRegionKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", TABLE_REGION_KEY_PREFIX, self.table_id) } } -impl_table_meta_key! {TableRegionKey} +impl<'a> MetaKey<'a, TableRegionKey> for TableRegionKey { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "TableRegionKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + TABLE_REGION_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableRegionKey '{key}'"), + })?; + // Safety: pass the regex check above + let table_id = captures[1].parse::().unwrap(); + Ok(TableRegionKey { table_id }) + } +} #[deprecated( since = "0.4.0", @@ -75,10 +108,12 @@ mod tests { use crate::key::TableMetaValue; #[test] - fn test_serde() { - let key = TableRegionKey::new(1); - let raw_key = key.as_raw_key(); - assert_eq!(raw_key, b"__table_region/1"); + fn test_serialization() { + let key = TableRegionKey::new(24); + let raw_key = key.to_bytes(); + assert_eq!(raw_key, b"__table_region/24"); + let deserialized = TableRegionKey::from_bytes(b"__table_region/24").unwrap(); + assert_eq!(key, deserialized); let value = TableRegionValue { region_distribution: RegionDistribution::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6])]), diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index a6745f8db29b..eca8d702ef09 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -21,19 +21,23 @@ use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::error::{ - self, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, - UnexpectedLogicalRouteTableSnafu, + self, InvalidTableMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, + TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, }; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ - txn_helper, DeserializedValueWithBytes, RegionDistribution, TableMetaKey, TableMetaValue, - TABLE_ROUTE_PREFIX, + txn_helper, DeserializedValueWithBytes, MetaKey, RegionDistribution, TableMetaValue, + TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX, }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, RegionRoute}; use crate::rpc::store::BatchGetRequest; +/// The key stores table routes +/// +/// The layout: `__table_route/{table_id}`. +#[derive(Debug, PartialEq)] pub struct TableRouteKey { pub table_id: TableId, } @@ -239,10 +243,31 @@ impl LogicalTableRouteValue { } } -impl TableMetaKey for TableRouteKey { - fn as_raw_key(&self) -> Vec { +impl<'a> MetaKey<'a, TableRouteKey> for TableRouteKey { + fn to_bytes(&self) -> Vec { self.to_string().into_bytes() } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidTableMetadataSnafu { + err_msg: format!( + "TableRouteKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + TABLE_ROUTE_KEY_PATTERN + .captures(key) + .context(InvalidTableMetadataSnafu { + err_msg: format!("Invalid TableRouteKey '{key}'"), + })?; + // Safety: pass the regex check above + let table_id = captures[1].parse::().unwrap(); + Ok(TableRouteKey { table_id }) + } } impl Display for TableRouteKey { @@ -462,7 +487,7 @@ impl TableRouteStorage { ) -> Result>>, )> { let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let txn = txn_helper::build_put_if_absent_txn( raw_key.clone(), @@ -490,7 +515,7 @@ impl TableRouteStorage { ) -> Result>>, )> { let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); + let raw_key = key.to_bytes(); let raw_value = current_table_route_value.get_raw_bytes(); let new_raw_value: Vec = new_table_route_value.try_as_raw_value()?; @@ -506,7 +531,7 @@ impl TableRouteStorage { pub async fn get(&self, table_id: TableId) -> Result> { let key = TableRouteKey::new(table_id); self.kv_backend - .get(&key.as_raw_key()) + .get(&key.to_bytes()) .await? .map(|kv| TableRouteValue::try_from_raw_value(&kv.value)) .transpose() @@ -519,7 +544,7 @@ impl TableRouteStorage { ) -> Result>> { let key = TableRouteKey::new(table_id); self.kv_backend - .get(&key.as_raw_key()) + .get(&key.to_bytes()) .await? .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value)) .transpose() @@ -560,7 +585,7 @@ impl TableRouteStorage { pub async fn batch_get(&self, table_ids: &[TableId]) -> Result>> { let keys = table_ids .iter() - .map(|id| TableRouteKey::new(*id).as_raw_key()) + .map(|id| TableRouteKey::new(*id).to_bytes()) .collect::>(); let resp = self .kv_backend @@ -604,6 +629,20 @@ mod tests { ); } + #[test] + fn test_key_serialization() { + let key = TableRouteKey::new(42); + let raw_key = key.to_bytes(); + assert_eq!(raw_key, b"__table_route/42"); + } + + #[test] + fn test_key_deserialization() { + let expected = TableRouteKey::new(42); + let key = TableRouteKey::from_bytes(b"__table_route/42").unwrap(); + assert_eq!(key, expected); + } + #[tokio::test] async fn test_table_route_storage_get_raw_empty() { let kv = Arc::new(MemoryKvBackend::default()); diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index c5b5b4ecde81..9bbe8d903ae2 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -24,7 +24,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::key::table_info::TableInfoKey; -use common_meta::key::TableMetaKey; +use common_meta::key::MetaKey; use partition::manager::TableRouteCacheInvalidator; use table::metadata::TableId; use tokio::sync::mpsc; @@ -79,7 +79,7 @@ async fn handle_instruction( async fn test_invalidate_table_cache_handler() { let table_id = 1; let table_info_key = TableInfoKey::new(table_id); - let inner = HashMap::from([(table_info_key.as_raw_key(), 1)]); + let inner = HashMap::from([(table_info_key.to_bytes(), 1)]); let backend = Arc::new(MockKvCacheInvalidator { inner: Mutex::new(inner), }); @@ -103,7 +103,7 @@ async fn test_invalidate_table_cache_handler() { .inner .lock() .unwrap() - .contains_key(&table_info_key.as_raw_key())); + .contains_key(&table_info_key.to_bytes())); // removes a invalid key handle_instruction( @@ -118,7 +118,7 @@ async fn test_invalidate_table_cache_handler() { async fn test_invalidate_schema_key_handler() { let (catalog, schema) = ("foo", "bar"); let schema_key = SchemaNameKey { catalog, schema }; - let inner = HashMap::from([(schema_key.as_raw_key(), 1)]); + let inner = HashMap::from([(schema_key.to_bytes(), 1)]); let backend = Arc::new(MockKvCacheInvalidator { inner: Mutex::new(inner), }); @@ -146,7 +146,7 @@ async fn test_invalidate_schema_key_handler() { .inner .lock() .unwrap() - .contains_key(&schema_key.as_raw_key())); + .contains_key(&schema_key.to_bytes())); // removes a invalid key handle_instruction( diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 8ca53a6f463a..f2430a0d48f0 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -19,7 +19,7 @@ use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; use client::OutputData; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::key::table_route::TableRouteKey; -use common_meta::key::{RegionDistribution, TableMetaKey}; +use common_meta::key::{MetaKey, RegionDistribution}; use common_meta::peer::Peer; use common_meta::{distributed_time_constants, RegionIdent}; use common_procedure::{watcher, ProcedureWithId}; @@ -176,7 +176,7 @@ async fn has_route_cache(instance: &Arc, table_id: TableId) -> bool { .cache(); cache - .get(TableRouteKey::new(table_id).as_raw_key().as_slice()) + .get(TableRouteKey::new(table_id).to_bytes().as_slice()) .await .is_some() }