Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove TableMetaKey trait #3837

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{TableRegionKey, TableRegionValue};
use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue};
use common_meta::key::{RegionDistribution, TableMetaKey, TableMetaValue};
use common_meta::key::{MetaKey, RegionDistribution, TableMetaValue};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
Expand Down Expand Up @@ -137,7 +137,7 @@ impl MigrateTableMetadata {
while let Some((key, value)) = stream.try_next().await.context(error::IterStreamSnafu)? {
let table_id = self.migrate_table_route_key(value).await?;
keys.push(key);
keys.push(TableRegionKey::new(table_id).as_raw_key())
keys.push(TableRegionKey::new(table_id).to_bytes())
}

info!("Total migrated TableRouteKeys: {}", keys.len() / 2);
Expand Down Expand Up @@ -165,7 +165,7 @@ impl MigrateTableMetadata {
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_key(new_key.to_bytes())
.with_value(new_table_value.try_as_raw_value().unwrap()),
)
.await
Expand Down Expand Up @@ -217,7 +217,7 @@ impl MigrateTableMetadata {
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_key(new_key.to_bytes())
.with_value(schema_name_value.try_as_raw_value().unwrap()),
)
.await
Expand Down Expand Up @@ -269,7 +269,7 @@ impl MigrateTableMetadata {
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_key(new_key.to_bytes())
.with_value(catalog_name_value.try_as_raw_value().unwrap()),
)
.await
Expand Down Expand Up @@ -346,11 +346,11 @@ impl MigrateTableMetadata {
.batch_put(
BatchPutRequest::new()
.add_kv(
table_info_key.as_raw_key(),
table_info_key.to_bytes(),
table_info_value.try_as_raw_value().unwrap(),
)
.add_kv(
table_region_key.as_raw_key(),
table_region_key.to_bytes(),
table_region_value.try_as_raw_value().unwrap(),
),
)
Expand Down Expand Up @@ -378,7 +378,7 @@ impl MigrateTableMetadata {
self.etcd_store
.put(
PutRequest::new()
.with_key(table_name_key.as_raw_key())
.with_key(table_name_key.to_bytes())
.with_value(table_name_value.try_as_raw_value().unwrap()),
)
.await
Expand Down Expand Up @@ -425,7 +425,7 @@ impl MigrateTableMetadata {
} else {
let mut req = BatchPutRequest::new();
for (key, value) in datanode_table_kvs {
req = req.add_kv(key.as_raw_key(), value.try_as_raw_value().unwrap());
req = req.add_kv(key.to_bytes(), value.try_as_raw_value().unwrap());
}
self.etcd_store.batch_put(req).await.unwrap();
}
Expand Down
10 changes: 5 additions & 5 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteKey;
use crate::key::TableMetaKey;
use crate::key::MetaKey;

/// KvBackend cache invalidator
#[async_trait::async_trait]
Expand Down Expand Up @@ -99,18 +99,18 @@ where
match cache {
CacheIdent::TableId(table_id) => {
let key = TableInfoKey::new(table_id);
self.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.to_bytes()).await;

let key = &TableRouteKey { table_id };
self.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::TableName(table_name) => {
let key: TableNameKey = (&table_name).into();
self.invalidate_key(&key.as_raw_key()).await
self.invalidate_key(&key.to_bytes()).await
}
CacheIdent::SchemaName(schema_name) => {
let key: SchemaNameKey = (&schema_name).into();
self.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.to_bytes()).await;
}
}
}
Expand Down
69 changes: 21 additions & 48 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,16 @@ pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;

lazy_static! {
static ref TABLE_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}

lazy_static! {
static ref TABLE_ROUTE_KEY_PATTERN: Regex =
Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
}

lazy_static! {
static ref DATANODE_TABLE_KEY_PATTERN: Regex =
Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
Expand Down Expand Up @@ -189,15 +199,11 @@ lazy_static! {
.unwrap();
}

pub trait TableMetaKey {
fn as_raw_key(&self) -> Vec<u8>;
}

/// The key of metadata.
pub trait MetaKey<T> {
pub trait MetaKey<'a, T> {
fn to_bytes(&self) -> Vec<u8>;

fn from_bytes(bytes: &[u8]) -> Result<T>;
fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -209,12 +215,12 @@ impl From<Vec<u8>> for BytesAdapter {
}
}

impl MetaKey<BytesAdapter> for BytesAdapter {
impl<'a> MetaKey<'a, BytesAdapter> for BytesAdapter {
fn to_bytes(&self) -> Vec<u8> {
self.0.clone()
}

fn from_bytes(bytes: &[u8]) -> Result<BytesAdapter> {
fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
Ok(BytesAdapter(bytes.to_vec()))
}
}
Expand All @@ -228,24 +234,6 @@ pub(crate) trait TableMetaKeyGetTxnOp {
);
}

impl TableMetaKey for String {
fn as_raw_key(&self) -> Vec<u8> {
self.as_bytes().to_vec()
}
}

impl TableMetaKeyGetTxnOp for String {
fn build_get_op(
&self,
) -> (
TxnOp,
impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = self.as_raw_key();
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
}

pub trait TableMetaValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
where
Expand Down Expand Up @@ -675,11 +663,11 @@ impl TableMetadataManager {
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<HashSet<_>>();

keys.push(table_name.as_raw_key());
keys.push(table_info_key.as_raw_key());
keys.push(table_route_key.as_raw_key());
keys.push(table_name.to_bytes());
keys.push(table_info_key.to_bytes());
keys.push(table_route_key.to_bytes());
for key in &datanode_table_keys {
keys.push(key.as_raw_key());
keys.push(key.to_bytes());
}
Ok(keys)
}
Expand Down Expand Up @@ -992,21 +980,6 @@ impl TableMetadataManager {
}
}

#[macro_export]
macro_rules! impl_table_meta_key {
($($val_ty: ty), *) => {
$(
impl std::fmt::Display for $val_ty {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", String::from_utf8_lossy(&self.as_raw_key()))
}
}
)*
}
}

impl_table_meta_key!(TableNameKey<'_>, TableInfoKey, DatanodeTableKey);

#[macro_export]
macro_rules! impl_table_meta_value {
($($val_ty: ty), *) => {
Expand All @@ -1024,7 +997,7 @@ macro_rules! impl_table_meta_value {
}
}

macro_rules! impl_table_meta_key_get_txn_op {
macro_rules! impl_meta_key_get_txn_op {
($($key: ty), *) => {
$(
impl $crate::key::TableMetaKeyGetTxnOp for $key {
Expand All @@ -1038,7 +1011,7 @@ macro_rules! impl_table_meta_key_get_txn_op {
&'a mut TxnOpGetResponseSet,
) -> Option<Vec<u8>>,
) {
let raw_key = self.as_raw_key();
let raw_key = self.to_bytes();
(
TxnOp::Get(raw_key.clone()),
TxnOpGetResponseSet::filter(raw_key),
Expand All @@ -1049,7 +1022,7 @@ macro_rules! impl_table_meta_key_get_txn_op {
}
}

impl_table_meta_key_get_txn_op! {
impl_meta_key_get_txn_op! {
TableNameKey<'_>,
TableInfoKey,
TableRouteKey,
Expand Down
36 changes: 26 additions & 10 deletions src/common/meta/src/key/catalog_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use crate::error::{self, Error, InvalidTableMetadataSnafu, Result};
use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX};
use crate::key::{MetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;

/// The catalog name key, indices all catalog names
///
/// The layout: `__catalog_name/{catalog_name}`
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CatalogNameKey<'a> {
pub catalog: &'a str,
Expand All @@ -53,15 +56,28 @@ impl<'a> CatalogNameKey<'a> {
}
}

impl Display for CatalogNameKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog)
impl<'a> MetaKey<'a, CatalogNameKey<'a>> for CatalogNameKey<'_> {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}

fn from_bytes(bytes: &'a [u8]) -> Result<CatalogNameKey<'a>> {
let key = std::str::from_utf8(bytes).map_err(|e| {
InvalidTableMetadataSnafu {
err_msg: format!(
"CatalogNameKey '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
CatalogNameKey::try_from(key)
}
}

impl TableMetaKey for CatalogNameKey<'_> {
fn as_raw_key(&self) -> Vec<u8> {
self.to_string().into_bytes()
impl Display for CatalogNameKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog)
}
}

Expand Down Expand Up @@ -103,7 +119,7 @@ impl CatalogManager {
pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> {
let _timer = crate::metrics::METRIC_META_CREATE_CATALOG.start_timer();

let raw_key = catalog.as_raw_key();
let raw_key = catalog.to_bytes();
let raw_value = CatalogNameValue.try_as_raw_value()?;
if self
.kv_backend
Expand All @@ -117,7 +133,7 @@ impl CatalogManager {
}

pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result<bool> {
let raw_key = catalog.as_raw_key();
let raw_key = catalog.to_bytes();

self.kv_backend.exists(&raw_key).await
}
Expand Down Expand Up @@ -148,7 +164,7 @@ mod tests {

assert_eq!(key.to_string(), "__catalog_name/my-catalog");

let parsed: CatalogNameKey = "__catalog_name/my-catalog".try_into().unwrap();
let parsed = CatalogNameKey::from_bytes(b"__catalog_name/my-catalog").unwrap();

assert_eq!(key, parsed);
}
Expand Down
Loading
Loading