chore: apply suggestions from CR
WenyXu committed Apr 29, 2024
1 parent d63686a commit 1dd6324
Showing 10 changed files with 133 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_flow/check.rs
@@ -36,7 +36,7 @@ impl CreateFlowProcedure {
.await?;
ensure!(
!exists,
- error::TaskAlreadyExistsSnafu {
+ error::FlowAlreadyExistsSnafu {
flow_name: format!("{}.{}", catalog_name, task_name),
}
);
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_flow/metadata.rs
@@ -19,7 +19,7 @@ use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;

impl CreateFlowProcedure {
- /// Allocates the [FlowTaskId].
+ /// Allocates the [FlowId].
pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> {
//TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now.
let partitions = 1;
6 changes: 3 additions & 3 deletions src/common/meta/src/ddl/flow_meta.rs
@@ -25,7 +25,7 @@ use crate::sequence::SequenceRef;
pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;

/// [FlowMetadataAllocator] provides the ability of:
- /// - [FlowTaskId] Allocation.
+ /// - [FlowId] Allocation.
/// - [FlownodeId] Selection.
#[derive(Clone)]
pub struct FlowMetadataAllocator {
@@ -42,13 +42,13 @@ impl FlowMetadataAllocator {
}
}

- /// Allocates a the [FlowTaskId].
+ /// Allocates a the [FlowId].
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
let flow_id = self.flow_id_sequence.next().await? as FlowId;
Ok(flow_id)
}

- /// Allocates the [FlowTaskId] and [Peer]s.
+ /// Allocates the [FlowId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.partition_peer_allocator.alloc(partitions).await?;
77 changes: 77 additions & 0 deletions src/common/meta/src/ddl/task_meta.rs
@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use tonic::async_trait;

use crate::error::Result;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::sequence::SequenceRef;

/// The reference of [FlowTaskMetadataAllocator].
pub type FlowTaskMetadataAllocatorRef = Arc<FlowTaskMetadataAllocator>;

/// [FlowTaskMetadataAllocator] provides the ability of:
/// - [FlowId] Allocation.
/// - [FlownodeId] Selection.
#[derive(Clone)]
pub struct FlowTaskMetadataAllocator {
flow_id_sequence: SequenceRef,
partition_peer_allocator: PartitionPeerAllocatorRef,
}

impl FlowTaskMetadataAllocator {
/// Returns the [FlowTaskMetadataAllocator] with [NoopPartitionPeerAllocator].
pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self {
Self {
flow_id_sequence,
partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator),
}
}

/// Allocates a the [FlowId].
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
let flow_id = self.flow_id_sequence.next().await? as FlowId;
Ok(flow_id)
}

/// Allocates the [FlowId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.partition_peer_allocator.alloc(partitions).await?;

Ok((flow_id, peers))
}
}

/// Allocates [Peer]s for partitions.
#[async_trait]
pub trait PartitionPeerAllocator: Send + Sync {
/// Allocates [Peer] nodes for storing partitions.
async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>>;
}

/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions.
pub type PartitionPeerAllocatorRef = Arc<dyn PartitionPeerAllocator>;

struct NoopPartitionPeerAllocator;

#[async_trait]
impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>> {
Ok(vec![Peer::default(); partitions])
}
}
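A quick illustration of the pattern this new file introduces: a sequence hands out the next FlowId while a pluggable PartitionPeerAllocator chooses the peers for each partition, and the noop variant simply returns default peers. The following is a standalone, hypothetical sketch (not GreptimeDB code): the sequence is replaced by an atomic counter, Peer is a simplified stand-in, and it assumes the async-trait and tokio crates.

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use async_trait::async_trait;

type FlowId = u32;

// Simplified stand-in for the real Peer type.
#[derive(Clone, Debug, Default)]
struct Peer {
    id: u64,
    addr: String,
}

#[async_trait]
trait PartitionPeerAllocator: Send + Sync {
    /// Allocates one peer per partition.
    async fn alloc(&self, partitions: usize) -> Vec<Peer>;
}

/// No-op strategy: every partition gets a default (placeholder) peer.
struct NoopPartitionPeerAllocator;

#[async_trait]
impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
    async fn alloc(&self, partitions: usize) -> Vec<Peer> {
        vec![Peer::default(); partitions]
    }
}

struct FlowMetadataAllocator {
    // Stands in for the SequenceRef used by the real allocator.
    next_flow_id: AtomicU32,
    partition_peer_allocator: Arc<dyn PartitionPeerAllocator>,
}

impl FlowMetadataAllocator {
    fn with_noop_peer_allocator() -> Self {
        Self {
            next_flow_id: AtomicU32::new(1024),
            partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator),
        }
    }

    /// Allocates the next flow id together with the peers for its partitions.
    async fn create(&self, partitions: usize) -> (FlowId, Vec<Peer>) {
        let flow_id = self.next_flow_id.fetch_add(1, Ordering::Relaxed);
        let peers = self.partition_peer_allocator.alloc(partitions).await;
        (flow_id, peers)
    }
}

#[tokio::main]
async fn main() {
    let allocator = FlowMetadataAllocator::with_noop_peer_allocator();
    // Flow creation currently always uses a single partition (see metadata.rs above).
    let (flow_id, peers) = allocator.create(1).await;
    println!("allocated flow {flow_id} with peers {peers:?}");
}

Keeping the peer-allocation strategy behind a trait object lets tests and single-node setups plug in the noop allocator, while a clustered deployment can substitute one that actually selects flownodes.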
6 changes: 3 additions & 3 deletions src/common/meta/src/error.rs
@@ -241,8 +241,8 @@ pub enum Error {
location: Location,
},

#[snafu(display("Task already exists: {}", flow_name))]
TaskAlreadyExists {
#[snafu(display("Flow already exists: {}", flow_name))]
FlowAlreadyExists {
flow_name: String,
location: Location,
},
@@ -511,7 +511,7 @@ impl ErrorExt for Error {
| InvalidEngineType { .. }
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. }
- | TaskAlreadyExists { .. }
+ | FlowAlreadyExists { .. }
| MismatchPrefix { .. }
| DelimiterNotFound { .. } => StatusCode::InvalidArguments,

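For readers unfamiliar with the snafu machinery behind the renamed variant: the derive generates a FlowAlreadyExistsSnafu context selector, which is what the ensure! call in create_flow/check.rs builds. A minimal, self-contained sketch of that pattern (hypothetical, not the project's actual error type; the location field and the other variants are omitted):

use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Flow already exists: {}", flow_name))]
    FlowAlreadyExists { flow_name: String },
}

/// Mirrors the shape of the `ensure!` call in create_flow/check.rs after the rename.
pub fn check_flow_absent(exists: bool, catalog: &str, flow: &str) -> Result<(), Error> {
    ensure!(
        !exists,
        FlowAlreadyExistsSnafu {
            flow_name: format!("{catalog}.{flow}"),
        }
    );
    Ok(())
}

fn main() {
    let err = check_flow_absent(true, "greptime", "my_flow").unwrap_err();
    assert_eq!(err.to_string(), "Flow already exists: greptime.my_flow");
}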
6 changes: 3 additions & 3 deletions src/common/meta/src/key/flow.rs
@@ -193,7 +193,7 @@ impl FlowMetadataManager {
remote_flow_flow_name.flow_id()
);

- return error::TaskAlreadyExistsSnafu {
+ return error::FlowAlreadyExistsSnafu {
flow_name: format!("{}.{}", flow_value.catalog_name, flow_value.flow_name),
}
.fail();
@@ -309,7 +309,7 @@ mod tests {
assert_eq!(got, flow_value);
let tasks = flow_metadata_manager
.flownode_flow_manager()
- .tasks(catalog_name, 1)
+ .flows(catalog_name, 1)
.try_collect::<Vec<_>>()
.await
.unwrap();
@@ -376,7 +376,7 @@ mod tests {
.create_flow_metadata(task_id + 1, flow_value)
.await
.unwrap_err();
- assert_matches!(err, error::Error::TaskAlreadyExists { .. });
+ assert_matches!(err, error::Error::FlowAlreadyExists { .. });
}

#[tokio::test]
14 changes: 7 additions & 7 deletions src/common/meta/src/key/flow/flow_info.rs
@@ -37,7 +37,7 @@ lazy_static! {
Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}

- /// The key stores the metadata of the task.
+ /// The key stores the metadata of the flow.
///
/// The layout: `__flow/{catalog}/info/{flow_id}`.
pub struct FlowInfoKey(FlowScoped<CatalogScoped<FlowInfoKeyInner>>);
@@ -66,7 +66,7 @@ impl FlowInfoKey {
self.0.catalog()
}

- /// Returns the [FlowTaskId].
+ /// Returns the [FlowId].
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}
@@ -115,15 +115,15 @@ impl MetaKey<FlowInfoKeyInner> for FlowInfoKeyInner {
// The metadata of the flow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowInfoValue {
- /// The source tables used by the task.
+ /// The source tables used by the flow.
pub(crate) source_table_ids: Vec<TableId>,
- /// The sink table used by the task.
+ /// The sink table used by the flow.
pub(crate) sink_table_name: TableName,
- /// Which flow nodes this task is running on.
+ /// Which flow nodes this flow is running on.
pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
/// The catalog name.
pub(crate) catalog_name: String,
- /// The task name.
+ /// The flow name.
pub(crate) flow_name: String,
/// The raw sql.
pub(crate) raw_sql: String,
@@ -158,7 +158,7 @@ impl FlowInfoManager {
Self { kv_backend }
}

- /// Returns the [FlowTaskValue] of specified `flow_id`.
+ /// Returns the [FlowInfoValue] of specified `flow_id`.
pub async fn get(&self, catalog: &str, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes();
self.kv_backend
35 changes: 16 additions & 19 deletions src/common/meta/src/key/flow/flow_name.rs
@@ -32,7 +32,7 @@ lazy_static! {
Regex::new(&format!("^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap();
}

- /// The key of mapping {flow_name} to [FlowTaskId].
+ /// The key of mapping {flow_name} to [FlowId].
///
/// The layout: `__flow/{catalog}/name/{flow_name}`.
pub struct FlowNameKey(FlowScoped<CatalogScoped<FlowNameKeyInner>>);
@@ -67,7 +67,7 @@ impl MetaKey<FlowNameKey> for FlowNameKey {
}
}

- /// The key of mapping name to [FlowTaskId]
+ /// The key of mapping name to [FlowId]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowNameKeyInner {
pub flow_name: String,
@@ -95,15 +95,15 @@ impl MetaKey<FlowNameKeyInner> for FlowNameKeyInner {
err_msg: format!("Invalid FlowNameKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
- let task = captures[1].to_string();
- Ok(FlowNameKeyInner { flow_name: task })
+ let flow_name = captures[1].to_string();
+ Ok(FlowNameKeyInner { flow_name })
}
}

impl FlowNameKeyInner {
/// Returns a [FlowNameKeyInner].
- pub fn new(task: String) -> Self {
- Self { flow_name: task }
+ pub fn new(flow_name: String) -> Self {
+ Self { flow_name }
}
}

@@ -114,12 +114,12 @@ pub struct FlowNameValue {
}

impl FlowNameValue {
- /// Returns a [FlowNameValue] with specified [FlowTaskId].
+ /// Returns a [FlowNameValue] with specified [FlowId].
pub fn new(flow_id: FlowId) -> Self {
Self { flow_id }
}

- /// Returns the [FlowTaskId]
+ /// Returns the [FlowId]
pub fn flow_id(&self) -> FlowId {
self.flow_id
}
@@ -136,9 +136,9 @@ impl FlowNameManager {
Self { kv_backend }
}

- /// Returns the [FlowNameValue] of specified `catalog.task`.
- pub async fn get(&self, catalog: &str, task: &str) -> Result<Option<FlowNameValue>> {
- let key = FlowNameKey::new(catalog.to_string(), task.to_string());
+ /// Returns the [FlowNameValue] of specified `catalog.flow`.
+ pub async fn get(&self, catalog: &str, flow: &str) -> Result<Option<FlowNameValue>> {
+ let key = FlowNameKey::new(catalog.to_string(), flow.to_string());
let raw_key = key.to_bytes();
self.kv_backend
.get(&raw_key)
@@ -147,9 +147,9 @@ impl FlowNameManager {
.transpose()
}

- /// Returns true if the `task` exists.
- pub async fn exists(&self, catalog: &str, task: &str) -> Result<bool> {
- let key = FlowNameKey::new(catalog.to_string(), task.to_string());
+ /// Returns true if the `flow` exists.
+ pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
+ let key = FlowNameKey::new(catalog.to_string(), flow.to_string());
let raw_key = key.to_bytes();
self.kv_backend.exists(&raw_key).await
}
@@ -189,11 +189,8 @@ mod tests {

#[test]
fn test_key_serialization() {
- let table_task_key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string());
- assert_eq!(
- b"__flow/my_catalog/name/my_task".to_vec(),
- table_task_key.to_bytes(),
- );
+ let key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string());
+ assert_eq!(b"__flow/my_catalog/name/my_task".to_vec(), key.to_bytes(),);
}

#[test]
24 changes: 12 additions & 12 deletions src/common/meta/src/key/flow/flownode_flow.rs
@@ -40,7 +40,7 @@ lazy_static! {

const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode";

- /// The key of mapping [FlownodeId] to [FlowTaskId].
+ /// The key of mapping [FlownodeId] to [FlowId].
///
/// The layout `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}`
pub struct FlownodeFlowKey(FlowScoped<CatalogScoped<FlownodeFlowKeyInner>>);
@@ -84,7 +84,7 @@ impl FlownodeFlowKey {
self.0.catalog()
}

- /// Returns the [FlowTaskId].
+ /// Returns the [FlowId].
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}
@@ -100,7 +100,7 @@ impl FlownodeFlowKey {
}
}

- /// The key of mapping [FlownodeId] to [FlowTaskId].
+ /// The key of mapping [FlownodeId] to [FlowId].
pub struct FlownodeFlowKeyInner {
flownode_id: FlownodeId,
flow_id: FlowId,
@@ -171,18 +171,18 @@ pub struct FlownodeFlowManager {
}

/// Decodes `KeyValue` to [FlownodeFlowKey].
- pub fn flownode_task_key_decoder(kv: KeyValue) -> Result<FlownodeFlowKey> {
+ pub fn flownode_flow_key_decoder(kv: KeyValue) -> Result<FlownodeFlowKey> {
FlownodeFlowKey::from_bytes(&kv.key)
}

impl FlownodeFlowManager {
- /// Returns a new [FlownodeTaskManager].
+ /// Returns a new [FlownodeFlowManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}

- /// Retrieves all [FlowTaskId] and [PartitionId]s of the specified `flownode_id`.
- pub fn tasks(
+ /// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
+ pub fn flows(
&self,
catalog: &str,
flownode_id: FlownodeId,
@@ -194,15 +194,15 @@ impl FlownodeFlowManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(flownode_task_key_decoder),
+ Arc::new(flownode_flow_key_decoder),
);

Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id())))
}

- /// Builds a create flownode task transaction.
+ /// Builds a create flownode flow transaction.
///
- /// Puts `__flownode_task/{flownode_id}/{flow_id}/{partition_id}` keys.
+ /// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
catalog: &str,
@@ -230,10 +230,10 @@ mod tests {

#[test]
fn test_key_serialization() {
- let flownode_task = FlownodeFlowKey::new("my_catalog".to_string(), 1, 2, 0);
+ let flownode_flow = FlownodeFlowKey::new("my_catalog".to_string(), 1, 2, 0);
assert_eq!(
b"__flow/my_catalog/flownode/1/2/0".to_vec(),
- flownode_task.to_bytes()
+ flownode_flow.to_bytes()
);
let prefix = FlownodeFlowKey::range_start_key("my_catalog".to_string(), 1);
assert_eq!(b"__flow/my_catalog/flownode/1/".to_vec(), prefix);
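The mapping above is keyed by the plain string layout __flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}, which is what the renamed test asserts. A standalone sketch of rendering and parsing such a key (simplified and hypothetical; the real code uses typed key structs and a regex):

/// Renders a flownode-to-flow mapping key in the layout used above:
/// `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}`.
fn flownode_flow_key(catalog: &str, flownode_id: u64, flow_id: u32, partition_id: u32) -> String {
    format!("__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}")
}

/// Parses the key back into (catalog, flownode_id, flow_id, partition_id).
fn parse_flownode_flow_key(key: &str) -> Option<(String, u64, u32, u32)> {
    let parts: Vec<&str> = key.split('/').collect();
    // Expected layout: ["__flow", catalog, "flownode", flownode_id, flow_id, partition_id]
    if parts.len() != 6 || parts[0] != "__flow" || parts[2] != "flownode" {
        return None;
    }
    Some((
        parts[1].to_string(),
        parts[3].parse().ok()?,
        parts[4].parse().ok()?,
        parts[5].parse().ok()?,
    ))
}

fn main() {
    let key = flownode_flow_key("my_catalog", 1, 2, 0);
    assert_eq!(key, "__flow/my_catalog/flownode/1/2/0");
    assert_eq!(
        parse_flownode_flow_key(&key),
        Some(("my_catalog".to_string(), 1, 2, 0))
    );
}

The prefix __flow/{catalog}/flownode/{flownode_id}/ (see range_start_key in the test) is what range scans such as flows() use to list every flow and partition hosted on a given flownode.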
