Skip to content

Commit

Permalink
feat: implement the CreateFlowProcedure (#3810)
Browse files Browse the repository at this point in the history
* feat: implement `FlowTaskMetadataAllocator`

* feat: add `FlowTaskMetadataManagerRef` and `FlowTaskMetadataAllocatorRef`

* chore: fix clippy

* feat: add `FlowTaskNameLock`

* feat: implement the `CreateFlowTaskProcedure`

* chore: rename to `CreateFlowProcedure`

* chore: apply suggestions from CR

* feat: invoke create flow procedure

* chore: apply suggestions from CR

* refactor: rename TYPE_NAME

* feat: register the procedure

* chore: apply suggestions from CR

* feat: acquire the lock of sink table name
  • Loading branch information
WenyXu authored Apr 29, 2024
1 parent 336db38 commit b493ea1
Show file tree
Hide file tree
Showing 22 changed files with 857 additions and 191 deletions.
45 changes: 32 additions & 13 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ use std::{fs, path};
use async_trait::async_trait;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
Expand All @@ -45,6 +47,7 @@ use frontend::server::Services;
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
Expand Down Expand Up @@ -411,30 +414,40 @@ impl StartCommand {
let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));

let table_id_sequence = Arc::new(
SequenceBuilder::new("table_id", kv_backend.clone())
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
.build(),
);
let flow_task_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_TASK_ID as u64)
.step(10)
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
opts.wal_meta.clone(),
kv_backend.clone(),
));

let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone()));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let flow_task_meta_allocator = Arc::new(
FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence),
);

let ddl_task_executor = Self::create_ddl_task_executor(
table_metadata_manager,
procedure_manager.clone(),
node_manager.clone(),
multi_cache_invalidator,
table_metadata_manager,
table_meta_allocator,
flow_task_metadata_manager,
flow_task_meta_allocator,
)
.await?;

Expand Down Expand Up @@ -462,20 +475,26 @@ impl StartCommand {
}

pub async fn create_ddl_task_executor(
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
node_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_task_metadata_manager: FlowTaskMetadataManagerRef,
flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
},
procedure_manager,
node_manager,
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
true,
)
.context(InitDdlManagerSnafu)?,
Expand Down
3 changes: 3 additions & 0 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub const DEFAULT_CATALOG_NAME: &str = "greptime";
pub const DEFAULT_SCHEMA_NAME: &str = "public";
pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = "greptime_private";

/// Reserves [0,MIN_USER_FLOW_TASK_ID) for internal usage.
/// User defined table id starts from this value.
pub const MIN_USER_FLOW_TASK_ID: u32 = 1024;
/// Reserves [0,MIN_USER_TABLE_ID) for internal usage.
/// User defined table id starts from this value.
pub const MIN_USER_TABLE_ID: u32 = 1024;
Expand Down
18 changes: 16 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use std::sync::Arc;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};

use self::table_meta::TableMetadataAllocatorRef;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef;
use crate::error::Result;
use crate::key::flow_task::FlowTaskMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
Expand All @@ -31,13 +33,15 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub mod drop_database;
pub mod drop_table;
mod physical_table_metadata;
pub mod table_meta;
pub mod task_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
Expand Down Expand Up @@ -93,11 +97,21 @@ pub struct TableMetadata {
pub region_wal_options: HashMap<RegionNumber, String>,
}

/// The context of ddl.
#[derive(Clone)]
pub struct DdlContext {
/// Sends querying and requests to nodes.
pub node_manager: NodeManagerRef,
/// Cache invalidation.
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
/// Keep tracking operating regions.
pub memory_region_keeper: MemoryRegionKeeperRef,
/// Table metadata manager.
pub table_metadata_manager: TableMetadataManagerRef,
/// Allocator for table metadata.
pub table_metadata_allocator: TableMetadataAllocatorRef,
/// Flow task metadata manager.
pub flow_task_metadata_manager: FlowTaskMetadataManagerRef,
/// Allocator for flow task metadata.
pub flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
}
Loading

0 comments on commit b493ea1

Please sign in to comment.