diff --git a/Cargo.lock b/Cargo.lock index 9812859e0cad..c0ae70705399 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1560,6 +1560,7 @@ dependencies = [ "parking_lot 0.12.3", "prometheus", "prost 0.12.6", + "query", "rand", "serde_json", "snafu 0.8.3", @@ -1955,11 +1956,13 @@ dependencies = [ "common-macro", "common-procedure", "common-procedure-test", + "common-query", "common-recordbatch", "common-telemetry", "common-time", "common-wal", "datafusion-common 38.0.0", + "datafusion-expr 38.0.0", "datatypes", "derive_builder 0.12.0", "etcd-client", @@ -2048,6 +2051,7 @@ dependencies = [ "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", "sqlparser_derive 0.1.1", "statrs", + "store-api", "tokio", ] @@ -3215,6 +3219,7 @@ dependencies = [ "session", "snafu 0.8.3", "store-api", + "substrait 0.8.1", "table", "tokio", "toml 0.8.13", @@ -10244,7 +10249,6 @@ dependencies = [ "common-base", "common-error", "common-macro", - "common-query", "common-recordbatch", "common-wal", "datafusion-expr 38.0.0", diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 3c5f2a68fb76..17ef3ac1b721 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -31,9 +31,11 @@ moka = { workspace = true, features = ["future"] } parking_lot = "0.12" prometheus.workspace = true prost.workspace = true +query.workspace = true rand.workspace = true serde_json.workspace = true snafu.workspace = true +substrait.workspace = true tokio.workspace = true tokio-stream = { workspace = true, features = ["net"] } tonic.workspace = true @@ -42,7 +44,6 @@ tonic.workspace = true common-grpc-expr.workspace = true datanode.workspace = true derive-new = "0.5" -substrait.workspace = true tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 51517b5af123..cc91010aa0cb 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::RegionRequest; use api::v1::ResponseHeader; use arc_swap::ArcSwapOption; use arrow_flight::Ticket; @@ -26,12 +26,15 @@ use common_error::status_code::StatusCode; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::node_manager::Datanode; +use common_query::request::QueryRequest; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; use common_telemetry::tracing_context::TracingContext; use prost::Message; +use query::query_engine::DefaultSerializer; use snafu::{location, Location, OptionExt, ResultExt}; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use tokio_stream::StreamExt; use crate::error::{ @@ -63,6 +66,17 @@ impl Datanode for RegionRequester { } async fn handle_query(&self, request: QueryRequest) -> MetaResult { + let plan = DFLogicalSubstraitConvertor + .encode(&request.plan, DefaultSerializer) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)? + .to_vec(); + let request = api::v1::region::QueryRequest { + header: request.header, + region_id: request.region_id.as_u64(), + plan, + }; + let ticket = Ticket { ticket: request.encode_to_vec().into(), }; diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 70faabb7ee75..347ccc3b5f2b 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -25,11 +25,13 @@ common-grpc-expr.workspace = true common-macro.workspace = true common-procedure.workspace = true common-procedure-test.workspace = true +common-query.workspace = true common-recordbatch.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true datafusion-common.workspace = true +datafusion-expr.workspace = true datatypes.workspace = true derive_builder.workspace = true etcd-client.workspace = true diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index f3a7f9a9fff5..433e7dc96b20 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -131,9 +131,10 @@ mod tests { use std::sync::Arc; use api::region::RegionResponse; - use api::v1::region::{QueryRequest, RegionRequest}; + use api::v1::region::RegionRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; + use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use table::table_name::TableName; diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs index 1649ebc00d47..5cb7d4a0f20e 100644 --- a/src/common/meta/src/ddl/test_util/datanode_handler.rs +++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -13,9 +13,10 @@ // limitations under the License. use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::RegionRequest; use common_error::ext::{BoxedError, ErrorExt, StackError}; use common_error::status_code::StatusCode; +use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use snafu::{ResultExt, Snafu}; diff --git a/src/common/meta/src/node_manager.rs b/src/common/meta/src/node_manager.rs index 3d6bca6416b5..bda4e02d1e34 100644 --- a/src/common/meta/src/node_manager.rs +++ b/src/common/meta/src/node_manager.rs @@ -16,8 +16,9 @@ use std::sync::Arc; use api::region::RegionResponse; use api::v1::flow::{FlowRequest, FlowResponse}; -use api::v1::region::{InsertRequests, QueryRequest, RegionRequest}; +use api::v1::region::{InsertRequests, RegionRequest}; pub use common_base::AffectedRows; +use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use crate::error::Result; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 892f01da0c99..2abfb5f24e09 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -16,8 +16,9 @@ use std::sync::Arc; use api::region::RegionResponse; use api::v1::flow::{FlowRequest, FlowResponse}; -use api::v1::region::{InsertRequests, QueryRequest, RegionRequest}; +use api::v1::region::{InsertRequests, RegionRequest}; pub use common_base::AffectedRows; +use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml index d7a0361965bd..9ba2e11f88fc 100644 --- a/src/common/query/Cargo.toml +++ b/src/common/query/Cargo.toml @@ -27,6 +27,7 @@ snafu.workspace = true sqlparser.workspace = true sqlparser_derive = "0.1" statrs = "0.16" +store-api.workspace = true [dev-dependencies] common-base.workspace = true diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 68c7c2568cbc..77eb2d4e24a3 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -17,6 +17,7 @@ pub mod error; mod function; pub mod logical_plan; pub mod prelude; +pub mod request; mod signature; #[cfg(any(test, feature = "testing"))] pub mod test_util; diff --git a/src/common/query/src/request.rs b/src/common/query/src/request.rs new file mode 100644 index 000000000000..a91f72eba02f --- /dev/null +++ b/src/common/query/src/request.rs @@ -0,0 +1,29 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::region::RegionRequestHeader; +use datafusion_expr::LogicalPlan; +use store_api::storage::RegionId; + +/// The query request to be handled by the RegionServer (Datanode). +pub struct QueryRequest { + /// The header of this request. Often to store some context of the query. None means all to defaults. + pub header: Option, + + /// The id of the region to be queried. + pub region_id: RegionId, + + /// The form of the query: a logical plan. + pub plan: LogicalPlan, +} diff --git a/src/common/query/src/test_util.rs b/src/common/query/src/test_util.rs index 141c284a7baf..ae599f9b1f5b 100644 --- a/src/common/query/src/test_util.rs +++ b/src/common/query/src/test_util.rs @@ -20,7 +20,7 @@ use datafusion::logical_expr::LogicalPlan; use crate::error::Result; use crate::logical_plan::SubstraitPlanDecoder; -/// Dummy `[SubstraitPlanDecoder]` for test. +/// Dummy [`SubstraitPlanDecoder`] for test. pub struct DummyDecoder; impl DummyDecoder { diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index a5408b0c3246..26a7ccb67563 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -57,6 +57,7 @@ servers.workspace = true session.workspace = true snafu.workspace = true store-api.workspace = true +substrait.workspace = true table.workspace = true tokio.workspace = true toml.workspace = true diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index c5956f731bac..d564b417d3a2 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -394,6 +394,14 @@ pub enum Error { location: Location, source: BoxedError, }, + + #[snafu(display("DataFusion"))] + DataFusion { + #[snafu(source)] + error: datafusion::error::DataFusionError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -446,7 +454,8 @@ impl ErrorExt for Error { | IncorrectInternalState { .. } | ShutdownInstance { .. } | RegionEngineNotFound { .. } - | UnsupportedOutput { .. } => StatusCode::Internal, + | UnsupportedOutput { .. } + | DataFusion { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, RegionNotReady { .. } => StatusCode::RegionNotReady, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 738d98fdd43b..427d95f1d361 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -18,13 +18,14 @@ use std::ops::Deref; use std::sync::{Arc, RwLock}; use api::region::RegionResponse; -use api::v1::region::{region_request, QueryRequest, RegionResponse as RegionResponseV1}; +use api::v1::region::{region_request, RegionResponse as RegionResponseV1}; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; use async_trait::async_trait; use bytes::Bytes; use common_error::ext::BoxedError; use common_error::status_code::StatusCode; +use common_query::request::QueryRequest; use common_query::OutputData; use common_recordbatch::SendableRecordBatchStream; use common_runtime::Runtime; @@ -32,6 +33,10 @@ use common_telemetry::tracing::{self, info_span}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{error, info, warn}; use dashmap::DashMap; +use datafusion::datasource::{provider_as_source, TableProvider}; +use datafusion::error::Result as DfResult; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_expr::{LogicalPlan, TableSource}; use futures_util::future::try_join_all; use metric_engine::engine::MetricEngine; use mito2::engine::MITO_ENGINE_NAME; @@ -44,7 +49,7 @@ use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as S use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; use session::context::{QueryContextBuilder, QueryContextRef}; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; @@ -56,10 +61,10 @@ use store_api::storage::RegionId; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ - self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, - FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, - NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, - StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, + self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, + ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu, + HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, + RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; @@ -138,9 +143,94 @@ impl RegionServer { self.inner.handle_request(region_id, request).await } + async fn table_provider(&self, region_id: RegionId) -> Result> { + let status = self + .inner + .region_map + .get(®ion_id) + .context(RegionNotFoundSnafu { region_id })? + .clone(); + ensure!( + matches!(status, RegionEngineWithStatus::Ready(_)), + RegionNotReadySnafu { region_id } + ); + + self.inner + .table_provider_factory + .create(region_id, status.into_engine()) + .await + .context(ExecuteLogicalPlanSnafu) + } + + /// Handle reads from remote. They're often query requests received by our Arrow Flight service. + pub async fn handle_remote_read( + &self, + request: api::v1::region::QueryRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + let provider = self.table_provider(region_id).await?; + let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider)); + + let query_ctx: QueryContextRef = request + .header + .as_ref() + .map(|h| Arc::new(h.into())) + .unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build())); + + let decoder = self + .inner + .query_engine + .engine_context(query_ctx) + .new_plan_decoder() + .context(NewPlanDecoderSnafu)?; + + let plan = decoder + .decode(Bytes::from(request.plan), catalog_list, false) + .await + .context(DecodeLogicalPlanSnafu)?; + + self.inner + .handle_read(QueryRequest { + header: request.header, + region_id, + plan, + }) + .await + } + #[tracing::instrument(skip_all)] pub async fn handle_read(&self, request: QueryRequest) -> Result { - self.inner.handle_read(request).await + let provider = self.table_provider(request.region_id).await?; + + struct RegionDataSourceInjector { + source: Arc, + } + + impl TreeNodeRewriter for RegionDataSourceInjector { + type Node = LogicalPlan; + + fn f_up(&mut self, node: Self::Node) -> DfResult> { + Ok(match node { + LogicalPlan::TableScan(mut scan) => { + scan.source = self.source.clone(); + Transformed::yes(LogicalPlan::TableScan(scan)) + } + _ => Transformed::no(node), + }) + } + } + + let plan = request + .plan + .rewrite(&mut RegionDataSourceInjector { + source: provider_as_source(provider), + }) + .context(DataFusionSnafu)? + .data; + + self.inner + .handle_read(QueryRequest { plan, ..request }) + .await } /// Returns all opened and reportable regions. @@ -302,7 +392,7 @@ impl FlightCraft for RegionServer { request: Request, ) -> TonicResult>> { let ticket = request.into_inner().ticket; - let request = QueryRequest::decode(ticket.as_ref()) + let request = api::v1::region::QueryRequest::decode(ticket.as_ref()) .context(servers_error::InvalidFlightTicketSnafu)?; let tracing_context = request .header @@ -311,7 +401,7 @@ impl FlightCraft for RegionServer { .unwrap_or_default(); let result = self - .handle_read(request) + .handle_remote_read(request) .trace(tracing_context.attach(info_span!("RegionServer::handle_read"))) .await?; @@ -339,10 +429,6 @@ impl RegionEngineWithStatus { RegionEngineWithStatus::Ready(engine) => engine, } } - - pub fn is_registering(&self) -> bool { - matches!(self, Self::Registering(_)) - } } impl Deref for RegionEngineWithStatus { @@ -741,51 +827,16 @@ impl RegionServerInner { pub async fn handle_read(&self, request: QueryRequest) -> Result { // TODO(ruihang): add metrics and set trace id - let QueryRequest { - header, - region_id, - plan, - } = request; - let region_id = RegionId::from_u64(region_id); - // Build query context from gRPC header - let ctx: QueryContextRef = header + let query_ctx: QueryContextRef = request + .header .as_ref() .map(|h| Arc::new(h.into())) .unwrap_or_else(|| QueryContextBuilder::default().build().into()); - // build dummy catalog list - let region_status = self - .region_map - .get(®ion_id) - .with_context(|| RegionNotFoundSnafu { region_id })? - .clone(); - - if region_status.is_registering() { - return error::RegionNotReadySnafu { region_id }.fail(); - } - - let table_provider = self - .table_provider_factory - .create(region_id, region_status.into_engine()) - .await - .context(ExecuteLogicalPlanSnafu)?; - - let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); - let query_engine_ctx = self.query_engine.engine_context(ctx.clone()); - let plan_decoder = query_engine_ctx - .new_plan_decoder() - .context(NewPlanDecoderSnafu)?; - - // decode substrait plan to logical plan and execute it - let logical_plan = plan_decoder - .decode(Bytes::from(plan), catalog_list, false) - .await - .context(DecodeLogicalPlanSnafu)?; - let result = self .query_engine - .execute(logical_plan.into(), ctx) + .execute(request.plan.into(), query_ctx) .await .context(ExecuteLogicalPlanSnafu)?; diff --git a/src/frontend/src/instance/region_query.rs b/src/frontend/src/instance/region_query.rs index 3cbd07e75905..b6d2f00b22d8 100644 --- a/src/frontend/src/instance/region_query.rs +++ b/src/frontend/src/instance/region_query.rs @@ -14,16 +14,15 @@ use std::sync::Arc; -use api::v1::region::QueryRequest; use async_trait::async_trait; use common_error::ext::BoxedError; use common_meta::node_manager::NodeManagerRef; +use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use partition::manager::PartitionRuleManagerRef; use query::error::{RegionQuerySnafu, Result as QueryResult}; use query::region_query::RegionQueryHandler; use snafu::ResultExt; -use store_api::storage::RegionId; use crate::error::{FindTableRouteSnafu, RequestQuerySnafu, Result}; @@ -56,7 +55,7 @@ impl RegionQueryHandler for FrontendRegionQueryHandler { impl FrontendRegionQueryHandler { async fn do_get_inner(&self, request: QueryRequest) -> Result { - let region_id = RegionId::from_u64(request.region_id); + let region_id = request.region_id; let peer = &self .partition_manager diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 5c9e7a46f65a..55a11cf127d2 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -15,13 +15,14 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::region::{QueryRequest, RegionRequest, RegionResponse as RegionResponseV1}; +use api::v1::region::{RegionRequest, RegionResponse as RegionResponseV1}; use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager}; use common_meta::peer::Peer; +use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing; use common_telemetry::tracing_context::{FutureExt, TracingContext}; diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 004d32e4a295..75d15ae2c221 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; -use common_catalog::consts::default_engine; use common_meta::RegionIdent; use crate::error::Result; @@ -91,8 +90,7 @@ impl HeartbeatHandler for RegionFailureHandler { datanode_id: stat.id, table_id: region_id.table_id(), region_number: region_id.region_number(), - // TODO(LFC): Use the actual table engine (maybe retrieve from heartbeat). - engine: default_engine().to_string(), + engine: x.engine.clone(), } }) .collect(), @@ -109,6 +107,7 @@ impl HeartbeatHandler for RegionFailureHandler { mod tests { use std::assert_matches::assert_matches; + use common_catalog::consts::default_engine; use common_meta::key::MAINTENANCE_KEY; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 23d7fbb832cd..93bbff1f56c7 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -18,10 +18,10 @@ use std::time::Duration; use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use async_stream::stream; -use common_base::bytes::Bytes; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::BoxedError; use common_plugins::GREPTIME_EXEC_READ_COST; +use common_query::request::QueryRequest; use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics}; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{ @@ -40,7 +40,7 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::EquivalenceProperties; use datatypes::schema::{Schema, SchemaRef}; use futures_util::StreamExt; -use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader}; +use greptime_proto::v1::region::RegionRequestHeader; use greptime_proto::v1::QueryContext; use meter_core::data::ReadItem; use meter_macros::read_meter; @@ -125,7 +125,7 @@ impl MergeScanLogicalPlan { pub struct MergeScanExec { table: TableName, regions: Vec, - substrait_plan: Bytes, + plan: LogicalPlan, schema: SchemaRef, arrow_schema: ArrowSchemaRef, region_query_handler: RegionQueryHandlerRef, @@ -150,7 +150,7 @@ impl MergeScanExec { pub fn new( table: TableName, regions: Vec, - substrait_plan: Bytes, + plan: LogicalPlan, arrow_schema: &ArrowSchema, region_query_handler: RegionQueryHandlerRef, query_ctx: QueryContextRef, @@ -166,7 +166,7 @@ impl MergeScanExec { Ok(Self { table, regions, - substrait_plan, + plan, schema: schema_without_metadata, arrow_schema: arrow_schema_without_metadata, region_query_handler, @@ -178,7 +178,6 @@ impl MergeScanExec { } pub fn to_stream(&self, context: Arc) -> Result { - let substrait_plan = self.substrait_plan.to_vec(); let regions = self.regions.clone(); let region_query_handler = self.region_query_handler.clone(); let metric = MergeScanMetric::new(&self.metric); @@ -192,6 +191,7 @@ impl MergeScanExec { let extensions = self.query_ctx.extensions(); let sub_sgate_metrics_moved = self.sub_stage_metrics.clone(); + let plan = self.plan.clone(); let stream = Box::pin(stream!({ MERGE_SCAN_REGIONS.observe(regions.len() as f64); let _finish_timer = metric.finish_time().timer(); @@ -210,8 +210,8 @@ impl MergeScanExec { extensions: extensions.clone(), }), }), - region_id: region_id.into(), - plan: substrait_plan.clone(), + region_id, + plan: plan.clone(), }; let mut stream = region_query_handler .do_get(request) diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index c3d8b00eaf2d..7b56538da404 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -24,22 +24,19 @@ use datafusion::datasource::DefaultTableSource; use datafusion::execution::context::SessionState; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::TableReference; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion_optimizer::analyzer::Analyzer; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; pub use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; use table::table_name::TableName; use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan}; -use crate::error; use crate::error::{CatalogSnafu, TableNotFoundSnafu}; -use crate::query_engine::DefaultSerializer; use crate::region_query::RegionQueryHandlerRef; pub struct DistExtensionPlanner { @@ -99,13 +96,6 @@ impl ExtensionPlanner for DistExtensionPlanner { // TODO(ruihang): generate different execution plans for different variant merge operation let schema = optimized_plan.schema().as_ref().into(); - // Pass down the original plan, allow execution nodes to do their optimization - let amended_plan = Self::plan_with_full_table_name(input_plan.clone(), &table_name)?; - let substrait_plan = DFLogicalSubstraitConvertor - .encode(&amended_plan, DefaultSerializer) - .context(error::EncodeSubstraitLogicalPlanSnafu)? - .into(); - let query_ctx = session_state .config() .get_extension() @@ -113,7 +103,7 @@ impl ExtensionPlanner for DistExtensionPlanner { let merge_scan_plan = MergeScanExec::new( table_name, regions, - substrait_plan, + input_plan.clone(), &schema, self.region_query_handler.clone(), query_ctx, @@ -130,12 +120,6 @@ impl DistExtensionPlanner { Ok(extractor.table_name) } - /// Apply the fully resolved table name to the TableScan plan - fn plan_with_full_table_name(plan: LogicalPlan, name: &TableName) -> Result { - plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name)) - .map(|x| x.data) - } - async fn get_regions(&self, table_name: &TableName) -> Result> { let table = self .catalog_manager @@ -230,24 +214,3 @@ impl TreeNodeVisitor<'_> for TableNameExtractor { } } } - -struct TableNameRewriter; - -impl TableNameRewriter { - fn rewrite_table_name( - plan: LogicalPlan, - name: &TableName, - ) -> datafusion_common::Result> { - Ok(match plan { - LogicalPlan::TableScan(mut table_scan) => { - table_scan.table_name = TableReference::full( - name.catalog_name.clone(), - name.schema_name.clone(), - name.table_name.clone(), - ); - Transformed::yes(LogicalPlan::TableScan(table_scan)) - } - _ => Transformed::no(plan), - }) - } -} diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 35d3fbdb17b9..6eeb764b8a9f 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -140,13 +140,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to encode Substrait logical plan"))] - EncodeSubstraitLogicalPlan { - source: substrait::error::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("General SQL error"))] Sql { #[snafu(implicit)] @@ -340,7 +333,6 @@ impl ErrorExt for Error { | ColumnSchemaNoDefault { .. } => StatusCode::InvalidArguments, BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, - EncodeSubstraitLogicalPlan { source, .. } => source.status_code(), ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(), diff --git a/src/query/src/region_query.rs b/src/query/src/region_query.rs index f9861103e62b..92b2a4e2f98b 100644 --- a/src/query/src/region_query.rs +++ b/src/query/src/region_query.rs @@ -14,8 +14,8 @@ use std::sync::Arc; -use api::v1::region::QueryRequest; use async_trait::async_trait; +use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use crate::error::Result; diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 5b6f5ddc1603..dd4963483077 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -14,7 +14,6 @@ async-trait.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true -common-query.workspace = true common-recordbatch.workspace = true common-wal.workspace = true datafusion-expr.workspace = true diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 36f41696f6c9..a41565b2f1ff 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -75,7 +75,8 @@ impl ColumnMetadata { column_def.datatype_extension.clone(), ) .into(); - ColumnSchema::new(column_def.name, data_type, column_def.is_nullable) + ColumnSchema::new(&column_def.name, data_type, column_def.is_nullable) + .with_time_index(column_def.semantic_type() == SemanticType::Timestamp) .with_default_constraint(default_constrain) .context(ConvertDatatypesSnafu) } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 337effbd0374..7f88fe78572e 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -21,14 +21,13 @@ use std::sync::{Arc, Mutex}; use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; use api::region::RegionResponse; use async_trait::async_trait; -use common_error::ext::BoxedError; -use common_query::error::ExecuteRepeatedlySnafu; +use common_error::ext::{BoxedError, PlainError}; +use common_error::status_code::StatusCode; use common_recordbatch::SendableRecordBatchStream; use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use futures::future::join_all; use serde::{Deserialize, Serialize}; -use snafu::OptionExt; use tokio::sync::Semaphore; use crate::logstore::entry; @@ -295,10 +294,12 @@ impl RegionScanner for SinglePartitionScanner { fn scan_partition(&self, _partition: usize) -> Result { let mut stream = self.stream.lock().unwrap(); - stream - .take() - .context(ExecuteRepeatedlySnafu) - .map_err(BoxedError::new) + stream.take().ok_or_else(|| { + BoxedError::new(PlainError::new( + "Not expected to run ExecutionPlan more than once".to_string(), + StatusCode::Unexpected, + )) + }) } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 600945d60719..d01eb82a68c4 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -647,8 +647,6 @@ pub struct TableInfo { /// Id and version of the table. #[builder(default, setter(into))] pub ident: TableIdent, - - // TODO(LFC): Remove the catalog, schema and table names from TableInfo. /// Name of the table. #[builder(setter(into))] pub name: String, diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index ed2d6425a439..b3e26b9bb5cb 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -555,7 +555,7 @@ CREATE TABLE {table_name} ( let region_id = RegionId::new(table_id, *region); let stream = region_server - .handle_read(RegionQueryRequest { + .handle_remote_read(RegionQueryRequest { region_id: region_id.as_u64(), plan: plan.to_vec(), ..Default::default() diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index feff39e136c0..a456f0a75db4 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -249,7 +249,7 @@ mod tests { let region_id = RegionId::new(table_id, *region); let stream = region_server - .handle_read(QueryRequest { + .handle_remote_read(QueryRequest { region_id: region_id.as_u64(), plan: plan.to_vec(), ..Default::default() diff --git a/tests/cases/distributed/explain/analyze.result b/tests/cases/distributed/explain/analyze.result index 762f70bdace4..9adcf9eb212c 100644 --- a/tests/cases/distributed/explain/analyze.result +++ b/tests/cases/distributed/explain/analyze.result @@ -31,9 +31,9 @@ explain analyze SELECT count(*) FROM system_metrics; +-+-+-+ | 0_| 0_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[COUNT(greptime.public.system_REDACTED +| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[COUNT(system_REDACTED |_|_|_CoalescePartitionsExec REDACTED -|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(greptime.public.system_REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_SinglePartitionScanner: REDACTED |_|_|_|