Skip to content

Commit

Permalink
feat: implement the CreateFlowTaskProcedure
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 28, 2024
1 parent 0049b8c commit 330dab5
Show file tree
Hide file tree
Showing 11 changed files with 415 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
pub mod create_flow_task;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
Expand Down
242 changes: 242 additions & 0 deletions src/common/meta/src/ddl/create_flow_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod check;
mod metadata;

use std::collections::BTreeMap;

use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest};
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::info;
use futures::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use strum::AsRefStr;
use table::metadata::TableId;

use super::utils::add_peer_context_if_needed;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::flow_task::flow_task_info::FlowTaskInfoValue;
use crate::key::FlowTaskId;
use crate::lock_key::{CatalogLock, FlowTaskNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateFlowTask;
use crate::{metrics, ClusterId};

/// The procedure of flow task creation.
///
/// Drives a flow task creation through its states — see [CreateFlowTaskState]:
/// preparation/validation, flow creation on the flownodes, and persisting the
/// flow task metadata.
pub struct CreateFlowTaskProcedure {
    /// The shared DDL context (metadata managers, id allocator, node manager).
    pub context: DdlContext,
    /// The serializable state of this procedure run; persisted via `dump`.
    pub data: CreateFlowTaskData,
}

impl CreateFlowTaskProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlowTask";

    /// Returns a new [CreateFlowTaskProcedure].
    ///
    /// The procedure must begin in [CreateFlowTaskState::Prepare]: starting in
    /// `CreateMetadata` would skip validation and flow creation and panic on
    /// the unallocated `flow_task_id` in [Self::on_create_metadata].
    pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self {
        Self {
            context,
            data: CreateFlowTaskData {
                cluster_id,
                task,
                flow_task_id: None,
                peers: vec![],
                source_table_ids: vec![],
                state: CreateFlowTaskState::Prepare,
            },
        }
    }

    /// Deserializes the procedure data from `json` (produced by [Procedure::dump]).
    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
        Ok(CreateFlowTaskProcedure { context, data })
    }

    /// Prepares the creation:
    /// - Ensures the task name doesn't already exist.
    /// - Resolves the source table ids (all source tables must exist).
    /// - Allocates the flow task id and the hosting flownode peers.
    async fn on_prepare(&mut self) -> Result<Status> {
        self.check_creation().await?;
        self.collect_source_tables().await?;
        self.allocate_flow_task_id().await?;
        self.data.state = CreateFlowTaskState::FlownodeCreateFlows;

        Ok(Status::executing(true))
    }

    /// Broadcasts the create-flow request to every allocated flownode and
    /// waits for all of them to succeed.
    async fn on_flownode_create_flow(&mut self) -> Result<Status> {
        let mut create_flow_task = Vec::with_capacity(self.data.peers.len());
        for peer in &self.data.peers {
            let requester = self.context.datanode_manager.flownode(peer).await;
            let request = FlowRequest {
                // Safety: `flow_task_id` must have been allocated in `on_prepare`,
                // otherwise `to_create_flow_request` panics.
                body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())),
            };
            create_flow_task.push(async move {
                requester
                    .handle(request)
                    .await
                    .map_err(add_peer_context_if_needed(peer.clone()))
            });
        }

        // Fail the step if any flownode rejects the request.
        join_all(create_flow_task)
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()?;

        // Only advance to `CreateMetadata` after every flownode succeeded, so a
        // retried procedure re-sends the create requests instead of skipping them.
        self.data.state = CreateFlowTaskState::CreateMetadata;
        Ok(Status::executing(true))
    }

    /// Creates flow task metadata.
    ///
    /// Abort(not-retry):
    /// - Failed to create flow task metadata.
    async fn on_create_metadata(&mut self) -> Result<Status> {
        // Safety: The flow task id must be allocated (done in `on_prepare`).
        let flow_task_id = self.data.flow_task_id.unwrap();
        // TODO(weny): Support `or_replace`.
        self.context
            .flow_task_metadata_manager
            .create_flow_task_metadata(flow_task_id, self.data.to_flow_task_info_value())
            .await?;
        info!("Created flow task metadata for flow task {flow_task_id}");
        Ok(Status::done_with_output(flow_task_id))
    }
}

#[async_trait]
impl Procedure for CreateFlowTaskProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Dispatches one step of the procedure according to the current state,
    /// timing each step under a per-state metric label.
    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let step_label = self.data.state.as_ref().to_string();
        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK
            .with_label_values(&[&step_label])
            .start_timer();

        let step_result = match self.data.state {
            CreateFlowTaskState::Prepare => self.on_prepare().await,
            CreateFlowTaskState::FlownodeCreateFlows => self.on_flownode_create_flow().await,
            CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await,
        };

        step_result.map_err(handle_retry_error)
    }

    /// Serializes the procedure data to JSON for persistence.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// Returns the locks this procedure holds: a read lock on the catalog and
    /// a lock on the flow task name.
    fn lock_key(&self) -> LockKey {
        let catalog = &self.data.task.catalog_name;
        let task = &self.data.task.task_name;

        let keys = vec![
            CatalogLock::Read(catalog).into(),
            FlowTaskNameLock::new(catalog, task).into(),
        ];
        LockKey::new(keys)
    }
}

/// The state of [CreateFlowTaskProcedure].
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowTaskState {
    /// Validates the request, resolves the source tables, and allocates
    /// the flow task id and flownode peers.
    Prepare,
    /// Creates flows on the flownodes.
    FlownodeCreateFlows,
    /// Persists the flow task metadata.
    CreateMetadata,
}

