Skip to content

Commit

Permalink
refactor: add builder for Frontend (#2849)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield authored Dec 1, 2023
1 parent c0df2b9 commit 9ce9421
Show file tree
Hide file tree
Showing 15 changed files with 402 additions and 361 deletions.
13 changes: 1 addition & 12 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::{Arc, Weak};
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::error::Result as MetaResult;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
Expand Down Expand Up @@ -55,7 +54,6 @@ pub struct KvBackendCatalogManager {
cache_invalidator: CacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
datanode_manager: DatanodeManagerRef,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
}
Expand All @@ -76,16 +74,11 @@ impl CacheInvalidator for KvBackendCatalogManager {
}

impl KvBackendCatalogManager {
pub fn new(
backend: KvBackendRef,
cache_invalidator: CacheInvalidatorRef,
datanode_manager: DatanodeManagerRef,
) -> Arc<Self> {
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
cache_invalidator,
datanode_manager,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
},
Expand All @@ -99,10 +92,6 @@ impl KvBackendCatalogManager {
pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}

pub fn datanode_manager(&self) -> DatanodeManagerRef {
self.datanode_manager.clone()
}
}

#[async_trait::async_trait]
Expand Down
10 changes: 2 additions & 8 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;
use std::time::Instant;

use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use client::client_manager::DatanodeClients;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::ext::ErrorExt;
Expand Down Expand Up @@ -250,13 +249,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {

let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));

let datanode_clients = Arc::new(DatanodeClients::default());

let catalog_list = KvBackendCatalogManager::new(
cached_meta_backend.clone(),
cached_meta_backend.clone(),
datanode_clients,
);
let catalog_list =
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend);
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
Expand Down
24 changes: 12 additions & 12 deletions src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ pub enum Error {
source: common_meta::error::Error,
},

#[snafu(display("Failed to init DDL manager"))]
InitDdlManager {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
location: Location,
Expand Down Expand Up @@ -225,13 +231,6 @@ pub enum Error {
#[snafu(source)]
error: std::io::Error,
},

#[snafu(display("Failed to parse address {}", addr))]
ParseAddr {
addr: String,
#[snafu(source)]
error: std::net::AddrParseError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -247,9 +246,11 @@ impl ErrorExt for Error {
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::IterStream { source, .. } | Error::InitMetadata { source, .. } => {
source.status_code()
}

Error::IterStream { source, .. }
| Error::InitMetadata { source, .. }
| Error::InitDdlManager { source, .. } => source.status_code(),

Error::ConnectServer { source, .. } => source.status_code(),
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
Expand All @@ -259,8 +260,7 @@ impl ErrorExt for Error {
| Error::NotDataFromOutput { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
| Error::InvalidDatabaseName { .. }
| Error::ParseAddr { .. } => StatusCode::InvalidArguments,
| Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments,

Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Expand Down
42 changes: 39 additions & 3 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use catalog::kvbackend::CachedMetaKvBackend;
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::logging;
use frontend::frontend::FrontendOptions;
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::MetaClientOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};

use crate::error::{self, Result, StartFrontendSnafu};
use crate::error::{self, MissingConfigSnafu, Result, StartFrontendSnafu};
use crate::options::{Options, TopLevelOptions};

pub struct Instance {
Expand Down Expand Up @@ -196,10 +204,38 @@ impl StartCommand {
logging::info!("Frontend start command: {:#?}", self);
logging::info!("Frontend options: {:#?}", opts);

let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone())
let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client'",
})?;
let meta_client = FeInstance::create_meta_client(meta_client_options)
.await
.context(StartFrontendSnafu)?;

let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())),
]);

let heartbeat_task = HeartbeatTask::new(
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
);

