From 78b6626ba6f2f6f09c579ad117b9ed39ef1fa107 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 26 Apr 2024 08:13:27 +0000 Subject: [PATCH] feat: implement the `CreateFlowTaskProcedure` --- src/common/meta/src/ddl.rs | 3 +- src/common/meta/src/ddl/create_flow_task.rs | 242 ++++++++++++++++++ .../meta/src/ddl/create_flow_task/check.rs | 44 ++++ .../meta/src/ddl/create_flow_task/metadata.rs | 62 +++++ src/common/meta/src/ddl_manager.rs | 2 +- src/common/meta/src/error.rs | 8 +- src/common/meta/src/key/flow_task.rs | 65 +++-- .../meta/src/key/flow_task/flow_task_info.rs | 7 +- .../meta/src/key/flow_task/flow_task_name.rs | 7 + src/common/meta/src/metrics.rs | 6 + src/common/meta/src/rpc/ddl.rs | 1 + 11 files changed, 416 insertions(+), 31 deletions(-) create mode 100644 src/common/meta/src/ddl/create_flow_task.rs create mode 100644 src/common/meta/src/ddl/create_flow_task/check.rs create mode 100644 src/common/meta/src/ddl/create_flow_task/metadata.rs diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 3bca6928856d..f32fbf244bfb 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -18,10 +18,10 @@ use std::sync::Arc; use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionNumber, TableId}; -use self::task_meta::FlowTaskMetadataAllocatorRef; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::NodeManagerRef; use crate::ddl::table_meta::TableMetadataAllocatorRef; +use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef; use crate::error::Result; use crate::key::flow_task::FlowTaskMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; @@ -33,6 +33,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu pub mod alter_logical_tables; pub mod alter_table; pub mod create_database; +pub mod create_flow_task; pub mod create_logical_tables; pub mod create_table; mod create_table_template; diff --git a/src/common/meta/src/ddl/create_flow_task.rs b/src/common/meta/src/ddl/create_flow_task.rs new file mode 100644 index 000000000000..c61757bc49e1 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task.rs @@ -0,0 +1,242 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod check; +mod metadata; + +use std::collections::BTreeMap; + +use api::v1::flow::flow_request::Body as PbFlowRequest; +use api::v1::flow::{CreateRequest, FlowRequest}; +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use common_telemetry::info; +use futures::future::join_all; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use strum::AsRefStr; +use table::metadata::TableId; + +use super::utils::add_peer_context_if_needed; +use crate::ddl::utils::handle_retry_error; +use crate::ddl::DdlContext; +use crate::error::Result; +use crate::key::flow_task::flow_task_info::FlowTaskInfoValue; +use crate::key::FlowTaskId; +use crate::lock_key::{CatalogLock, FlowTaskNameLock}; +use crate::peer::Peer; +use crate::rpc::ddl::CreateFlowTask; +use crate::{metrics, ClusterId}; + +/// The procedure of flow task creation. +pub struct CreateFlowTaskProcedure { + pub context: DdlContext, + pub data: CreateFlowTaskData, +} + +impl CreateFlowTaskProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlowTask"; + + /// Returns a new [CreateFlowTaskProcedure]. + pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self { + Self { + context, + data: CreateFlowTaskData { + cluster_id, + task, + flow_task_id: None, + peers: vec![], + source_table_ids: vec![], + state: CreateFlowTaskState::CreateMetadata, + }, + } + } + + /// Deserializes from `json`. + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(CreateFlowTaskProcedure { context, data }) + } + + async fn on_prepare(&mut self) -> Result { + self.check_creation().await?; + self.collect_source_tables().await?; + self.allocate_flow_task_id().await?; + self.data.state = CreateFlowTaskState::FlownodeCreateFlows; + + Ok(Status::executing(true)) + } + + async fn on_flownode_create_flow(&mut self) -> Result { + self.data.state = CreateFlowTaskState::CreateMetadata; + // Safety: must be allocated. + let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); + for peer in &self.data.peers { + let requester = self.context.datanode_manager.flownode(peer).await; + let request = FlowRequest { + body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())), + }; + create_flow_task.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer.clone())) + }); + } + + join_all(create_flow_task) + .await + .into_iter() + .collect::>>()?; + + self.data.state = CreateFlowTaskState::CreateMetadata; + Ok(Status::executing(true)) + } + + /// Creates flow task metadata. + /// + /// Abort(not-retry): + /// - Failed to create table metadata. + async fn on_create_metadata(&mut self) -> Result { + // Safety: The flow task id must be allocated. + let flow_task_id = self.data.flow_task_id.unwrap(); + // TODO(weny): Support `or_replace`. + self.context + .flow_task_metadata_manager + .create_flow_task_metadata(flow_task_id, self.data.to_flow_task_info_value()) + .await?; + info!("Created flow task metadata for flow task {flow_task_id}"); + Ok(Status::done_with_output(flow_task_id)) + } +} + +#[async_trait] +impl Procedure for CreateFlowTaskProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK + .with_label_values(&[state.as_ref()]) + .start_timer(); + + match state { + CreateFlowTaskState::Prepare => self.on_prepare().await, + CreateFlowTaskState::FlownodeCreateFlows => self.on_flownode_create_flow().await, + CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await, + } + .map_err(handle_retry_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let catalog_name = &self.data.task.catalog_name; + let task_name = &self.data.task.task_name; + + LockKey::new(vec![ + CatalogLock::Read(catalog_name).into(), + FlowTaskNameLock::new(catalog_name, task_name).into(), + ]) + } +} + +/// The state of [CreateFlowTaskProcedure]. +#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] +pub enum CreateFlowTaskState { + /// Prepares to create the flow. + Prepare, + /// Creates flows on the flownode. + FlownodeCreateFlows, + /// Create metadata. + CreateMetadata, +} + +/// The serializable data. +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateFlowTaskData { + pub(crate) cluster_id: ClusterId, + pub(crate) state: CreateFlowTaskState, + pub(crate) task: CreateFlowTask, + pub(crate) flow_task_id: Option, + pub(crate) peers: Vec, + pub(crate) source_table_ids: Vec, +} + +impl CreateFlowTaskData { + /// Converts to [CreateRequest] + /// # Panic + /// Panic if the `flow_task_id` is None. + fn to_create_flow_request(&self) -> CreateRequest { + let flow_task_id = self.flow_task_id.unwrap(); + let source_table_ids = &self.source_table_ids; + + CreateRequest { + task_id: Some(api::v1::flow::TaskId { id: flow_task_id }), + source_table_ids: source_table_ids + .iter() + .map(|table_id| api::v1::TableId { id: *table_id }) + .collect_vec(), + sink_table_name: Some(self.task.sink_table_name.clone().into()), + // Always be true + create_if_not_exists: true, + expire_when: self.task.expire_when.clone(), + comment: self.task.comment.clone(), + sql: self.task.sql.clone(), + task_options: self.task.options.clone(), + } + } + + /// Converts to [FlowTaskInfoValue]. + fn to_flow_task_info_value(&self) -> FlowTaskInfoValue { + let CreateFlowTask { + catalog_name, + task_name, + sink_table_name, + expire_when, + comment, + sql, + options, + .. + } = self.task.clone(); + + let flownode_ids = self + .peers + .iter() + .enumerate() + .map(|(idx, peer)| (idx as u32, peer.id)) + .collect::>(); + + FlowTaskInfoValue { + source_table_ids: self.source_table_ids.clone(), + sink_table_name, + flownode_ids, + catalog_name, + task_name, + raw_sql: sql, + expire_when, + comment, + options, + } + } +} diff --git a/src/common/meta/src/ddl/create_flow_task/check.rs b/src/common/meta/src/ddl/create_flow_task/check.rs new file mode 100644 index 000000000000..0f245f675453 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task/check.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ensure; + +use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::error::{self, Result}; + +impl CreateFlowTaskProcedure { + /// Checks: + /// - The new task name doesn't exist. + /// - The source tables exist. + pub(crate) async fn check_creation(&self) -> Result<()> { + let catalog_name = &self.data.task.catalog_name; + let task_name = &self.data.task.task_name; + + // Ensures the task name doesn't exist. + let exists = self + .context + .flow_task_metadata_manager + .flow_task_name_manager() + .exists(catalog_name, task_name) + .await?; + ensure!( + !exists, + error::TaskAlreadyExistsSnafu { + task_name: format!("{}.{}", catalog_name, task_name), + } + ); + + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/create_flow_task/metadata.rs b/src/common/meta/src/ddl/create_flow_task/metadata.rs new file mode 100644 index 000000000000..17747f5870bd --- /dev/null +++ b/src/common/meta/src/ddl/create_flow_task/metadata.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::error::{self, Result}; +use crate::key::table_name::TableNameKey; + +impl CreateFlowTaskProcedure { + /// Allocates the [FlowTaskId]. + pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { + //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now. + let partitions = 1; + let (flow_task_id, peers) = self + .context + .flow_task_metadata_allocator + .create(partitions) + .await?; + self.data.flow_task_id = Some(flow_task_id); + self.data.peers = peers; + + Ok(()) + } + + /// Collects source table ids + pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { + // Ensures all source tables exist. + let mut source_table_ids = Vec::with_capacity(self.data.task.source_table_names.len()); + + for name in &self.data.task.source_table_names { + let key = TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name); + match self + .context + .table_metadata_manager + .table_name_manager() + .get(key) + .await? + { + Some(value) => source_table_ids.push(value.table_id()), + None => { + return error::TableNotFoundSnafu { + table_name: name.to_string(), + } + .fail(); + } + } + } + + self.data.source_table_ids = source_table_ids; + Ok(()) + } +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 13cf1b8ff76c..d14ef809fc14 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -714,7 +714,7 @@ mod tests { use crate::state_store::KvStateStore; use crate::wal_options_allocator::WalOptionsAllocator; - /// A dummy implemented [DatanodeManager]. + /// A dummy implemented [NodeManager]. pub struct DummyDatanodeManager; #[async_trait::async_trait] diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 323b2c0fee10..29dfcfc0757e 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -23,7 +23,6 @@ use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use crate::key::FlowTaskId; use crate::peer::Peer; use crate::DatanodeId; @@ -242,14 +241,9 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Task already exists, task: {}, flow_task_id: {}", - task_name, - flow_task_id - ))] + #[snafu(display("Task already exists, task: {}", task_name,))] TaskAlreadyExists { task_name: String, - flow_task_id: FlowTaskId, location: Location, }, diff --git a/src/common/meta/src/key/flow_task.rs b/src/common/meta/src/key/flow_task.rs index 251b15ab77ca..5e1c2e427ab6 100644 --- a/src/common/meta/src/key/flow_task.rs +++ b/src/common/meta/src/key/flow_task.rs @@ -90,7 +90,7 @@ pub type FlowTaskMetadataManagerRef = Arc; /// - Retrieve metadata of the task. /// - Delete metadata of the task. pub struct FlowTaskMetadataManager { - flow_task_manager: FlowTaskInfoManager, + flow_task_info_manager: FlowTaskInfoManager, flownode_task_manager: FlownodeTaskManager, table_task_manager: TableTaskManager, flow_task_name_manager: FlowTaskNameManager, @@ -101,7 +101,7 @@ impl FlowTaskMetadataManager { /// Returns a new [FlowTaskMetadataManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { - flow_task_manager: FlowTaskInfoManager::new(kv_backend.clone()), + flow_task_info_manager: FlowTaskInfoManager::new(kv_backend.clone()), flow_task_name_manager: FlowTaskNameManager::new(kv_backend.clone()), flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()), table_task_manager: TableTaskManager::new(kv_backend.clone()), @@ -109,9 +109,14 @@ impl FlowTaskMetadataManager { } } - /// Returns the [FlowTaskManager]. - pub fn flow_task_manager(&self) -> &FlowTaskInfoManager { - &self.flow_task_manager + /// Returns the [FlowTaskNameManager]. + pub fn flow_task_name_manager(&self) -> &FlowTaskNameManager { + &self.flow_task_name_manager + } + + /// Returns the [FlowTaskInfoManager]. + pub fn flow_task_info_manager(&self) -> &FlowTaskInfoManager { + &self.flow_task_info_manager } /// Returns the [FlownodeTaskManager]. @@ -138,7 +143,7 @@ impl FlowTaskMetadataManager { )?; let (create_flow_task_txn, on_create_flow_task_failure) = - self.flow_task_manager.build_create_txn( + self.flow_task_info_manager.build_create_txn( &flow_task_value.catalog_name, flow_task_id, &flow_task_value, @@ -195,7 +200,6 @@ impl FlowTaskMetadataManager { "{}.{}", flow_task_value.catalog_name, flow_task_value.task_name ), - flow_task_id, } .fail(); } @@ -226,6 +230,7 @@ mod tests { use crate::key::flow_task::table_task::TableTaskKey; use crate::key::scope::CatalogScoped; use crate::kv_backend::memory::MemoryKvBackend; + use crate::table_name::TableName; #[derive(Debug)] struct MockKey { @@ -276,11 +281,16 @@ mod tests { let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv.clone()); let task_id = 10; let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: catalog_name.to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -297,7 +307,7 @@ mod tests { .await .unwrap(); let got = flow_metadata_manager - .flow_task_manager() + .flow_task_info_manager() .get(catalog_name, task_id) .await .unwrap() @@ -335,11 +345,17 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); let task_id = 10; + let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -352,10 +368,10 @@ mod tests { .unwrap(); // Creates again. let flow_task_value = FlowTaskInfoValue { - catalog_name: "greptime".to_string(), + catalog_name: catalog_name.to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -374,11 +390,17 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); let task_id = 10; + let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -390,11 +412,16 @@ mod tests { .await .unwrap(); // Creates again. + let another_sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "another_sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2048, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), diff --git a/src/common/meta/src/key/flow_task/flow_task_info.rs b/src/common/meta/src/key/flow_task/flow_task_info.rs index f30d3217f8a6..371ab96a1e41 100644 --- a/src/common/meta/src/key/flow_task/flow_task_info.rs +++ b/src/common/meta/src/key/flow_task/flow_task_info.rs @@ -29,6 +29,7 @@ use crate::key::{ }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; +use crate::table_name::TableName; use crate::FlownodeId; const FLOW_TASK_INFO_KEY_PREFIX: &str = "info"; @@ -117,9 +118,9 @@ impl MetaKey for FlowTaskInfoKeyInner { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowTaskInfoValue { /// The source tables used by the task. - pub(crate) source_tables: Vec, + pub(crate) source_table_ids: Vec, /// The sink table used by the task. - pub(crate) sink_table: TableId, + pub(crate) sink_table_name: TableName, /// Which flow nodes this task is running on. pub(crate) flownode_ids: BTreeMap, /// The catalog name. @@ -144,7 +145,7 @@ impl FlowTaskInfoValue { /// Returns the `source_table`. pub fn source_table_ids(&self) -> &[TableId] { - &self.source_tables + &self.source_table_ids } } diff --git a/src/common/meta/src/key/flow_task/flow_task_name.rs b/src/common/meta/src/key/flow_task/flow_task_name.rs index 9828283e6401..eaf6da5ae848 100644 --- a/src/common/meta/src/key/flow_task/flow_task_name.rs +++ b/src/common/meta/src/key/flow_task/flow_task_name.rs @@ -149,6 +149,13 @@ impl FlowTaskNameManager { .transpose() } + /// Returns true if the `task` exists. + pub async fn exists(&self, catalog: &str, task: &str) -> Result { + let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string()); + let raw_key = key.to_bytes(); + self.kv_backend.exists(&raw_key).await + } + /// Builds a create flow task name transaction. /// It's expected that the `__flow_task/{catalog}/name/{task_name}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 4e810195bb2b..0a47b1de1463 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -39,6 +39,12 @@ lazy_static! { &["step"] ) .unwrap(); + pub static ref METRIC_META_PROCEDURE_CREATE_FLOW_TASK: HistogramVec = register_histogram_vec!( + "greptime_meta_procedure_create_flow_task", + "meta procedure create flow task", + &["step"] + ) + .unwrap(); pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!( "greptime_meta_procedure_create_tables", "meta procedure create tables", diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index a7e14161ecc6..c160f243e482 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -724,6 +724,7 @@ impl TryFrom for PbDropDatabaseTask { } /// Create flow task +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateFlowTask { pub catalog_name: String, pub task_name: String,