From e041c18c77b43d5b177ab21425405a736ed467da Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 10:04:53 +0000 Subject: [PATCH] chore: apply suggestions from CR --- src/common/meta/src/ddl/create_flow.rs | 36 +++++++++---------- src/common/meta/src/ddl/create_flow/check.rs | 1 - .../meta/src/ddl/create_flow/metadata.rs | 5 ++- src/common/meta/src/error.rs | 2 +- 4 files changed, 20 insertions(+), 24 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 0cce07497883..76f61e86b2ae 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -88,7 +88,7 @@ impl CreateFlowProcedure { for peer in &self.data.peers { let requester = self.context.node_manager.flownode(peer).await; let request = FlowRequest { - body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())), + body: Some(PbFlowRequest::Create((&self.data).into())), }; create_flow_task.push(async move { requester @@ -117,7 +117,7 @@ impl CreateFlowProcedure { // TODO(weny): Support `or_replace`. self.context .flow_task_metadata_manager - .create_flow_task_metadata(flow_task_id, self.data.to_flow_task_info_value()) + .create_flow_task_metadata(flow_task_id, (&self.data).into()) .await?; info!("Created flow task metadata for flow {flow_task_id}"); Ok(Status::done_with_output(flow_task_id)) @@ -182,13 +182,10 @@ pub struct CreateFlowTaskData { pub(crate) source_table_ids: Vec, } -impl CreateFlowTaskData { - /// Converts to [CreateRequest] - /// # Panic - /// Panic if the `flow_task_id` is None. - fn to_create_flow_request(&self) -> CreateRequest { - let flow_task_id = self.flow_task_id.unwrap(); - let source_table_ids = &self.source_table_ids; +impl From<&CreateFlowTaskData> for CreateRequest { + fn from(value: &CreateFlowTaskData) -> Self { + let flow_task_id = value.flow_task_id.unwrap(); + let source_table_ids = &value.source_table_ids; CreateRequest { task_id: Some(api::v1::flow::TaskId { id: flow_task_id }), @@ -196,18 +193,19 @@ impl CreateFlowTaskData { .iter() .map(|table_id| api::v1::TableId { id: *table_id }) .collect_vec(), - sink_table_name: Some(self.task.sink_table_name.clone().into()), + sink_table_name: Some(value.task.sink_table_name.clone().into()), // Always be true create_if_not_exists: true, - expire_when: self.task.expire_when.clone(), - comment: self.task.comment.clone(), - sql: self.task.sql.clone(), - task_options: self.task.options.clone(), + expire_when: value.task.expire_when.clone(), + comment: value.task.comment.clone(), + sql: value.task.sql.clone(), + task_options: value.task.options.clone(), } } +} - /// Converts to [FlowTaskInfoValue]. - fn to_flow_task_info_value(&self) -> FlowTaskInfoValue { +impl From<&CreateFlowTaskData> for FlowTaskInfoValue { + fn from(value: &CreateFlowTaskData) -> Self { let CreateFlowTask { catalog_name, task_name, @@ -217,9 +215,9 @@ impl CreateFlowTaskData { sql, options, .. - } = self.task.clone(); + } = value.task.clone(); - let flownode_ids = self + let flownode_ids = value .peers .iter() .enumerate() @@ -227,7 +225,7 @@ impl CreateFlowTaskData { .collect::>(); FlowTaskInfoValue { - source_table_ids: self.source_table_ids.clone(), + source_table_ids: value.source_table_ids.clone(), sink_table_name, flownode_ids, catalog_name, diff --git a/src/common/meta/src/ddl/create_flow/check.rs b/src/common/meta/src/ddl/create_flow/check.rs index 88bafcd3a5e7..d2e4dfa887cd 100644 --- a/src/common/meta/src/ddl/create_flow/check.rs +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -20,7 +20,6 @@ use crate::error::{self, Result}; impl CreateFlowProcedure { /// Checks: /// - The new task name doesn't exist. - /// - The source tables exist. pub(crate) async fn check_creation(&self) -> Result<()> { let catalog_name = &self.data.task.catalog_name; let task_name = &self.data.task.task_name; diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs index b9250054edd3..ce35ae91ca98 100644 --- a/src/common/meta/src/ddl/create_flow/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -21,7 +21,7 @@ use crate::key::table_name::TableNameKey; impl CreateFlowProcedure { /// Allocates the [FlowTaskId]. pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { - //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now. + // TODO(weny, ruihang): We don't support the partitions. It's always be 1, now. let partitions = 1; let (flow_task_id, peers) = self .context @@ -34,10 +34,9 @@ impl CreateFlowProcedure { Ok(()) } - /// Collects source table ids + /// Ensures all source tables exist and collects source table ids pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { // Ensures all source tables exist. - let keys = self .data .task diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 29dfcfc0757e..40d5070c3bb3 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -241,7 +241,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Task already exists, task: {}", task_name,))] + #[snafu(display("Task already exists: {}", task_name))] TaskAlreadyExists { task_name: String, location: Location,