Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement the CreateFlowProcedure #3810

Merged
merged 13 commits into from
Apr 29, 2024
45 changes: 32 additions & 13 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ use std::{fs, path};
use async_trait::async_trait;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
Expand All @@ -45,6 +47,7 @@ use frontend::server::Services;
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
Expand Down Expand Up @@ -411,30 +414,40 @@ impl StartCommand {
let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));

let table_id_sequence = Arc::new(
SequenceBuilder::new("table_id", kv_backend.clone())
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
.build(),
);
let flow_task_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_TASK_ID as u64)
.step(10)
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
opts.wal_meta.clone(),
kv_backend.clone(),
));

let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone()));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let flow_task_meta_allocator = Arc::new(
FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence),
);

let ddl_task_executor = Self::create_ddl_task_executor(
table_metadata_manager,
procedure_manager.clone(),
node_manager.clone(),
multi_cache_invalidator,
table_metadata_manager,
table_meta_allocator,
flow_task_metadata_manager,
flow_task_meta_allocator,
)
.await?;

Expand Down Expand Up @@ -462,20 +475,26 @@ impl StartCommand {
}

pub async fn create_ddl_task_executor(
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
node_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_task_metadata_manager: FlowTaskMetadataManagerRef,
flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
},
procedure_manager,
node_manager,
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
true,
)
.context(InitDdlManagerSnafu)?,
Expand Down
3 changes: 3 additions & 0 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub const DEFAULT_CATALOG_NAME: &str = "greptime";
pub const DEFAULT_SCHEMA_NAME: &str = "public";
pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = "greptime_private";

/// Reserves [0,MIN_USER_FLOW_TASK_ID) for internal usage.
/// User defined table id starts from this value.
pub const MIN_USER_FLOW_TASK_ID: u32 = 1024;
/// Reserves [0,MIN_USER_TABLE_ID) for internal usage.
/// User defined table id starts from this value.
pub const MIN_USER_TABLE_ID: u32 = 1024;
Expand Down
18 changes: 16 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use std::sync::Arc;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};

use self::table_meta::TableMetadataAllocatorRef;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef;
use crate::error::Result;
use crate::key::flow_task::FlowTaskMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
Expand All @@ -31,13 +33,15 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub mod drop_database;
pub mod drop_table;
mod physical_table_metadata;
pub mod table_meta;
pub mod task_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
Expand Down Expand Up @@ -93,11 +97,21 @@ pub struct TableMetadata {
pub region_wal_options: HashMap<RegionNumber, String>,
}

/// The context of ddl.
#[derive(Clone)]
pub struct DdlContext {
/// Sends querying and requests to nodes.
pub node_manager: NodeManagerRef,
/// Cache invalidation.
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
/// Keep tracking operating regions.
pub memory_region_keeper: MemoryRegionKeeperRef,
/// Table metadata manager.
pub table_metadata_manager: TableMetadataManagerRef,
/// Allocator for table metadata.
pub table_metadata_allocator: TableMetadataAllocatorRef,
/// Flow task metadata manager.
pub flow_task_metadata_manager: FlowTaskMetadataManagerRef,
/// Allocator for flow task metadata.
pub flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
}
Loading
Loading