diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index fb6800c1406c..5bdcb1f68e2b 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -33,7 +33,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu pub mod alter_logical_tables; pub mod alter_table; pub mod create_database; -pub mod create_flow_task; +pub mod create_flow; pub mod create_logical_tables; pub mod create_table; mod create_table_template; diff --git a/src/common/meta/src/ddl/create_flow_task.rs b/src/common/meta/src/ddl/create_flow.rs similarity index 96% rename from src/common/meta/src/ddl/create_flow_task.rs rename to src/common/meta/src/ddl/create_flow.rs index c61757bc49e1..656d20634855 100644 --- a/src/common/meta/src/ddl/create_flow_task.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -44,15 +44,15 @@ use crate::rpc::ddl::CreateFlowTask; use crate::{metrics, ClusterId}; /// The procedure of flow task creation. -pub struct CreateFlowTaskProcedure { +pub struct CreateFlowProcedure { pub context: DdlContext, pub data: CreateFlowTaskData, } -impl CreateFlowTaskProcedure { +impl CreateFlowProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlowTask"; - /// Returns a new [CreateFlowTaskProcedure]. + /// Returns a new [CreateFlowProcedure]. pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self { Self { context, @@ -70,7 +70,7 @@ impl CreateFlowTaskProcedure { /// Deserializes from `json`. pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(CreateFlowTaskProcedure { context, data }) + Ok(CreateFlowProcedure { context, data }) } async fn on_prepare(&mut self) -> Result { @@ -87,7 +87,7 @@ impl CreateFlowTaskProcedure { // Safety: must be allocated. let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); for peer in &self.data.peers { - let requester = self.context.datanode_manager.flownode(peer).await; + let requester = self.context.node_manager.flownode(peer).await; let request = FlowRequest { body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())), }; @@ -126,7 +126,7 @@ impl CreateFlowTaskProcedure { } #[async_trait] -impl Procedure for CreateFlowTaskProcedure { +impl Procedure for CreateFlowProcedure { fn type_name(&self) -> &str { Self::TYPE_NAME } diff --git a/src/common/meta/src/ddl/create_flow_task/check.rs b/src/common/meta/src/ddl/create_flow/check.rs similarity index 93% rename from src/common/meta/src/ddl/create_flow_task/check.rs rename to src/common/meta/src/ddl/create_flow/check.rs index 0f245f675453..88bafcd3a5e7 100644 --- a/src/common/meta/src/ddl/create_flow_task/check.rs +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -14,10 +14,10 @@ use snafu::ensure; -use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::ddl::create_flow::CreateFlowProcedure; use crate::error::{self, Result}; -impl CreateFlowTaskProcedure { +impl CreateFlowProcedure { /// Checks: /// - The new task name doesn't exist. /// - The source tables exist. diff --git a/src/common/meta/src/ddl/create_flow_task/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs similarity index 95% rename from src/common/meta/src/ddl/create_flow_task/metadata.rs rename to src/common/meta/src/ddl/create_flow/metadata.rs index 17747f5870bd..1383eb8eda3f 100644 --- a/src/common/meta/src/ddl/create_flow_task/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::ddl::create_flow_task::CreateFlowTaskProcedure; +use crate::ddl::create_flow::CreateFlowProcedure; use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; -impl CreateFlowTaskProcedure { +impl CreateFlowProcedure { /// Allocates the [FlowTaskId]. pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now.