diff --git a/src/common/meta/src/ddl/create_flow/check.rs b/src/common/meta/src/ddl/create_flow/check.rs index 2ca3c316284d..ea7cd57d89c1 100644 --- a/src/common/meta/src/ddl/create_flow/check.rs +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -36,7 +36,7 @@ impl CreateFlowProcedure { .await?; ensure!( !exists, - error::TaskAlreadyExistsSnafu { + error::FlowAlreadyExistsSnafu { flow_name: format!("{}.{}", catalog_name, task_name), } ); diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs index 284ba9bb551a..1681479d9173 100644 --- a/src/common/meta/src/ddl/create_flow/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -19,7 +19,7 @@ use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; impl CreateFlowProcedure { - /// Allocates the [FlowTaskId]. + /// Allocates the [FlowId]. pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> { //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now. let partitions = 1; diff --git a/src/common/meta/src/ddl/flow_meta.rs b/src/common/meta/src/ddl/flow_meta.rs index 4b7000f60a5d..27e3078a4c10 100644 --- a/src/common/meta/src/ddl/flow_meta.rs +++ b/src/common/meta/src/ddl/flow_meta.rs @@ -25,7 +25,7 @@ use crate::sequence::SequenceRef; pub type FlowMetadataAllocatorRef = Arc; /// [FlowMetadataAllocator] provides the ability of: -/// - [FlowTaskId] Allocation. +/// - [FlowId] Allocation. /// - [FlownodeId] Selection. #[derive(Clone)] pub struct FlowMetadataAllocator { @@ -42,13 +42,13 @@ impl FlowMetadataAllocator { } } - /// Allocates a the [FlowTaskId]. + /// Allocates a the [FlowId]. pub(crate) async fn allocate_flow_id(&self) -> Result { let flow_id = self.flow_id_sequence.next().await? as FlowId; Ok(flow_id) } - /// Allocates the [FlowTaskId] and [Peer]s. + /// Allocates the [FlowId] and [Peer]s. pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec)> { let flow_id = self.allocate_flow_id().await?; let peers = self.partition_peer_allocator.alloc(partitions).await?; diff --git a/src/common/meta/src/ddl/task_meta.rs b/src/common/meta/src/ddl/task_meta.rs new file mode 100644 index 000000000000..d302b9fa9414 --- /dev/null +++ b/src/common/meta/src/ddl/task_meta.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use tonic::async_trait; + +use crate::error::Result; +use crate::key::FlowId; +use crate::peer::Peer; +use crate::sequence::SequenceRef; + +/// The reference of [FlowTaskMetadataAllocator]. +pub type FlowTaskMetadataAllocatorRef = Arc; + +/// [FlowTaskMetadataAllocator] provides the ability of: +/// - [FlowId] Allocation. +/// - [FlownodeId] Selection. +#[derive(Clone)] +pub struct FlowTaskMetadataAllocator { + flow_id_sequence: SequenceRef, + partition_peer_allocator: PartitionPeerAllocatorRef, +} + +impl FlowTaskMetadataAllocator { + /// Returns the [FlowTaskMetadataAllocator] with [NoopPartitionPeerAllocator]. + pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self { + Self { + flow_id_sequence, + partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator), + } + } + + /// Allocates a the [FlowId]. + pub(crate) async fn allocate_flow_id(&self) -> Result { + let flow_id = self.flow_id_sequence.next().await? as FlowId; + Ok(flow_id) + } + + /// Allocates the [FlowId] and [Peer]s. + pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec)> { + let flow_id = self.allocate_flow_id().await?; + let peers = self.partition_peer_allocator.alloc(partitions).await?; + + Ok((flow_id, peers)) + } +} + +/// Allocates [Peer]s for partitions. +#[async_trait] +pub trait PartitionPeerAllocator: Send + Sync { + /// Allocates [Peer] nodes for storing partitions. + async fn alloc(&self, partitions: usize) -> Result>; +} + +/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions. +pub type PartitionPeerAllocatorRef = Arc; + +struct NoopPartitionPeerAllocator; + +#[async_trait] +impl PartitionPeerAllocator for NoopPartitionPeerAllocator { + async fn alloc(&self, partitions: usize) -> Result> { + Ok(vec![Peer::default(); partitions]) + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index aaf881f53bb3..cd7583967188 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -241,8 +241,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Task already exists: {}", flow_name))] - TaskAlreadyExists { + #[snafu(display("Flow already exists: {}", flow_name))] + FlowAlreadyExists { flow_name: String, location: Location, }, @@ -511,7 +511,7 @@ impl ErrorExt for Error { | InvalidEngineType { .. } | AlterLogicalTablesInvalidArguments { .. } | CreateLogicalTablesInvalidArguments { .. } - | TaskAlreadyExists { .. } + | FlowAlreadyExists { .. } | MismatchPrefix { .. } | DelimiterNotFound { .. } => StatusCode::InvalidArguments, diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 6d4ec1cd2bba..51528b46e4ac 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -193,7 +193,7 @@ impl FlowMetadataManager { remote_flow_flow_name.flow_id() ); - return error::TaskAlreadyExistsSnafu { + return error::FlowAlreadyExistsSnafu { flow_name: format!("{}.{}", flow_value.catalog_name, flow_value.flow_name), } .fail(); @@ -309,7 +309,7 @@ mod tests { assert_eq!(got, flow_value); let tasks = flow_metadata_manager .flownode_flow_manager() - .tasks(catalog_name, 1) + .flows(catalog_name, 1) .try_collect::>() .await .unwrap(); @@ -376,7 +376,7 @@ mod tests { .create_flow_metadata(task_id + 1, flow_value) .await .unwrap_err(); - assert_matches!(err, error::Error::TaskAlreadyExists { .. }); + assert_matches!(err, error::Error::FlowAlreadyExists { .. }); } #[tokio::test] diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 50dab7ea2ab7..f9b9ae4b259d 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -37,7 +37,7 @@ lazy_static! { Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); } -/// The key stores the metadata of the task. +/// The key stores the metadata of the flow. /// /// The layout: `__flow/{catalog}/info/{flow_id}`. pub struct FlowInfoKey(FlowScoped>); @@ -66,7 +66,7 @@ impl FlowInfoKey { self.0.catalog() } - /// Returns the [FlowTaskId]. + /// Returns the [FlowId]. pub fn flow_id(&self) -> FlowId { self.0.flow_id } @@ -115,15 +115,15 @@ impl MetaKey for FlowInfoKeyInner { // The metadata of the flow. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowInfoValue { - /// The source tables used by the task. + /// The source tables used by the flow. pub(crate) source_table_ids: Vec, - /// The sink table used by the task. + /// The sink table used by the flow. pub(crate) sink_table_name: TableName, - /// Which flow nodes this task is running on. + /// Which flow nodes this flow is running on. pub(crate) flownode_ids: BTreeMap, /// The catalog name. pub(crate) catalog_name: String, - /// The task name. + /// The flow name. pub(crate) flow_name: String, /// The raw sql. pub(crate) raw_sql: String, @@ -158,7 +158,7 @@ impl FlowInfoManager { Self { kv_backend } } - /// Returns the [FlowTaskValue] of specified `flow_id`. + /// Returns the [FlowInfoValue] of specified `flow_id`. pub async fn get(&self, catalog: &str, flow_id: FlowId) -> Result> { let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes(); self.kv_backend diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 18fcac3686c3..dbb6d81c35b1 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -32,7 +32,7 @@ lazy_static! { Regex::new(&format!("^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap(); } -/// The key of mapping {flow_name} to [FlowTaskId]. +/// The key of mapping {flow_name} to [FlowId]. /// /// The layout: `__flow/{catalog}/name/{flow_name}`. pub struct FlowNameKey(FlowScoped>); @@ -67,7 +67,7 @@ impl MetaKey for FlowNameKey { } } -/// The key of mapping name to [FlowTaskId] +/// The key of mapping name to [FlowId] #[derive(Debug, Clone, PartialEq, Eq)] pub struct FlowNameKeyInner { pub flow_name: String, @@ -95,15 +95,15 @@ impl MetaKey for FlowNameKeyInner { err_msg: format!("Invalid FlowNameKeyInner '{key}'"), })?; // Safety: pass the regex check above - let task = captures[1].to_string(); - Ok(FlowNameKeyInner { flow_name: task }) + let flow_name = captures[1].to_string(); + Ok(FlowNameKeyInner { flow_name }) } } impl FlowNameKeyInner { /// Returns a [FlowNameKeyInner]. - pub fn new(task: String) -> Self { - Self { flow_name: task } + pub fn new(flow_name: String) -> Self { + Self { flow_name } } } @@ -114,12 +114,12 @@ pub struct FlowNameValue { } impl FlowNameValue { - /// Returns a [FlowNameValue] with specified [FlowTaskId]. + /// Returns a [FlowNameValue] with specified [FlowId]. pub fn new(flow_id: FlowId) -> Self { Self { flow_id } } - /// Returns the [FlowTaskId] + /// Returns the [FlowId] pub fn flow_id(&self) -> FlowId { self.flow_id } @@ -136,9 +136,9 @@ impl FlowNameManager { Self { kv_backend } } - /// Returns the [FlowNameValue] of specified `catalog.task`. - pub async fn get(&self, catalog: &str, task: &str) -> Result> { - let key = FlowNameKey::new(catalog.to_string(), task.to_string()); + /// Returns the [FlowNameValue] of specified `catalog.flow`. + pub async fn get(&self, catalog: &str, flow: &str) -> Result> { + let key = FlowNameKey::new(catalog.to_string(), flow.to_string()); let raw_key = key.to_bytes(); self.kv_backend .get(&raw_key) @@ -147,9 +147,9 @@ impl FlowNameManager { .transpose() } - /// Returns true if the `task` exists. - pub async fn exists(&self, catalog: &str, task: &str) -> Result { - let key = FlowNameKey::new(catalog.to_string(), task.to_string()); + /// Returns true if the `flow` exists. + pub async fn exists(&self, catalog: &str, flow: &str) -> Result { + let key = FlowNameKey::new(catalog.to_string(), flow.to_string()); let raw_key = key.to_bytes(); self.kv_backend.exists(&raw_key).await } @@ -189,11 +189,8 @@ mod tests { #[test] fn test_key_serialization() { - let table_task_key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string()); - assert_eq!( - b"__flow/my_catalog/name/my_task".to_vec(), - table_task_key.to_bytes(), - ); + let key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string()); + assert_eq!(b"__flow/my_catalog/name/my_task".to_vec(), key.to_bytes(),); } #[test] diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs index 203c14302e83..360b96b0f56f 100644 --- a/src/common/meta/src/key/flow/flownode_flow.rs +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -40,7 +40,7 @@ lazy_static! { const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode"; -/// The key of mapping [FlownodeId] to [FlowTaskId]. +/// The key of mapping [FlownodeId] to [FlowId]. /// /// The layout `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}` pub struct FlownodeFlowKey(FlowScoped>); @@ -84,7 +84,7 @@ impl FlownodeFlowKey { self.0.catalog() } - /// Returns the [FlowTaskId]. + /// Returns the [FlowId]. pub fn flow_id(&self) -> FlowId { self.0.flow_id } @@ -100,7 +100,7 @@ impl FlownodeFlowKey { } } -/// The key of mapping [FlownodeId] to [FlowTaskId]. +/// The key of mapping [FlownodeId] to [FlowId]. pub struct FlownodeFlowKeyInner { flownode_id: FlownodeId, flow_id: FlowId, @@ -171,18 +171,18 @@ pub struct FlownodeFlowManager { } /// Decodes `KeyValue` to [FlownodeFlowKey]. -pub fn flownode_task_key_decoder(kv: KeyValue) -> Result { +pub fn flownode_flow_key_decoder(kv: KeyValue) -> Result { FlownodeFlowKey::from_bytes(&kv.key) } impl FlownodeFlowManager { - /// Returns a new [FlownodeTaskManager]. + /// Returns a new [FlownodeFlowManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend } } - /// Retrieves all [FlowTaskId] and [PartitionId]s of the specified `flownode_id`. - pub fn tasks( + /// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`. + pub fn flows( &self, catalog: &str, flownode_id: FlownodeId, @@ -194,15 +194,15 @@ impl FlownodeFlowManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(flownode_task_key_decoder), + Arc::new(flownode_flow_key_decoder), ); Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id()))) } - /// Builds a create flownode task transaction. + /// Builds a create flownode flow transaction. /// - /// Puts `__flownode_task/{flownode_id}/{flow_id}/{partition_id}` keys. + /// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys. pub(crate) fn build_create_txn>( &self, catalog: &str, @@ -230,10 +230,10 @@ mod tests { #[test] fn test_key_serialization() { - let flownode_task = FlownodeFlowKey::new("my_catalog".to_string(), 1, 2, 0); + let flownode_flow = FlownodeFlowKey::new("my_catalog".to_string(), 1, 2, 0); assert_eq!( b"__flow/my_catalog/flownode/1/2/0".to_vec(), - flownode_task.to_bytes() + flownode_flow.to_bytes() ); let prefix = FlownodeFlowKey::range_start_key("my_catalog".to_string(), 1); assert_eq!(b"__flow/my_catalog/flownode/1/".to_vec(), prefix); diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index 5a4bf474cdb9..d3cabd86f276 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -40,7 +40,7 @@ lazy_static! { .unwrap(); } -/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId]. +/// The key of mapping [TableId] to [FlownodeId] and [FlowId]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] struct TableFlowKeyInner { table_id: TableId, @@ -49,7 +49,7 @@ struct TableFlowKeyInner { partition_id: FlowPartitionId, } -/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId]. +/// The key of mapping [TableId] to [FlownodeId] and [FlowId]. /// /// The layout: `__flow/{catalog}/table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`. #[derive(Debug, PartialEq)] @@ -100,7 +100,7 @@ impl TableFlowKey { self.0.table_id } - /// Returns the [FlowTaskId]. + /// Returns the [FlowId]. pub fn flow_id(&self) -> FlowId { self.0.flow_id } @@ -182,7 +182,7 @@ impl MetaKey for TableFlowKeyInner { } /// Decodes `KeyValue` to [TableFlowKey]. -pub fn table_task_decoder(kv: KeyValue) -> Result { +pub fn table_flow_decoder(kv: KeyValue) -> Result { TableFlowKey::from_bytes(&kv.key) } @@ -192,7 +192,7 @@ pub struct TableFlowManager { } impl TableFlowManager { - /// Returns a new [TableTaskManager]. + /// Returns a new [TableFlowManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend } } @@ -209,15 +209,15 @@ impl TableFlowManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(table_task_decoder), + Arc::new(table_flow_decoder), ); Box::pin(stream) } - /// Builds a create table task transaction. + /// Builds a create table flow transaction. /// - /// Puts `__table_task/{table_id}/{node_id}/{partition_id}` keys. + /// Puts `__table_flow/{table_id}/{node_id}/{partition_id}` keys. pub fn build_create_txn>( &self, catalog: &str, @@ -254,10 +254,10 @@ mod tests { #[test] fn test_key_serialization() { - let table_task_key = TableFlowKey::new("my_catalog".to_string(), 1024, 1, 2, 0); + let table_flow_key = TableFlowKey::new("my_catalog".to_string(), 1024, 1, 2, 0); assert_eq!( b"__flow/my_catalog/source_table/1024/1/2/0".to_vec(), - table_task_key.to_bytes(), + table_flow_key.to_bytes(), ); let prefix = TableFlowKey::range_start_key("my_catalog".to_string(), 1024); assert_eq!(b"__flow/my_catalog/source_table/1024/".to_vec(), prefix);