Skip to content

Commit

Permalink
chore: fix clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 28, 2024
1 parent 039d0ab commit f68f585
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 135 deletions.
24 changes: 13 additions & 11 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
Expand Down Expand Up @@ -479,20 +479,22 @@ impl StartCommand {
node_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
flow_metadata_manager: FlowTaskMetadataManagerRef,
flow_metadata_allocator: FlowTaskMetadataAllocatorRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_task_metadata_manager: FlowTaskMetadataManagerRef,
flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
},
procedure_manager,
node_manager,
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
flow_metadata_manager,
flow_metadata_allocator,
Arc::new(MemoryRegionKeeper::default()),
true,
)
.context(InitDdlManagerSnafu)?,
Expand Down
70 changes: 21 additions & 49 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@ use derive_builder::Builder;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::TableId;

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
use crate::error::{
Expand All @@ -41,12 +38,9 @@ use crate::error::{
TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
};
use crate::key::flow_task::FlowTaskMetadataManagerRef;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase,
DropLogicalTables, DropTable, TruncateTable,
Expand All @@ -67,38 +61,20 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade
/// The [DdlManager] provides the ability to execute Ddl.
#[derive(Builder)]
pub struct DdlManager {
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
node_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_task_metadata_manager: FlowTaskMetadataManagerRef,
flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
}

/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
impl DdlManager {
/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
pub fn try_new(
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
node_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_task_metadata_manager: FlowTaskMetadataManagerRef,
flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
register_loaders: bool,
) -> Result<Self> {
let manager = Self {
ddl_context,
procedure_manager,
node_manager,
cache_invalidator,
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
memory_region_keeper,
};
if register_loaders {
manager.register_loaders()?;
Expand All @@ -108,23 +84,16 @@ impl DdlManager {

/// Returns the [TableMetadataManagerRef].
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
&self.ddl_context.table_metadata_manager
}

/// Returns the [DdlContext]
pub fn create_context(&self) -> DdlContext {
DdlContext {
node_manager: self.node_manager.clone(),
cache_invalidator: self.cache_invalidator.clone(),
table_metadata_manager: self.table_metadata_manager.clone(),
table_metadata_allocator: self.table_metadata_allocator.clone(),
flow_task_metadata_manager: self.flow_task_metadata_manager.clone(),
flow_task_metadata_allocator: self.flow_task_metadata_allocator.clone(),
memory_region_keeper: self.memory_region_keeper.clone(),
}
self.ddl_context.clone()
}

fn register_loaders(&self) -> Result<()> {
/// Registers all Ddl loaders.
pub fn register_loaders(&self) -> Result<()> {
let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = vec![
(
CreateTableProcedure::TYPE_NAME,
Expand Down Expand Up @@ -431,7 +400,7 @@ async fn handle_alter_table_task(
let table_ref = alter_table_task.table_ref();

let table_id = ddl_manager
.table_metadata_manager
.table_metadata_manager()
.table_name_manager()
.get(TableNameKey::new(
table_ref.catalog,
Expand Down Expand Up @@ -529,7 +498,7 @@ async fn handle_create_logical_table_tasks(
}
);
let physical_table_id = utils::check_and_get_physical_table_id(
&ddl_manager.table_metadata_manager,
ddl_manager.table_metadata_manager(),
&create_table_tasks,
)
.await?;
Expand Down Expand Up @@ -622,7 +591,7 @@ async fn handle_alter_logical_table_tasks(
table: &alter_table_tasks[0].alter_table.table_name,
};
let physical_table_id =
utils::get_physical_table_id(&ddl_manager.table_metadata_manager, first_table).await?;
utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
let num_logical_tables = alter_table_tasks.len();

let (id, _) = ddl_manager
Expand Down Expand Up @@ -734,6 +703,7 @@ mod tests {
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::task_meta::FlowTaskMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::DdlContext;
use crate::key::flow_task::FlowTaskMetadataManager;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
Expand Down Expand Up @@ -776,14 +746,16 @@ mod tests {
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));

let _ = DdlManager::try_new(
DdlContext {
node_manager: Arc::new(DummyDatanodeManager),
cache_invalidator: Arc::new(DummyCacheInvalidator),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
},
procedure_manager.clone(),
Arc::new(DummyDatanodeManager),
Arc::new(DummyCacheInvalidator),
table_metadata_manager.clone(),
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
Arc::new(MemoryRegionKeeper::default()),
true,
);

Expand Down
109 changes: 43 additions & 66 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef};
use common_meta::ddl_manager::{DdlManager, DdlManagerRef};
use common_meta::ddl::task_meta::FlowTaskMetadataAllocator;
use common_meta::ddl::DdlContext;
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::key::flow_task::FlowTaskMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::WalOptionsAllocator;
Expand Down Expand Up @@ -246,25 +247,46 @@ impl MetasrvBuilder {
.build(),
)),
);
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

let ddl_manager = build_ddl_manager(
&options,
node_manager,
&procedure_manager,
&mailbox,
&table_metadata_manager,
&table_metadata_allocator,
&flow_task_metadata_manager,
&flow_task_metadata_allocator,
&opening_region_keeper,
)?;
let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
let node_manager = node_manager.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
options.datanode.client_options.timeout_millis,
))
.connect_timeout(Duration::from_millis(
options.datanode.client_options.connect_timeout_millis,
))
.tcp_nodelay(options.datanode.client_options.tcp_nodelay);
Arc::new(DatanodeClients::new(datanode_client_channel_config))
});
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
mailbox.clone(),
MetasrvInfo {
server_addr: options.server_addr.clone(),
},
));
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator,
memory_region_keeper: memory_region_keeper.clone(),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
flow_task_metadata_manager: flow_task_metadata_manager.clone(),
flow_task_metadata_allocator: flow_task_metadata_allocator.clone(),
},
procedure_manager.clone(),
true,
)
.context(error::InitDdlManagerSnafu)?,
);

