From ecdb1595403fe4cd54022ba780374b08cbd117db Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 26 Apr 2024 08:13:27 +0000 Subject: [PATCH] feat: implement the `CreateFlowTaskProcedure` --- src/client/src/client_manager.rs | 9 +- src/cmd/src/standalone.rs | 4 +- src/common/meta/src/datanode_manager.rs | 18 +- src/common/meta/src/ddl.rs | 7 +- src/common/meta/src/ddl/create_flow_task.rs | 242 ++++++++++++++++++ .../meta/src/ddl/create_flow_task/check.rs | 44 ++++ .../meta/src/ddl/create_flow_task/metadata.rs | 62 +++++ src/common/meta/src/ddl_manager.rs | 10 +- src/common/meta/src/error.rs | 8 +- src/common/meta/src/flownode_manager.rs | 37 --- src/common/meta/src/key/flow_task.rs | 60 +++-- .../meta/src/key/flow_task/flow_task_info.rs | 7 +- .../meta/src/key/flow_task/flow_task_name.rs | 7 + src/common/meta/src/lib.rs | 1 - src/common/meta/src/metrics.rs | 6 + src/common/meta/src/rpc/ddl.rs | 1 + src/common/meta/src/test_util.rs | 12 +- src/frontend/src/instance/builder.rs | 6 +- src/frontend/src/instance/region_query.rs | 6 +- src/frontend/src/instance/standalone.rs | 9 +- src/meta-srv/src/metasrv/builder.rs | 6 +- src/meta-srv/src/procedure/tests.rs | 4 +- src/meta-srv/src/procedure/utils.rs | 4 +- src/operator/src/delete.rs | 6 +- src/operator/src/insert.rs | 6 +- src/operator/src/request.rs | 6 +- 26 files changed, 480 insertions(+), 108 deletions(-) create mode 100644 src/common/meta/src/ddl/create_flow_task.rs create mode 100644 src/common/meta/src/ddl/create_flow_task/check.rs create mode 100644 src/common/meta/src/ddl/create_flow_task/metadata.rs delete mode 100644 src/common/meta/src/flownode_manager.rs diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index e503555dd2a6..d6eef037ba3e 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::datanode_manager::{Datanode, DatanodeManager}; +use common_meta::datanode_manager::{Datanode, Flownode, NodeManager}; use common_meta::peer::Peer; use moka::future::{Cache, CacheBuilder}; @@ -44,12 +44,17 @@ impl Debug for DatanodeClients { } #[async_trait::async_trait] -impl DatanodeManager for DatanodeClients { +impl NodeManager for DatanodeClients { async fn datanode(&self, datanode: &Peer) -> Arc { let client = self.get_client(datanode).await; Arc::new(RegionRequester::new(client)) } + + async fn flownode(&self, _node: &Peer) -> Arc { + // TODO(weny): Support it. + unimplemented!() + } } impl DatanodeClients { diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 1ba3b6f02ad1..22488150a6a6 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -21,7 +21,7 @@ use clap::Parser; use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef}; use common_meta::ddl::{DdlContext, ProcedureExecutorRef}; @@ -480,7 +480,7 @@ impl StartCommand { pub async fn create_ddl_task_executor( procedure_manager: ProcedureManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs index 58990ce01dec..cec0d21226ca 100644 --- a/src/common/meta/src/datanode_manager.rs +++ b/src/common/meta/src/datanode_manager.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; +use api::v1::flow::{FlowRequest, FlowResponse}; use api::v1::region::{QueryRequest, RegionRequest}; pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; @@ -34,11 +35,22 @@ pub trait Datanode: Send + Sync { pub type DatanodeRef = Arc; +/// The trait for handling requests to flownode +#[async_trait::async_trait] +pub trait Flownode: Send + Sync { + async fn handle(&self, request: FlowRequest) -> Result; +} + +pub type FlownodeRef = Arc; + /// Datanode manager #[async_trait::async_trait] -pub trait DatanodeManager: Send + Sync { +pub trait NodeManager: Send + Sync { /// Retrieves a target `datanode`. - async fn datanode(&self, datanode: &Peer) -> DatanodeRef; + async fn datanode(&self, node: &Peer) -> DatanodeRef; + + /// Retrieves a target `flownode`. + async fn flownode(&self, node: &Peer) -> FlownodeRef; } -pub type DatanodeManagerRef = Arc; +pub type NodeManagerRef = Arc; diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index d1cff4e0d0cb..f32fbf244bfb 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -18,10 +18,10 @@ use std::sync::Arc; use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionNumber, TableId}; -use self::task_meta::FlowTaskMetadataAllocatorRef; use crate::cache_invalidator::CacheInvalidatorRef; -use crate::datanode_manager::DatanodeManagerRef; +use crate::datanode_manager::NodeManagerRef; use crate::ddl::table_meta::TableMetadataAllocatorRef; +use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef; use crate::error::Result; use crate::key::flow_task::FlowTaskMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; @@ -33,6 +33,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu pub mod alter_logical_tables; pub mod alter_table; pub mod create_database; +pub mod create_flow_task; pub mod create_logical_tables; pub mod create_table; mod create_table_template; @@ -100,7 +101,7 @@ pub struct TableMetadata { #[derive(Clone)] pub struct DdlContext { /// Sends querying and requests to datanode. - pub datanode_manager: DatanodeManagerRef, + pub datanode_manager: NodeManagerRef, /// Cache invalidation. pub cache_invalidator: CacheInvalidatorRef, /// Keep tracking operating regions. diff --git a/src/common/meta/src/ddl/create_flow_task.rs b/src/common/meta/src/ddl/create_flow_task.rs new file mode 100644 index 000000000000..c61757bc49e1 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task.rs @@ -0,0 +1,242 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod check; +mod metadata; + +use std::collections::BTreeMap; + +use api::v1::flow::flow_request::Body as PbFlowRequest; +use api::v1::flow::{CreateRequest, FlowRequest}; +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use common_telemetry::info; +use futures::future::join_all; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use strum::AsRefStr; +use table::metadata::TableId; + +use super::utils::add_peer_context_if_needed; +use crate::ddl::utils::handle_retry_error; +use crate::ddl::DdlContext; +use crate::error::Result; +use crate::key::flow_task::flow_task_info::FlowTaskInfoValue; +use crate::key::FlowTaskId; +use crate::lock_key::{CatalogLock, FlowTaskNameLock}; +use crate::peer::Peer; +use crate::rpc::ddl::CreateFlowTask; +use crate::{metrics, ClusterId}; + +/// The procedure of flow task creation. +pub struct CreateFlowTaskProcedure { + pub context: DdlContext, + pub data: CreateFlowTaskData, +} + +impl CreateFlowTaskProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlowTask"; + + /// Returns a new [CreateFlowTaskProcedure]. + pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self { + Self { + context, + data: CreateFlowTaskData { + cluster_id, + task, + flow_task_id: None, + peers: vec![], + source_table_ids: vec![], + state: CreateFlowTaskState::CreateMetadata, + }, + } + } + + /// Deserializes from `json`. + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(CreateFlowTaskProcedure { context, data }) + } + + async fn on_prepare(&mut self) -> Result { + self.check_creation().await?; + self.collect_source_tables().await?; + self.allocate_flow_task_id().await?; + self.data.state = CreateFlowTaskState::FlownodeCreateFlows; + + Ok(Status::executing(true)) + } + + async fn on_flownode_create_flow(&mut self) -> Result { + self.data.state = CreateFlowTaskState::CreateMetadata; + // Safety: must be allocated. + let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); + for peer in &self.data.peers { + let requester = self.context.datanode_manager.flownode(peer).await; + let request = FlowRequest { + body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())), + }; + create_flow_task.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer.clone())) + }); + } + + join_all(create_flow_task) + .await + .into_iter() + .collect::>>()?; + + self.data.state = CreateFlowTaskState::CreateMetadata; + Ok(Status::executing(true)) + } + + /// Creates flow task metadata. + /// + /// Abort(not-retry): + /// - Failed to create table metadata. + async fn on_create_metadata(&mut self) -> Result { + // Safety: The flow task id must be allocated. + let flow_task_id = self.data.flow_task_id.unwrap(); + // TODO(weny): Support `or_replace`. + self.context + .flow_task_metadata_manager + .create_flow_task_metadata(flow_task_id, self.data.to_flow_task_info_value()) + .await?; + info!("Created flow task metadata for flow task {flow_task_id}"); + Ok(Status::done_with_output(flow_task_id)) + } +} + +#[async_trait] +impl Procedure for CreateFlowTaskProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK + .with_label_values(&[state.as_ref()]) + .start_timer(); + + match state { + CreateFlowTaskState::Prepare => self.on_prepare().await, + CreateFlowTaskState::FlownodeCreateFlows => self.on_flownode_create_flow().await, + CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await, + } + .map_err(handle_retry_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let catalog_name = &self.data.task.catalog_name; + let task_name = &self.data.task.task_name; + + LockKey::new(vec![ + CatalogLock::Read(catalog_name).into(), + FlowTaskNameLock::new(catalog_name, task_name).into(), + ]) + } +} + +/// The state of [CreateFlowTaskProcedure]. +#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] +pub enum CreateFlowTaskState { + /// Prepares to create the flow. + Prepare, + /// Creates flows on the flownode. + FlownodeCreateFlows, + /// Create metadata. + CreateMetadata, +} + +/// The serializable data. +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateFlowTaskData { + pub(crate) cluster_id: ClusterId, + pub(crate) state: CreateFlowTaskState, + pub(crate) task: CreateFlowTask, + pub(crate) flow_task_id: Option, + pub(crate) peers: Vec, + pub(crate) source_table_ids: Vec, +} + +impl CreateFlowTaskData { + /// Converts to [CreateRequest] + /// # Panic + /// Panic if the `flow_task_id` is None. + fn to_create_flow_request(&self) -> CreateRequest { + let flow_task_id = self.flow_task_id.unwrap(); + let source_table_ids = &self.source_table_ids; + + CreateRequest { + task_id: Some(api::v1::flow::TaskId { id: flow_task_id }), + source_table_ids: source_table_ids + .iter() + .map(|table_id| api::v1::TableId { id: *table_id }) + .collect_vec(), + sink_table_name: Some(self.task.sink_table_name.clone().into()), + // Always be true + create_if_not_exists: true, + expire_when: self.task.expire_when.clone(), + comment: self.task.comment.clone(), + sql: self.task.sql.clone(), + task_options: self.task.options.clone(), + } + } + + /// Converts to [FlowTaskInfoValue]. + fn to_flow_task_info_value(&self) -> FlowTaskInfoValue { + let CreateFlowTask { + catalog_name, + task_name, + sink_table_name, + expire_when, + comment, + sql, + options, + .. + } = self.task.clone(); + + let flownode_ids = self + .peers + .iter() + .enumerate() + .map(|(idx, peer)| (idx as u32, peer.id)) + .collect::>(); + + FlowTaskInfoValue { + source_table_ids: self.source_table_ids.clone(), + sink_table_name, + flownode_ids, + catalog_name, + task_name, + raw_sql: sql, + expire_when, + comment, + options, + } + } +} diff --git a/src/common/meta/src/ddl/create_flow_task/check.rs b/src/common/meta/src/ddl/create_flow_task/check.rs new file mode 100644 index 000000000000..0f245f675453 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task/check.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ensure; + +use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::error::{self, Result}; + +impl CreateFlowTaskProcedure { + /// Checks: + /// - The new task name doesn't exist. + /// - The source tables exist. + pub(crate) async fn check_creation(&self) -> Result<()> { + let catalog_name = &self.data.task.catalog_name; + let task_name = &self.data.task.task_name; + + // Ensures the task name doesn't exist. + let exists = self + .context + .flow_task_metadata_manager + .flow_task_name_manager() + .exists(catalog_name, task_name) + .await?; + ensure!( + !exists, + error::TaskAlreadyExistsSnafu { + task_name: format!("{}.{}", catalog_name, task_name), + } + ); + + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/create_flow_task/metadata.rs b/src/common/meta/src/ddl/create_flow_task/metadata.rs new file mode 100644 index 000000000000..17747f5870bd --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task/metadata.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::error::{self, Result}; +use crate::key::table_name::TableNameKey; + +impl CreateFlowTaskProcedure { + /// Allocates the [FlowTaskId]. + pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { + //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now. + let partitions = 1; + let (flow_task_id, peers) = self + .context + .flow_task_metadata_allocator + .create(partitions) + .await?; + self.data.flow_task_id = Some(flow_task_id); + self.data.peers = peers; + + Ok(()) + } + + /// Collects source table ids + pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { + // Ensures all source tables exist. + let mut source_table_ids = Vec::with_capacity(self.data.task.source_table_names.len()); + + for name in &self.data.task.source_table_names { + let key = TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name); + match self + .context + .table_metadata_manager + .table_name_manager() + .get(key) + .await? + { + Some(value) => source_table_ids.push(value.table_id()), + None => { + return error::TableNotFoundSnafu { + table_name: name.to_string(), + } + .fail(); + } + } + } + + self.data.source_table_ids = source_table_ids; + Ok(()) + } +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 5abd471aa9ed..e395cdcc9833 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -697,7 +697,7 @@ mod tests { use super::DdlManager; use crate::cache_invalidator::DummyCacheInvalidator; - use crate::datanode_manager::{DatanodeManager, DatanodeRef}; + use crate::datanode_manager::{DatanodeRef, Flownode, NodeManager}; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; @@ -714,14 +714,18 @@ mod tests { use crate::state_store::KvStateStore; use crate::wal_options_allocator::WalOptionsAllocator; - /// A dummy implemented [DatanodeManager]. + /// A dummy implemented [NodeManager]. pub struct DummyDatanodeManager; #[async_trait::async_trait] - impl DatanodeManager for DummyDatanodeManager { + impl NodeManager for DummyDatanodeManager { async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { unimplemented!() } + + async fn flownode(&self, _node: &Peer) -> Arc { + unimplemented!() + } } #[test] diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 323b2c0fee10..29dfcfc0757e 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -23,7 +23,6 @@ use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use crate::key::FlowTaskId; use crate::peer::Peer; use crate::DatanodeId; @@ -242,14 +241,9 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Task already exists, task: {}, flow_task_id: {}", - task_name, - flow_task_id - ))] + #[snafu(display("Task already exists, task: {}", task_name,))] TaskAlreadyExists { task_name: String, - flow_task_id: FlowTaskId, location: Location, }, diff --git a/src/common/meta/src/flownode_manager.rs b/src/common/meta/src/flownode_manager.rs deleted file mode 100644 index 3e6f1849b22c..000000000000 --- a/src/common/meta/src/flownode_manager.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use api::v1::flow::{FlowRequest, FlowResponse}; - -use crate::error::Result; -use crate::peer::Peer; - -/// The trait fo handling requests to flownode -#[async_trait::async_trait] -pub trait Flownode: Send + Sync { - async fn handle(&self, request: FlowRequest) -> Result; -} - -pub type FlownodeRef = Arc; - -/// Flownode manager -#[async_trait::async_trait] -pub trait FlownodeManager: Send + Sync { - /// Retrieves a target `flownode`. - async fn flownode(&self, datanode: &Peer) -> FlownodeRef; -} - -pub type FlownodeManagerRef = Arc; diff --git a/src/common/meta/src/key/flow_task.rs b/src/common/meta/src/key/flow_task.rs index 251b15ab77ca..3640ebd75141 100644 --- a/src/common/meta/src/key/flow_task.rs +++ b/src/common/meta/src/key/flow_task.rs @@ -90,7 +90,7 @@ pub type FlowTaskMetadataManagerRef = Arc; /// - Retrieve metadata of the task. /// - Delete metadata of the task. pub struct FlowTaskMetadataManager { - flow_task_manager: FlowTaskInfoManager, + flow_task_info_manager: FlowTaskInfoManager, flownode_task_manager: FlownodeTaskManager, table_task_manager: TableTaskManager, flow_task_name_manager: FlowTaskNameManager, @@ -101,7 +101,7 @@ impl FlowTaskMetadataManager { /// Returns a new [FlowTaskMetadataManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { - flow_task_manager: FlowTaskInfoManager::new(kv_backend.clone()), + flow_task_info_manager: FlowTaskInfoManager::new(kv_backend.clone()), flow_task_name_manager: FlowTaskNameManager::new(kv_backend.clone()), flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()), table_task_manager: TableTaskManager::new(kv_backend.clone()), @@ -109,9 +109,14 @@ impl FlowTaskMetadataManager { } } - /// Returns the [FlowTaskManager]. - pub fn flow_task_manager(&self) -> &FlowTaskInfoManager { - &self.flow_task_manager + /// Returns the [FlowTaskNameManager]. + pub fn flow_task_name_manager(&self) -> &FlowTaskNameManager { + &self.flow_task_name_manager + } + + /// Returns the [FlowTaskInfoManager]. + pub fn flow_task_info_manager(&self) -> &FlowTaskInfoManager { + &self.flow_task_info_manager } /// Returns the [FlownodeTaskManager]. @@ -138,7 +143,7 @@ impl FlowTaskMetadataManager { )?; let (create_flow_task_txn, on_create_flow_task_failure) = - self.flow_task_manager.build_create_txn( + self.flow_task_info_manager.build_create_txn( &flow_task_value.catalog_name, flow_task_id, &flow_task_value, @@ -195,7 +200,6 @@ impl FlowTaskMetadataManager { "{}.{}", flow_task_value.catalog_name, flow_task_value.task_name ), - flow_task_id, } .fail(); } @@ -226,6 +230,7 @@ mod tests { use crate::key::flow_task::table_task::TableTaskKey; use crate::key::scope::CatalogScoped; use crate::kv_backend::memory::MemoryKvBackend; + use crate::table_name::TableName; #[derive(Debug)] struct MockKey { @@ -276,11 +281,16 @@ mod tests { let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv.clone()); let task_id = 10; let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: catalog_name.to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -297,7 +307,7 @@ mod tests { .await .unwrap(); let got = flow_metadata_manager - .flow_task_manager() + .flow_task_info_manager() .get(catalog_name, task_id) .await .unwrap() @@ -335,11 +345,17 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); let task_id = 10; + let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -352,10 +368,10 @@ mod tests { .unwrap(); // Creates again. let flow_task_value = FlowTaskInfoValue { - catalog_name: "greptime".to_string(), + catalog_name: catalog_name.to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -374,11 +390,17 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); let task_id = 10; + let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -393,8 +415,8 @@ mod tests { let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2048, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), diff --git a/src/common/meta/src/key/flow_task/flow_task_info.rs b/src/common/meta/src/key/flow_task/flow_task_info.rs index f30d3217f8a6..371ab96a1e41 100644 --- a/src/common/meta/src/key/flow_task/flow_task_info.rs +++ b/src/common/meta/src/key/flow_task/flow_task_info.rs @@ -29,6 +29,7 @@ use crate::key::{ }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; +use crate::table_name::TableName; use crate::FlownodeId; const FLOW_TASK_INFO_KEY_PREFIX: &str = "info"; @@ -117,9 +118,9 @@ impl MetaKey for FlowTaskInfoKeyInner { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowTaskInfoValue { /// The source tables used by the task. - pub(crate) source_tables: Vec, + pub(crate) source_table_ids: Vec, /// The sink table used by the task. - pub(crate) sink_table: TableId, + pub(crate) sink_table_name: TableName, /// Which flow nodes this task is running on. pub(crate) flownode_ids: BTreeMap, /// The catalog name. @@ -144,7 +145,7 @@ impl FlowTaskInfoValue { /// Returns the `source_table`. pub fn source_table_ids(&self) -> &[TableId] { - &self.source_tables + &self.source_table_ids } } diff --git a/src/common/meta/src/key/flow_task/flow_task_name.rs b/src/common/meta/src/key/flow_task/flow_task_name.rs index 9828283e6401..eaf6da5ae848 100644 --- a/src/common/meta/src/key/flow_task/flow_task_name.rs +++ b/src/common/meta/src/key/flow_task/flow_task_name.rs @@ -149,6 +149,13 @@ impl FlowTaskNameManager { .transpose() } + /// Returns true if the `task` exists. + pub async fn exists(&self, catalog: &str, task: &str) -> Result { + let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string()); + let raw_key = key.to_bytes(); + self.kv_backend.exists(&raw_key).await + } + /// Builds a create flow task name transaction. /// It's expected that the `__flow_task/{catalog}/name/{task_name}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 400fed9e3b71..8aa8c8abecc4 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -25,7 +25,6 @@ pub mod ddl; pub mod ddl_manager; pub mod distributed_time_constants; pub mod error; -pub mod flownode_manager; pub mod heartbeat; pub mod instruction; pub mod key; diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 4e810195bb2b..0a47b1de1463 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -39,6 +39,12 @@ lazy_static! { &["step"] ) .unwrap(); + pub static ref METRIC_META_PROCEDURE_CREATE_FLOW_TASK: HistogramVec = register_histogram_vec!( + "greptime_meta_procedure_create_flow_task", + "meta procedure create flow task", + &["step"] + ) + .unwrap(); pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!( "greptime_meta_procedure_create_tables", "meta procedure create tables", diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index a7e14161ecc6..c160f243e482 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -724,6 +724,7 @@ impl TryFrom for PbDropDatabaseTask { } /// Create flow task +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateFlowTask { pub catalog_name: String, pub task_name: String, diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 6c3bf7397da0..763358534f5c 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -20,7 +20,7 @@ pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; -use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef}; +use crate::datanode_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef}; use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::task_meta::FlowTaskMetadataAllocator; use crate::ddl::DdlContext; @@ -76,24 +76,28 @@ impl Datanode for MockDatanode { } #[async_trait::async_trait] -impl DatanodeManager for MockDatanodeManager { +impl NodeManager for MockDatanodeManager { async fn datanode(&self, peer: &Peer) -> DatanodeRef { Arc::new(MockDatanode { peer: peer.clone(), handler: self.handler.clone(), }) } + + async fn flownode(&self, _node: &Peer) -> FlownodeRef { + unimplemented!() + } } /// Returns a test purpose [DdlContext]. -pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { +pub fn new_ddl_context(datanode_manager: NodeManagerRef) -> DdlContext { let kv_backend = Arc::new(MemoryKvBackend::new()); new_ddl_context_with_kv_backend(datanode_manager, kv_backend) } /// Returns a test purpose [DdlContext] with a specified [KvBackendRef]. pub fn new_ddl_context_with_kv_backend( - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, kv_backend: KvBackendRef, ) -> DdlContext { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 1464a11c27d1..2f39a0dd6795 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use catalog::CatalogManagerRef; use common_base::Plugins; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; @@ -42,7 +42,7 @@ pub struct FrontendBuilder { kv_backend: KvBackendRef, cache_invalidator: Option, catalog_manager: CatalogManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, plugins: Option, procedure_executor: ProcedureExecutorRef, heartbeat_task: Option, @@ -52,7 +52,7 @@ impl FrontendBuilder { pub fn new( kv_backend: KvBackendRef, catalog_manager: CatalogManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, procedure_executor: ProcedureExecutorRef, ) -> Self { Self { diff --git a/src/frontend/src/instance/region_query.rs b/src/frontend/src/instance/region_query.rs index 844b9a7735ab..a6c21e35030a 100644 --- a/src/frontend/src/instance/region_query.rs +++ b/src/frontend/src/instance/region_query.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use api::v1::region::QueryRequest; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_recordbatch::SendableRecordBatchStream; use partition::manager::PartitionRuleManagerRef; use query::error::{RegionQuerySnafu, Result as QueryResult}; @@ -29,13 +29,13 @@ use crate::error::{FindTableRouteSnafu, RequestQuerySnafu, Result}; pub(crate) struct FrontendRegionQueryHandler { partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, } impl FrontendRegionQueryHandler { pub fn arc( partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, ) -> Arc { Arc::new(Self { partition_manager, diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 4001c4e59d61..38363f57f6e3 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -19,7 +19,7 @@ use api::v1::region::{QueryRequest, RegionRequest, RegionResponse as RegionRespo use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; -use common_meta::datanode_manager::{Datanode, DatanodeManager, DatanodeRef}; +use common_meta::datanode_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::peer::Peer; use common_recordbatch::SendableRecordBatchStream; @@ -34,10 +34,15 @@ use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; pub struct StandaloneDatanodeManager(pub RegionServer); #[async_trait] -impl DatanodeManager for StandaloneDatanodeManager { +impl NodeManager for StandaloneDatanodeManager { async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { RegionInvoker::arc(self.0.clone()) } + + async fn flownode(&self, _node: &Peer) -> FlownodeRef { + // TODO(weny, discord9): Support it. + unimplemented!() + } } /// Relative to [client::region::RegionRequester] diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 2c8dd02829ff..73b6674328f9 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -20,7 +20,7 @@ use client::client_manager::DatanodeClients; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; use common_meta::ddl::DdlContext; @@ -83,7 +83,7 @@ pub struct MetasrvBuilder { election: Option, meta_peer_client: Option, lock: Option, - datanode_manager: Option, + datanode_manager: Option, plugins: Option, table_metadata_allocator: Option, } @@ -145,7 +145,7 @@ impl MetasrvBuilder { self } - pub fn datanode_manager(mut self, datanode_manager: DatanodeManagerRef) -> Self { + pub fn datanode_manager(mut self, datanode_manager: NodeManagerRef) -> Self { self.datanode_manager = Some(datanode_manager); self } diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 03ddc2dbdebe..ce2e5cda4d9d 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -21,7 +21,7 @@ use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType}; use client::client_manager::DatanodeClients; use common_catalog::consts::MITO2_ENGINE; -use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState}; use common_meta::ddl::create_table::*; use common_meta::ddl::test_util::columns::TestColumnDefBuilder; @@ -173,7 +173,7 @@ fn test_region_request_builder() { async fn new_datanode_manager( region_server: &EchoRegionServer, region_routes: &[RegionRoute], -) -> DatanodeManagerRef { +) -> NodeManagerRef { let clients = DatanodeClients::default(); let datanodes = find_leaders(region_routes); diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index c9713de09cdd..f68209fbcbc4 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -105,7 +105,7 @@ pub mod test_data { use chrono::DateTime; use common_catalog::consts::MITO2_ENGINE; - use common_meta::datanode_manager::DatanodeManagerRef; + use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; use common_meta::ddl::DdlContext; @@ -190,7 +190,7 @@ pub mod test_data { } } - pub(crate) fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { + pub(crate) fn new_ddl_context(datanode_manager: NodeManagerRef) -> DdlContext { let kv_backend = Arc::new(MemoryKvBackend::new()); let mailbox_sequence = diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 46f235123caf..309edf4146b0 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -19,7 +19,7 @@ use std::{iter, mem}; use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader}; use api::v1::{DeleteRequests, RowDeleteRequests}; use catalog::CatalogManagerRef; -use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_query::Output; use common_telemetry::tracing_context::TracingContext; @@ -40,7 +40,7 @@ use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion}; pub struct Deleter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, } pub type DeleterRef = Arc; @@ -49,7 +49,7 @@ impl Deleter { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, ) -> Self { Self { catalog_manager, diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 61d2d2cfcfa0..95ff84f13a1b 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -25,7 +25,7 @@ use catalog::CatalogManagerRef; use client::{OutputData, OutputMeta}; use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; -use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; @@ -57,7 +57,7 @@ use crate::statement::StatementExecutor; pub struct Inserter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, } pub type InserterRef = Arc; @@ -66,7 +66,7 @@ impl Inserter { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, ) -> Self { Self { catalog_manager, diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index 7cfd5a8a1a21..768a2850aac3 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -18,7 +18,7 @@ use api::v1::region::region_request::Body as RegionRequestBody; use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader}; use catalog::CatalogManagerRef; use common_catalog::build_db_string; -use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_telemetry::logging::{error, info}; use common_telemetry::tracing_context::TracingContext; @@ -39,7 +39,7 @@ use crate::region_req_factory::RegionRequestFactory; pub struct Requester { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, } pub type RequesterRef = Arc; @@ -48,7 +48,7 @@ impl Requester { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: DatanodeManagerRef, + datanode_manager: NodeManagerRef, ) -> Self { Self { catalog_manager,