From d63686ad58f8ab4903c8e362d5ee57ad76f878b8 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 13:16:34 +0000 Subject: [PATCH] chore: bump proto to b5412f7 --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/helper.rs | 4 +- src/common/meta/src/ddl/create_flow.rs | 10 ++-- src/common/meta/src/ddl/create_flow/check.rs | 2 +- src/common/meta/src/ddl_manager.rs | 2 +- src/common/meta/src/rpc/ddl.rs | 48 ++++++++++---------- src/frontend/src/instance/grpc.rs | 8 ++-- src/operator/src/expr_factory.rs | 4 +- 9 files changed, 41 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eee95568ccd1..f0ea63351da4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3883,7 +3883,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=aba235025ac5643c12bfdcefd656af11ad58ea8e#aba235025ac5643c12bfdcefd656af11ad58ea8e" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b5412f72257c18410fdccbb893fa5d245b846141#b5412f72257c18410fdccbb893fa5d245b846141" dependencies = [ "prost 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index 061e0e8b186f..0f6f7b6fad25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aba235025ac5643c12bfdcefd656af11ad58ea8e" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b5412f72257c18410fdccbb893fa5d245b846141" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 3528e9b93576..bf7786bcaf66 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -478,8 +478,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::Alter(_)) => "ddl.alter", Some(Expr::DropTable(_)) => "ddl.drop_table", Some(Expr::TruncateTable(_)) => "ddl.truncate_table", - Some(Expr::CreateFlowTask(_)) => "ddl.create_flow", - Some(Expr::DropFlowTask(_)) => "ddl.drop_flow", + Some(Expr::CreateFlow(_)) => "ddl.create_flow", + Some(Expr::DropFlow(_)) => "ddl.drop_flow", None => "ddl.empty", } } diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 93dbdfa99cf7..3a22f7cbaf68 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -151,7 +151,7 @@ impl Procedure for CreateFlowProcedure { fn lock_key(&self) -> LockKey { let catalog_name = &self.data.task.catalog_name; - let task_name = &self.data.task.task_name; + let task_name = &self.data.task.flow_name; let sink_table_name = &self.data.task.sink_table_name; LockKey::new(vec![ @@ -195,7 +195,7 @@ impl From<&CreateFlowTaskData> for CreateRequest { let source_table_ids = &value.source_table_ids; CreateRequest { - task_id: Some(api::v1::flow::TaskId { id: flow_id }), + flow_id: Some(api::v1::flow::TaskId { id: flow_id }), source_table_ids: source_table_ids .iter() .map(|table_id| api::v1::TableId { id: *table_id }) @@ -206,7 +206,7 @@ impl From<&CreateFlowTaskData> for CreateRequest { expire_when: value.task.expire_when.clone(), comment: value.task.comment.clone(), sql: value.task.sql.clone(), - task_options: value.task.options.clone(), + flow_options: value.task.flow_options.clone(), } } } @@ -215,12 +215,12 @@ impl From<&CreateFlowTaskData> for FlowInfoValue { fn from(value: &CreateFlowTaskData) -> Self { let CreateFlowTask { catalog_name, - task_name, + flow_name: task_name, sink_table_name, expire_when, comment, sql, - options, + flow_options: options, .. } = value.task.clone(); diff --git a/src/common/meta/src/ddl/create_flow/check.rs b/src/common/meta/src/ddl/create_flow/check.rs index ad59a633b2ee..2ca3c316284d 100644 --- a/src/common/meta/src/ddl/create_flow/check.rs +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -24,7 +24,7 @@ impl CreateFlowProcedure { /// - The sink table doesn't exist. pub(crate) async fn check_creation(&self) -> Result<()> { let catalog_name = &self.data.task.catalog_name; - let task_name = &self.data.task.task_name; + let task_name = &self.data.task.flow_name; let sink_table_name = &self.data.task.sink_table_name; // Ensures the task name doesn't exist. diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index d208fc913b10..8af7211210c9 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -616,7 +616,7 @@ async fn handle_create_flow_task( })?); info!( "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", - create_flow_task.catalog_name, create_flow_task.task_name, + create_flow_task.catalog_name, create_flow_task.flow_name, ); Ok(SubmitDdlTaskResponse { diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 42c3d94286d7..9b75bd6c3963 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -26,8 +26,8 @@ use api::v1::meta::{ TruncateTableTask as PbTruncateTableTask, }; use api::v1::{ - AlterExpr, CreateDatabaseExpr, CreateFlowTaskExpr, CreateTableExpr, DropDatabaseExpr, - DropFlowTaskExpr, DropTableExpr, TruncateTableExpr, + AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, DropDatabaseExpr, DropFlowExpr, + DropTableExpr, TruncateTableExpr, }; use base64::engine::general_purpose; use base64::Engine as _; @@ -733,7 +733,7 @@ impl TryFrom for PbDropDatabaseTask { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateFlowTask { pub catalog_name: String, - pub task_name: String, + pub flow_name: String, pub source_table_names: Vec, pub sink_table_name: TableName, pub or_replace: bool, @@ -741,16 +741,16 @@ pub struct CreateFlowTask { pub expire_when: String, pub comment: String, pub sql: String, - pub options: HashMap, + pub flow_options: HashMap, } impl TryFrom for CreateFlowTask { type Error = error::Error; fn try_from(pb: PbCreateFlowTask) -> Result { - let CreateFlowTaskExpr { + let CreateFlowExpr { catalog_name, - task_name, + flow_name, source_table_names, sink_table_name, or_replace, @@ -758,14 +758,14 @@ impl TryFrom for CreateFlowTask { expire_when, comment, sql, - task_options, - } = pb.create_flow_task.context(error::InvalidProtoMsgSnafu { - err_msg: "expected create_flow_task", + flow_options, + } = pb.create_flow.context(error::InvalidProtoMsgSnafu { + err_msg: "expected create_flow", })?; Ok(CreateFlowTask { catalog_name, - task_name, + flow_name, source_table_names: source_table_names.into_iter().map(Into::into).collect(), sink_table_name: sink_table_name .context(error::InvalidProtoMsgSnafu { @@ -777,7 +777,7 @@ impl TryFrom for CreateFlowTask { expire_when, comment, sql, - options: task_options, + flow_options, }) } } @@ -786,7 +786,7 @@ impl From for PbCreateFlowTask { fn from( CreateFlowTask { catalog_name, - task_name, + flow_name, source_table_names, sink_table_name, or_replace, @@ -794,13 +794,13 @@ impl From for PbCreateFlowTask { expire_when, comment, sql, - options, + flow_options, }: CreateFlowTask, ) -> Self { PbCreateFlowTask { - create_flow_task: Some(CreateFlowTaskExpr { + create_flow: Some(CreateFlowExpr { catalog_name, - task_name, + flow_name, source_table_names: source_table_names.into_iter().map(Into::into).collect(), sink_table_name: Some(sink_table_name.into()), or_replace, @@ -808,7 +808,7 @@ impl From for PbCreateFlowTask { expire_when, comment, sql, - task_options: options, + flow_options, }), } } @@ -817,22 +817,22 @@ impl From for PbCreateFlowTask { /// Drop flow pub struct DropFlowTask { pub catalog_name: String, - pub task_name: String, + pub flow_name: String, } impl TryFrom for DropFlowTask { type Error = error::Error; fn try_from(pb: PbDropFlowTask) -> Result { - let DropFlowTaskExpr { + let DropFlowExpr { catalog_name, - task_name, - } = pb.drop_flow_task.context(error::InvalidProtoMsgSnafu { + flow_name, + } = pb.drop_flow.context(error::InvalidProtoMsgSnafu { err_msg: "expected sink_table_name", })?; Ok(DropFlowTask { catalog_name, - task_name, + flow_name, }) } } @@ -841,13 +841,13 @@ impl From for PbDropFlowTask { fn from( DropFlowTask { catalog_name, - task_name, + flow_name, }: DropFlowTask, ) -> Self { PbDropFlowTask { - drop_flow_task: Some(DropFlowTaskExpr { + drop_flow: Some(DropFlowExpr { catalog_name, - task_name, + flow_name, }), } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 551a7da85d31..2009bc56381c 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -137,10 +137,10 @@ impl GrpcQueryHandler for Instance { TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); self.statement_executor.truncate_table(table_name).await? } - DdlExpr::CreateFlowTask(_) => { + DdlExpr::CreateFlow(_) => { unimplemented!() } - DdlExpr::DropFlowTask(_) => { + DdlExpr::DropFlow(_) => { unimplemented!() } } @@ -181,12 +181,12 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte Expr::TruncateTable(expr) => { check_and_fill!(expr); } - Expr::CreateFlowTask(expr) => { + Expr::CreateFlow(expr) => { if expr.catalog_name.is_empty() { expr.catalog_name = catalog.to_string(); } } - Expr::DropFlowTask(expr) => { + Expr::DropFlow(expr) => { if expr.catalog_name.is_empty() { expr.catalog_name = catalog.to_string(); } diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 4e7aef084fb2..f0305ed981a5 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -541,7 +541,7 @@ pub fn to_create_flow_task_expr( Ok(CreateFlowTask { catalog_name: query_ctx.current_catalog().to_string(), - task_name: create_flow.flow_name.to_string(), + flow_name: create_flow.flow_name.to_string(), source_table_names, sink_table_name, or_replace: create_flow.or_replace, @@ -552,7 +552,7 @@ pub fn to_create_flow_task_expr( .unwrap_or_default(), comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - options: HashMap::new(), + flow_options: HashMap::new(), }) }