let mut instance = FrontendBuilder::new(
meta_backend.clone(),
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_cache_invalidator(meta_backend)
.with_plugin(plugins)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
.context(StartFrontendSnafu)?;

instance
.build_servers(opts)
.await
Expand Down
7 changes: 6 additions & 1 deletion src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ impl StartCommand {
logging::info!("MetaSrv start command: {:#?}", self);
logging::info!("MetaSrv options: {:#?}", opts);

let instance = MetaSrvInstance::new(opts, plugins)
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
.await
.context(error::BuildMetaServerSnafu)?;
let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;

let instance = MetaSrvInstance::new(opts, plugins, metasrv)
.await
.context(error::BuildMetaServerSnafu)?;

Expand Down
107 changes: 57 additions & 50 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
use std::sync::Arc;
use std::{fs, path};

use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_meta::cache_invalidator::DummyKvCacheInvalidator;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::standalone::StandaloneTableMetadataCreator;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
Expand All @@ -42,9 +44,9 @@ use servers::Mode;
use snafu::ResultExt;

use crate::error::{
CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StopProcedureManagerSnafu,
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::options::{MixOptions, Options, TopLevelOptions};

Expand Down Expand Up @@ -156,6 +158,7 @@ impl StandaloneOptions {
wal: self.wal,
storage: self.storage,
region_engine: self.region_engine,
rpc_addr: self.grpc.addr,
..Default::default()
}
}
Expand Down Expand Up @@ -347,36 +350,25 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;

let region_server = datanode.region_server();
let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));

let catalog_manager = KvBackendCatalogManager::new(
let ddl_task_executor = Self::create_ddl_task_executor(
kv_backend.clone(),
Arc::new(DummyKvCacheInvalidator),
Arc::new(StandaloneDatanodeManager(region_server.clone())),
);

catalog_manager
.table_metadata_manager_ref()
.init()
.await
.context(InitMetadataSnafu)?;

// TODO: build frontend instance like in distributed mode
let mut frontend = build_frontend(
fe_plugins,
kv_backend,
procedure_manager.clone(),
catalog_manager,
region_server,
datanode_manager.clone(),
)
.await?;

let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(fe_plugins)
.try_build()
.await
.context(StartFrontendSnafu)?;

frontend
.build_servers(opts)
.await
Expand All @@ -388,26 +380,41 @@ impl StartCommand {
procedure_manager,
})
}
}

/// Build frontend instance in standalone mode
async fn build_frontend(
plugins: Plugins,
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
region_server: RegionServer,
) -> Result<FeInstance> {
let frontend_instance = FeInstance::try_new_standalone(
kv_backend,
procedure_manager,
catalog_manager,
plugins,
region_server,
)
.await
.context(StartFrontendSnafu)?;
Ok(frontend_instance)
async fn create_ddl_task_executor(
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
) -> Result<DdlTaskExecutorRef> {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(StandaloneTableMetadataCreator::new(kv_backend)),
)
.context(InitDdlManagerSnafu)?,
);

Ok(ddl_task_executor)
}

async fn create_table_metadata_manager(
kv_backend: KvBackendRef,
) -> Result<TableMetadataManagerRef> {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));

table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)?;

Ok(table_metadata_manager)
}
}

#[cfg(test)]
Expand Down
4 changes: 0 additions & 4 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,6 @@ pub enum Error {
source: servers::error::Error,
},

#[snafu(display("Missing meta_client_options section in config"))]
MissingMetasrvOpts { location: Location },

#[snafu(display("Failed to find leaders when altering table, table: {}", table))]
LeaderNotFound { table: String, location: Location },

Expand Down Expand Up @@ -299,7 +296,6 @@ impl ErrorExt for Error {
| Error::IllegalPrimaryKeysDef { .. }
| Error::SchemaExists { .. }
| Error::ColumnNotFound { .. }
| Error::MissingMetasrvOpts { .. }
| Error::UnsupportedFormat { .. }
| Error::IllegalAuthConfig { .. }
| Error::EmptyData { .. }
Expand Down
Loading

0 comments on commit 9ce9421

Please sign in to comment.