Skip to content

Commit

Permalink
refactor: remove substrait ser/de for region query in standalone (#3812)
Browse files Browse the repository at this point in the history
* refactor: remove substrait serde for region query in standalone

* fix ci

* move QueryRequest to common-query

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format toml file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: format toml

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
  • Loading branch information
3 people authored Jun 11, 2024
1 parent 1b00526 commit 5a6021e
Show file tree
Hide file tree
Showing 29 changed files with 208 additions and 138 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
prometheus.workspace = true
prost.workspace = true
query.workspace = true
rand.workspace = true
serde_json.workspace = true
snafu.workspace = true
substrait.workspace = true
tokio.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
tonic.workspace = true
Expand All @@ -42,7 +44,6 @@ tonic.workspace = true
common-grpc-expr.workspace = true
datanode.workspace = true
derive-new = "0.5"
substrait.workspace = true
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand Down
16 changes: 15 additions & 1 deletion src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
Expand All @@ -26,12 +26,15 @@ use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::node_manager::Datanode;
use common_query::request::QueryRequest;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use query::query_engine::DefaultSerializer;
use snafu::{location, Location, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio_stream::StreamExt;

use crate::error::{
Expand Down Expand Up @@ -63,6 +66,17 @@ impl Datanode for RegionRequester {
}

async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let plan = DFLogicalSubstraitConvertor
.encode(&request.plan, DefaultSerializer)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?
.to_vec();
let request = api::v1::region::QueryRequest {
header: request.header,
region_id: request.region_id.as_u64(),
plan,
};

let ticket = Ticket {
ticket: request.encode_to_vec().into(),
};
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ common-grpc-expr.workspace = true
common-macro.workspace = true
common-procedure.workspace = true
common-procedure-test.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ mod tests {
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use table::table_name::TableName;

Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/test_util/datanode_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
// limitations under the License.

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use common_error::ext::{BoxedError, ErrorExt, StackError};
use common_error::status_code::StatusCode;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use snafu::{ResultExt, Snafu};
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;

use crate::error::Result;
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;

use crate::cache_invalidator::DummyCacheInvalidator;
Expand Down
1 change: 1 addition & 0 deletions src/common/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ snafu.workspace = true
sqlparser.workspace = true
sqlparser_derive = "0.1"
statrs = "0.16"
store-api.workspace = true

[dev-dependencies]
common-base.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod error;
mod function;
pub mod logical_plan;
pub mod prelude;
pub mod request;
mod signature;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
Expand Down
29 changes: 29 additions & 0 deletions src/common/query/src/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::region::RegionRequestHeader;
use datafusion_expr::LogicalPlan;
use store_api::storage::RegionId;

/// The query request to be handled by the RegionServer (Datanode).
pub struct QueryRequest {
/// The header of this request. Often to store some context of the query. None means all to defaults.
pub header: Option<RegionRequestHeader>,

/// The id of the region to be queried.
pub region_id: RegionId,

/// The form of the query: a logical plan.
pub plan: LogicalPlan,
}
2 changes: 1 addition & 1 deletion src/common/query/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use datafusion::logical_expr::LogicalPlan;
use crate::error::Result;
use crate::logical_plan::SubstraitPlanDecoder;

/// Dummy `[SubstraitPlanDecoder]` for test.
/// Dummy [`SubstraitPlanDecoder`] for test.
pub struct DummyDecoder;

impl DummyDecoder {
Expand Down
1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
Expand Down
11 changes: 10 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,14 @@ pub enum Error {
location: Location,
source: BoxedError,
},

#[snafu(display("DataFusion"))]
DataFusion {
#[snafu(source)]
error: datafusion::error::DataFusionError,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -446,7 +454,8 @@ impl ErrorExt for Error {
| IncorrectInternalState { .. }
| ShutdownInstance { .. }
| RegionEngineNotFound { .. }
| UnsupportedOutput { .. } => StatusCode::Internal,
| UnsupportedOutput { .. }
| DataFusion { .. } => StatusCode::Internal,

RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
Expand Down
Loading

0 comments on commit 5a6021e

Please sign in to comment.