Skip to content

Commit

Permalink
feat: invoke create flow procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 29, 2024
1 parent 2b75c98 commit b88f1d1
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 20 deletions.
55 changes: 51 additions & 4 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use store_api::storage::TableId;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_database::DropDatabaseProcedure;
Expand All @@ -42,12 +43,12 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase,
DropLogicalTables, DropTable, TruncateTable,
AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable,
DropDatabase, DropLogicalTables, DropTable, TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateDatabaseTask, CreateTableTask, DropDatabaseTask, DropTableTask,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask,
DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
Expand Down Expand Up @@ -312,6 +313,20 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a create flow task.
pub async fn submit_create_flow_task(
&self,
cluster_id: ClusterId,
create_flow: CreateFlowTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CreateFlowProcedure::new(cluster_id, create_flow, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a truncate table task.
pub async fn submit_truncate_table_task(
Expand Down Expand Up @@ -572,6 +587,35 @@ async fn handle_drop_database_task(
})
}

async fn handle_create_flow_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
create_flow_task: CreateFlowTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, output) = ddl_manager
.submit_create_flow_task(cluster_id, create_flow_task.clone())
.await?;

let procedure_id = id.to_string();
let output = output.context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "empty output",
})?;
let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
procedure_id: &procedure_id,
err_msg: "downcast to `u32`",
})?);
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}

async fn handle_alter_logical_table_tasks(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
Expand Down Expand Up @@ -651,6 +695,9 @@ impl ProcedureExecutor for DdlManager {
DropDatabase(drop_database_task) => {
handle_drop_database_task(self, cluster_id, drop_database_task).await
}
CreateFlow(create_flow_task) => {
handle_create_flow_task(self, cluster_id, create_flow_task).await
}
}
}
.trace(span)
Expand Down
8 changes: 7 additions & 1 deletion src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,14 @@ pub enum DdlTask {
AlterLogicalTables(Vec<AlterTableTask>),
CreateDatabase(CreateDatabaseTask),
DropDatabase(DropDatabaseTask),
CreateFlow(CreateFlowTask),
}

impl DdlTask {
pub fn new_create_flow(expr: CreateFlowTask) -> Self {
DdlTask::CreateFlow(expr)
}

pub fn new_create_table(
expr: CreateTableExpr,
partitions: Vec<Partition>,
Expand Down Expand Up @@ -182,7 +187,7 @@ impl TryFrom<Task> for DdlTask {
Task::DropDatabaseTask(drop_database) => {
Ok(DdlTask::DropDatabase(drop_database.try_into()?))
}
Task::CreateFlowTask(_) => unimplemented!(),
Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
Task::DropFlowTask(_) => unimplemented!(),
}
}
Expand Down Expand Up @@ -228,6 +233,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
}
DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
};

Ok(Self {
Expand Down
19 changes: 9 additions & 10 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension,
CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType,
TableName,
CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use common_meta::rpc::ddl::CreateFlowTask;
use common_meta::table_name::TableName;
use common_time::Timezone;
use datafusion::sql::planner::object_name_to_table_reference;
use datatypes::schema::{ColumnSchema, COMMENT_KEY};
Expand Down Expand Up @@ -493,7 +494,7 @@ pub(crate) fn to_alter_expr(
pub fn to_create_flow_task_expr(
create_flow: CreateFlow,
query_ctx: QueryContextRef,
) -> Result<CreateFlowTaskExpr> {
) -> Result<CreateFlowTask> {
// retrieve sink table name
let sink_table_ref =
object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
Expand Down Expand Up @@ -537,22 +538,20 @@ pub fn to_create_flow_task_expr(
})
.collect::<Result<Vec<_>>>()?;

Ok(CreateFlowTaskExpr {
Ok(CreateFlowTask {
catalog_name: query_ctx.current_catalog().to_string(),
task_name: create_flow.flow_name.to_string(),
flow_name: create_flow.flow_name.to_string(),
source_table_names,
sink_table_name: Some(sink_table_name),
create_if_not_exists: create_flow.if_not_exists,
sink_table_name,
or_replace: create_flow.or_replace,
// TODO(ruihang): change this field to optional in proto
create_if_not_exists: create_flow.if_not_exists,
expire_when: create_flow
.expire_when
.map(|e| e.to_string())
.unwrap_or_default(),
// TODO(ruihang): change this field to optional in proto
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
task_options: HashMap::new(),
options: HashMap::new(),
})
}

Expand Down
23 changes: 18 additions & 5 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::key::NAME_PATTERN;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::ddl::{CreateFlowTask, DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
Expand Down Expand Up @@ -323,14 +323,27 @@ impl StatementExecutor {
}

#[tracing::instrument(skip_all)]
pub async fn create_flow(&self, stmt: CreateFlow, query_ctx: QueryContextRef) -> Result<()> {
pub async fn create_flow(
&self,
stmt: CreateFlow,
query_ctx: QueryContextRef,
) -> Result<Output> {
// TODO(ruihang): do some verification
let expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?;

let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?;
self.create_flow_procedure(expr).await?;
Ok(Output::new_with_affected_rows(0))
}

// TODO: invoke procedure
async fn create_flow_procedure(&self, expr: CreateFlowTask) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_create_flow(expr),
};

Ok(())
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}

#[tracing::instrument(skip_all)]
Expand Down

0 comments on commit b88f1d1

Please sign in to comment.