diff --git a/Cargo.lock b/Cargo.lock index 62261bce168..aeaec198f71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -904,7 +904,6 @@ dependencies = [ "rskafka", "serde", "store-api", - "tests-integration", "tokio", "toml 0.8.12", "uuid", @@ -9631,7 +9630,6 @@ dependencies = [ "strum 0.25.0", "table", "tempfile", - "tests-integration", "tikv-jemalloc-ctl", "tokio", "tokio-postgres", @@ -9996,7 +9994,6 @@ dependencies = [ "serde_json", "sqlness", "tempfile", - "tests-integration", "tinytemplate", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 8b02b3f0ce9..aa10d987d27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -233,8 +233,6 @@ sql = { path = "src/sql" } store-api = { path = "src/store-api" } substrait = { path = "src/common/substrait" } table = { path = "src/table" } -# TODO some code depends on this -tests-integration = { path = "tests-integration" } [workspace.dependencies.meter-macros] git = "https://github.com/GreptimeTeam/greptime-meter.git" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index ed7f038596e..dc5bf8ba972 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -12,7 +12,7 @@ api.workspace = true arrow.workspace = true chrono.workspace = true clap.workspace = true -client.workspace = true +client = { workspace = true, features = ["testing"] } common-base.workspace = true common-telemetry.workspace = true common-wal.workspace = true @@ -33,8 +33,6 @@ rand.workspace = true rskafka.workspace = true serde.workspace = true store-api.workspace = true -# TODO depend `Database` client -tests-integration.workspace = true tokio.workspace = true toml.workspace = true uuid.workspace = true diff --git a/tests-integration/src/database.rs b/src/client/src/database.rs similarity index 80% rename from tests-integration/src/database.rs rename to src/client/src/database.rs index a378c68ea42..e310a73e584 100644 --- a/tests-integration/src/database.rs +++ b/src/client/src/database.rs @@ -23,8 +23,6 @@ use api::v1::{ }; use arrow_flight::Ticket; use async_stream::stream; -use client::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu}; -use client::{from_grpc_response, Client, Result}; use common_error::ext::{BoxedError, ErrorExt}; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_query::Output; @@ -37,7 +35,8 @@ use prost::Message; use snafu::{ensure, ResultExt}; use tonic::transport::Channel; -pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; +use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu}; +use crate::{from_grpc_response, Client, Result}; #[derive(Clone, Debug, Default)] pub struct Database { @@ -105,10 +104,18 @@ impl Database { self.catalog = catalog.into(); } + pub fn catalog(&self) -> &String { + &self.catalog + } + pub fn set_schema(&mut self, schema: impl Into) { self.schema = schema.into(); } + pub fn schema(&self) -> &String { + &self.schema + } + pub fn set_timezone(&mut self, timezone: impl Into) { self.timezone = timezone.into(); } @@ -156,6 +163,13 @@ impl Database { .await } + pub async fn logical_plan(&self, logical_plan: Vec) -> Result { + self.do_get(Request::Query(QueryRequest { + query: Some(Query::LogicalPlan(logical_plan)), + })) + .await + } + pub async fn create(&self, expr: CreateTableExpr) -> Result { self.do_get(Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateTable(expr)), @@ -269,17 +283,12 @@ struct FlightContext { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + use api::v1::auth_header::AuthScheme; use api::v1::{AuthHeader, Basic}; - use clap::Parser; - use client::Client; - use cmd::error::Result as CmdResult; - use cmd::options::GlobalOptions; - use cmd::{cli, standalone, App}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_telemetry::logging::LoggingOptions; - use super::{Database, FlightContext}; + use super::*; #[test] fn test_flight_ctx() { @@ -295,76 +304,11 @@ mod tests { auth_scheme: Some(basic), }); - assert!(matches!( + assert_matches!( ctx.auth_header, Some(AuthHeader { auth_scheme: Some(AuthScheme::Basic(_)), }) - )) - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_export_create_table_with_quoted_names() -> CmdResult<()> { - let output_dir = tempfile::tempdir().unwrap(); - - let standalone = standalone::Command::parse_from([ - "standalone", - "start", - "--data-home", - &*output_dir.path().to_string_lossy(), - ]); - - let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap(); - let mut instance = standalone.build(standalone_opts).await?; - instance.start().await?; - - let client = Client::with_urls(["127.0.0.1:4001"]); - let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - database - .sql(r#"CREATE DATABASE "cli.export.create_table";"#) - .await - .unwrap(); - database - .sql( - r#"CREATE TABLE "cli.export.create_table"."a.b.c"( - ts TIMESTAMP, - TIME INDEX (ts) - ) engine=mito; - "#, - ) - .await - .unwrap(); - - let output_dir = tempfile::tempdir().unwrap(); - let cli = cli::Command::parse_from([ - "cli", - "export", - "--addr", - "127.0.0.1:4000", - "--output-dir", - &*output_dir.path().to_string_lossy(), - "--target", - "create-table", - ]); - let mut cli_app = cli.build(LoggingOptions::default()).await?; - cli_app.start().await?; - - instance.stop().await?; - - let output_file = output_dir - .path() - .join("greptime-cli.export.create_table.sql"); - let res = std::fs::read_to_string(output_file).unwrap(); - let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" ( - "ts" TIMESTAMP(3) NOT NULL, - TIME INDEX ("ts") -) - -ENGINE=mito -; -"#; - assert_eq!(res.trim(), expect.trim()); - - Ok(()) + ) } } diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index be8346faf7b..0741c8e1c7a 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -12,8 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(assert_matches)] + mod client; pub mod client_manager; +#[cfg(feature = "testing")] +mod database; pub mod error; pub mod load_balance; mod metrics; @@ -29,6 +33,8 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use snafu::OptionExt; pub use self::client::Client; +#[cfg(feature = "testing")] +pub use self::database::Database; pub use self::error::{Error, Result}; use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu}; diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index c1a7dcdaec9..9bc3d77564a 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -80,6 +80,7 @@ tracing-appender = "0.2" tikv-jemallocator = "0.5" [dev-dependencies] +client = { workspace = true, features = ["testing"] } common-test-util.workspace = true serde.workspace = true temp-env = "0.3" diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 3df64b91ebb..fc443a169d2 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -22,8 +22,8 @@ mod helper; // Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 #[allow(unused)] -// mod repl; -// TODO(weny): Removes it +mod repl; +// TODO(tisonkun): migrate deprecated methods #[allow(deprecated)] mod upgrade; @@ -31,8 +31,8 @@ use async_trait::async_trait; use bench::BenchTableMetadataCommand; use clap::Parser; use common_telemetry::logging::{LoggingOptions, TracingOptions}; +pub use repl::Repl; use tracing_appender::non_blocking::WorkerGuard; -// pub use repl::Repl; use upgrade::UpgradeCommand; use self::export::ExportCommand; diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index b83ac16bfef..00d91640745 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -434,3 +434,80 @@ fn split_database(database: &str) -> Result<(String, Option)> { Ok((catalog.to_string(), Some(schema.to_string()))) } } + +#[cfg(test)] +mod tests { + use clap::Parser; + use client::{Client, Database}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_telemetry::logging::LoggingOptions; + + use crate::error::Result as CmdResult; + use crate::options::GlobalOptions; + use crate::{cli, standalone, App}; + + #[tokio::test(flavor = "multi_thread")] + async fn test_export_create_table_with_quoted_names() -> CmdResult<()> { + let output_dir = tempfile::tempdir().unwrap(); + + let standalone = standalone::Command::parse_from([ + "standalone", + "start", + "--data-home", + &*output_dir.path().to_string_lossy(), + ]); + + let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap(); + let mut instance = standalone.build(standalone_opts).await?; + instance.start().await?; + + let client = Client::with_urls(["127.0.0.1:4001"]); + let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + database + .sql(r#"CREATE DATABASE "cli.export.create_table";"#) + .await + .unwrap(); + database + .sql( + r#"CREATE TABLE "cli.export.create_table"."a.b.c"( + ts TIMESTAMP, + TIME INDEX (ts) + ) engine=mito; + "#, + ) + .await + .unwrap(); + + let output_dir = tempfile::tempdir().unwrap(); + let cli = cli::Command::parse_from([ + "cli", + "export", + "--addr", + "127.0.0.1:4000", + "--output-dir", + &*output_dir.path().to_string_lossy(), + "--target", + "create-table", + ]); + let mut cli_app = cli.build(LoggingOptions::default()).await?; + cli_app.start().await?; + + instance.stop().await?; + + let output_file = output_dir + .path() + .join("greptime-cli.export.create_table.sql"); + let res = std::fs::read_to_string(output_file).unwrap(); + let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" ( + "ts" TIMESTAMP(3) NOT NULL, + TIME INDEX ("ts") +) + +ENGINE=mito +; +"#; + assert_eq!(res.trim(), expect.trim()); + + Ok(()) + } +} diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 6759a923fc4..a9e2e21967f 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -16,14 +16,18 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use cache::{ + build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME, + TABLE_ROUTE_CACHE_NAME, +}; use catalog::kvbackend::{ - CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, + CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend, }; -use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_config::Mode; use common_error::ext::ErrorExt; -use common_meta::cache_invalidator::MultiCacheInvalidator; +use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::debug; @@ -38,12 +42,13 @@ use query::QueryEngine; use rustyline::error::ReadlineError; use rustyline::Editor; use session::context::QueryContext; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use crate::cli::cmd::ReplCommand; use crate::cli::helper::RustylineHelper; use crate::cli::AttachCommand; +use crate::error; use crate::error::{ CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu, ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu, @@ -257,19 +262,42 @@ async fn create_query_engine(meta_addr: &str) -> Result { let cached_meta_backend = Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); - let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![ - cached_meta_backend.clone(), - ])); - let catalog_list = KvBackendCatalogManager::new( + let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry( + CacheRegistryBuilder::default() + .add_cache(cached_meta_backend.clone()) + .build(), + ); + let fundamental_cache_registry = + build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone()))); + let layered_cache_registry = Arc::new( + with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .context(error::BuildCacheRegistrySnafu)? + .build(), + ); + + let table_cache = layered_cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_CACHE_NAME, + })?; + let table_route_cache = layered_cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_ROUTE_CACHE_NAME, + })?; + let catalog_manager = KvBackendCatalogManager::new( Mode::Distributed, Some(meta_client.clone()), cached_meta_backend.clone(), - multi_cache_invalidator, + table_cache, + table_route_cache, ) .await; let plugins: Plugins = Default::default(); let state = Arc::new(QueryEngineState::new( - catalog_list, + catalog_manager, None, None, None, diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 0e1fec26dfa..a2a880fa6c1 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -163,6 +163,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to request database, sql: {sql}"))] + RequestDatabase { + sql: String, + #[snafu(source)] + source: client::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to collect RecordBatches"))] CollectRecordBatches { #[snafu(implicit)] @@ -354,6 +363,7 @@ impl ErrorExt for Error { Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => { StatusCode::Internal } + Error::RequestDatabase { source, .. } => source.status_code(), Error::CollectRecordBatches { source, .. } | Error::PrettyPrintRecordBatches { source, .. } => source.status_code(), Error::StartMetaClient { source, .. } => source.status_code(), diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index b355539ea71..369476e6e5b 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -111,7 +111,7 @@ tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] } [dev-dependencies] auth = { workspace = true, features = ["testing"] } catalog = { workspace = true, features = ["testing"] } -client.workspace = true +client = { workspace = true, features = ["testing"] } common-base.workspace = true common-test-util.workspace = true criterion = "0.4" @@ -125,8 +125,6 @@ serde_json.workspace = true session = { workspace = true, features = ["testing"] } table.workspace = true tempfile = "3.0.0" -# TODO depend `Database` client -tests-integration.workspace = true tokio-postgres = "0.7" tokio-postgres-rustls = "0.11" tokio-test = "0.4" diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index 4155f5eac73..9faad45b0c2 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -21,7 +21,7 @@ use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use async_trait::async_trait; use auth::tests::MockUserProvider; use auth::UserProviderRef; -use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_runtime::{Builder as RuntimeBuilder, Runtime}; use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu}; use servers::grpc::flight::FlightCraftWrapper; @@ -31,7 +31,6 @@ use servers::server::Server; use snafu::ResultExt; use table::test_util::MemTable; use table::TableRef; -use tests_integration::database::Database; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; use tonic::codec::CompressionEncoding; diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index e4db599fd5e..d3e70015134 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -15,7 +15,6 @@ #![feature(assert_matches)] pub mod cluster; -pub mod database; mod grpc; mod influxdb; mod instance; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 35ce52e273b..7cbd640820b 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -21,6 +21,7 @@ use std::time::Duration; use auth::UserProviderRef; use axum::Router; use catalog::kvbackend::KvBackendCatalogManager; +use client::Database; use common_base::secrets::ExposeSecret; use common_config::Configurable; use common_meta::key::catalog_name::CatalogNameKey; @@ -56,7 +57,6 @@ use servers::tls::ReloadableTlsServerConfig; use servers::Mode; use session::context::QueryContext; -use crate::database::Database; use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001"; diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 48419498e72..7d1f9d57768 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -22,7 +22,7 @@ use api::v1::{ PromqlRequest, RequestHeader, SemanticType, }; use auth::user_provider_from_option; -use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::MITO_ENGINE; use common_grpc::channel_manager::ClientTlsOption; use common_query::Output; @@ -36,7 +36,6 @@ use servers::http::prometheus::{ }; use servers::server::Server; use servers::tls::{TlsMode, TlsOption}; -use tests_integration::database::Database; use tests_integration::test_util::{ setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType, }; diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 7cb36c1645c..6118c863fb1 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -10,7 +10,7 @@ workspace = true [dependencies] async-trait = "0.1" clap.workspace = true -client.workspace = true +client = { workspace = true, features = ["testing"] } common-error.workspace = true common-query.workspace = true common-recordbatch.workspace = true @@ -19,7 +19,5 @@ serde.workspace = true serde_json.workspace = true sqlness = { version = "0.5" } tempfile.workspace = true -# TODO depend `Database` client -tests-integration.workspace = true tinytemplate = "1.2" tokio.workspace = true diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 399f65840b9..ea3e3e1bc10 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -24,13 +24,14 @@ use std::time::Duration; use async_trait::async_trait; use client::error::ServerSnafu; -use client::{Client, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{ + Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, +}; use common_error::ext::ErrorExt; use common_query::{Output, OutputData}; use common_recordbatch::RecordBatches; use serde::Serialize; use sqlness::{Database, EnvController, QueryContext}; -use tests_integration::database::Database as DB; use tinytemplate::TinyTemplate; use tokio::sync::Mutex as TokioMutex;