let region_migration_manager = Arc::new(RegionMigrationManager::new(
procedure_manager.clone(),
DefaultContextFactory::new(
table_metadata_manager.clone(),
opening_region_keeper.clone(),
memory_region_keeper.clone(),
mailbox.clone(),
options.server_addr.clone(),
),
Expand Down Expand Up @@ -305,7 +327,7 @@ impl MetasrvBuilder {
let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
memory_region_keeper.clone(),
);

let group = HeartbeatHandlerGroup::new(pushers);
Expand Down Expand Up @@ -360,7 +382,7 @@ impl MetasrvBuilder {
)
.await,
plugins: plugins.unwrap_or_else(Plugins::default),
memory_region_keeper: opening_region_keeper,
memory_region_keeper,
region_migration_manager,
})
}
Expand Down Expand Up @@ -406,51 +428,6 @@ fn build_procedure_manager(
Arc::new(LocalManager::new(manager_config, Arc::new(state_store)))
}

fn build_ddl_manager(
options: &MetasrvOptions,
datanode_clients: Option<NodeManagerRef>,
procedure_manager: &ProcedureManagerRef,
mailbox: &MailboxRef,
table_metadata_manager: &TableMetadataManagerRef,
table_metadata_allocator: &TableMetadataAllocatorRef,
flow_task_metadata_manager: &FlowTaskMetadataManagerRef,
flow_task_metadata_allocator: &FlowTaskMetadataAllocatorRef,
memory_region_keeper: &MemoryRegionKeeperRef,
) -> Result<DdlManagerRef> {
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
options.datanode.client_options.timeout_millis,
))
.connect_timeout(Duration::from_millis(
options.datanode.client_options.connect_timeout_millis,
))
.tcp_nodelay(options.datanode.client_options.tcp_nodelay);
Arc::new(DatanodeClients::new(datanode_client_channel_config))
});
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
mailbox.clone(),
MetasrvInfo {
server_addr: options.server_addr.clone(),
},
));

Ok(Arc::new(
DdlManager::try_new(
procedure_manager.clone(),
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
table_metadata_allocator.clone(),
flow_task_metadata_manager.clone(),
flow_task_metadata_allocator.clone(),
memory_region_keeper.clone(),
true,
)
.context(error::InitDdlManagerSnafu)?,
))
}

impl Default for MetasrvBuilder {
fn default() -> Self {
Self::new()
Expand Down
21 changes: 12 additions & 9 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_config::KvBackendConfig;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::task_meta::FlowTaskMetadataAllocator;
use common_meta::ddl::DdlContext;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow_task::FlowTaskMetadataManager;
use common_meta::key::TableMetadataManager;
Expand Down Expand Up @@ -151,24 +152,26 @@ impl GreptimeDbStandaloneBuilder {
mix_options.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let flow_task_meta_allocator = Arc::new(
let flow_task_metadata_allocator = Arc::new(
FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence),
);

let ddl_task_executor = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager: node_manager.clone(),
cache_invalidator: multi_cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
},
procedure_manager.clone(),
node_manager.clone(),
multi_cache_invalidator,
table_metadata_manager,
table_meta_allocator,
flow_task_metadata_manager,
flow_task_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
register_procedure_loaders,
)
.unwrap(),
Expand Down

0 comments on commit f68f585

Please sign in to comment.