From f715e7545358d33124931441feffa6b9865f4e63 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 25 Apr 2024 06:15:33 +0000 Subject: [PATCH 01/13] feat: implement `FlowTaskMetadataAllocator` --- src/common/meta/src/ddl/task_meta.rs | 77 ++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 src/common/meta/src/ddl/task_meta.rs diff --git a/src/common/meta/src/ddl/task_meta.rs b/src/common/meta/src/ddl/task_meta.rs new file mode 100644 index 000000000000..3e8a4fb36cfc --- /dev/null +++ b/src/common/meta/src/ddl/task_meta.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use tonic::async_trait; + +use crate::error::Result; +use crate::key::FlowTaskId; +use crate::peer::Peer; +use crate::sequence::SequenceRef; + +/// The reference of [FlowTaskMetadataAllocator]. +pub type FlowTaskMetadataAllocatorRef = Arc; + +/// [FlowTaskMetadataAllocator] provides the ability of: +/// - [FlowTaskId] Allocation. +/// - [FlownodeId] Selection. +#[derive(Clone)] +pub struct FlowTaskMetadataAllocator { + flow_task_id_sequence: SequenceRef, + partition_peer_allocator: PartitionPeerAllocatorRef, +} + +impl FlowTaskMetadataAllocator { + /// Returns the [FlowTaskMetadataAllocator] with [NoopPartitionPeerAllocator]. + pub fn with_noop_peer_allocator(flow_task_id_sequence: SequenceRef) -> Self { + Self { + flow_task_id_sequence, + partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator), + } + } + + /// Allocates a the [FlowTaskId]. + pub(crate) async fn allocate_flow_task_id(&self) -> Result { + let flow_task_id = self.flow_task_id_sequence.next().await? as FlowTaskId; + Ok(flow_task_id) + } + + /// Allocates the [FlowTaskId] and [Peer]s. + pub async fn create(&self, partitions: usize) -> Result<(FlowTaskId, Vec)> { + let flow_task_id = self.allocate_flow_task_id().await?; + let peers = self.partition_peer_allocator.alloc(partitions).await?; + + Ok((flow_task_id, peers)) + } +} + +/// Allocates [Peer]s for partitions. +#[async_trait] +pub trait PartitionPeerAllocator: Send + Sync { + /// Allocates [Peer] nodes for storing partitions. + async fn alloc(&self, partitions: usize) -> Result>; +} + +/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions. +pub type PartitionPeerAllocatorRef = Arc; + +struct NoopPartitionPeerAllocator; + +#[async_trait] +impl PartitionPeerAllocator for NoopPartitionPeerAllocator { + async fn alloc(&self, partitions: usize) -> Result> { + Ok(vec![Peer::default(); partitions]) + } +} From eb8a5fec873ea8e7c7c6d33d2e64e7b9da4f29d6 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 25 Apr 2024 13:52:23 +0000 Subject: [PATCH 02/13] feat: add `FlowTaskMetadataManagerRef` and `FlowTaskMetadataAllocatorRef` --- src/cmd/src/standalone.rs | 29 +++++++++++++++++----- src/common/catalog/src/consts.rs | 3 +++ src/common/meta/src/ddl.rs | 17 +++++++++++-- src/common/meta/src/ddl_manager.rs | 36 ++++++++++++++++++++++------ src/common/meta/src/key/flow_task.rs | 3 +++ src/common/meta/src/test_util.rs | 30 ++++++++++++++++------- src/meta-srv/src/metasrv.rs | 1 + src/meta-srv/src/metasrv/builder.rs | 24 +++++++++++++++++-- src/meta-srv/src/procedure/utils.rs | 21 +++++++++++----- tests-integration/src/standalone.rs | 20 +++++++++++++--- 10 files changed, 149 insertions(+), 35 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 061c4f98e2bd..edde6ed14db9 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -18,12 +18,14 @@ use std::{fs, path}; use async_trait::async_trait; use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; -use common_catalog::consts::MIN_USER_TABLE_ID; +use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; +use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef}; use common_meta::ddl::ProcedureExecutorRef; use common_meta::ddl_manager::DdlManager; +use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; @@ -45,6 +47,7 @@ use frontend::server::Services; use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, }; +use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ}; use mito2::config::MitoConfig; use serde::{Deserialize, Serialize}; use servers::export_metrics::ExportMetricsOption; @@ -411,30 +414,40 @@ impl StartCommand { let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); let table_id_sequence = Arc::new( - SequenceBuilder::new("table_id", kv_backend.clone()) + SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) .step(10) .build(), ); + let flow_task_id_sequence = Arc::new( + SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_TASK_ID as u64) + .step(10) + .build(), + ); let wal_options_allocator = Arc::new(WalOptionsAllocator::new( opts.wal_meta.clone(), kv_backend.clone(), )); - let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; - + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), )); + let flow_task_meta_allocator = Arc::new( + FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence), + ); let ddl_task_executor = Self::create_ddl_task_executor( - table_metadata_manager, procedure_manager.clone(), node_manager.clone(), multi_cache_invalidator, + table_metadata_manager, table_meta_allocator, + flow_task_metadata_manager, + flow_task_meta_allocator, ) .await?; @@ -462,11 +475,13 @@ impl StartCommand { } pub async fn create_ddl_task_executor( - table_metadata_manager: TableMetadataManagerRef, procedure_manager: ProcedureManagerRef, node_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, + table_metadata_manager: TableMetadataManagerRef, table_meta_allocator: TableMetadataAllocatorRef, + flow_metadata_manager: FlowTaskMetadataManagerRef, + flow_metadata_allocator: FlowTaskMetadataAllocatorRef, ) -> Result { let procedure_executor: ProcedureExecutorRef = Arc::new( DdlManager::try_new( @@ -475,6 +490,8 @@ impl StartCommand { cache_invalidator, table_metadata_manager, table_meta_allocator, + flow_metadata_manager, + flow_metadata_allocator, Arc::new(MemoryRegionKeeper::default()), true, ) diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 8834b6239f91..175435d89842 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -19,6 +19,9 @@ pub const DEFAULT_CATALOG_NAME: &str = "greptime"; pub const DEFAULT_SCHEMA_NAME: &str = "public"; pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = "greptime_private"; +/// Reserves [0,MIN_USER_FLOW_TASK_ID) for internal usage. +/// User defined table id starts from this value. +pub const MIN_USER_FLOW_TASK_ID: u32 = 1024; /// Reserves [0,MIN_USER_TABLE_ID) for internal usage. /// User defined table id starts from this value. pub const MIN_USER_TABLE_ID: u32 = 1024; diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 3feea55253ef..08efa57eae39 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionNumber, TableId}; -use self::table_meta::TableMetadataAllocatorRef; use crate::cache_invalidator::CacheInvalidatorRef; +use crate::ddl::table_meta::TableMetadataAllocatorRef; +use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef; use crate::error::Result; +use crate::key::flow_task::FlowTaskMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; @@ -38,6 +40,7 @@ pub mod drop_database; pub mod drop_table; mod physical_table_metadata; pub mod table_meta; +pub mod task_meta; #[cfg(any(test, feature = "testing"))] pub mod test_util; #[cfg(test)] @@ -93,11 +96,21 @@ pub struct TableMetadata { pub region_wal_options: HashMap, } +/// The context of ddl. #[derive(Clone)] pub struct DdlContext { + /// Sends querying and requests to nodes. pub node_manager: NodeManagerRef, + /// Cache invalidation. pub cache_invalidator: CacheInvalidatorRef, - pub table_metadata_manager: TableMetadataManagerRef, + /// Keep tracking operating regions. pub memory_region_keeper: MemoryRegionKeeperRef, + /// Table metadata manager. + pub table_metadata_manager: TableMetadataManagerRef, + /// Allocator for table metadata. pub table_metadata_allocator: TableMetadataAllocatorRef, + /// Flow task metadata manager. + pub flow_task_metadata_manager: FlowTaskMetadataManagerRef, + /// Allocator for flow task metadata. + pub flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 8db6198bd609..32c8a99183c0 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -19,6 +19,7 @@ use common_procedure::{ }; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, info, tracing}; +use derive_builder::Builder; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::TableId; @@ -31,6 +32,7 @@ use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_database::DropDatabaseProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::table_meta::TableMetadataAllocatorRef; +use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor}; use crate::error::{ @@ -39,6 +41,7 @@ use crate::error::{ TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu, }; +use crate::key::flow_task::FlowTaskMetadataManagerRef; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; @@ -62,12 +65,15 @@ pub type DdlManagerRef = Arc; pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader; /// The [DdlManager] provides the ability to execute Ddl. +#[derive(Builder)] pub struct DdlManager { procedure_manager: ProcedureManagerRef, node_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, + flow_task_metadata_manager: FlowTaskMetadataManagerRef, + flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, } @@ -75,19 +81,23 @@ pub struct DdlManager { impl DdlManager { pub fn try_new( procedure_manager: ProcedureManagerRef, - datanode_clients: NodeManagerRef, + node_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, + flow_task_metadata_manager: FlowTaskMetadataManagerRef, + flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, register_loaders: bool, ) -> Result { let manager = Self { procedure_manager, - node_manager: datanode_clients, + node_manager, cache_invalidator, table_metadata_manager, table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, memory_region_keeper, }; if register_loaders { @@ -107,8 +117,10 @@ impl DdlManager { node_manager: self.node_manager.clone(), cache_invalidator: self.cache_invalidator.clone(), table_metadata_manager: self.table_metadata_manager.clone(), - memory_region_keeper: self.memory_region_keeper.clone(), table_metadata_allocator: self.table_metadata_allocator.clone(), + flow_task_metadata_manager: self.flow_task_metadata_manager.clone(), + flow_task_metadata_allocator: self.flow_task_metadata_allocator.clone(), + memory_region_keeper: self.memory_region_keeper.clone(), } } @@ -720,7 +732,9 @@ mod tests { use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::table_meta::TableMetadataAllocator; + use crate::ddl::task_meta::FlowTaskMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; + use crate::key::flow_task::FlowTaskMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; @@ -748,6 +762,15 @@ mod tests { fn test_try_new() { let kv_backend = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( + Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), + Arc::new(WalOptionsAllocator::default()), + )); + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); + let flow_task_metadata_allocator = + Arc::new(FlowTaskMetadataAllocator::with_noop_peer_allocator( + Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()), + )); let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store)); @@ -757,10 +780,9 @@ mod tests { Arc::new(DummyDatanodeManager), Arc::new(DummyCacheInvalidator), table_metadata_manager.clone(), - Arc::new(TableMetadataAllocator::new( - Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), - Arc::new(WalOptionsAllocator::default()), - )), + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, Arc::new(MemoryRegionKeeper::default()), true, ); diff --git a/src/common/meta/src/key/flow_task.rs b/src/common/meta/src/key/flow_task.rs index f5fc9b4793cd..251b15ab77ca 100644 --- a/src/common/meta/src/key/flow_task.rs +++ b/src/common/meta/src/key/flow_task.rs @@ -18,6 +18,7 @@ pub(crate) mod flownode_task; pub(crate) mod table_task; use std::ops::Deref; +use std::sync::Arc; use common_telemetry::info; use snafu::{ensure, OptionExt}; @@ -82,6 +83,8 @@ impl> MetaKey> for FlowTaskScoped { } } +pub type FlowTaskMetadataManagerRef = Arc; + /// The manager of metadata, provides ability to: /// - Create metadata of the task. /// - Retrieve metadata of the task. diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 3d282e8caff3..fe5bf0c439be 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -21,8 +21,10 @@ use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; use crate::ddl::table_meta::TableMetadataAllocator; +use crate::ddl::task_meta::FlowTaskMetadataAllocator; use crate::ddl::DdlContext; use crate::error::Result; +use crate::key::flow_task::FlowTaskMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackendRef; @@ -99,19 +101,29 @@ pub fn new_ddl_context_with_kv_backend( kv_backend: KvBackendRef, ) -> DdlContext { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( + Arc::new( + SequenceBuilder::new("test", kv_backend.clone()) + .initial(1024) + .build(), + ), + Arc::new(WalOptionsAllocator::default()), + )); + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); + let flow_task_metadata_allocator = Arc::new( + FlowTaskMetadataAllocator::with_noop_peer_allocator(Arc::new( + SequenceBuilder::new("flow-test", kv_backend) + .initial(1024) + .build(), + )), + ); DdlContext { node_manager, cache_invalidator: Arc::new(DummyCacheInvalidator), memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), - table_metadata_allocator: Arc::new(TableMetadataAllocator::new( - Arc::new( - SequenceBuilder::new("test", kv_backend) - .initial(1024) - .build(), - ), - Arc::new(WalOptionsAllocator::default()), - )), + table_metadata_allocator, table_metadata_manager, + flow_task_metadata_allocator, + flow_task_metadata_manager, } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index dd0fbb1fde1a..308d08a20ae0 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -59,6 +59,7 @@ use crate::service::store::cached_kv::LeaderCachedKvBackend; use crate::state::{become_follower, become_leader, StateRef}; pub const TABLE_ID_SEQ: &str = "table_id"; +pub const FLOW_TASK_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 223ccf11d147..1ab2e97db420 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -18,11 +18,13 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_base::Plugins; -use common_catalog::consts::MIN_USER_TABLE_ID; +use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; +use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef}; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; +use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; @@ -35,6 +37,7 @@ use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; +use super::FLOW_TASK_ID_SEQ; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::{self, Result}; @@ -201,6 +204,9 @@ impl MetasrvBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new( leader_cached_kv_backend.clone() as _, )); + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new( + leader_cached_kv_backend.clone() as _, + )); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let selector_ctx = SelectorContext { server_addr: options.server_addr.clone(), @@ -231,7 +237,15 @@ impl MetasrvBuilder { peer_allocator, )) }); - + // TODO(weny): use the real allocator. + let flow_task_metadata_allocator = Arc::new( + FlowTaskMetadataAllocator::with_noop_peer_allocator(Arc::new( + SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_TASK_ID as u64) + .step(10) + .build(), + )), + ); let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); let ddl_manager = build_ddl_manager( @@ -241,6 +255,8 @@ impl MetasrvBuilder { &mailbox, &table_metadata_manager, &table_metadata_allocator, + &flow_task_metadata_manager, + &flow_task_metadata_allocator, &opening_region_keeper, )?; @@ -397,6 +413,8 @@ fn build_ddl_manager( mailbox: &MailboxRef, table_metadata_manager: &TableMetadataManagerRef, table_metadata_allocator: &TableMetadataAllocatorRef, + flow_task_metadata_manager: &FlowTaskMetadataManagerRef, + flow_task_metadata_allocator: &FlowTaskMetadataAllocatorRef, memory_region_keeper: &MemoryRegionKeeperRef, ) -> Result { let datanode_clients = datanode_clients.unwrap_or_else(|| { @@ -424,6 +442,8 @@ fn build_ddl_manager( cache_invalidator, table_metadata_manager.clone(), table_metadata_allocator.clone(), + flow_task_metadata_manager.clone(), + flow_task_metadata_allocator.clone(), memory_region_keeper.clone(), true, ) diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index f614f33b00d8..55a9119db3ed 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -106,7 +106,9 @@ pub mod test_data { use chrono::DateTime; use common_catalog::consts::MITO2_ENGINE; use common_meta::ddl::table_meta::TableMetadataAllocator; + use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; use common_meta::ddl::DdlContext; + use common_meta::key::flow_task::FlowTaskMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::node_manager::NodeManagerRef; @@ -194,8 +196,16 @@ pub mod test_data { let mailbox_sequence = SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( + Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), + Arc::new(WalOptionsAllocator::default()), + )); + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); + let flow_task_metadata_allocator = + Arc::new(FlowTaskMetadataAllocator::with_noop_peer_allocator( + Arc::new(SequenceBuilder::new("test", kv_backend).build()), + )); DdlContext { node_manager, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( @@ -204,12 +214,11 @@ pub mod test_data { server_addr: "127.0.0.1:4321".to_string(), }, )), - table_metadata_manager: table_metadata_manager.clone(), + table_metadata_manager, + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), - table_metadata_allocator: Arc::new(TableMetadataAllocator::new( - Arc::new(SequenceBuilder::new("test", kv_backend).build()), - Arc::new(WalOptionsAllocator::default()), - )), } } } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 79fbef604e24..0f962aeb0723 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -17,11 +17,13 @@ use std::sync::Arc; use catalog::kvbackend::KvBackendCatalogManager; use cmd::options::MixOptions; use common_base::Plugins; -use common_catalog::consts::MIN_USER_TABLE_ID; +use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_config::KvBackendConfig; use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::ddl::table_meta::TableMetadataAllocator; +use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; use common_meta::ddl_manager::DdlManager; +use common_meta::key::flow_task::FlowTaskMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; @@ -35,6 +37,7 @@ use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; +use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ}; use servers::Mode; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -125,7 +128,7 @@ impl GreptimeDbStandaloneBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); - + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); let catalog_manager = KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await; @@ -133,11 +136,17 @@ impl GreptimeDbStandaloneBuilder { let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); let table_id_sequence = Arc::new( - SequenceBuilder::new("table_id", kv_backend.clone()) + SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) .step(10) .build(), ); + let flow_task_id_sequence = Arc::new( + SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_TASK_ID as u64) + .step(10) + .build(), + ); let wal_options_allocator = Arc::new(WalOptionsAllocator::new( mix_options.wal_meta.clone(), kv_backend.clone(), @@ -146,6 +155,9 @@ impl GreptimeDbStandaloneBuilder { table_id_sequence, wal_options_allocator.clone(), )); + let flow_task_meta_allocator = Arc::new( + FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence), + ); let ddl_task_executor = Arc::new( DdlManager::try_new( @@ -154,6 +166,8 @@ impl GreptimeDbStandaloneBuilder { multi_cache_invalidator, table_metadata_manager, table_meta_allocator, + flow_task_metadata_manager, + flow_task_meta_allocator, Arc::new(MemoryRegionKeeper::default()), register_procedure_loaders, ) From 5b7095f04f81092123574caf017e87638ca38520 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 26 Apr 2024 03:12:18 +0000 Subject: [PATCH 03/13] chore: fix clippy --- src/cmd/src/standalone.rs | 24 +++--- src/common/meta/src/ddl_manager.rs | 70 ++++++------------ src/meta-srv/src/metasrv/builder.rs | 109 +++++++++++----------------- tests-integration/src/standalone.rs | 21 +++--- 4 files changed, 89 insertions(+), 135 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index edde6ed14db9..f81c415d16d6 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -23,7 +23,7 @@ use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef}; -use common_meta::ddl::ProcedureExecutorRef; +use common_meta::ddl::{DdlContext, ProcedureExecutorRef}; use common_meta::ddl_manager::DdlManager; use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; @@ -479,20 +479,22 @@ impl StartCommand { node_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_meta_allocator: TableMetadataAllocatorRef, - flow_metadata_manager: FlowTaskMetadataManagerRef, - flow_metadata_allocator: FlowTaskMetadataAllocatorRef, + table_metadata_allocator: TableMetadataAllocatorRef, + flow_task_metadata_manager: FlowTaskMetadataManagerRef, + flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, ) -> Result { let procedure_executor: ProcedureExecutorRef = Arc::new( DdlManager::try_new( + DdlContext { + node_manager, + cache_invalidator, + memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + table_metadata_manager, + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, + }, procedure_manager, - node_manager, - cache_invalidator, - table_metadata_manager, - table_meta_allocator, - flow_metadata_manager, - flow_metadata_allocator, - Arc::new(MemoryRegionKeeper::default()), true, ) .context(InitDdlManagerSnafu)?, diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 32c8a99183c0..5115e6e362dd 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -23,7 +23,6 @@ use derive_builder::Builder; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::TableId; -use crate::cache_invalidator::CacheInvalidatorRef; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_database::CreateDatabaseProcedure; @@ -31,8 +30,6 @@ use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_database::DropDatabaseProcedure; use crate::ddl::drop_table::DropTableProcedure; -use crate::ddl::table_meta::TableMetadataAllocatorRef; -use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor}; use crate::error::{ @@ -41,12 +38,9 @@ use crate::error::{ TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu, }; -use crate::key::flow_task::FlowTaskMetadataManagerRef; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; -use crate::node_manager::NodeManagerRef; -use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::DdlTask::{ AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase, DropLogicalTables, DropTable, TruncateTable, @@ -67,38 +61,20 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade /// The [DdlManager] provides the ability to execute Ddl. #[derive(Builder)] pub struct DdlManager { + ddl_context: DdlContext, procedure_manager: ProcedureManagerRef, - node_manager: NodeManagerRef, - cache_invalidator: CacheInvalidatorRef, - table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocatorRef, - flow_task_metadata_manager: FlowTaskMetadataManagerRef, - flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, - memory_region_keeper: MemoryRegionKeeperRef, } -/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. impl DdlManager { + /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. pub fn try_new( + ddl_context: DdlContext, procedure_manager: ProcedureManagerRef, - node_manager: NodeManagerRef, - cache_invalidator: CacheInvalidatorRef, - table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocatorRef, - flow_task_metadata_manager: FlowTaskMetadataManagerRef, - flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, - memory_region_keeper: MemoryRegionKeeperRef, register_loaders: bool, ) -> Result { let manager = Self { + ddl_context, procedure_manager, - node_manager, - cache_invalidator, - table_metadata_manager, - table_metadata_allocator, - flow_task_metadata_manager, - flow_task_metadata_allocator, - memory_region_keeper, }; if register_loaders { manager.register_loaders()?; @@ -108,23 +84,16 @@ impl DdlManager { /// Returns the [TableMetadataManagerRef]. pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { - &self.table_metadata_manager + &self.ddl_context.table_metadata_manager } /// Returns the [DdlContext] pub fn create_context(&self) -> DdlContext { - DdlContext { - node_manager: self.node_manager.clone(), - cache_invalidator: self.cache_invalidator.clone(), - table_metadata_manager: self.table_metadata_manager.clone(), - table_metadata_allocator: self.table_metadata_allocator.clone(), - flow_task_metadata_manager: self.flow_task_metadata_manager.clone(), - flow_task_metadata_allocator: self.flow_task_metadata_allocator.clone(), - memory_region_keeper: self.memory_region_keeper.clone(), - } + self.ddl_context.clone() } - fn register_loaders(&self) -> Result<()> { + /// Registers all Ddl loaders. + pub fn register_loaders(&self) -> Result<()> { let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = vec![ ( CreateTableProcedure::TYPE_NAME, @@ -431,7 +400,7 @@ async fn handle_alter_table_task( let table_ref = alter_table_task.table_ref(); let table_id = ddl_manager - .table_metadata_manager + .table_metadata_manager() .table_name_manager() .get(TableNameKey::new( table_ref.catalog, @@ -529,7 +498,7 @@ async fn handle_create_logical_table_tasks( } ); let physical_table_id = utils::check_and_get_physical_table_id( - &ddl_manager.table_metadata_manager, + ddl_manager.table_metadata_manager(), &create_table_tasks, ) .await?; @@ -622,7 +591,7 @@ async fn handle_alter_logical_table_tasks( table: &alter_table_tasks[0].alter_table.table_name, }; let physical_table_id = - utils::get_physical_table_id(&ddl_manager.table_metadata_manager, first_table).await?; + utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?; let num_logical_tables = alter_table_tasks.len(); let (id, _) = ddl_manager @@ -734,6 +703,7 @@ mod tests { use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::task_meta::FlowTaskMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; + use crate::ddl::DdlContext; use crate::key::flow_task::FlowTaskMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; @@ -776,14 +746,16 @@ mod tests { let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store)); let _ = DdlManager::try_new( + DdlContext { + node_manager: Arc::new(DummyDatanodeManager), + cache_invalidator: Arc::new(DummyCacheInvalidator), + table_metadata_manager, + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, + memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + }, procedure_manager.clone(), - Arc::new(DummyDatanodeManager), - Arc::new(DummyCacheInvalidator), - table_metadata_manager.clone(), - table_metadata_allocator, - flow_task_metadata_manager, - flow_task_metadata_allocator, - Arc::new(MemoryRegionKeeper::default()), true, ); diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 1ab2e97db420..ab17088745c5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -21,15 +21,16 @@ use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef}; -use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; +use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; +use common_meta::ddl::DdlContext; +use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; -use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef}; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::key::flow_task::FlowTaskMetadataManager; +use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::node_manager::NodeManagerRef; -use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; +use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; use common_meta::wal_options_allocator::WalOptionsAllocator; @@ -246,25 +247,46 @@ impl MetasrvBuilder { .build(), )), ); - let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); - - let ddl_manager = build_ddl_manager( - &options, - node_manager, - &procedure_manager, - &mailbox, - &table_metadata_manager, - &table_metadata_allocator, - &flow_task_metadata_manager, - &flow_task_metadata_allocator, - &opening_region_keeper, - )?; + let memory_region_keeper = Arc::new(MemoryRegionKeeper::default()); + let node_manager = node_manager.unwrap_or_else(|| { + let datanode_client_channel_config = ChannelConfig::new() + .timeout(Duration::from_millis( + options.datanode.client_options.timeout_millis, + )) + .connect_timeout(Duration::from_millis( + options.datanode.client_options.connect_timeout_millis, + )) + .tcp_nodelay(options.datanode.client_options.tcp_nodelay); + Arc::new(DatanodeClients::new(datanode_client_channel_config)) + }); + let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new( + mailbox.clone(), + MetasrvInfo { + server_addr: options.server_addr.clone(), + }, + )); + let ddl_manager = Arc::new( + DdlManager::try_new( + DdlContext { + node_manager, + cache_invalidator, + memory_region_keeper: memory_region_keeper.clone(), + table_metadata_manager: table_metadata_manager.clone(), + table_metadata_allocator: table_metadata_allocator.clone(), + flow_task_metadata_manager: flow_task_metadata_manager.clone(), + flow_task_metadata_allocator: flow_task_metadata_allocator.clone(), + }, + procedure_manager.clone(), + true, + ) + .context(error::InitDdlManagerSnafu)?, + ); let region_migration_manager = Arc::new(RegionMigrationManager::new( procedure_manager.clone(), DefaultContextFactory::new( table_metadata_manager.clone(), - opening_region_keeper.clone(), + memory_region_keeper.clone(), mailbox.clone(), options.server_addr.clone(), ), @@ -305,7 +327,7 @@ impl MetasrvBuilder { let region_lease_handler = RegionLeaseHandler::new( distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), - opening_region_keeper.clone(), + memory_region_keeper.clone(), ); let group = HeartbeatHandlerGroup::new(pushers); @@ -360,7 +382,7 @@ impl MetasrvBuilder { ) .await, plugins: plugins.unwrap_or_else(Plugins::default), - memory_region_keeper: opening_region_keeper, + memory_region_keeper, region_migration_manager, }) } @@ -406,51 +428,6 @@ fn build_procedure_manager( Arc::new(LocalManager::new(manager_config, Arc::new(state_store))) } -fn build_ddl_manager( - options: &MetasrvOptions, - datanode_clients: Option, - procedure_manager: &ProcedureManagerRef, - mailbox: &MailboxRef, - table_metadata_manager: &TableMetadataManagerRef, - table_metadata_allocator: &TableMetadataAllocatorRef, - flow_task_metadata_manager: &FlowTaskMetadataManagerRef, - flow_task_metadata_allocator: &FlowTaskMetadataAllocatorRef, - memory_region_keeper: &MemoryRegionKeeperRef, -) -> Result { - let datanode_clients = datanode_clients.unwrap_or_else(|| { - let datanode_client_channel_config = ChannelConfig::new() - .timeout(Duration::from_millis( - options.datanode.client_options.timeout_millis, - )) - .connect_timeout(Duration::from_millis( - options.datanode.client_options.connect_timeout_millis, - )) - .tcp_nodelay(options.datanode.client_options.tcp_nodelay); - Arc::new(DatanodeClients::new(datanode_client_channel_config)) - }); - let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new( - mailbox.clone(), - MetasrvInfo { - server_addr: options.server_addr.clone(), - }, - )); - - Ok(Arc::new( - DdlManager::try_new( - procedure_manager.clone(), - datanode_clients, - cache_invalidator, - table_metadata_manager.clone(), - table_metadata_allocator.clone(), - flow_task_metadata_manager.clone(), - flow_task_metadata_allocator.clone(), - memory_region_keeper.clone(), - true, - ) - .context(error::InitDdlManagerSnafu)?, - )) -} - impl Default for MetasrvBuilder { fn default() -> Self { Self::new() diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 0f962aeb0723..0931e87d05d4 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -22,6 +22,7 @@ use common_config::KvBackendConfig; use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; +use common_meta::ddl::DdlContext; use common_meta::ddl_manager::DdlManager; use common_meta::key::flow_task::FlowTaskMetadataManager; use common_meta::key::TableMetadataManager; @@ -151,24 +152,26 @@ impl GreptimeDbStandaloneBuilder { mix_options.wal_meta.clone(), kv_backend.clone(), )); - let table_meta_allocator = Arc::new(TableMetadataAllocator::new( + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), )); - let flow_task_meta_allocator = Arc::new( + let flow_task_metadata_allocator = Arc::new( FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence), ); let ddl_task_executor = Arc::new( DdlManager::try_new( + DdlContext { + node_manager: node_manager.clone(), + cache_invalidator: multi_cache_invalidator, + memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + table_metadata_manager, + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, + }, procedure_manager.clone(), - node_manager.clone(), - multi_cache_invalidator, - table_metadata_manager, - table_meta_allocator, - flow_task_metadata_manager, - flow_task_meta_allocator, - Arc::new(MemoryRegionKeeper::default()), register_procedure_loaders, ) .unwrap(), From 73a63321f5f75be787baac713e85b2530f9801d7 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 26 Apr 2024 08:12:40 +0000 Subject: [PATCH 04/13] feat: add `FlowTaskNameLock` --- src/common/meta/src/lock_key.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/common/meta/src/lock_key.rs b/src/common/meta/src/lock_key.rs index ad09c064d31d..456d1ccffad7 100644 --- a/src/common/meta/src/lock_key.rs +++ b/src/common/meta/src/lock_key.rs @@ -22,6 +22,7 @@ const CATALOG_LOCK_PREFIX: &str = "__catalog_lock"; const SCHEMA_LOCK_PREFIX: &str = "__schema_lock"; const TABLE_LOCK_PREFIX: &str = "__table_lock"; const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock"; +const FLOW_TASK_NAME_LOCK_PREFIX: &str = "__flow_task_name_lock"; const REGION_LOCK_PREFIX: &str = "__region_lock"; /// [CatalogLock] acquires the lock on the tenant level. @@ -110,6 +111,32 @@ impl From for StringKey { } } +/// [FlowTaskNameLock] prevents any procedures trying to create a flow task named it. +pub enum FlowTaskNameLock { + Write(String), +} + +impl FlowTaskNameLock { + pub fn new(catalog: &str, table: &str) -> Self { + Self::Write(format!("{catalog}.{table}")) + } +} + +impl Display for FlowTaskNameLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let FlowTaskNameLock::Write(name) = self; + write!(f, "{}/{}", FLOW_TASK_NAME_LOCK_PREFIX, name) + } +} + +impl From for StringKey { + fn from(value: FlowTaskNameLock) -> Self { + match value { + FlowTaskNameLock::Write(_) => StringKey::Exclusive(value.to_string()), + } + } +} + /// [TableLock] acquires the lock on the table level. /// /// Note: Allows to read/modify the corresponding table's [TableInfoValue](crate::key::table_info::TableInfoValue), From c24b4f92c4a441415591c8a8df39aae1bf937205 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 26 Apr 2024 08:13:27 +0000 Subject: [PATCH 05/13] feat: implement the `CreateFlowTaskProcedure` --- src/common/meta/src/ddl.rs | 1 + src/common/meta/src/ddl/create_flow_task.rs | 242 ++++++++++++++++++ .../meta/src/ddl/create_flow_task/check.rs | 44 ++++ .../meta/src/ddl/create_flow_task/metadata.rs | 62 +++++ src/common/meta/src/ddl_manager.rs | 2 +- src/common/meta/src/error.rs | 8 +- src/common/meta/src/key/flow_task.rs | 65 +++-- .../meta/src/key/flow_task/flow_task_info.rs | 7 +- .../meta/src/key/flow_task/flow_task_name.rs | 7 + src/common/meta/src/metrics.rs | 6 + src/common/meta/src/rpc/ddl.rs | 1 + 11 files changed, 415 insertions(+), 30 deletions(-) create mode 100644 src/common/meta/src/ddl/create_flow_task.rs create mode 100644 src/common/meta/src/ddl/create_flow_task/check.rs create mode 100644 src/common/meta/src/ddl/create_flow_task/metadata.rs diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 08efa57eae39..fb6800c1406c 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -33,6 +33,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu pub mod alter_logical_tables; pub mod alter_table; pub mod create_database; +pub mod create_flow_task; pub mod create_logical_tables; pub mod create_table; mod create_table_template; diff --git a/src/common/meta/src/ddl/create_flow_task.rs b/src/common/meta/src/ddl/create_flow_task.rs new file mode 100644 index 000000000000..c61757bc49e1 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task.rs @@ -0,0 +1,242 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod check; +mod metadata; + +use std::collections::BTreeMap; + +use api::v1::flow::flow_request::Body as PbFlowRequest; +use api::v1::flow::{CreateRequest, FlowRequest}; +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use common_telemetry::info; +use futures::future::join_all; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use strum::AsRefStr; +use table::metadata::TableId; + +use super::utils::add_peer_context_if_needed; +use crate::ddl::utils::handle_retry_error; +use crate::ddl::DdlContext; +use crate::error::Result; +use crate::key::flow_task::flow_task_info::FlowTaskInfoValue; +use crate::key::FlowTaskId; +use crate::lock_key::{CatalogLock, FlowTaskNameLock}; +use crate::peer::Peer; +use crate::rpc::ddl::CreateFlowTask; +use crate::{metrics, ClusterId}; + +/// The procedure of flow task creation. +pub struct CreateFlowTaskProcedure { + pub context: DdlContext, + pub data: CreateFlowTaskData, +} + +impl CreateFlowTaskProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlowTask"; + + /// Returns a new [CreateFlowTaskProcedure]. + pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self { + Self { + context, + data: CreateFlowTaskData { + cluster_id, + task, + flow_task_id: None, + peers: vec![], + source_table_ids: vec![], + state: CreateFlowTaskState::CreateMetadata, + }, + } + } + + /// Deserializes from `json`. + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(CreateFlowTaskProcedure { context, data }) + } + + async fn on_prepare(&mut self) -> Result { + self.check_creation().await?; + self.collect_source_tables().await?; + self.allocate_flow_task_id().await?; + self.data.state = CreateFlowTaskState::FlownodeCreateFlows; + + Ok(Status::executing(true)) + } + + async fn on_flownode_create_flow(&mut self) -> Result { + self.data.state = CreateFlowTaskState::CreateMetadata; + // Safety: must be allocated. + let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); + for peer in &self.data.peers { + let requester = self.context.datanode_manager.flownode(peer).await; + let request = FlowRequest { + body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())), + }; + create_flow_task.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer.clone())) + }); + } + + join_all(create_flow_task) + .await + .into_iter() + .collect::>>()?; + + self.data.state = CreateFlowTaskState::CreateMetadata; + Ok(Status::executing(true)) + } + + /// Creates flow task metadata. + /// + /// Abort(not-retry): + /// - Failed to create table metadata. + async fn on_create_metadata(&mut self) -> Result { + // Safety: The flow task id must be allocated. + let flow_task_id = self.data.flow_task_id.unwrap(); + // TODO(weny): Support `or_replace`. + self.context + .flow_task_metadata_manager + .create_flow_task_metadata(flow_task_id, self.data.to_flow_task_info_value()) + .await?; + info!("Created flow task metadata for flow task {flow_task_id}"); + Ok(Status::done_with_output(flow_task_id)) + } +} + +#[async_trait] +impl Procedure for CreateFlowTaskProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK + .with_label_values(&[state.as_ref()]) + .start_timer(); + + match state { + CreateFlowTaskState::Prepare => self.on_prepare().await, + CreateFlowTaskState::FlownodeCreateFlows => self.on_flownode_create_flow().await, + CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await, + } + .map_err(handle_retry_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let catalog_name = &self.data.task.catalog_name; + let task_name = &self.data.task.task_name; + + LockKey::new(vec![ + CatalogLock::Read(catalog_name).into(), + FlowTaskNameLock::new(catalog_name, task_name).into(), + ]) + } +} + +/// The state of [CreateFlowTaskProcedure]. +#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] +pub enum CreateFlowTaskState { + /// Prepares to create the flow. + Prepare, + /// Creates flows on the flownode. + FlownodeCreateFlows, + /// Create metadata. + CreateMetadata, +} + +/// The serializable data. +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateFlowTaskData { + pub(crate) cluster_id: ClusterId, + pub(crate) state: CreateFlowTaskState, + pub(crate) task: CreateFlowTask, + pub(crate) flow_task_id: Option, + pub(crate) peers: Vec, + pub(crate) source_table_ids: Vec, +} + +impl CreateFlowTaskData { + /// Converts to [CreateRequest] + /// # Panic + /// Panic if the `flow_task_id` is None. + fn to_create_flow_request(&self) -> CreateRequest { + let flow_task_id = self.flow_task_id.unwrap(); + let source_table_ids = &self.source_table_ids; + + CreateRequest { + task_id: Some(api::v1::flow::TaskId { id: flow_task_id }), + source_table_ids: source_table_ids + .iter() + .map(|table_id| api::v1::TableId { id: *table_id }) + .collect_vec(), + sink_table_name: Some(self.task.sink_table_name.clone().into()), + // Always be true + create_if_not_exists: true, + expire_when: self.task.expire_when.clone(), + comment: self.task.comment.clone(), + sql: self.task.sql.clone(), + task_options: self.task.options.clone(), + } + } + + /// Converts to [FlowTaskInfoValue]. + fn to_flow_task_info_value(&self) -> FlowTaskInfoValue { + let CreateFlowTask { + catalog_name, + task_name, + sink_table_name, + expire_when, + comment, + sql, + options, + .. + } = self.task.clone(); + + let flownode_ids = self + .peers + .iter() + .enumerate() + .map(|(idx, peer)| (idx as u32, peer.id)) + .collect::>(); + + FlowTaskInfoValue { + source_table_ids: self.source_table_ids.clone(), + sink_table_name, + flownode_ids, + catalog_name, + task_name, + raw_sql: sql, + expire_when, + comment, + options, + } + } +} diff --git a/src/common/meta/src/ddl/create_flow_task/check.rs b/src/common/meta/src/ddl/create_flow_task/check.rs new file mode 100644 index 000000000000..0f245f675453 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task/check.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ensure; + +use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::error::{self, Result}; + +impl CreateFlowTaskProcedure { + /// Checks: + /// - The new task name doesn't exist. + /// - The source tables exist. + pub(crate) async fn check_creation(&self) -> Result<()> { + let catalog_name = &self.data.task.catalog_name; + let task_name = &self.data.task.task_name; + + // Ensures the task name doesn't exist. + let exists = self + .context + .flow_task_metadata_manager + .flow_task_name_manager() + .exists(catalog_name, task_name) + .await?; + ensure!( + !exists, + error::TaskAlreadyExistsSnafu { + task_name: format!("{}.{}", catalog_name, task_name), + } + ); + + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/create_flow_task/metadata.rs b/src/common/meta/src/ddl/create_flow_task/metadata.rs new file mode 100644 index 000000000000..17747f5870bd --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task/metadata.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::error::{self, Result}; +use crate::key::table_name::TableNameKey; + +impl CreateFlowTaskProcedure { + /// Allocates the [FlowTaskId]. + pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { + //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now. + let partitions = 1; + let (flow_task_id, peers) = self + .context + .flow_task_metadata_allocator + .create(partitions) + .await?; + self.data.flow_task_id = Some(flow_task_id); + self.data.peers = peers; + + Ok(()) + } + + /// Collects source table ids + pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { + // Ensures all source tables exist. + let mut source_table_ids = Vec::with_capacity(self.data.task.source_table_names.len()); + + for name in &self.data.task.source_table_names { + let key = TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name); + match self + .context + .table_metadata_manager + .table_name_manager() + .get(key) + .await? + { + Some(value) => source_table_ids.push(value.table_id()), + None => { + return error::TableNotFoundSnafu { + table_name: name.to_string(), + } + .fail(); + } + } + } + + self.data.source_table_ids = source_table_ids; + Ok(()) + } +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 5115e6e362dd..365805bfd962 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -714,7 +714,7 @@ mod tests { use crate::state_store::KvStateStore; use crate::wal_options_allocator::WalOptionsAllocator; - /// A dummy implemented [DatanodeManager]. + /// A dummy implemented [NodeManager]. pub struct DummyDatanodeManager; #[async_trait::async_trait] diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 323b2c0fee10..29dfcfc0757e 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -23,7 +23,6 @@ use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use crate::key::FlowTaskId; use crate::peer::Peer; use crate::DatanodeId; @@ -242,14 +241,9 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Task already exists, task: {}, flow_task_id: {}", - task_name, - flow_task_id - ))] + #[snafu(display("Task already exists, task: {}", task_name,))] TaskAlreadyExists { task_name: String, - flow_task_id: FlowTaskId, location: Location, }, diff --git a/src/common/meta/src/key/flow_task.rs b/src/common/meta/src/key/flow_task.rs index 251b15ab77ca..5e1c2e427ab6 100644 --- a/src/common/meta/src/key/flow_task.rs +++ b/src/common/meta/src/key/flow_task.rs @@ -90,7 +90,7 @@ pub type FlowTaskMetadataManagerRef = Arc; /// - Retrieve metadata of the task. /// - Delete metadata of the task. pub struct FlowTaskMetadataManager { - flow_task_manager: FlowTaskInfoManager, + flow_task_info_manager: FlowTaskInfoManager, flownode_task_manager: FlownodeTaskManager, table_task_manager: TableTaskManager, flow_task_name_manager: FlowTaskNameManager, @@ -101,7 +101,7 @@ impl FlowTaskMetadataManager { /// Returns a new [FlowTaskMetadataManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { - flow_task_manager: FlowTaskInfoManager::new(kv_backend.clone()), + flow_task_info_manager: FlowTaskInfoManager::new(kv_backend.clone()), flow_task_name_manager: FlowTaskNameManager::new(kv_backend.clone()), flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()), table_task_manager: TableTaskManager::new(kv_backend.clone()), @@ -109,9 +109,14 @@ impl FlowTaskMetadataManager { } } - /// Returns the [FlowTaskManager]. - pub fn flow_task_manager(&self) -> &FlowTaskInfoManager { - &self.flow_task_manager + /// Returns the [FlowTaskNameManager]. + pub fn flow_task_name_manager(&self) -> &FlowTaskNameManager { + &self.flow_task_name_manager + } + + /// Returns the [FlowTaskInfoManager]. + pub fn flow_task_info_manager(&self) -> &FlowTaskInfoManager { + &self.flow_task_info_manager } /// Returns the [FlownodeTaskManager]. @@ -138,7 +143,7 @@ impl FlowTaskMetadataManager { )?; let (create_flow_task_txn, on_create_flow_task_failure) = - self.flow_task_manager.build_create_txn( + self.flow_task_info_manager.build_create_txn( &flow_task_value.catalog_name, flow_task_id, &flow_task_value, @@ -195,7 +200,6 @@ impl FlowTaskMetadataManager { "{}.{}", flow_task_value.catalog_name, flow_task_value.task_name ), - flow_task_id, } .fail(); } @@ -226,6 +230,7 @@ mod tests { use crate::key::flow_task::table_task::TableTaskKey; use crate::key::scope::CatalogScoped; use crate::kv_backend::memory::MemoryKvBackend; + use crate::table_name::TableName; #[derive(Debug)] struct MockKey { @@ -276,11 +281,16 @@ mod tests { let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv.clone()); let task_id = 10; let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: catalog_name.to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -297,7 +307,7 @@ mod tests { .await .unwrap(); let got = flow_metadata_manager - .flow_task_manager() + .flow_task_info_manager() .get(catalog_name, task_id) .await .unwrap() @@ -335,11 +345,17 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); let task_id = 10; + let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -352,10 +368,10 @@ mod tests { .unwrap(); // Creates again. let flow_task_value = FlowTaskInfoValue { - catalog_name: "greptime".to_string(), + catalog_name: catalog_name.to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -374,11 +390,17 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); let task_id = 10; + let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -390,11 +412,16 @@ mod tests { .await .unwrap(); // Creates again. + let another_sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "another_sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2048, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), diff --git a/src/common/meta/src/key/flow_task/flow_task_info.rs b/src/common/meta/src/key/flow_task/flow_task_info.rs index f30d3217f8a6..371ab96a1e41 100644 --- a/src/common/meta/src/key/flow_task/flow_task_info.rs +++ b/src/common/meta/src/key/flow_task/flow_task_info.rs @@ -29,6 +29,7 @@ use crate::key::{ }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; +use crate::table_name::TableName; use crate::FlownodeId; const FLOW_TASK_INFO_KEY_PREFIX: &str = "info"; @@ -117,9 +118,9 @@ impl MetaKey for FlowTaskInfoKeyInner { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowTaskInfoValue { /// The source tables used by the task. - pub(crate) source_tables: Vec, + pub(crate) source_table_ids: Vec, /// The sink table used by the task. - pub(crate) sink_table: TableId, + pub(crate) sink_table_name: TableName, /// Which flow nodes this task is running on. pub(crate) flownode_ids: BTreeMap, /// The catalog name. @@ -144,7 +145,7 @@ impl FlowTaskInfoValue { /// Returns the `source_table`. pub fn source_table_ids(&self) -> &[TableId] { - &self.source_tables + &self.source_table_ids } } diff --git a/src/common/meta/src/key/flow_task/flow_task_name.rs b/src/common/meta/src/key/flow_task/flow_task_name.rs index 9828283e6401..eaf6da5ae848 100644 --- a/src/common/meta/src/key/flow_task/flow_task_name.rs +++ b/src/common/meta/src/key/flow_task/flow_task_name.rs @@ -149,6 +149,13 @@ impl FlowTaskNameManager { .transpose() } + /// Returns true if the `task` exists. + pub async fn exists(&self, catalog: &str, task: &str) -> Result { + let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string()); + let raw_key = key.to_bytes(); + self.kv_backend.exists(&raw_key).await + } + /// Builds a create flow task name transaction. /// It's expected that the `__flow_task/{catalog}/name/{task_name}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 4e810195bb2b..0a47b1de1463 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -39,6 +39,12 @@ lazy_static! { &["step"] ) .unwrap(); + pub static ref METRIC_META_PROCEDURE_CREATE_FLOW_TASK: HistogramVec = register_histogram_vec!( + "greptime_meta_procedure_create_flow_task", + "meta procedure create flow task", + &["step"] + ) + .unwrap(); pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!( "greptime_meta_procedure_create_tables", "meta procedure create tables", diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 3a88ea11bf2e..66a25cf21af7 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -724,6 +724,7 @@ impl TryFrom for PbDropDatabaseTask { } /// Create flow task +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateFlowTask { pub catalog_name: String, pub flow_name: String, From 17d084ab6b3faf6845a4dd33e349c886ba19bfc3 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 28 Apr 2024 03:35:18 +0000 Subject: [PATCH 06/13] chore: rename to `CreateFlowProcedure` --- src/common/meta/src/ddl.rs | 2 +- .../src/ddl/{create_flow_task.rs => create_flow.rs} | 12 ++++++------ .../ddl/{create_flow_task => create_flow}/check.rs | 4 ++-- .../{create_flow_task => create_flow}/metadata.rs | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) rename src/common/meta/src/ddl/{create_flow_task.rs => create_flow.rs} (96%) rename src/common/meta/src/ddl/{create_flow_task => create_flow}/check.rs (93%) rename src/common/meta/src/ddl/{create_flow_task => create_flow}/metadata.rs (95%) diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index fb6800c1406c..5bdcb1f68e2b 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -33,7 +33,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu pub mod alter_logical_tables; pub mod alter_table; pub mod create_database; -pub mod create_flow_task; +pub mod create_flow; pub mod create_logical_tables; pub mod create_table; mod create_table_template; diff --git a/src/common/meta/src/ddl/create_flow_task.rs b/src/common/meta/src/ddl/create_flow.rs similarity index 96% rename from src/common/meta/src/ddl/create_flow_task.rs rename to src/common/meta/src/ddl/create_flow.rs index c61757bc49e1..656d20634855 100644 --- a/src/common/meta/src/ddl/create_flow_task.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -44,15 +44,15 @@ use crate::rpc::ddl::CreateFlowTask; use crate::{metrics, ClusterId}; /// The procedure of flow task creation. -pub struct CreateFlowTaskProcedure { +pub struct CreateFlowProcedure { pub context: DdlContext, pub data: CreateFlowTaskData, } -impl CreateFlowTaskProcedure { +impl CreateFlowProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlowTask"; - /// Returns a new [CreateFlowTaskProcedure]. + /// Returns a new [CreateFlowProcedure]. pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self { Self { context, @@ -70,7 +70,7 @@ impl CreateFlowTaskProcedure { /// Deserializes from `json`. pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(CreateFlowTaskProcedure { context, data }) + Ok(CreateFlowProcedure { context, data }) } async fn on_prepare(&mut self) -> Result { @@ -87,7 +87,7 @@ impl CreateFlowTaskProcedure { // Safety: must be allocated. let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); for peer in &self.data.peers { - let requester = self.context.datanode_manager.flownode(peer).await; + let requester = self.context.node_manager.flownode(peer).await; let request = FlowRequest { body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())), }; @@ -126,7 +126,7 @@ impl CreateFlowTaskProcedure { } #[async_trait] -impl Procedure for CreateFlowTaskProcedure { +impl Procedure for CreateFlowProcedure { fn type_name(&self) -> &str { Self::TYPE_NAME } diff --git a/src/common/meta/src/ddl/create_flow_task/check.rs b/src/common/meta/src/ddl/create_flow/check.rs similarity index 93% rename from src/common/meta/src/ddl/create_flow_task/check.rs rename to src/common/meta/src/ddl/create_flow/check.rs index 0f245f675453..88bafcd3a5e7 100644 --- a/src/common/meta/src/ddl/create_flow_task/check.rs +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -14,10 +14,10 @@ use snafu::ensure; -use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::ddl::create_flow::CreateFlowProcedure; use crate::error::{self, Result}; -impl CreateFlowTaskProcedure { +impl CreateFlowProcedure { /// Checks: /// - The new task name doesn't exist. /// - The source tables exist. diff --git a/src/common/meta/src/ddl/create_flow_task/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs similarity index 95% rename from src/common/meta/src/ddl/create_flow_task/metadata.rs rename to src/common/meta/src/ddl/create_flow/metadata.rs index 17747f5870bd..1383eb8eda3f 100644 --- a/src/common/meta/src/ddl/create_flow_task/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::ddl::create_flow::CreateFlowProcedure; use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; -impl CreateFlowTaskProcedure { +impl CreateFlowProcedure { /// Allocates the [FlowTaskId]. pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now. From a97f513a1987fc35937b7aeed42f88a4e41aa8ff Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 28 Apr 2024 06:07:08 +0000 Subject: [PATCH 07/13] chore: apply suggestions from CR --- src/common/meta/src/ddl/create_flow.rs | 1 - .../meta/src/ddl/create_flow/metadata.rs | 50 ++++++++++++------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 656d20634855..3d6aeeeba438 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -83,7 +83,6 @@ impl CreateFlowProcedure { } async fn on_flownode_create_flow(&mut self) -> Result { - self.data.state = CreateFlowTaskState::CreateMetadata; // Safety: must be allocated. let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); for peer in &self.data.peers { diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs index 1383eb8eda3f..b9250054edd3 100644 --- a/src/common/meta/src/ddl/create_flow/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use snafu::OptionExt; + use crate::ddl::create_flow::CreateFlowProcedure; use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; @@ -35,26 +37,36 @@ impl CreateFlowProcedure { /// Collects source table ids pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { // Ensures all source tables exist. - let mut source_table_ids = Vec::with_capacity(self.data.task.source_table_names.len()); - - for name in &self.data.task.source_table_names { - let key = TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name); - match self - .context - .table_metadata_manager - .table_name_manager() - .get(key) - .await? - { - Some(value) => source_table_ids.push(value.table_id()), - None => { - return error::TableNotFoundSnafu { + + let keys = self + .data + .task + .source_table_names + .iter() + .map(|name| TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name)) + .collect::>(); + + let source_table_ids = self + .context + .table_metadata_manager + .table_name_manager() + .batch_get(keys) + .await?; + + let source_table_ids = self + .data + .task + .source_table_names + .iter() + .zip(source_table_ids) + .map(|(name, table_id)| { + Ok(table_id + .with_context(|| error::TableNotFoundSnafu { table_name: name.to_string(), - } - .fail(); - } - } - } + })? + .table_id()) + }) + .collect::>>()?; self.data.source_table_ids = source_table_ids; Ok(()) From 2afe4897ed6eb7dadcfb6fd0ba37fc5fef3c13e2 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 03:16:14 +0000 Subject: [PATCH 08/13] feat: invoke create flow procedure --- src/common/meta/src/ddl_manager.rs | 55 +++++++++++++++++++++++++++--- src/common/meta/src/rpc/ddl.rs | 14 +++++--- src/operator/src/expr_factory.rs | 17 +++++---- src/operator/src/statement/ddl.rs | 23 ++++++++++--- 4 files changed, 87 insertions(+), 22 deletions(-) diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 365805bfd962..313f193ff2c6 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -26,6 +26,7 @@ use store_api::storage::TableId; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_database::CreateDatabaseProcedure; +use crate::ddl::create_flow::CreateFlowProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_database::DropDatabaseProcedure; @@ -42,12 +43,12 @@ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::rpc::ddl::DdlTask::{ - AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase, - DropLogicalTables, DropTable, TruncateTable, + AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable, + DropDatabase, DropLogicalTables, DropTable, TruncateTable, }; use crate::rpc::ddl::{ - AlterTableTask, CreateDatabaseTask, CreateTableTask, DropDatabaseTask, DropTableTask, - SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, + AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask, + DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::procedure; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; @@ -312,6 +313,20 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] + /// Submits and executes a create flow task. + pub async fn submit_create_flow_task( + &self, + cluster_id: ClusterId, + create_flow: CreateFlowTask, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + let procedure = CreateFlowProcedure::new(cluster_id, create_flow, context); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + #[tracing::instrument(skip_all)] /// Submits and executes a truncate table task. pub async fn submit_truncate_table_task( @@ -572,6 +587,35 @@ async fn handle_drop_database_task( }) } +async fn handle_create_flow_task( + ddl_manager: &DdlManager, + cluster_id: ClusterId, + create_flow_task: CreateFlowTask, +) -> Result { + let (id, output) = ddl_manager + .submit_create_flow_task(cluster_id, create_flow_task.clone()) + .await?; + + let procedure_id = id.to_string(); + let output = output.context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "empty output", + })?; + let flow_id = *(output.downcast_ref::().context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "downcast to `u32`", + })?); + info!( + "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", + create_flow_task.catalog_name, create_flow_task.task_name, + ); + + Ok(SubmitDdlTaskResponse { + key: procedure_id.into(), + ..Default::default() + }) +} + async fn handle_alter_logical_table_tasks( ddl_manager: &DdlManager, cluster_id: ClusterId, @@ -651,6 +695,9 @@ impl ProcedureExecutor for DdlManager { DropDatabase(drop_database_task) => { handle_drop_database_task(self, cluster_id, drop_database_task).await } + CreateFlow(create_flow_task) => { + handle_create_flow_task(self, cluster_id, create_flow_task).await + } } } .trace(span) diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 66a25cf21af7..911cab18df4c 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -51,9 +51,14 @@ pub enum DdlTask { AlterLogicalTables(Vec), CreateDatabase(CreateDatabaseTask), DropDatabase(DropDatabaseTask), + CreateFlow(CreateFlowTask), } impl DdlTask { + pub fn new_create_flow(expr: CreateFlowTask) -> Self { + DdlTask::CreateFlow(expr) + } + pub fn new_create_table( expr: CreateTableExpr, partitions: Vec, @@ -182,7 +187,7 @@ impl TryFrom for DdlTask { Task::DropDatabaseTask(drop_database) => { Ok(DdlTask::DropDatabase(drop_database.try_into()?)) } - Task::CreateFlowTask(_) => unimplemented!(), + Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)), Task::DropFlowTask(_) => unimplemented!(), } } @@ -228,6 +233,7 @@ impl TryFrom for PbDdlTaskRequest { } DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?), DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?), + DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()), }; Ok(Self { @@ -727,7 +733,7 @@ impl TryFrom for PbDropDatabaseTask { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateFlowTask { pub catalog_name: String, - pub flow_name: String, + pub task_name: String, pub source_table_names: Vec, pub sink_table_name: TableName, pub or_replace: bool, @@ -759,7 +765,7 @@ impl TryFrom for CreateFlowTask { Ok(CreateFlowTask { catalog_name, - flow_name: task_name, + task_name, source_table_names: source_table_names.into_iter().map(Into::into).collect(), sink_table_name: sink_table_name .context(error::InvalidProtoMsgSnafu { @@ -780,7 +786,7 @@ impl From for PbCreateFlowTask { fn from( CreateFlowTask { catalog_name, - flow_name: task_name, + task_name, source_table_names, sink_table_name, or_replace, diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 855a6a7ffbc4..beca0d4f224a 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -18,11 +18,12 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension, - CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, - TableName, + CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; +use common_meta::rpc::ddl::CreateFlowTask; +use common_meta::table_name::TableName; use common_time::Timezone; use datafusion::sql::planner::object_name_to_table_reference; use datatypes::schema::{ColumnSchema, COMMENT_KEY}; @@ -493,7 +494,7 @@ pub(crate) fn to_alter_expr( pub fn to_create_flow_task_expr( create_flow: CreateFlow, query_ctx: QueryContextRef, -) -> Result { +) -> Result { // retrieve sink table name let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true) @@ -537,22 +538,20 @@ pub fn to_create_flow_task_expr( }) .collect::>>()?; - Ok(CreateFlowTaskExpr { + Ok(CreateFlowTask { catalog_name: query_ctx.current_catalog().to_string(), task_name: create_flow.flow_name.to_string(), source_table_names, - sink_table_name: Some(sink_table_name), - create_if_not_exists: create_flow.if_not_exists, + sink_table_name, or_replace: create_flow.or_replace, - // TODO(ruihang): change this field to optional in proto + create_if_not_exists: create_flow.if_not_exists, expire_when: create_flow .expire_when .map(|e| e.to_string()) .unwrap_or_default(), - // TODO(ruihang): change this field to optional in proto comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - task_options: HashMap::new(), + options: HashMap::new(), }) } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index a6b13a91722f..f7196b8f1322 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -27,7 +27,7 @@ use common_meta::ddl::ExecutorContext; use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::key::NAME_PATTERN; -use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use common_meta::rpc::ddl::{CreateFlowTask, DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_meta::table_name::TableName; use common_query::Output; @@ -323,14 +323,27 @@ impl StatementExecutor { } #[tracing::instrument(skip_all)] - pub async fn create_flow(&self, stmt: CreateFlow, query_ctx: QueryContextRef) -> Result<()> { + pub async fn create_flow( + &self, + stmt: CreateFlow, + query_ctx: QueryContextRef, + ) -> Result { // TODO(ruihang): do some verification + let expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; - let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; + self.create_flow_procedure(expr).await?; + Ok(Output::new_with_affected_rows(0)) + } - // TODO: invoke procedure + async fn create_flow_procedure(&self, expr: CreateFlowTask) -> Result { + let request = SubmitDdlTaskRequest { + task: DdlTask::new_create_flow(expr), + }; - Ok(()) + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) } #[tracing::instrument(skip_all)] From 4ee2b9f52735b8aae5fbbb718db0b60968457860 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 09:46:39 +0000 Subject: [PATCH 09/13] chore: apply suggestions from CR --- src/common/meta/src/ddl/create_flow.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 3d6aeeeba438..837a22deeaf7 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -77,7 +77,7 @@ impl CreateFlowProcedure { self.check_creation().await?; self.collect_source_tables().await?; self.allocate_flow_task_id().await?; - self.data.state = CreateFlowTaskState::FlownodeCreateFlows; + self.data.state = CreateFlowTaskState::CreateFlows; Ok(Status::executing(true)) } @@ -119,7 +119,7 @@ impl CreateFlowProcedure { .flow_task_metadata_manager .create_flow_task_metadata(flow_task_id, self.data.to_flow_task_info_value()) .await?; - info!("Created flow task metadata for flow task {flow_task_id}"); + info!("Created flow task metadata for flow {flow_task_id}"); Ok(Status::done_with_output(flow_task_id)) } } @@ -139,7 +139,7 @@ impl Procedure for CreateFlowProcedure { match state { CreateFlowTaskState::Prepare => self.on_prepare().await, - CreateFlowTaskState::FlownodeCreateFlows => self.on_flownode_create_flow().await, + CreateFlowTaskState::CreateFlows => self.on_flownode_create_flow().await, CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await, } .map_err(handle_retry_error) @@ -166,7 +166,7 @@ pub enum CreateFlowTaskState { /// Prepares to create the flow. Prepare, /// Creates flows on the flownode. - FlownodeCreateFlows, + CreateFlows, /// Create metadata. CreateMetadata, } From adf4e808e861c50396a0ec36f1e8dfbee5f6eeac Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 10:00:50 +0000 Subject: [PATCH 10/13] refactor: rename TYPE_NAME --- src/common/meta/src/ddl/create_flow.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 837a22deeaf7..0cce07497883 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -50,7 +50,7 @@ pub struct CreateFlowProcedure { } impl CreateFlowProcedure { - pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlowTask"; + pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow"; /// Returns a new [CreateFlowProcedure]. pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self { From 6c286be01a889e3b3276f3b6a48f2d78caeb4f8c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 10:02:02 +0000 Subject: [PATCH 11/13] feat: register the procedure --- src/common/meta/src/ddl_manager.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 313f193ff2c6..2f3d3a4eb45c 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -115,6 +115,15 @@ impl DdlManager { }) }, ), + ( + CreateFlowProcedure::TYPE_NAME, + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + CreateFlowProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), ( AlterTableProcedure::TYPE_NAME, &|context: DdlContext| -> BoxedProcedureLoader { From f9c02f6207d8de8eb3ba31f3a3f9409f266b3d52 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 10:04:53 +0000 Subject: [PATCH 12/13] chore: apply suggestions from CR --- src/common/meta/src/ddl/create_flow.rs | 40 +++++++++---------- src/common/meta/src/ddl/create_flow/check.rs | 1 - .../meta/src/ddl/create_flow/metadata.rs | 5 +-- src/common/meta/src/error.rs | 2 +- 4 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 0cce07497883..578be23b08bd 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -82,13 +82,13 @@ impl CreateFlowProcedure { Ok(Status::executing(true)) } - async fn on_flownode_create_flow(&mut self) -> Result { + async fn on_flownode_create_flows(&mut self) -> Result { // Safety: must be allocated. let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); for peer in &self.data.peers { let requester = self.context.node_manager.flownode(peer).await; let request = FlowRequest { - body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())), + body: Some(PbFlowRequest::Create((&self.data).into())), }; create_flow_task.push(async move { requester @@ -117,7 +117,7 @@ impl CreateFlowProcedure { // TODO(weny): Support `or_replace`. self.context .flow_task_metadata_manager - .create_flow_task_metadata(flow_task_id, self.data.to_flow_task_info_value()) + .create_flow_task_metadata(flow_task_id, (&self.data).into()) .await?; info!("Created flow task metadata for flow {flow_task_id}"); Ok(Status::done_with_output(flow_task_id)) @@ -139,7 +139,7 @@ impl Procedure for CreateFlowProcedure { match state { CreateFlowTaskState::Prepare => self.on_prepare().await, - CreateFlowTaskState::CreateFlows => self.on_flownode_create_flow().await, + CreateFlowTaskState::CreateFlows => self.on_flownode_create_flows().await, CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await, } .map_err(handle_retry_error) @@ -182,13 +182,10 @@ pub struct CreateFlowTaskData { pub(crate) source_table_ids: Vec, } -impl CreateFlowTaskData { - /// Converts to [CreateRequest] - /// # Panic - /// Panic if the `flow_task_id` is None. - fn to_create_flow_request(&self) -> CreateRequest { - let flow_task_id = self.flow_task_id.unwrap(); - let source_table_ids = &self.source_table_ids; +impl From<&CreateFlowTaskData> for CreateRequest { + fn from(value: &CreateFlowTaskData) -> Self { + let flow_task_id = value.flow_task_id.unwrap(); + let source_table_ids = &value.source_table_ids; CreateRequest { task_id: Some(api::v1::flow::TaskId { id: flow_task_id }), @@ -196,18 +193,19 @@ impl CreateFlowTaskData { .iter() .map(|table_id| api::v1::TableId { id: *table_id }) .collect_vec(), - sink_table_name: Some(self.task.sink_table_name.clone().into()), + sink_table_name: Some(value.task.sink_table_name.clone().into()), // Always be true create_if_not_exists: true, - expire_when: self.task.expire_when.clone(), - comment: self.task.comment.clone(), - sql: self.task.sql.clone(), - task_options: self.task.options.clone(), + expire_when: value.task.expire_when.clone(), + comment: value.task.comment.clone(), + sql: value.task.sql.clone(), + task_options: value.task.options.clone(), } } +} - /// Converts to [FlowTaskInfoValue]. - fn to_flow_task_info_value(&self) -> FlowTaskInfoValue { +impl From<&CreateFlowTaskData> for FlowTaskInfoValue { + fn from(value: &CreateFlowTaskData) -> Self { let CreateFlowTask { catalog_name, task_name, @@ -217,9 +215,9 @@ impl CreateFlowTaskData { sql, options, .. - } = self.task.clone(); + } = value.task.clone(); - let flownode_ids = self + let flownode_ids = value .peers .iter() .enumerate() @@ -227,7 +225,7 @@ impl CreateFlowTaskData { .collect::>(); FlowTaskInfoValue { - source_table_ids: self.source_table_ids.clone(), + source_table_ids: value.source_table_ids.clone(), sink_table_name, flownode_ids, catalog_name, diff --git a/src/common/meta/src/ddl/create_flow/check.rs b/src/common/meta/src/ddl/create_flow/check.rs index 88bafcd3a5e7..d2e4dfa887cd 100644 --- a/src/common/meta/src/ddl/create_flow/check.rs +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -20,7 +20,6 @@ use crate::error::{self, Result}; impl CreateFlowProcedure { /// Checks: /// - The new task name doesn't exist. - /// - The source tables exist. pub(crate) async fn check_creation(&self) -> Result<()> { let catalog_name = &self.data.task.catalog_name; let task_name = &self.data.task.task_name; diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs index b9250054edd3..ce35ae91ca98 100644 --- a/src/common/meta/src/ddl/create_flow/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -21,7 +21,7 @@ use crate::key::table_name::TableNameKey; impl CreateFlowProcedure { /// Allocates the [FlowTaskId]. pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { - //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now. + // TODO(weny, ruihang): We don't support the partitions. It's always be 1, now. let partitions = 1; let (flow_task_id, peers) = self .context @@ -34,10 +34,9 @@ impl CreateFlowProcedure { Ok(()) } - /// Collects source table ids + /// Ensures all source tables exist and collects source table ids pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { // Ensures all source tables exist. - let keys = self .data .task diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 29dfcfc0757e..40d5070c3bb3 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -241,7 +241,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Task already exists, task: {}", task_name,))] + #[snafu(display("Task already exists: {}", task_name))] TaskAlreadyExists { task_name: String, location: Location, From d9cccd5792a7153814c85e2496ecfe7f0c841378 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 12:21:31 +0000 Subject: [PATCH 13/13] feat: acquire the lock of sink table name --- src/common/meta/src/ddl/create_flow.rs | 9 ++++++++- src/common/meta/src/ddl/create_flow/check.rs | 21 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 578be23b08bd..018c7dc84276 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -38,7 +38,7 @@ use crate::ddl::DdlContext; use crate::error::Result; use crate::key::flow_task::flow_task_info::FlowTaskInfoValue; use crate::key::FlowTaskId; -use crate::lock_key::{CatalogLock, FlowTaskNameLock}; +use crate::lock_key::{CatalogLock, FlowTaskNameLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::CreateFlowTask; use crate::{metrics, ClusterId}; @@ -152,9 +152,16 @@ impl Procedure for CreateFlowProcedure { fn lock_key(&self) -> LockKey { let catalog_name = &self.data.task.catalog_name; let task_name = &self.data.task.task_name; + let sink_table_name = &self.data.task.sink_table_name; LockKey::new(vec![ CatalogLock::Read(catalog_name).into(), + TableNameLock::new( + &sink_table_name.catalog_name, + &sink_table_name.schema_name, + &sink_table_name.catalog_name, + ) + .into(), FlowTaskNameLock::new(catalog_name, task_name).into(), ]) } diff --git a/src/common/meta/src/ddl/create_flow/check.rs b/src/common/meta/src/ddl/create_flow/check.rs index d2e4dfa887cd..6aa1ecb3ed00 100644 --- a/src/common/meta/src/ddl/create_flow/check.rs +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -16,13 +16,16 @@ use snafu::ensure; use crate::ddl::create_flow::CreateFlowProcedure; use crate::error::{self, Result}; +use crate::key::table_name::TableNameKey; impl CreateFlowProcedure { /// Checks: /// - The new task name doesn't exist. + /// - The sink table doesn't exist. pub(crate) async fn check_creation(&self) -> Result<()> { let catalog_name = &self.data.task.catalog_name; let task_name = &self.data.task.task_name; + let sink_table_name = &self.data.task.sink_table_name; // Ensures the task name doesn't exist. let exists = self @@ -38,6 +41,24 @@ impl CreateFlowProcedure { } ); + // Ensures sink table doesn't exist. + let exists = self + .context + .table_metadata_manager + .table_name_manager() + .exists(TableNameKey::new( + &sink_table_name.catalog_name, + &sink_table_name.schema_name, + &sink_table_name.table_name, + )) + .await?; + ensure!( + !exists, + error::TableAlreadyExistsSnafu { + table_name: sink_table_name.to_string(), + } + ); + Ok(()) } }