From b88f1d1cd2b6d3556823ebf928944f00e78ae380 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 03:16:14 +0000 Subject: [PATCH] feat: invoke create flow procedure --- src/common/meta/src/ddl_manager.rs | 55 +++++++++++++++++++++++++++--- src/common/meta/src/rpc/ddl.rs | 8 ++++- src/operator/src/expr_factory.rs | 19 +++++------ src/operator/src/statement/ddl.rs | 23 ++++++++++--- 4 files changed, 85 insertions(+), 20 deletions(-) diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index f71744f9736d..99fb97d8e7d9 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -26,6 +26,7 @@ use store_api::storage::TableId; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_database::CreateDatabaseProcedure; +use crate::ddl::create_flow::CreateFlowProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_database::DropDatabaseProcedure; @@ -42,12 +43,12 @@ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::rpc::ddl::DdlTask::{ - AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase, - DropLogicalTables, DropTable, TruncateTable, + AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable, + DropDatabase, DropLogicalTables, DropTable, TruncateTable, }; use crate::rpc::ddl::{ - AlterTableTask, CreateDatabaseTask, CreateTableTask, DropDatabaseTask, DropTableTask, - SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, + AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask, + DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::procedure; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; @@ -312,6 +313,20 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] + /// Submits and executes a create flow task. + pub async fn submit_create_flow_task( + &self, + cluster_id: ClusterId, + create_flow: CreateFlowTask, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + let procedure = CreateFlowProcedure::new(cluster_id, create_flow, context); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + #[tracing::instrument(skip_all)] /// Submits and executes a truncate table task. pub async fn submit_truncate_table_task( @@ -572,6 +587,35 @@ async fn handle_drop_database_task( }) } +async fn handle_create_flow_task( + ddl_manager: &DdlManager, + cluster_id: ClusterId, + create_flow_task: CreateFlowTask, +) -> Result { + let (id, output) = ddl_manager + .submit_create_flow_task(cluster_id, create_flow_task.clone()) + .await?; + + let procedure_id = id.to_string(); + let output = output.context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "empty output", + })?; + let flow_id = *(output.downcast_ref::().context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "downcast to `u32`", + })?); + info!( + "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", + create_flow_task.catalog_name, create_flow_task.flow_name, + ); + + Ok(SubmitDdlTaskResponse { + key: procedure_id.into(), + ..Default::default() + }) +} + async fn handle_alter_logical_table_tasks( ddl_manager: &DdlManager, cluster_id: ClusterId, @@ -651,6 +695,9 @@ impl ProcedureExecutor for DdlManager { DropDatabase(drop_database_task) => { handle_drop_database_task(self, cluster_id, drop_database_task).await } + CreateFlow(create_flow_task) => { + handle_create_flow_task(self, cluster_id, create_flow_task).await + } } } .trace(span) diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 660e83439389..c1b1aabb264f 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -51,9 +51,14 @@ pub enum DdlTask { AlterLogicalTables(Vec), CreateDatabase(CreateDatabaseTask), DropDatabase(DropDatabaseTask), + CreateFlow(CreateFlowTask), } impl DdlTask { + pub fn new_create_flow(expr: CreateFlowTask) -> Self { + DdlTask::CreateFlow(expr) + } + pub fn new_create_table( expr: CreateTableExpr, partitions: Vec, @@ -182,7 +187,7 @@ impl TryFrom for DdlTask { Task::DropDatabaseTask(drop_database) => { Ok(DdlTask::DropDatabase(drop_database.try_into()?)) } - Task::CreateFlowTask(_) => unimplemented!(), + Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)), Task::DropFlowTask(_) => unimplemented!(), } } @@ -228,6 +233,7 @@ impl TryFrom for PbDdlTaskRequest { } DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?), DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?), + DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()), }; Ok(Self { diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 855a6a7ffbc4..47e3a2cc931c 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -18,11 +18,12 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension, - CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, - TableName, + CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; +use common_meta::rpc::ddl::CreateFlowTask; +use common_meta::table_name::TableName; use common_time::Timezone; use datafusion::sql::planner::object_name_to_table_reference; use datatypes::schema::{ColumnSchema, COMMENT_KEY}; @@ -493,7 +494,7 @@ pub(crate) fn to_alter_expr( pub fn to_create_flow_task_expr( create_flow: CreateFlow, query_ctx: QueryContextRef, -) -> Result { +) -> Result { // retrieve sink table name let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true) @@ -537,22 +538,20 @@ pub fn to_create_flow_task_expr( }) .collect::>>()?; - Ok(CreateFlowTaskExpr { + Ok(CreateFlowTask { catalog_name: query_ctx.current_catalog().to_string(), - task_name: create_flow.flow_name.to_string(), + flow_name: create_flow.flow_name.to_string(), source_table_names, - sink_table_name: Some(sink_table_name), - create_if_not_exists: create_flow.if_not_exists, + sink_table_name, or_replace: create_flow.or_replace, - // TODO(ruihang): change this field to optional in proto + create_if_not_exists: create_flow.if_not_exists, expire_when: create_flow .expire_when .map(|e| e.to_string()) .unwrap_or_default(), - // TODO(ruihang): change this field to optional in proto comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - task_options: HashMap::new(), + options: HashMap::new(), }) } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index a6b13a91722f..f7196b8f1322 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -27,7 +27,7 @@ use common_meta::ddl::ExecutorContext; use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::key::NAME_PATTERN; -use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use common_meta::rpc::ddl::{CreateFlowTask, DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_meta::table_name::TableName; use common_query::Output; @@ -323,14 +323,27 @@ impl StatementExecutor { } #[tracing::instrument(skip_all)] - pub async fn create_flow(&self, stmt: CreateFlow, query_ctx: QueryContextRef) -> Result<()> { + pub async fn create_flow( + &self, + stmt: CreateFlow, + query_ctx: QueryContextRef, + ) -> Result { // TODO(ruihang): do some verification + let expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; - let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; + self.create_flow_procedure(expr).await?; + Ok(Output::new_with_affected_rows(0)) + } - // TODO: invoke procedure + async fn create_flow_procedure(&self, expr: CreateFlowTask) -> Result { + let request = SubmitDdlTaskRequest { + task: DdlTask::new_create_flow(expr), + }; - Ok(()) + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) } #[tracing::instrument(skip_all)]