diff --git a/Cargo.lock b/Cargo.lock index 453689ebc290..a1076790546f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -897,6 +897,7 @@ dependencies = [ "rskafka", "serde", "store-api", + "tests-integration", "tokio", "toml 0.8.12", "uuid", @@ -1580,6 +1581,7 @@ version = "0.7.2" dependencies = [ "async-trait", "auth", + "base64 0.21.7", "catalog", "chrono", "clap 4.5.4", @@ -1619,6 +1621,7 @@ dependencies = [ "query", "rand", "regex", + "reqwest", "rexpect", "rustyline 10.1.1", "serde", @@ -9198,6 +9201,7 @@ dependencies = [ "strum 0.25.0", "table", "tempfile", + "tests-integration", "tikv-jemalloc-ctl", "tokio", "tokio-postgres", @@ -9541,6 +9545,7 @@ dependencies = [ "serde_json", "sqlness", "tempfile", + "tests-integration", "tinytemplate", "tokio", ] @@ -10244,11 +10249,13 @@ version = "0.7.2" dependencies = [ "api", "arrow-flight", + "async-stream", "async-trait", "auth", "axum", "catalog", "chrono", + "clap 4.5.4", "client", "cmd", "common-base", @@ -10271,6 +10278,7 @@ dependencies = [ "dotenv", "frontend", "futures", + "futures-util", "itertools 0.10.5", "meta-client", "meta-srv", @@ -10290,6 +10298,7 @@ dependencies = [ "serde_json", "servers", "session", + "snafu", "sql", "sqlx", "store-api", @@ -10299,6 +10308,7 @@ dependencies = [ "time", "tokio", "tokio-postgres", + "tokio-stream", "tonic 0.11.0", "tower", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 1e938f58a89d..89a043fd9c4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -223,6 +223,8 @@ sql = { path = "src/sql" } store-api = { path = "src/store-api" } substrait = { path = "src/common/substrait" } table = { path = "src/table" } +# TODO some code depends on this +tests-integration = { path = "tests-integration" } [workspace.dependencies.meter-macros] git = "https://github.com/GreptimeTeam/greptime-meter.git" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 18b44e944858..ed7f038596e3 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -33,6 +33,8 @@ rand.workspace = true rskafka.workspace = true serde.workspace = true store-api.workspace = true +# TODO depend `Database` client +tests-integration.workspace = true tokio.workspace = true toml.workspace = true uuid.workspace = true diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index bfc26f3daeae..43d28414fa54 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -29,10 +29,11 @@ use client::api::v1::column::Values; use client::api::v1::{ Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType, }; -use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use futures_util::TryStreamExt; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use tests_integration::database::Database; use tokio::task::JoinSet; const CATALOG_NAME: &str = "greptime"; diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs deleted file mode 100644 index 13f116555519..000000000000 --- a/src/client/examples/logical.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType, TableId};
-use client::{Client, Database};
-use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
-use prost::Message;
-use substrait_proto::proto::plan_rel::RelType as PlanRelType;
-use substrait_proto::proto::read_rel::{NamedTable, ReadType};
-use substrait_proto::proto::rel::RelType;
-use substrait_proto::proto::{PlanRel, ReadRel, Rel};
-use tracing::{event, Level};
-
-fn main() {
-    tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish())
-        .unwrap();
-
-    run();
-}
-
-#[tokio::main]
-async fn run() {
-    let client = Client::with_urls(vec!["127.0.0.1:3001"]);
-
-    let create_table_expr = CreateTableExpr {
-        catalog_name: "greptime".to_string(),
-        schema_name: "public".to_string(),
-        table_name: "test_logical_dist_exec".to_string(),
-        desc: String::default(),
-        column_defs: vec![
-            ColumnDef {
-                name: "timestamp".to_string(),
-                data_type: ColumnDataType::TimestampMillisecond as i32,
-                is_nullable: false,
-                default_constraint: vec![],
-                semantic_type: SemanticType::Timestamp as i32,
-                comment: String::new(),
-                ..Default::default()
-            },
-            ColumnDef {
-                name: "key".to_string(),
-                data_type: ColumnDataType::Uint64 as i32,
-                is_nullable: false,
-                default_constraint: vec![],
-                semantic_type: SemanticType::Tag as i32,
-                comment: String::new(),
-                ..Default::default()
-            },
-            ColumnDef {
-                name: "value".to_string(),
-                data_type: ColumnDataType::Uint64 as i32,
-                is_nullable: false,
-                default_constraint: vec![],
-                semantic_type: SemanticType::Field as i32,
-                comment: String::new(),
-                ..Default::default()
-            },
-        ],
-        time_index: "timestamp".to_string(),
-        primary_keys: vec!["key".to_string()],
-        create_if_not_exists: false,
-        table_options: Default::default(),
-        table_id: Some(TableId { id: 1024 }),
-        engine: MITO_ENGINE.to_string(),
-    };
-
-    let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
-    let result = db.create(create_table_expr).await.unwrap();
-    event!(Level::INFO, "create table result: {:#?}", result);
-
-    let logical = mock_logical_plan();
-    event!(Level::INFO, "plan size: {:#?}", logical.len());
-    let result = db.logical_plan(logical).await.unwrap();
-
-    event!(Level::INFO, "result: {:#?}", result);
-}
-
-fn mock_logical_plan() -> Vec<u8> {
-    let catalog_name = "greptime".to_string();
-    let schema_name = "public".to_string();
-    let table_name = "test_logical_dist_exec".to_string();
-
-    let named_table = NamedTable {
-        names: vec![catalog_name, schema_name, table_name],
-        advanced_extension: None,
-    };
-    let read_type = ReadType::NamedTable(named_table);
-
-    let read_rel = ReadRel {
-        read_type: Some(read_type),
-        ..Default::default()
-    };
-
-    let mut buf = vec![];
-    let rel = Rel {
-        rel_type: Some(RelType::Read(Box::new(read_rel))),
-    };
-    let plan_rel = PlanRel {
-        rel_type: Some(PlanRelType::Rel(rel)),
-    };
-    plan_rel.encode(&mut buf).unwrap();
-
-    buf
-}
diff --git a/src/client/examples/stream_ingest.rs b/src/client/examples/stream_ingest.rs
deleted file mode 100644
index 94f9773096b9..000000000000
--- a/src/client/examples/stream_ingest.rs
+++ /dev/null
@@ -1,181 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use api::v1::*;
-use client::{Client, Database, DEFAULT_SCHEMA_NAME};
-use derive_new::new;
-use tracing::{error, info};
-
-fn main() {
-    tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish())
-        .unwrap();
-
-    run();
-}
-
-#[tokio::main]
-async fn run() {
-    let greptimedb_endpoint =
-        std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned());
-
-    let greptimedb_dbname =
-        std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());
-
-    let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]);
-
-    let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
-
-    let stream_inserter = client.streaming_inserter().unwrap();
-
-    if let Err(e) = stream_inserter
-        .insert(vec![to_insert_request(weather_records_1())])
-        .await
-    {
-        error!("Error: {e:?}");
-    }
-
-    if let Err(e) = stream_inserter
-        .insert(vec![to_insert_request(weather_records_2())])
-        .await
-    {
-        error!("Error: {e:?}");
-    }
-
-    let result = stream_inserter.finish().await;
-
-    match result {
-        Ok(rows) => {
-            info!("Rows written: {rows}");
-        }
-        Err(e) => {
-            error!("Error: {e:?}");
-        }
-    };
-}
-
-#[derive(new)]
-struct WeatherRecord {
-    timestamp_millis: i64,
-    collector: String,
-    temperature: f32,
-    humidity: i32,
-}
-
-fn weather_records_1() -> Vec<WeatherRecord> {
-    vec![
-        WeatherRecord::new(1686109527000, "c1".to_owned(), 26.4, 15),
-        WeatherRecord::new(1686023127000, "c1".to_owned(), 29.3, 20),
-        WeatherRecord::new(1685936727000, "c1".to_owned(), 31.8, 13),
-        WeatherRecord::new(1686109527000, "c2".to_owned(), 20.4, 67),
-        WeatherRecord::new(1686023127000, "c2".to_owned(), 18.0, 74),
-        WeatherRecord::new(1685936727000, "c2".to_owned(), 19.2, 81),
-    ]
-}
-
-fn weather_records_2() -> Vec<WeatherRecord> {
-    vec![
-        WeatherRecord::new(1686109527001, "c3".to_owned(), 26.4, 15),
-        WeatherRecord::new(1686023127002, "c3".to_owned(), 29.3, 20),
-        WeatherRecord::new(1685936727003, "c3".to_owned(), 31.8, 13),
-        WeatherRecord::new(1686109527004, "c4".to_owned(), 20.4, 67),
-        WeatherRecord::new(1686023127005, "c4".to_owned(), 18.0, 74),
-        WeatherRecord::new(1685936727006, "c4".to_owned(), 19.2, 81),
-    ]
-}
-
-/// This function generates some random data and bundle them into a
-/// `InsertRequest`.
-///
-/// Data structure:
-///
-/// - `ts`: a timestamp column
-/// - `collector`: a tag column
-/// - `temperature`: a value field of f32
-/// - `humidity`: a value field of i32
-///
-fn to_insert_request(records: Vec<WeatherRecord>) -> InsertRequest {
-    // convert records into columns
-    let rows = records.len();
-
-    // transpose records into columns
-    let (timestamp_millis, collectors, temp, humidity) = records.into_iter().fold(
-        (
-            Vec::with_capacity(rows),
-            Vec::with_capacity(rows),
-            Vec::with_capacity(rows),
-            Vec::with_capacity(rows),
-        ),
-        |mut acc, rec| {
-            acc.0.push(rec.timestamp_millis);
-            acc.1.push(rec.collector);
-            acc.2.push(rec.temperature);
-            acc.3.push(rec.humidity);
-
-            acc
-        },
-    );
-
-    let columns = vec![
-        // timestamp column: `ts`
-        Column {
-            column_name: "ts".to_owned(),
-            values: Some(column::Values {
-                timestamp_millisecond_values: timestamp_millis,
-                ..Default::default()
-            }),
-            semantic_type: SemanticType::Timestamp as i32,
-            datatype: ColumnDataType::TimestampMillisecond as i32,
-            ..Default::default()
-        },
-        // tag column: collectors
-        Column {
-            column_name: "collector".to_owned(),
-            values: Some(column::Values {
-                string_values: collectors.into_iter().collect(),
-                ..Default::default()
-            }),
-            semantic_type: SemanticType::Tag as i32,
-            datatype: ColumnDataType::String as i32,
-            ..Default::default()
-        },
-        // field column: temperature
-        Column {
-            column_name: "temperature".to_owned(),
-            values: Some(column::Values {
-                f32_values: temp,
-                ..Default::default()
-            }),
-            semantic_type: SemanticType::Field as i32,
-            datatype: ColumnDataType::Float32 as i32,
-            ..Default::default()
-        },
-        // field column: humidity
-        Column {
-            column_name: "humidity".to_owned(),
-            values: Some(column::Values {
-                i32_values: humidity,
-                ..Default::default()
-            }),
-            semantic_type: SemanticType::Field as i32,
-            datatype: ColumnDataType::Int32 as i32,
-            ..Default::default()
-        },
-    ];
-
-    InsertRequest {
-        table_name: "weather_demo".to_owned(),
-        columns,
-        row_count: rows as u32,
-    }
-}
diff --git a/src/client/src/client.rs b/src/client/src/client.rs
index 47a8df49f156..5e82295c16f6 100644
--- a/src/client/src/client.rs
+++ b/src/client/src/client.rs
@@ -14,7 +14,6 @@
 
 use std::sync::Arc;
 
-use api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::health_check_client::HealthCheckClient;
 use api::v1::prometheus_gateway_client::PrometheusGatewayClient;
 use api::v1::region::region_client::RegionClient as PbRegionClient;
@@ -28,21 +27,17 @@ use tonic::transport::Channel;
 use crate::load_balance::{LoadBalance, Loadbalancer};
 use crate::{error, Result};
 
-pub(crate) struct DatabaseClient {
-    pub(crate) inner: GreptimeDatabaseClient<Channel>,
-}
-
-pub(crate) struct FlightClient {
+pub struct FlightClient {
     addr: String,
     client: FlightServiceClient<Channel>,
 }
 
 impl FlightClient {
-    pub(crate) fn addr(&self) -> &str {
+    pub fn addr(&self) -> &str {
         &self.addr
     }
 
-    pub(crate) fn mut_inner(&mut self) -> &mut FlightServiceClient<Channel> {
+    pub fn mut_inner(&mut self) -> &mut FlightServiceClient<Channel> {
         &mut self.client
     }
 }
@@ -138,7 +133,7 @@ impl Client {
         Ok((addr, channel))
     }
 
-    fn max_grpc_recv_message_size(&self) -> usize {
+    pub fn max_grpc_recv_message_size(&self) -> usize {
         self.inner
             .channel_manager
             .config()
@@ -146,7 +141,7 @@
             .as_bytes() as usize
     }
 
-    fn max_grpc_send_message_size(&self) -> usize {
+    pub fn max_grpc_send_message_size(&self) -> usize {
         self.inner
             .channel_manager
             .config()
@@ -154,7 +149,7 @@
             .as_bytes() as usize
    }
 
-    pub(crate) fn make_flight_client(&self) -> Result<FlightClient> {
+    pub fn make_flight_client(&self) -> Result<FlightClient> {
         let (addr, channel) = self.find_channel()?;
         Ok(FlightClient {
             addr,
@@ -164,15 +159,6 @@
         })
     }
 
-    pub(crate) fn make_database_client(&self) -> Result<DatabaseClient> {
-        let (_, channel) = self.find_channel()?;
-        Ok(DatabaseClient {
-            inner: GreptimeDatabaseClient::new(channel)
-                .max_decoding_message_size(self.max_grpc_recv_message_size())
-                .max_encoding_message_size(self.max_grpc_send_message_size()),
-        })
-    }
-
     pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
         let (_, channel) = self.find_channel()?;
         Ok(PbRegionClient::new(channel)
diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs
index 1a854c5daa27..be8346faf7b0 100644
--- a/src/client/src/lib.rs
+++ b/src/client/src/lib.rs
@@ -14,12 +14,10 @@
 
 mod client;
 pub mod client_manager;
-mod database;
 pub mod error;
 pub mod load_balance;
 mod metrics;
 pub mod region;
-mod stream_insert;
 
 pub use api;
 use api::v1::greptime_response::Response;
@@ -31,9 +29,7 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
 use snafu::OptionExt;
 
 pub use self::client::Client;
-pub use self::database::Database;
 pub use self::error::{Error, Result};
-pub use self::stream_insert::StreamInserter;
 use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};
 
 pub fn from_grpc_response(response: GreptimeResponse) -> Result<u32> {
diff --git a/src/client/src/stream_insert.rs b/src/client/src/stream_insert.rs
deleted file mode 100644
index a75144786012..000000000000
--- a/src/client/src/stream_insert.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use api::v1::greptime_database_client::GreptimeDatabaseClient;
-use api::v1::greptime_request::Request;
-use api::v1::{
-    AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, InsertRequests, RequestHeader,
-    RowInsertRequest, RowInsertRequests,
-};
-use tokio::sync::mpsc;
-use tokio::task::JoinHandle;
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::transport::Channel;
-use tonic::{Response, Status};
-
-use crate::error::{self, Result};
-use crate::from_grpc_response;
-
-/// A structure that provides some methods for streaming data insert.
-///
-/// [`StreamInserter`] cannot be constructed via the `StreamInserter::new` method.
-/// You can use the following way to obtain [`StreamInserter`].
-///
-/// ```ignore
-/// let grpc_client = Client::with_urls(vec!["127.0.0.1:4002"]);
-/// let client = Database::new_with_dbname("db_name", grpc_client);
-/// let stream_inserter = client.streaming_inserter().unwrap();
-/// ```
-///
-/// If you want to see a concrete usage example, please see
-/// [stream_inserter.rs](https://github.com/GreptimeTeam/greptimedb/blob/main/src/client/examples/stream_ingest.rs).
-pub struct StreamInserter {
-    sender: mpsc::Sender<GreptimeRequest>,
-
-    auth_header: Option<AuthHeader>,
-
-    dbname: String,
-
-    join: JoinHandle<std::result::Result<Response<GreptimeResponse>, Status>>,
-}
-
-impl StreamInserter {
-    pub(crate) fn new(
-        mut client: GreptimeDatabaseClient<Channel>,
-        dbname: String,
-        auth_header: Option<AuthHeader>,
-        channel_size: usize,
-    ) -> StreamInserter {
-        let (send, recv) = tokio::sync::mpsc::channel(channel_size);
-
-        let join: JoinHandle<std::result::Result<Response<GreptimeResponse>, Status>> =
-            tokio::spawn(async move {
-                let recv_stream = ReceiverStream::new(recv);
-                client.handle_requests(recv_stream).await
-            });
-
-        StreamInserter {
-            sender: send,
-            auth_header,
-            dbname,
-            join,
-        }
-    }
-
-    pub async fn insert(&self, requests: Vec<InsertRequest>) -> Result<()> {
-        let inserts = InsertRequests { inserts: requests };
-        let request = self.to_rpc_request(Request::Inserts(inserts));
-
-        self.sender.send(request).await.map_err(|e| {
-            error::ClientStreamingSnafu {
-                err_msg: e.to_string(),
-            }
-            .build()
-        })
-    }
-
-    pub async fn row_insert(&self, requests: Vec<RowInsertRequest>) -> Result<()> {
-        let inserts = RowInsertRequests { inserts: requests };
-        let request = self.to_rpc_request(Request::RowInserts(inserts));
-
-        self.sender.send(request).await.map_err(|e| {
-            error::ClientStreamingSnafu {
-                err_msg: e.to_string(),
-            }
-            .build()
-        })
-    }
-
-    pub async fn finish(self) -> Result<u32> {
-        drop(self.sender);
-
-        let response = self.join.await.unwrap()?;
-        let response = response.into_inner();
-        from_grpc_response(response)
-    }
-
-    fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
-        GreptimeRequest {
-            header: Some(RequestHeader {
-                authorization: self.auth_header.clone(),
-                dbname: self.dbname.clone(),
-                ..Default::default()
-            }),
-            request: Some(request),
-        }
-    }
-}
diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml
index 24bbe69df18c..ae27c4e374d6 100644
--- a/src/cmd/Cargo.toml
+++ b/src/cmd/Cargo.toml
@@ -18,6 +18,7 @@ workspace = true
 [dependencies]
 async-trait.workspace = true
 auth.workspace = true
+base64.workspace = true
 catalog.workspace = true
 chrono.workspace = true
 clap.workspace = true
@@ -58,6 +59,7 @@ prost.workspace = true
 query.workspace = true
 rand.workspace = true
 regex.workspace = true
+reqwest.workspace = true
 rustyline = "10.1"
 serde.workspace = true
 serde_json.workspace = true
diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs
index 35dc1e4ba7dc..c44b99ad6bf9 100644
--- a/src/cmd/src/cli.rs
+++ b/src/cmd/src/cli.rs
@@ -22,7 +22,7 @@ mod helper;
 
 // Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
 #[allow(unused)]
-mod repl;
+// mod repl; // TODO(weny): Removes it
 #[allow(deprecated)]
 mod upgrade;
 
@@ -31,7 +31,7 @@ use async_trait::async_trait;
 use bench::BenchTableMetadataCommand;
 use clap::Parser;
 use common_telemetry::logging::LoggingOptions;
-pub use repl::Repl;
+// pub use repl::Repl;
 use upgrade::UpgradeCommand;
 
 use self::export::ExportCommand;
diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
index 4122fbd6d7a1..70ca80d11db3 100644
--- a/src/cmd/src/cli/export.rs
+++ b/src/cmd/src/cli/export.rs
@@ -16,14 +16,14 @@ use std::path::Path;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use base64::engine::general_purpose;
+use base64::Engine;
 use clap::{Parser, ValueEnum};
-use client::api::v1::auth_header::AuthScheme;
-use client::api::v1::Basic;
-use client::{Client, Database, OutputData, DEFAULT_SCHEMA_NAME};
-use common_recordbatch::util::collect;
+use client::DEFAULT_SCHEMA_NAME;
 use common_telemetry::{debug, error, info, warn};
-use datatypes::scalars::ScalarVector;
-use datatypes::vectors::{StringVector, Vector};
+use serde_json::Value;
+use servers::http::greptime_result_v1::GreptimedbV1Response;
+use servers::http::GreptimeQueryOutput;
 use snafu::{OptionExt, ResultExt};
 use tokio::fs::File;
 use tokio::io::{AsyncWriteExt, BufWriter};
@@ -31,9 +31,8 @@ use tokio::sync::Semaphore;
 
 use crate::cli::{Instance, Tool};
 use crate::error::{
-    CollectRecordBatchesSnafu, ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu,
-    IllegalConfigSnafu, InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu,
-    Result,
+    EmptyResultSnafu, Error, FileIoSnafu, HttpQuerySqlSnafu, InvalidDatabaseNameSnafu, Result,
+    SerdeJsonSnafu,
 };
 
 type TableReference = (String, String, String);
@@ -80,51 +79,75 @@ pub struct ExportCommand {
 
 impl ExportCommand {
     pub async fn build(&self) -> Result<Instance> {
-        let client = Client::with_urls([self.addr.clone()]);
-        client
-            .health_check()
-            .await
-            .with_context(|_| ConnectServerSnafu {
-                addr: self.addr.clone(),
-            })?;
         let (catalog, schema) = split_database(&self.database)?;
 
-        let mut database_client = Database::new(
-            catalog.clone(),
-            schema.clone().unwrap_or(DEFAULT_SCHEMA_NAME.to_string()),
-            client,
-        );
-
-        if let Some(auth_basic) = &self.auth_basic {
-            let (username, password) = auth_basic.split_once(':').context(IllegalConfigSnafu {
-                msg: "auth_basic cannot be split by ':'".to_string(),
-            })?;
-            database_client.set_auth(AuthScheme::Basic(Basic {
-                username: username.to_string(),
-                password: password.to_string(),
-            }));
-        }
+        let auth_header = if let Some(basic) = &self.auth_basic {
+            let encoded = general_purpose::STANDARD.encode(basic);
+            Some(format!("basic {}", encoded))
+        } else {
+            None
+        };
 
         Ok(Instance::new(Box::new(Export {
-            client: database_client,
+            addr: self.addr.clone(),
             catalog,
             schema,
             output_dir: self.output_dir.clone(),
             parallelism: self.export_jobs,
             target: self.target.clone(),
+            auth_header,
         })))
     }
 }
 
 pub struct Export {
-    client: Database,
+    addr: String,
     catalog: String,
     schema: Option<String>,
     output_dir: String,
     parallelism: usize,
     target: ExportTarget,
+    auth_header: Option<String>,
 }
 
 impl Export {
+    /// Execute one single sql query.
+    async fn sql(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
+        let url = format!(
+            "http://{}/v1/sql?db={}-{}&sql={}",
+            self.addr,
+            self.catalog,
+            self.schema.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME),
+            sql
+        );
+
+        let mut request = reqwest::Client::new()
+            .get(&url)
+            .header("Content-Type", "application/x-www-form-urlencoded");
+        if let Some(ref auth) = self.auth_header {
+            request = request.header("Authorization", auth);
+        }
+
+        let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
+            reason: format!("bad url: {}", url),
+        })?;
+        let response = response
+            .error_for_status()
+            .with_context(|_| HttpQuerySqlSnafu {
+                reason: format!("query failed: {}", sql),
+            })?;
+
+        let text = response.text().await.with_context(|_| HttpQuerySqlSnafu {
+            reason: "cannot get response text".to_string(),
+        })?;
+
+        let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
+        Ok(body.output().first().and_then(|output| match output {
+            GreptimeQueryOutput::Records(records) => Some(records.rows().clone()),
+            GreptimeQueryOutput::AffectedRows(_) => None,
+        }))
+    }
+
     /// Iterate over all db names.
     ///
    /// Newbie: `db_name` is catalog + schema.
@@ -132,35 +155,19 @@
         if let Some(schema) = &self.schema {
             Ok(vec![(self.catalog.clone(), schema.clone())])
         } else {
-            let mut client = self.client.clone();
-            client.set_catalog(self.catalog.clone());
-            let result =
-                client
-                    .sql("show databases")
-                    .await
-                    .with_context(|_| RequestDatabaseSnafu {
-                        sql: "show databases".to_string(),
-                    })?;
-            let OutputData::Stream(stream) = result.data else {
-                NotDataFromOutputSnafu.fail()?
+            let result = self.sql("show databases").await?;
+            let Some(records) = result else {
+                EmptyResultSnafu.fail()?
             };
-            let record_batch = collect(stream)
-                .await
-                .context(CollectRecordBatchesSnafu)?
-                .pop()
-                .context(EmptyResultSnafu)?;
-            let schemas = record_batch
-                .column(0)
-                .as_any()
-                .downcast_ref::<StringVector>()
-                .unwrap();
-            let mut result = Vec::with_capacity(schemas.len());
-            for i in 0..schemas.len() {
-                let schema = schemas.get_data(i).unwrap().to_owned();
+            let mut result = Vec::with_capacity(records.len());
+            for value in records {
+                let serde_json::Value::String(schema) = &value[0] else {
+                    unreachable!()
+                };
                 if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
                     continue;
                 }
-                result.push((self.catalog.clone(), schema));
+                result.push((self.catalog.clone(), schema.clone()));
             }
             Ok(result)
         }
@@ -172,54 +179,30 @@
         // TODO: SQL injection hurts
         let sql = format!(
             "select table_catalog, table_schema, table_name from \
-            information_schema.tables where table_type = \'BASE TABLE\'\
+            information_schema.tables where table_type = \'BASE TABLE\' \
             and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'",
         );
-        let mut client = self.client.clone();
-        client.set_catalog(catalog);
-        client.set_schema(schema);
-        let result = client
-            .sql(&sql)
-            .await
-            .with_context(|_| RequestDatabaseSnafu { sql })?;
-        let OutputData::Stream(stream) = result.data else {
-            NotDataFromOutputSnafu.fail()?
-        };
-        let Some(record_batch) = collect(stream)
-            .await
-            .context(CollectRecordBatchesSnafu)?
-            .pop()
-        else {
-            return Ok(vec![]);
+        let result = self.sql(&sql).await?;
+        let Some(records) = result else {
+            EmptyResultSnafu.fail()?
         };
 
-        debug!("Fetched table list: {}", record_batch.pretty_print());
+        debug!("Fetched table list: {:?}", records);
 
-        if record_batch.num_rows() == 0 {
+        if records.is_empty() {
             return Ok(vec![]);
         }
 
-        let mut result = Vec::with_capacity(record_batch.num_rows());
-        let catalog_column = record_batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap();
-        let schema_column = record_batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap();
-        let table_column = record_batch
-            .column(2)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap();
-        for i in 0..record_batch.num_rows() {
-            let catalog = catalog_column.get_data(i).unwrap().to_owned();
-            let schema = schema_column.get_data(i).unwrap().to_owned();
-            let table = table_column.get_data(i).unwrap().to_owned();
-            result.push((catalog, schema, table));
+        let mut result = Vec::with_capacity(records.len());
+        for value in records {
+            let mut t = Vec::with_capacity(3);
+            for v in &value {
+                let serde_json::Value::String(value) = v else {
+                    unreachable!()
+                };
+                t.push(value);
+            }
+            result.push((t[0].clone(), t[1].clone(), t[2].clone()));
         }
 
         Ok(result)
@@ -230,30 +213,15 @@
             r#"show create table "{}"."{}"."{}""#,
             catalog, schema, table
         );
-        let mut client = self.client.clone();
-        client.set_catalog(catalog);
-        client.set_schema(schema);
-        let result = client
-            .sql(&sql)
-            .await
-            .with_context(|_| RequestDatabaseSnafu { sql })?;
-        let OutputData::Stream(stream) = result.data else {
-            NotDataFromOutputSnafu.fail()?
+        let result = self.sql(&sql).await?;
+        let Some(records) = result else {
+            EmptyResultSnafu.fail()?
         };
-        let record_batch = collect(stream)
-            .await
-            .context(CollectRecordBatchesSnafu)?
-            .pop()
-            .context(EmptyResultSnafu)?;
-        let create_table = record_batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap()
-            .get_data(0)
-            .unwrap();
-
-        Ok(format!("{create_table};\n"))
+        let serde_json::Value::String(create_table) = &records[0][1] else {
+            unreachable!()
+        };
+
+        Ok(format!("{};\n", create_table))
     }
 
     async fn export_create_table(&self) -> Result<()> {
@@ -321,20 +289,13 @@ impl Export {
             .context(FileIoSnafu)?;
 
         let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));
-        let mut client = self.client.clone();
-        client.set_catalog(catalog.clone());
-        client.set_schema(schema.clone());
-
         // copy database to
         let sql = format!(
             "copy database {} to '{}' with (format='parquet');",
             schema,
             output_dir.to_str().unwrap()
         );
-        client
-            .sql(sql.clone())
-            .await
-            .context(RequestDatabaseSnafu { sql })?;
+        self.sql(&sql).await?;
 
         info!("finished exporting {catalog}.{schema} data");
 
         // export copy from sql
@@ -420,82 +381,3 @@ fn split_database(database: &str) -> Result<(String, Option<String>)> {
         Ok((catalog.to_string(), Some(schema.to_string())))
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use clap::Parser;
-    use client::{Client, Database};
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-
-    use crate::error::Result;
-    use crate::options::{CliOptions, Options};
-    use crate::{cli, standalone, App};
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_export_create_table_with_quoted_names() -> Result<()> {
-        let output_dir = tempfile::tempdir().unwrap();
-
-        let standalone = standalone::Command::parse_from([
-            "standalone",
-            "start",
-            "--data-home",
-            &*output_dir.path().to_string_lossy(),
-        ]);
-        let Options::Standalone(standalone_opts) =
-            standalone.load_options(&CliOptions::default())?
- else { - unreachable!() - }; - let mut instance = standalone.build(*standalone_opts).await?; - instance.start().await?; - - let client = Client::with_urls(["127.0.0.1:4001"]); - let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - database - .sql(r#"CREATE DATABASE "cli.export.create_table";"#) - .await - .unwrap(); - database - .sql( - r#"CREATE TABLE "cli.export.create_table"."a.b.c"( - ts TIMESTAMP, - TIME INDEX (ts) - ) engine=mito; - "#, - ) - .await - .unwrap(); - - let output_dir = tempfile::tempdir().unwrap(); - let cli = cli::Command::parse_from([ - "cli", - "export", - "--addr", - "127.0.0.1:4001", - "--output-dir", - &*output_dir.path().to_string_lossy(), - "--target", - "create-table", - ]); - let mut cli_app = cli.build().await?; - cli_app.start().await?; - - instance.stop().await?; - - let output_file = output_dir - .path() - .join("greptime-cli.export.create_table.sql"); - let res = std::fs::read_to_string(output_file).unwrap(); - let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" ( - "ts" TIMESTAMP(3) NOT NULL, - TIME INDEX ("ts") -) - -ENGINE=mito -; -"#; - assert_eq!(res.trim(), expect.trim()); - - Ok(()) - } -} diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index c622aa462140..219d52bbb98b 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -19,7 +19,7 @@ use std::time::Instant; use catalog::kvbackend::{ CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, }; -use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_error::ext::ErrorExt; use common_meta::cache_invalidator::MultiCacheInvalidator; diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 1951ed5b0e29..d24f00f5be44 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -139,13 +139,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to request database, sql: {sql}"))] - RequestDatabase { - sql: String, - location: Location, - source: client::Error, - }, - #[snafu(display("Failed to collect RecordBatches"))] CollectRecordBatches { location: Location, @@ -218,6 +211,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to run http request: {reason}"))] + HttpQuerySql { + reason: String, + #[snafu(source)] + error: reqwest::Error, + location: Location, + }, + #[snafu(display("Expect data from output, but got another thing"))] NotDataFromOutput { location: Location }, @@ -290,8 +291,9 @@ impl ErrorExt for Error { Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), Error::StartWalOptionsAllocator { source, .. } => source.status_code(), - Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, - Error::RequestDatabase { source, .. } => source.status_code(), + Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => { + StatusCode::Internal + } Error::CollectRecordBatches { source, .. } | Error::PrettyPrintRecordBatches { source, .. } => source.status_code(), Error::StartMetaClient { source, .. 
} => source.status_code(), diff --git a/src/common/test-util/src/recordbatch.rs b/src/common/test-util/src/recordbatch.rs index 64a6262a08f3..47c949d40715 100644 --- a/src/common/test-util/src/recordbatch.rs +++ b/src/common/test-util/src/recordbatch.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use client::Database; use common_query::OutputData; use common_recordbatch::util; @@ -21,22 +20,6 @@ pub enum ExpectedOutput<'a> { QueryResult(&'a str), } -pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) { - let output = db.sql(sql).await.unwrap(); - let output = output.data; - - match (&output, expected) { - (OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => { - assert_eq!(*x, y, "actual: \n{}", x) - } - (OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x)) - | (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => { - check_output_stream(output, x).await - } - _ => panic!(), - } -} - pub async fn check_output_stream(output: OutputData, expected: &str) { let recordbatches = match output { OutputData::Stream(stream) => util::collect_batches(stream).await.unwrap(), diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index ef8a2f751d7c..6f1e89daf58c 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -125,6 +125,8 @@ serde_json.workspace = true session = { workspace = true, features = ["testing"] } table.workspace = true tempfile = "3.0.0" +# TODO depend `Database` client +tests-integration.workspace = true tokio-postgres = "0.7" tokio-postgres-rustls = "0.11" tokio-test = "0.4" diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index 701dce7419f5..183fabb5d44e 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -21,7 +21,7 @@ use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use async_trait::async_trait; use auth::tests::MockUserProvider; use auth::UserProviderRef; -use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_runtime::{Builder as RuntimeBuilder, Runtime}; use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu}; use servers::grpc::flight::FlightCraftWrapper; @@ -31,6 +31,7 @@ use servers::server::Server; use snafu::ResultExt; use table::test_util::MemTable; use table::TableRef; +use tests_integration::database::Database; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index f843e6615c1b..e6807620c049 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -13,11 +13,13 @@ workspace = true [dependencies] api.workspace = true arrow-flight.workspace = true +async-stream.workspace = true async-trait = "0.1" auth.workspace = true axum.workspace = true catalog.workspace = true chrono.workspace = true +clap.workspace = true client = { workspace = true, features = ["testing"] } cmd.workspace = true common-base.workspace = true @@ -38,6 +40,7 @@ datatypes.workspace = true dotenv.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true +futures-util.workspace = true meta-client.workspace = true meta-srv = { workspace = true, features = ["mock"] } mysql_async = { version = "0.33", default-features = false, features = [ @@ -53,6 +56,7 @@ secrecy = "0.8" serde_json.workspace = true 
 servers = { workspace = true, features = ["testing"] }
 session.workspace = true
+snafu.workspace = true
 sql.workspace = true
 sqlx = { version = "0.6", features = [
     "runtime-tokio-rustls",
@@ -65,6 +69,7 @@ table.workspace = true
 tempfile.workspace = true
 time = "0.3"
 tokio.workspace = true
+tokio-stream = { workspace = true, features = ["net"] }
 tonic.workspace = true
 tower = "0.4"
 uuid.workspace = true
diff --git a/src/client/src/database.rs b/tests-integration/src/database.rs
similarity index 72%
rename from src/client/src/database.rs
rename to tests-integration/src/database.rs
index 70dc7397f5ed..31254cefdcc2 100644
--- a/src/client/src/database.rs
+++ b/tests-integration/src/database.rs
@@ -14,15 +14,17 @@
 
 use api::v1::auth_header::AuthScheme;
 use api::v1::ddl_request::Expr as DdlExpr;
+use api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::greptime_request::Request;
 use api::v1::query_request::Query;
 use api::v1::{
-    AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DeleteRequests, DropTableExpr,
-    GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest, RequestHeader,
-    RowInsertRequests, TruncateTableExpr,
+    AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
+    QueryRequest, RequestHeader,
 };
 use arrow_flight::Ticket;
 use async_stream::stream;
+use client::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
+use client::{from_grpc_response, Client, Result};
 use common_error::ext::{BoxedError, ErrorExt};
 use common_grpc::flight::{FlightDecoder, FlightMessage};
 use common_query::Output;
@@ -33,9 +35,7 @@ use common_telemetry::tracing_context::W3cTrace;
 use futures_util::StreamExt;
 use prost::Message;
 use snafu::{ensure, ResultExt};
-
-use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
-use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter};
+use tonic::transport::Channel;
 
 pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
 
@@ -57,6 +57,19 @@ pub struct Database {
     ctx: FlightContext,
 }
 
+pub struct DatabaseClient {
+    pub inner: GreptimeDatabaseClient<Channel>,
+}
+
+fn make_database_client(client: &Client) -> Result<DatabaseClient> {
+    let (_, channel) = client.find_channel()?;
+    Ok(DatabaseClient {
+        inner: GreptimeDatabaseClient::new(channel)
+            .max_decoding_message_size(client.max_grpc_recv_message_size())
+            .max_encoding_message_size(client.max_grpc_send_message_size()),
+    })
+}
+
 impl Database {
     /// Create database service client using catalog and schema
     pub fn new(catalog: impl Into<String>, schema: impl Into<String>, client: Client) -> Self {
@@ -88,34 +101,14 @@
         }
     }
 
-    pub fn catalog(&self) -> &String {
-        &self.catalog
-    }
-
     pub fn set_catalog(&mut self, catalog: impl Into<String>) {
         self.catalog = catalog.into();
     }
 
-    pub fn schema(&self) -> &String {
-        &self.schema
-    }
-
     pub fn set_schema(&mut self, schema: impl Into<String>) {
         self.schema = schema.into();
     }
 
-    pub fn dbname(&self) -> &String {
-        &self.dbname
-    }
-
-    pub fn set_dbname(&mut self, dbname: impl Into<String>) {
-        self.dbname = dbname.into();
-    }
-
-    pub fn timezone(&self) -> &String {
-        &self.timezone
-    }
-
     pub fn set_timezone(&mut self, timezone: impl Into<String>) {
         self.timezone = timezone.into();
     }
@@ -127,42 +120,11 @@
     }
 
     pub async fn insert(&self, requests: InsertRequests) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_INSERT.start_timer();
         self.handle(Request::Inserts(requests)).await
     }
 
-    pub async fn row_insert(&self, requests: RowInsertRequests) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_INSERT.start_timer();
-        self.handle(Request::RowInserts(requests)).await
-    }
-
-    pub fn streaming_inserter(&self) -> Result<StreamInserter> {
-        self.streaming_inserter_with_channel_size(65536)
-    }
-
-    pub fn streaming_inserter_with_channel_size(
-        &self,
-        channel_size: usize,
-    ) -> Result<StreamInserter> {
-        let client = self.client.make_database_client()?.inner;
-
-        let stream_inserter = StreamInserter::new(
-            client,
-            self.dbname().to_string(),
-            self.ctx.auth_header.clone(),
-            channel_size,
-        );
-
-        Ok(stream_inserter)
-    }
-
-    pub async fn delete(&self, request: DeleteRequests) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_DELETE.start_timer();
-        self.handle(Request::Deletes(request)).await
-    }
-
     async fn handle(&self, request: Request) -> Result<Output> {
-        let mut client = self.client.make_database_client()?.inner;
+        let mut client = make_database_client(&self.client)?.inner;
         let request = self.to_rpc_request(request);
         let response = client.handle(request).await?.into_inner();
         from_grpc_response(response)
@@ -188,43 +150,13 @@
     where
         S: AsRef<str>,
     {
-        let _timer = metrics::METRIC_GRPC_SQL.start_timer();
         self.do_get(Request::Query(QueryRequest {
             query: Some(Query::Sql(sql.as_ref().to_string())),
         }))
        .await
     }
 
-    pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_LOGICAL_PLAN.start_timer();
-        self.do_get(Request::Query(QueryRequest {
-            query: Some(Query::LogicalPlan(logical_plan)),
-        }))
-        .await
-    }
-
-    pub async fn prom_range_query(
-        &self,
-        promql: &str,
-        start: &str,
-        end: &str,
-        step: &str,
-    ) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_PROMQL_RANGE_QUERY.start_timer();
-        self.do_get(Request::Query(QueryRequest {
-            query: Some(Query::PromRangeQuery(PromRangeQuery {
-                query: promql.to_string(),
-                start: start.to_string(),
-                end: end.to_string(),
-                step: step.to_string(),
-                lookback: DEFAULT_LOOKBACK_STRING.to_string(),
-            })),
-        }))
-        .await
-    }
-
     pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_CREATE_TABLE.start_timer();
         self.do_get(Request::Ddl(DdlRequest {
             expr: Some(DdlExpr::CreateTable(expr)),
         }))
@@ -232,32 +164,13 @@
     }
 
     pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_ALTER.start_timer();
         self.do_get(Request::Ddl(DdlRequest {
             expr: Some(DdlExpr::Alter(expr)),
         }))
         .await
     }
 
-    pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_DROP_TABLE.start_timer();
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::DropTable(expr)),
-        }))
-        .await
-    }
-
-    pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result<Output> {
-        let _timer = metrics::METRIC_GRPC_TRUNCATE_TABLE.start_timer();
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::TruncateTable(expr)),
-        }))
-        .await
-    }
-
     async fn do_get(&self, request: Request) -> Result<Output> {
-        // FIXME(paomian): should be added some labels for metrics
-        let _timer = metrics::METRIC_GRPC_DO_GET.start_timer();
         let request = self.to_rpc_request(request);
         let request = Ticket {
             ticket: request.encode_to_vec().into(),
@@ -267,7 +180,7 @@
 
         let response = client.mut_inner().do_get(request).await.map_err(|e| {
             let tonic_code = e.code();
-            let e: error::Error = e.into();
+            let e: Error = e.into();
             let code = e.status_code();
             let msg = e.to_string();
             let error = Error::FlightGet {
@@ -350,7 +263,7 @@
 }
 
 #[derive(Default, Debug, Clone)]
-pub struct FlightContext {
+struct FlightContext {
     auth_header: Option<AuthHeader>,
 }
 
@@ -358,8 +271,14 @@ mod tests {
     use api::v1::auth_header::AuthScheme;
     use api::v1::{AuthHeader, Basic};
+    use clap::Parser;
+    use client::Client;
+    use cmd::error::Result as CmdResult;
+    use cmd::options::{CliOptions, Options};
+    use cmd::{cli, standalone, App};
+    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 
-    use crate::database::FlightContext;
+    use super::{Database, FlightContext};
 
     #[test]
     fn test_flight_ctx() {
@@ -382,4 +301,72 @@ mod tests {
             })
         ))
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
+        let output_dir = tempfile::tempdir().unwrap();
+
+        let standalone = standalone::Command::parse_from([
+            "standalone",
+            "start",
+            "--data-home",
+            &*output_dir.path().to_string_lossy(),
+        ]);
+        let Options::Standalone(standalone_opts) =
+            standalone.load_options(&CliOptions::default())?
+        else {
+            unreachable!()
+        };
+        let mut instance = standalone.build(*standalone_opts).await?;
+        instance.start().await?;
+
+        let client = Client::with_urls(["127.0.0.1:4001"]);
+        let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+        database
+            .sql(r#"CREATE DATABASE "cli.export.create_table";"#)
+            .await
+            .unwrap();
+        database
+            .sql(
+                r#"CREATE TABLE "cli.export.create_table"."a.b.c"(
+                       ts TIMESTAMP,
+                       TIME INDEX (ts)
+                   ) engine=mito;
+                "#,
+            )
+            .await
+            .unwrap();
+
+        let output_dir = tempfile::tempdir().unwrap();
+        let cli = cli::Command::parse_from([
+            "cli",
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--output-dir",
+            &*output_dir.path().to_string_lossy(),
+            "--target",
+            "create-table",
+        ]);
+        let mut cli_app = cli.build().await?;
+        cli_app.start().await?;
+
+        instance.stop().await?;
+
+        let output_file = output_dir
+            .path()
+            .join("greptime-cli.export.create_table.sql");
+        let res = std::fs::read_to_string(output_file).unwrap();
+        let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
+  "ts" TIMESTAMP(3) NOT NULL,
+  TIME INDEX ("ts")
+)
+
+ENGINE=mito
+;
+"#;
+        assert_eq!(res.trim(), expect.trim());
+
+        Ok(())
+    }
 }
diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs
index d3e700151345..e4db599fd5e7 100644
--- a/tests-integration/src/lib.rs
+++ b/tests-integration/src/lib.rs
@@ -15,6 +15,7 @@
 #![feature(assert_matches)]
 
 pub mod cluster;
+pub mod database;
 mod grpc;
 mod influxdb;
 mod instance;
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 0b38ac252c30..5a14751bbc45 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -20,7 +20,7 @@ use api::v1::{
     PromqlRequest, RequestHeader, SemanticType,
 };
 use auth::user_provider_from_option;
-use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::consts::MITO_ENGINE;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
@@ -30,6 +30,7 @@ use servers::http::prometheus::{
     PrometheusResponse,
 };
 use servers::server::Server;
+use tests_integration::database::Database;
 use tests_integration::test_util::{
     setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType,
 };
diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml
index 6e8848de5c83..7cb36c1645cb 100644
--- a/tests/runner/Cargo.toml
+++ b/tests/runner/Cargo.toml
@@ -19,5 +19,7 @@ serde.workspace = true
 serde_json.workspace = true
 sqlness = { version = "0.5" }
 tempfile.workspace = true
+# TODO depend `Database` client
+tests-integration.workspace = true
 tinytemplate = "1.2"
 tokio.workspace = true
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index ea3e3e1bc10a..399f65840b9c 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -24,14 +24,13 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use client::error::ServerSnafu;
-use client::{
-    Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
-};
+use client::{Client, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::ErrorExt;
 use common_query::{Output, OutputData};
 use common_recordbatch::RecordBatches;
 use serde::Serialize;
 use sqlness::{Database, EnvController, QueryContext};
+use tests_integration::database::Database as DB;
 use tinytemplate::TinyTemplate;
 use tokio::sync::Mutex as TokioMutex;
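
Editor's note (not part of the patch): after this change, test and benchmark code reaches the gRPC `Database` helper through `tests_integration::database` instead of `client`, riding on the `Client` plumbing this diff makes public (`make_flight_client` and the message-size getters). The sketch below illustrates the intended call pattern; the endpoint and query are placeholder values, and it assumes a local standalone instance with gRPC on port 4001, as in the relocated `test_export_create_table_with_quoted_names` test.

use client::Client;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use tests_integration::database::Database;

#[tokio::main]
async fn main() {
    // `Client` still lives in the `client` crate; only the `GreptimeDatabase`
    // service wrapper moved into `tests-integration`.
    let client = Client::with_urls(vec!["127.0.0.1:4001"]);
    let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

    // `sql` is served over the Arrow Flight `do_get` path, same as before the move.
    let output = db.sql("SHOW DATABASES").await.unwrap();
    match output.data {
        common_query::OutputData::AffectedRows(rows) => println!("affected rows: {rows}"),
        _ => println!("query returned a stream of record batches"),
    }
}

This inversion — the `client` crate no longer exporting `Database` — is why `benchmarks`, `servers` (as a dev-dependency), and the sqlness runner now declare `tests-integration`, per the TODO markers in the touched Cargo.toml files.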