diff --git a/Cargo.lock b/Cargo.lock index 35fabee98b6c..890579284a22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,9 +10,9 @@ checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" [[package]] name = "addr2line" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" dependencies = [ "gimli", ] @@ -581,9 +581,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c90a406b4495d129f00461241616194cb8a032c8d1c53c657f0961d5f8e0498" +checksum = "cd066d0b4ef8ecb03a55319dc13aa6910616d0f44008a045bb1835af830abff5" dependencies = [ "brotli 6.0.0", "bzip2", @@ -600,11 +600,11 @@ dependencies = [ [[package]] name = "async-lock" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener 4.0.3", + "event-listener 5.3.1", "event-listener-strategy", "pin-project-lite", ] @@ -828,9 +828,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" +checksum = "17c6a35df3749d2e8bb1b7b21a976d82b15548788d2735b9d82f329268f71a11" dependencies = [ "addr2line", "cc", @@ -1031,19 +1031,19 @@ dependencies = [ [[package]] name = "borsh" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbe5b10e214954177fb1dc9fbd20a1a2608fe99e6c832033bdc7cea287a20d77" +checksum = "a6362ed55def622cddc70a4746a68554d7b687713770de539e59a739b249f8ed" dependencies = [ "borsh-derive", - "cfg_aliases", + "cfg_aliases 0.2.1", ] [[package]] name = "borsh-derive" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a8646f94ab393e43e8b35a2558b1624bed28b97ee09c5d15456e3c9463f46d" +checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" dependencies = [ "once_cell", "proc-macro-crate 3.1.0", @@ -1072,7 +1072,7 @@ checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor 4.0.0", + "brotli-decompressor 4.0.1", ] [[package]] @@ -1087,9 +1087,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "4.0.0" +version = "4.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6221fe77a248b9117d431ad93761222e1cf8ff282d9d1d5d9f53d6299a1cf76" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -1352,11 +1352,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "cfgrammar" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "163348850b1cd34fa99ef1592b5d598ea7e6752f18aff2125b67537e887edb36" +checksum = "ec07af28018dd8b4b52e49eb6e57268b19dda0996d4824889eb07ee0ef67378c" dependencies = [ "indexmap 2.2.6", "lazy_static", @@ -1448,9 +1454,9 @@ dependencies = [ [[package]] name = "clang-sys" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a483f3cbf7cec2e153d424d0e92329d816becc6421389bd494375c6065921b9b" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" dependencies = [ "glob", "libc", @@ -1560,6 +1566,7 @@ dependencies = [ "parking_lot 0.12.3", "prometheus", "prost 0.12.6", + "query", "rand", "serde_json", "snafu 0.8.3", @@ -1683,7 +1690,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" dependencies = [ "strum 0.26.2", - "strum_macros 0.26.2", + "strum_macros 0.26.3", "unicode-width", ] @@ -1955,6 +1962,7 @@ dependencies = [ "common-time", "common-wal", "datafusion-common 38.0.0", + "datafusion-expr 38.0.0", "datatypes", "derive_builder 0.12.0", "etcd-client", @@ -2334,9 +2342,9 @@ checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" [[package]] name = "crc32c" -version = "0.6.5" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2" +checksum = "0227b9f93e535d49bc7ce914c066243424ce85ed90864cebd0874b184e9b6947" dependencies = [ "rustc_version", ] @@ -2601,7 +2609,7 @@ dependencies = [ "arrow-array", "arrow-ipc", "arrow-schema", - "async-compression 0.4.10", + "async-compression 0.4.11", "async-trait", "bytes", "bzip2", @@ -2651,7 +2659,7 @@ dependencies = [ "arrow-array", "arrow-ipc", "arrow-schema", - "async-compression 0.4.10", + "async-compression 0.4.11", "async-trait", "bytes", "bzip2", @@ -2807,7 +2815,7 @@ dependencies = [ "paste", "sqlparser 0.44.0", "strum 0.26.2", - "strum_macros 0.26.2", + "strum_macros 0.26.3", ] [[package]] @@ -2824,7 +2832,7 @@ dependencies = [ "serde_json", "sqlparser 0.45.0 (registry+https://github.com/rust-lang/crates.io-index)", "strum 0.26.2", - "strum_macros 0.26.2", + "strum_macros 0.26.3", ] [[package]] @@ -3154,7 +3162,7 @@ dependencies = [ "object_store", "prost 0.12.6", "prost-types 0.12.6", - "substrait 0.34.0", + "substrait 0.34.1", ] [[package]] @@ -3208,6 +3216,7 @@ dependencies = [ "session", "snafu 0.8.3", "store-api", + "substrait 0.8.1", "table", "tokio", "toml 0.8.13", @@ -3621,20 +3630,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "event-listener" -version = "4.0.3" +version = "5.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener" -version = "5.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d9944b8ca13534cdfb2800775f8dd4902ff3fc75a50101466decadfdf322a24" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" dependencies = [ "concurrent-queue", "parking", @@ -3643,11 +3641,11 @@ dependencies = [ [[package]] name = "event-listener-strategy" -version = "0.4.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" dependencies = [ - "event-listener 4.0.3", + "event-listener 5.3.1", "pin-project-lite", ] @@ -4164,9 +4162,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.28.1" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" [[package]] name = "git2" @@ -4672,9 +4670,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.3" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" dependencies = [ "bytes", "futures-channel", @@ -5358,9 +5356,9 @@ dependencies = [ [[package]] name = "lrlex" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ff18e1bd3ed77d7bc2800a0f8b0e922a3c7ba525505be8bab9cf45dfc4984b" +checksum = "c65e01ebaccc77218ed6fa4f0053daa2124bce4e25a5e83aae0f7ccfc9cbfccb" dependencies = [ "cfgrammar", "getopts", @@ -5376,9 +5374,9 @@ dependencies = [ [[package]] name = "lrpar" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efea5a41b9988b5ae41ea9b2375a52cfa0e483f0210357209caa8d361a24a368" +checksum = "2a4b858180a332aec09d10479a070802b13081077eb94010744bc4e3a11d9768" dependencies = [ "bincode", "cactus", @@ -5398,9 +5396,9 @@ dependencies = [ [[package]] name = "lrtable" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff5668c3bfd279ed24d5b0d24568c48dc993f9beabd51f74d1865a78c1d206ab" +checksum = "4fcefc5628209d1b1f4b2cd0bcefd0e50be80bdf178e886cb07317f5ce4f2856" dependencies = [ "cfgrammar", "fnv", @@ -5903,7 +5901,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", - "event-listener 5.3.0", + "event-listener 5.3.1", "futures-util", "once_cell", "parking_lot 0.12.3", @@ -6180,7 +6178,7 @@ checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ "bitflags 2.5.0", "cfg-if", - "cfg_aliases", + "cfg_aliases 0.1.1", "libc", "memoffset 0.9.1", ] @@ -6414,9 +6412,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.2" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +checksum = "b8ec7ab813848ba4522158d5517a6093db1ded27575b070f4177b8d12b41db5e" dependencies = [ "memchr", ] @@ -7527,9 +7525,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.84" +version = "1.0.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec96c6a92621310b51366f1e28d05ef11489516e93be030060e5fc12024a49d6" +checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" dependencies = [ "unicode-ident", ] @@ -10192,9 +10190,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "statrs" -version = "0.16.0" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d08e5e1748192713cc281da8b16924fb46be7b0c2431854eadc785823e5696e" +checksum = "b35a062dbadac17a42e0fc64c27f419b25d6fae98572eb43c8814c9e873d7721" dependencies = [ "approx", "lazy_static", @@ -10329,7 +10327,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" dependencies = [ - "strum_macros 0.26.2", + "strum_macros 0.26.3", ] [[package]] @@ -10360,11 +10358,11 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.26.2" +version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946" +checksum = "f7993a8e3a9e88a00351486baae9522c91b123a088f76469e5bd5cc17198ea87" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", "rustversion", @@ -10426,9 +10424,9 @@ dependencies = [ [[package]] name = "substrait" -version = "0.34.0" +version = "0.34.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c7ccf682a18309d9039bc16e14d9c7ea3a461e65dfc46e2b611b401b945ef2d" +checksum = "04c77dec9b6c4e48ac828937bbe7cf473b0933168c5d76d51a5816ace7046be9" dependencies = [ "heck 0.5.0", "prettyplease 0.2.20", @@ -10977,9 +10975,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.37.0" +version = "1.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" dependencies = [ "backtrace", "bytes", @@ -11007,9 +11005,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", @@ -11215,7 +11213,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.8", + "winnow 0.6.9", ] [[package]] @@ -11357,7 +11355,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "async-compression 0.4.10", + "async-compression 0.4.11", "base64 0.21.7", "bitflags 2.5.0", "bytes", @@ -11615,9 +11613,9 @@ checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" [[package]] name = "triomphe" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" +checksum = "1b2cb4fbb9995eeb36ac86fadf24031ccd58f99d6b4b2d7b911db70bddb80d90" [[package]] name = "try-lock" @@ -12650,9 +12648,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.8" +version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c52e9c97a68071b23e836c9380edae937f17b9c4667bd021973efc689f618d" +checksum = "86c949fede1d13936a99f14fafd3e76fd642b556dd2ce96287fbe2e0151bfac6" dependencies = [ "memchr", ] diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 3c5f2a68fb76..17ef3ac1b721 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -31,9 +31,11 @@ moka = { workspace = true, features = ["future"] } parking_lot = "0.12" prometheus.workspace = true prost.workspace = true +query.workspace = true rand.workspace = true serde_json.workspace = true snafu.workspace = true +substrait.workspace = true tokio.workspace = true tokio-stream = { workspace = true, features = ["net"] } tonic.workspace = true @@ -42,7 +44,6 @@ tonic.workspace = true common-grpc-expr.workspace = true datanode.workspace = true derive-new = "0.5" -substrait.workspace = true tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 51517b5af123..811f26dca152 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::RegionRequest; use api::v1::ResponseHeader; use arc_swap::ArcSwapOption; use arrow_flight::Ticket; @@ -25,13 +25,15 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_meta::error::{self as meta_error, Result as MetaResult}; -use common_meta::node_manager::Datanode; +use common_meta::node_manager::{Datanode, QueryRequest}; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; use common_telemetry::tracing_context::TracingContext; use prost::Message; +use query::query_engine::DefaultSerializer; use snafu::{location, Location, OptionExt, ResultExt}; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use tokio_stream::StreamExt; use crate::error::{ @@ -63,6 +65,17 @@ impl Datanode for RegionRequester { } async fn handle_query(&self, request: QueryRequest) -> MetaResult { + let plan = DFLogicalSubstraitConvertor + .encode(&request.plan, DefaultSerializer) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)? + .to_vec(); + let request = api::v1::region::QueryRequest { + header: request.header, + region_id: request.region_id.as_u64(), + plan, + }; + let ticket = Ticket { ticket: request.encode_to_vec().into(), }; diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 70faabb7ee75..096371f046a8 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -30,6 +30,7 @@ common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true datafusion-common.workspace = true +datafusion-expr.workspace = true datatypes.workspace = true derive_builder.workspace = true etcd-client.workspace = true diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index f3a7f9a9fff5..78dee342ef6c 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -131,7 +131,7 @@ mod tests { use std::sync::Arc; use api::region::RegionResponse; - use api::v1::region::{QueryRequest, RegionRequest}; + use api::v1::region::RegionRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; @@ -143,6 +143,7 @@ mod tests { use crate::ddl::test_util::{create_logical_table, create_physical_table}; use crate::error::{self, Error, Result}; use crate::key::datanode_table::DatanodeTableKey; + use crate::node_manager::QueryRequest; use crate::peer::Peer; use crate::rpc::router::region_distribution; use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager}; diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs index 1649ebc00d47..1bb9327b765d 100644 --- a/src/common/meta/src/ddl/test_util/datanode_handler.rs +++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::RegionRequest; use common_error::ext::{BoxedError, ErrorExt, StackError}; use common_error::status_code::StatusCode; use common_recordbatch::SendableRecordBatchStream; @@ -22,6 +22,7 @@ use snafu::{ResultExt, Snafu}; use tokio::sync::mpsc; use crate::error::{self, Error, Result}; +use crate::node_manager::QueryRequest; use crate::peer::Peer; use crate::test_util::MockDatanodeHandler; diff --git a/src/common/meta/src/node_manager.rs b/src/common/meta/src/node_manager.rs index 3d6bca6416b5..54fafda5a67f 100644 --- a/src/common/meta/src/node_manager.rs +++ b/src/common/meta/src/node_manager.rs @@ -16,13 +16,27 @@ use std::sync::Arc; use api::region::RegionResponse; use api::v1::flow::{FlowRequest, FlowResponse}; -use api::v1::region::{InsertRequests, QueryRequest, RegionRequest}; +use api::v1::region::{InsertRequests, RegionRequest, RegionRequestHeader}; pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; +use datafusion_expr::LogicalPlan; +use store_api::storage::RegionId; use crate::error::Result; use crate::peer::Peer; +/// The query request to be handled by the RegionServer(Datanode). +pub struct QueryRequest { + /// The header of this request. Often to store some context of the query. None means all to defaults. + pub header: Option, + + /// The id of the region to be queried. + pub region_id: RegionId, + + /// The form of the query: a logical plan. + pub plan: LogicalPlan, +} + /// The trait for handling requests to datanode. #[async_trait::async_trait] pub trait Datanode: Send + Sync { diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 892f01da0c99..8eacdceae5c3 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use api::region::RegionResponse; use api::v1::flow::{FlowRequest, FlowResponse}; -use api::v1::region::{InsertRequests, QueryRequest, RegionRequest}; +use api::v1::region::{InsertRequests, RegionRequest}; pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; @@ -30,7 +30,7 @@ use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackendRef; use crate::node_manager::{ - Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef, + Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef, QueryRequest, }; use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index a5408b0c3246..26a7ccb67563 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -57,6 +57,7 @@ servers.workspace = true session.workspace = true snafu.workspace = true store-api.workspace = true +substrait.workspace = true table.workspace = true tokio.workspace = true toml.workspace = true diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 919a921ec349..d3cf57d4f7f3 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -75,7 +75,7 @@ pub enum Error { DecodeLogicalPlan { #[snafu(implicit)] location: Location, - source: common_query::error::Error, + source: substrait::error::Error, }, #[snafu(display("Incorrect internal state: {}", state))] @@ -387,6 +387,14 @@ pub enum Error { location: Location, source: BoxedError, }, + + #[snafu(display("DataFusion"))] + DataFusion { + #[snafu(source)] + error: datafusion::error::DataFusionError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -439,7 +447,8 @@ impl ErrorExt for Error { | IncorrectInternalState { .. } | ShutdownInstance { .. } | RegionEngineNotFound { .. } - | UnsupportedOutput { .. } => StatusCode::Internal, + | UnsupportedOutput { .. } + | DataFusion { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, RegionNotReady { .. } => StatusCode::RegionNotReady, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 13b10c497cef..f330f93521a2 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -18,13 +18,14 @@ use std::ops::Deref; use std::sync::{Arc, RwLock}; use api::region::RegionResponse; -use api::v1::region::{region_request, QueryRequest, RegionResponse as RegionResponseV1}; +use api::v1::region::{region_request, RegionResponse as RegionResponseV1}; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; use async_trait::async_trait; use bytes::Bytes; use common_error::ext::BoxedError; use common_error::status_code::StatusCode; +use common_meta::node_manager::QueryRequest; use common_query::OutputData; use common_recordbatch::SendableRecordBatchStream; use common_runtime::Runtime; @@ -32,6 +33,10 @@ use common_telemetry::tracing::{self, info_span}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, warn}; use dashmap::DashMap; +use datafusion::datasource::{provider_as_source, TableProvider}; +use datafusion::error::Result as DfResult; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_expr::{LogicalPlan, TableSource}; use futures_util::future::try_join_all; use metric_engine::engine::MetricEngine; use mito2::engine::MITO_ENGINE_NAME; @@ -44,20 +49,21 @@ use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as S use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; use session::context::{QueryContextBuilder, QueryContextRef}; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest}; use store_api::storage::RegionId; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ - self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, - FindLogicalRegionsSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu, - RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, - UnsupportedOutputSnafu, + self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, + ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleRegionRequestSnafu, + RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result, + StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; @@ -125,9 +131,93 @@ impl RegionServer { self.inner.handle_request(region_id, request).await } + async fn table_provider(&self, region_id: RegionId) -> Result> { + let status = self + .inner + .region_map + .get(®ion_id) + .context(RegionNotFoundSnafu { region_id })? + .clone(); + ensure!( + matches!(status, RegionEngineWithStatus::Ready(_)), + RegionNotReadySnafu { region_id } + ); + + self.inner + .table_provider_factory + .create(region_id, status.into_engine()) + .await + .context(ExecuteLogicalPlanSnafu) + } + + /// Handle reads from remote. They're often query requests received by our Arrow Flight service. + pub async fn handle_remote_read( + &self, + request: api::v1::region::QueryRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + let provider = self.table_provider(region_id).await?; + let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider)); + + let query_ctx: QueryContextRef = request + .header + .as_ref() + .map(|h| Arc::new(h.into())) + .unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build())); + let state = self + .inner + .query_engine + .engine_context(query_ctx) + .state() + .clone(); + + let plan = DFLogicalSubstraitConvertor + .decode(Bytes::from(request.plan), catalog_list, state) + .await + .context(DecodeLogicalPlanSnafu)?; + + self.inner + .handle_read(QueryRequest { + header: request.header, + region_id, + plan, + }) + .await + } + #[tracing::instrument(skip_all)] pub async fn handle_read(&self, request: QueryRequest) -> Result { - self.inner.handle_read(request).await + let provider = self.table_provider(request.region_id).await?; + + struct RegionDataSourceInjector { + source: Arc, + } + + impl TreeNodeRewriter for RegionDataSourceInjector { + type Node = LogicalPlan; + + fn f_up(&mut self, node: Self::Node) -> DfResult> { + Ok(match node { + LogicalPlan::TableScan(mut scan) => { + scan.source = self.source.clone(); + Transformed::yes(LogicalPlan::TableScan(scan)) + } + _ => Transformed::no(node), + }) + } + } + + let plan = request + .plan + .rewrite(&mut RegionDataSourceInjector { + source: provider_as_source(provider), + }) + .context(DataFusionSnafu)? + .data; + + self.inner + .handle_read(QueryRequest { plan, ..request }) + .await } /// Returns all opened and reportable regions. @@ -289,7 +379,7 @@ impl FlightCraft for RegionServer { request: Request, ) -> TonicResult>> { let ticket = request.into_inner().ticket; - let request = QueryRequest::decode(ticket.as_ref()) + let request = api::v1::region::QueryRequest::decode(ticket.as_ref()) .context(servers_error::InvalidFlightTicketSnafu)?; let tracing_context = request .header @@ -298,7 +388,7 @@ impl FlightCraft for RegionServer { .unwrap_or_default(); let result = self - .handle_read(request) + .handle_remote_read(request) .trace(tracing_context.attach(info_span!("RegionServer::handle_read"))) .await?; @@ -326,10 +416,6 @@ impl RegionEngineWithStatus { RegionEngineWithStatus::Ready(engine) => engine, } } - - pub fn is_registering(&self) -> bool { - matches!(self, Self::Registering(_)) - } } impl Deref for RegionEngineWithStatus { @@ -621,51 +707,16 @@ impl RegionServerInner { pub async fn handle_read(&self, request: QueryRequest) -> Result { // TODO(ruihang): add metrics and set trace id - let QueryRequest { - header, - region_id, - plan, - } = request; - let region_id = RegionId::from_u64(region_id); - // Build query context from gRPC header - let ctx: QueryContextRef = header + let query_ctx: QueryContextRef = request + .header .as_ref() .map(|h| Arc::new(h.into())) .unwrap_or_else(|| QueryContextBuilder::default().build().into()); - // build dummy catalog list - let region_status = self - .region_map - .get(®ion_id) - .with_context(|| RegionNotFoundSnafu { region_id })? - .clone(); - - if region_status.is_registering() { - return error::RegionNotReadySnafu { region_id }.fail(); - } - - let table_provider = self - .table_provider_factory - .create(region_id, region_status.into_engine()) - .await - .context(ExecuteLogicalPlanSnafu)?; - - let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); - let query_engine_ctx = self.query_engine.engine_context(ctx.clone()); - let plan_decoder = query_engine_ctx - .new_plan_decoder() - .context(NewPlanDecoderSnafu)?; - - // decode substrait plan to logical plan and execute it - let logical_plan = plan_decoder - .decode(Bytes::from(plan), catalog_list, false) - .await - .context(DecodeLogicalPlanSnafu)?; - let result = self .query_engine - .execute(logical_plan.into(), ctx) + .execute(request.plan.into(), query_ctx) .await .context(ExecuteLogicalPlanSnafu)?; diff --git a/src/frontend/src/instance/region_query.rs b/src/frontend/src/instance/region_query.rs index 3cbd07e75905..f2040dc00139 100644 --- a/src/frontend/src/instance/region_query.rs +++ b/src/frontend/src/instance/region_query.rs @@ -14,16 +14,14 @@ use std::sync::Arc; -use api::v1::region::QueryRequest; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_meta::node_manager::NodeManagerRef; +use common_meta::node_manager::{NodeManagerRef, QueryRequest}; use common_recordbatch::SendableRecordBatchStream; use partition::manager::PartitionRuleManagerRef; use query::error::{RegionQuerySnafu, Result as QueryResult}; use query::region_query::RegionQueryHandler; use snafu::ResultExt; -use store_api::storage::RegionId; use crate::error::{FindTableRouteSnafu, RequestQuerySnafu, Result}; @@ -56,7 +54,7 @@ impl RegionQueryHandler for FrontendRegionQueryHandler { impl FrontendRegionQueryHandler { async fn do_get_inner(&self, request: QueryRequest) -> Result { - let region_id = RegionId::from_u64(request.region_id); + let region_id = request.region_id; let peer = &self .partition_manager diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 5c9e7a46f65a..a5bba0d9dedf 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -15,12 +15,12 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest, RegionResponse as RegionResponseV1}; +use api::v1::region::{RegionRequest, RegionResponse as RegionResponseV1}; use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; use common_meta::error::{self as meta_error, Result as MetaResult}; -use common_meta::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager}; +use common_meta::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, QueryRequest}; use common_meta::peer::Peer; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing; diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 004d32e4a295..75d15ae2c221 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; -use common_catalog::consts::default_engine; use common_meta::RegionIdent; use crate::error::Result; @@ -91,8 +90,7 @@ impl HeartbeatHandler for RegionFailureHandler { datanode_id: stat.id, table_id: region_id.table_id(), region_number: region_id.region_number(), - // TODO(LFC): Use the actual table engine (maybe retrieve from heartbeat). - engine: default_engine().to_string(), + engine: x.engine.clone(), } }) .collect(), @@ -109,6 +107,7 @@ impl HeartbeatHandler for RegionFailureHandler { mod tests { use std::assert_matches::assert_matches; + use common_catalog::consts::default_engine; use common_meta::key::MAINTENANCE_KEY; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 23d7fbb832cd..6fa9b862db4f 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -18,9 +18,9 @@ use std::time::Duration; use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use async_stream::stream; -use common_base::bytes::Bytes; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::BoxedError; +use common_meta::node_manager::QueryRequest; use common_plugins::GREPTIME_EXEC_READ_COST; use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics}; use common_recordbatch::error::ExternalSnafu; @@ -40,7 +40,7 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::EquivalenceProperties; use datatypes::schema::{Schema, SchemaRef}; use futures_util::StreamExt; -use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader}; +use greptime_proto::v1::region::RegionRequestHeader; use greptime_proto::v1::QueryContext; use meter_core::data::ReadItem; use meter_macros::read_meter; @@ -125,7 +125,7 @@ impl MergeScanLogicalPlan { pub struct MergeScanExec { table: TableName, regions: Vec, - substrait_plan: Bytes, + plan: LogicalPlan, schema: SchemaRef, arrow_schema: ArrowSchemaRef, region_query_handler: RegionQueryHandlerRef, @@ -150,7 +150,7 @@ impl MergeScanExec { pub fn new( table: TableName, regions: Vec, - substrait_plan: Bytes, + plan: LogicalPlan, arrow_schema: &ArrowSchema, region_query_handler: RegionQueryHandlerRef, query_ctx: QueryContextRef, @@ -166,7 +166,7 @@ impl MergeScanExec { Ok(Self { table, regions, - substrait_plan, + plan, schema: schema_without_metadata, arrow_schema: arrow_schema_without_metadata, region_query_handler, @@ -178,7 +178,6 @@ impl MergeScanExec { } pub fn to_stream(&self, context: Arc) -> Result { - let substrait_plan = self.substrait_plan.to_vec(); let regions = self.regions.clone(); let region_query_handler = self.region_query_handler.clone(); let metric = MergeScanMetric::new(&self.metric); @@ -192,6 +191,7 @@ impl MergeScanExec { let extensions = self.query_ctx.extensions(); let sub_sgate_metrics_moved = self.sub_stage_metrics.clone(); + let plan = self.plan.clone(); let stream = Box::pin(stream!({ MERGE_SCAN_REGIONS.observe(regions.len() as f64); let _finish_timer = metric.finish_time().timer(); @@ -210,8 +210,8 @@ impl MergeScanExec { extensions: extensions.clone(), }), }), - region_id: region_id.into(), - plan: substrait_plan.clone(), + region_id, + plan: plan.clone(), }; let mut stream = region_query_handler .do_get(request) diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index c3d8b00eaf2d..7b56538da404 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -24,22 +24,19 @@ use datafusion::datasource::DefaultTableSource; use datafusion::execution::context::SessionState; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::TableReference; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion_optimizer::analyzer::Analyzer; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; pub use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; use table::table_name::TableName; use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan}; -use crate::error; use crate::error::{CatalogSnafu, TableNotFoundSnafu}; -use crate::query_engine::DefaultSerializer; use crate::region_query::RegionQueryHandlerRef; pub struct DistExtensionPlanner { @@ -99,13 +96,6 @@ impl ExtensionPlanner for DistExtensionPlanner { // TODO(ruihang): generate different execution plans for different variant merge operation let schema = optimized_plan.schema().as_ref().into(); - // Pass down the original plan, allow execution nodes to do their optimization - let amended_plan = Self::plan_with_full_table_name(input_plan.clone(), &table_name)?; - let substrait_plan = DFLogicalSubstraitConvertor - .encode(&amended_plan, DefaultSerializer) - .context(error::EncodeSubstraitLogicalPlanSnafu)? - .into(); - let query_ctx = session_state .config() .get_extension() @@ -113,7 +103,7 @@ impl ExtensionPlanner for DistExtensionPlanner { let merge_scan_plan = MergeScanExec::new( table_name, regions, - substrait_plan, + input_plan.clone(), &schema, self.region_query_handler.clone(), query_ctx, @@ -130,12 +120,6 @@ impl DistExtensionPlanner { Ok(extractor.table_name) } - /// Apply the fully resolved table name to the TableScan plan - fn plan_with_full_table_name(plan: LogicalPlan, name: &TableName) -> Result { - plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name)) - .map(|x| x.data) - } - async fn get_regions(&self, table_name: &TableName) -> Result> { let table = self .catalog_manager @@ -230,24 +214,3 @@ impl TreeNodeVisitor<'_> for TableNameExtractor { } } } - -struct TableNameRewriter; - -impl TableNameRewriter { - fn rewrite_table_name( - plan: LogicalPlan, - name: &TableName, - ) -> datafusion_common::Result> { - Ok(match plan { - LogicalPlan::TableScan(mut table_scan) => { - table_scan.table_name = TableReference::full( - name.catalog_name.clone(), - name.schema_name.clone(), - name.table_name.clone(), - ); - Transformed::yes(LogicalPlan::TableScan(table_scan)) - } - _ => Transformed::no(plan), - }) - } -} diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 35d3fbdb17b9..6eeb764b8a9f 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -140,13 +140,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to encode Substrait logical plan"))] - EncodeSubstraitLogicalPlan { - source: substrait::error::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("General SQL error"))] Sql { #[snafu(implicit)] @@ -340,7 +333,6 @@ impl ErrorExt for Error { | ColumnSchemaNoDefault { .. } => StatusCode::InvalidArguments, BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, - EncodeSubstraitLogicalPlan { source, .. } => source.status_code(), ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(), diff --git a/src/query/src/region_query.rs b/src/query/src/region_query.rs index f9861103e62b..649af831b8af 100644 --- a/src/query/src/region_query.rs +++ b/src/query/src/region_query.rs @@ -14,8 +14,8 @@ use std::sync::Arc; -use api::v1::region::QueryRequest; use async_trait::async_trait; +use common_meta::node_manager::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use crate::error::Result; diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 36f41696f6c9..a41565b2f1ff 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -75,7 +75,8 @@ impl ColumnMetadata { column_def.datatype_extension.clone(), ) .into(); - ColumnSchema::new(column_def.name, data_type, column_def.is_nullable) + ColumnSchema::new(&column_def.name, data_type, column_def.is_nullable) + .with_time_index(column_def.semantic_type() == SemanticType::Timestamp) .with_default_constraint(default_constrain) .context(ConvertDatatypesSnafu) } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 600945d60719..d01eb82a68c4 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -647,8 +647,6 @@ pub struct TableInfo { /// Id and version of the table. #[builder(default, setter(into))] pub ident: TableIdent, - - // TODO(LFC): Remove the catalog, schema and table names from TableInfo. /// Name of the table. #[builder(setter(into))] pub name: String, diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index ed2d6425a439..b3e26b9bb5cb 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -555,7 +555,7 @@ CREATE TABLE {table_name} ( let region_id = RegionId::new(table_id, *region); let stream = region_server - .handle_read(RegionQueryRequest { + .handle_remote_read(RegionQueryRequest { region_id: region_id.as_u64(), plan: plan.to_vec(), ..Default::default() diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index feff39e136c0..a456f0a75db4 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -249,7 +249,7 @@ mod tests { let region_id = RegionId::new(table_id, *region); let stream = region_server - .handle_read(QueryRequest { + .handle_remote_read(QueryRequest { region_id: region_id.as_u64(), plan: plan.to_vec(), ..Default::default() diff --git a/tests/cases/distributed/explain/analyze.result b/tests/cases/distributed/explain/analyze.result index 762f70bdace4..9adcf9eb212c 100644 --- a/tests/cases/distributed/explain/analyze.result +++ b/tests/cases/distributed/explain/analyze.result @@ -31,9 +31,9 @@ explain analyze SELECT count(*) FROM system_metrics; +-+-+-+ | 0_| 0_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[COUNT(greptime.public.system_REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[COUNT(system_REDACTED |_|_|_CoalescePartitionsExec REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(greptime.public.system_REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_SinglePartitionScanner: REDACTED |_|_|_|