/// The serializable data of [CreateFlowTaskProcedure], persisted between steps.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowTaskData {
    // The id of the cluster the flow task belongs to.
    pub(crate) cluster_id: ClusterId,
    // The current state of the procedure.
    pub(crate) state: CreateFlowTaskState,
    // The original creation request.
    pub(crate) task: CreateFlowTask,
    // The allocated flow task id; `None` until allocated during prepare.
    pub(crate) flow_task_id: Option<FlowTaskId>,
    // The flownode peers that will host the flow; empty until allocated.
    pub(crate) peers: Vec<Peer>,
    // The resolved ids of the source tables; empty until collected.
    pub(crate) source_table_ids: Vec<TableId>,
}

impl CreateFlowTaskData {
    /// Converts to a [CreateRequest] to be sent to the flownodes.
    ///
    /// # Panics
    /// Panics if `flow_task_id` hasn't been allocated yet.
    fn to_create_flow_request(&self) -> CreateRequest {
        // `expect` with the stated invariant instead of a bare `unwrap`, so a
        // violation produces an actionable panic message.
        let flow_task_id = self
            .flow_task_id
            .expect("flow_task_id must be allocated before building the create request");
        let source_table_ids = &self.source_table_ids;

        CreateRequest {
            task_id: Some(api::v1::flow::TaskId { id: flow_task_id }),
            source_table_ids: source_table_ids
                .iter()
                .map(|table_id| api::v1::TableId { id: *table_id })
                .collect_vec(),
            sink_table_name: Some(self.task.sink_table_name.clone().into()),
            // Always be true
            create_if_not_exists: true,
            expire_when: self.task.expire_when.clone(),
            comment: self.task.comment.clone(),
            sql: self.task.sql.clone(),
            task_options: self.task.options.clone(),
        }
    }

    /// Converts to a [FlowTaskInfoValue] for metadata persistence.
    fn to_flow_task_info_value(&self) -> FlowTaskInfoValue {
        let CreateFlowTask {
            catalog_name,
            task_name,
            sink_table_name,
            expire_when,
            comment,
            sql,
            options,
            ..
        } = self.task.clone();

        // Maps each allocated peer to its position: partition index -> peer id.
        let flownode_ids = self
            .peers
            .iter()
            .enumerate()
            .map(|(idx, peer)| (idx as u32, peer.id))
            .collect::<BTreeMap<_, _>>();

        FlowTaskInfoValue {
            source_table_ids: self.source_table_ids.clone(),
            sink_table_name,
            flownode_ids,
            catalog_name,
            task_name,
            raw_sql: sql,
            expire_when,
            comment,
            options,
        }
    }
}
44 changes: 44 additions & 0 deletions src/common/meta/src/ddl/create_flow_task/check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use snafu::ensure;

use crate::ddl::create_flow_task::CreateFlowTaskProcedure;
use crate::error::{self, Result};

impl CreateFlowTaskProcedure {
    /// Checks that the new flow task name doesn't already exist in the catalog.
    ///
    /// Returns a `TaskAlreadyExists` error if the name is taken.
    pub(crate) async fn check_creation(&self) -> Result<()> {
        let task = &self.data.task;

        // The task name must not be taken already.
        let already_exists = self
            .context
            .flow_task_metadata_manager
            .flow_task_name_manager()
            .exists(&task.catalog_name, &task.task_name)
            .await?;

        ensure!(
            !already_exists,
            error::TaskAlreadyExistsSnafu {
                task_name: format!("{}.{}", task.catalog_name, task.task_name),
            }
        );

        Ok(())
    }
}
62 changes: 62 additions & 0 deletions src/common/meta/src/ddl/create_flow_task/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::ddl::create_flow_task::CreateFlowTaskProcedure;
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;

impl CreateFlowTaskProcedure {
    /// Allocates a fresh flow task id together with the flownode peers that
    /// will host the flow, and stores both in the procedure data.
    pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> {
        // TODO(weny, ruihang): Partitions aren't supported yet; the partition
        // count is always 1 for now.
        let partitions = 1;
        let (flow_task_id, peers) = self
            .context
            .flow_task_metadata_allocator
            .create(partitions)
            .await?;

        self.data.flow_task_id = Some(flow_task_id);
        self.data.peers = peers;

        Ok(())
    }

    /// Resolves every source table name to its table id and stores the ids in
    /// the procedure data, failing if any source table doesn't exist.
    pub(crate) async fn collect_source_tables(&mut self) -> Result<()> {
        let source_table_names = &self.data.task.source_table_names;
        let mut table_ids = Vec::with_capacity(source_table_names.len());

        for name in source_table_names {
            let key = TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name);
            let value = self
                .context
                .table_metadata_manager
                .table_name_manager()
                .get(key)
                .await?;

            // Every source table must already exist.
            let Some(value) = value else {
                return error::TableNotFoundSnafu {
                    table_name: name.to_string(),
                }
                .fail();
            };
            table_ids.push(value.table_id());
        }

        self.data.source_table_ids = table_ids;
        Ok(())
    }
}
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ mod tests {
use crate::state_store::KvStateStore;
use crate::wal_options_allocator::WalOptionsAllocator;

/// A dummy implemented [DatanodeManager].
/// A dummy implemented [NodeManager].
pub struct DummyDatanodeManager;

#[async_trait::async_trait]
Expand Down
8 changes: 1 addition & 7 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use snafu::{Location, Snafu};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use crate::key::FlowTaskId;
use crate::peer::Peer;
use crate::DatanodeId;

Expand Down Expand Up @@ -242,14 +241,9 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"Task already exists, task: {}, flow_task_id: {}",
task_name,
flow_task_id
))]
#[snafu(display("Task already exists, task: {}", task_name,))]
TaskAlreadyExists {
task_name: String,
flow_task_id: FlowTaskId,
location: Location,
},

Expand Down
Loading

0 comments on commit 330dab5

Please sign in to comment.