Skip to content

Commit

Permalink
chore: bump proto to b5412f7
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 29, 2024
1 parent 1e5c9d0 commit d63686a
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aba235025ac5643c12bfdcefd656af11ad58ea8e" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b5412f72257c18410fdccbb893fa5d245b846141" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
4 changes: 2 additions & 2 deletions src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlowTask(_)) => "ddl.create_flow",
Some(Expr::DropFlowTask(_)) => "ddl.drop_flow",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
None => "ddl.empty",
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl Procedure for CreateFlowProcedure {

fn lock_key(&self) -> LockKey {
let catalog_name = &self.data.task.catalog_name;
let task_name = &self.data.task.task_name;
let task_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;

LockKey::new(vec![
Expand Down Expand Up @@ -195,7 +195,7 @@ impl From<&CreateFlowTaskData> for CreateRequest {
let source_table_ids = &value.source_table_ids;

CreateRequest {
task_id: Some(api::v1::flow::TaskId { id: flow_id }),
flow_id: Some(api::v1::flow::TaskId { id: flow_id }),
source_table_ids: source_table_ids
.iter()
.map(|table_id| api::v1::TableId { id: *table_id })
Expand All @@ -206,7 +206,7 @@ impl From<&CreateFlowTaskData> for CreateRequest {
expire_when: value.task.expire_when.clone(),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
task_options: value.task.options.clone(),
flow_options: value.task.flow_options.clone(),
}
}
}
Expand All @@ -215,12 +215,12 @@ impl From<&CreateFlowTaskData> for FlowInfoValue {
fn from(value: &CreateFlowTaskData) -> Self {
let CreateFlowTask {
catalog_name,
task_name,
flow_name: task_name,
sink_table_name,
expire_when,
comment,
sql,
options,
flow_options: options,
..
} = value.task.clone();

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_flow/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl CreateFlowProcedure {
/// - The sink table doesn't exist.
pub(crate) async fn check_creation(&self) -> Result<()> {
let catalog_name = &self.data.task.catalog_name;
let task_name = &self.data.task.task_name;
let task_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;

// Ensures the task name doesn't exist.
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ async fn handle_create_flow_task(
})?);
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.task_name,
create_flow_task.catalog_name, create_flow_task.flow_name,
);

Ok(SubmitDdlTaskResponse {
Expand Down
48 changes: 24 additions & 24 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use api::v1::meta::{
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateFlowTaskExpr, CreateTableExpr, DropDatabaseExpr,
DropFlowTaskExpr, DropTableExpr, TruncateTableExpr,
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, DropDatabaseExpr, DropFlowExpr,
DropTableExpr, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
Expand Down Expand Up @@ -733,39 +733,39 @@ impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
pub catalog_name: String,
pub task_name: String,
pub flow_name: String,
pub source_table_names: Vec<TableName>,
pub sink_table_name: TableName,
pub or_replace: bool,
pub create_if_not_exists: bool,
pub expire_when: String,
pub comment: String,
pub sql: String,
pub options: HashMap<String, String>,
pub flow_options: HashMap<String, String>,
}

impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
type Error = error::Error;

fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
let CreateFlowTaskExpr {
let CreateFlowExpr {
catalog_name,
task_name,
flow_name,
source_table_names,
sink_table_name,
or_replace,
create_if_not_exists,
expire_when,
comment,
sql,
task_options,
} = pb.create_flow_task.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create_flow_task",
flow_options,
} = pb.create_flow.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create_flow",
})?;

Ok(CreateFlowTask {
catalog_name,
task_name,
flow_name,
source_table_names: source_table_names.into_iter().map(Into::into).collect(),
sink_table_name: sink_table_name
.context(error::InvalidProtoMsgSnafu {
Expand All @@ -777,7 +777,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
expire_when,
comment,
sql,
options: task_options,
flow_options,
})
}
}
Expand All @@ -786,29 +786,29 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
fn from(
CreateFlowTask {
catalog_name,
task_name,
flow_name,
source_table_names,
sink_table_name,
or_replace,
create_if_not_exists,
expire_when,
comment,
sql,
options,
flow_options,
}: CreateFlowTask,
) -> Self {
PbCreateFlowTask {
create_flow_task: Some(CreateFlowTaskExpr {
create_flow: Some(CreateFlowExpr {
catalog_name,
task_name,
flow_name,
source_table_names: source_table_names.into_iter().map(Into::into).collect(),
sink_table_name: Some(sink_table_name.into()),
or_replace,
create_if_not_exists,
expire_when,
comment,
sql,
task_options: options,
flow_options,
}),
}
}
Expand All @@ -817,22 +817,22 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
/// Drop flow
pub struct DropFlowTask {
pub catalog_name: String,
pub task_name: String,
pub flow_name: String,
}

impl TryFrom<PbDropFlowTask> for DropFlowTask {
type Error = error::Error;

fn try_from(pb: PbDropFlowTask) -> Result<Self> {
let DropFlowTaskExpr {
let DropFlowExpr {
catalog_name,
task_name,
} = pb.drop_flow_task.context(error::InvalidProtoMsgSnafu {
flow_name,
} = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
err_msg: "expected sink_table_name",
})?;
Ok(DropFlowTask {
catalog_name,
task_name,
flow_name,
})
}
}
Expand All @@ -841,13 +841,13 @@ impl From<DropFlowTask> for PbDropFlowTask {
fn from(
DropFlowTask {
catalog_name,
task_name,
flow_name,
}: DropFlowTask,
) -> Self {
PbDropFlowTask {
drop_flow_task: Some(DropFlowTaskExpr {
drop_flow: Some(DropFlowExpr {
catalog_name,
task_name,
flow_name,
}),
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ impl GrpcQueryHandler for Instance {
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor.truncate_table(table_name).await?
}
DdlExpr::CreateFlowTask(_) => {
DdlExpr::CreateFlow(_) => {
unimplemented!()
}
DdlExpr::DropFlowTask(_) => {
DdlExpr::DropFlow(_) => {
unimplemented!()
}
}
Expand Down Expand Up @@ -181,12 +181,12 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
Expr::TruncateTable(expr) => {
check_and_fill!(expr);
}
Expr::CreateFlowTask(expr) => {
Expr::CreateFlow(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
}
Expr::DropFlowTask(expr) => {
Expr::DropFlow(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
Expand Down
4 changes: 2 additions & 2 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ pub fn to_create_flow_task_expr(

Ok(CreateFlowTask {
catalog_name: query_ctx.current_catalog().to_string(),
task_name: create_flow.flow_name.to_string(),
flow_name: create_flow.flow_name.to_string(),
source_table_names,
sink_table_name,
or_replace: create_flow.or_replace,
Expand All @@ -552,7 +552,7 @@ pub fn to_create_flow_task_expr(
.unwrap_or_default(),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
options: HashMap::new(),
flow_options: HashMap::new(),
})
}

Expand Down

0 comments on commit d63686a

Please sign in to comment.